-
Notifications
You must be signed in to change notification settings - Fork 1
/
aws.py
59 lines (46 loc) · 2.1 KB
/
aws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from amazon_transcribe.client import TranscribeStreamingClient
from amazon_transcribe.handlers import TranscriptResultStreamHandler
from amazon_transcribe.model import TranscriptEvent
from pyaudio import Stream
import asyncio
"""
Here's an example of a custom event handler you can extend to
process the returned transcription results as needed. This
handler will simply print the text out to your interpreter.
"""
# Audio format constants. Neither name is referenced in the visible part of
# this file; presumably they describe 16-bit mono PCM input -- TODO confirm.
BYTES_PER_SAMPLE = 2
CHANNEL_NUMS = 1
# An example file can be found at tests/integration/assets/test.wav
# NOTE(review): nothing visible here opens that file -- audio comes from the
# stream passed to basic_transcribe; confirm this pointer is still relevant.
# AWS region handed to TranscribeStreamingClient in basic_transcribe.
REGION = "us-east-1"
class MyEventHandler(TranscriptResultStreamHandler):
    """Example transcript handler that echoes every transcript to stdout.

    Extend or replace ``handle_transcript_event`` to process transcriptions
    in whatever way the application needs.
    """

    async def handle_transcript_event(self, transcript_event: TranscriptEvent):
        """Print the text of each alternative of each result in the event.

        Args:
            transcript_event: Event whose ``transcript.results`` are walked
                in order; every alternative's ``transcript`` is printed.
        """
        # Flatten results -> alternatives lazily, keeping the original order.
        texts = (
            candidate.transcript
            for segment in transcript_event.transcript.results
            for candidate in segment.alternatives
        )
        for text in texts:
            print(text)
async def basic_transcribe(audio_stream: Stream, sample_rate: int, chunk_size: int):
    """Send audio from ``audio_stream`` to a streaming transcription and print it.

    Audio is read in pieces of ``chunk_size`` and forwarded to the
    transcription input stream while ``MyEventHandler`` concurrently consumes
    the output stream and prints the transcripts.

    Args:
        audio_stream: Audio source; its ``read(chunk_size)`` is called
            repeatedly until it returns an empty/falsy chunk.
        sample_rate: Passed to the service as ``media_sample_rate_hz``.
        chunk_size: Argument given to each ``audio_stream.read`` call.
    """
    # Set up our client with our chosen AWS region
    client = TranscribeStreamingClient(region=REGION)
    # Start transcription to generate our async stream
    stream = await client.start_stream_transcription(
        language_code="en-US",
        media_sample_rate_hz=sample_rate,
        media_encoding="pcm",
    )
    loop = asyncio.get_running_loop()

    async def write_chunks():
        """Pump audio chunks into the transcription input stream."""
        # NOTE: For pre-recorded files longer than 5 minutes, the sent audio
        # chunks should be rate limited to match the realtime bitrate of the
        # audio stream to avoid signing issues.
        try:
            while True:
                # audio_stream.read is a synchronous call; running it in the
                # default executor keeps the event loop free so the handler
                # can process transcript events while we wait for audio.
                chunk = await loop.run_in_executor(
                    None, audio_stream.read, chunk_size
                )
                if not chunk:
                    break
                await stream.input_stream.send_audio_event(audio_chunk=chunk)
        finally:
            # Always close the input side, even if reading or sending failed,
            # so the input stream is not left open.
            await stream.input_stream.end_stream()

    # Instantiate our handler and start processing events
    handler = MyEventHandler(stream.output_stream)
    await asyncio.gather(write_chunks(), handler.handle_events())