Implement ordered concurrency #31

Open · wants to merge 8 commits into base: main
10 changes: 10 additions & 0 deletions package-lock.json


1 change: 1 addition & 0 deletions package.json
@@ -49,6 +49,7 @@
"typescript": "^5.5.4"
},
"dependencies": {
"@supercharge/promise-pool": "^3.2.0",
"@types/showdown": "^2.0.6",
"axios": "^1.7.2",
"dotenv": "^16.4.5",
38 changes: 32 additions & 6 deletions src/app.ts
@@ -3,14 +3,15 @@ dotenv.config()
import { AxiosError } from 'axios'
import lineByLine from 'n-readlines'
import { exit } from 'node:process'
import { PromisePool } from '@supercharge/promise-pool'
import 'reflect-metadata'
import { Entity, entities } from './Entities'
import { handleDirectChats } from './handlers/directChats'
import { handleRoomMemberships } from './handlers/handleRoomMemberships'
-import { handle as handleMessage } from './handlers/messages'
+import { handle as handleMessage, RcMessage } from './handlers/messages'
import { handlePinnedMessages } from './handlers/pinnedMessages'
-import { handle as handleRoom } from './handlers/rooms'
-import { handle as handleUser } from './handlers/users'
+import { handle as handleRoom, RcRoom } from './handlers/rooms'
+import { handle as handleUser, RcUser } from './handlers/users'
import log from './helpers/logger'
import { initStorage } from './helpers/storage'
import { whoami } from './helpers/synapse'
@@ -22,28 +23,53 @@ log.info('rocketchat2matrix starts.')
* @param entity The Entity with its file name and type definitions
*/
async function loadRcExport(entity: Entity) {
  const concurrency = parseInt(process.env.CONCURRENCY_LIMIT || '50')
  const user_queue: RcUser[] = []
  const room_queue: RcRoom[] = []
  const messages_per_room: Map<string, RcMessage[]> = new Map()

  const rl = new lineByLine(`./inputs/${entities[entity].filename}`)

  let line: false | Buffer
  while ((line = rl.next())) {
    const item = JSON.parse(line.toString())
    switch (entity) {
Collaborator:
Now I am thinking of moving the switch (entity) outside and doing the looping and queue handling inside each case.

Author:
Yes, thread messages are handled just like any message, so the sequence is the same as in the RC export.
I converted the variables to camelCase. Thanks!

Author:
> Now I am thinking to move the switch (entity) outside and looping and handling the queue inside each case

I've thought of it, but that would add as many loops as there are cases; I'm not sure which of the two versions would be faster.

Collaborator:

As the function is called three times, each time with one entity type, it would check the entity three times and loop within each case. That way we also don't need to check unaffected queues.

Author (@flying-scorpio, Jul 30, 2024):

In 9fa8e03, 64935fd and 7a7fdec:

I moved the switch outside of the loop. For users and rooms, the loop is handled by the PromisePool, so no need to use queues anymore. For messages, to maintain the sequence, I kept the two steps of populating the Map and then loading.
Because PromisePool loops over an iterator, I added a wrapper to turn lineByLine into an iterator (its .next() method made me think it was implemented as an iterator, but it isn't).

I feel the message handling can still be improved, maybe by avoiding the first loading step. To take care of #3 (comment), we could cut the messages into batches or something.
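Since PromisePool's .for() expects an iterable but n-readlines only exposes a .next() method, the wrapper mentioned above could look roughly like this generator (lineIterator and makeReader are illustrative names, not the PR's actual code):

```typescript
// Hypothetical sketch: adapt an n-readlines-style reader (.next() returns a
// Buffer per line, or false at EOF) into an iterable of strings.
function* lineIterator(reader: { next(): false | Buffer }): Generator<string> {
  let line: false | Buffer
  while ((line = reader.next())) {
    yield line.toString()
  }
}

// Minimal stand-in for the reader interface, for illustration only.
function makeReader(lines: string[]): { next(): false | Buffer } {
  let i = 0
  return { next: () => (i < lines.length ? Buffer.from(lines[i++]) : false) }
}
```

The generator yields lines in file order, so a pool consuming it still receives items in the sequence of the RC export.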

Author:

> I feel the message handling can still be improved, maybe avoiding the first step of loading. To take care of #3 (comment), cut the messages in batches or something.

I added batches for the message handling in bee1c2f.
Tested with:

./reset.sh && npm run compile && systemd-run --scope -p MemoryMax=75M -p MemorySwapMax=1K --user node dist/app.js 
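The batching idea could be sketched with a hypothetical chunk() helper (not code from the PR): split each room's message list into fixed-size batches, preserving order, so only one batch needs to be held and processed at a time.

```typescript
// Hypothetical helper: split an array into batches of at most `size` items,
// keeping the original order, so each batch can be processed before the next
// one is loaded (bounding memory use).
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}
```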

      case Entity.Users:
-       await handleUser(item)
+       user_queue.push(item)
        break

      case Entity.Rooms:
-       await handleRoom(item)
+       room_queue.push(item)
        break

      case Entity.Messages:
-       await handleMessage(item)
+       if (messages_per_room.has(item.rid)) {
+         messages_per_room.get(item.rid)?.push(item)
+       } else {
+         messages_per_room.set(item.rid, [item])
+       }
        break

      default:
        throw new Error(`Unhandled Entity: ${entity}`)
    }
  }

  await PromisePool.withConcurrency(concurrency)
    .for(user_queue)
    .process((item) => handleUser(item))

  await PromisePool.withConcurrency(concurrency)
    .for(room_queue)
    .process((item) => handleRoom(item))

  await PromisePool.withConcurrency(concurrency)
    .for(messages_per_room.values())
    .process(async (room) => {
      for (const item of room) {
        await handleMessage(item)
      }
    })
}
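For context, the bounded concurrency that PromisePool.withConcurrency() provides can be sketched without the dependency. runWithLimit below is an illustrative stand-in, not the library's implementation: up to `limit` workers pull items off a shared cursor, so at most `limit` tasks run at once while results keep the input order.

```typescript
// Sketch of a bounded-concurrency runner (assumed semantics, not the
// @supercharge/promise-pool source): `limit` workers share a cursor into
// `items`; results are written back by index to preserve input order.
async function runWithLimit<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let next = 0
  const worker = async () => {
    while (next < items.length) {
      const i = next++ // safe: single-threaded event loop, no await in between
      results[i] = await task(items[i])
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker)
  )
  return results
}
```

This mirrors why per-room ordering still needs the sequential inner loop above: the pool only bounds how many rooms are processed at once, not the order of messages within a room.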

async function main() {