From e8e7bd8de782683bb6a066760cb082f0be8faba6 Mon Sep 17 00:00:00 2001 From: Brianna Smart Date: Mon, 12 Aug 2024 13:46:07 -0700 Subject: [PATCH] Update schema registry upload --- .../packet/bin/syncLatestSchemaToRegistry.py | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py b/python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py index 6eee395..d95203e 100644 --- a/python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py +++ b/python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py @@ -47,26 +47,16 @@ def parse_args(): return parser.parse_args() -def all_schemas(): - """Load in all schemas""" - schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem() - version_numbers = [] +def upload_schema(registry_url, subject, schema_registry): + """Parse schema registry and upload all schemas.""" for version in schema_registry.known_versions: schema = schema_registry.get_by_version(version) - numbers = re.findall(r'\d+', version) - # Join the numbers into a single string - version_numbers.append(int(''.join(numbers))) - - normalized_schemas = fastavro.schema.to_parsing_canonical_form(schema.definition) - return normalized_schemas, version_numbers - - -def upload_schema(registry_url, subject, schemas, version_numbers): - - for i, normalized_schema in enumerate(schemas): + version_number = re.findall(r'\d+', version) + normalized_schema = fastavro.schema.to_parsing_canonical_form( + schema.definition) normalized_schema["subject"] = normalized_schema["version"] confluent_schema = {"schemaType": "Avro", "version": 1, - "id": version_numbers[i], "schema": normalized_schema} + "id": version_number, "schema": normalized_schema} payload = json.dumps(confluent_schema) headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"} url = f"{registry_url}/subjects/{subject}/versions" @@ -79,12 +69,11 @@ def upload_schema(registry_url, subject, schemas, version_numbers): def main(): args = parse_args() - normalized_schemas, version_numbers = all_schemas() + schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem() upload_schema( args.schema_registry_url, subject=args.subject, - schemas=normalized_schemas, - version_numbers=version_numbers + schema_registry=schema_registry )