Skip to content

Commit 773afa8

Browse files
chore: add retry to tests requiring forwarding
1 parent 04dc4c7 commit 773afa8

File tree

4 files changed

+177
-54
lines changed

4 files changed

+177
-54
lines changed

tests/integration/conftest.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
)
3232
from tests.integration.utils.network import PortRangeInclusive
3333
from tests.integration.utils.process import stop_process, wait_for_port_subprocess
34+
from tests.integration.utils.rest_client import RetryRestClient
3435
from tests.integration.utils.synchronization import lock_path_for
3536
from tests.integration.utils.zookeeper import configure_and_start_zk
3637
from tests.utils import repeat_until_successful_request
@@ -576,6 +577,11 @@ async def fixture_registry_async_client(
576577
await client.close()
577578

578579

580+
@pytest.fixture(scope="function", name="registry_async_retry_client")
581+
async def fixture_registry_async_retry_client(registry_async_client: Client) -> RetryRestClient:
582+
return RetryRestClient(registry_async_client)
583+
584+
579585
@pytest.fixture(scope="function", name="credentials_folder")
580586
def fixture_credentials_folder() -> str:
581587
integration_test_folder = os.path.dirname(__file__)
@@ -715,6 +721,11 @@ async def fixture_registry_async_client_auth(
715721
await client.close()
716722

717723

724+
@pytest.fixture(scope="function", name="registry_async_retry_client_auth")
725+
async def fixture_registry_async_retry_client_auth(registry_async_client_auth: Client) -> RetryRestClient:
726+
return RetryRestClient(registry_async_client_auth)
727+
728+
718729
@pytest.fixture(scope="function", name="registry_async_auth_pair")
719730
async def fixture_registry_async_auth_pair(
720731
request: SubRequest,

tests/integration/test_master_coordinator.py

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88
from karapace.coordinator.master_coordinator import MasterCoordinator
99
from tests.integration.utils.kafka_server import KafkaServers
1010
from tests.integration.utils.network import PortRangeInclusive
11+
from tests.integration.utils.rest_client import RetryRestClient
1112
from tests.utils import new_random_name
1213

1314
import asyncio
1415
import json
1516
import pytest
16-
import requests
1717

1818

1919
async def init_admin(config):
@@ -195,7 +195,10 @@ async def test_no_eligible_master(kafka_servers: KafkaServers, port_range: PortR
195195
await mc.close()
196196

197197

198-
async def test_schema_request_forwarding(registry_async_pair):
198+
async def test_schema_request_forwarding(
199+
registry_async_pair,
200+
registry_async_retry_client: RetryRestClient,
201+
) -> None:
199202
master_url, slave_url = registry_async_pair
200203
max_tries, counter = 5, 0
201204
wait_time = 0.5
@@ -209,11 +212,11 @@ async def test_schema_request_forwarding(registry_async_pair):
209212
else:
210213
path = "config"
211214
for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]:
212-
resp = requests.put(f"{slave_url}/{path}", json={"compatibility": compat})
215+
resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat})
213216
assert resp.ok
214217
while True:
215218
assert counter < max_tries, "Compat update not propagated"
216-
resp = requests.get(f"{master_url}/{path}")
219+
resp = await registry_async_retry_client.get(f"{master_url}/{path}")
217220
if not resp.ok:
218221
print(f"Invalid http status code: {resp.status_code}")
219222
continue
@@ -232,14 +235,16 @@ async def test_schema_request_forwarding(registry_async_pair):
232235

233236
# New schema updates, last compatibility is None
234237
for s in [schema, other_schema]:
235-
resp = requests.post(f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)})
238+
resp = await registry_async_retry_client.post(
239+
f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}
240+
)
236241
assert resp.ok
237242
data = resp.json()
238243
assert "id" in data, data
239244
counter = 0
240245
while True:
241246
assert counter < max_tries, "Subject schema data not propagated yet"
242-
resp = requests.get(f"{master_url}/subjects/{subject}/versions")
247+
resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions")
243248
if not resp.ok:
244249
print(f"Invalid http status code: {resp.status_code}")
245250
counter += 1
@@ -255,12 +260,14 @@ async def test_schema_request_forwarding(registry_async_pair):
255260
break
256261

