Skip to content

Commit

Permalink
Merge pull request #826 from aaxelb/feature/perishable-metadata
Browse files Browse the repository at this point in the history
[ENG-6313] allow raw data to expire
  • Loading branch information
aaxelb authored Oct 17, 2024
2 parents 86b1a9b + 36ea734 commit 4fb6ed8
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 19 deletions.
1 change: 1 addition & 0 deletions how-to/use-the-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ query params:
- `record_identifier` (required): a source-specific identifier for the metadata record (no format restrictions) -- sending another record with the same `record_identifier` is considered a full update (only the most recent is used)
- `nonurgent`: if present (regardless of value), ingestion may be given a lower priority -- recommended for bulk or background operations
- `is_supplementary`: if present (regardless of value), this record's metadata will be added to all pre-existing index-cards from the same user with the same `focus_iri` (if any), but will not get an index-card of its own nor affect the last-updated timestamp (e.g. in OAI-PMH) of the index-cards it supplements
- `expiration_date`: optional date (in format `YYYY-MM-DD`) on which the record stops being valid -- starting on that date the record is considered expired and will be removed

## Deleting index-cards

Expand Down
4 changes: 4 additions & 0 deletions project/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ def split(string, delim):
'task': 'share.tasks.harvest',
'schedule': 120,
},
'Expel expired data': {
'task': 'trove.digestive_tract.task__expel_expired_data',
'schedule': crontab(hour=0, minute=0), # every day at midnight UTC
},
}

if not DEBUG:
Expand Down
4 changes: 1 addition & 3 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
SourceUniqueIdentifier,
)
from trove import digestive_tract
from trove.models import Indexcard


class ShareAdminSite(admin.AdminSite):
Expand Down Expand Up @@ -292,8 +291,7 @@ def reingest(self, request, queryset):
def delete_cards_for_suid(self, request, queryset):
for suid in queryset:
FormattedMetadataRecord.objects.delete_formatted_records(suid)
for _indexcard in Indexcard.objects.filter(source_record_suid__in=queryset):
_indexcard.pls_delete()
digestive_tract.expel_suid(suid)

def get_search_results(self, request, queryset, search_term):
if not search_term:
Expand Down
18 changes: 18 additions & 0 deletions share/migrations/0075_rawdatum_expiration_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2.25 on 2024-10-14 15:52

from django.db import migrations, models


# Schema migration: adds the `expiration_date` field to the `rawdatum` model.
class Migration(migrations.Migration):

# applied after migration 0074 of the `share` app
dependencies = [
('share', '0074_sourceuniqueidentifier_is_supplementary'),
]

operations = [
# new nullable date field (null=True, blank=True), so no default value is required
migrations.AddField(
model_name='rawdatum',
name='expiration_date',
field=models.DateField(blank=True, help_text='An (optional) date after which this datum is no longer valid.', null=True),
),
]
17 changes: 17 additions & 0 deletions share/migrations/0076_rawdatum_share_rawdatum_expiration_idx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.db import migrations, models
from django.contrib.postgres.operations import AddIndexConcurrently


# Schema migration: adds an index on `rawdatum.expiration_date`.
class Migration(migrations.Migration):
atomic = False  # allow adding indexes concurrently (without locking tables)

# applied after the migration that added the `expiration_date` field
dependencies = [
('share', '0075_rawdatum_expiration_date'),
]

operations = [
# uses the postgres-specific `AddIndexConcurrently` operation imported above
AddIndexConcurrently(
model_name='rawdatum',
index=models.Index(fields=['expiration_date'], name='share_rawdatum_expiration_idx'),
),
]
27 changes: 25 additions & 2 deletions share/models/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,14 +355,23 @@ def store_data(self, config, fetch_result):

return rd

