From b99cbecd21ed419e1e54b21755f069879659b93f Mon Sep 17 00:00:00 2001 From: Ryan Barrett Date: Thu, 5 Oct 2023 12:39:50 -0700 Subject: [PATCH] datastore_storage: add new AtpRemoteBlob model get_or_create fetches a URL, calculates its CID, and stores it for #13 --- arroba/datastore_storage.py | 72 +++++++++++++++++++++++++- arroba/storage.py | 2 +- arroba/tests/test_datastore_storage.py | 27 +++++++++- arroba/tests/testutil.py | 5 +- arroba/util.py | 2 +- 5 files changed, 103 insertions(+), 5 deletions(-) diff --git a/arroba/datastore_storage.py b/arroba/datastore_storage.py index aad8ee3..381e486 100644 --- a/arroba/datastore_storage.py +++ b/arroba/datastore_storage.py @@ -1,6 +1,7 @@ """Google Cloud Datastore implementation of repo storage.""" import json import logging +import requests from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec @@ -164,7 +165,7 @@ def create(*, repo_did, data, seq): assert seq > 0 encoded = dag_cbor.encode(data) digest = multihash.digest(encoded, 'sha2-256') - cid = CID('base58btc', 1, multicodec.get('dag-cbor'), digest) + cid = CID('base58btc', 1, 'dag-cbor', digest) repo_key = ndb.Key(AtpRepo, repo_did) atp_block = AtpBlock.get_or_insert(cid.encode('base32'), repo=repo_key, @@ -256,6 +257,75 @@ def last(cls, nsid): return seq.next - 1 +class AtpRemoteBlob(ndb.Model): + """A blob available at a public HTTP URL that we don't store ourselves. + + Key ID is the URL. + + TODO: follow redirects, use final URL as key id + """ + cid = ndb.StringProperty(required=True) + size = ndb.IntegerProperty(required=True) + mime_type = ndb.StringProperty(required=True, default='application/octet-stream') + + created = ndb.DateTimeProperty(auto_now_add=True) + updated = ndb.DateTimeProperty(auto_now=True) + + @classmethod + @ndb.transactional() + def get_or_create(cls, *, url=None, cid=None, get_fn=requests.get): + """Returns a new or existing :class:`AtpRemoteBlob` for a given URL. + + If there isn't an existing :class:`AtpRemoteBlob`, fetches the URL over + the network and creates a new one for it. + + Args: + url (str) + cid (CID) + get_fn (callable): for making HTTP GET requests + + Returns: + AtpRemoteBlob: existing or newly created :class:`AtpRemoteBlob`, or + None if ``cid`` was provided and no stored :class:`AtpRemoteBlob` has + that CID. + """ + assert url or cid + + if url: + existing = cls.get_by_id(url) + if existing: + return existing + elif cid: + assert isinstance(cid, CID) + return cls.query(cls.cid == cid.encode('base32')).get() + + resp = get_fn(url) + resp.raise_for_status() + + digest = multihash.digest(resp.content, 'sha2-256') + cid = CID('base58btc', 1, 'raw', digest).encode('base32') + logger.info(f'Creating new AtpRemoteBlob for {url} CID {cid}') + blob = cls(id=url, cid=cid, mime_type=resp.headers.get('Content-Type'), + size=len(resp.content)) + blob.put() + return blob + + def as_ref(self): + """Returns an ATProto `ref` object for this blob. + + https://atproto.com/specs/data-model#blob-type + + Returns: + dict: ATProto `ref` + """ + return { + '$type': 'blob', + 'ref': self.cid, + 'mimeType': self.mime_type, + 'size': self.size, + } + + class DatastoreStorage(Storage): """Google Cloud Datastore implementation of :class:`Storage`. diff --git a/arroba/storage.py b/arroba/storage.py index 6cd3e5c..ccd760a 100644 --- a/arroba/storage.py +++ b/arroba/storage.py @@ -88,7 +88,7 @@ def __str__(self): def cid(self): if self._cid is None: digest = multihash.digest(self.encoded, 'sha2-256') - self._cid = CID('base58btc', 1, multicodec.get('dag-cbor'), digest) + self._cid = CID('base58btc', 1, 'dag-cbor', digest) return self._cid @property diff --git a/arroba/tests/test_datastore_storage.py b/arroba/tests/test_datastore_storage.py index 7cd2e1e..0d39185 100644 --- a/arroba/tests/test_datastore_storage.py +++ b/arroba/tests/test_datastore_storage.py @@ -1,5 +1,6 @@ """Unit tests for datastore_storage.py.""" import os +from unittest.mock import MagicMock, patch from google.cloud import ndb @@ -10,6 +11,7 @@ from ..datastore_storage import ( AtpBlock, + AtpRemoteBlob, AtpRepo, AtpSequence, DatastoreStorage, @@ -20,7 +22,7 @@ from ..util import dag_cbor_cid, new_key, next_tid from . import test_repo -from .testutil import DatastoreTest +from .testutil import DatastoreTest, requests_response CIDS = [ CID.decode('bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454'), @@ -204,3 +206,26 @@ def test_apply_commit(self): atp_repo = AtpRepo.get_by_id('did:web:user.com') self.assertEqual(cid, CID.decode(atp_repo.head)) + + def test_create_remote_blob(self): + mock_get = MagicMock(return_value=requests_response( + 'blob contents', headers={'Content-Type': 'foo/bar'})) + cid = 'bafkreicqpqncshdd27sgztqgzocd3zhhqnnsv6slvzhs5uz6f57cq6lmtq' + + blob = AtpRemoteBlob.get_or_create(url='http://blob', get_fn=mock_get) + mock_get.assert_called_with('http://blob') + self.assertEqual({ + '$type': 'blob', + 'ref': cid, + 'mimeType': 'foo/bar', + 'size': 13, + }, blob.as_ref()) + + mock_get.reset_mock() + got = AtpRemoteBlob.get_or_create(url='http://blob') + self.assertEqual(blob, got) + mock_get.assert_not_called() + + got = AtpRemoteBlob.get_or_create(cid=CID.decode(cid)) + self.assertEqual(blob, got) + mock_get.assert_not_called() diff --git a/arroba/tests/testutil.py b/arroba/tests/testutil.py index 7c3b751..60415c7 100644 --- a/arroba/tests/testutil.py +++ b/arroba/tests/testutil.py @@ -38,7 +38,7 @@ os.environ.setdefault('DATASTORE_EMULATOR_HOST', 'localhost:8089') -def requests_response(body, status=200): +def requests_response(body, status=200, headers=None): """ Args: body: dict or list, JSON response @@ -54,6 +54,9 @@ def requests_response(body, status=200): else: resp._text = body + if headers: + resp.headers.update(headers) + resp._content = resp._text.encode() resp.encoding = 'utf-8' resp.status_code = status diff --git a/arroba/util.py b/arroba/util.py index e278c11..96665f9 100644 --- a/arroba/util.py +++ b/arroba/util.py @@ -63,7 +63,7 @@ def dag_cbor_cid(obj): """ encoded = dag_cbor.encode(obj) digest = multihash.digest(encoded, 'sha2-256') - return CID('base58btc', 1, multicodec.get('dag-cbor'), digest) + return CID('base58btc', 1, 'dag-cbor', digest) def s32encode(num):