-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathstorage_manager.py
170 lines (143 loc) · 5.86 KB
/
storage_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import os
import re
import time
import sounddevice as sd
from threading import Thread
from core import AudioPacket
# Root folder for everything this module persists, plus one sub-folder per
# kind of artifact.
BLACK_BOX_DIR = "blackbox"
IMAGES_DIR = os.path.join(BLACK_BOX_DIR, "sample-images")
COMMANDS_CACHE_DIR = os.path.join(BLACK_BOX_DIR, "commands-audio-cache")
LOG_DIR = os.path.join(BLACK_BOX_DIR, "logs")
WORLD_STATE_DIR = os.path.join(BLACK_BOX_DIR, "world-state")
GENERATED_AUDIO_DIR = os.path.join(BLACK_BOX_DIR, "generated-audio")

# Create the storage folders at import time. ``exist_ok=True`` replaces the
# separate existence check, so a folder created concurrently by another process
# no longer raises. The loop variable is private so it neither shadows the
# ``dir`` builtin nor leaks through ``from ... import *``.
for _storage_dir in (
    IMAGES_DIR,
    COMMANDS_CACHE_DIR,
    LOG_DIR,
    WORLD_STATE_DIR,
    GENERATED_AUDIO_DIR,
):
    os.makedirs(_storage_dir, exist_ok=True)
