Skip to content

Commit

Permalink
feat: Integration with langfuse for tracing (#132)
Browse files Browse the repository at this point in the history
* test langfuse

tracing the chat and retrieve

fix

* trace retrieving service

* support streaming mode and show tracing url
  • Loading branch information
Mini256 authored May 15, 2024
1 parent b98cc4a commit 99a517c
Show file tree
Hide file tree
Showing 22 changed files with 493 additions and 160 deletions.
1 change: 1 addition & 0 deletions ddl/0-initial-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ CREATE TABLE chat_message
finished_at DATETIME NULL,
deleted_at DATETIME NULL,
delete_reason ENUM('FORCE', 'REGENERATE') NULL,
trace_url VARCHAR(255) NULL,
PRIMARY KEY (id),
FOREIGN KEY fk_cm_on_chat_id (chat_id) REFERENCES chat (id)
);
Expand Down
1 change: 1 addition & 0 deletions embeddable-javascript/src/components/Chatbot/ChatItem.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export const enum AppChatStreamState {
}

export type MyChatMessageAnnotation = {
traceURL?: string,
context?: AppChatStreamSource[],
state?: AppChatStreamState,
stateMessage?: string
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"hast-util-select": "^6.0.2",
"hast-util-to-text": "^4.0.0",
"kysely": "^0.27.2",
"langfuse": "^3.10.0",
"liquidjs": "^10.10.0",
"llamaindex": "^0.3.10",
"luxon": "^3.4.4",
Expand Down
22 changes: 22 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/app/api/v1/chats/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {CHAT_CAN_NOT_ASSIGN_SESSION_ID_ERROR} from '@/lib/errors';
import {defineHandler} from '@/lib/next/handler';
import {baseRegistry} from '@/rag-spec/base';
import {getFlow} from '@/rag-spec/createFlow';
import {Langfuse} from "langfuse";
import {notFound} from 'next/navigation';
import {NextResponse} from 'next/server';
import {z} from 'zod';
Expand Down Expand Up @@ -86,7 +87,8 @@ export const POST = defineHandler({

const index = await getIndexByNameOrThrow(indexName);
const flow = await getFlow(baseRegistry);
const chatService = new LlamaindexChatService({ flow, index });
const langfuse = new Langfuse();
const chatService = new LlamaindexChatService({ flow, index, langfuse });

if (body.regenerate) {
if (!body.messageId) {
Expand Down
2 changes: 1 addition & 1 deletion src/app/api/v1/documents/import/from/urls/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ export const POST = defineHandler({

export const dynamic = 'force-dynamic';

export const maxDuration = 150;
export const maxDuration = 60;
2 changes: 1 addition & 1 deletion src/app/api/v1/documents/index/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ export const POST = defineHandler({

export const dynamic = 'force-dynamic';

export const maxDuration = 150;
export const maxDuration = 60;

41 changes: 31 additions & 10 deletions src/components/chat/conversation-message-groups.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import { MessageHeading } from '@/components/chat/message-heading';
import { MessageOperations } from '@/components/chat/message-operations';
import { type ConversationMessageGroupProps, useGroupedConversationMessages } from '@/components/chat/use-grouped-conversation-messages';
import { Alert, AlertDescription, AlertTitle } from '@/components/ui/alert';
import {Button} from "@/components/ui/button";
import {Collapsible, CollapsibleContent, CollapsibleTrigger} from "@/components/ui/collapsible";
import type { ChatMessage } from '@/core/repositories/chat';
import { getErrorMessage } from '@/lib/errors';
import { AlertTriangleIcon } from 'lucide-react';
import {AlertTriangleIcon, BugIcon, InfoIcon} from 'lucide-react';
import {useState} from "react";

export function ConversationMessageGroups ({ history }: { history: ChatMessage[] }) {
const { error, messages, isLoading, isWaiting } = useMyChatContext();
Expand All @@ -30,17 +33,35 @@ export function ConversationMessageGroups ({ history }: { history: ChatMessage[]
}

function ConversationMessageGroup ({ group }: { group: ConversationMessageGroupProps }) {
const [debugInfoOpen, setDebugInfoOpen] = useState(false);
return (
<section className="space-y-6 p-4 border-b pb-10 last-of-type:border-b-0 last-of-type:border-pb-4">
<h2 className="text-2xl font-normal">{group.userMessage.content}</h2>
<MessageContextSources group={group} />
<Collapsible open={debugInfoOpen} onOpenChange={setDebugInfoOpen}>
<div className="flex items-center justify-between">
<h2 className="text-2xl font-normal">{group.userMessage.content}</h2>
<CollapsibleTrigger asChild>
<Button variant="ghost" size="sm">
<InfoIcon className="h-4 w-4"/>
<span className="sr-only">Toggle</span>
</Button>
</CollapsibleTrigger>
</div>
<CollapsibleContent>
<div className="line-clamp-1 text-xs p-y-4">
<span className="font-medium">Tracing URL: </span>
<a target="_blank" href={group.assistantAnnotation.traceURL}>{group.assistantAnnotation.traceURL}</a>
</div>
</CollapsibleContent>
</Collapsible>

<MessageContextSources group={group}/>
<section className="space-y-2">
<MessageHeading />
<MessageError group={group} />
<MessageContent group={group} />
{!group.finished && <MessageAnnotation group={group} />}
</section>
<MessageOperations group={group} />
<MessageHeading/>
<MessageError group={group}/>
<MessageContent group={group}/>
{!group.finished && <MessageAnnotation group={group}/>}
</section>
<MessageOperations group={group}/>
</section>
);
);
}
2 changes: 1 addition & 1 deletion src/components/chat/message-content.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { RemarkContent } from '@/components/remark-content';

export function MessageContent ({ group }: { group: ConversationMessageGroupProps }) {
return (
<article className="prose prose-sm prose-neutral dark:prose-invert overflow-x-hidden break-all">
<article className="prose prose-sm prose-neutral dark:prose-invert overflow-x-hidden break-keep">
{group.assistantMessage?.content && <RemarkContent>
{group.assistantMessage.content}
</RemarkContent>}
Expand Down
1 change: 1 addition & 0 deletions src/components/chat/use-grouped-conversation-messages.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type PendingConversationMessage = {
export type MyChatMessageAnnotation = {
ts: number;
messageId: number;
traceURL?: string,
context?: AppChatStreamSource[],
state?: AppChatStreamState,
stateMessage?: string
Expand Down
1 change: 1 addition & 0 deletions src/core/db/schema.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export interface ChatMessage {
ordinal: number;
role: string;
status: "CREATED" | "FAILED" | "GENERATING" | "SUCCEED";
trace_url: string | null;
}

export interface ChatMessageRetrieveRel {
Expand Down
1 change: 1 addition & 0 deletions src/core/repositories/chat_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {Rewrite} from '@/lib/type-utils';
import type {Insertable, Selectable, Updateable} from 'kysely';
import {notFound} from 'next/navigation';

export type ChatEngineRequiredOptions = Required<Pick<ChatEngineOptions, 'llm' | 'retriever' | 'graph_retriever' | 'prompts' >> & Omit<ChatEngineOptions, 'llm' | 'retriever' | 'graph_retriever' | 'prompts'>;
export type ChatEngineOptions = CondenseQuestionChatEngineOptions;
export type ChatEngine = Rewrite<Selectable<DBv1['chat_engine']>, { engine: ChatEngineProvider, engine_options: ChatEngineOptions }>;
export type CreateChatEngine = Rewrite<Insertable<DBv1['chat_engine']>, { engine: ChatEngineProvider, engine_options: ChatEngineOptions }>;
Expand Down
4 changes: 4 additions & 0 deletions src/core/services/base.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { Flow } from '@/core';
import type { Index } from '@/core/repositories/index_';
import EventEmitter from 'events';
import {Langfuse} from "langfuse";

export interface AppFlowBaseServiceOptions {
flow: Flow;
}

export interface AppIndexBaseServiceOptions extends AppFlowBaseServiceOptions {
index: Index;
langfuse?: Langfuse;
}

export abstract class AppFlowBaseService extends EventEmitter {
Expand All @@ -21,9 +23,11 @@ export abstract class AppFlowBaseService extends EventEmitter {

export abstract class AppIndexBaseService extends AppFlowBaseService {
protected readonly index: Index;
protected readonly langfuse?: Langfuse;

constructor (options: AppIndexBaseServiceOptions) {
super(options);
this.index = options.index;
this.langfuse = options.langfuse;
}
}
24 changes: 14 additions & 10 deletions src/core/services/chating.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ import { type Chat, type ChatMessage, createChatMessage, createChatMessageRetrie
import { AppIndexBaseService } from '@/core/services/base';
import { AppChatStream, type AppChatStreamSource, AppChatStreamState } from '@/lib/ai/AppChatStream';
import { AUTH_FORBIDDEN_ERROR, getErrorMessage } from '@/lib/errors';
import {LangfuseTraceClient} from "langfuse";
import { notFound } from 'next/navigation';

export type ChatOptions = {
userInput: string
userId: string
history: ChatMessage[]
userInput: string;
userId: string;
respondMessage: ChatMessage;
history: ChatMessage[];
}

// TODO: split into different type of events?
export type ChatStreamEvent = {
status: AppChatStreamState;
statusMessage: string;
traceURL?: string;
sources: AppChatStreamSource[];
content: string;
retrieveId?: number;
Expand All @@ -25,26 +28,27 @@ export abstract class AppChatService extends AppIndexBaseService {

async chat (sessionId: string, userId: string, userInput: string, regenerating: boolean) {
const { chat, history } = await this.getSessionInfo(sessionId, userId);
const message = await this.startChat(chat, history, userInput, regenerating);
const respondMessage = await this.startChat(chat, history, userInput, regenerating);

return new AppChatStream(sessionId, message.id, async controller => {
return new AppChatStream(sessionId, respondMessage.id, async controller => {
try {
let content = '';
let retrieveIds = new Set<number>();
for await (const chunk of this.run(chat, { userInput, history, userId })) {
controller.appendText(chunk.content, chunk.status === AppChatStreamState.CREATING /* force send an empty text chunk first, to avoid a dependency BUG */);
for await (const chunk of this.run(chat, { userInput, history, userId, respondMessage })) {
controller.appendText(chunk.content, chunk.status === AppChatStreamState.CREATING /* force sends an empty text chunk first, to avoid a dependency BUG */);
controller.setChatState(chunk.status, chunk.statusMessage);
controller.setTraceURL(chunk.traceURL);
controller.setSources(chunk.sources);
content += chunk.content;
if (chunk.retrieveId) {
retrieveIds.add(chunk.retrieveId);
}
}
controller.setChatState(AppChatStreamState.FINISHED);
await this.finishChat(message, content, retrieveIds);
await this.finishChat(respondMessage, content, retrieveIds);
} catch (error) {
controller.setChatState(AppChatStreamState.ERROR, getErrorMessage(error));
await this.terminateChat(message, error);
await this.terminateChat(respondMessage, error);
return Promise.reject(error);
}
});
Expand Down Expand Up @@ -133,6 +137,6 @@ export abstract class AppChatService extends AppIndexBaseService {
});
}

protected abstract run (chat: Chat, options: ChatOptions): AsyncIterable<ChatStreamEvent>;
protected abstract run (chat: Chat, options: ChatOptions, trace?: LangfuseTraceClient): AsyncIterable<ChatStreamEvent>;
}

Loading

0 comments on commit 99a517c

Please sign in to comment.