257262
# Schema deletions
258-
resp = requests.delete(f"{slave_url}/subjects/{subject}/versions/1")
263+
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1")
259264
assert resp.ok
260265
counter = 0
261266
while True:
262267
assert counter < max_tries, "Subject version deletion not propagated yet"
263-
resp = requests.get(f"{master_url}/subjects/{subject}/versions/1")
268+
resp = await registry_async_retry_client.get(
269+
f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404
270+
)
264271
if resp.ok:
265272
print(f"Subject {subject} still has version 1 on master")
266273
counter += 1
@@ -270,16 +277,16 @@ async def test_schema_request_forwarding(registry_async_pair):
270277
break
271278

272279
# Subject deletion
273-
resp = requests.get(f"{master_url}/subjects/")
280+
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
274281
assert resp.ok
275282
data = resp.json()
276283
assert subject in data
277-
resp = requests.delete(f"{slave_url}/subjects/{subject}")
284+
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}")
278285
assert resp.ok
279286
counter = 0
280287
while True:
281288
assert counter < max_tries, "Subject deletion not propagated yet"
282-
resp = requests.get(f"{master_url}/subjects/")
289+
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
283290
if not resp.ok:
284291
print("Could not retrieve subject list on master")
285292
counter += 1

tests/integration/test_schema_registry_auth.py

