Skip to content

Commit

Permalink
Use: select & async fetch functions from BaseHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ahdamin committed Oct 11, 2024
1 parent aad7b0d commit 8fd9f21
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 63 deletions.
11 changes: 5 additions & 6 deletions tests/database/filters/test_plate_filters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module to test the plate filters."""

from sqlalchemy.future import select
from sqlalchemy.orm import Query

from genotype_api.database.filters.plate_filters import (
Expand All @@ -16,13 +17,12 @@ async def test_filter_plates_by_id(base_store: Store, test_plate: Plate):
# GIVEN a store with a plate

# WHEN filtering plates by id
query: Query = base_store._get_query(Plate)
query: Query = select(Plate)
filter_functions = filter_plates_by_id(entry_id=test_plate.id, plates=query)
filtered_query = apply_plate_filter(
plates=query, filter_functions=filter_functions, entry_id=test_plate.id
)
result = await base_store.session.execute(filtered_query)
plate: Plate = result.scalars().first()
plate: Plate = await base_store.fetch_first_row(filtered_query)

# THEN the plate is returned
assert plate
Expand All @@ -34,13 +34,12 @@ async def test_filter_plates_by_plate_id(base_store: Store, test_plate: Plate):
# GIVEN a store with a plate

# WHEN filtering plates by plate id
query: Query = base_store._get_query(Plate)
query: Query = select(Plate)
filter_functions = filter_plates_by_plate_id(plate_id=test_plate.id, plates=query)
filtered_query = apply_plate_filter(
plates=query, filter_functions=filter_functions, plate_id=test_plate.id
)
result = await base_store.session.execute(filtered_query)
plate: Plate = result.scalars().first()
plate: Plate = await base_store.fetch_first_row(filtered_query)

# THEN the plate is returned
assert plate
Expand Down
89 changes: 39 additions & 50 deletions tests/database/filters/test_sample_filters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module to test the sample filters."""

from sqlalchemy.future import select
from sqlalchemy.orm import Query

