Skip to content

Commit

Permalink
Merge pull request #1497 from FlowiseAI/bugfix/Concurrent-Chat-Session
Browse files Browse the repository at this point in the history
Bugfix/Concurrent Chat Session
  • Loading branch information
HenryHengZJ authored Jan 15, 2024
2 parents 8847fc9 + 7ba4a06 commit b3d2d7c
Show file tree
Hide file tree
Showing 38 changed files with 1,746 additions and 1,388 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
import { initializeAgentExecutorWithOptions, AgentExecutor, InitializeAgentExecutorOptions } from 'langchain/agents'
import { Tool } from 'langchain/tools'
import { BaseChatMemory } from 'langchain/memory'
import { getBaseClasses, mapChatHistory } from '../../../src/utils'
import { BaseChatModel } from 'langchain/chat_models/base'
import { flatten } from 'lodash'
import { additionalCallbacks } from '../../../src/handler'
import { AgentStep, BaseMessage, ChainValues, AIMessage, HumanMessage } from 'langchain/schema'
import { RunnableSequence } from 'langchain/schema/runnable'
import { getBaseClasses } from '../../../src/utils'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams } from '../../../src/Interface'
import { AgentExecutor } from '../../../src/agents'
import { ChatConversationalAgent } from 'langchain/agents'
import { renderTemplate } from '@langchain/core/prompts'

const DEFAULT_PREFIX = `Assistant is a large language model trained by OpenAI.
Expand All @@ -15,6 +18,15 @@ Assistant is constantly learning and improving, and its capabilities are constan
Overall, Assistant is a powerful system that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. Whether you need help with a specific question or just want to have a conversation about a particular topic, Assistant is here to assist.`

const TEMPLATE_TOOL_RESPONSE = `TOOL RESPONSE:
---------------------
{observation}
USER'S INPUT
--------------------
Okay, so what is the response to my last comment? If using information obtained from the tools you must mention it explicitly without mentioning the tool names - I have forgotten all TOOL RESPONSES! Remember to respond with a markdown code snippet of a json blob with a single action, and NOTHING else.`

class ConversationalAgent_Agents implements INode {
label: string
name: string
Expand All @@ -25,8 +37,9 @@ class ConversationalAgent_Agents implements INode {
category: string
baseClasses: string[]
inputs: INodeParams[]
sessionId?: string

constructor() {
constructor(fields?: { sessionId?: string }) {
this.label = 'Conversational Agent'
this.name = 'conversationalAgent'
this.version = 2.0
Expand All @@ -43,7 +56,7 @@ class ConversationalAgent_Agents implements INode {
list: true
},
{
label: 'Language Model',
label: 'Chat Model',
name: 'model',
type: 'BaseChatModel'
},
Expand All @@ -62,52 +75,114 @@ class ConversationalAgent_Agents implements INode {
additionalParams: true
}
]
this.sessionId = fields?.sessionId
}

async init(nodeData: INodeData): Promise<any> {
const model = nodeData.inputs?.model as BaseChatModel
let tools = nodeData.inputs?.tools as Tool[]
tools = flatten(tools)
const memory = nodeData.inputs?.memory as BaseChatMemory
const systemMessage = nodeData.inputs?.systemMessage as string
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
return prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)
}

const obj: InitializeAgentExecutorOptions = {
agentType: 'chat-conversational-react-description',
verbose: process.env.DEBUG === 'true' ? true : false
}
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
const memory = nodeData.inputs?.memory as FlowiseMemory
const executor = await prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)

const agentArgs: any = {}
if (systemMessage) {
agentArgs.systemMessage = systemMessage
}
const loggerHandler = new ConsoleCallbackHandler(options.logger)
const callbacks = await additionalCallbacks(nodeData, options)

if (Object.keys(agentArgs).length) obj.agentArgs = agentArgs
let res: ChainValues = {}

const executor = await initializeAgentExecutorWithOptions(tools, model, obj)
executor.memory = memory
return executor
}

async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
const executor = nodeData.instance as AgentExecutor
const memory = nodeData.inputs?.memory as BaseChatMemory

if (options && options.chatHistory) {
const chatHistoryClassName = memory.chatHistory.constructor.name
// Only replace when its In-Memory
if (chatHistoryClassName && chatHistoryClassName === 'ChatMessageHistory') {
memory.chatHistory = mapChatHistory(options)
executor.memory = memory
}
if (options.socketIO && options.socketIOClientId) {
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
} else {
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
}

;(executor.memory as any).returnMessages = true // Return true for BaseChatModel
await memory.addChatMessages(
[
{
text: input,
type: 'userMessage'
},
{
text: res?.output,
type: 'apiMessage'
}
],
this.sessionId
)

return res?.output
}
}

