Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/Reuse DB Connection for DataSource #3661

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
# APIKEY_STORAGE_TYPE=json (json | db)
# SHOW_COMMUNITY_NODES=true

# Reuse app database connection for agent and record manager to reduce concurrent connections
# REUSE_DB_CONNECTION_AGENT_MEMORY=true
# REUSE_DB_CONNECTION_RECORD_MANAGER=true

######################
# METRICS COLLECTION
#######################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
config: SaverOptions
threadId: string
tableName = 'checkpoints'
reuseDbConnection: boolean

constructor(config: SaverOptions, serde?: SerializerProtocol<Checkpoint>) {
super(serde)
Expand All @@ -20,6 +21,24 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
}

private async getDataSource(): Promise<DataSource> {
if (process.env.REUSE_DB_CONNECTION_AGENT_MEMORY === 'true') {
const datasource = this.config.appDataSource
if (!datasource) {
throw new Error('No datasource provided')
}
if (datasource.options.type !== 'mysql') {
throw new Error('Invalid datasource type')
}
if (datasource.options.port === 5432) {
throw new Error('Invalid port number')
}
if (!datasource.isInitialized) {
await datasource.initialize()
}
this.reuseDbConnection = true
return datasource
}

const { datasourceOptions } = this.config
if (!datasourceOptions) {
throw new Error('No datasource options provided')
Expand Down Expand Up @@ -97,7 +116,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
return undefined
}
Expand Down Expand Up @@ -145,7 +166,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
console.error(`Error listing checkpoints`, error)
throw new Error(`Error listing checkpoints`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down Expand Up @@ -174,7 +197,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
console.error('Error saving checkpoint', error)
throw new Error('Error saving checkpoint')
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}

return {
Expand All @@ -199,7 +224,9 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods {
} catch (error) {
console.error(`Error deleting thread_id ${threadId}`, error)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods
config: SaverOptions
threadId: string
tableName = 'checkpoints'
reuseDbConnection: boolean

constructor(config: SaverOptions, serde?: SerializerProtocol<Checkpoint>) {
super(serde)
Expand All @@ -20,6 +21,24 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods
}

private async getDataSource(): Promise<DataSource> {
if (process.env.REUSE_DB_CONNECTION_AGENT_MEMORY === 'true') {
const datasource = this.config.appDataSource
if (!datasource) {
throw new Error('No datasource provided')
}
if (datasource.options.type !== 'postgres') {
throw new Error('Invalid datasource type')
}
if (datasource.options.port === 3006) {
throw new Error('Invalid port number')
}
if (!datasource.isInitialized) {
await datasource.initialize()
}
this.reuseDbConnection = true
return datasource
}

const { datasourceOptions } = this.config
if (!datasourceOptions) {
throw new Error('No datasource options provided')
Expand Down Expand Up @@ -92,7 +111,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
} else {
try {
Expand All @@ -101,6 +122,7 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
const sql = `SELECT thread_id, checkpoint_id, parent_id, checkpoint, metadata FROM ${this.tableName} WHERE thread_id = $1 ORDER BY checkpoint_id DESC LIMIT 1`

const rows = await queryRunner.manager.query(sql, keys)

await queryRunner.release()

if (rows && rows.length > 0) {
Expand All @@ -127,7 +149,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}
return undefined
Expand Down Expand Up @@ -182,7 +206,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error listing ${this.tableName}`, error)
throw new Error(`Error listing ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down Expand Up @@ -212,7 +238,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error('Error saving checkpoint', error)
throw new Error('Error saving checkpoint')
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}

return {
Expand Down Expand Up @@ -240,7 +268,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
} catch (error) {
console.error(`Error deleting thread_id ${threadId}`, error)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class SqliteSaver extends BaseCheckpointSaver implements MemoryMethods {
config: SaverOptions
threadId: string
tableName = 'checkpoints'
reuseDbConnection: boolean

constructor(config: SaverOptions, serde?: SerializerProtocol<Checkpoint>) {
super(serde)
Expand All @@ -20,6 +21,21 @@ export class SqliteSaver extends BaseCheckpointSaver implements MemoryMethods {
}

private async getDataSource(): Promise<DataSource> {
if (process.env.REUSE_DB_CONNECTION_AGENT_MEMORY === 'true') {
const datasource = this.config.appDataSource
if (!datasource) {
throw new Error('No datasource provided')
}
if (datasource.options.type !== 'sqlite') {
throw new Error('Invalid datasource type')
}
if (!datasource.isInitialized) {
await datasource.initialize()
}
this.reuseDbConnection = true
return datasource
}

const { datasourceOptions } = this.config
const dataSource = new DataSource(datasourceOptions)
await dataSource.initialize()
Expand Down Expand Up @@ -85,7 +101,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
} else {
try {
Expand Down Expand Up @@ -120,7 +138,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error retrieving ${this.tableName}`, error)
throw new Error(`Error retrieving ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}
return undefined
Expand Down Expand Up @@ -170,7 +190,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error(`Error listing ${this.tableName}`, error)
throw new Error(`Error listing ${this.tableName}`)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand All @@ -197,7 +219,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
console.error('Error saving checkpoint', error)
throw new Error('Error saving checkpoint')
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}

return {
Expand Down Expand Up @@ -225,7 +249,9 @@ CREATE TABLE IF NOT EXISTS ${this.tableName} (
} catch (error) {
console.error(`Error deleting thread_id ${threadId}`, error)
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class MySQLRecordManager_RecordManager implements INode {
throw new Error('Invalid JSON in the Additional Configuration: ' + exception)
}
}
const appDataSource = options.appDataSource as DataSource

const mysqlOptions = {
...additionalConfiguration,
Expand All @@ -148,7 +149,8 @@ class MySQLRecordManager_RecordManager implements INode {

const args = {
mysqlOptions,
tableName: tableName
tableName: tableName,
appDataSource
}

const recordManager = new MySQLRecordManager(namespace, args)
Expand All @@ -163,13 +165,15 @@ class MySQLRecordManager_RecordManager implements INode {
type MySQLRecordManagerOptions = {
mysqlOptions: any
tableName?: string
appDataSource: DataSource
}

class MySQLRecordManager implements RecordManagerInterface {
lc_namespace = ['langchain', 'recordmanagers', 'mysql']
config: MySQLRecordManagerOptions
tableName: string
namespace: string
reuseDbConnection: boolean

constructor(namespace: string, config: MySQLRecordManagerOptions) {
const { tableName } = config
Expand All @@ -179,6 +183,24 @@ class MySQLRecordManager implements RecordManagerInterface {
}

private async getDataSource(): Promise<DataSource> {
if (process.env.REUSE_DB_CONNECTION_RECORD_MANAGER === 'true') {
const datasource = this.config.appDataSource
if (!datasource) {
throw new Error('No datasource provided')
}
if (datasource.options.type !== 'mysql') {
throw new Error('Invalid datasource type')
}
if (datasource.options.port === 5432) {
throw new Error('Invalid port number')
}
if (!datasource.isInitialized) {
await datasource.initialize()
}
this.reuseDbConnection = true
return datasource
}

const { mysqlOptions } = this.config
if (!mysqlOptions) {
throw new Error('No datasource options provided')
Expand Down Expand Up @@ -242,7 +264,9 @@ class MySQLRecordManager implements RecordManagerInterface {
console.error('Error getting time in MySQLRecordManager:')
throw error
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down Expand Up @@ -291,7 +315,9 @@ class MySQLRecordManager implements RecordManagerInterface {
console.error('Error updating in MySQLRecordManager:')
throw error
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down Expand Up @@ -328,7 +354,9 @@ class MySQLRecordManager implements RecordManagerInterface {
console.error('Error checking existence of keys')
throw error
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand Down Expand Up @@ -374,7 +402,9 @@ class MySQLRecordManager implements RecordManagerInterface {
console.error('MySQLRecordManager listKeys Error: ')
throw error
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}

Expand All @@ -398,7 +428,9 @@ class MySQLRecordManager implements RecordManagerInterface {
console.error('Error deleting keys')
throw error
} finally {
await dataSource.destroy()
if (!this.reuseDbConnection) {
await dataSource.destroy()
}
}
}
}
Expand Down
Loading
Loading