from genotype_api.database.filters.sample_filters import (
Expand All @@ -21,13 +22,9 @@ async def test_filter_samples_by_id(base_store: Store, test_sample: Sample):
# GIVEN a store with a sample

# WHEN filtering samples by id
query: Query = base_store._get_query(Sample)
filter_functions = filter_samples_by_id(sample_id=test_sample.id, samples=query)
filtered_query = apply_sample_filter(
samples=query, filter_functions=filter_functions, sample_id=test_sample.id
)
result = await base_store.session.execute(filtered_query)
sample: Sample = result.scalars().first()
query: Query = select(Sample)
filtered_query = filter_samples_by_id(sample_id=test_sample.id, samples=query)
sample: Sample = await base_store.fetch_first_row(filtered_query)

# THEN the sample is returned
assert sample
Expand All @@ -39,13 +36,12 @@ async def test_filter_samples_contain_id(base_store: Store, test_sample: Sample)
# GIVEN a store with a sample

# WHEN filtering samples by id
query: Query = base_store._get_query(Sample)
query: Query = select(Sample)
filter_functions = filter_samples_contain_id(sample_id=test_sample.id, samples=query)
filtered_query = apply_sample_filter(
samples=query, filter_functions=filter_functions, sample_id=test_sample.id
)
result = await base_store.session.execute(filtered_query)
sample: Sample = result.scalars().first()
sample: Sample = await base_store.fetch_first_row(filtered_query)

# THEN the sample is returned
assert sample
Expand All @@ -55,16 +51,16 @@ async def test_filter_samples_contain_id(base_store: Store, test_sample: Sample)
async def test_filter_samples_contain_id_when_no_id(base_store: Store, test_sample: Sample):
"""Test filtering samples by id when no id is provided."""
# GIVEN a store with two samples
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2

# WHEN filtering samples by id
query: Query = base_store._get_query(Sample)
filter_functions = filter_samples_contain_id(sample_id=None, samples=query)
filtered_query = apply_sample_filter(
samples=query, filter_functions=filter_functions, sample_id=None
)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN all samples are returned
assert len(samples) == 2
Expand All @@ -75,18 +71,18 @@ async def test_filter_samples_having_comment(
):
"""Test filtering samples by having comment."""
# GIVEN a store with samples having a comment and one without
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2
sample_without_comment: Sample = test_sample
sample_without_comment.comment = None
sample_without_comment.id = "sample_without_status"
await helpers.ensure_sample(store=base_store, sample=sample_without_comment)

# WHEN filtering samples by having comment
query: Query = base_store._get_query(Sample)
filter_functions = filter_samples_having_comment(samples=query, is_commented=True)
filtered_query = apply_sample_filter(samples=query, filter_functions=filter_functions)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN samples with comments are returned
assert samples
Expand All @@ -97,14 +93,14 @@ async def test_filter_samples_having_comment(
async def test_filter_samples_having_comment_none_provided(base_store: Store, test_sample: Sample):
"""Test filtering samples by having comment."""
# GIVEN a store with samples having a comment and one without
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2

# WHEN filtering samples by having comment
query: Query = base_store._get_query(Sample)
filter_functions = filter_samples_having_comment(samples=query, is_commented=None)
filtered_query = apply_sample_filter(samples=query, filter_functions=filter_functions)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN the sample is returned
assert len(samples) == 2
Expand All @@ -121,11 +117,9 @@ async def test_filter_samples_without_status(
await helpers.ensure_sample(store=base_store, sample=sample_without_status)

# WHEN filtering samples by having a status
query: Query = base_store._get_query(Sample)
filter_functions = filter_samples_without_status(samples=query, is_missing=True)
filtered_query = apply_sample_filter(samples=query, filter_functions=filter_functions)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
query: Query = select(Sample)
filtered_query = filter_samples_without_status(samples=query, is_missing=True)
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN no sample is returned
assert samples
Expand All @@ -136,14 +130,15 @@ async def test_filter_samples_without_status(
async def test_filter_samples_without_status_none_provided(base_store: Store, test_sample: Sample):
"""Test filtering samples by having status."""
# GIVEN a store with a sample that has a status
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2

# WHEN filtering samples by having a status
query: Query = base_store._get_query(Sample)
query: Query = select(Sample)
filter_functions = filter_samples_without_status(samples=query, is_missing=None)
filtered_query = apply_sample_filter(samples=query, filter_functions=filter_functions)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN all samples are returned
assert len(samples) == 2
Expand All @@ -156,13 +151,9 @@ async def test_filter_samples_analysed_on_plate(
# GIVEN a store with analysed samples

# WHEN filtering samples analysed on a plate
query: Query = base_store._get_join_analysis_on_sample()
filter_functions = filter_samples_analysed_on_plate(samples=query, plate_id=test_plate.id)
filtered_query = apply_sample_filter(
samples=query, filter_functions=filter_functions, plate_id=test_plate.id
)
result = await base_store.session.execute(filtered_query)
sample: Sample = result.scalars().first()
query: Query = base_store._get_samples_with_analyses()
filtered_query = filter_samples_analysed_on_plate(samples=query, plate_id=test_plate.id)
sample: Sample = await base_store.fetch_first_row(filtered_query)

# THEN one sample is returned
assert sample.analyses[0].plate_id == test_plate.id
Expand All @@ -174,16 +165,14 @@ async def test_filter_samples_analysed_on_plate_none_provided(
):
"""Test filtering samples by having comment."""
# GIVEN a store with analysed samples
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2

# WHEN filtering samples analysed on a plate
query: Query = base_store._get_join_analysis_on_sample()
filter_functions = filter_samples_analysed_on_plate(samples=query, plate_id=None)
filtered_query = apply_sample_filter(
samples=query, filter_functions=filter_functions, plate_id=None
)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
query: Query = base_store._get_samples_with_analyses()
filtered_query = filter_samples_analysed_on_plate(samples=query, plate_id=None)
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN all samples are returned
assert len(samples) == 2
Expand All @@ -192,13 +181,13 @@ async def test_filter_samples_analysed_on_plate_none_provided(
async def test_add_skip_and_limit(base_store: Store, test_sample: Sample):
"""Test add_skip_and_limit function."""
# GIVEN a store with two samples
assert len(await base_store._get_query(Sample).all()) == 2
query: Query = select(Sample)
samples: list[Sample] = await base_store.fetch_all_rows(query)
assert len(samples) == 2

# WHEN adding skip and limit to the query
query: Query = base_store._get_query(Sample)
filtered_query = add_skip_and_limit(query, skip=0, limit=1)
result = await base_store.session.execute(filtered_query)
samples: list[Sample] = result.scalars().all()
samples: list[Sample] = await base_store.fetch_all_rows(filtered_query)

# THEN one sample is returned
assert samples
Expand Down
14 changes: 7 additions & 7 deletions tests/database/filters/test_snp_filters.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module to test the SNP filters."""

from sqlalchemy.future import select
from sqlalchemy.orm import Query

from genotype_api.database.filters.snp_filters import (
Expand All @@ -17,13 +18,12 @@ async def test_filter_snps_by_id(base_store: Store, test_snp: SNP):
# GIVEN a store with a SNP

# WHEN filtering a SNP by id
query: Query = base_store._get_query(SNP)
query: Query = select(SNP)
filter_functions = filter_snps_by_id(snp_id=test_snp.id, snps=query)
filtered_query = apply_snp_filter(
snps=query, filter_functions=filter_functions, snp_id=test_snp.id
)
result = await base_store.session.execute(filtered_query)
snp: SNP = result.scalars().first()
snp: SNP = base_store.fetch_first_row(filtered_query)

# THEN the SNP is returned
assert snp
Expand All @@ -34,13 +34,13 @@ async def test_add_skip_and_limit(base_store: Store, test_snp: SNP):
"""Test add_skip_and_limit function."""

# GIVEN a store with two SNPs
assert len(await base_store._get_query(SNP).all()) == 2
query: Query = select(SNP)
snps: list[SNP] = await base_store.fetch_all_rows(query)
assert len(snps) == 2

# WHEN adding skip and limit to the query
query: Query = base_store._get_query(SNP)
filtered_query = add_skip_and_limit(query, skip=0, limit=1)
result = await base_store.session.execute(filtered_query)
snps: list[SNP] = result.scalars().all()
snps: list[SNP] = base_store.fetch_all_rows(filtered_query)

# THEN one SNP is returned
assert snps
Expand Down

0 comments on commit 8fd9f21

Please sign in to comment.