Skip to content

Commit

Permalink
Merge pull request #51 from boldare/feat/assistant-streaming
Browse files Browse the repository at this point in the history
feat(openai-assistant): assistants streaming events
  • Loading branch information
sebastianmusial authored Mar 29, 2024
2 parents 40d1910 + 209abbb commit 30593b6
Show file tree
Hide file tree
Showing 27 changed files with 623 additions and 233 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Component, Input, OnInit } from '@angular/core';
import { ChatClientService } from '../../../modules/+chat/shared/chat-client.service';
import { ChatMessage, SpeechVoice } from '../../../modules/+chat/shared/chat.model';
import {
ChatMessage,
SpeechVoice,
} from '../../../modules/+chat/shared/chat.model';
import { environment } from '../../../../environments/environment';
import { MatIconModule } from '@angular/material/icon';
import { delay } from 'rxjs';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Component, HostBinding, Input } from '@angular/core';
import { ChatRole, ChatMessage } from '../../../modules/+chat/shared/chat.model';
import {
ChatRole,
ChatMessage,
} from '../../../modules/+chat/shared/chat.model';
import { MarkdownComponent } from 'ngx-markdown';
import { ChatAudioComponent } from '../chat-audio/chat-audio.component';
import { NgClass } from '@angular/common';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

<ai-spinner [isActive]="isLoading()" />
@if (isConfigEnabled && !threadId()) {
<ai-configuration-form class="chat__content" />
<ai-configuration-form class="chat__content" />
} @else {
<ai-chat-messages
[messages]="messages()"
[isTyping]="isTyping()"
[tips]="tips"
(tipSelected$)="chatService.sendMessage($event)"
class="chat__content" />
<ai-chat-footer
[isDisabled]="isTyping()"
[isTranscriptionEnabled]="isTranscriptionEnabled"
[isAttachmentEnabled]="isAttachmentEnabled"
(sendMessage$)="chatService.sendMessage($event)"
(sendAudio$)="chatService.sendAudio($event)" />
<ai-chat-messages
[messages]="messages()"
[isTyping]="isTyping()"
[tips]="tips"
(tipSelected$)="chatService.sendMessage($event)"
class="chat__content" />
<ai-chat-footer
[isDisabled]="isTyping()"
[isTranscriptionEnabled]="isTranscriptionEnabled"
[isAttachmentEnabled]="isAttachmentEnabled"
(sendMessage$)="chatService.sendMessage($event)"
(sendAudio$)="chatService.sendAudio($event)" />
}
</ai-card>
37 changes: 29 additions & 8 deletions apps/spa/src/app/modules/+chat/shared/chat-gateway.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,43 @@
import { Injectable } from '@angular/core';
import { ChatEvents } from './chat.model';
import io from 'socket.io-client';
import { ChatCallDto } from '@boldare/openai-assistant';
import {
ChatCallDto,
TextCreatedPayload,
TextDeltaPayload,
TextDonePayload,
} from '@boldare/openai-assistant';
import { Observable } from 'rxjs';
import { environment } from '../../../../environments/environment';

