From 1c80577c5c8f36ce8166f8d4792b2a7f6503b5ee Mon Sep 17 00:00:00 2001 From: Ulrik Date: Wed, 2 Dec 2020 14:32:55 +0100 Subject: [PATCH 1/4] Make MockRegistryApi support multiple requests This is a bit of a hack to be able to specify different responses for consecutive requests. If we want something more flexible than this, we should look at integrating something like "aioresponses" instead --- src/kafkit/registry/sansio.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/kafkit/registry/sansio.py b/src/kafkit/registry/sansio.py index 4a12c67..c7e06ad 100644 --- a/src/kafkit/registry/sansio.py +++ b/src/kafkit/registry/sansio.py @@ -601,15 +601,31 @@ def __init__( self.response_headers = headers if headers else self.DEFAULT_HEADERS self.response_body = body + self.requests = [] # stores info about all requests made + self.counter = 0 + async def _request( self, method: str, url: str, headers: Mapping[str, str], body: bytes ) -> Any: + # reset if we've looped through all available responses + if isinstance(self.response_body, list) and self.counter == len( + self.response_body + ): + self.counter = 0 + self.requests.append( + {"method": method, "url": url, "headers": headers, "body": body} + ) self.method = method self.url = url self.headers = headers self.body = body response_headers = copy.deepcopy(self.response_headers) - return self.response_code, response_headers, self.response_body + if isinstance(self.response_body, list): + response_body = self.response_body[self.counter] + else: + response_body = self.response_body + self.counter += 1 + return self.response_code, response_headers, response_body class SchemaCache: From 10736389f17d3d67a0060b38a03170bbbd5388c0 Mon Sep 17 00:00:00 2001 From: Ulrik Date: Wed, 2 Dec 2020 14:38:41 +0100 Subject: [PATCH 2/4] Add failing test for issue #4 --- tests/registry_sansio_test.py | 42 +++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/registry_sansio_test.py b/tests/registry_sansio_test.py index 82c3954..16fc2c1 100644 --- a/tests/registry_sansio_test.py +++ b/tests/registry_sansio_test.py @@ -224,6 +224,48 @@ async def test_register_schema() -> None: assert new_schema_id == schema_id +@pytest.mark.asyncio +async def test_register_schema_with_different_subjects(): + input_schema = { + "type": "record", + "name": "schema1", + "namespace": "test-schemas", + "fields": [{"name": "a", "type": "int"}], + } + # Responses returned from the MockRegistryApi, in turn + mock_responses = [ + json.dumps({"id": 1}).encode("utf-8"), + json.dumps( + { + "subject": "subject1", + "version": 1, + "id": 1, + "schema": json.dumps(input_schema), + } + ).encode("utf-8"), + json.dumps({"id": 1}).encode("utf-8"), + json.dumps( + { + "subject": "subject2", + "version": 1, + "id": 1, + "schema": json.dumps(input_schema), + } + ).encode("utf-8"), + ] + + client = MockRegistryApi(url="http://registry:8081", body=mock_responses) + schema_id = await client.register_schema(input_schema, "subject1") + subject_cache_entry = client.subject_cache.get("subject1", 1) + assert schema_id == 1 + assert subject_cache_entry["id"] == schema_id + + await client.register_schema(input_schema, "subject2") + subject_cache_entry2 = client.subject_cache.get("subject2", 1) + assert schema_id == 1 + assert subject_cache_entry2["id"] == schema_id + + @pytest.mark.asyncio async def test_get_schema_by_id() -> None: """Test the RegistryApi.get_schema_by_id method.""" From db94bdab75879a618a1fc3814500dd363af70d3a Mon Sep 17 00:00:00 2001 From: Ulrik Date: Wed, 2 Dec 2020 14:57:54 +0100 Subject: [PATCH 3/4] Don't short circuit register_schema when supplied a subject --- src/kafkit/registry/sansio.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/kafkit/registry/sansio.py b/src/kafkit/registry/sansio.py index c7e06ad..c1cc878 100644 --- a/src/kafkit/registry/sansio.py +++ b/src/kafkit/registry/sansio.py @@ -437,7 +437,11 @@ async def register_schema( # look in cache first try: schema_id = self.schema_cache[schema] - return schema_id + # If subject, skip short circuit to give us a chance to register + # schema with new subjects in Schema registry and in subject cache + if subject is None: + return schema_id + except KeyError: pass @@ -459,6 +463,11 @@ async def register_schema( # add to cache self.schema_cache.insert(schema, result["id"]) + # Fetch subject/schema_id mapping and add it to the cache + # Since we don't get subject version in previous result, we always + # do a second request to Schema Registry by fetching "latest" + await self.get_schema_by_subject(subject, "latest") + return result["id"] async def get_schema_by_id(self, schema_id: int) -> Dict[str, Any]: From 433c87eccc4f306ab4ab799f3f6b2127fa871b7a Mon Sep 17 00:00:00 2001 From: Ulrik Date: Wed, 2 Dec 2020 15:01:56 +0100 Subject: [PATCH 4/4] Fix failing existing tests Failed b.c we now make 2 requests in register_schema when using subjects --- tests/registry_sansio_test.py | 20 +++++++++++++++----- tests/registry_serializer_test.py | 26 ++++++++++++++++++++++++-- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/tests/registry_sansio_test.py b/tests/registry_sansio_test.py index 16fc2c1..6c31dd3 100644 --- a/tests/registry_sansio_test.py +++ b/tests/registry_sansio_test.py @@ -195,25 +195,35 @@ async def test_register_schema() -> None: } # Body that we expect the registry API to return given the request. - expected_body = json.dumps({"id": 1}).encode("utf-8") + expected_body = [ + json.dumps({"id": 1}).encode("utf-8"), + json.dumps( + { + "subject": "test-schemas.schema1", + "version": 1, + "id": 1, + "schema": json.dumps(input_schema), + } + ).encode("utf-8"), + ] client = MockRegistryApi(url="http://registry:8081", body=expected_body) schema_id = await client.register_schema(input_schema) assert schema_id == 1 # Test details of the request itself - assert client.method == "POST" + assert client.requests[0]["method"] == "POST" assert ( - client.url + client.requests[0]["url"] == "http://registry:8081/subjects/test-schemas.schema1/versions" ) - sent_json = json.loads(client.body) + sent_json = json.loads(client.requests[0]["body"]) assert "schema" in sent_json sent_schema = json.loads(sent_json["schema"]) assert "__fastavro_parsed" not in sent_schema assert sent_schema["name"] == "test-schemas.schema1" - # Check that the schema is in the cache and is parsed + # Check that the schema is in the schema cache and is parsed # Value of type "Union[int, Dict[str, Any]]" is not indexable cached_schema = client.schema_cache[1] assert cached_schema["name"] == "test-schemas.schema1" diff --git a/tests/registry_serializer_test.py b/tests/registry_serializer_test.py index 73e06c9..fdbf003 100644 --- a/tests/registry_serializer_test.py +++ b/tests/registry_serializer_test.py @@ -38,7 +38,6 @@ def test_unpacking_short_message() -> None: @pytest.mark.asyncio async def test_serializer() -> None: """Test the Serializer class.""" - client = MockRegistryApi(body=json.dumps({"id": 1}).encode("utf-8")) schema1 = { "type": "record", "name": "schema1", @@ -48,6 +47,19 @@ async def test_serializer() -> None: {"name": "b", "type": "string"}, ], } + expected_body = [ + json.dumps({"id": 1}).encode("utf-8"), + json.dumps( + { + "subject": "test-schemas.schema1", + "version": 1, + "id": 1, + "schema": json.dumps(schema1), + } + ).encode("utf-8"), + ] + client = MockRegistryApi(body=expected_body) + serializer = await Serializer.register(registry=client, schema=schema1) assert serializer.id == 1 @@ -186,7 +198,17 @@ async def test_polyserializer_given_schema() -> None: ], } - body = json.dumps({"id": 1}).encode("utf-8") + body = [ + json.dumps({"id": 1}).encode("utf-8"), + json.dumps( + { + "subject": "test-schemas.schema1", + "version": 1, + "id": 1, + "schema": json.dumps(schema), + } + ).encode("utf-8"), + ] client = MockRegistryApi(body=body) serializer = PolySerializer(registry=client)