1
1
using Jordnaer . Database ;
2
2
using Jordnaer . Extensions ;
3
3
using Jordnaer . Features . Chat ;
4
+ using Jordnaer . Features . Metrics ;
4
5
using Jordnaer . Shared ;
5
6
using MassTransit ;
6
7
using Microsoft . AspNetCore . SignalR ;
7
8
using Microsoft . EntityFrameworkCore ;
8
9
9
10
namespace Jordnaer . Consumers ;
10
11
11
- public class SendMessageConsumer : IConsumer < SendMessage >
12
+ public class SendMessageConsumer (
13
+ JordnaerDbContext context ,
14
+ ILogger < SendMessageConsumer > logger ,
15
+ IHubContext < ChatHub , IChatHub > chatHub )
16
+ : IConsumer < SendMessage >
12
17
{
13
- private readonly JordnaerDbContext _context ;
14
- private readonly ILogger < SendMessageConsumer > _logger ;
15
- private readonly IHubContext < ChatHub , IChatHub > _chatHub ;
16
-
17
- public SendMessageConsumer ( JordnaerDbContext context , ILogger < SendMessageConsumer > logger , IHubContext < ChatHub , IChatHub > chatHub )
18
- {
19
- _context = context ;
20
- _logger = logger ;
21
- _chatHub = chatHub ;
22
- }
23
-
24
18
public async Task Consume ( ConsumeContext < SendMessage > consumeContext )
25
19
{
26
- _logger . LogDebug ( "Consuming SendMessage message. ChatId: {ChatId}" , consumeContext . Message . ChatId ) ;
20
+ logger . LogDebug ( "Consuming SendMessage message. ChatId: {ChatId}" , consumeContext . Message . ChatId ) ;
27
21
28
22
var chatMessage = consumeContext . Message ;
29
23
30
- _context . ChatMessages . Add (
24
+ context . ChatMessages . Add (
31
25
new ChatMessage
32
26
{
33
27
ChatId = chatMessage . ChatId ,
@@ -38,14 +32,14 @@ public async Task Consume(ConsumeContext<SendMessage> consumeContext)
38
32
SentUtc = chatMessage . SentUtc
39
33
} ) ;
40
34
41
- var recipientIds = await _context . UserChats
35
+ var recipientIds = await context . UserChats
42
36
. Where ( userChat => userChat . ChatId == chatMessage . ChatId )
43
37
. Select ( userChat => userChat . UserProfileId )
44
38
. ToListAsync ( consumeContext . CancellationToken ) ;
45
39
46
40
foreach ( var recipientId in recipientIds . Where ( recipientId => recipientId != chatMessage . SenderId ) )
47
41
{
48
- _context . UnreadMessages . Add ( new UnreadMessage
42
+ context . UnreadMessages . Add ( new UnreadMessage
49
43
{
50
44
RecipientId = recipientId ,
51
45
ChatId = chatMessage . ChatId ,
@@ -55,20 +49,22 @@ public async Task Consume(ConsumeContext<SendMessage> consumeContext)
55
49
56
50
try
57
51
{
58
- await _context . SaveChangesAsync ( consumeContext . CancellationToken ) ;
52
+ await context . SaveChangesAsync ( consumeContext . CancellationToken ) ;
59
53
60
- await _chatHub . Clients . Users ( recipientIds ) . ReceiveChatMessage ( chatMessage ) ;
54
+ await chatHub . Clients . Users ( recipientIds ) . ReceiveChatMessage ( chatMessage ) ;
61
55
62
- await _context . Chats
56
+ await context . Chats
63
57
. Where ( chat => chat . Id == chatMessage . ChatId )
64
58
. ExecuteUpdateAsync ( call =>
65
59
call . SetProperty ( chat => chat . LastMessageSentUtc , DateTime . UtcNow ) ,
66
60
consumeContext . CancellationToken ) ;
67
61
}
68
62
catch ( Exception exception )
69
63
{
70
- _logger . LogException ( exception ) ;
64
+ logger . LogException ( exception ) ;
71
65
throw ;
72
66
}
67
+
68
+ JordnaerMetrics . ChatMessagesSentCounter . Add ( 1 ) ;
73
69
}
74
70
}
0 commit comments