Skip to content

Commit

Permalink
Update schema registry upload
Browse files Browse the repository at this point in the history
  • Loading branch information
bsmartradio committed Aug 12, 2024
1 parent 905d7fe commit e8e7bd8
Showing 1 changed file with 8 additions and 19 deletions.
27 changes: 8 additions & 19 deletions python/lsst/alert/packet/bin/syncLatestSchemaToRegistry.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,16 @@ def parse_args():
return parser.parse_args()


def all_schemas():
"""Load in all schemas"""
schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem()
version_numbers = []
def upload_schema(registry_url, subject, schema_registry):
"""Parse schema registry and upload all schemas."""
for version in schema_registry.known_versions:
schema = schema_registry.get_by_version(version)
numbers = re.findall(r'\d+', version)
# Join the numbers into a single string
version_numbers.append(int(''.join(numbers)))

normalized_schemas = fastavro.schema.to_parsing_canonical_form(schema.definition)
return normalized_schemas, version_numbers


def upload_schema(registry_url, subject, schemas, version_numbers):

for i, normalized_schema in enumerate(schemas):
version_number = re.findall(r'\d+', version)
normalized_schema = fastavro.schema.to_parsing_canonical_form(
schema.definition)
normalized_schema["subject"] = normalized_schema["version"]
confluent_schema = {"schemaType": "Avro", "version": 1,
"id": version_numbers[i], "schema": normalized_schema}
"id": version_number, "schema": normalized_schema}
payload = json.dumps(confluent_schema)
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
url = f"{registry_url}/subjects/{subject}/versions"
Expand All @@ -79,12 +69,11 @@ def upload_schema(registry_url, subject, schemas, version_numbers):

def main():
args = parse_args()
normalized_schemas, version_numbers = all_schemas()
schema_registry = lsst.alert.packet.schemaRegistry.SchemaRegistry().all_schemas_from_filesystem()
upload_schema(
args.schema_registry_url,
subject=args.subject,
schemas=normalized_schemas,
version_numbers=version_numbers
schema_registry=schema_registry
)


Expand Down

0 comments on commit e8e7bd8

Please sign in to comment.