step-transcribe-collect.py
#
# This software is Copyright ©️ 2020 The University of Southern California. All Rights Reserved.
# Permission to use, copy, modify, and distribute this software and its documentation for educational, research and non-profit purposes, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and subject to the full license file found in the root of this software deliverable. Permission to make commercial use of this software may be obtained by contacting: USC Stevens Center for Innovation University of Southern California 1150 S. Olive Street, Suite 2300, Los Angeles, CA 90015, USA Email: [email protected]
#
# The full terms of this copyright and license should always be found in the root directory of this software deliverable as "license.txt" and if these terms are not found with this software, please contact the USC Stevens Center for the full license.
#
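"""Collect the output of a finished AWS Transcribe job.

Triggered by S3 object-created events (not by the Step Function directly);
downloads the transcript and subtitle artifacts, updates the mentor answer,
and reports task status back to the waiting Step Function.
"""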
import json
import boto3
import botocore
import tempfile
import os
from typing import Dict
from module.logger import get_logger
from module.api import (
AnswerUpdateRequest,
UpdateTaskStatusRequest,
upload_answer_and_task_status_update,
)
from module.utils import (
get_text_from_file,
s3_bucket,
load_sentry,
require_env,
fetch_from_graphql,
)

load_sentry()
s3 = boto3.client("s3")
log = get_logger("answer-transcribe-handler")
aws_region = require_env("REGION")
sfn_client = boto3.client("stepfunctions", region_name=aws_region)
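

# However this module finishes (success, cancel, or failure), it must report
# back to the Step Function via the callback task token (the waitForTaskToken
# pattern) stored in the transcribeTask payload; otherwise the Step Function
# cannot continue.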
def process_event(
record, mentor, question, stored_task, auth_headers: Dict[str, str] = {}
):
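    """Collect transcribe.json (and transcribe.vtt, when a transcript exists)
    for one S3 event record, upload the transcript/subtitles for the
    mentor/question answer, and signal success to the Step Function.
    """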
key = record["s3"]["object"]["key"]
s3_path = os.path.dirname(key)
with tempfile.TemporaryDirectory() as work_dir:
        # Two files (transcribe.json and transcribe.vtt) get dropped into S3,
        # so this Lambda is invoked twice; it's possible that not both files
        # are present yet when the Lambda runs the first time, so try to
        # fetch each one and bail out quietly on a 404.
try:
job_file = os.path.join(work_dir, "transcribe.json")
s3.download_file(
record["s3"]["bucket"]["name"], f"{s3_path}/transcribe.json", job_file
)
except botocore.exceptions.ClientError as e:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
if e.response["Error"]["Code"] == "404":
log.info("failed to fetch transcribe json")
return
raise e
with open(job_file, "r") as f:
job = json.loads(f.read())
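        # Standard AWS Transcribe job output shape:
        #   {"results": {"transcripts": [{"transcript": "..."}], ...}}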
transcript = job["results"]["transcripts"][0]["transcript"]
log.debug(transcript)
# If there is no transcript, then a vtt file will never be produced by the AWS transcription job, so we're done
if transcript == "":
log.debug("No transcript was produced, skipping check for vtt file")
upload_answer_and_task_status_update(
AnswerUpdateRequest(
mentor=mentor,
question=question,
transcript=transcript,
has_edited_transcript=False,
),
UpdateTaskStatusRequest(
mentor=mentor,
question=question,
transcript=transcript,
transcribe_task={"status": "DONE"},
),
auth_headers,
)
sfn_client.send_task_success(taskToken=stored_task["payload"], output="{}")
            return
try:
vtt_file = os.path.join(work_dir, "transcribe.vtt")
s3.download_file(
record["s3"]["bucket"]["name"], f"{s3_path}/transcribe.vtt", vtt_file
)
except botocore.exceptions.ClientError as e:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
if e.response["Error"]["Code"] == "404":
log.info("failed to fetch subtitle vtt")
return
raise e
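        # Publish the English subtitle track, served as text/vtt, under the
        # videos/{mentor}/{question}/ prefix.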
s3.upload_file(
vtt_file,
s3_bucket,
f"videos/{mentor}/{question}/en.vtt",
ExtraArgs={"ContentType": "text/vtt"},
)
vtt_text = get_text_from_file(vtt_file)
vtt_media = {
"type": "subtitles",
"tag": "en",
"url": f"videos/{mentor}/{question}/en.vtt",
"vttText": vtt_text,
}
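        # Attach the subtitle metadata to both the answer update and the
        # task-status update below.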
upload_answer_and_task_status_update(
AnswerUpdateRequest(
mentor=mentor,
question=question,
transcript=transcript,
vtt_media=vtt_media,
has_edited_transcript=False,
),
UpdateTaskStatusRequest(
mentor=mentor,
question=question,
transcript=transcript,
transcribe_task={"status": "DONE"},
vtt_media=vtt_media,
),
auth_headers,
)
sfn_client.send_task_success(taskToken=stored_task["payload"], output="{}")


def handler(event, context):
"""This lambda is triggered with an S3 event - when the transcribe job is done,
and NOT by the Step Function. Therefore it must in all scenarios report
execution status back to the Step Function, otherwise the Step Function
won't be able to continue execution.
"""
# first get auth headers file from bucket
log.info(event)
for record in event["Records"]:
key = record["s3"]["object"]["key"]
s3_path = os.path.dirname(key)
with tempfile.TemporaryDirectory() as work_dir:
try:
auth_file = os.path.join(work_dir, "auth_headers.json")
s3.download_file(
record["s3"]["bucket"]["name"],
f"{s3_path}/auth_headers.json",
auth_file,
)
except botocore.exceptions.ClientError as e:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
if e.response["Error"]["Code"] == "404":
log.info("failed to fetch auth headers json file from bucket")
return
raise e
with open(auth_file, "r") as f:
auth_headers = json.loads(f.read())
[mentor, question, *_] = s3_path.split("/")
stored_task = fetch_from_graphql(
mentor, question, "transcribeTask", auth_headers
)
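            # stored_task carries the Step Function callback token in its
            # "payload" field; without it we cannot report back.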
if not stored_task:
log.warning(
"task not found, cannot continue! step function will have to timeout"
)
return
if stored_task["status"].startswith("CANCEL"):
log.info("task cancelled, skipping transcription")
sfn_client.send_task_success(taskToken=stored_task["payload"], output="{}")
return
try:
process_event(record, mentor, question, stored_task, auth_headers)
except Exception as err:
log.error(err)
sfn_client.send_task_failure(
taskToken=stored_task["payload"],
error=str(err),
cause=str(err.__cause__),
)
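            # Re-raise so this Lambda invocation is also marked as failed.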
raise err