def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: datetime.datetime):
def store_datum_for_suid(
self,
*,
suid,
datum: str,
mediatype: str | None, # `None` indicates sharev2-legacy ingestion
datestamp: datetime.datetime,
expiration_date: datetime.date | None = None,
):
_raw, _raw_created = self.get_or_create(
suid=suid,
sha256=hashlib.sha256(datum.encode()).hexdigest(),
defaults={
'datum': datum,
'mediatype': mediatype,
'datestamp': datestamp,
'expiration_date': expiration_date,
},
)
if not _raw_created:
Expand All @@ -371,10 +380,11 @@ def store_datum_for_suid(self, *, suid, datum: str, mediatype, datestamp: dateti
logger.critical(_msg)
sentry_sdk.capture_message(_msg)
_raw.mediatype = mediatype
_raw.expiration_date = expiration_date
# keep the latest datestamp
if (not _raw.datestamp) or (datestamp > _raw.datestamp):
_raw.datestamp = datestamp
_raw.save(update_fields=('mediatype', 'datestamp'))
_raw.save(update_fields=('mediatype', 'datestamp', 'expiration_date'))
return _raw

def latest_by_suid_id(self, suid_id) -> models.QuerySet:
Expand Down Expand Up @@ -420,6 +430,11 @@ class RawDatum(models.Model):
'This may be, but is not limited to, a deletion, modification, publication, or creation datestamp. '
'Ideally, this datetime should be appropriate for determining the chronological order its data will be applied.'
))
expiration_date = models.DateField(
null=True,
blank=True,
help_text='An (optional) date after which this datum is no longer valid.',
)

date_modified = models.DateTimeField(auto_now=True, editable=False)
date_created = models.DateTimeField(auto_now_add=True, editable=False)
Expand Down Expand Up @@ -447,11 +462,19 @@ def is_latest(self):
.exists()
)

@property
def is_expired(self) -> bool:
    """whether this datum's expiration date has arrived

    a datum without an expiration date never expires;
    a datum expiring today already counts as expired
    """
    _expires_on = self.expiration_date
    if _expires_on is None:
        return False
    return _expires_on <= datetime.date.today()

# Model options: uniqueness constraint, admin display name, and database indexes.
class Meta:
# a given (suid, sha256) pair may be stored only once
unique_together = ('suid', 'sha256')
verbose_name_plural = 'Raw Data'
indexes = [
models.Index(fields=['no_output'], name='share_rawda_no_outp_f0330f_idx'),
# same fields and name as the index declared in migration 0076
models.Index(fields=['expiration_date'], name='share_rawdatum_expiration_idx'),
]

class JSONAPIMeta(BaseJSONAPIMeta):
Expand Down
13 changes: 13 additions & 0 deletions tests/share/models/test_rawdata.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import pytest
import hashlib

Expand Down Expand Up @@ -65,3 +66,15 @@ def test_store_data_dedups_complex(self, source_config):
assert rd2.created is False
assert rd1.date_modified < rd2.date_modified
assert rd1.date_created == rd2.date_created

def test_is_expired(self):
    """`is_expired` is false without an expiration date or before it; true on and after it"""
    rd = RawDatum()
    assert rd.expiration_date is None
    assert not rd.is_expired
    _today = datetime.date.today()
    # shift by a number of days instead of rebuilding the date with `year ± 1`:
    # `datetime.date(year ± 1, month, day)` raises ValueError when today is february 29th
    _about_a_year = datetime.timedelta(days=365)
    rd.expiration_date = _today - _about_a_year
    assert rd.is_expired
    rd.expiration_date = _today  # expiring today counts as already expired
    assert rd.is_expired
    rd.expiration_date = _today + _about_a_year
    assert not rd.is_expired
207 changes: 207 additions & 0 deletions tests/trove/digestive_tract/test_expel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import datetime
from unittest import mock

from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from share import models as share_db
from tests import factories
from trove import digestive_tract
from trove import models as trove_db


_BLARG = rdf.IriNamespace('https://blarg.example/')