const callbacks = await additionalCallbacks(nodeData, options)
/**
 * Builds the conversational ReAct agent executor from the node's inputs.
 * The runnable maps the user input, the tool scratchpad and the session's
 * chat history into the prompt, invokes the stop-bound chat model and
 * parses the model output into an agent action/finish.
 */
const prepareAgent = async (
    nodeData: INodeData,
    flowObj: { sessionId?: string; chatId?: string; input?: string },
    chatHistory: IMessage[] = []
) => {
    const chatModel = nodeData.inputs?.model as BaseChatModel
    const memory = nodeData.inputs?.memory as FlowiseMemory
    const systemMessage = nodeData.inputs?.systemMessage as string
    const tools: Tool[] = flatten(nodeData.inputs?.tools as Tool[])

    // Fall back to the conventional keys when the memory node does not define them
    const memoryKey = memory.memoryKey || 'chat_history'
    const inputKey = memory.inputKey || 'input'

    // Stop generation as soon as the model begins to emit an observation line
    const stoppedModel = chatModel.bind({
        stop: ['\nObservation']
    })

    const outputParser = ChatConversationalAgent.getDefaultOutputParser({
        llm: chatModel,
        toolNames: tools.map((t) => t.name)
    })

    const prompt = ChatConversationalAgent.createPrompt(tools, {
        systemMessage: systemMessage ? systemMessage : DEFAULT_PREFIX,
        outputParser
    })

    const runnableAgent = RunnableSequence.from([
        {
            [inputKey]: (i: { input: string; steps: AgentStep[] }) => i.input,
            agent_scratchpad: async (i: { input: string; steps: AgentStep[] }) => await constructScratchPad(i.steps),
            [memoryKey]: async (_: { input: string; steps: AgentStep[] }) => {
                // Pull persisted history for this session; default to empty when none exists
                const history = (await memory.getChatMessages(flowObj?.sessionId, true, chatHistory)) as BaseMessage[]
                return history ?? []
            }
        },
        prompt,
        stoppedModel,
        outputParser
    ])

    return AgentExecutor.fromAgentAndTools({
        agent: runnableAgent,
        tools,
        sessionId: flowObj?.sessionId,
        chatId: flowObj?.chatId,
        input: flowObj?.input,
        verbose: process.env.DEBUG === 'true'
    })
}

const result = await executor.call({ input }, [...callbacks])
return result?.output
/**
 * Converts the agent's intermediate steps into the message pairs the
 * conversational prompt expects: the model's own reasoning (AI message)
 * followed by the rendered tool observation (human message).
 */
const constructScratchPad = async (steps: AgentStep[]): Promise<BaseMessage[]> =>
    steps.flatMap((step) => [
        new AIMessage(step.action.log),
        new HumanMessage(
            renderTemplate(TEMPLATE_TOOL_RESPONSE, 'f-string', {
                observation: step.observation
            })
        )
    ])

module.exports = { nodeClass: ConversationalAgent_Agents }
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
import { initializeAgentExecutorWithOptions, AgentExecutor } from 'langchain/agents'
import { getBaseClasses, mapChatHistory } from '../../../src/utils'
import { ChainValues, AgentStep, BaseMessage } from 'langchain/schema'
import { flatten } from 'lodash'
import { BaseChatMemory } from 'langchain/memory'
import { ChatOpenAI } from 'langchain/chat_models/openai'
import { ChatPromptTemplate, MessagesPlaceholder } from 'langchain/prompts'
import { formatToOpenAIFunction } from 'langchain/tools'
import { RunnableSequence } from 'langchain/schema/runnable'
import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams } from '../../../src/Interface'
import { getBaseClasses } from '../../../src/utils'
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
import { OpenAIFunctionsAgentOutputParser } from 'langchain/agents/openai/output_parser'
import { AgentExecutor, formatAgentSteps } from '../../../src/agents'

const defaultMessage = `Do your best to answer the questions. Feel free to use any tools available to look up relevant information, only if necessary.`

