Skip to content

Commit

Permalink
Add realtime notification on attachment update
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmz committed Jan 14, 2025
1 parent f17eb2d commit a83133a
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 4 deletions.
1 change: 1 addition & 0 deletions app/models.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ export class Post {
isVisibleFor(viewer: Nullable<User>): Promise<boolean>;
getCommentsListeners(): Promise<UUID[]>;
getUserSpecificProps(user: User): Promise<PostUserState>;
linkAttachments(attachments: UUID[]): Promise<void>;
}

export class Timeline {
Expand Down
8 changes: 8 additions & 0 deletions app/models/attachment.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { sanitizeMediaMetadata, SANITIZE_NONE, SANITIZE_VERSION } from '../suppo
import { processMediaFile } from '../support/media-files/process';
import { currentConfig } from '../support/app-async-context';
import { createPrepareVideoJob } from '../jobs/attachment-prepare-video';
import { PubSub as pubSub } from '../models';

const mvAsync = util.promisify(mv);

Expand Down Expand Up @@ -207,6 +208,13 @@ export function addModel(dbAdapter) {
};
await dbAdapter.updateAttachment(this.id, toUpdate);
}

// Realtime events
await pubSub.attachmentUpdated(this.id);

if (this.postId) {
await pubSub.updatePost(this.postId);
}
}

/**
Expand Down
35 changes: 33 additions & 2 deletions app/pubsub-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { serializeUsersByIds } from './serializers/v2/user';
import { serializeEvents } from './serializers/v2/event';
import { API_VERSION_ACTUAL, API_VERSION_MINIMAL } from './api-versions';
import { connect as redisConnection } from './setup/database';
import { serializeAttachment } from './serializers/v2/attachment';
/** @typedef {import('./support/types').UUID} UUID */

const sentryIsEnabled = 'sentryDsn' in config;
Expand Down Expand Up @@ -238,6 +239,8 @@ export default class PubsubListener {
[eventNames.GROUP_TIMES_UPDATED]: this.onGroupTimesUpdate,

[eventNames.EVENT_CREATED]: this.onEventCreated,

[eventNames.ATTACHMENT_UPDATE]: this.onAttachmentUpdate,
};

try {
Expand Down Expand Up @@ -617,6 +620,36 @@ export default class PubsubListener {
);
};

onAttachmentUpdate = async (attId) => {
const att = await dbAdapter.getAttachmentById(attId);

const payload = {
attachments: [serializeAttachment(att)],
};
await this.broadcastMessage(
[
`user:${att.userId}`, // for the user who owns the attachment
`attachment:${att.id}`, // for whomever listens specifically to this attachment
],
eventNames.ATTACHMENT_UPDATE,
payload,
{
emitter: async (socket, type, json) => {
const { userId } = socket;
const { realtimeChannels } = json;

if (userId !== att.userId && !realtimeChannels.includes(`attachment:${attId}`)) {
// Other users can only listen to `attachment:${userId}`
return;
}

const users = await serializeUsersByIds([att.userId], userId);
await socket.emit(type, { ...json, users });
},
},
);
};

onCommentLikeNew = async (data) => {
await this._sendCommentLikeMsg(data, eventNames.COMMENT_LIKE_ADDED);
};
Expand Down Expand Up @@ -766,8 +799,6 @@ export default class PubsubListener {
*/
_singleUserEmitter = (userId, emitter = defaultEmitter) =>
this._onlyUsersEmitter(List.from([userId]), emitter);
// (socket, type, json) =>
// socket.userId === userId && defaultEmitter(socket, type, json);

_withUserIdEmitter = (socket, type, json) =>
socket.userId && defaultEmitter(socket, type, { ...json, id: socket.userId });
Expand Down
4 changes: 4 additions & 0 deletions app/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,8 @@ export default class pubSub {
async newEvent(eventId: UUID) {
await this.publisher.eventCreated(JSON.stringify(eventId));
}

async attachmentUpdated(attachmentId: UUID) {
await this.publisher.attachmentUpdated(JSON.stringify(attachmentId));
}
}
7 changes: 7 additions & 0 deletions app/support/PubSubAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const eventNames = {
GLOBAL_USER_UPDATED: 'global:user:update',
GROUP_TIMES_UPDATED: ':GROUP_TIMES_UPDATED',
EVENT_CREATED: 'event:new',
ATTACHMENT_UPDATE: 'attachment:update',
} as const;

export type EventName = (typeof eventNames)[keyof typeof eventNames];
Expand Down Expand Up @@ -116,6 +117,12 @@ export class PubSubAdapter {

///////////////////////////////////////////////////

attachmentUpdated(payload: string) {
return this.publish(eventNames.ATTACHMENT_UPDATE, payload);
}

///////////////////////////////////////////////////

private async publish(channel: EventName, payload: string) {
await this.publisher.publish(channel, payload);
}
Expand Down
108 changes: 106 additions & 2 deletions test/functional/attachments.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
/* eslint-env node, mocha */
/* global $pg_database */
/* global $pg_database, $database */
import fs from 'fs';
import path from 'path';

import unexpected from 'unexpected';
import unexpectedDate from 'unexpected-date';
import { Blob, fileFrom } from 'node-fetch';
import { beforeEach } from 'mocha';

import cleanDB from '../dbCleaner';
import { dbAdapter } from '../../app/models';
import { dbAdapter, PubSub } from '../../app/models';
import { initJobProcessing } from '../../app/jobs';
import { eventNames, PubSubAdapter } from '../../app/support/PubSubAdapter';
import { getSingleton } from '../../app/app';

import {
createTestUser,
updateUserAsync,
performJSONRequest,
authHeaders,
justCreatePost,
} from './functional_test_helper';
import Session from './realtime-session';

const expect = unexpected.clone().use(unexpectedDate);

Expand Down Expand Up @@ -457,4 +462,103 @@ describe('Attachments', () => {
});
});
});