class TestDigestiveTractExpel(TestCase):
    """Tests for `digestive_tract.expel`, `expel_suid`, `expel_expired_data` and the expiration task.

    Shared fixture: two ingested records (each with its own indexcard)
    plus one supplementary record attached to the first indexcard.
    Index notifications are captured in `notified_indexcard_ids` and
    the derive task is replaced with a mock (`mock_derive_task`).
    """

    @classmethod
    def setUpTestData(cls):
        cls.focus_1 = _BLARG.this1
        cls.focus_2 = _BLARG.this2
        (cls.raw_1, cls.indexcard_1) = _setup_ingested(cls.focus_1)
        (cls.raw_2, cls.indexcard_2) = _setup_ingested(cls.focus_2)
        # the supplement shares a source-config and focus with the first record
        cls.raw_supp = _setup_supplementary(
            focus_iri=cls.focus_1,
            main_suid=cls.raw_1.suid,
            indexcard=cls.indexcard_1,
        )

    def setUp(self):
        super().setUp()
        # ids of every indexcard the search indexes were told about during the test
        self.notified_indexcard_ids = set()
        _notify_patch = mock.patch(
            'share.search.index_messenger.IndexMessenger.notify_indexcard_update',
            new=self._replacement_notify_indexcard_update,
        )
        self.enterContext(_notify_patch)
        _derive_patch = mock.patch('trove.digestive_tract.task__derive')
        self.mock_derive_task = self.enterContext(_derive_patch)

    def _replacement_notify_indexcard_update(self, indexcards, **kwargs):
        # stand-in for the real notifier: only remember which cards were mentioned
        for _indexcard in indexcards:
            self.notified_indexcard_ids.add(_indexcard.id)

    def enterContext(self, context_manager):
        # TestCase.enterContext added in python3.11 -- implementing here until then
        _entered = context_manager.__enter__()
        self.addCleanup(context_manager.__exit__, None, None, None)
        return _entered

    ###
    # shared assertion helpers (each keeps the order: indexcard_1, then indexcard_2)

    def _refresh_indexcards(self):
        for _indexcard in (self.indexcard_1, self.indexcard_2):
            _indexcard.refresh_from_db()

    def _assert_suids_and_raws_kept(self):
        # expelling never removes suids or raw data; all three of each remain
        self.assertEqual(share_db.SourceUniqueIdentifier.objects.count(), 3)
        self.assertEqual(share_db.RawDatum.objects.count(), 3)

    def _assert_latest_rdf_missing(self, indexcard):
        with self.assertRaises(trove_db.LatestIndexcardRdf.DoesNotExist):
            indexcard.latest_rdf

    def _assert_related_counts(self, *, archived, supplementary, derived):
        # each argument is a pair: (expected count for indexcard_1, for indexcard_2)
        _indexcards = (self.indexcard_1, self.indexcard_2)
        _expectations = (
            ('archived_rdf_set', archived),
            ('supplementary_rdf_set', supplementary),
            ('derived_indexcard_set', derived),
        )
        for _related_name, _expected_counts in _expectations:
            for _indexcard, _expected in zip(_indexcards, _expected_counts):
                self.assertEqual(getattr(_indexcard, _related_name).count(), _expected)

    ###
    # tests

    def test_setup(self):
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raws_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(1, 0),
            derived=(1, 1),
        )
        # neither notified indexes nor enqueued re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_not_called()

    def test_expel(self):
        _suid = self.raw_1.suid
        with mock.patch('trove.digestive_tract.expel_suid') as _mock_expel_suid:
            digestive_tract.expel(
                from_user=_suid.source_config.source.user,
                record_identifier=_suid.identifier,
            )
        _mock_expel_suid.assert_called_once_with(_suid)

    def test_expel_suid(self):
        digestive_tract.expel_suid(self.raw_1.suid)
        self._refresh_indexcards()
        self.assertIsNotNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raws_kept()
        self._assert_latest_rdf_missing(self.indexcard_1)  # deleted
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),  # card 1: not deleted
            supplementary=(1, 0),  # card 1: not deleted
            derived=(0, 1),  # card 1: deleted
        )
        # notified indexes of update; did not enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_1.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_supplementary_suid(self):
        digestive_tract.expel_suid(self.raw_supp.suid)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raws_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(0, 0),  # card 1: deleted
            derived=(1, 1),
        )
        # did not notify indexes of update; did enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)

    def test_expel_expired_task(self):
        with mock.patch('trove.digestive_tract.expel_expired_data') as _mock_expel_expired:
            digestive_tract.task__expel_expired_data.apply()
        _mock_expel_expired.assert_called_once_with(datetime.date.today())

    def test_expel_expired(self):
        _today = datetime.date.today()
        self.raw_2.expiration_date = _today
        self.raw_2.save()
        digestive_tract.expel_expired_data(_today)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNotNone(self.indexcard_2.deleted)  # marked deleted
        self._assert_suids_and_raws_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self._assert_latest_rdf_missing(self.indexcard_2)  # deleted
        self._assert_related_counts(
            archived=(1, 1),  # card 2: not deleted
            supplementary=(1, 0),  # card 2: still none (it never had a supplement)
            derived=(1, 0),  # card 2: deleted
        )
        # notified indexes of update; did not enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, {self.indexcard_2.id})
        self.mock_derive_task.delay.assert_not_called()

    def test_expel_expired_supplement(self):
        _today = datetime.date.today()
        self.raw_supp.expiration_date = _today
        self.raw_supp.save()
        digestive_tract.expel_expired_data(_today)
        self._refresh_indexcards()
        self.assertIsNone(self.indexcard_1.deleted)
        self.assertIsNone(self.indexcard_2.deleted)
        self._assert_suids_and_raws_kept()
        self.assertIsNotNone(self.indexcard_1.latest_rdf)
        self.assertIsNotNone(self.indexcard_2.latest_rdf)
        self._assert_related_counts(
            archived=(1, 1),
            supplementary=(0, 0),  # card 1: deleted
            derived=(1, 1),
        )
        # did not notify indexes of update; did enqueue re-derive
        self.assertEqual(self.notified_indexcard_ids, set())
        self.mock_derive_task.delay.assert_called_once_with(self.indexcard_1.id)


