Skip to content
Open
Comment thread
mcdgavin marked this conversation as resolved.
Comment thread
mcdgavin marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def __init__(
numerals: bool = False,
mip_opt_out: bool = False,
vad_events: bool = True,
extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
# deprecated
keyterms: NotGivenOr[list[str]] = NOT_GIVEN,
) -> None:
Expand Down Expand Up @@ -135,9 +136,13 @@ def __init__(
mip_opt_out: Whether to take part in the model improvement program
vad_events: Whether to enable VAD (Voice Activity Detection) events.
When enabled, SpeechStarted events are sent when speech is detected. Defaults to True.
extra_headers: Additional HTTP headers sent on every connection, merged over the
default ``Authorization: Token`` header. Useful for self-hosted Deepgram or
gateways with custom auth. When no API key is set, these become the sole auth.

Raises:
ValueError: If no API key is provided or found in environment variables.
ValueError: If no API key is provided or found in environment variables
(unless ``extra_headers`` is supplied).

Note:
The api_key must be set either through the constructor argument or by setting
Expand All @@ -154,12 +159,17 @@ def __init__(
)

deepgram_api_key = api_key if is_given(api_key) else os.environ.get("DEEPGRAM_API_KEY")
if not deepgram_api_key:
extra = dict(extra_headers) if is_given(extra_headers) else {}
if not deepgram_api_key and not extra:
raise ValueError(
"Deepgram API key is required, either as argument or set"
" DEEPGRAM_API_KEY environment variable"
)
self._api_key = deepgram_api_key
# default Token auth only when a key is present; extra_headers merge on top (and are
# the sole auth when no key is set, e.g. the Cloudflare AI Gateway)
self._connect_headers = (
{"Authorization": f"Token {deepgram_api_key}"} if deepgram_api_key else {}
) | extra

model = _validate_model(model, language)
if is_given(keyterms):
Expand Down Expand Up @@ -203,6 +213,70 @@ def model(self) -> str:
def provider(self) -> str:
return "Deepgram"

@staticmethod
def with_cloudflare(
*,
model: DeepgramModels | str = "nova-3",
account_id: str | None = None,
gateway_id: str = "default",
cf_aig_token: str | None = None,
base_url: str | None = None,
language: DeepgramLanguages | str = "en-US",
interim_results: bool = True,
sample_rate: int = 16000,
http_session: aiohttp.ClientSession | None = None,
) -> STT:
"""Create a Deepgram STT routed through the Cloudflare AI Gateway.

Connects to the gateway's ``workers-ai`` WebSocket, which proxies Deepgram's
streaming protocol. Auth uses the ``cf-aig-authorization`` header; no Deepgram
API key is required.

Args:
model: Deepgram model name (e.g. ``"nova-3"``); the ``@cf/deepgram/`` prefix is
added automatically. A value already prefixed with ``@cf/`` is used as-is.
account_id: Cloudflare account ID. Falls back to ``CLOUDFLARE_ACCOUNT_ID``.
Required unless ``base_url`` is given.
gateway_id: Gateway name. Defaults to ``"default"``.
cf_aig_token: Gateway token for ``cf-aig-authorization``. Falls back to
``CLOUDFLARE_AI_GATEWAY_TOKEN``.
base_url: Full gateway endpoint; overrides ``account_id`` / ``gateway_id``.
language: Recognition language, forwarded to ``STT``. Defaults to ``"en-US"``.
interim_results: Whether to emit interim transcripts, forwarded to ``STT``.
sample_rate: Audio sample rate in Hz, forwarded to ``STT``.
http_session: Optional aiohttp session, forwarded to ``STT``.
"""
cf_aig_token = cf_aig_token or os.environ.get("CLOUDFLARE_AI_GATEWAY_TOKEN")
if not cf_aig_token:
raise ValueError(
"Cloudflare AI Gateway token is required, either as argument or set"
" CLOUDFLARE_AI_GATEWAY_TOKEN environment variable"
)
if base_url is None:
account_id = account_id or os.environ.get("CLOUDFLARE_ACCOUNT_ID")
if not account_id:
raise ValueError(
"Cloudflare account_id is required, either as argument or set"
" CLOUDFLARE_ACCOUNT_ID environment variable (or pass base_url directly)"
)
base_url = f"https://gateway.ai.cloudflare.com/v1/{account_id}/{gateway_id}/workers-ai"

if not model.startswith("@cf/"):
model = f"@cf/deepgram/{model}"
Comment thread
mcdgavin marked this conversation as resolved.

return STT(
model=model,
language=language,
interim_results=interim_results,
sample_rate=sample_rate,
base_url=base_url,
http_session=http_session,
# explicit empty key opts out of the DEEPGRAM_API_KEY env fallback, so the gateway
# only ever receives cf-aig-authorization (no stray Authorization: Token header)
api_key="",
extra_headers={"cf-aig-authorization": cf_aig_token},
)
Comment thread
mcdgavin marked this conversation as resolved.

def _ensure_session(self) -> aiohttp.ClientSession:
if not self._session:
self._session = utils.http_context.http_session()
Expand Down Expand Up @@ -243,7 +317,7 @@ async def _recognize_impl(
url=_to_deepgram_url(recognize_config, self._opts.endpoint_url, websocket=False),
data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
headers={
"Authorization": f"Token {self._api_key}",
**self._connect_headers,
"Accept": "application/json",
"Content-Type": "audio/wav",
},
Expand Down Expand Up @@ -280,9 +354,9 @@ def stream(
stt=self,
conn_options=conn_options,
opts=config,
api_key=self._api_key,
http_session=self._ensure_session(),
base_url=self._opts.endpoint_url,
connect_headers=self._connect_headers,
)
self._streams.add(stream)
return stream
Expand Down Expand Up @@ -403,9 +477,9 @@ def __init__(
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str,
connect_headers: dict[str, str],
) -> None:
if opts.detect_language or opts.language is None:
raise ValueError(
Expand All @@ -415,7 +489,7 @@ def __init__(

super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
self._opts = opts
self._api_key = api_key
self._connect_headers = dict(connect_headers)
self._session = http_session
self._opts.endpoint_url = base_url
self._speaking = False
Expand Down Expand Up @@ -671,7 +745,7 @@ async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
ws = await asyncio.wait_for(
self._session.ws_connect(
_to_deepgram_url(live_config, base_url=self._opts.endpoint_url, websocket=True),
headers={"Authorization": f"Token {self._api_key}"},
headers=self._connect_headers,
),
self._conn_options.timeout,
)
Expand Down Expand Up @@ -841,6 +915,12 @@ def prerecorded_transcription_to_speech_event(
)


def _bare_model(model: DeepgramModels | str) -> str:
# Cloudflare AI Gateway routes Deepgram models as "@cf/deepgram/<model>"; strip the routing
# prefix so model-name checks see the underlying Deepgram model (e.g. "nova-3").
return model.removeprefix("@cf/deepgram/")


def _validate_model(
model: DeepgramModels | str, language: NotGivenOr[DeepgramLanguages | str]
) -> DeepgramModels | str:
Expand All @@ -855,11 +935,14 @@ def _validate_model(
"nova-2-drivethru",
"nova-2-automotive",
}
if is_given(language) and language not in ("en-US", "en") and model in en_only_models:
bare = _bare_model(model)
if is_given(language) and language not in ("en-US", "en") and bare in en_only_models:
logger.warning(
f"{model} does not support language {language}, falling back to nova-2-general"
)
return "nova-2-general"
# preserve any routing prefix (e.g. "@cf/deepgram/") on the fallback model
prefix = model[: len(model) - len(bare)]
return f"{prefix}nova-2-general"
return model


Expand All @@ -880,13 +963,13 @@ def _validate_keyterm(
Validating keyterm and keywords for model compatibility.
See: https://developers.deepgram.com/docs/keyterm and https://developers.deepgram.com/docs/keywords
"""
if model.startswith("nova-3") and is_given(keywords):
if _bare_model(model).startswith("nova-3") and is_given(keywords):
raise ValueError(
"Keywords is only available for use with Nova-2, Nova-1, Enhanced, and "
"Base speech to text models. For Nova-3, use Keyterm Prompting."
)

if is_given(keyterm) and (not model.startswith("nova-3")):
if is_given(keyterm) and (not _bare_model(model).startswith("nova-3")):
raise ValueError(
"Keyterm Prompting is only available for transcription using the Nova-3 Model. "
"To boost recognition of keywords using another model, use the Keywords feature."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class _TTSOptions:
sample_rate: int
word_tokenizer: tokenize.WordTokenizer
base_url: str
api_key: str
mip_opt_out: bool = False


Expand All @@ -55,6 +54,7 @@ def __init__(
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
mip_opt_out: bool = False,
extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
) -> None:
"""
Create a new instance of Deepgram TTS.
Expand All @@ -67,6 +67,9 @@ def __init__(
base_url (str): Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.
extra_headers: Additional HTTP headers sent on every connection, merged over the
default ``Authorization: Token`` header. When no API key is set, these become
the sole auth (e.g. the Cloudflare AI Gateway).

""" # noqa: E501
super().__init__(
Expand All @@ -75,9 +78,16 @@ def __init__(
num_channels=NUM_CHANNELS,
)

api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
if not api_key:
# only fall back to the env var when no api_key was passed at all; an explicit "" means
# "no Deepgram key" (e.g. with_cloudflare), so it must not pick up DEEPGRAM_API_KEY
if api_key is None:
api_key = os.environ.get("DEEPGRAM_API_KEY")
extra = dict(extra_headers) if is_given(extra_headers) else {}
if not api_key and not extra:
raise ValueError("Deepgram API key required. Set DEEPGRAM_API_KEY or provide api_key.")
# default Token auth only when a key is present; extra_headers merge on top (and are
# the sole auth when no key is set, e.g. the Cloudflare AI Gateway)
self._connect_headers = ({"Authorization": f"Token {api_key}"} if api_key else {}) | extra

if not is_given(word_tokenizer):
word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)
Expand All @@ -88,7 +98,6 @@ def __init__(
sample_rate=sample_rate,
word_tokenizer=word_tokenizer,
base_url=base_url,
api_key=api_key,
mip_opt_out=mip_opt_out,
)
self._session = http_session
Expand All @@ -109,6 +118,70 @@ def model(self) -> str:
def provider(self) -> str:
return "Deepgram"

@staticmethod
def with_cloudflare(
*,
model: str = "aura-1",
account_id: str | None = None,
gateway_id: str = "default",
cf_aig_token: str | None = None,
base_url: str | None = None,
encoding: str = "linear16",
sample_rate: int = 24000,
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
) -> TTS:
"""Create a Deepgram TTS routed through the Cloudflare AI Gateway.

Connects to the gateway's ``workers-ai`` WebSocket, which proxies Deepgram's
streaming protocol. Auth uses the ``cf-aig-authorization`` header; no Deepgram
API key is required.

Args:
model: Deepgram model name (e.g. ``"aura-1"``); the ``@cf/deepgram/`` prefix is
added automatically. A value already prefixed with ``@cf/`` is used as-is.
account_id: Cloudflare account ID. Falls back to ``CLOUDFLARE_ACCOUNT_ID``.
Required unless ``base_url`` is given.
gateway_id: Gateway name. Defaults to ``"default"``.
cf_aig_token: Gateway token for ``cf-aig-authorization``. Falls back to
``CLOUDFLARE_AI_GATEWAY_TOKEN``.
base_url: Full gateway endpoint; overrides ``account_id`` / ``gateway_id``.
encoding: Audio encoding, forwarded to ``TTS``. Defaults to ``"linear16"``.
sample_rate: Audio sample rate in Hz, forwarded to ``TTS``.
word_tokenizer: Optional tokenizer, forwarded to ``TTS``.
http_session: Optional aiohttp session, forwarded to ``TTS``.
"""
cf_aig_token = cf_aig_token or os.environ.get("CLOUDFLARE_AI_GATEWAY_TOKEN")
if not cf_aig_token:
raise ValueError(
"Cloudflare AI Gateway token is required, either as argument or set"
" CLOUDFLARE_AI_GATEWAY_TOKEN environment variable"
)
if base_url is None:
account_id = account_id or os.environ.get("CLOUDFLARE_ACCOUNT_ID")
if not account_id:
raise ValueError(
"Cloudflare account_id is required, either as argument or set"
" CLOUDFLARE_ACCOUNT_ID environment variable (or pass base_url directly)"
)
base_url = f"https://gateway.ai.cloudflare.com/v1/{account_id}/{gateway_id}/workers-ai"

if not model.startswith("@cf/"):
model = f"@cf/deepgram/{model}"

return TTS(
model=model,
encoding=encoding,
sample_rate=sample_rate,
base_url=base_url,
word_tokenizer=word_tokenizer,
http_session=http_session,
# explicit empty key opts out of the DEEPGRAM_API_KEY env fallback, so the gateway
# only ever receives cf-aig-authorization (no stray Authorization: Token header)
api_key="",
extra_headers={"cf-aig-authorization": cf_aig_token},
)

async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
session = self._ensure_session()
config = {
Expand All @@ -120,7 +193,7 @@ async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
ws = await asyncio.wait_for(
session.ws_connect(
_to_deepgram_url(config, self._opts.base_url, websocket=True),
headers={"Authorization": f"Token {self._opts.api_key}"},
headers=self._connect_headers,
),
timeout,
)
Expand Down Expand Up @@ -214,7 +287,7 @@ async def _run(self, output_emitter: tts.AudioEmitter) -> None:
websocket=False,
),
headers={
"Authorization": f"Token {self._opts.api_key}",
**self._tts._connect_headers,
"Content-Type": "application/json",
},
json={"text": self._input_text},
Expand Down
Loading