diff --git a/recipes/llm-voice-assistant/python/cli/main.py b/recipes/llm-voice-assistant/python/cli/main.py
index 0eb737d..909453c 100644
--- a/recipes/llm-voice-assistant/python/cli/main.py
+++ b/recipes/llm-voice-assistant/python/cli/main.py
@@ -4,10 +4,10 @@ import sys
 import time
 from argparse import ArgumentParser
-from concurrent.futures import ThreadPoolExecutor
 from itertools import chain
 from multiprocessing import Event, Pipe, Process, Queue, active_children
 from multiprocessing.connection import Connection
+from threading import Thread
 from typing import Optional, Sequence
@@ -128,11 +128,10 @@ def __init__(
         self.speaking = False
         self.flushing = False
         self.pcmBuffer = []
-        self.executor = ThreadPoolExecutor()
         self.future = None

     def close(self):
-        self.executor.shutdown()
+        self.interrupt()

     def start(self):
         self.started = True
@@ -156,6 +155,8 @@ def tick(self):
         def stop():
             self.speaker.flush()
             self.speaker.stop()
+            ppn_prompt = self.config['ppn_prompt']
+            print(f'$ Say {ppn_prompt} ...', flush=True)
         if not self.speaking and len(self.pcmBuffer) > self.orca_warmup:
             self.speaking = True
             self.speaker.start()
@@ -167,11 +168,7 @@ def stop():
             self.started = False
             self.speaking = False
             self.flushing = False
-            self.future = self.executor.submit(stop)
-        if self.future and self.future.done():
-            self.future = None
-            ppn_prompt = self.config['ppn_prompt']
-            print(f'$ Say {ppn_prompt} ...', flush=True)
+            Thread(target=stop).start()


 class Synthesizer:
@@ -187,8 +184,12 @@ def __init__(
         self.config = config

     def close(self):
-        self.orca_connection.send({'command': Commands.CLOSE})
-        self.orca_process.join()
+        try:
+            self.orca_connection.send({'command': Commands.CLOSE})
+            self.orca_process.join(1.0)
+        except Exception as e:
+            sys.stderr.write(str(e))
+            self.orca_process.kill()

     def start(self, utterance_end_sec):
         self.speaker.start()
@@ -201,10 +202,11 @@ def flush(self):
         self.orca_connection.send({'command': Commands.FLUSH})

     def interrupt(self):
-        self.orca_connection.send({'command': Commands.INTERRUPT})
-        while self.orca_connection.poll() and self.orca_connection.recv()['command'] != Commands.INTERRUPT:
-            time.sleep(0.1)
-        self.speaker.interrupt()
+        try:
+            self.orca_connection.send({'command': Commands.INTERRUPT})
+            self.speaker.interrupt()
+        except Exception as e:
+            sys.stderr.write(str(e))

     def tick(self):
         while self.orca_connection.poll():
@@ -233,7 +235,7 @@ def handler(_, __) -> None:
     signal.signal(signal.SIGINT, handler)

     orca = pvorca.create(access_key=config['access_key'])
-    orca_stream = orca.stream_open()
+    orca_stream = orca.stream_open(speech_rate=config['orca_speech_rate'])
     connection.send(orca.sample_rate)
     connection.send({'version': orca.version})
@@ -247,11 +249,15 @@ def handler(_, __) -> None:
     flushing = False
     text_queue = Queue()
     while not close:
+        time.sleep(0.1)
         while connection.poll():
-            time.sleep(0.1)
             message = connection.recv()
             if message['command'] == Commands.CLOSE:
                 close = True
+                synthesizing = False
+                flushing = False
+                while not text_queue.empty():
+                    text_queue.get()
             elif message['command'] == Commands.START:
                 synthesizing = True
                 utterance_end_sec = message['utterance_end_sec']
@@ -266,19 +272,19 @@ def handler(_, __) -> None:
                 while not text_queue.empty():
                     text_queue.get()
                 orca_stream.flush()
-                connection.send({'command': Commands.INTERRUPT})
                 orca_profiler.reset()
                 utterance_end_sec = 0
                 delay_sec = -1
         while not text_queue.empty():
             text = text_queue.get()
-            orca_profiler.tick()
-            pcm = orca_stream.synthesize(text)
-            orca_profiler.tock(pcm)
-            if pcm is not None:
-                connection.send({'command': Commands.SPEAK, 'pcm': pcm})
-                if delay_sec == -1:
-                    delay_sec = time.perf_counter() - utterance_end_sec
+            if synthesizing:
+                orca_profiler.tick()
+                pcm = orca_stream.synthesize(text)
+                orca_profiler.tock(pcm)
+                if pcm is not None:
+                    connection.send({'command': Commands.SPEAK, 'pcm': pcm})
+                    if delay_sec == -1:
+                        delay_sec = time.perf_counter() - utterance_end_sec
         if synthesizing and flushing and text_queue.empty():
             synthesizing = False
             flushing = False
@@ -309,8 +315,12 @@ def __init__(
         self.config = config

     def close(self):
-        self.pllm_connection.send({'command': Commands.CLOSE})
-        self.pllm_process.join()
+        try:
+            self.pllm_connection.send({'command': Commands.CLOSE})
+            self.pllm_process.join(1.0)
+        except Exception as e:
+            sys.stderr.write(str(e))
+            self.pllm_process.kill()

     def process(self, text: str, utterance_end_sec):
         ppn_prompt = self.config['ppn_prompt']
@@ -321,9 +331,6 @@ def process(self, text: str, utterance_end_sec):

     def interrupt(self):
         self.pllm_connection.send({'command': Commands.INTERRUPT})
-        while self.pllm_connection.poll() and self.pllm_connection.recv()['command'] != Commands.INTERRUPT:
-            time.sleep(0.1)
-        print('', flush=True)
         self.synthesizer.interrupt()

     def tick(self):
@@ -363,7 +370,6 @@ def handler(_, __) -> None:
         dialog = pllm.get_dialog(system=config['picollm_system_prompt'])
     else:
         dialog = pllm.get_dialog()
-    generating = False

     pllm_profiler = TPSProfiler()

@@ -378,66 +384,53 @@ def handler(_, __) -> None:

     def llm_callback(text):
         pllm_profiler.tock()
-        if generating:
-            completion.append(text)
-            new_tokens = completion.get_new_tokens()
-            if len(new_tokens) > 0:
-                connection.send({'command': Commands.SYNTHESIZE, 'text': new_tokens})
-
-    def llm_task(text):
-        short_answers_instruction = \
-            "You are a voice assistant and your answers are very short but informative"
-        dialog.add_human_request(
-            f"{short_answers_instruction}. {text}" if config['short_answers'] else text)
{text}" if config['short_answers'] else text) - - completion.reset() - return pllm.generate( - prompt=dialog.prompt(), - completion_token_limit=config['picollm_completion_token_limit'], - stop_phrases=stop_phrases, - presence_penalty=config['picollm_presence_penalty'], - frequency_penalty=config['picollm_frequency_penalty'], - temperature=config['picollm_temperature'], - top_p=config['picollm_top_p'], - stream_callback=llm_callback) + completion.append(text) + new_tokens = completion.get_new_tokens() + if len(new_tokens) > 0: + connection.send({'command': Commands.SYNTHESIZE, 'text': new_tokens}) + + close = [False] + prompt = [None] + + def event_manager(): + while not close[0]: + message = connection.recv() + if message['command'] == Commands.CLOSE: + close[0] = True + pllm.interrupt() + return + elif message['command'] == Commands.INTERRUPT: + pllm.interrupt() + elif message['command'] == Commands.PROCESS: + prompt[0] = message['text'] + Thread(target=event_manager).start() try: - close = False - executor = ThreadPoolExecutor() - llm_future = None - interrupting = False - while not close: - time.sleep(0.1) - while connection.poll(): - message = connection.recv() - if message['command'] == Commands.CLOSE: - close = True - elif message['command'] == Commands.PROCESS: - generating = True - text = message['text'] - pllm_profiler.reset() - llm_future = executor.submit(llm_task, text) - elif message['command'] == Commands.INTERRUPT: - interrupting = True - generating = False - pllm.interrupt() - if llm_future and llm_future.done(): - generating = False - llm_result = llm_future.result() - dialog.add_llm_response(llm_result.completion) - if llm_result.endpoint == picollm.PicoLLMEndpoints.INTERRUPTED: - interrupting = False - connection.send({'command': Commands.INTERRUPT}) - else: + while not close[0]: + if prompt[0] is not None: + short_answers_instruction = \ + "You are a voice assistant and your answers are very short but informative" + dialog.add_human_request( + f"{short_answers_instruction}. 
{prompt[0]}" if config['short_answers'] else prompt[0]) + prompt[0] = None + + completion.reset() + result = pllm.generate( + prompt=dialog.prompt(), + completion_token_limit=config['picollm_completion_token_limit'], + stop_phrases=stop_phrases, + presence_penalty=config['picollm_presence_penalty'], + frequency_penalty=config['picollm_frequency_penalty'], + temperature=config['picollm_temperature'], + top_p=config['picollm_top_p'], + stream_callback=llm_callback) + + dialog.add_llm_response(result.completion) + if result.endpoint != picollm.PicoLLMEndpoints.INTERRUPTED: connection.send({'command': Commands.FLUSH, 'profile': pllm_profiler.tps()}) - llm_future = None - if not llm_future and interrupting: - interrupting = False - connection.send({'command': Commands.INTERRUPT}) + else: + time.sleep(0.25) finally: - while llm_future and llm_future.done(): - time.sleep(0.1) - del executor pllm.release() @@ -578,15 +571,14 @@ def handler(_, __) -> None: try: while not stop[0]: + if not pllm_process.is_alive() or not orca_process.is_alive(): + break + recorder.tick() generator.tick() synthesizer.tick() speaker.tick() finally: - generator.interrupt() - generator.tick() - synthesizer.tick() - speaker.tick() recorder.close() listener.close() generator.close() @@ -594,7 +586,7 @@ def handler(_, __) -> None: speaker.close() for child in active_children(): - child.terminate() + child.kill() porcupine.delete() cheetah.delete() @@ -614,8 +606,8 @@ def handler(_, __) -> None: '--picollm_model_path', help='Absolute path to the file containing LLM parameters (`.pllm`).') parser.add_argument( - '--keyword-model_path', - help='Absolute path to the keyword model file (`.ppn`). If not set, `Picovoice` will be the wake phrase') + '--keyword_model_path', + help='Absolute path to the keyword model file (`.ppn`). If not set, `Jarvis` will be the wake phrase') parser.add_argument( '--cheetah_endpoint_duration_sec', type=float, @@ -666,6 +658,10 @@ def handler(_, __) -> None: type=float, help="Duration of the synthesized audio to buffer before streaming it out. A higher value helps slower " "(e.g., Raspberry Pi) to keep up with real-time at the cost of increasing the initial delay.") + parser.add_argument( + '--orca_speech_rate', + type=float, + help="Rate of speech of the generated audio.") parser.add_argument( '--porcupine_sensitivity', type=float, @@ -704,6 +700,7 @@ def handler(_, __) -> None: 'picollm_top_p': 1, 'picollm_system_prompt': None, 'orca_warmup_sec': 0, + 'orca_speech_rate': 1.0, 'porcupine_sensitivity': 0.5, 'short_answers': False, 'profile': False