
Commit fbe6900

Merge pull request #23 from lsst-sqre/tickets/DM-39648
DM-39648: Add kafkit.settings module
2 parents 8e8ed8e + 4eaa342 commit fbe6900

8 files changed, +258 -1 lines changed

.prettierignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
changelog.d/_template.md

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -1,5 +1,7 @@
 # Change log

+<!-- scriv-insert-here -->
+
 ## Unreleased

 ### New features
changelog.d/ (new changelog fragment)

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
### Backwards-incompatible changes

-

### New features

- Added a new `kafkit.settings` module that provides a `KafkaConnectionSettings` class. This class uses Pydantic's `BaseSettings` to gather environment variables. Applications can use these settings to consistently configure their Kafka clients. `KafkaConnectionSettings` also provides a ready-to-use `SSLContext` for connecting to Kafka brokers over TLS.

### Bug fixes

-

### Other changes

- We're now using [scriv](https://scriv.readthedocs.io) to manage the changelog.
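
The entry above describes how `KafkaConnectionSettings` is meant to be used. A minimal sketch of that pattern, assuming the `KAFKA_*` environment variables declared in `src/kafkit/settings.py` later in this diff (the example values here are hypothetical):

import os

from kafkit.settings import KafkaConnectionSettings, KafkaSecurityProtocol

# Hypothetical environment; in a deployment these would be set externally.
os.environ["KAFKA_BOOTSTRAP_SERVERS"] = "kafka-1:9092,kafka-2:9092"

# BaseSettings gathers the KAFKA_* environment variables at instantiation.
settings = KafkaConnectionSettings()
assert settings.bootstrap_servers == "kafka-1:9092,kafka-2:9092"
assert settings.security_protocol is KafkaSecurityProtocol.PLAINTEXT  # default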

changelog.d/_template.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
<!-- Delete the sections that don't apply -->
{%- for cat in config.categories %}

### {{ cat }}

-
{%- endfor %}

docs/api.rst

Lines changed: 3 additions & 0 deletions
@@ -17,5 +17,8 @@ Kafkit API reference
 .. automodapi:: kafkit.registry.sansio
    :no-inheritance-diagram:

+.. automodapi:: kafkit.settings
+   :no-inheritance-diagram:
+
 .. automodapi:: kafkit.ssl
    :no-inheritance-diagram:

docs/documenteer.toml

Lines changed: 12 additions & 0 deletions
@@ -24,6 +24,18 @@ nitpick_ignore = [
         "py:class",
         "dataclasses_avroschema.avrodantic.AvroBaseModel",
     ],
+    [
+        "py:class",
+        "pydantic.env_settings.BaseSettings",
+    ],
+    [
+        "py:class",
+        "pydantic.main.BaseModel",
+    ],
+    [
+        "py:class",
+        "pydantic.utils.Representation",
+    ]
 ]

 [sphinx.intersphinx.projects]

pyproject.toml

Lines changed: 14 additions & 1 deletion
@@ -30,7 +30,7 @@ dynamic = ["version"]
 [project.optional-dependencies]
 aiohttp = ["aiohttp"]
 httpx = ["httpx"]
-pydantic = ["dataclasses-avroschema[pydantic]"]
+pydantic = ["pydantic", "dataclasses-avroschema[pydantic]"]
 dev = [
     # Testing
     "coverage[toml]",
@@ -140,3 +140,16 @@ strict_equality = true
 warn_redundant_casts = true
 warn_unreachable = true
 warn_unused_ignores = true
+
+[tool.scriv]
+categories = [
+    "Backwards-incompatible changes",
+    "New features",
+    "Bug fixes",
+    "Other changes",
+]
+entry_title_template = "{{ version }} ({{ date.strftime('%Y-%m-%d') }})"
+format = "md"
+md_header_level = "2"
+new_fragment_template = "file:changelog.d/_template.md"
+skip_fragments = "_template.md"

src/kafkit/settings.py

Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
"""Pydantic BaseSettings for configuring Kafka clients."""

from __future__ import annotations

from enum import Enum
from pathlib import Path
from ssl import SSLContext

from pydantic import BaseSettings, DirectoryPath, Field, FilePath, SecretStr

from .ssl import create_ssl_context

__all__ = [
    "KafkaConnectionSettings",
    "KafkaSecurityProtocol",
    "KafkaSaslMechanism",
]


class KafkaSecurityProtocol(str, Enum):
    """Kafka security protocols understood by aiokafka."""

    PLAINTEXT = "PLAINTEXT"
    """Plain-text connection."""

    SSL = "SSL"
    """TLS-encrypted connection."""


class KafkaSaslMechanism(str, Enum):
    """Kafka SASL mechanisms understood by aiokafka."""

    PLAIN = "PLAIN"
    """Plain-text SASL mechanism."""

    SCRAM_SHA_256 = "SCRAM-SHA-256"
    """SCRAM-SHA-256 SASL mechanism."""

    SCRAM_SHA_512 = "SCRAM-SHA-512"
    """SCRAM-SHA-512 SASL mechanism."""


class KafkaConnectionSettings(BaseSettings):
    """Settings for connecting to Kafka."""

    bootstrap_servers: str = Field(
        ...,
        title="Kafka bootstrap servers",
        env="KAFKA_BOOTSTRAP_SERVERS",
        description=(
            "A comma-separated list of Kafka brokers to connect to. "
            "This should be a list of hostnames or IP addresses, "
            "each optionally followed by a port number, separated by "
            "commas. "
            "For example: `kafka-1:9092,kafka-2:9092,kafka-3:9092`."
        ),
    )

    security_protocol: KafkaSecurityProtocol = Field(
        KafkaSecurityProtocol.PLAINTEXT,
        env="KAFKA_SECURITY_PROTOCOL",
        description="The security protocol to use when connecting to Kafka.",
    )

    cert_temp_dir: DirectoryPath | None = Field(
        None,
        env="KAFKA_CERT_TEMP_DIR",
        description=(
            "Temporary writable directory for concatenating certificates."
        ),
    )

    cluster_ca_path: FilePath | None = Field(
        None,
        title="Path to CA certificate file",
        env="KAFKA_SSL_CLUSTER_CAFILE",
        description=(
            "The path to the CA certificate file to use for verifying the "
            "broker's certificate. "
            "This is only needed if the broker's certificate is not signed "
            "by a CA trusted by the operating system."
        ),
    )

    client_ca_path: FilePath | None = Field(
        None,
        title="Path to client CA certificate file",
        env="KAFKA_SSL_CLIENT_CAFILE",
        description=(
            "The path to the client CA certificate file to use for "
            "authentication. "
            "This is only needed when the client certificate needs to be "
            "concatenated with the client CA certificate, which is common "
            "for Strimzi installations."
        ),
    )

    client_cert_path: FilePath | None = Field(
        None,
        title="Path to client certificate file",
        env="KAFKA_SSL_CLIENT_CERTFILE",
        description=(
            "The path to the client certificate file to use for "
            "authentication. "
            "This is only needed if the broker is configured to require "
            "SSL client authentication."
        ),
    )

    client_key_path: FilePath | None = Field(
        None,
        title="Path to client key file",
        env="KAFKA_SSL_CLIENT_KEYFILE",
        description=(
            "The path to the client key file to use for authentication. "
            "This is only needed if the broker is configured to require "
            "SSL client authentication."
        ),
    )

    client_key_password: SecretStr | None = Field(
        None,
        title="Password for client key file",
        env="KAFKA_SSL_CLIENT_KEY_PASSWORD",
        description=(
            "The password to use for decrypting the client key file. "
            "This is only needed if the client key file is encrypted."
        ),
    )

    sasl_mechanism: KafkaSaslMechanism | None = Field(
        KafkaSaslMechanism.PLAIN,
        title="SASL mechanism",
        env="KAFKA_SASL_MECHANISM",
        description=(
            "The SASL mechanism to use for authentication. "
            "This is only needed if SASL authentication is enabled."
        ),
    )

    sasl_username: str | None = Field(
        None,
        title="SASL username",
        env="KAFKA_SASL_USERNAME",
        description=(
            "The username to use for SASL authentication. "
            "This is only needed if SASL authentication is enabled."
        ),
    )

    sasl_password: SecretStr | None = Field(
        None,
        title="SASL password",
        env="KAFKA_SASL_PASSWORD",
        description=(
            "The password to use for SASL authentication. "
            "This is only needed if SASL authentication is enabled."
        ),
    )

    @property
    def ssl_context(self) -> SSLContext | None:
        """An SSL context for connecting to Kafka with aiokafka, if the
        Kafka connection is configured to use SSL.
        """
        if (
            self.security_protocol != KafkaSecurityProtocol.SSL
            or self.cluster_ca_path is None
            or self.client_cert_path is None
            or self.client_key_path is None
        ):
            return None

        # For type checking
        assert self.client_cert_path is not None
        assert self.cluster_ca_path is not None
        assert self.client_key_path is not None

        client_cert_path = Path(self.client_cert_path)

        if self.client_ca_path is not None:
            # Need to concatenate the client cert and CA certificates. This is
            # typical for Strimzi-based Kafka clusters.
            if self.cert_temp_dir is None:
                raise RuntimeError(
                    "KAFKA_CERT_TEMP_DIR must be set when "
                    "a client CA certificate is provided."
                )
            client_ca = Path(self.client_ca_path).read_text()
            client_cert = Path(self.client_cert_path).read_text()
            if client_ca.endswith("\n"):
                sep = ""
            else:
                sep = "\n"
            new_client_cert = sep.join([client_cert, client_ca])
            new_client_cert_path = Path(self.cert_temp_dir) / "client.crt"
            new_client_cert_path.write_text(new_client_cert)
            client_cert_path = Path(new_client_cert_path)

        return create_ssl_context(
            cluster_ca_path=Path(self.cluster_ca_path),
            client_cert_path=client_cert_path,
            client_key_path=Path(self.client_key_path),
        )