class StorageManager:
    """Process-wide singleton for audio playback, audio files and state logs.

    Every ``StorageManager()`` call returns the same instance. Playback and
    file writes run on background threads that are recorded in
    ``threads_pool`` so that ``ensure_completion`` can wait for them.
    """

    _self = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._self is None:
            instance = super().__new__(cls)
            # Initialised here rather than in __init__: Python re-runs __init__
            # on every StorageManager() call, which would discard the threads
            # that are still being tracked.
            instance.threads_pool = []
            cls._self = instance
        return cls._self

    def __init__(self):
        """Leave the shared state untouched on repeated construction."""

    @classmethod
    def establish_session(cls):
        """Start a new session: generate a session id and open its log file."""
        StorageManager()._generate_session_id()

    def _generate_session_id(self):
        """Close any previous log file, then open one for a new session id.

        The session id is the current ``time.time()`` value as a string and
        the log file is created at ``LOG_DIR/session_<id>.log``.
        """
        try:
            self.log_file.close()
        except (AttributeError, OSError):
            # AttributeError: no log file has been opened yet.
            # OSError: the previous file failed to close cleanly; a new
            # session must still be able to start.
            pass
        self.session_id = str(time.time())
        self.log_file = open(
            os.path.join(LOG_DIR, f"session_{self.session_id}.log"),
            mode="w",
            encoding="utf-8",
        )

    @classmethod
    def clean_up(cls):
        """Close the session log file, reporting when there is none to close."""
        manager = StorageManager()
        try:
            manager.log_file.close()
        except AttributeError:
            # No session was ever established on the singleton.
            write_output("No log file open to close.")
        except OSError as err:
            # Clean-up is best effort: report the failure instead of raising.
            write_output(f"Failed to close log file: {err}")

    def _enqueue_task(self, func, *args):
        """Run ``func(*args)`` on a new thread and track that thread.

        Args:
            func (function): Function to execute
            *args: Arguments to pass to function
        """
        manager = StorageManager()
        worker = Thread(target=func, args=args)
        worker.start()
        manager.threads_pool.append(worker)

    @classmethod
    def play_audio_packet(cls, audio_packet, transcription=None, block=False):
        """Play an audio packet through ``sounddevice``.

        Args:
            audio_packet: Packet exposing ``float`` and ``sample_rate``.
            transcription: Accepted but currently unused.
            block (bool): If True, play on the calling thread and return after
                ``sd.wait()``; otherwise play on a tracked background thread.
        """

        def play_save_packet(audio_packet, transcription=None):
            write_output("Here is response frames played out.. pay attention")
            sd.play(audio_packet.float, audio_packet.sample_rate)
            sd.wait()
            # TODO: when a transcription is given, cache the played audio and
            # its metadata under COMMANDS_CACHE_DIR.

        if block:
            play_save_packet(audio_packet, transcription)
        else:
            StorageManager()._enqueue_task(
                play_save_packet, audio_packet, transcription
            )

    def _write_bin(self, audio_buffer, text, prefix):
        """Write ``audio_buffer.bytes`` to a ``.bin`` file in the commands cache.

        Args:
            audio_buffer: Packet exposing ``bytes``.
            text (str): Transcription stored in the companion ``.txt`` file.
            prefix (str): Prefix of file name
        """
        audio_filepath = self.get_recorded_audio_filepath(text, "bin", prefix=prefix)
        with open(audio_filepath, mode="wb") as f:
            f.write(audio_buffer.bytes)

    def _write_wav(self, audio_packet: AudioPacket, text, prefix):
        """Write audio file to disk as wav

        Args:
            audio_packet (AudioPacket): Audio packet to write
            text (str): Transcription stored in the companion ``.txt`` file.
            prefix (str): Prefix of file name
        """
        # Imported lazily so the module loads without soundfile installed.
        import soundfile as sf

        audio_filepath = self.get_recorded_audio_filepath(text, "wav", prefix=prefix)
        sf.write(audio_filepath, audio_packet.float, audio_packet.sample_rate)

    @classmethod
    def write_audio_file(cls, audio_buffer: AudioPacket, text="", format="wav"):
        """Write an audio packet to disk on a background thread.

        Args:
            audio_buffer (AudioPacket): Audio packet to write
            text (str): Transcription stored in the companion ``.txt`` file.
            format (str): ``"wav"`` or ``"binary"``.

        Raises:
            KeyError: If ``format`` is not one of the supported values.
        """
        manager = StorageManager()
        writers = {
            "binary": manager._write_bin,
            "wav": manager._write_wav,
        }
        # Used as the file-name prefix of the written recording.
        session_id = f"session_{int(time.time()*1000)}_"
        manager._enqueue_task(writers[format], audio_buffer, text, session_id)

    @classmethod
    def ensure_completion(cls):
        """Wait for every tracked thread, then stop tracking it.

        Threads enqueued while this call is waiting are left in the pool for
        a later call.
        """
        manager = StorageManager()
        # Iterate over a snapshot: workers may enqueue more tasks meanwhile.
        pending = list(manager.threads_pool)
        for index, thread in enumerate(pending):
            if not thread:
                write_output(f"Discarding none valid thread # {index}")
            else:
                thread.join()
            # Drop finished entries so the pool does not grow without bound.
            try:
                manager.threads_pool.remove(thread)
            except ValueError:
                # Already pruned by a concurrent ensure_completion call.
                pass

    def log_state(self, state):
        """Write ``str(state)`` to the session log on a background thread.

        The log file must already have been opened via ``establish_session``;
        it is looked up when the background thread runs.
        """
        manager = StorageManager()

        def _write_state(state):
            manager.log_file.write(str(state))
            manager.log_file.flush()

        manager._enqueue_task(_write_state, state)

    def get_blackbox_audio_filepath(
        self,
        text,
        extension='wav',
        prefix="recorded_audio_",
        directory=COMMANDS_CACHE_DIR,
    ):
        """Build a timestamped audio path and write its companion text file.

        The recording id is ``prefix`` followed by the current time in
        milliseconds. A ``<recording_id>.txt`` file is written into
        ``directory`` containing ``text`` with every run of characters other
        than ASCII letters and digits replaced by ``_``. The audio file itself
        is not created here.

        Args:
            text (str): Transcription to store next to the audio file.
            extension (str): Extension of the audio file, without the dot.
            prefix (str): Prefix of the recording id.
            directory (str): Folder for both files; created if missing.

        Returns:
            str: Path at which the caller should write the audio file.
        """
        recording_id = f"{prefix}{int(time.time()*1000)}"
        audio_filepath = os.path.join(directory, f"{recording_id}.{extension}")
        text_filepath = os.path.join(directory, f"{recording_id}.txt")
        os.makedirs(directory, exist_ok=True)
        clean_text = re.sub(r"[^a-zA-Z0-9]+", "_", text)
        with open(text_filepath, mode="w") as f:
            f.write(clean_text)
        return audio_filepath

    def get_recorded_audio_filepath(self, text, extension='wav', prefix=""):
        """Return a path in the commands cache; ``prefix`` gets a ``_`` appended."""
        return self.get_blackbox_audio_filepath(
            text, extension, f"{prefix}_", directory=COMMANDS_CACHE_DIR
        )

    def get_generated_audio_path(self, text):
        """Return a ``generated_audio_<ms>.wav`` path in the generated-audio folder."""
        return self.get_blackbox_audio_filepath(
            text, "wav", "generated_audio_", directory=GENERATED_AUDIO_DIR
        )
def write_output(msg, end="\n"):
    """Print ``msg`` converted to text, followed by ``end``, flushing at once.

    Args:
        msg: Any object; it is converted with ``str`` before printing.
        end (str): Terminator written after the message.
    """
    rendered = str(msg)
    print(rendered, end=end, flush=True)