diff --git a/store/neurostore/ingest/extracted_features.py b/store/neurostore/ingest/extracted_features.py
new file mode 100644
index 00000000..efb0face
--- /dev/null
+++ b/store/neurostore/ingest/extracted_features.py
@@ -0,0 +1,105 @@
+"""Ingest extracted features into the database."""
+
+import json
+import os.path as op
+from pathlib import Path
+import hashlib
+from dateutil.parser import parse as parse_date
+
+from neurostore.database import db
+from neurostore.models import (
+    Pipeline,
+    PipelineConfig,
+    PipelineRun,
+    PipelineRunResult,
+)
+
+
+def ingest_feature(feature_directory):
+    """Ingest extracted feature data into the database."""
+    # read pipeline_info.json from the base feature directory
+    with open(op.join(feature_directory, "pipeline_info.json")) as f:
+        pipeline_info = json.load(f)
+
+    # check whether a pipeline with the same name and version already exists
+    pipeline = (
+        db.session.query(Pipeline)
+        .filter(
+            Pipeline.name == pipeline_info["name"],
+            Pipeline.version == pipeline_info["version"],
+        )
+        .first()
+    )
+    # create the pipeline if it does not exist
+    if not pipeline:
+        pipeline = Pipeline(
+            name=pipeline_info["name"],
+            version=pipeline_info["version"],
+            description=pipeline_info.get("description"),
+            study_dependent=pipeline_info.get("study_dependent", False),
+            ace_compatible=pipeline_info.get("ace_compatible", False),
+            pubget_compatible=pipeline_info.get("pubget_compatible", False),
+            derived_from=pipeline_info.get("derived_from", None),
+        )
+        db.session.add(pipeline)
+        db.session.flush()  # assign pipeline.id before it is referenced below
+
+    # search within the pipeline for an existing config that matches the
+    # "arguments" field in pipeline_info.json by hashing the arguments;
+    # sort keys so logically identical configs hash identically
+    config_hash = hashlib.sha256(
+        json.dumps(pipeline_info["arguments"], sort_keys=True).encode()
+    ).hexdigest()
+    pipeline_config = (
+        db.session.query(PipelineConfig)
+        .filter(
+            PipelineConfig.pipeline_id == pipeline.id,
+            PipelineConfig.config_hash == config_hash,
+        )
+        .first()
+    )
+    # create a pipeline config if it does not exist
+    if not pipeline_config:
+        pipeline_config = PipelineConfig(
+            pipeline_id=pipeline.id,
+            config=pipeline_info["arguments"],
+            config_hash=config_hash,
+        )
+        db.session.add(pipeline_config)
+        db.session.flush()  # assign pipeline_config.id before it is referenced below
+
+    # create a new pipeline run
+    pipeline_run = PipelineRun(
+        pipeline_id=pipeline.id,
+        config_id=pipeline_config.id,
+    )
+
+    # get a list of all the paper directories in the feature directory
+    paper_dirs = [d for d in Path(feature_directory).iterdir() if d.is_dir()]
+
+    # for each paper directory, read the results.json and info.json files
+    pipeline_run_results = []
+    for paper_dir in paper_dirs:
+        with open(op.join(paper_dir, "results.json")) as f:
+            results = json.load(f)
+
+        with open(op.join(paper_dir, "info.json")) as f:
+            info = json.load(f)
+
+        # use the directory name as the base_study_id
+        base_study_id = paper_dir.name
+        # create a new result record
+        pipeline_run_results.append(
+            PipelineRunResult(
+                base_study_id=base_study_id,
+                data=results,
+                date_executed=parse_date(info["date"]),
+                file_inputs=info["inputs"],
+                run=pipeline_run,
+            )
+        )
+
+    db.session.add(pipeline_run)
+    db.session.add_all(pipeline_run_results)
+
+    db.session.commit()
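For context: ingest_feature above expects each versioned pipeline directory to carry a pipeline_info.json describing the pipeline and its arguments. A minimal example of that file, mirroring the test fixture later in this diff (all values illustrative):

    {
        "name": "demographics",
        "version": "v1.0.0",
        "description": "demographic features",
        "study_dependent": false,
        "ace_compatible": true,
        "pubget_compatible": true,
        "derived_from": null,
        "arguments": {"parallel": 1}
    }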
diff --git a/store/neurostore/models/__init__.py b/store/neurostore/models/__init__.py
index 6a4fc7d7..c20e1a9f 100644
--- a/store/neurostore/models/__init__.py
+++ b/store/neurostore/models/__init__.py
@@ -12,6 +12,10 @@
     AnnotationAnalysis,
     PointValue,
     AnalysisConditions,
+    Pipeline,
+    PipelineConfig,
+    PipelineRun,
+    PipelineRunResult,
 )
 from .auth import User, Role
@@ -31,4 +35,8 @@
     "AnalysisConditions",
     "User",
     "Role",
+    "Pipeline",
+    "PipelineConfig",
+    "PipelineRun",
+    "PipelineRunResult",
 ]
diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py
index a3866d21..1225d2cf 100644
--- a/store/neurostore/models/data.py
+++ b/store/neurostore/models/data.py
@@ -538,6 +538,59 @@ class PointValue(BaseMixin, db.Model):
     user = relationship("User", backref=backref("point_values", passive_deletes=True))
 
 
+class Pipeline(BaseMixin, db.Model):
+    __tablename__ = "pipelines"
+
+    name = db.Column(db.String)
+    description = db.Column(db.String)
+    version = db.Column(db.String)
+    study_dependent = db.Column(db.Boolean, default=False)
+    ace_compatible = db.Column(db.Boolean, default=False)
+    pubget_compatible = db.Column(db.Boolean, default=False)
+    derived_from = db.Column(db.Text)
+
+
+class PipelineConfig(BaseMixin, db.Model):
+    __tablename__ = "pipeline_configs"
+
+    pipeline_id = db.Column(
+        db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
+    )
+    config = db.Column(JSONB)
+    config_hash = db.Column(db.String, index=True)
+    pipeline = relationship(
+        "Pipeline", backref=backref("configs", passive_deletes=True)
+    )
+
+
+class PipelineRun(BaseMixin, db.Model):
+    __tablename__ = "pipeline_runs"
+
+    pipeline_id = db.Column(
+        db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
+    )
+    config_id = db.Column(
+        db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True
+    )
+    config = relationship(
+        "PipelineConfig", backref=backref("runs", passive_deletes=True)
+    )
+    run_index = db.Column(db.Integer())
+
+
+class PipelineRunResult(BaseMixin, db.Model):
+    __tablename__ = "pipeline_run_results"
+
+    run_id = db.Column(
+        db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True
+    )
+    base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
+    date_executed = db.Column(db.DateTime(timezone=True))
+    data = db.Column(JSONB)
+    file_inputs = db.Column(JSONB)
+    run = relationship("PipelineRun", backref=backref("results", passive_deletes=True))
+
+
 # from . import event_listeners  # noqa E402
 # del event_listeners
diff --git a/store/neurostore/resources/base.py b/store/neurostore/resources/base.py
index 76a0b076..a5f700b1 100644
--- a/store/neurostore/resources/base.py
+++ b/store/neurostore/resources/base.py
@@ -619,7 +619,7 @@ def search(self):
             validate_search_query(s)
         except errors.SyntaxError as e:
             abort(400, description=e.args[0])
-        tsquery = func.to_tsquery('english', pubmed_to_tsquery(s))
+        tsquery = func.to_tsquery("english", pubmed_to_tsquery(s))
         q = q.filter(m._ts_vector.op("@@")(tsquery))
 
         # Alternatively (or in addition), search on individual fields.
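For orientation, the conftest.py fixture below builds the on-disk tree that ingest_feature walks: one versioned pipeline directory holding pipeline_info.json plus one subdirectory per base study. A sketch of that layout (study IDs are placeholders):

    output/demographics/v1.0.0/
    ├── pipeline_info.json
    ├── <base_study_id_1>/
    │   ├── results.json   # {"predictions": [...]}
    │   └── info.json      # {"inputs": {...}, "date": "..."}
    └── <base_study_id_2>/
        ├── results.json
        └── info.json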
diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py
index 1a7417d8..bfda7d7b 100644
--- a/store/neurostore/tests/conftest.py
+++ b/store/neurostore/tests/conftest.py
@@ -1,4 +1,6 @@
 import pytest
+import random
+import json
 from os import environ
-from neurostore.models.data import Analysis, Condition
+from neurostore.models.data import Analysis, BaseStudy, Condition
 from sqlalchemy.orm import scoped_session, sessionmaker
@@ -588,6 +590,52 @@ def simple_neurosynth_annotation(session, ingest_neurosynth):
     return smol_annot
 
 
+@pytest.fixture(scope="function")
+def create_demographic_features(session, ingest_neurosynth, tmp_path):
+    output_dir = tmp_path / "output" / "demographics" / "v1.0.0"
+    output_dir.mkdir(exist_ok=True, parents=True)
+    pipeline_info = {
+        "name": "demographics",
+        "version": "v1.0.0",
+        "description": "demographic features",
+        "study_dependent": False,
+        "ace_compatible": True,
+        "pubget_compatible": True,
+        "derived_from": None,
+        "arguments": {"parallel": 1},
+    }
+    with open(output_dir / "pipeline_info.json", "w") as f:
+        json.dump(pipeline_info, f)
+
+    studies = BaseStudy.query.all()
+    diseases = ["schizophrenia", "bipolar disorder", "depression", "healthy"]
+    studies_data = [
+        [
+            {"age": random.randint(18, 100), "group": group}
+            for group in random.sample(diseases, k=random.randint(1, 2))
+        ]
+        for study in studies
+    ]
+
+    for study, study_data in zip(studies, studies_data):
+        study_dir = output_dir / study.id
+        study_dir.mkdir(exist_ok=True, parents=True)
+        with open(study_dir / "results.json", "w") as f:
+            json.dump({"predictions": study_data}, f)
+        with open(study_dir / "info.json", "w") as f:
+            json.dump(
+                {
+                    "inputs": {
+                        f"/path/to/input/{study.id}.txt": f"md5{random.randint(0, 100)}"
+                    },
+                    "date": f"2021-01-{random.randint(1, 30)}",
+                },
+                f,
+            )
+
+    return output_dir
+
+
 """
 Queries for testing
 """
@@ -603,9 +651,7 @@
         'OR ("ASD")) AND (("decision*" OR "Dec',
         "Unmatched parentheses",
     ),
-    (
-        'smoking AND NOT memory', "Consecutive operators are not allowed"
-    )
+    ("smoking AND NOT memory", "Consecutive operators are not allowed"),
 ]
 
 valid_queries = [
diff --git a/store/neurostore/tests/test_ingestion.py b/store/neurostore/tests/test_ingestion.py
index 7008d5a2..6626616f 100644
--- a/store/neurostore/tests/test_ingestion.py
+++ b/store/neurostore/tests/test_ingestion.py
@@ -1,5 +1,7 @@
 """Test Ingestion Functions"""
 
+from neurostore.ingest.extracted_features import ingest_feature
+
 
 def test_ingest_ace(ingest_neurosynth, ingest_ace, session):
     pass
@@ -11,3 +13,7 @@ def test_ingest_neurovault(ingest_neurovault, session):
 
 def test_ingest_neuroquery(ingest_neuroquery, session):
     pass
+
+
+def test_ingest_features(create_demographic_features, session):
+    ingest_feature(create_demographic_features)
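For reference, a minimal usage sketch outside the test suite (not part of the diff). The app import path is an assumption about the project layout; ingest_feature needs an application context so that db.session is bound, and the directory path below is illustrative:

    from neurostore.core import app  # assumed application entry point; adjust to the project layout
    from neurostore.ingest.extracted_features import ingest_feature

    with app.app_context():
        # point at one versioned pipeline output directory
        ingest_feature("/path/to/output/demographics/v1.0.0")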