wip: add feature tables #842

Open · wants to merge 8 commits into base: master
103 changes: 103 additions & 0 deletions store/neurostore/ingest/extracted_features.py
@@ -0,0 +1,103 @@
"""Ingest extracted features into the database."""

import json
import os.path as op
from pathlib import Path
import hashlib
from dateutil.parser import parse as parse_date

from neurostore.database import db
from neurostore.models import (
Pipeline,
PipelineConfig,
PipelineRun,
PipelineRunResult,
)


def ingest_feature(feature_directory):
"""Ingest demographics data into the database."""
# read pipeline_info.json from the base feature directory
with open(op.join(feature_directory, "pipeline_info.json")) as f:
pipeline_info = json.load(f)

# search if there is an existing pipeline with the same name and version
pipeline = (
db.session.query(Pipeline)
.filter(
Pipeline.name == pipeline_info["name"],
Pipeline.version == pipeline_info["version"],
)
.first()
)
# create a pipeline if it does not exist
if not pipeline:
pipeline = Pipeline(
name=pipeline_info["name"],
version=pipeline_info["version"],
description=pipeline_info.get("description"),
study_dependent=pipeline_info.get("study_dependent", False),
ace_compatible=pipeline_info.get("ace_compatible", False),
pubget_compatible=pipeline_info.get("pubget_compatible", False),
derived_from=pipeline_info.get("derived_from", None),
)
db.session.add(pipeline)

# search within the pipeline and see if there are any existing pipeline configs
# that match the "arguements" field in the pipeline_info.json
# create a hash of the config arguments
config_hash = hashlib.sha256(
json.dumps(pipeline_info["arguments"]).encode()
).hexdigest()
pipeline_config = (
db.session.query(PipelineConfig)
.filter(
PipelineConfig.pipeline_id == pipeline.id,
PipelineConfig.config_hash == config_hash,
)
.first()
)
# create a pipeline config if it does not exist
if not pipeline_config:
pipeline_config = PipelineConfig(
pipeline_id=pipeline.id,
config=pipeline_info["arguments"],
config_hash=config_hash,
)
db.session.add(pipeline_config)

# create a new pipeline run
pipeline_run = PipelineRun(
pipeline_id=pipeline.id,
config_id=pipeline_config.id,
)

# get a list of all the paper directories in the feature directory
paper_dirs = [d for d in Path(feature_directory).iterdir() if d.is_dir()]

# for each subject directory, read the results.json file and the info.json file
pipeline_run_results = []
for paper_dir in paper_dirs:
with open(op.join(paper_dir, "results.json")) as f:
results = json.load(f)

with open(op.join(paper_dir, "info.json")) as f:
info = json.load(f)

# use the directory name as the base_study_id
base_study_id = paper_dir.name
# create a new result record
pipeline_run_results.append(
PipelineRunResult(
base_study_id=base_study_id,
data=results,
date_executed=parse_date(info["date"]),
file_inputs=info["inputs"],
run=pipeline_run,
)
)

db.session.add(pipeline_run)
db.session.add_all(pipeline_run_results)

db.session.commit()
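
Reviewer note: a minimal usage sketch of the new ingester, assuming an application factory named create_app and a feature directory laid out as in the create_demographic_features fixture below (pipeline_info.json at the top level, one subdirectory per base study containing results.json and info.json). The factory name and path are illustrative, not part of this PR.

# Hypothetical usage sketch -- create_app and the path are assumptions.
from neurostore.core import create_app
from neurostore.ingest.extracted_features import ingest_feature

app = create_app()
with app.app_context():
    # one <base_study_id>/ subdirectory per paper, as in the test fixture
    ingest_feature("/data/extracted_features/demographics/v1.0.0")
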
8 changes: 8 additions & 0 deletions store/neurostore/models/__init__.py
@@ -12,6 +12,10 @@
    AnnotationAnalysis,
    PointValue,
    AnalysisConditions,
    Pipeline,
    PipelineConfig,
    PipelineRun,
    PipelineRunResult,
)
from .auth import User, Role

@@ -31,4 +35,8 @@
"AnalysisConditions",
"User",
"Role",
"Pipeline",
"PipelineConfig",
"PipelineRun",
"PipelineRunResult",
]
54 changes: 54 additions & 0 deletions store/neurostore/models/data.py
@@ -279,6 +279,7 @@ class Study(BaseMixin, db.Model):
    level = db.Column(db.String)
    metadata_ = db.Column(JSONB)
    source = db.Column(db.String, index=True)
    source_id = db.Column(db.String, index=True)
    source_updated_at = db.Column(db.DateTime(timezone=True))
    base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
@@ -538,6 +539,59 @@ class PointValue(BaseMixin, db.Model):
    user = relationship("User", backref=backref("point_values", passive_deletes=True))


class Pipeline(BaseMixin, db.Model):
    __tablename__ = "pipelines"

    name = db.Column(db.String)
    description = db.Column(db.String)
    version = db.Column(db.String)
    study_dependent = db.Column(db.Boolean, default=False)
    ace_compatible = db.Column(db.Boolean, default=False)
    pubget_compatible = db.Column(db.Boolean, default=False)
    derived_from = db.Column(db.Text)


class PipelineConfig(BaseMixin, db.Model):
    __tablename__ = "pipeline_configs"

    pipeline_id = db.Column(
        db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
    )
    config = db.Column(JSONB)
    config_hash = db.Column(db.String, index=True)
    pipeline = relationship(
        "Pipeline", backref=backref("configs", passive_deletes=True)
    )


class PipelineRun(BaseMixin, db.Model):
    __tablename__ = "pipeline_runs"

    pipeline_id = db.Column(
        db.Text, db.ForeignKey("pipelines.id", ondelete="CASCADE"), index=True
    )
    config_id = db.Column(
        db.Text, db.ForeignKey("pipeline_configs.id", ondelete="CASCADE"), index=True
    )
    config = relationship(
        "PipelineConfig", backref=backref("runs", passive_deletes=True)
    )
    run_index = db.Column(db.Integer())


class PipelineRunResult(BaseMixin, db.Model):
    __tablename__ = "pipeline_run_results"

    run_id = db.Column(
        db.Text, db.ForeignKey("pipeline_runs.id", ondelete="CASCADE"), index=True
    )
    base_study_id = db.Column(db.Text, db.ForeignKey("base_studies.id"), index=True)
    date_executed = db.Column(db.DateTime(timezone=True))
    data = db.Column(JSONB)
    file_inputs = db.Column(JSONB)
    run = relationship("PipelineRun", backref=backref("results", passive_deletes=True))


# from . import event_listeners # noqa E402

# del event_listeners
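
For context on how the four new tables relate, a query sketch (not part of this PR): the pipeline name and study id are placeholders, and an active app/session context is assumed.

from neurostore.database import db
from neurostore.models import Pipeline, PipelineRun, PipelineRunResult

# all demographics results recorded for one base study, newest first
results = (
    db.session.query(PipelineRunResult)
    .join(PipelineRun, PipelineRunResult.run_id == PipelineRun.id)
    .join(Pipeline, PipelineRun.pipeline_id == Pipeline.id)
    .filter(
        Pipeline.name == "demographics",
        PipelineRunResult.base_study_id == "<base_study_id>",  # placeholder
    )
    .order_by(PipelineRunResult.date_executed.desc())
    .all()
)
for result in results:
    print(result.date_executed, result.data)
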
2 changes: 1 addition & 1 deletion store/neurostore/resources/base.py
@@ -619,7 +619,7 @@ def search(self):
            validate_search_query(s)
        except errors.SyntaxError as e:
            abort(400, description=e.args[0])
        tsquery = func.to_tsquery('english', pubmed_to_tsquery(s))
        tsquery = func.to_tsquery("english", pubmed_to_tsquery(s))
        q = q.filter(m._ts_vector.op("@@")(tsquery))

        # Alternatively (or in addition), search on individual fields.
52 changes: 49 additions & 3 deletions store/neurostore/tests/conftest.py
@@ -1,4 +1,6 @@
import pytest
import random
import json
from os import environ
from neurostore.models.data import Analysis, BaseStudy, Condition
from sqlalchemy.orm import scoped_session, sessionmaker
@@ -588,6 +590,52 @@ def simple_neurosynth_annotation(session, ingest_neurosynth):
    return smol_annot


@pytest.fixture(scope="function")
def create_demographic_features(session, ingest_neurosynth, tmp_path):
    output_dir = tmp_path / "output" / "demographics" / "v1.0.0"
    output_dir.mkdir(exist_ok=True, parents=True)
    pipeline_info = {
        "name": "demographics",
        "version": "v1.0.0",
        "description": "demographic features",
        "study_dependent": False,
        "ace_compatible": True,
        "pubget_compatible": True,
        "derived_from": None,
        "arguments": {"parallel": 1},
    }
    with open(output_dir / "pipeline_info.json", "w") as f:
        json.dump(pipeline_info, f)

    studies = BaseStudy.query.all()
    diseases = ["schizophrenia", "bipolar disorder", "depression", "healthy"]
    studies_data = [
        [
            {"age": random.randint(18, 100), "group": group}
            for group in random.sample(diseases, k=random.randint(1, 2))
        ]
        for _ in studies
    ]

    for study, study_data in zip(studies, studies_data):
        study_dir = output_dir / study.id
        study_dir.mkdir(exist_ok=True, parents=True)
        with open(study_dir / "results.json", "w") as f:
            json.dump({"predictions": study_data}, f)
        with open(study_dir / "info.json", "w") as f:
            json.dump(
                {
                    "inputs": {
                        f"/path/to/input/{study.id}.txt": f"md5{random.randint(0, 100)}"
                    },
                    "date": f"2021-01-{random.randint(1, 30)}",
                },
                f,
            )

    return output_dir


"""
Queries for testing
"""
@@ -603,9 +651,7 @@ def simple_neurosynth_annotation(session, ingest_neurosynth):
'OR ("ASD")) AND (("decision*" OR "Dec',
"Unmatched parentheses",
),
(
'smoking AND NOT memory', "Consecutive operators are not allowed"
)
("smoking AND NOT memory", "Consecutive operators are not allowed"),
]

valid_queries = [
6 changes: 6 additions & 0 deletions store/neurostore/tests/test_ingestion.py
@@ -1,5 +1,7 @@
"""Test Ingestion Functions"""

from neurostore.ingest.extracted_features import ingest_feature


def test_ingest_ace(ingest_neurosynth, ingest_ace, session):
    pass
@@ -11,3 +13,7 @@ def test_ingest_neurovault(ingest_neurovault, session):

def test_ingest_neuroquery(ingest_neuroquery, session):
    pass


def test_ingest_features(create_demographic_features, session):
    ingest_feature(create_demographic_features)
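
A possible follow-up for this WIP test, sketched here rather than proposed as final: assert that ingestion created one pipeline, one config, and one run, with one result per base study. This relies on the configs and results backrefs defined in data.py and on the single argument set written by the fixture.

from neurostore.models import BaseStudy, Pipeline, PipelineRun


def test_ingest_features_populates_tables(create_demographic_features, session):
    ingest_feature(create_demographic_features)
    pipeline = Pipeline.query.filter_by(name="demographics", version="v1.0.0").one()
    assert len(pipeline.configs) == 1  # single argument hash in the fixture
    run = PipelineRun.query.filter_by(pipeline_id=pipeline.id).one()
    # the fixture writes one paper directory per base study
    assert len(run.results) == BaseStudy.query.count()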