Expand All @@ -17,8 +22,9 @@ class ConversationalRetrievalAgent_Agents implements INode {
category: string
baseClasses: string[]
inputs: INodeParams[]
sessionId?: string

constructor() {
constructor(fields?: { sessionId?: string }) {
this.label = 'Conversational Retrieval Agent'
this.name = 'conversationalRetrievalAgent'
this.version = 3.0
Expand Down Expand Up @@ -54,55 +60,96 @@ class ConversationalRetrievalAgent_Agents implements INode {
additionalParams: true
}
]
this.sessionId = fields?.sessionId
}

async init(nodeData: INodeData): Promise<any> {
const model = nodeData.inputs?.model
const memory = nodeData.inputs?.memory as BaseChatMemory
const systemMessage = nodeData.inputs?.systemMessage as string

let tools = nodeData.inputs?.tools
tools = flatten(tools)

const executor = await initializeAgentExecutorWithOptions(tools, model, {
agentType: 'openai-functions',
verbose: process.env.DEBUG === 'true' ? true : false,
agentArgs: {
prefix: systemMessage ?? defaultMessage
},
returnIntermediateSteps: true
})
executor.memory = memory
return executor
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
return prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)
}

async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string> {
const executor = nodeData.instance as AgentExecutor

if (executor.memory) {
;(executor.memory as any).memoryKey = 'chat_history'
;(executor.memory as any).outputKey = 'output'
;(executor.memory as any).returnMessages = true

const chatHistoryClassName = (executor.memory as any).chatHistory.constructor.name
// Only replace when its In-Memory
if (chatHistoryClassName && chatHistoryClassName === 'ChatMessageHistory') {
;(executor.memory as any).chatHistory = mapChatHistory(options)
}
}
const memory = nodeData.inputs?.memory as FlowiseMemory
const executor = prepareAgent(nodeData, { sessionId: this.sessionId, chatId: options.chatId, input }, options.chatHistory)

const loggerHandler = new ConsoleCallbackHandler(options.logger)
const callbacks = await additionalCallbacks(nodeData, options)

let res: ChainValues = {}

if (options.socketIO && options.socketIOClientId) {
const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId)
const result = await executor.call({ input }, [loggerHandler, handler, ...callbacks])
return result?.output
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
} else {
const result = await executor.call({ input }, [loggerHandler, ...callbacks])
return result?.output
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
}

await memory.addChatMessages(
[
{
text: input,
type: 'userMessage'
},
{
text: res?.output,
type: 'apiMessage'
}
],
this.sessionId
)

return res?.output
}
}

/**
 * Assembles the OpenAI-functions agent executor from the node's inputs.
 * Tools are exposed to the model as OpenAI function definitions; the
 * runnable feeds input, scratchpad and session chat history into the
 * prompt and parses the model's function-call response.
 */
const prepareAgent = (
    nodeData: INodeData,
    flowObj: { sessionId?: string; chatId?: string; input?: string },
    chatHistory: IMessage[] = []
) => {
    const chatModel = nodeData.inputs?.model as ChatOpenAI
    const memory = nodeData.inputs?.memory as FlowiseMemory
    const systemMessage = nodeData.inputs?.systemMessage as string
    const tools = flatten(nodeData.inputs?.tools)

    // Fall back to the conventional keys when the memory node does not define them
    const memoryKey = memory.memoryKey || 'chat_history'
    const inputKey = memory.inputKey || 'input'

    const prompt = ChatPromptTemplate.fromMessages([
        ['ai', systemMessage ? systemMessage : defaultMessage],
        new MessagesPlaceholder(memoryKey),
        ['human', `{${inputKey}}`],
        new MessagesPlaceholder('agent_scratchpad')
    ])

    // Advertise every tool to the model as a callable OpenAI function
    const functionBoundModel = chatModel.bind({
        functions: tools.map((tool: any) => formatToOpenAIFunction(tool))
    })

    const runnableAgent = RunnableSequence.from([
        {
            [inputKey]: (i: { input: string; steps: AgentStep[] }) => i.input,
            agent_scratchpad: (i: { input: string; steps: AgentStep[] }) => formatAgentSteps(i.steps),
            [memoryKey]: async (_: { input: string; steps: AgentStep[] }) => {
                // Pull persisted history for this session; default to empty when none exists
                const history = (await memory.getChatMessages(flowObj?.sessionId, true, chatHistory)) as BaseMessage[]
                return history ?? []
            }
        },
        prompt,
        functionBoundModel,
        new OpenAIFunctionsAgentOutputParser()
    ])

    return AgentExecutor.fromAgentAndTools({
        agent: runnableAgent,
        tools,
        sessionId: flowObj?.sessionId,
        chatId: flowObj?.chatId,
        input: flowObj?.input,
        returnIntermediateSteps: true,
        verbose: process.env.DEBUG === 'true'
    })
}

module.exports = { nodeClass: ConversationalRetrievalAgent_Agents }
Loading

0 comments on commit b3d2d7c

Please sign in to comment.