Lines changed: 67 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
Copyright (c) 2023 Aiven Ltd
55
See LICENSE for details
66
"""
7-
from karapace.client import Client
87
from karapace.kafka.admin import KafkaAdminClient
98
from karapace.schema_models import SchemaType, ValidatedTypedSchema
9+
from tests.integration.utils.rest_client import RetryRestClient
1010
from tests.utils import (
1111
new_random_name,
1212
new_topic,
@@ -29,139 +29,157 @@
2929
reader = aiohttp.BasicAuth("reader", "secret")
3030

3131

32-
async def test_sr_auth(registry_async_client_auth: Client) -> None:
32+
async def test_sr_auth(registry_async_retry_client_auth: RetryRestClient) -> None:
3333
subject = new_random_name("cave-")
3434

35-
res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json})
35+
res = await registry_async_retry_client_auth.post(
36+
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401
37+
)
3638
assert res.status_code == 401
3739

38-
res = await registry_async_client_auth.post(
40+
res = await registry_async_retry_client_auth.post(
3941
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
4042
)
4143
assert res.status_code == 200
4244
sc_id = res.json()["id"]
4345
assert sc_id >= 0
4446

45-
res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest")
47+
res = await registry_async_retry_client_auth.get(
48+
f"subjects/{quote(subject)}/versions/latest", expected_response_code=401
49+
)
4650
assert res.status_code == 401
47-
res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin)
51+
res = await registry_async_retry_client_auth.get(f"subjects/{quote(subject)}/versions/latest", auth=aladdin)
4852
assert res.status_code == 200
4953
assert sc_id == res.json()["id"]
5054
assert ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) == ValidatedTypedSchema.parse(
5155
SchemaType.AVRO, res.json()["schema"]
5256
)
5357

5458

55-
async def test_sr_auth_endpoints(registry_async_client_auth: Client) -> None:
59+
async def test_sr_auth_endpoints(registry_async_retry_client_auth: RetryRestClient) -> None:
5660
"""Test endpoints for authorization"""
5761

5862
subject = new_random_name("any-")
5963

60-
res = await registry_async_client_auth.post(
61-
f"compatibility/subjects/{quote(subject)}/versions/1", json={"schema": schema_avro_json}
64+
res = await registry_async_retry_client_auth.post(
65+
f"compatibility/subjects/{quote(subject)}/versions/1",
66+
json={"schema": schema_avro_json},
67+
expected_response_code=401,
6268
)
6369
assert res.status_code == 401
6470

65-
res = await registry_async_client_auth.get(f"config/{quote(subject)}")
71+
res = await registry_async_retry_client_auth.get(f"config/{quote(subject)}", expected_response_code=401)
6672
assert res.status_code == 401
6773

68-
res = await registry_async_client_auth.put(f"config/{quote(subject)}", json={"compatibility": "NONE"})
74+
res = await registry_async_retry_client_auth.put(
75+
f"config/{quote(subject)}",
76+
json={"compatibility": "NONE"},
77+
expected_response_code=401,
78+
)
6979
assert res.status_code == 401
7080

71-
res = await registry_async_client_auth.get("config")
81+
res = await registry_async_retry_client_auth.get("config", expected_response_code=401)
7282
assert res.status_code == 401
7383

74-
res = await registry_async_client_auth.put("config", json={"compatibility": "NONE"})
84+
res = await registry_async_retry_client_auth.put("config", json={"compatibility": "NONE"}, expected_response_code=401)
7585
assert res.status_code == 401
7686

77-
res = await registry_async_client_auth.get("schemas/ids/1/versions")
87+
res = await registry_async_retry_client_auth.get("schemas/ids/1/versions", expected_response_code=401)
7888
assert res.status_code == 401
7989

8090
# This is an exception that does not require authorization
81-
res = await registry_async_client_auth.get("schemas/types")
91+
res = await registry_async_retry_client_auth.get("schemas/types")
8292
assert res.status_code == 200
8393

8494
# but let's verify it answers normally if sending authorization header
85-
res = await registry_async_client_auth.get("schemas/types", auth=admin)
95+
res = await registry_async_retry_client_auth.get("schemas/types", auth=admin)
8696
assert res.status_code == 200
8797

88-
res = await registry_async_client_auth.post(f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json})
98+
res = await registry_async_retry_client_auth.post(
99+
f"subjects/{quote(subject)}/versions", json={"schema": schema_avro_json}, expected_response_code=401
100+
)
89101
assert res.status_code == 401
90102

91-
res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}/versions/1")
103+
res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}/versions/1", expected_response_code=401)
92104
assert res.status_code == 401
93105

94-
res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/schema")
106+
res = await registry_async_retry_client_auth.get(
107+
f"subjects/{quote(subject)}/versions/1/schema", expected_response_code=401
108+
)
95109
assert res.status_code == 401
96110

97-
res = await registry_async_client_auth.get(f"subjects/{quote(subject)}/versions/1/referencedby")
111+
res = await registry_async_retry_client_auth.get(
112+
f"subjects/{quote(subject)}/versions/1/referencedby", expected_response_code=401
113+
)
98114
assert res.status_code == 401
99115

100-
res = await registry_async_client_auth.delete(f"subjects/{quote(subject)}")
116+
res = await registry_async_retry_client_auth.delete(f"subjects/{quote(subject)}", expected_response_code=401)
101117
assert res.status_code == 401
102118

103-
res = await registry_async_client_auth.get("mode")
119+
res = await registry_async_retry_client_auth.get("mode", expected_response_code=401)
104120
assert res.status_code == 401
105121

106-
res = await registry_async_client_auth.get(f"mode/{quote(subject)}")
122+
res = await registry_async_retry_client_auth.get(f"mode/{quote(subject)}", expected_response_code=401)
107123
assert res.status_code == 401
108124

109125

110-
async def test_sr_list_subjects(registry_async_client_auth: Client) -> None:
126+
async def test_sr_list_subjects(registry_async_retry_client_auth: RetryRestClient) -> None:
111127
cavesubject = new_random_name("cave-")
112128
carpetsubject = new_random_name("carpet-")
113129

114-
res = await registry_async_client_auth.post(
130+
res = await registry_async_retry_client_auth.post(
115131
f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
116132
)
117133
assert res.status_code == 200
118134
sc_id = res.json()["id"]
119135
assert sc_id >= 0
120136

121-
res = await registry_async_client_auth.post(
137+
res = await registry_async_retry_client_auth.post(
122138
f"subjects/{quote(carpetsubject)}/versions", json={"schema": schema_avro_json}, auth=admin
123139
)
124140
assert res.status_code == 200
125141

126-
res = await registry_async_client_auth.get("subjects", auth=admin)
142+
res = await registry_async_retry_client_auth.get("subjects", auth=admin)
127143
assert res.status_code == 200
128144
assert [cavesubject, carpetsubject] == res.json()
129145

130-
res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions")
146+
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", expected_response_code=401)
131147
assert res.status_code == 401
132148

133-
res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin)
149+
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=admin)
134150
assert res.status_code == 200
135151
assert [sc_id] == res.json()
136152

137-
res = await registry_async_client_auth.get("subjects", auth=aladdin)
153+
res = await registry_async_retry_client_auth.get("subjects", auth=aladdin)
138154
assert res.status_code == 200
139155
assert [cavesubject] == res.json()
140156

141-
res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=aladdin)
157+
res = await registry_async_retry_client_auth.get(
158+
f"subjects/{quote(carpetsubject)}/versions", auth=aladdin, expected_response_code=403
159+
)
142160
assert res.status_code == 403
143161

144-
res = await registry_async_client_auth.get("subjects", auth=reader)
162+
res = await registry_async_retry_client_auth.get("subjects", auth=reader)
145163
assert res.status_code == 200
146164
assert [carpetsubject] == res.json()
147165

148-
res = await registry_async_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader)
166+
res = await registry_async_retry_client_auth.get(f"subjects/{quote(carpetsubject)}/versions", auth=reader)
149167
assert res.status_code == 200
150168
assert [1] == res.json()
151169

152170

153-
async def test_sr_ids(registry_async_client_auth: Client) -> None:
171+
async def test_sr_ids(registry_async_retry_client_auth: RetryRestClient) -> None:
154172
cavesubject = new_random_name("cave-")
155173
carpetsubject = new_random_name("carpet-")
156174

157-
res = await registry_async_client_auth.post(
175+
res = await registry_async_retry_client_auth.post(
158176
f"subjects/{quote(cavesubject)}/versions", json={"schema": schema_avro_json}, auth=aladdin
159177
)
160178
assert res.status_code == 200
161179
avro_sc_id = res.json()["id"]
162180
assert avro_sc_id >= 0
163181

164-
res = await registry_async_client_auth.post(
182+
res = await registry_async_retry_client_auth.post(
165183
f"subjects/{quote(carpetsubject)}/versions",
166184
json={"schemaType": "JSON", "schema": schema_jsonschema_json},
167185
auth=admin,
@@ -170,30 +188,34 @@ async def test_sr_ids(registry_async_client_auth: Client) -> None:
170188
jsonschema_sc_id = res.json()["id"]
171189
assert jsonschema_sc_id >= 0
172190

173-
res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin)
191+
res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=aladdin)
174192
assert res.status_code == 200
175193

176-
res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=aladdin)
194+
res = await registry_async_retry_client_auth.get(
195+
f"schemas/ids/{jsonschema_sc_id}", auth=aladdin, expected_response_code=404
196+
)
177197
assert res.status_code == 404
178198
assert {"error_code": 40403, "message": "Schema not found"} == res.json()
179199

180-
res = await registry_async_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader)
200+
res = await registry_async_retry_client_auth.get(f"schemas/ids/{avro_sc_id}", auth=reader, expected_response_code=404)
181201
assert res.status_code == 404
182202
assert {"error_code": 40403, "message": "Schema not found"} == res.json()
183203

184-
res = await registry_async_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader)
204+
res = await registry_async_retry_client_auth.get(f"schemas/ids/{jsonschema_sc_id}", auth=reader)
185205
assert res.status_code == 200
186206

187207

188-
async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None:
208+
async def test_sr_auth_forwarding(
209+
registry_async_auth_pair: List[str], registry_async_retry_client_auth: RetryRestClient
210+
) -> None:
189211
auth = requests.auth.HTTPBasicAuth("admin", "admin")
190212

191213
# Test primary/replica forwarding with global config setting
192214
primary_url, replica_url = registry_async_auth_pair
193215
max_tries, counter = 5, 0
194216
wait_time = 0.5
195217
for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]:
196-
resp = requests.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth)
218+
resp = await registry_async_retry_client_auth.put(f"{replica_url}/config", json={"compatibility": compat}, auth=auth)
197219
assert resp.ok
198220
while True:
199221
assert counter < max_tries, "Compat update not propagated"
@@ -213,7 +235,9 @@ async def test_sr_auth_forwarding(registry_async_auth_pair: List[str]) -> None:
213235

214236

215237
# Test that Kafka REST API works when configured with Schema Registry requiring authorization
216-
async def test_rest_api_with_sr_auth(rest_async_client_registry_auth: Client, admin_client: KafkaAdminClient) -> None:
238+
async def test_rest_api_with_sr_auth(
239+
rest_async_client_registry_auth: RetryRestClient, admin_client: KafkaAdminClient
240+
) -> None:
217241
client = rest_async_client_registry_auth
218242

219243
topic = new_topic(admin_client, prefix="cave-rest-")

0 commit comments

Comments
 (0)