-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
128 lines (109 loc) · 4.64 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import hashlib
import json
import logging
import os

import pandas as pd
import streamlit as st
from openai import OpenAI
from pydub import AudioSegment

from models import build_response_model
# Directory the uploaded MP3 is written to before conversion.
upload_path = "./uploads/"
# Directory the converted audio is exported to and later read back from.
download_path = "./downloads/"
# Configure the root logger at INFO so the logging.info calls in this file are emitted.
logging.basicConfig(level=logging.INFO)
def convert_df(df):
    """Serialize *df* to CSV text without the index column and return it as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
def results_to_csv(results: dict):
    """Turn a mapping of question -> answer fields into CSV bytes.

    Each key of *results* becomes one row; the key itself is stored in a
    leading ``question`` column and the fields of its value become the
    remaining columns. Encoding is delegated to ``convert_df``.
    """
    frame = pd.DataFrame.from_dict(results, orient="index")
    # The questions are the index after from_dict; move them into a real column.
    frame = frame.reset_index()
    frame = frame.rename(columns={"index": "question"})
    return convert_df(frame)
@st.cache_data
def process_mp3_file(audio_file, upload_path, download_path, output_audio_file):
    """Load the uploaded MP3 from disk and re-export it as MP3 under a new name.

    Args:
        audio_file: Uploaded-file object; only its ``name`` attribute is read.
        upload_path: Directory that already contains the uploaded file.
        download_path: Directory the re-exported file is written to.
        output_audio_file: File name to use for the exported audio.

    Returns:
        ``output_audio_file`` unchanged, so the caller can locate the export.
    """
    source_path = os.path.join(upload_path, audio_file.name)
    loaded_audio = AudioSegment.from_mp3(source_path)

    target_path = os.path.join(download_path, output_audio_file)
    loaded_audio.export(
        target_path,
        format="mp3",
        tags={"comments": "Converted with pydub"},
    )

    logging.info(f"File {audio_file.name} converted to {output_audio_file}")
    return output_audio_file
@st.cache_data
def process_audio(audio_file: str) -> str:
    """Transcribe an audio file with Whisper and return a timestamped transcript.

    Args:
        audio_file: Path of the audio file sent to the transcription API.

    Returns:
        One entry per transcript segment, each a
        ``Timestamp: <start>s - <end>`` line followed by the segment text on
        its own line. An empty string when the response carries no segments.
    """
    client = OpenAI()
    # The context manager closes the handle even when the API call raises;
    # the original left it open for the lifetime of the process.
    with open(audio_file, "rb") as audio:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio,
            response_format="verbose_json",
            timestamp_granularities=["segment"],
        )
    logging.info(f"Transcript: {transcript.text}")

    # Treat a missing segment list as "nothing transcribed" instead of
    # failing on iteration.
    segments = transcript.segments or []  # type: ignore

    # NOTE(review): start is rounded to 1 decimal and end to 2, and only the
    # start carries an "s" suffix. Kept as-is because this text is consumed
    # downstream; confirm whether the asymmetry is intended.
    # NOTE(review): segments are indexed like dicts here; verify against the
    # installed openai SDK version, which may return attribute-style objects.
    return "".join(
        f"Timestamp: {round(float(segment['start']), 1)}s - {round(float(segment['end']), 2)}\n{segment['text']}\n"
        for segment in segments
    )
@st.cache_data
def process_csv_file(csv_file):
    """Parse the uploaded questions CSV into a DataFrame, log it, and return it."""
    question_frame = pd.read_csv(csv_file)
    logging.info(f"Questions: {question_frame}")
    return question_frame
@st.cache_data
def process_results(transcript, questions):
    """Ask the chat model to answer the questions from the transcript.

    The model is forced to call a single function tool named ``Response``
    whose parameter schema comes from ``build_response_model(questions)``;
    the JSON arguments of that tool call are the answers.

    Args:
        transcript: Transcript text, sent as the user message.
        questions: Passed to ``build_response_model`` to build the answer
            schema (its exact shape is defined by that helper).

    Returns:
        A dict of the parsed tool-call arguments with every underscore in
        the top-level keys replaced by a space.
    """
    response_model = build_response_model(questions)
    client = OpenAI()

    system_prompt = "You are a claims adjuster. You have a list of questions that need to be filled out. Use the provided transcript to answer the questions. Return the answers as a JSON object. For each answer, include the timestamp(s) from which the answer was obtained"
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": transcript},
    ]
    response_tool = {
        "type": "function",
        "function": {
            "name": "Response",
            "description": "Requested answers to the questions",
            "parameters": response_model.model_json_schema(),
        },
    }
    # Pin the tool choice so the reply always arrives as a Response call.
    forced_choice = {"type": "function", "function": {"name": "Response"}}

    resp = client.chat.completions.create(
        model="gpt-3.5-turbo-16k-0613",
        messages=conversation,
        tools=[response_tool],
        tool_choice=forced_choice,
    )
    logging.info(f"Results: {resp}")

    first_call = resp.choices[0].message.tool_calls[0]  # type: ignore
    answers = json.loads(first_call.function.arguments)

    # Keys come back with underscores; show them with spaces instead.
    cleaned_obj = {}
    for key, value in answers.items():
        cleaned_obj[key.replace("_", " ")] = value
    return cleaned_obj
# Page flow: upload a call recording, transcribe it, upload the questions CSV,
# then show the extracted answers and offer them as a CSV download.
mp3_file = st.file_uploader('Upload a recording of the claims call', type='mp3', help='Upload a recording of the claims call')

if mp3_file is not None:
    # Make sure the working directories exist before writing into them.
    os.makedirs(upload_path, exist_ok=True)
    os.makedirs(download_path, exist_ok=True)

    with open(os.path.join(upload_path, mp3_file.name), "wb") as f:
        f.write(mp3_file.getbuffer())

    # process_audio is cached on the path string it receives. With a fixed
    # output name every recording mapped to the same path, so a second upload
    # was served the first recording's cached transcript. Naming the converted
    # file after its content gives each recording its own cache entry.
    # Converted files therefore accumulate in download_path.
    content_digest = hashlib.sha256(mp3_file.getbuffer()).hexdigest()[:16]
    output_audio_file = process_mp3_file(mp3_file, upload_path, download_path, f'output_{content_digest}.mp3')

    converted_path = os.path.join(download_path, output_audio_file)
    # Read inside a context manager so the handle is released right away.
    with open(converted_path, 'rb') as audio_file:
        audio_bytes = audio_file.read()

    st.markdown("---")
    st.markdown("**Original Audio**")
    st.audio(audio_bytes)

    transcript = process_audio(str(os.path.abspath(converted_path)))

    # The questions step needs the transcript, so it lives inside this branch.
    csv_file = st.file_uploader('Upload the questions CSV file', type='csv', help='Upload the questions CSV file')

    if csv_file is not None:
        questions = process_csv_file(csv_file)
        if st.toggle('Show Questions'):
            st.markdown("**Your Questions**")
            st.write(questions)

        st.markdown("---")
        results = process_results(transcript, questions)

        if st.toggle('Show Results'):
            st.markdown("**Results**")
            for k, v in results.items():
                st.markdown(f"**{k}:**")
                st.write(f"{v['answer']}")
                st.markdown(f"**Timestamp:** {v['timestamp']}s")

        # Only offer a download when there is something to export.
        if results:
            st.markdown("---")
            st.markdown("**Download Results**")
            st.markdown("Download the results as a CSV file")
            st.download_button(
                "Download Results",
                results_to_csv(results),
                "results.csv",
                "text/csv",
                key="download-results"
            )