describe('Realtime events for attachments processing', () => {
let jobManager;
before(async () => {
const pubsubAdapter = new PubSubAdapter($database);
PubSub.setPublisher(pubsubAdapter);
jobManager = await initJobProcessing();
});

/** @type {Session} */
let lunaSubscribedToHerChannel;
/** @type {Session} */
let lunaSubscribedToPost;
/** @type {Session} */
let lunaSubscribedToAttachment;
/** @type {Session} */
let anonSubscribedToPost;
/** @type {Session} */
let anonSubscribedToAttachment;

beforeEach(async () => {
const app = await getSingleton();
const port = process.env.PEPYATKA_SERVER_PORT || app.context.config.port;

lunaSubscribedToHerChannel = await Session.create(port, 'Luna subscribed to her channel');
await lunaSubscribedToHerChannel.sendAsync('auth', { authToken: luna.authToken });

lunaSubscribedToPost = await Session.create(port, 'Luna subscribed to post');
await lunaSubscribedToPost.sendAsync('auth', { authToken: luna.authToken });

lunaSubscribedToAttachment = await Session.create(port, 'Luna subscribed to attachment');
await lunaSubscribedToAttachment.sendAsync('auth', { authToken: luna.authToken });

anonSubscribedToPost = await Session.create(port, 'Anonymous subscribed to post');
anonSubscribedToAttachment = await Session.create(port, 'Anonymous subscribed to attachment');
});
afterEach(() =>
[
lunaSubscribedToHerChannel,
lunaSubscribedToPost,
lunaSubscribedToAttachment,
anonSubscribedToPost,
anonSubscribedToAttachment,
].forEach((s) => s.disconnect()),
);

it(`should send realtime events to the listener's channels`, async () => {
// Create an attachment
const filePath = path.join(__dirname, '../fixtures/media-files/polyphon.mp4');
const data = new FormData();
data.append('file', await fileFrom(filePath, 'image/gif'));
const resp = await performJSONRequest('POST', '/v1/attachments', data, authHeaders(luna));
const { id: attId } = resp.attachments;

const post = await justCreatePost(luna, `Luna post`);
await post.linkAttachments([attId]);

await Promise.all([
lunaSubscribedToHerChannel.sendAsync('subscribe', { user: [luna.user.id] }),
lunaSubscribedToPost.sendAsync('subscribe', { post: [post.id] }),
lunaSubscribedToAttachment.sendAsync('subscribe', { attachment: [attId] }),
anonSubscribedToPost.sendAsync('subscribe', { post: [post.id] }),
anonSubscribedToAttachment.sendAsync('subscribe', { attachment: [attId] }),
]);

// Run processing
[
lunaSubscribedToHerChannel,
lunaSubscribedToPost,
lunaSubscribedToAttachment,
anonSubscribedToPost,
anonSubscribedToAttachment,
].forEach((s) => (s.collected.length = 0));

await jobManager.fetchAndProcess();

const events = await Promise.all([
lunaSubscribedToHerChannel.haveCollected(eventNames.ATTACHMENT_UPDATE),
lunaSubscribedToPost.haveCollected(eventNames.POST_UPDATED),
lunaSubscribedToAttachment.haveCollected(eventNames.ATTACHMENT_UPDATE),
anonSubscribedToPost.haveCollected(eventNames.POST_UPDATED),
anonSubscribedToAttachment.haveCollected(eventNames.ATTACHMENT_UPDATE),
]);

expect(events, 'to satisfy', [
{ attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] },
{
posts: { id: post.id },
attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }],
},
{ attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] },
{
posts: { id: post.id },
attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }],
},
{ attachments: [{ id: attId, url: expect.it('to end with', '.mp4') }] },
]);
});
});
});
23 changes: 23 additions & 0 deletions test/functional/realtime-session.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export default class Session {
socket = null;
name = '';
listeners = new Set();
/** @type {{string: {event: string, data: unknown}[]} */
collected = [];

static create(port, name = '', extraOptions = {}) {
const options = {
Expand Down Expand Up @@ -53,6 +55,7 @@ export default class Session {
packet.data = ['*'].concat(args);
onevent.call(this, packet); // additional call to catch-all
};
this.listeners.add(({ event, data }) => this.collected.push({ event, data }));
socket.on('*', (event, data) => [...this.listeners].forEach((l) => l({ event, data })));
}

Expand Down Expand Up @@ -158,4 +161,24 @@ export default class Session {
const [result] = await Promise.all([listen, ...tasks.map((t) => t())]);
return result;
}

haveCollected(event) {
const found = this.collected.find(({ event: collectedEvent }) => collectedEvent === event);

if (found) {
return Promise.resolve(found.data);
}

return this.receive(event);
}

haveNotCollected(event) {
if (this.collected.some(({ event: collectedEvent }) => collectedEvent === event)) {
return Promise.reject(
new Error(`${this.name ? `${this.name}: ` : ''}Got unexpected '${event}' event`),
);
}

return this.notReceive(event);
}
}

0 comments on commit a83133a

Please sign in to comment.