Skip to content

Commit

Permalink
Change id calculation to new convention and add known_id property
Browse files Browse the repository at this point in the history
Add get_schema_id to Schema class

Update unit tests
  • Loading branch information
bsmartradio committed Aug 23, 2024
1 parent bcc9fc0 commit ee93394
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 39 deletions.
32 changes: 14 additions & 18 deletions python/lsst/alert/packet/bin/syncAllSchemasToRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import argparse
import json
import re
import fastavro
import requests

Expand All @@ -36,7 +35,7 @@ def parse_args():
"--schema-registry-url",
type=str,
default="http://alert-schemas.localhost",
help="URL of a Schema Registry service",
help="URL of a Confluent Schema Registry service",
)
parser.add_argument(
"--subject",
Expand All @@ -47,38 +46,35 @@ def parse_args():
return parser.parse_args()


def upload_schema(registry_url, subject, schema_registry):
def upload_schemas(registry_url, subject, schema_registry):
"""Parse schema registry and upload all schemas.
"""
for version in schema_registry.known_versions:
schema = schema_registry.get_by_version(version)
numbers = re.findall(r'\d+', version)
numbers[1] = str(numbers[1]).zfill(2)
version_number = int(''.join(numbers))
for schema_id in schema_registry.known_ids:
schema = schema_registry.get_by_id(schema_id)
normalized_schema = fastavro.schema.to_parsing_canonical_form(
schema.definition)
confluent_schema = {"version": version_number,
"id": version_number, "schema": normalized_schema}
confluent_schema = {"version": schema_id,
"id": schema_id, "schema": normalized_schema}
payload = json.dumps(confluent_schema)
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
url = f"{registry_url}/subjects/{subject}/versions"
print(f"uploading schema to {url}")
response = requests.post(url=url, data=payload, headers=headers)
# response.raise_for_status()
response.raise_for_status()
print(f"done, status={response.status_code}")
print(f"response text={response.text}")


def delete_schema(registry_url, subject):
"""Delete schema and then remake it in import mode"""
def clear_schema_registry_for_import(registry_url, subject):
"""Delete schemas in the registry and then remake it in import mode"""
# Define the URLs
url_mode = f"{registry_url}/mode/{subject}"
url_schemas = f"{registry_url}/subjects/{subject}"
url_schema_versions = f"{registry_url}/subjects/{subject}/versions"
response = requests.get(url_schema_versions)

# Schema registry must be empty to put it in import mode. If it exists,
# remove it and remkae the schema. If not, continue.
# remove it and remake the schema. If not, continue.
if response.status_code == 200:
print('The schema will be deleted and remade in import mode.')
response = requests.delete(url_schemas)
Expand All @@ -105,7 +101,7 @@ def delete_schema(registry_url, subject):
print('Response Text:', response.text)


def close_schema(registry_url, subject):
def close_schema_registry(registry_url, subject):
"""Return the schema registry from import mode to readwrite.
"""
data = {
Expand All @@ -126,14 +122,14 @@ def close_schema(registry_url, subject):

def main():
args = parse_args()
delete_schema(args.schema_registry_url, args.subject)
clear_schema_registry_for_import(args.schema_registry_url, args.subject)
schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem()
upload_schema(
upload_schemas(
args.schema_registry_url,
subject=args.subject,
schema_registry=schema_registry
)
close_schema(args.schema_registry_url, args.subject)
close_schema_registry(args.schema_registry_url, args.subject)


if __name__ == "__main__":
Expand Down
14 changes: 13 additions & 1 deletion python/lsst/alert/packet/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@

import io
import tempfile
import re
import fastavro
from importlib import resources
from pathlib import PurePath

from lsst.resources import ResourcePath

import fastavro

__all__ = ["get_schema_root", "get_latest_schema_version", "get_schema_path",
"Schema", "get_path_to_latest_schema", "get_schema_root_uri",
Expand Down Expand Up @@ -428,3 +430,13 @@ def from_file(cls, filename=None):
if schema['name'] == root_name)

return cls(schema_definition)

def get_schema_id(self):
"""Retrieve the schema id used in the schema registry.
"""
numbers = re.findall(r'\d+', self.definition['name'])
assert (len(numbers) == 2)
numbers[1] = str(numbers[1]).zfill(2)
schema_id = int(''.join(numbers))

return schema_id
30 changes: 11 additions & 19 deletions python/lsst/alert/packet/schemaRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Provide a lookup table for alert schemas.
"""Provide a lookup table for versioned alert schemas.
"""

import json
import os
import zlib

__all__ = ["SchemaRegistry"]

Expand All @@ -38,6 +36,7 @@ class SchemaRegistry:
def __init__(self):
self._version_to_id = {}
self._id_to_schema = {}
self._ids = []

def register_schema(self, schema, version):
"""Register a new schema in the registry.
Expand All @@ -61,8 +60,9 @@ def register_schema(self, schema, version):
schema_id : `int`
The ID that has been allocated to the schema.
"""
schema_id = self.calculate_id(schema)
schema_id = schema.get_schema_id()
self._version_to_id[version] = schema_id
self._ids.append(schema_id)
self._id_to_schema[schema_id] = schema
return schema_id

Expand Down Expand Up @@ -108,27 +108,19 @@ def known_versions(self):
"""
return set(self._version_to_id)

@staticmethod
def calculate_id(schema):
"""Calculate an ID for the given schema.
Parameters
----------
schema : `lsst.alert.packet.Schema`
Schema for which an ID will be derived.
@property
def known_ids(self):
"""Return all the schema ids tracked by this registry.
Returns
-------
schema_id : `int`
The calculated ID.
schemas : `list` of `int`
List of schema ids.
"""
# Significant risk of collisions with more than a few schemas;
# CRC32 is ok for prototyping but isn't sensible in production.
return zlib.crc32(json.dumps(schema.definition,
sort_keys=True).encode('utf-8'))
return set(self._ids)

@classmethod
def from_filesystem(cls, root=None, schema_root="lsst.v7_0.alert"):
def from_filesystem(cls, root=None, schema_root="lsst.v7_1.alert"):
"""Populate a schema registry based on the filesystem.
Walk the directory tree from the root provided, locating files named
Expand Down
6 changes: 5 additions & 1 deletion test/test_schemaRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def write_schema(root_dir, filename, version_major, version_minor):
# Generate a new schema for each version to avoid ID collisions.
schema = {
"name": "example",
"namespace": "lsst",
"namespace": f"lsst.v{version_major}_{version_minor}",
"type": "record",
"fields": [
{"name": "field%s%s" % (version_major, version_minor),
Expand Down Expand Up @@ -75,3 +75,7 @@ def test_from_filesystem(self):
for version in versions:
registry.get_by_version(version)
self.assertRaises(KeyError, registry.get_by_version, "2.2")

for id in registry.known_ids:
registry.get_by_id(id)
self.assertRaises(KeyError, registry.get_by_id, "202")

0 comments on commit ee93394

Please sign in to comment.