From 0cf1f6287f59eb9adc69b9c50b6fee241b843361 Mon Sep 17 00:00:00 2001 From: James Kent Date: Mon, 4 Nov 2024 16:43:46 -0600 Subject: [PATCH 01/10] wip: add feature tables --- store/neurostore/models/data.py | 51 +++++++++++++++++++++++++++++- store/neurostore/tests/conftest.py | 27 ++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index a3866d21..41d37118 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -278,7 +278,7 @@ class Study(BaseMixin, db.Model): public = db.Column(db.Boolean, default=True) level = db.Column(db.String) metadata_ = db.Column(JSONB) - source = db.Column(db.String, index=True) + source = db.Column(db.String, index=True)base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) source_id = db.Column(db.String, index=True) source_updated_at = db.Column(db.DateTime(timezone=True)) base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) @@ -538,6 +538,55 @@ class PointValue(BaseMixin, db.Model): user = relationship("User", backref=backref("point_values", passive_deletes=True)) +class Pipeline(BaseMixin, db.Model): + __tablename__ = "pipelines" + + name = db.Column(db.String) + description = db.Column(db.String) + version = db.Column(db.String) + study_depenedent = db.Column(db.Boolean, default=False) + ace_compatible = db.Column(db.Boolean, default=False) + pubget_compatible = db.Column(db.Boolean, default=False) + derived_from = db.Column(db.Text) + + +class PipelineConfig(BaseMixin, db.Model): + __tablename__ = "pipeline_configs" + + pipeline_id = db.Column( + db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True + ) + config = db.Column(JSONB) + config_hash = db.Column(db.String, index=True) + pipeline = relationship("Pipeline", backref=backref("configs", passive_deletes=True)) + + +class PipelineRun(BaseMixin, db.Model): + __tablename__ = "pipeline_runs" + + pipeline_id = db.Column( + db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True + ) + config_id = db.Column( + db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True + ) + config = relationship("PipelineConfig", backref=backref("runs", passive_deletes=True)) + run_index = db.Column(db.Integer()) + + +class PipelineRunResult(BaseMixin, db.Model): + __tablename__ = "pipeline_run_results" + + run_id = db.Column( + db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True + ) + base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) + feature_index = db.Column(db.Integer) # the same categories of information can be extracted multiple times from a single paper (e.g., multiple demographic groups, multiple software packages, etc) + feature_group = db.Column(db.String) # task, disease, software, age + feature = db.Column(db.String) # stroop task, schizophrenia, fsl + value = db.Column(db.Float) # 0.67, 0.3, 0.5 (some measure of confidence for the result) + run = relationship("PipelineRun", backref=backref("results", passive_deletes=True)) + # from . import event_listeners # noqa E402 # del event_listeners diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index 099ff129..7b896383 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -1,4 +1,6 @@ import pytest +import random +import json from os import environ from neurostore.models.data import Analysis, Condition from sqlalchemy.orm import scoped_session, sessionmaker @@ -586,3 +588,28 @@ def simple_neurosynth_annotation(session, ingest_neurosynth): session.commit() return smol_annot + + +@pytest.fixture(scope="function") +def create_demographic_features(session, ingest_neurosynth, tmp_path): + output_dir = tmp_path / "output" / "demographics" / "v1.0.0" + output_dir.mkdir(exist_ok=True, parents=True) + studies = Study.query.all() + diseases = ["schizophrenia", "bipolar disorder", "depression", "healthy"] + studies_data = [ + [ + { + "age": random.randint(18, 100), + "group": group + } for group in random.sample(diseases, k=random.randint(1, 2)) + ] for study in studies + ] + + for study, study_data in zip(studies, studies_data): + study_dir = output_dir / study.id + with open(study_dir / "results.json", "w") as f: + for entry in study_data: + json.dump(entry, f) + f.write('\n') + + return output_dir From 3c2447ce78818e5e7cbefad0cdcaee9b18dea027 Mon Sep 17 00:00:00 2001 From: James Kent Date: Thu, 9 Jan 2025 13:19:32 -0600 Subject: [PATCH 02/10] wip: add functioning tests --- store/neurostore/models/__init__.py | 8 ++++++++ store/neurostore/models/data.py | 15 +++++++++------ store/neurostore/tests/conftest.py | 16 +++++++++++++++- store/neurostore/tests/test_ingestion.py | 6 +++++- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/store/neurostore/models/__init__.py b/store/neurostore/models/__init__.py index 6a4fc7d7..c20e1a9f 100644 --- a/store/neurostore/models/__init__.py +++ b/store/neurostore/models/__init__.py @@ -12,6 +12,10 @@ AnnotationAnalysis, PointValue, AnalysisConditions, + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, ) from .auth import User, Role @@ -31,4 +35,8 @@ "AnalysisConditions", "User", "Role", + "Pipeline", + "PipelineConfig", + "PipelineRun", + "PipelineRunResult", ] diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index 41d37118..6079d207 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -278,7 +278,8 @@ class Study(BaseMixin, db.Model): public = db.Column(db.Boolean, default=True) level = db.Column(db.String) metadata_ = db.Column(JSONB) - source = db.Column(db.String, index=True)base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) + source = db.Column(db.String, index=True) + base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) source_id = db.Column(db.String, index=True) source_updated_at = db.Column(db.DateTime(timezone=True)) base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) @@ -544,7 +545,7 @@ class Pipeline(BaseMixin, db.Model): name = db.Column(db.String) description = db.Column(db.String) version = db.Column(db.String) - study_depenedent = db.Column(db.Boolean, default=False) + study_dependent = db.Column(db.Boolean, default=False) ace_compatible = db.Column(db.Boolean, default=False) pubget_compatible = db.Column(db.Boolean, default=False) derived_from = db.Column(db.Text) @@ -581,10 +582,12 @@ class PipelineRunResult(BaseMixin, db.Model): db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True ) base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) - feature_index = db.Column(db.Integer) # the same categories of information can be extracted multiple times from a single paper (e.g., multiple demographic groups, multiple software packages, etc) - feature_group = db.Column(db.String) # task, disease, software, age - feature = db.Column(db.String) # stroop task, schizophrenia, fsl - value = db.Column(db.Float) # 0.67, 0.3, 0.5 (some measure of confidence for the result) + data = db.Column(JSONB) + file_inputs = db.Column(JSONB) + # feature_index = db.Column(db.Integer) # the same categories of information can be extracted multiple times from a single paper (e.g., multiple demographic groups, multiple software packages, etc) + # feature_group = db.Column(db.String) # task, disease, software, age + # feature = db.Column(db.String) # stroop task, schizophrenia, fsl + # value = db.Column(db.Float) # 0.67, 0.3, 0.5 (some measure of confidence for the result) run = relationship("PipelineRun", backref=backref("results", passive_deletes=True)) # from . import event_listeners # noqa E402 diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index 9184f5fd..d550af4d 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -594,7 +594,20 @@ def simple_neurosynth_annotation(session, ingest_neurosynth): def create_demographic_features(session, ingest_neurosynth, tmp_path): output_dir = tmp_path / "output" / "demographics" / "v1.0.0" output_dir.mkdir(exist_ok=True, parents=True) - studies = Study.query.all() + pipeline_info = { + "name": "demographics", + "version": "v1.0.0", + "description": "demographic features", + "study_dependent": False, + "ace_compatible": True, + "pubget_compatible": True, + "derived_from": None, + "arguments": {"parallel": 1}, + } + with open(output_dir / "pipeline_info.json", "w") as f: + json.dump(pipeline_info, f) + + studies = BaseStudy.query.all() diseases = ["schizophrenia", "bipolar disorder", "depression", "healthy"] studies_data = [ [ @@ -607,6 +620,7 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): for study, study_data in zip(studies, studies_data): study_dir = output_dir / study.id + study_dir.mkdir(exist_ok=True, parents=True) with open(study_dir / "results.json", "w") as f: for entry in study_data: json.dump(entry, f) diff --git a/store/neurostore/tests/test_ingestion.py b/store/neurostore/tests/test_ingestion.py index 7008d5a2..58de079c 100644 --- a/store/neurostore/tests/test_ingestion.py +++ b/store/neurostore/tests/test_ingestion.py @@ -1,5 +1,5 @@ """Test Ingestion Functions""" - +from neurostore.ingest.extracted_features import ingest_feature def test_ingest_ace(ingest_neurosynth, ingest_ace, session): pass @@ -11,3 +11,7 @@ def test_ingest_neurovault(ingest_neurovault, session): def test_ingest_neuroquery(ingest_neuroquery, session): pass + + +def test_ingest_features(create_demographic_features, session): + ingest_feature(create_demographic_features, session) From 35081ef4d61a5d4cfb6f88eb4820c8149908b258 Mon Sep 17 00:00:00 2001 From: James Kent Date: Mon, 13 Jan 2025 15:25:59 -0600 Subject: [PATCH 03/10] fix tests --- store/neurostore/models/data.py | 1 + store/neurostore/tests/conftest.py | 11 ++++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index 6079d207..ae8ebbbf 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -582,6 +582,7 @@ class PipelineRunResult(BaseMixin, db.Model): db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True ) base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True) + date_executed = db.Column(db.DateTime(timezone=True)) data = db.Column(JSONB) file_inputs = db.Column(JSONB) # feature_index = db.Column(db.Integer) # the same categories of information can be extracted multiple times from a single paper (e.g., multiple demographic groups, multiple software packages, etc) diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index d550af4d..e7084cce 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -622,9 +622,14 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): study_dir = output_dir / study.id study_dir.mkdir(exist_ok=True, parents=True) with open(study_dir / "results.json", "w") as f: - for entry in study_data: - json.dump(entry, f) - f.write('\n') + json.dump({"predictions": study_data}, f) + with open(study_dir / "info.json", "w") as f: + json.dump( + { + "inputs": {f"/path/to/input/{study.id}.txt": f"md5{random.randint(0, 100)}"}, + "date": f"2021-01-{random.randint(1, 30)}", + }, f + ) return output_dir From 3796e662b76f815a38ab83e4d203055a9c34a2be Mon Sep 17 00:00:00 2001 From: James Kent Date: Mon, 13 Jan 2025 15:53:03 -0600 Subject: [PATCH 04/10] run black --- store/neurostore/models/data.py | 9 +++++++-- store/neurostore/resources/base.py | 2 +- store/neurostore/tests/conftest.py | 23 ++++++++++++----------- store/neurostore/tests/test_ingestion.py | 2 ++ 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index ae8ebbbf..809219ea 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -559,7 +559,9 @@ class PipelineConfig(BaseMixin, db.Model): ) config = db.Column(JSONB) config_hash = db.Column(db.String, index=True) - pipeline = relationship("Pipeline", backref=backref("configs", passive_deletes=True)) + pipeline = relationship( + "Pipeline", backref=backref("configs", passive_deletes=True) + ) class PipelineRun(BaseMixin, db.Model): @@ -571,7 +573,9 @@ class PipelineRun(BaseMixin, db.Model): config_id = db.Column( db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True ) - config = relationship("PipelineConfig", backref=backref("runs", passive_deletes=True)) + config = relationship( + "PipelineConfig", backref=backref("runs", passive_deletes=True) + ) run_index = db.Column(db.Integer()) @@ -591,6 +595,7 @@ class PipelineRunResult(BaseMixin, db.Model): # value = db.Column(db.Float) # 0.67, 0.3, 0.5 (some measure of confidence for the result) run = relationship("PipelineRun", backref=backref("results", passive_deletes=True)) + # from . import event_listeners # noqa E402 # del event_listeners diff --git a/store/neurostore/resources/base.py b/store/neurostore/resources/base.py index 76a0b076..a5f700b1 100644 --- a/store/neurostore/resources/base.py +++ b/store/neurostore/resources/base.py @@ -619,7 +619,7 @@ def search(self): validate_search_query(s) except errors.SyntaxError as e: abort(400, description=e.args[0]) - tsquery = func.to_tsquery('english', pubmed_to_tsquery(s)) + tsquery = func.to_tsquery("english", pubmed_to_tsquery(s)) q = q.filter(m._ts_vector.op("@@")(tsquery)) # Alternatively (or in addition), search on individual fields. diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index e7084cce..bfda7d7b 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -606,16 +606,15 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): } with open(output_dir / "pipeline_info.json", "w") as f: json.dump(pipeline_info, f) - + studies = BaseStudy.query.all() diseases = ["schizophrenia", "bipolar disorder", "depression", "healthy"] studies_data = [ [ - { - "age": random.randint(18, 100), - "group": group - } for group in random.sample(diseases, k=random.randint(1, 2)) - ] for study in studies + {"age": random.randint(18, 100), "group": group} + for group in random.sample(diseases, k=random.randint(1, 2)) + ] + for study in studies ] for study, study_data in zip(studies, studies_data): @@ -626,13 +625,17 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): with open(study_dir / "info.json", "w") as f: json.dump( { - "inputs": {f"/path/to/input/{study.id}.txt": f"md5{random.randint(0, 100)}"}, + "inputs": { + f"/path/to/input/{study.id}.txt": f"md5{random.randint(0, 100)}" + }, "date": f"2021-01-{random.randint(1, 30)}", - }, f + }, + f, ) return output_dir + """ Queries for testing """ @@ -648,9 +651,7 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): 'OR ("ASD")) AND (("decision*" OR "Dec', "Unmatched parentheses", ), - ( - 'smoking AND NOT memory', "Consecutive operators are not allowed" - ) + ("smoking AND NOT memory", "Consecutive operators are not allowed"), ] valid_queries = [ diff --git a/store/neurostore/tests/test_ingestion.py b/store/neurostore/tests/test_ingestion.py index 58de079c..f21b5b9d 100644 --- a/store/neurostore/tests/test_ingestion.py +++ b/store/neurostore/tests/test_ingestion.py @@ -1,6 +1,8 @@ """Test Ingestion Functions""" + from neurostore.ingest.extracted_features import ingest_feature + def test_ingest_ace(ingest_neurosynth, ingest_ace, session): pass From f14f3438bc4e56697135cfd87ae6df41213c0ce8 Mon Sep 17 00:00:00 2001 From: James Kent Date: Mon, 13 Jan 2025 15:58:59 -0600 Subject: [PATCH 05/10] remove commented lines --- store/neurostore/ingest/extracted_features.py | 110 ++++++++++++++++++ store/neurostore/models/data.py | 4 - 2 files changed, 110 insertions(+), 4 deletions(-) create mode 100644 store/neurostore/ingest/extracted_features.py diff --git a/store/neurostore/ingest/extracted_features.py b/store/neurostore/ingest/extracted_features.py new file mode 100644 index 00000000..a982c3a1 --- /dev/null +++ b/store/neurostore/ingest/extracted_features.py @@ -0,0 +1,110 @@ +"""Ingest extracted features into the database.""" + +import json +import os.path as op +import re +from pathlib import Path +import hashlib + +import numpy as np +import pandas as pd +import requests +from scipy import sparse +from dateutil.parser import parse as parse_date +from sqlalchemy import or_ + +from neurostore.database import db +from neurostore.models import ( + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, +) + + +def ingest_feature(feature_directory, session): + """Ingest demographics data into the database.""" + # read pipeline_info.json from the base feature directory + with open(op.join(feature_directory, "pipeline_info.json")) as f: + pipeline_info = json.load(f) + + # search if there is an existing pipeline with the same name and version + pipeline = ( + session.query(Pipeline) + .filter( + Pipeline.name == pipeline_info["name"], + Pipeline.version == pipeline_info["version"], + ) + .first() + ) + # create a pipeline if it does not exist + if not pipeline: + pipeline = Pipeline( + name=pipeline_info["name"], + version=pipeline_info["version"], + description=pipeline_info.get("description"), + study_dependent=pipeline_info.get("study_dependent", False), + ace_compatible=pipeline_info.get("ace_compatible", False), + pubget_compatible=pipeline_info.get("pubget_compatible", False), + derived_from=pipeline_info.get("derived_from", None), + ) + session.add(pipeline) + + # search within the pipeline and see if there are any existing pipeline configs + # that match the "arguements" field in the pipeline_info.json + # create a hash of the config arguments + config_hash = hashlib.sha256( + json.dumps(pipeline_info["arguments"]).encode() + ).hexdigest() + pipeline_config = ( + session.query(PipelineConfig) + .filter( + PipelineConfig.pipeline_id == pipeline.id, + PipelineConfig.config_hash == config_hash, + ) + .first() + ) + # create a pipeline config if it does not exist + if not pipeline_config: + pipeline_config = PipelineConfig( + pipeline_id=pipeline.id, + config=pipeline_info["arguments"], + config_hash=config_hash, + ) + session.add(pipeline_config) + + # create a new pipeline run + pipeline_run = PipelineRun( + pipeline_id=pipeline.id, + config_id=pipeline_config.id, + ) + + # get a list of all the paper directories in the feature directory + paper_dirs = [d for d in Path(feature_directory).iterdir() if d.is_dir()] + + # for each subject directory, read the results.json file and the info.json file + pipeline_run_results = [] + for paper_dir in paper_dirs: + with open(op.join(paper_dir, "results.json")) as f: + results = json.load(f) + + with open(op.join(paper_dir, "info.json")) as f: + info = json.load(f) + + # use the directory name as the base_study_id + base_study_id = paper_dir.name + # create a new result record + pipeline_run_results.append( + PipelineRunResult( + base_study_id=base_study_id, + data=results, + date_executed=parse_date(info["date"]), + file_inputs=info["inputs"], + run=pipeline_run, + ) + ) + + session.add(pipeline_run) + session.add_all(pipeline_run_results) + + session.commit() diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index 809219ea..1225d2cf 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -589,10 +589,6 @@ class PipelineRunResult(BaseMixin, db.Model): date_executed = db.Column(db.DateTime(timezone=True)) data = db.Column(JSONB) file_inputs = db.Column(JSONB) - # feature_index = db.Column(db.Integer) # the same categories of information can be extracted multiple times from a single paper (e.g., multiple demographic groups, multiple software packages, etc) - # feature_group = db.Column(db.String) # task, disease, software, age - # feature = db.Column(db.String) # stroop task, schizophrenia, fsl - # value = db.Column(db.Float) # 0.67, 0.3, 0.5 (some measure of confidence for the result) run = relationship("PipelineRun", backref=backref("results", passive_deletes=True)) From 3c512811082090e52ade7778a3fdcddf796d4c54 Mon Sep 17 00:00:00 2001 From: James Kent Date: Mon, 13 Jan 2025 16:22:09 -0600 Subject: [PATCH 06/10] use db for ingestion, and remove extra imports --- store/neurostore/ingest/extracted_features.py | 23 +++++++------------ store/neurostore/tests/test_ingestion.py | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/store/neurostore/ingest/extracted_features.py b/store/neurostore/ingest/extracted_features.py index a982c3a1..efb0face 100644 --- a/store/neurostore/ingest/extracted_features.py +++ b/store/neurostore/ingest/extracted_features.py @@ -2,16 +2,9 @@ import json import os.path as op -import re from pathlib import Path import hashlib - -import numpy as np -import pandas as pd -import requests -from scipy import sparse from dateutil.parser import parse as parse_date -from sqlalchemy import or_ from neurostore.database import db from neurostore.models import ( @@ -22,7 +15,7 @@ ) -def ingest_feature(feature_directory, session): +def ingest_feature(feature_directory): """Ingest demographics data into the database.""" # read pipeline_info.json from the base feature directory with open(op.join(feature_directory, "pipeline_info.json")) as f: @@ -30,7 +23,7 @@ def ingest_feature(feature_directory, session): # search if there is an existing pipeline with the same name and version pipeline = ( - session.query(Pipeline) + db.session.query(Pipeline) .filter( Pipeline.name == pipeline_info["name"], Pipeline.version == pipeline_info["version"], @@ -48,7 +41,7 @@ def ingest_feature(feature_directory, session): pubget_compatible=pipeline_info.get("pubget_compatible", False), derived_from=pipeline_info.get("derived_from", None), ) - session.add(pipeline) + db.session.add(pipeline) # search within the pipeline and see if there are any existing pipeline configs # that match the "arguements" field in the pipeline_info.json @@ -57,7 +50,7 @@ def ingest_feature(feature_directory, session): json.dumps(pipeline_info["arguments"]).encode() ).hexdigest() pipeline_config = ( - session.query(PipelineConfig) + db.session.query(PipelineConfig) .filter( PipelineConfig.pipeline_id == pipeline.id, PipelineConfig.config_hash == config_hash, @@ -71,7 +64,7 @@ def ingest_feature(feature_directory, session): config=pipeline_info["arguments"], config_hash=config_hash, ) - session.add(pipeline_config) + db.session.add(pipeline_config) # create a new pipeline run pipeline_run = PipelineRun( @@ -104,7 +97,7 @@ def ingest_feature(feature_directory, session): ) ) - session.add(pipeline_run) - session.add_all(pipeline_run_results) + db.session.add(pipeline_run) + db.session.add_all(pipeline_run_results) - session.commit() + db.session.commit() diff --git a/store/neurostore/tests/test_ingestion.py b/store/neurostore/tests/test_ingestion.py index f21b5b9d..6626616f 100644 --- a/store/neurostore/tests/test_ingestion.py +++ b/store/neurostore/tests/test_ingestion.py @@ -16,4 +16,4 @@ def test_ingest_neuroquery(ingest_neuroquery, session): def test_ingest_features(create_demographic_features, session): - ingest_feature(create_demographic_features, session) + ingest_feature(create_demographic_features) From 58c9ad4182b56abef7e2bb975828bcefa4937e85 Mon Sep 17 00:00:00 2001 From: James Kent Date: Fri, 17 Jan 2025 15:24:58 -0600 Subject: [PATCH 07/10] add crud interface for pipeline --- store/neurostore/ingest/extracted_features.py | 6 +- store/neurostore/models/__init__.py | 2 + store/neurostore/models/data.py | 13 ++ store/neurostore/resources/__init__.py | 12 + store/neurostore/resources/pipeline.py | 169 +++++++++++++++ store/neurostore/schemas/pipeline.py | 62 ++++++ .../tests/api/test_pipeline_resources.py | 205 ++++++++++++++++++ store/neurostore/tests/conftest.py | 10 +- store/neurostore/tests/test_models.py | 24 ++ store/neurostore/tests/test_schemas.py | 76 +++++++ 10 files changed, 572 insertions(+), 7 deletions(-) create mode 100644 store/neurostore/resources/pipeline.py create mode 100644 store/neurostore/schemas/pipeline.py create mode 100644 store/neurostore/tests/api/test_pipeline_resources.py diff --git a/store/neurostore/ingest/extracted_features.py b/store/neurostore/ingest/extracted_features.py index efb0face..7a2cfc4a 100644 --- a/store/neurostore/ingest/extracted_features.py +++ b/store/neurostore/ingest/extracted_features.py @@ -36,9 +36,9 @@ def ingest_feature(feature_directory): name=pipeline_info["name"], version=pipeline_info["version"], description=pipeline_info.get("description"), - study_dependent=pipeline_info.get("study_dependent", False), - ace_compatible=pipeline_info.get("ace_compatible", False), - pubget_compatible=pipeline_info.get("pubget_compatible", False), + study_dependent=True if pipeline_info.get("type", False) == "dependent" else False, + ace_compatible="ace" in pipeline_info.get("arguments", {}).get("input_sources", []), + pubget_compatible="pubget" in pipeline_info.get("arguments", {}).get("input_sources", []), derived_from=pipeline_info.get("derived_from", None), ) db.session.add(pipeline) diff --git a/store/neurostore/models/__init__.py b/store/neurostore/models/__init__.py index c20e1a9f..6a5e7478 100644 --- a/store/neurostore/models/__init__.py +++ b/store/neurostore/models/__init__.py @@ -16,6 +16,7 @@ PipelineConfig, PipelineRun, PipelineRunResult, + PipelineRunResultVote, ) from .auth import User, Role @@ -39,4 +40,5 @@ "PipelineConfig", "PipelineRun", "PipelineRunResult", + "PipelineRunResultVote", ] diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index 1225d2cf..8047e367 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -592,6 +592,19 @@ class PipelineRunResult(BaseMixin, db.Model): run = relationship("PipelineRun", backref=backref("results", passive_deletes=True)) +class PipelineRunResultVote(BaseMixin, db.Model): + __tablename__ = "pipeline_run_result_votes" + + run_result_id = db.Column( + db.Text, db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"), index=True + ) + user_id = db.Column(db.Text, db.ForeignKey("users.external_id"), index=True) + accurate = db.Column(db.Boolean) + run_result = relationship( + "PipelineRunResult", backref=backref("votes", passive_deletes=True) + ) + user = relationship("User", backref=backref("votes", passive_deletes=True)) + # from . import event_listeners # noqa E402 # del event_listeners diff --git a/store/neurostore/resources/__init__.py b/store/neurostore/resources/__init__.py index 900be462..9ff44169 100644 --- a/store/neurostore/resources/__init__.py +++ b/store/neurostore/resources/__init__.py @@ -15,6 +15,13 @@ UsersView, ) +from .pipeline import ( + PipelinesView, + PipelineConfigsView, + PipelineRunsView, + PipelineRunResultsView, + PipelineRunResultVotesView, +) __all__ = [ "StudysetsView", "AnnotationsView", @@ -27,4 +34,9 @@ "PointsView", "PointValuesView", "UsersView", + "PipelinesView", + "PipelineConfigsView", + "PipelineRunsView", + "PipelineRunResultsView", + "PipelineRunResultVotesView", ] diff --git a/store/neurostore/resources/pipeline.py b/store/neurostore/resources/pipeline.py new file mode 100644 index 00000000..828f16da --- /dev/null +++ b/store/neurostore/resources/pipeline.py @@ -0,0 +1,169 @@ +from flask import request, jsonify +from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote +from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema +from neurostore.database import db +from .base import ObjectView, ListView + +class PipelinesView(ObjectView, ListView): + model = Pipeline + schema = PipelineSchema + + def get(self, id=None): + if id: + pipeline = self.model.query.get(id) + return self.schema().dump(pipeline) + pipelines = self.model.query.all() + return self.schema(many=True).dump(pipelines) + + def post(self): + data = request.get_json() + pipeline = self.schema().load(data) + db.session.add(pipeline) + db.session.commit() + return self.schema().dump(pipeline), 201 + + def put(self, id): + data = request.get_json() + pipeline = self.model.query.get(id) + for key, value in data.items(): + setattr(pipeline, key, value) + db.session.commit() + return self.schema().dump(pipeline) + + def delete(self, id): + pipeline = self.model.query.get(id) + db.session.delete(pipeline) + db.session.commit() + return '', 204 + + +class PipelineConfigsView(ObjectView, ListView): + model = PipelineConfig + schema = PipelineConfigSchema + + def get(self, id=None): + if id: + pipeline_config = self.model.query.get(id) + return self.schema().dump(pipeline_config) + pipeline_configs = self.model.query.all() + return self.schema(many=True).dump(pipeline_configs) + + def post(self): + data = request.get_json() + pipeline_config = self.schema().load(data) + db.session.add(pipeline_config) + db.session.commit() + return self.schema().dump(pipeline_config), 201 + + def put(self, id): + data = request.get_json() + pipeline_config = self.model.query.get(id) + for key, value in data.items(): + setattr(pipeline_config, key, value) + db.session.commit() + return self.schema().dump(pipeline_config) + + def delete(self, id): + pipeline_config = self.model.query.get(id) + db.session.delete(pipeline_config) + db.session.commit() + return '', 204 + + +class PipelineRunsView(ObjectView, ListView): + model = PipelineRun + schema = PipelineRunSchema + + def get(self, pipeline_run_id=None): + if pipeline_run_id: + pipeline_run = self.model.query.get(pipeline_run_id) + return self.schema().dump(pipeline_run) + pipeline_runs = self.model.query.all() + return self.schema(many=True).dump(pipeline_runs) + + def post(self): + data = request.get_json() + pipeline_run = self.schema().load(data) + db.session.add(pipeline_run) + db.session.commit() + return self.schema().dump(pipeline_run), 201 + + def put(self, pipeline_run_id): + data = request.get_json() + pipeline_run = self.model.query.get(pipeline_run_id) + for key, value in data.items(): + setattr(pipeline_run, key, value) + db.session.commit() + return self.schema().dump(pipeline_run) + + def delete(self, pipeline_run_id): + pipeline_run = self.model.query.get(pipeline_run_id) + db.session.delete(pipeline_run) + db.session.commit() + return '', 204 + + +class PipelineRunResultsView(ObjectView, ListView): + model = PipelineRunResult + schema = PipelineRunResultSchema + + def get(self, pipeline_run_result_id=None): + if pipeline_run_result_id: + pipeline_run_result = self.model.query.get(pipeline_run_result_id) + return self.schema().dump(pipeline_run_result) + pipeline_run_results = self.model.query.all() + return self.schema(many=True).dump(pipeline_run_results) + + def post(self): + data = request.get_json() + pipeline_run_result = self.schema().load(data) + db.session.add(pipeline_run_result) + db.session.commit() + return self.schema().dump(pipeline_run_result), 201 + + def put(self, pipeline_run_result_id): + data = request.get_json() + pipeline_run_result = self.model.query.get(pipeline_run_result_id) + for key, value in data.items(): + setattr(pipeline_run_result, key, value) + db.session.commit() + return self.schema().dump(pipeline_run_result) + + def delete(self, pipeline_run_result_id): + pipeline_run_result = self.model.query.get(pipeline_run_result_id) + db.session.delete(pipeline_run_result) + db.session.commit() + return '', 204 + + +class PipelineRunResultVotesView(ObjectView, ListView): + model = PipelineRunResultVote + schema = PipelineRunResultVoteSchema + + def get(self, pipeline_run_result_vote_id=None): + if pipeline_run_result_vote_id: + pipeline_run_result_vote = self.model.query.get(pipeline_run_result_vote_id) + return self.schema().dump(pipeline_run_result_vote) + pipeline_run_result_votes = self.model.query.all() + return self.schema(many=True).dump(pipeline_run_result_votes) + + def post(self): + data = request.get_json() + pipeline_run_result_vote = self.schema().load(data) + db.session.add(pipeline_run_result_vote) + db.session.commit() + return self.schema().dump(pipeline_run_result_vote), 201 + + def put(self, pipeline_run_result_vote_id): + data = request.get_json() + pipeline_run_result_vote = self.model.query.get(pipeline_run_result_vote_id) + for key, value in data.items(): + setattr(pipeline_run_result_vote, key, value) + db.session.commit() + return self.schema().dump(pipeline_run_result_vote) + + def delete(self, pipeline_run_result_vote_id): + pipeline_run_result_vote = self.model.query.get(pipeline_run_result_vote_id) + db.session.delete(pipeline_run_result_vote) + db.session.commit() + return '', 204 diff --git a/store/neurostore/schemas/pipeline.py b/store/neurostore/schemas/pipeline.py new file mode 100644 index 00000000..e3d2a186 --- /dev/null +++ b/store/neurostore/schemas/pipeline.py @@ -0,0 +1,62 @@ +from marshmallow import Schema, fields, post_load +from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote + +class PipelineSchema(Schema): + id = fields.String(dump_only=True) + name = fields.String(required=True) + description = fields.String() + version = fields.String() + study_dependent = fields.Boolean() + ace_compatible = fields.Boolean() + pubget_compatible = fields.Boolean() + derived_from = fields.String() + + @post_load + def make_pipeline(self, data, **kwargs): + return Pipeline(**data) + + +class PipelineConfigSchema(Schema): + id = fields.String(dump_only=True) + pipeline_id = fields.String(required=True) + config = fields.Dict(required=True) + config_hash = fields.String() + + @post_load + def make_pipeline_config(self, data, **kwargs): + return PipelineConfig(**data) + + +class PipelineRunSchema(Schema): + id = fields.String(dump_only=True) + pipeline_id = fields.String(required=True) + config_id = fields.String(required=True) + run_index = fields.Integer() + + @post_load + def make_pipeline_run(self, data, **kwargs): + return PipelineRun(**data) + + +class PipelineRunResultSchema(Schema): + id = fields.String(dump_only=True) + run_id = fields.String(required=True) + base_study_id = fields.String() + date_executed = fields.DateTime() + data = fields.Dict() + file_inputs = fields.Dict() + + @post_load + def make_pipeline_run_result(self, data, **kwargs): + return PipelineRunResult(**data) + + +class PipelineRunResultVoteSchema(Schema): + id = fields.String(dump_only=True) + run_result_id = fields.String(required=True) + user_id = fields.String(required=True) + accurate = fields.Boolean() + + @post_load + def make_pipeline_run_result_vote(self, data, **kwargs): + return PipelineRunResultVote(**data) diff --git a/store/neurostore/tests/api/test_pipeline_resources.py b/store/neurostore/tests/api/test_pipeline_resources.py new file mode 100644 index 00000000..ed42964c --- /dev/null +++ b/store/neurostore/tests/api/test_pipeline_resources.py @@ -0,0 +1,205 @@ +import pytest +from flask import url_for +from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote, BaseStudy +from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema +from neurostore.database import db + +@pytest.fixture +def pipeline(session, pipeline_payload): + pipeline = Pipeline(**pipeline_payload) + session.add(pipeline) + session.commit() + return pipeline + +@pytest.fixture +def pipeline_config(session, pipeline_config_payload): + pipeline_config = PipelineConfig(**pipeline_config_payload) + session.add(pipeline_config) + session.commit() + return pipeline_config + +@pytest.fixture +def pipeline_run(session, pipeline_run_payload): + pipeline_run = PipelineRun(**pipeline_run_payload) + session.add(pipeline_run) + session.commit() + return pipeline_run + +@pytest.fixture +def pipeline_run_result(session, pipeline_run_result_payload): + pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) + session.add(pipeline_run_result) + session.commit() + return pipeline_run_result + + +@pytest.fixture +def pipeline_payload(): + return { + "name": "Test Pipeline", + "description": "A test pipeline", + "version": "1.0", + "study_dependent": True, + "ace_compatible": False, + "pubget_compatible": True, + "derived_from": "Base Pipeline" + } + +@pytest.fixture +def pipeline_config_payload(pipeline): + return { + "pipeline_id": pipeline.id, + "config": {"param1": "value1", "param2": "value2"}, + "config_hash": "abc123" + } + +@pytest.fixture +def pipeline_run_payload(pipeline, pipeline_config): + return { + "pipeline_id": pipeline.id, + "config_id": pipeline_config.id, + "run_index": 1 + } + +@pytest.fixture +def pipeline_run_result_payload(pipeline_run): + base_study = BaseStudy(name="Test Study") + db.session.add(base_study) + db.session.commit() + return { + "run_id": pipeline_run.id, + "base_study_id": base_study.id, + "date_executed": "2023-01-01T00:00:00Z", + "data": {"result": "success"}, + "file_inputs": {"input1": "file1"} + } + + +def test_create_pipeline(auth_client, pipeline_payload): + response = auth_client.post('/api/pipelines/', data=pipeline_payload) + assert response.status_code == 201 + data = response.json() + assert data["name"] == pipeline_payload["name"] + +def test_get_pipeline(auth_client, pipeline_payload, session): + pipeline = Pipeline(**pipeline_payload) + db.session.add(pipeline) + db.session.commit() + response = auth_client.get(f'/api/pipelines/{pipeline.id}') + assert response.status_code == 200 + data = response.json() + assert data["name"] == pipeline_payload["name"] + +def test_update_pipeline(auth_client, pipeline_payload, session): + pipeline = Pipeline(**pipeline_payload) + db.session.add(pipeline) + db.session.commit() + updated_payload = {"name": "Updated Pipeline"} + response = auth_client.put(f'/api/pipelines/{pipeline.id}', data=updated_payload) + assert response.status_code == 200 + data = response.json() + assert data["name"] == "Updated Pipeline" + +def test_delete_pipeline(auth_client, pipeline_payload): + pipeline = Pipeline(**pipeline_payload) + db.session.add(pipeline) + db.session.commit() + response = auth_client.delete(f'/api/pipelines/{pipeline.id}') + assert response.status_code == 204 + + +def test_create_pipeline_config(auth_client, pipeline_config_payload, session): + response = auth_client.post('/api/pipeline-configs/', data=pipeline_config_payload) + assert response.status_code == 201 + data = response.json() + assert data["config"] == pipeline_config_payload["config"] + +def test_get_pipeline_config(auth_client, pipeline_config_payload, session): + pipeline_config = PipelineConfig(**pipeline_config_payload) + db.session.add(pipeline_config) + db.session.commit() + response = auth_client.get(f'/api/pipeline-configs/{pipeline_config.id}') + assert response.status_code == 200 + data = response.json() + assert data["config"] == pipeline_config_payload["config"] + +def test_update_pipeline_config(auth_client, pipeline_config_payload, session): + pipeline_config = PipelineConfig(**pipeline_config_payload) + db.session.add(pipeline_config) + db.session.commit() + updated_payload = {"config": {"param1": "new_value"}} + response = auth_client.put(f'/api/pipeline-configs/{pipeline_config.id}', data=updated_payload) + assert response.status_code == 200 + data = response.json() + assert data["config"] == {"param1": "new_value"} + +def test_delete_pipeline_config(auth_client, pipeline_config_payload, session): + pipeline_config = PipelineConfig(**pipeline_config_payload) + db.session.add(pipeline_config) + db.session.commit() + response = auth_client.delete(f'/api/pipeline-configs/{pipeline_config.id}') + assert response.status_code == 204 + +def test_create_pipeline_run(auth_client, pipeline_run_payload, session): + response = auth_client.post('/api/pipeline-runs/', data=pipeline_run_payload) + assert response.status_code == 201 + data = response.json() + assert data["pipeline_id"] == pipeline_run_payload["pipeline_id"] + +def test_get_pipeline_run(auth_client, pipeline_run_payload, session): + pipeline_run = PipelineRun(**pipeline_run_payload) + db.session.add(pipeline_run) + db.session.commit() + response = auth_client.get(f'/api/pipeline-runs/{pipeline_run.id}') + assert response.status_code == 200 + data = response.json() + assert data["pipeline_id"] == pipeline_run_payload["pipeline_id"] + +def test_update_pipeline_run(auth_client, pipeline_run_payload, session): + pipeline_run = PipelineRun(**pipeline_run_payload) + db.session.add(pipeline_run) + db.session.commit() + updated_payload = {"run_index": 2} + response = auth_client.put(f'/api/pipeline-runs/{pipeline_run.id}', data=updated_payload) + assert response.status_code == 200 + data = response.json() + assert data["run_index"] == 2 + +def test_delete_pipeline_run(auth_client, pipeline_run_payload, session): + pipeline_run = PipelineRun(**pipeline_run_payload) + db.session.add(pipeline_run) + db.session.commit() + response = auth_client.delete(f'/api/pipeline-runs/{pipeline_run.id}') + assert response.status_code == 204 + +def test_create_pipeline_run_result(auth_client, pipeline_run_result_payload, session): + response = auth_client.post('/api/pipeline-run-results/', data=pipeline_run_result_payload) + assert response.status_code == 201 + data = response.json() + assert data["run_id"] == pipeline_run_result_payload["run_id"] + +def test_get_pipeline_run_result(auth_client, pipeline_run_result_payload, session): + pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) + db.session.add(pipeline_run_result) + db.session.commit() + response = auth_client.get(f'/api/pipeline-run-results/{pipeline_run_result.id}') + assert response.status_code == 200 + data = response.json() + assert data["run_id"] == pipeline_run_result_payload["run_id"] + +def test_update_pipeline_run_result(auth_client, pipeline_run_result_payload, session): + pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) + db.session.add(pipeline_run_result) + db.session.commit() + updated_payload = {"data": {"result": "failure"}} + response = auth_client.put(f'/api/pipeline-run-results/{pipeline_run_result.id}', data=updated_payload) + assert response.status_code == 200 + data = response.json() + assert data["data"] == {"result": "failure"} + +def test_delete_pipeline_run_result(auth_client, pipeline_run_result_payload, session): + pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) + db.session.add(pipeline_run_result) + db.session.commit() + response = auth_client.delete(f'/api/pipeline-run-results/{pipeline_run_result.id}') + assert response.status_code == 204 diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index bfda7d7b..b292e2b3 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -598,11 +598,13 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): "name": "demographics", "version": "v1.0.0", "description": "demographic features", - "study_dependent": False, - "ace_compatible": True, - "pubget_compatible": True, + "type": "independent", "derived_from": None, - "arguments": {"parallel": 1}, + "arguments": { + "parallel": 1, + "inputs": ['text'], + "input_sources": ['pubget'], + }, } with open(output_dir / "pipeline_info.json", "w") as f: json.dump(pipeline_info, f) diff --git a/store/neurostore/tests/test_models.py b/store/neurostore/tests/test_models.py index cee5827a..08553906 100644 --- a/store/neurostore/tests/test_models.py +++ b/store/neurostore/tests/test_models.py @@ -7,6 +7,11 @@ PointValue, Image, Studyset, + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, + PipelineRunResultVote, ) @@ -40,3 +45,22 @@ def test_Image(): def test_Studyset(): Studyset() + +def test_Pipeline(): + Pipeline() + + +def test_PipelineConfig(): + PipelineConfig() + + +def test_PipelineRun(): + PipelineRun() + + +def test_PipelineRunResult(): + PipelineRunResult() + + +def test_PipelineRunResultVote(): + PipelineRunResultVote() diff --git a/store/neurostore/tests/test_schemas.py b/store/neurostore/tests/test_schemas.py index 4ddd0fb7..094fcf02 100644 --- a/store/neurostore/tests/test_schemas.py +++ b/store/neurostore/tests/test_schemas.py @@ -40,3 +40,79 @@ def test_compare_dataset_with_snapshot(ingest_neurosynth): quick_ss = StudysetSnapshot().dump(studyset) assert marshmallow_ss == quick_ss + +import pytest +from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema +from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote + +def test_PipelineSchema(): + payload = { + "name": "Test Pipeline", + "description": "A test pipeline", + "version": "1.0", + "study_dependent": True, + "ace_compatible": False, + "pubget_compatible": True, + "derived_from": "Base Pipeline" + } + schema = PipelineSchema() + result = schema.load(payload) + assert result.name == "Test Pipeline" + assert result.description == "A test pipeline" + assert result.version == "1.0" + assert result.study_dependent is True + assert result.ace_compatible is False + assert result.pubget_compatible is True + assert result.derived_from == "Base Pipeline" + +def test_PipelineConfigSchema(): + payload = { + "pipeline_id": "123", + "config": {"param1": "value1", "param2": "value2"}, + "config_hash": "abc123" + } + schema = PipelineConfigSchema() + result = schema.load(payload) + assert result.pipeline_id == "123" + assert result.config == {"param1": "value1", "param2": "value2"} + assert result.config_hash == "abc123" + +def test_PipelineRunSchema(): + payload = { + "pipeline_id": "123", + "config_id": "456", + "run_index": 1 + } + schema = PipelineRunSchema() + result = schema.load(payload) + assert result.pipeline_id == "123" + assert result.config_id == "456" + assert result.run_index == 1 + +def test_PipelineRunResultSchema(): + payload = { + "run_id": "123", + "base_study_id": "456", + "date_executed": "2023-01-01T00:00:00Z", + "data": {"result": "success"}, + "file_inputs": {"input1": "file1"} + } + schema = PipelineRunResultSchema() + result = schema.load(payload) + assert result.run_id == "123" + assert result.base_study_id == "456" + assert result.date_executed.isoformat() == "2023-01-01T00:00:00+00:00" + assert result.data == {"result": "success"} + assert result.file_inputs == {"input1": "file1"} + +def test_PipelineRunResultVoteSchema(): + payload = { + "run_result_id": "123", + "user_id": "456", + "accurate": True + } + schema = PipelineRunResultVoteSchema() + result = schema.load(payload) + assert result.run_result_id == "123" + assert result.user_id == "456" + assert result.accurate is True From 3b6e3e4610efc0e4eca877f33641de92c2d84b0f Mon Sep 17 00:00:00 2001 From: James Kent Date: Fri, 17 Jan 2025 16:24:35 -0600 Subject: [PATCH 08/10] update openapi --- store/neurostore/openapi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/neurostore/openapi b/store/neurostore/openapi index 3e4cba60..9539b36b 160000 --- a/store/neurostore/openapi +++ b/store/neurostore/openapi @@ -1 +1 @@ -Subproject commit 3e4cba60f52a6c6bdd1ac5b55cb70d0ae3399aab +Subproject commit 9539b36b6a85077b80a865ec3e11cf9ee176f0cb From bc1ccef71906ad992abaa7aa2972c4d83da87e32 Mon Sep 17 00:00:00 2001 From: James Kent Date: Fri, 17 Jan 2025 16:27:08 -0600 Subject: [PATCH 09/10] style with black --- store/neurostore/ingest/extracted_features.py | 10 ++- store/neurostore/models/data.py | 5 +- store/neurostore/resources/__init__.py | 1 + store/neurostore/resources/pipeline.py | 27 ++++-- store/neurostore/schemas/pipeline.py | 9 +- .../tests/api/test_pipeline_resources.py | 90 +++++++++++++------ store/neurostore/tests/conftest.py | 4 +- store/neurostore/tests/test_models.py | 1 + store/neurostore/tests/test_schemas.py | 40 +++++---- 9 files changed, 132 insertions(+), 55 deletions(-) diff --git a/store/neurostore/ingest/extracted_features.py b/store/neurostore/ingest/extracted_features.py index 7a2cfc4a..43b7c2c6 100644 --- a/store/neurostore/ingest/extracted_features.py +++ b/store/neurostore/ingest/extracted_features.py @@ -36,9 +36,13 @@ def ingest_feature(feature_directory): name=pipeline_info["name"], version=pipeline_info["version"], description=pipeline_info.get("description"), - study_dependent=True if pipeline_info.get("type", False) == "dependent" else False, - ace_compatible="ace" in pipeline_info.get("arguments", {}).get("input_sources", []), - pubget_compatible="pubget" in pipeline_info.get("arguments", {}).get("input_sources", []), + study_dependent=( + True if pipeline_info.get("type", False) == "dependent" else False + ), + ace_compatible="ace" + in pipeline_info.get("arguments", {}).get("input_sources", []), + pubget_compatible="pubget" + in pipeline_info.get("arguments", {}).get("input_sources", []), derived_from=pipeline_info.get("derived_from", None), ) db.session.add(pipeline) diff --git a/store/neurostore/models/data.py b/store/neurostore/models/data.py index 8047e367..f0f297e5 100644 --- a/store/neurostore/models/data.py +++ b/store/neurostore/models/data.py @@ -596,7 +596,9 @@ class PipelineRunResultVote(BaseMixin, db.Model): __tablename__ = "pipeline_run_result_votes" run_result_id = db.Column( - db.Text, db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"), index=True + db.Text, + db.ForeignKey("pipeline_run_results.id", ondelete="CASCADE"), + index=True, ) user_id = db.Column(db.Text, db.ForeignKey("users.external_id"), index=True) accurate = db.Column(db.Boolean) @@ -605,6 +607,7 @@ class PipelineRunResultVote(BaseMixin, db.Model): ) user = relationship("User", backref=backref("votes", passive_deletes=True)) + # from . import event_listeners # noqa E402 # del event_listeners diff --git a/store/neurostore/resources/__init__.py b/store/neurostore/resources/__init__.py index 9ff44169..dabe4a01 100644 --- a/store/neurostore/resources/__init__.py +++ b/store/neurostore/resources/__init__.py @@ -22,6 +22,7 @@ PipelineRunResultsView, PipelineRunResultVotesView, ) + __all__ = [ "StudysetsView", "AnnotationsView", diff --git a/store/neurostore/resources/pipeline.py b/store/neurostore/resources/pipeline.py index 828f16da..c5d3d62f 100644 --- a/store/neurostore/resources/pipeline.py +++ b/store/neurostore/resources/pipeline.py @@ -1,9 +1,22 @@ from flask import request, jsonify -from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote -from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema +from neurostore.models.data import ( + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, + PipelineRunResultVote, +) +from neurostore.schemas.pipeline import ( + PipelineSchema, + PipelineConfigSchema, + PipelineRunSchema, + PipelineRunResultSchema, + PipelineRunResultVoteSchema, +) from neurostore.database import db from .base import ObjectView, ListView + class PipelinesView(ObjectView, ListView): model = Pipeline schema = PipelineSchema @@ -34,7 +47,7 @@ def delete(self, id): pipeline = self.model.query.get(id) db.session.delete(pipeline) db.session.commit() - return '', 204 + return "", 204 class PipelineConfigsView(ObjectView, ListView): @@ -67,7 +80,7 @@ def delete(self, id): pipeline_config = self.model.query.get(id) db.session.delete(pipeline_config) db.session.commit() - return '', 204 + return "", 204 class PipelineRunsView(ObjectView, ListView): @@ -100,7 +113,7 @@ def delete(self, pipeline_run_id): pipeline_run = self.model.query.get(pipeline_run_id) db.session.delete(pipeline_run) db.session.commit() - return '', 204 + return "", 204 class PipelineRunResultsView(ObjectView, ListView): @@ -133,7 +146,7 @@ def delete(self, pipeline_run_result_id): pipeline_run_result = self.model.query.get(pipeline_run_result_id) db.session.delete(pipeline_run_result) db.session.commit() - return '', 204 + return "", 204 class PipelineRunResultVotesView(ObjectView, ListView): @@ -166,4 +179,4 @@ def delete(self, pipeline_run_result_vote_id): pipeline_run_result_vote = self.model.query.get(pipeline_run_result_vote_id) db.session.delete(pipeline_run_result_vote) db.session.commit() - return '', 204 + return "", 204 diff --git a/store/neurostore/schemas/pipeline.py b/store/neurostore/schemas/pipeline.py index e3d2a186..f1ea8a93 100644 --- a/store/neurostore/schemas/pipeline.py +++ b/store/neurostore/schemas/pipeline.py @@ -1,5 +1,12 @@ from marshmallow import Schema, fields, post_load -from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote +from neurostore.models.data import ( + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, + PipelineRunResultVote, +) + class PipelineSchema(Schema): id = fields.String(dump_only=True) diff --git a/store/neurostore/tests/api/test_pipeline_resources.py b/store/neurostore/tests/api/test_pipeline_resources.py index ed42964c..136f9172 100644 --- a/store/neurostore/tests/api/test_pipeline_resources.py +++ b/store/neurostore/tests/api/test_pipeline_resources.py @@ -1,9 +1,23 @@ import pytest from flask import url_for -from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote, BaseStudy -from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema +from neurostore.models.data import ( + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, + PipelineRunResultVote, + BaseStudy, +) +from neurostore.schemas.pipeline import ( + PipelineSchema, + PipelineConfigSchema, + PipelineRunSchema, + PipelineRunResultSchema, + PipelineRunResultVoteSchema, +) from neurostore.database import db + @pytest.fixture def pipeline(session, pipeline_payload): pipeline = Pipeline(**pipeline_payload) @@ -11,6 +25,7 @@ def pipeline(session, pipeline_payload): session.commit() return pipeline + @pytest.fixture def pipeline_config(session, pipeline_config_payload): pipeline_config = PipelineConfig(**pipeline_config_payload) @@ -18,6 +33,7 @@ def pipeline_config(session, pipeline_config_payload): session.commit() return pipeline_config + @pytest.fixture def pipeline_run(session, pipeline_run_payload): pipeline_run = PipelineRun(**pipeline_run_payload) @@ -25,6 +41,7 @@ def pipeline_run(session, pipeline_run_payload): session.commit() return pipeline_run + @pytest.fixture def pipeline_run_result(session, pipeline_run_result_payload): pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) @@ -42,24 +59,23 @@ def pipeline_payload(): "study_dependent": True, "ace_compatible": False, "pubget_compatible": True, - "derived_from": "Base Pipeline" + "derived_from": "Base Pipeline", } + @pytest.fixture def pipeline_config_payload(pipeline): return { "pipeline_id": pipeline.id, "config": {"param1": "value1", "param2": "value2"}, - "config_hash": "abc123" + "config_hash": "abc123", } + @pytest.fixture def pipeline_run_payload(pipeline, pipeline_config): - return { - "pipeline_id": pipeline.id, - "config_id": pipeline_config.id, - "run_index": 1 - } + return {"pipeline_id": pipeline.id, "config_id": pipeline_config.id, "run_index": 1} + @pytest.fixture def pipeline_run_result_payload(pipeline_run): @@ -71,135 +87,157 @@ def pipeline_run_result_payload(pipeline_run): "base_study_id": base_study.id, "date_executed": "2023-01-01T00:00:00Z", "data": {"result": "success"}, - "file_inputs": {"input1": "file1"} + "file_inputs": {"input1": "file1"}, } def test_create_pipeline(auth_client, pipeline_payload): - response = auth_client.post('/api/pipelines/', data=pipeline_payload) + response = auth_client.post("/api/pipelines/", data=pipeline_payload) assert response.status_code == 201 data = response.json() assert data["name"] == pipeline_payload["name"] + def test_get_pipeline(auth_client, pipeline_payload, session): pipeline = Pipeline(**pipeline_payload) db.session.add(pipeline) db.session.commit() - response = auth_client.get(f'/api/pipelines/{pipeline.id}') + response = auth_client.get(f"/api/pipelines/{pipeline.id}") assert response.status_code == 200 data = response.json() assert data["name"] == pipeline_payload["name"] + def test_update_pipeline(auth_client, pipeline_payload, session): pipeline = Pipeline(**pipeline_payload) db.session.add(pipeline) db.session.commit() updated_payload = {"name": "Updated Pipeline"} - response = auth_client.put(f'/api/pipelines/{pipeline.id}', data=updated_payload) + response = auth_client.put(f"/api/pipelines/{pipeline.id}", data=updated_payload) assert response.status_code == 200 data = response.json() assert data["name"] == "Updated Pipeline" + def test_delete_pipeline(auth_client, pipeline_payload): pipeline = Pipeline(**pipeline_payload) db.session.add(pipeline) db.session.commit() - response = auth_client.delete(f'/api/pipelines/{pipeline.id}') + response = auth_client.delete(f"/api/pipelines/{pipeline.id}") assert response.status_code == 204 def test_create_pipeline_config(auth_client, pipeline_config_payload, session): - response = auth_client.post('/api/pipeline-configs/', data=pipeline_config_payload) + response = auth_client.post("/api/pipeline-configs/", data=pipeline_config_payload) assert response.status_code == 201 data = response.json() assert data["config"] == pipeline_config_payload["config"] + def test_get_pipeline_config(auth_client, pipeline_config_payload, session): pipeline_config = PipelineConfig(**pipeline_config_payload) db.session.add(pipeline_config) db.session.commit() - response = auth_client.get(f'/api/pipeline-configs/{pipeline_config.id}') + response = auth_client.get(f"/api/pipeline-configs/{pipeline_config.id}") assert response.status_code == 200 data = response.json() assert data["config"] == pipeline_config_payload["config"] + def test_update_pipeline_config(auth_client, pipeline_config_payload, session): pipeline_config = PipelineConfig(**pipeline_config_payload) db.session.add(pipeline_config) db.session.commit() updated_payload = {"config": {"param1": "new_value"}} - response = auth_client.put(f'/api/pipeline-configs/{pipeline_config.id}', data=updated_payload) + response = auth_client.put( + f"/api/pipeline-configs/{pipeline_config.id}", data=updated_payload + ) assert response.status_code == 200 data = response.json() assert data["config"] == {"param1": "new_value"} + def test_delete_pipeline_config(auth_client, pipeline_config_payload, session): pipeline_config = PipelineConfig(**pipeline_config_payload) db.session.add(pipeline_config) db.session.commit() - response = auth_client.delete(f'/api/pipeline-configs/{pipeline_config.id}') + response = auth_client.delete(f"/api/pipeline-configs/{pipeline_config.id}") assert response.status_code == 204 + def test_create_pipeline_run(auth_client, pipeline_run_payload, session): - response = auth_client.post('/api/pipeline-runs/', data=pipeline_run_payload) + response = auth_client.post("/api/pipeline-runs/", data=pipeline_run_payload) assert response.status_code == 201 data = response.json() assert data["pipeline_id"] == pipeline_run_payload["pipeline_id"] + def test_get_pipeline_run(auth_client, pipeline_run_payload, session): pipeline_run = PipelineRun(**pipeline_run_payload) db.session.add(pipeline_run) db.session.commit() - response = auth_client.get(f'/api/pipeline-runs/{pipeline_run.id}') + response = auth_client.get(f"/api/pipeline-runs/{pipeline_run.id}") assert response.status_code == 200 data = response.json() assert data["pipeline_id"] == pipeline_run_payload["pipeline_id"] + def test_update_pipeline_run(auth_client, pipeline_run_payload, session): pipeline_run = PipelineRun(**pipeline_run_payload) db.session.add(pipeline_run) db.session.commit() updated_payload = {"run_index": 2} - response = auth_client.put(f'/api/pipeline-runs/{pipeline_run.id}', data=updated_payload) + response = auth_client.put( + f"/api/pipeline-runs/{pipeline_run.id}", data=updated_payload + ) assert response.status_code == 200 data = response.json() assert data["run_index"] == 2 + def test_delete_pipeline_run(auth_client, pipeline_run_payload, session): pipeline_run = PipelineRun(**pipeline_run_payload) db.session.add(pipeline_run) db.session.commit() - response = auth_client.delete(f'/api/pipeline-runs/{pipeline_run.id}') + response = auth_client.delete(f"/api/pipeline-runs/{pipeline_run.id}") assert response.status_code == 204 + def test_create_pipeline_run_result(auth_client, pipeline_run_result_payload, session): - response = auth_client.post('/api/pipeline-run-results/', data=pipeline_run_result_payload) + response = auth_client.post( + "/api/pipeline-run-results/", data=pipeline_run_result_payload + ) assert response.status_code == 201 data = response.json() assert data["run_id"] == pipeline_run_result_payload["run_id"] + def test_get_pipeline_run_result(auth_client, pipeline_run_result_payload, session): pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) db.session.add(pipeline_run_result) db.session.commit() - response = auth_client.get(f'/api/pipeline-run-results/{pipeline_run_result.id}') + response = auth_client.get(f"/api/pipeline-run-results/{pipeline_run_result.id}") assert response.status_code == 200 data = response.json() assert data["run_id"] == pipeline_run_result_payload["run_id"] + def test_update_pipeline_run_result(auth_client, pipeline_run_result_payload, session): pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) db.session.add(pipeline_run_result) db.session.commit() updated_payload = {"data": {"result": "failure"}} - response = auth_client.put(f'/api/pipeline-run-results/{pipeline_run_result.id}', data=updated_payload) + response = auth_client.put( + f"/api/pipeline-run-results/{pipeline_run_result.id}", data=updated_payload + ) assert response.status_code == 200 data = response.json() assert data["data"] == {"result": "failure"} + def test_delete_pipeline_run_result(auth_client, pipeline_run_result_payload, session): pipeline_run_result = PipelineRunResult(**pipeline_run_result_payload) db.session.add(pipeline_run_result) db.session.commit() - response = auth_client.delete(f'/api/pipeline-run-results/{pipeline_run_result.id}') + response = auth_client.delete(f"/api/pipeline-run-results/{pipeline_run_result.id}") assert response.status_code == 204 diff --git a/store/neurostore/tests/conftest.py b/store/neurostore/tests/conftest.py index b292e2b3..aff5ad89 100644 --- a/store/neurostore/tests/conftest.py +++ b/store/neurostore/tests/conftest.py @@ -602,8 +602,8 @@ def create_demographic_features(session, ingest_neurosynth, tmp_path): "derived_from": None, "arguments": { "parallel": 1, - "inputs": ['text'], - "input_sources": ['pubget'], + "inputs": ["text"], + "input_sources": ["pubget"], }, } with open(output_dir / "pipeline_info.json", "w") as f: diff --git a/store/neurostore/tests/test_models.py b/store/neurostore/tests/test_models.py index 08553906..9305f095 100644 --- a/store/neurostore/tests/test_models.py +++ b/store/neurostore/tests/test_models.py @@ -46,6 +46,7 @@ def test_Image(): def test_Studyset(): Studyset() + def test_Pipeline(): Pipeline() diff --git a/store/neurostore/tests/test_schemas.py b/store/neurostore/tests/test_schemas.py index 094fcf02..6d477ece 100644 --- a/store/neurostore/tests/test_schemas.py +++ b/store/neurostore/tests/test_schemas.py @@ -41,9 +41,23 @@ def test_compare_dataset_with_snapshot(ingest_neurosynth): assert marshmallow_ss == quick_ss + import pytest -from neurostore.schemas.pipeline import PipelineSchema, PipelineConfigSchema, PipelineRunSchema, PipelineRunResultSchema, PipelineRunResultVoteSchema -from neurostore.models.data import Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, PipelineRunResultVote +from neurostore.schemas.pipeline import ( + PipelineSchema, + PipelineConfigSchema, + PipelineRunSchema, + PipelineRunResultSchema, + PipelineRunResultVoteSchema, +) +from neurostore.models.data import ( + Pipeline, + PipelineConfig, + PipelineRun, + PipelineRunResult, + PipelineRunResultVote, +) + def test_PipelineSchema(): payload = { @@ -53,7 +67,7 @@ def test_PipelineSchema(): "study_dependent": True, "ace_compatible": False, "pubget_compatible": True, - "derived_from": "Base Pipeline" + "derived_from": "Base Pipeline", } schema = PipelineSchema() result = schema.load(payload) @@ -65,11 +79,12 @@ def test_PipelineSchema(): assert result.pubget_compatible is True assert result.derived_from == "Base Pipeline" + def test_PipelineConfigSchema(): payload = { "pipeline_id": "123", "config": {"param1": "value1", "param2": "value2"}, - "config_hash": "abc123" + "config_hash": "abc123", } schema = PipelineConfigSchema() result = schema.load(payload) @@ -77,25 +92,23 @@ def test_PipelineConfigSchema(): assert result.config == {"param1": "value1", "param2": "value2"} assert result.config_hash == "abc123" + def test_PipelineRunSchema(): - payload = { - "pipeline_id": "123", - "config_id": "456", - "run_index": 1 - } + payload = {"pipeline_id": "123", "config_id": "456", "run_index": 1} schema = PipelineRunSchema() result = schema.load(payload) assert result.pipeline_id == "123" assert result.config_id == "456" assert result.run_index == 1 + def test_PipelineRunResultSchema(): payload = { "run_id": "123", "base_study_id": "456", "date_executed": "2023-01-01T00:00:00Z", "data": {"result": "success"}, - "file_inputs": {"input1": "file1"} + "file_inputs": {"input1": "file1"}, } schema = PipelineRunResultSchema() result = schema.load(payload) @@ -105,12 +118,9 @@ def test_PipelineRunResultSchema(): assert result.data == {"result": "success"} assert result.file_inputs == {"input1": "file1"} + def test_PipelineRunResultVoteSchema(): - payload = { - "run_result_id": "123", - "user_id": "456", - "accurate": True - } + payload = {"run_result_id": "123", "user_id": "456", "accurate": True} schema = PipelineRunResultVoteSchema() result = schema.load(payload) assert result.run_result_id == "123" From 1e225fb02e888efbcba978cbf91e66ffccfd38a2 Mon Sep 17 00:00:00 2001 From: James Kent Date: Fri, 17 Jan 2025 16:39:42 -0600 Subject: [PATCH 10/10] pacify flake8 --- store/neurostore/resources/pipeline.py | 2 +- .../tests/api/test_pipeline_resources.py | 10 +------- store/neurostore/tests/test_schemas.py | 25 ++++++------------- 3 files changed, 9 insertions(+), 28 deletions(-) diff --git a/store/neurostore/resources/pipeline.py b/store/neurostore/resources/pipeline.py index c5d3d62f..3cafddd7 100644 --- a/store/neurostore/resources/pipeline.py +++ b/store/neurostore/resources/pipeline.py @@ -1,4 +1,4 @@ -from flask import request, jsonify +from flask import request from neurostore.models.data import ( Pipeline, PipelineConfig, diff --git a/store/neurostore/tests/api/test_pipeline_resources.py b/store/neurostore/tests/api/test_pipeline_resources.py index 136f9172..55ab163e 100644 --- a/store/neurostore/tests/api/test_pipeline_resources.py +++ b/store/neurostore/tests/api/test_pipeline_resources.py @@ -1,20 +1,12 @@ import pytest -from flask import url_for from neurostore.models.data import ( Pipeline, PipelineConfig, PipelineRun, PipelineRunResult, - PipelineRunResultVote, BaseStudy, ) -from neurostore.schemas.pipeline import ( - PipelineSchema, - PipelineConfigSchema, - PipelineRunSchema, - PipelineRunResultSchema, - PipelineRunResultVoteSchema, -) + from neurostore.database import db diff --git a/store/neurostore/tests/test_schemas.py b/store/neurostore/tests/test_schemas.py index 6d477ece..c1e86914 100644 --- a/store/neurostore/tests/test_schemas.py +++ b/store/neurostore/tests/test_schemas.py @@ -2,7 +2,13 @@ from ..schemas import StudySchema, StudysetSchema, StudysetSnapshot from ..models import Study, Studyset - +from neurostore.schemas.pipeline import ( + PipelineSchema, + PipelineConfigSchema, + PipelineRunSchema, + PipelineRunResultSchema, + PipelineRunResultVoteSchema, +) # Things I the schemas to do: # 1. Cloning: I need a deep copy of the object, with new versions of all the sub-objects # a. cloning a study, create new everything @@ -42,23 +48,6 @@ def test_compare_dataset_with_snapshot(ingest_neurosynth): assert marshmallow_ss == quick_ss -import pytest -from neurostore.schemas.pipeline import ( - PipelineSchema, - PipelineConfigSchema, - PipelineRunSchema, - PipelineRunResultSchema, - PipelineRunResultVoteSchema, -) -from neurostore.models.data import ( - Pipeline, - PipelineConfig, - PipelineRun, - PipelineRunResult, - PipelineRunResultVote, -) - - def test_PipelineSchema(): payload = { "name": "Test Pipeline",