@Injectable({ providedIn: 'root' })
export class ChatGatewayService {
private socket = io(environment.websocketUrl);

sendMessage(payload: ChatCallDto): void {
this.socket.emit(ChatEvents.SendMessage, payload);
}

getMessages(): Observable<ChatCallDto> {
return new Observable<ChatCallDto>(observer => {
this.socket.on(ChatEvents.MessageReceived, data => observer.next(data));
watchEvent<T>(event: ChatEvents): Observable<T> {
return new Observable<T>(observer => {
this.socket.on(event, data => observer.next(data));
return () => this.socket.disconnect();
});
}

callStart(payload: ChatCallDto): void {
this.socket.emit(ChatEvents.CallStart, payload);
}

callDone(): Observable<ChatCallDto> {
return this.watchEvent(ChatEvents.CallDone);
}

textCreated(): Observable<TextCreatedPayload> {
return this.watchEvent(ChatEvents.TextCreated);
}

textDelta(): Observable<TextDeltaPayload> {
return this.watchEvent(ChatEvents.TextDelta);
}

textDone(): Observable<TextDonePayload> {
return this.watchEvent(ChatEvents.TextDone);
}
}
17 changes: 15 additions & 2 deletions apps/spa/src/app/modules/+chat/shared/chat.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,21 @@ export interface ChatMessage {
}

export enum ChatEvents {
SendMessage = 'send_message',
MessageReceived = 'message_received',
CallStart = 'callStart',
CallDone = 'callDone',
MessageCreated = 'messageCreated',
MessageDelta = 'messageDelta',
MessageDone = 'messageDone',
TextCreated = 'textCreated',
TextDelta = 'textDelta',
TextDone = 'textDone',
ImageFileDone = 'imageFileDone',
ToolCallCreated = 'toolCallCreated',
ToolCallDelta = 'toolCallDelta',
ToolCallDone = 'toolCallDone',
RunStepCreated = 'runStepCreated',
RunStepDelta = 'runStepDelta',
RunStepDone = 'runStepDone',
}

export enum ChatMessageStatus {
Expand Down
46 changes: 42 additions & 4 deletions apps/spa/src/app/modules/+chat/shared/chat.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { OpenAiFile, GetThreadResponseDto } from '@boldare/openai-assistant';
import { Message } from 'openai/resources/beta/threads/messages';
import { TextContentBlock } from 'openai/resources/beta/threads/messages/messages';


@Injectable({ providedIn: 'root' })
export class ChatService {
isLoading$ = new BehaviorSubject<boolean>(false);
Expand All @@ -35,11 +34,21 @@ export class ChatService {
) {
document.body.classList.add('ai-chat');

this.subscribeMessages();
this.setInitialValues();
this.watchMessages();
this.watchVisibility();
}

subscribeMessages(): void {
if (!environment.isStreamingEnabled) {
this.watchMessages();
} else {
this.watchTextCreated();
this.watchTextDelta();
this.watchTextDone();
}
}

isMessageInvisible(message: Message): boolean {
const metadata = message.metadata as Record<string, unknown>;
return metadata?.['status'] === ChatMessageStatus.Invisible;
Expand Down Expand Up @@ -87,11 +96,13 @@ export class ChatService {

refresh(): void {
this.isLoading$.next(true);
this.isTyping$.next(false);
this.messages$.next([]);
this.threadService.start().subscribe();
}

clear(): void {
this.isTyping$.next(false);
this.threadService.clear();
this.messages$.next([]);
}
Expand Down Expand Up @@ -120,15 +131,42 @@ export class ChatService {
const files = await this.chatFilesService.sendFiles();
this.addFileMessage(files);

this.chatGatewayService.sendMessage({
this.chatGatewayService.callStart({
content,
threadId: this.threadService.threadId$.value,
file_ids: files.map(file => file.id) || [],
});
}

watchTextCreated(): Subscription {
return this.chatGatewayService.textCreated().subscribe(data => {
this.isTyping$.next(false);
this.addMessage({ content: data.text.value, role: ChatRole.Assistant });
});
}

watchTextDelta(): Subscription {
return this.chatGatewayService.textDelta().subscribe(data => {
const length = this.messages$.value.length;
this.messages$.value[length - 1].content = data.text.value;
});
}

watchTextDone(): Subscription {
return this.chatGatewayService.textDone().subscribe(data => {
this.isTyping$.next(false);
this.messages$.next([
...this.messages$.value.slice(0, -1),
{
content: data.text.value,
role: ChatRole.Assistant,
},
]);
});
}

watchMessages(): Subscription {
return this.chatGatewayService.getMessages().subscribe(data => {
return this.chatGatewayService.callDone().subscribe(data => {
this.addMessage({
content: data.content,
role: ChatRole.Assistant,
Expand Down
19 changes: 12 additions & 7 deletions apps/spa/src/app/modules/+chat/shared/thread.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { Injectable } from '@angular/core';
import { BehaviorSubject, catchError, Observable, Subject, take, tap } from 'rxjs';
import {
BehaviorSubject,
catchError,
Observable,
Subject,
take,
tap,
} from 'rxjs';
import { environment } from '../../../../environments/environment';
import { ThreadClientService } from './thread-client.service';
import { ConfigurationFormService } from '../../+configuration/shared/configuration-form.service';
Expand Down Expand Up @@ -43,11 +50,9 @@ export class ThreadService {
}

getThread(id: string): Observable<GetThreadResponseDto> {
return this.threadClientService
.getThread(id)
.pipe(
take(1),
catchError(() => this.start()),
);
return this.threadClientService.getThread(id).pipe(
take(1),
catchError(() => this.start()),
);
}
}
1 change: 1 addition & 0 deletions apps/spa/src/environments/environment.development.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ export const environment = {
isRefreshEnabled: true,
isConfigEnabled: true,
isAutoOpen: true,
isStreamingEnabled: true,
};
1 change: 1 addition & 0 deletions apps/spa/src/environments/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ export const environment = {
isRefreshEnabled: true,
isConfigEnabled: true,
isAutoOpen: true,
isStreamingEnabled: true,
};
4 changes: 2 additions & 2 deletions libs/openai-assistant/src/lib/agent/agent.mock.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { AssistantCreateParams } from 'openai/resources/beta';
import { FunctionTool } from 'openai/resources/beta';

export const agentNameMock = 'agent-name';

export const agentMock = async () => 'agent-result';

export const definitionMock: AssistantCreateParams.AssistantToolsFunction = {
export const definitionMock: FunctionTool = {
type: 'function',
function: { name: agentNameMock },
};
5 changes: 1 addition & 4 deletions libs/openai-assistant/src/lib/agent/agent.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ export class AgentService {
public agents: Agents = {};
public tools: FunctionTool[] = [];

add(
definition: FunctionTool,
fn: Agent,
): void {
add(definition: FunctionTool, fn: Agent): void {
this.tools.push(definition);
this.agents[definition.function.name] = fn;
}
Expand Down
8 changes: 7 additions & 1 deletion libs/openai-assistant/src/lib/assistant/assistant.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { DynamicModule, Inject, Module, OnModuleInit, Optional } from '@nestjs/common';
import {
DynamicModule,
Inject,
Module,
OnModuleInit,
Optional,
} from '@nestjs/common';
import { HttpModule } from '@nestjs/axios';
import {
AssistantService,
Expand Down
2 changes: 1 addition & 1 deletion libs/openai-assistant/src/lib/chat/chat.gateway.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('ChatGateway', () => {

await chatGateway.listenForMessages(request, {} as Socket);

expect(chatService.call).toHaveBeenCalledWith(request);
expect(chatService.call).toHaveBeenCalled();
});
});

Expand Down
Loading

0 comments on commit 30593b6

Please sign in to comment.