def _setup_ingested(focus_iri: str):
    """Create the database rows of an already-ingested record focused on `focus_iri`.

    Builds a suid and raw datum (via factories), an indexcard for that suid with
    both a latest and an archived rdf, and one derived indexcard.

    returns: 2-tuple of (raw datum, indexcard)
    """
    _focus_identifier = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(focus_iri)
    _raw = factories.RawDatumFactory(
        suid=factories.SourceUniqueIdentifierFactory(focus_identifier=_focus_identifier),
    )
    _indexcard = trove_db.Indexcard.objects.create(source_record_suid=_raw.suid)
    _indexcard.focus_identifier_set.add(_focus_identifier)
    # latest and archived rdf share everything (including the placeholder turtle)
    _rdf_fields = {
        'indexcard': _indexcard,
        'from_raw_datum': _raw,
        'focus_iri': focus_iri,
    }
    _latest = trove_db.LatestIndexcardRdf.objects.create(rdf_as_turtle='...', **_rdf_fields)
    trove_db.ArchivedIndexcardRdf.objects.create(
        rdf_as_turtle=_latest.rdf_as_turtle,
        **_rdf_fields,
    )
    # one derived indexcard, from a made-up deriver
    _deriver_identifier = trove_db.ResourceIdentifier.objects.get_or_create_for_iri(
        _BLARG.deriver,
    )
    trove_db.DerivedIndexcard.objects.create(
        upriver_indexcard=_indexcard,
        deriver_identifier=_deriver_identifier,
        derived_checksum_iri='...',
        derived_text='...',
    )
    return (_raw, _indexcard)


def _setup_supplementary(focus_iri, main_suid, indexcard):
    """Attach a supplementary record to `indexcard`.

    Creates a supplementary suid (same focus identifier and source config as
    `main_suid`), a raw datum for it, and a supplementary rdf on `indexcard`.

    returns: the supplementary raw datum
    """
    _suid_for_supplement = factories.SourceUniqueIdentifierFactory(
        is_supplementary=True,
        source_config=main_suid.source_config,
        focus_identifier=main_suid.focus_identifier,
    )
    _raw_for_supplement = factories.RawDatumFactory(suid=_suid_for_supplement)
    _supplement_fields = {
        'indexcard': indexcard,
        'from_raw_datum': _raw_for_supplement,
        'supplementary_suid': _suid_for_supplement,
        'focus_iri': focus_iri,
        'rdf_as_turtle': '...',
    }
    trove_db.SupplementaryIndexcardRdf.objects.create(**_supplement_fields)
    return _raw_for_supplement
12 changes: 12 additions & 0 deletions tests/trove/digestive_tract/test_extract.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datetime
from django.test import TestCase
from primitive_metadata import primitive_rdf as rdf

from tests import factories
from trove import digestive_tract
from trove import exceptions as trove_exceptions
from trove import models as trove_db


Expand Down Expand Up @@ -128,3 +130,13 @@ def test_extract_empty_supplementary(self):
(_indexcard,) = digestive_tract.extract(_empty_raw)
self.assertEqual(_indexcard.id, _orig_indexcard.id)
self.assertFalse(_orig_indexcard.supplementary_rdf_set.exists())

def test_extract_expired(self):
    """a raw datum that expires today is refused by `extract`"""
    _expired_raw = self.raw
    _expired_raw.expiration_date = datetime.date.today()
    self.assertRaises(
        trove_exceptions.CannotDigestExpiredDatum,
        digestive_tract.extract,
        _expired_raw,
    )

def test_extract_expired_supplement(self):
    """a supplementary raw datum that expires today is refused by `extract`"""
    _expired_supplement = self.supplementary_raw
    _expired_supplement.expiration_date = datetime.date.today()
    self.assertRaises(
        trove_exceptions.CannotDigestExpiredDatum,
        digestive_tract.extract,
        _expired_supplement,
    )
Loading

0 comments on commit 4fb6ed8

Please sign in to comment.