From a83133aa26a314250696794ce2d23df0e3c0fa56 Mon Sep 17 00:00:00 2001 From: David Mzareulyan Date: Tue, 14 Jan 2025 14:06:28 +0300 Subject: [PATCH] Add realtime notification on attachment update --- app/models.d.ts | 1 + app/models/attachment.js | 8 +++ app/pubsub-listener.js | 35 ++++++++- app/pubsub.ts | 4 ++ app/support/PubSubAdapter.ts | 7 ++ test/functional/attachments.js | 108 +++++++++++++++++++++++++++- test/functional/realtime-session.js | 23 ++++++ 7 files changed, 182 insertions(+), 4 deletions(-) diff --git a/app/models.d.ts b/app/models.d.ts index e0501ae32..31d85b8b2 100644 --- a/app/models.d.ts +++ b/app/models.d.ts @@ -170,6 +170,7 @@ export class Post { isVisibleFor(viewer: Nullable): Promise; getCommentsListeners(): Promise; getUserSpecificProps(user: User): Promise; + linkAttachments(attachments: UUID[]): Promise; } export class Timeline { diff --git a/app/models/attachment.js b/app/models/attachment.js index 3f29e6187..387d90561 100644 --- a/app/models/attachment.js +++ b/app/models/attachment.js @@ -14,6 +14,7 @@ import { sanitizeMediaMetadata, SANITIZE_NONE, SANITIZE_VERSION } from '../suppo import { processMediaFile } from '../support/media-files/process'; import { currentConfig } from '../support/app-async-context'; import { createPrepareVideoJob } from '../jobs/attachment-prepare-video'; +import { PubSub as pubSub } from '../models'; const mvAsync = util.promisify(mv); @@ -207,6 +208,13 @@ export function addModel(dbAdapter) { }; await dbAdapter.updateAttachment(this.id, toUpdate); } + + // Realtime events + await pubSub.attachmentUpdated(this.id); + + if (this.postId) { + await pubSub.updatePost(this.postId); + } } /** diff --git a/app/pubsub-listener.js b/app/pubsub-listener.js index 31ae8ef0c..8044eaa1e 100644 --- a/app/pubsub-listener.js +++ b/app/pubsub-listener.js @@ -33,6 +33,7 @@ import { serializeUsersByIds } from './serializers/v2/user'; import { serializeEvents } from './serializers/v2/event'; import { API_VERSION_ACTUAL, API_VERSION_MINIMAL } from './api-versions'; import { connect as redisConnection } from './setup/database'; +import { serializeAttachment } from './serializers/v2/attachment'; /** @typedef {import('./support/types').UUID} UUID */ const sentryIsEnabled = 'sentryDsn' in config; @@ -238,6 +239,8 @@ export default class PubsubListener { [eventNames.GROUP_TIMES_UPDATED]: this.onGroupTimesUpdate, [eventNames.EVENT_CREATED]: this.onEventCreated, + + [eventNames.ATTACHMENT_UPDATE]: this.onAttachmentUpdate, }; try { @@ -617,6 +620,36 @@ export default class PubsubListener { ); }; + onAttachmentUpdate = async (attId) => { + const att = await dbAdapter.getAttachmentById(attId); + + const payload = { + attachments: [serializeAttachment(att)], + }; + await this.broadcastMessage( + [ + `user:${att.userId}`, // for the user who owns the attachment + `attachment:${att.id}`, // for whomever listens specifically to this attachment + ], + eventNames.ATTACHMENT_UPDATE, + payload, + { + emitter: async (socket, type, json) => { + const { userId } = socket; + const { realtimeChannels } = json; + + if (userId !== att.userId && !realtimeChannels.includes(`attachment:${attId}`)) { + // Other users can only listen to `attachment:${userId}` + return; + } + + const users = await serializeUsersByIds([att.userId], userId); + await socket.emit(type, { ...json, users }); + }, + }, + ); + }; + onCommentLikeNew = async (data) => { await this._sendCommentLikeMsg(data, eventNames.COMMENT_LIKE_ADDED); }; @@ -766,8 +799,6 @@ export default class PubsubListener { */ _singleUserEmitter = (userId, emitter = defaultEmitter) => this._onlyUsersEmitter(List.from([userId]), emitter); - // (socket, type, json) => - // socket.userId === userId && defaultEmitter(socket, type, json); _withUserIdEmitter = (socket, type, json) => socket.userId && defaultEmitter(socket, type, { ...json, id: socket.userId }); diff --git a/app/pubsub.ts b/app/pubsub.ts index 18b2f8ead..b2f56149e 100644 --- a/app/pubsub.ts +++ b/app/pubsub.ts @@ -133,4 +133,8 @@ export default class pubSub { async newEvent(eventId: UUID) { await this.publisher.eventCreated(JSON.stringify(eventId)); } + + async attachmentUpdated(attachmentId: UUID) { + await this.publisher.attachmentUpdated(JSON.stringify(attachmentId)); + } } diff --git a/app/support/PubSubAdapter.ts b/app/support/PubSubAdapter.ts index ce8d334ad..29f79a548 100644 --- a/app/support/PubSubAdapter.ts +++ b/app/support/PubSubAdapter.ts @@ -17,6 +17,7 @@ export const eventNames = { GLOBAL_USER_UPDATED: 'global:user:update', GROUP_TIMES_UPDATED: ':GROUP_TIMES_UPDATED', EVENT_CREATED: 'event:new', + ATTACHMENT_UPDATE: 'attachment:update', } as const; export type EventName = (typeof eventNames)[keyof typeof eventNames]; @@ -116,6 +117,12 @@ export class PubSubAdapter { /////////////////////////////////////////////////// + attachmentUpdated(payload: string) { + return this.publish(eventNames.ATTACHMENT_UPDATE, payload); + } + + /////////////////////////////////////////////////// + private async publish(channel: EventName, payload: string) { await this.publisher.publish(channel, payload); } diff --git a/test/functional/attachments.js b/test/functional/attachments.js index 03b0a40f0..1f85aa255 100644 --- a/test/functional/attachments.js +++ b/test/functional/attachments.js @@ -1,22 +1,27 @@ /* eslint-env node, mocha */ -/* global $pg_database */ +/* global $pg_database, $database */ import fs from 'fs'; import path from 'path'; import unexpected from 'unexpected'; import unexpectedDate from 'unexpected-date'; import { Blob, fileFrom } from 'node-fetch'; +import { beforeEach } from 'mocha'; import cleanDB from '../dbCleaner'; -import { dbAdapter } from '../../app/models'; +import { dbAdapter, PubSub } from '../../app/models'; import { initJobProcessing } from '../../app/jobs'; +import { eventNames, PubSubAdapter } from '../../app/support/PubSubAdapter'; +import { getSingleton } from '../../app/app'; import { createTestUser, updateUserAsync, performJSONRequest, authHeaders, + justCreatePost, } from './functional_test_helper'; +import Session from './realtime-session'; const expect = unexpected.clone().use(unexpectedDate); @@ -457,4 +462,103 @@ describe('Attachments', () => { }); }); }); + + describe('Realtime events for attachments processing', () => { + let jobManager; + before(async () => { + const pubsubAdapter = new PubSubAdapter($database); + PubSub.setPublisher(pubsubAdapter); + jobManager = await initJobProcessing(); + }); + + /** @type {Session} */ + let lunaSubscribedToHerChannel; + /** @type {Session} */ + let lunaSubscribedToPost; + /** @type {Session} */ + let lunaSubscribedToAttachment; + /** @type {Session} */ + let anonSubscribedToPost; + /** @type {Session} */ + let anonSubscribedToAttachment; + + beforeEach(async () => { + const app = await getSingleton(); + const port = process.env.PEPYATKA_SERVER_PORT || app.context.config.port; + + lunaSubscribedToHerChannel = await Session.create(port, 'Luna subscribed to her channel'); + await lunaSubscribedToHerChannel.sendAsync('auth', { authToken: luna.authToken }); + + lunaSubscribedToPost = await Session.create(port, 'Luna subscribed to post'); + await lunaSubscribedToPost.sendAsync('auth', { authToken: luna.authToken }); + + lunaSubscribedToAttachment = await Session.create(port, 'Luna subscribed to attachment'); + await lunaSubscribedToAttachment.sendAsync('auth', { authToken: luna.authToken }); + + anonSubscribedToPost = await Session.create(port, 'Anonymous subscribed to post'); + anonSubscribedToAttachment = await Session.create(port, 'Anonymous subscribed to attachment'); + }); + afterEach(() => + [ + lunaSubscribedToHerChannel, + lunaSubscribedToPost, + lunaSubscribedToAttachment, + anonSubscribedToPost, + anonSubscribedToAttachment, + ].forEach((s) => s.disconnect()), + ); + + it(`should send realtime events to the listener's channels`, async () => { + // Create an attachment + const filePath = path.join(__dirname, '../fixtures/media-files/polyphon.mp4'); + const data = new FormData(); + data.append('file', await fileFrom(filePath, 'image/gif')); + const resp = await performJSONRequest('POST', '/v1/attachments', data, authHeaders(luna)); + const { id: attId } = resp.attachments; + + const post = await justCreatePost(luna, `Luna post`); + await post.linkAttachments([attId]); + + await Promise.all([ + lunaSubscribedToHerChannel.sendAsync('subscribe', { user: [luna.user.id] }), + lunaSubscribedToPost.sendAsync('subscribe', { post: [post.id] }), + lunaSubscribedToAttachment.sendAsync('subscribe', { attachment: [attId] }), + anonSubscribedToPost.sendAsync('subscribe', { post: [post.id] }), + anonSubscribedToAttachment.sendAsync('subscribe', { attachment: [attId] }), + ]); + + // Run processing + [ + lunaSubscribedToHerChannel, + lunaSubscribedToPost, + lunaSubscribedToAttachment, + anonSubscribedToPost, + anonSubscribedToAttachment, + ].forEach((s) => (s.collected.length = 0)); + + await jobManager.fetchAndProcess(); + + const events = await Promise.all([ + lunaSubscribedToHerChannel.haveCollected(eventNames.ATTACHMENT_UPDATE), + lunaSubscribedToPost.haveCollected(eventNames.POST_UPDATED), + lunaSubscribedToAttachment.haveCollected(eventNames.ATTACHMENT_UPDATE), + anonSubscribedToPost.haveCollected(eventNames.POST_UPDATED), + anonSubscribedToAttachment.haveCollected(eventNames.ATTACHMENT_UPDATE), + ]); + + expect(events, 'to satisfy', [ + { attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] }, + { + posts: { id: post.id }, + attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }], + }, + { attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] }, + { + posts: { id: post.id }, + attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }], + }, + { attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] }, + ]); + }); + }); }); diff --git a/test/functional/realtime-session.js b/test/functional/realtime-session.js index e0a7c3ce5..d4c83f4fa 100644 --- a/test/functional/realtime-session.js +++ b/test/functional/realtime-session.js @@ -12,6 +12,8 @@ export default class Session { socket = null; name = ''; listeners = new Set(); + /** @type {{string: {event: string, data: unknown}[]} */ + collected = []; static create(port, name = '', extraOptions = {}) { const options = { @@ -53,6 +55,7 @@ export default class Session { packet.data = ['*'].concat(args); onevent.call(this, packet); // additional call to catch-all }; + this.listeners.add(({ event, data }) => this.collected.push({ event, data })); socket.on('*', (event, data) => [...this.listeners].forEach((l) => l({ event, data }))); } @@ -158,4 +161,24 @@ export default class Session { const [result] = await Promise.all([listen, ...tasks.map((t) => t())]); return result; } + + haveCollected(event) { + const found = this.collected.find(({ event: collectedEvent }) => collectedEvent === event); + + if (found) { + return Promise.resolve(found.data); + } + + return this.receive(event); + } + + haveNotCollected(event) { + if (this.collected.some(({ event: collectedEvent }) => collectedEvent === event)) { + return Promise.reject( + new Error(`${this.name ? `${this.name}: ` : ''}Got unexpected '${event}' event`), + ); + } + + return this.notReceive(event); + } }