Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RegistryApi.register_schema always talks to Schema Registry when supplied a subject #12

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/kafkit/registry/sansio.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,11 @@ async def register_schema(
# look in cache first
try:
schema_id = self.schema_cache[schema]
return schema_id
# If subject, skip short circuit to give us a chance to register
# schema with new subjects in Schema registry and in subject cache
if subject is None:
return schema_id

except KeyError:
pass

Expand All @@ -459,6 +463,11 @@ async def register_schema(
# add to cache
self.schema_cache.insert(schema, result["id"])

# Fetch subject/schema_id mapping and add it to the cache
# Since we don't get subject version in previous result, we always
# do a second request to Schema Registry by fetching "latest"
await self.get_schema_by_subject(subject, "latest")

return result["id"]

async def get_schema_by_id(self, schema_id: int) -> Dict[str, Any]:
Expand Down Expand Up @@ -601,15 +610,31 @@ def __init__(
self.response_headers = headers if headers else self.DEFAULT_HEADERS
self.response_body = body

self.requests = [] # stores info about all requests made
self.counter = 0

async def _request(
self, method: str, url: str, headers: Mapping[str, str], body: bytes
) -> Any:
# reset if we've looped through all available responses
if isinstance(self.response_body, list) and self.counter == len(
self.response_body
):
self.counter = 0
self.requests.append(
{"method": method, "url": url, "headers": headers, "body": body}
)
self.method = method
self.url = url
self.headers = headers
self.body = body
response_headers = copy.deepcopy(self.response_headers)
return self.response_code, response_headers, self.response_body
if isinstance(self.response_body, list):
response_body = self.response_body[self.counter]
else:
response_body = self.response_body
self.counter += 1
return self.response_code, response_headers, response_body


class SchemaCache:
Expand Down
62 changes: 57 additions & 5 deletions tests/registry_sansio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,35 @@ async def test_register_schema() -> None:
}

# Body that we expect the registry API to return given the request.
expected_body = json.dumps({"id": 1}).encode("utf-8")
expected_body = [
json.dumps({"id": 1}).encode("utf-8"),
json.dumps(
{
"subject": "test-schemas.schema1",
"version": 1,
"id": 1,
"schema": json.dumps(input_schema),
}
).encode("utf-8"),
]

client = MockRegistryApi(url="http://registry:8081", body=expected_body)
schema_id = await client.register_schema(input_schema)
assert schema_id == 1

# Test details of the request itself
assert client.method == "POST"
assert client.requests[0]["method"] == "POST"
assert (
client.url
client.requests[0]["url"]
== "http://registry:8081/subjects/test-schemas.schema1/versions"
)
sent_json = json.loads(client.body)
sent_json = json.loads(client.requests[0]["body"])
assert "schema" in sent_json
sent_schema = json.loads(sent_json["schema"])
assert "__fastavro_parsed" not in sent_schema
assert sent_schema["name"] == "test-schemas.schema1"

# Check that the schema is in the cache and is parsed
# Check that the schema is in the schema cache and is parsed
# Value of type "Union[int, Dict[str, Any]]" is not indexable
cached_schema = client.schema_cache[1]
assert cached_schema["name"] == "test-schemas.schema1"
Expand All @@ -224,6 +234,48 @@ async def test_register_schema() -> None:
assert new_schema_id == schema_id


@pytest.mark.asyncio
async def test_register_schema_with_different_subjects():
input_schema = {
"type": "record",
"name": "schema1",
"namespace": "test-schemas",
"fields": [{"name": "a", "type": "int"}],
}
# Responses returned from the MockRegistryApi, in turn
mock_responses = [
json.dumps({"id": 1}).encode("utf-8"),
json.dumps(
{
"subject": "subject1",
"version": 1,
"id": 1,
"schema": json.dumps(input_schema),
}
).encode("utf-8"),
json.dumps({"id": 1}).encode("utf-8"),
json.dumps(
{
"subject": "subject2",
"version": 1,
"id": 1,
"schema": json.dumps(input_schema),
}
).encode("utf-8"),
]

client = MockRegistryApi(url="http://registry:8081", body=mock_responses)
schema_id = await client.register_schema(input_schema, "subject1")
subject_cache_entry = client.subject_cache.get("subject1", 1)
assert schema_id == 1
assert subject_cache_entry["id"] == schema_id

await client.register_schema(input_schema, "subject2")
subject_cache_entry2 = client.subject_cache.get("subject2", 1)
assert schema_id == 1
assert subject_cache_entry2["id"] == schema_id


@pytest.mark.asyncio
async def test_get_schema_by_id() -> None:
"""Test the RegistryApi.get_schema_by_id method."""
Expand Down
26 changes: 24 additions & 2 deletions tests/registry_serializer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def test_unpacking_short_message() -> None:
@pytest.mark.asyncio
async def test_serializer() -> None:
"""Test the Serializer class."""
client = MockRegistryApi(body=json.dumps({"id": 1}).encode("utf-8"))
schema1 = {
"type": "record",
"name": "schema1",
Expand All @@ -48,6 +47,19 @@ async def test_serializer() -> None:
{"name": "b", "type": "string"},
],
}
expected_body = [
json.dumps({"id": 1}).encode("utf-8"),
json.dumps(
{
"subject": "test-schemas.schema1",
"version": 1,
"id": 1,
"schema": json.dumps(schema1),
}
).encode("utf-8"),
]
client = MockRegistryApi(body=expected_body)

serializer = await Serializer.register(registry=client, schema=schema1)
assert serializer.id == 1

Expand Down Expand Up @@ -186,7 +198,17 @@ async def test_polyserializer_given_schema() -> None:
],
}

body = json.dumps({"id": 1}).encode("utf-8")
body = [
json.dumps({"id": 1}).encode("utf-8"),
json.dumps(
{
"subject": "test-schemas.schema1",
"version": 1,
"id": 1,
"schema": json.dumps(schema),
}
).encode("utf-8"),
]
client = MockRegistryApi(body=body)

serializer = PolySerializer(registry=client)
Expand Down