Skip to content

Commit

Permalink
feat: add queue for online users
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrdsilva committed Jun 23, 2024
1 parent cf54872 commit 3bc4c1d
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/auth/auth.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export class AuthService {
const { password, ...userWithoutPassword } = user;
const payload = { sub: user.id };

await this.usersService.createOnlineUserJob(userWithoutPassword.username);

return {
accessToken: await this.jwtService.signAsync(payload),
user: userWithoutPassword,
Expand Down
22 changes: 22 additions & 0 deletions src/consumers/user-activity.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Process, Processor } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Job } from 'bull';
import { UserActivity } from 'src/entities/user-activity.entity';
import { CreateUserActivityInput } from 'src/models/dtos/create-user-activity.input';
import { Repository } from 'typeorm';

@Injectable()
@Processor('user-activity')
export class UserActivityConsumer {
constructor(
@InjectRepository(UserActivity)
private readonly userActivityRepository: Repository<UserActivity>,
) {}

@Process('user-activity')
async processUserActivityJob(job: Job<CreateUserActivityInput>) {
const { data } = job;
await this.userActivityRepository.save(data);
}
}
14 changes: 9 additions & 5 deletions src/modules/user.module.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UserActivityConsumer } from 'src/consumers/user-activity.consumer';
import { UserController } from 'src/controllers/user.controller';
import { UserActivity } from 'src/entities/user-activity.entity';
import { User } from 'src/entities/user.entity';
import { UserStatsView } from 'src/entities/views/user-stats.view';
import { StorageClientService } from 'src/providers/storage-client.service';
import { UserService } from '../providers/user.service';
import { UserResolver } from '../resolvers/user.resolver';
import { UserController } from 'src/controllers/user.controller';
import { StorageClientService } from 'src/providers/storage-client.service';
import { BullModule } from '@nestjs/bull';
import { UserActivity } from 'src/entities/user-activity.entity';

@Module({
imports: [
TypeOrmModule.forFeature([User, UserStatsView, UserActivity]),
BullModule.registerQueue({
name: 'user-activity',
}),
BullModule.registerQueue({
name: 'online-users',
}),
],
controllers: [UserController],
providers: [UserResolver, UserService, StorageClientService],
providers: [UserResolver, UserService, StorageClientService, UserActivityConsumer],
exports: [UserService, StorageClientService],
})
export class UserModule {}
10 changes: 5 additions & 5 deletions src/providers/post.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class PostService {
post.user = user;
post.files = savedFiles;

await this.userService.createJob({
await this.userService.createUserActivityJob({
user: user,
description: `${user.firstName} created a post.`,
});
Expand Down Expand Up @@ -103,7 +103,7 @@ export class PostService {

const likedPostId = (await this.postRepository.save(post)).id;

await this.userService.createJob({
await this.userService.createUserActivityJob({
user: user,
description: `${user.firstName} liked a post.`,
});
Expand All @@ -122,7 +122,7 @@ export class PostService {

const unlikedPostId = (await this.postRepository.save(post)).id;

await this.userService.createJob({
await this.userService.createUserActivityJob({
user: user,
description: `${user.firstName} unliked a post.`,
});
Expand Down Expand Up @@ -171,7 +171,7 @@ export class PostService {

const updatedPost = await this.postRepository.save(post);

await this.userService.createJob({
await this.userService.createUserActivityJob({
user: user,
description: `${user.firstName} updated their post.`,
});
Expand All @@ -198,7 +198,7 @@ export class PostService {
}

await this.postRepository.remove(post);
await this.userService.createJob({
await this.userService.createUserActivityJob({
user: user,
description: `${user.firstName} removed their post.`,
});
Expand Down
41 changes: 26 additions & 15 deletions src/providers/user.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { InjectQueue } from '@nestjs/bull';
import {
BadRequestException,
ConflictException,
Expand All @@ -8,7 +8,7 @@ import {
} from '@nestjs/common';
import { InjectEntityManager, InjectRepository } from '@nestjs/typeorm';
import * as bcrypt from 'bcrypt';
import { Job, Queue } from 'bull';
import { Queue } from 'bull';
import { UserActivity } from 'src/entities/user-activity.entity';
import { User } from 'src/entities/user.entity';
import { UserStatsView } from 'src/entities/views/user-stats.view';
Expand All @@ -21,7 +21,6 @@ import { EntityManager, Repository } from 'typeorm';
import { StorageClientService } from './storage-client.service';

@Injectable()
@Processor('user-activity')
export class UserService {
constructor(
@InjectRepository(User)
Expand All @@ -34,19 +33,31 @@ export class UserService {
private entityManager: EntityManager,

@InjectQueue('user-activity')
private readonly queue: Queue,
private readonly userActivityQueue: Queue,

@InjectQueue('online-users')
private readonly onlineUsersQueue: Queue,

private storageClientService: StorageClientService,
) {}

@Process('user-activity')
async processJob(job: Job<CreateUserActivityInput>) {
const { data } = job;
await this.userActivityRepository.save(data);
async createOnlineUserJob(username: string) {
await this.onlineUsersQueue.add('online-users', username, {
attempts: 3,
backoff: 5000,
removeOnComplete: true,
});
}

async isCurrentlyOnline(username: string) {
const jobs = await this.onlineUsersQueue.getWaiting();
const isOnline = !!jobs.find((job) => job.data === username);

return isOnline;
}

async createJob(createUserActivityInput: CreateUserActivityInput) {
await this.queue.add('user-activity', createUserActivityInput, {
async createUserActivityJob(createUserActivityInput: CreateUserActivityInput) {
await this.userActivityQueue.add('user-activity', createUserActivityInput, {
attempts: 3,
backoff: 5000,
removeOnComplete: true,
Expand Down Expand Up @@ -92,7 +103,7 @@ export class UserService {

const { password, ...userWithoutPassword } = user;

await this.createJob({
await this.createUserActivityJob({
user: user,
description: `${user.firstName} changed their profile picture.`,
});
Expand Down Expand Up @@ -121,7 +132,7 @@ export class UserService {

const { password, ...userWithoutPassword } = user;

await this.createJob({
await this.createUserActivityJob({
user: user,
description: `${user.firstName} changed their cover image.`,
});
Expand All @@ -139,7 +150,7 @@ export class UserService {
bio: updateBioInput.bio,
});

await this.createJob({
await this.createUserActivityJob({
user: user,
description: `${user.firstName} changed their bio.`,
});
Expand All @@ -154,7 +165,7 @@ export class UserService {
isPrivate: isPrivate,
});

await this.createJob({
await this.createUserActivityJob({
user: user,
description: `${user.firstName} changed their visibility settings.`,
});
Expand All @@ -177,7 +188,7 @@ export class UserService {
});
}

await this.createJob({
await this.createUserActivityJob({
user: user,
description: `${user.firstName}'s role was updated.`,
});
Expand Down
5 changes: 5 additions & 0 deletions src/resolvers/user.resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ export class UserResolver {
) {
return this.userService.unfollow(context.req, userToUnfollowId);
}

@Query(() => Boolean)
isCurrentlyOnline(@Args('username') username: string) {
return this.userService.isCurrentlyOnline(username);
}
}

0 comments on commit 3bc4c1d

Please sign in to comment.