Skip to content

Commit 301afb5

Browse files
authored
Add Completion on _waitForConfirmationActionBlock (#298)
* Add Completion on _waitForConfirmationActionBlock to wait _waitForConfirmationActionBlock.SendAsync in case it is not completed * Add an exception handler on the ConfirmHandler since the code could raise an exception user side. * Ref: #295 * Add the tls configurtation for the k8s cluster. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c8e3726 commit 301afb5

File tree

6 files changed

+83
-11
lines changed

6 files changed

+83
-11
lines changed

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,24 @@ private async Task Init()
121121
{
122122
foreach (var id in publishingIds.Span)
123123
{
124-
_config.ConfirmHandler(new Confirmation
124+
try
125125
{
126-
PublishingId = id,
127-
Code = ResponseCode.Ok,
128-
Stream = _config.Stream
129-
});
126+
_config.ConfirmHandler(new Confirmation
127+
{
128+
PublishingId = id,
129+
Code = ResponseCode.Ok,
130+
Stream = _config.Stream
131+
});
132+
}
133+
catch (Exception e)
134+
{
135+
// The call is exposed to the user so we need to catch any exception
136+
// there could be an exception in the user code.
137+
// So here we log the exception and we continue.
138+
139+
_logger.LogError(e, "Error during confirm handler, publishing id: {Id}. " +
140+
"Hint: Check the user ConfirmHandler callback", id);
141+
}
130142
}
131143

132144
_semaphore.Release(publishingIds.Length);

RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ private async void OnTimedEvent(object sender, ElapsedEventArgs e)
124124

125125
foreach (var pair in timedOutMessages)
126126
{
127-
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null).ConfigureAwait(false);
127+
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null)
128+
.ConfigureAwait(false);
128129
}
129130
}
130131

@@ -145,8 +146,13 @@ internal void AddUnConfirmedMessage(ulong publishingId, List<Message> messages)
145146
});
146147
}
147148

148-
internal Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId, string stream)
149+
internal async Task RemoveUnConfirmedMessage(ConfirmationStatus confirmationStatus, ulong publishingId,
150+
string stream)
149151
{
150-
return _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream));
152+
if (!await _waitForConfirmationActionBlock.SendAsync((confirmationStatus, publishingId, stream))
153+
.ConfigureAwait(false))
154+
{
155+
await _waitForConfirmationActionBlock.Completion.ConfigureAwait(false);
156+
}
151157
}
152158
}

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private async Task<IProducer> SuperStreamProducer()
5555
_ => ConfirmationStatus.UndefinedError
5656
};
5757
_confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId,
58-
stream);
58+
stream).ConfigureAwait(false);
5959
}
6060
}, BaseLogger).ConfigureAwait(false);
6161
}
@@ -97,7 +97,7 @@ private async Task<IProducer> StandardProducer()
9797
_ => ConfirmationStatus.UndefinedError
9898
};
9999
_confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId,
100-
confirmation.Stream);
100+
confirmation.Stream).ConfigureAwait(false);
101101
}
102102
}, BaseLogger).ConfigureAwait(false);
103103
}

kubernetes/stream_cluster.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ metadata:
66
apiVersion: rabbitmq.com/v1beta1
77
kind: RabbitmqCluster
88
metadata:
9-
name: rabbitmq-stream
9+
name: tls
1010
namespace: stream-clients-test
1111
spec:
1212
replicas: 1
1313
image: rabbitmq:3-management
1414
service:
1515
type: LoadBalancer
16+
tls:
17+
secretName: tls-secret
1618
resources:
1719
requests:
1820
cpu: 500m

kubernetes/test-server-key.pem

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-----BEGIN PRIVATE KEY-----
2+
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCzfGD7b37pIVtW
3+
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
4+
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
5+
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
6+
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
7+
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
8+
rCXOOKN1AgMBAAECggEAEXHelDmZPmv3TmmLPb6lmV92RujVtSpbiRX2J7tKCz+o
9+
aeCLb4DE0ulM1iicha+NYBiGj2nN+rkLaVvEty2yNYd0QgRHH5I0GcySFqOvoLSZ
10+
Y0O7jaukDJ6w0KNLNKt41Xc31f146MJMeT62UrGQayBjXidC7QTLNoQq9zyQ6MQK
11+
crbj8f/TqT68V5f0nQeYFRGPYZPiaLcDB6mOCL09B4bfMxcOl0/6SVouV8BrTHxs
12+
kqtO6yxrpc0XL1vfCrBqljlVLnXyGNmkaegSQMOTDfGqM6Mkc6771DYt5MJsmVCH
13+
VgDZeMs/BJA/srV2rXW4cwfO0OOSTkE9cCNzZM6Q6QKBgQDmLk+bLb6cafFceuGl
14+
gktNYi1TbTD62/shKulmFm8C7jNMTdbGi4pEwlnxzU57A+spr0nzody5c8qMIfUF
15+
Cpf9aRWs7xG0WOCuYbUSI+gbICxaJOKe00TyWwf/xn13p6J4wecg4n5C8Wo1QIRi
16+
nFwgaqYATkyzhE9+YOQKpxgeTQKBgQDHnlmMxiTuD3j9BxEkD3aY4E97qTfPNqgM
17+
umrwXoU0paYtgfuxMe7yjkE/qVKn3QP+wzN+XMR6YcBdphQPlSCcEyJrAZZCGrgJ
18+
CSO9anY6CA5bgZ9Mk3pBCldHEQqxWrg27bejkn4KV3PXmDtQwMYrpsgEu3DPCgy/
19+
TUVTnpK9yQKBgQC6PGQaWOOtKCapvZ6OTCJjJPkpU+JaRdwlVNPszl/ZTiLhLOWG
20+
VOZ1hY5Cjutdqqj9XB8IaUDuJ5qM0PiusIiS9xAbkH6RnYuEa/eWCslEET7xXICj
21+
IqrZMAAD2XQweMiCzdgUikzAGxXkqiOyqXH8pG1VOATlBjtPNFOtrs5bzQKBgD08
22+
1cn6609gzcQJy/ddCwwBHEEae3WFFe65rZ7J0GGDQ8SIMLd+Uwh0HY4zGplGkzgv
23+
l/d27AuDO2k/Tr4tCJD4ycE7/mWPHtAezqkIJPbOi+EEleL/By02x+mUT8xywTqQ
24+
mJqEkUgI5g/IssGmMeUoSAozmnrZYWm6gb8SUYAJAoGATjkZuRTAx+85ninZCbMU
25+
fKpxM5KhCOHOBzxEKjmHp/fRUga+XoZMeWtmcqABWa3pD3kgTa+9z1hrQaUCh1cw
26+
01QjqGxY9Z7VdBMPexXiht1JDZULaJ3Cif4YKUrwrDWc737HRqLP6c7EMwXnAod2
27+
RPe55pKjmk0X/uuple2WCAE=
28+
-----END PRIVATE KEY-----

kubernetes/test-server.pem

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIID7zCCAtegAwIBAgIBATANBgkqhkiG9w0BAQsFADBMMTswOQYDVQQDDDJUTFNH
3+
ZW5TZWxmU2lnbmVkdFJvb3RDQSAyMDIzLTA5LTEzVDE2OjU0OjQ0LjU2NzY0MjEN
4+
MAsGA1UEBwwEJCQkJDAeFw0yMzA5MTMxNDU0NDRaFw0zMzA5MTAxNDU0NDRaMDYx
5+
IzAhBgNVBAMMGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tMQ8wDQYDVQQKDAZz
6+
ZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCzfGD7b37pIVtW
7+
O0lO27ZVJRdgFrEjI4F/9GUQvzb9zsYZBD4YbHIWISqrh1Y/4osQLrojKSkGZrX5
8+
6x/RegCNNsfIeqAM6gNQ1sVH5pxVp2essOitBoy8Udgf6kqH46+yrTX0Dk23cgSp
9+
7j0gCXL7KM15DBxDygjA96E1SHp44DgkwCvWLeYXFie5uVL1SUDx4drCGfnu2Fcg
10+
rvGfPBZpUIR9okmJWZIJcESkL2u4GaGQPGGbQOkCZOcqwRs80mwyYHsKSthKc58n
11+
lqJlZd76s/LNMbGkMdppyInYnqacx3yOPWVsbH6hWKPb1wQsNWnzCVFL3krvrqDI
12+
rCXOOKN1AgMBAAGjgfEwge4wCQYDVR0TBAIwADALBgNVHQ8EBAMCBaAwEwYDVR0l
13+
BAwwCgYIKwYBBQUHAwEwTAYDVR0RBEUwQ4IaZ3NhbnRvbWFnZzZMVkRNLnZtd2Fy
14+
ZS5jb22CGmdzYW50b21hZ2c2TFZETS52bXdhcmUuY29tgglsb2NhbGhvc3QwHQYD
15+
VR0OBBYEFDHW2nif3ILxi7DWd4TNyn63Q7apMB8GA1UdIwQYMBaAFKccKJsr3YYn
16+
MVNXfujEGRCONpu9MDEGA1UdHwQqMCgwJqAkoCKGIGh0dHA6Ly9jcmwtc2VydmVy
17+
OjgwMDAvYmFzaWMuY3JsMA0GCSqGSIb3DQEBCwUAA4IBAQCwa+ksiRPR06JZzKFd
18+
pcD4K5oZ6F5mVpTqn3Kf5jS1cz6Ippi/T8nU8k/xVKmDMqqCWCYGal1U8DmHGPzQ
19+
WOWMk/Ibb72feCS4txIH4GuV/ZO868/5qOy1rmP/UjOY6Kpyju/Eg13AdzcuSnZ3
20+
rZcSncm/gY5BHMmUJdMutTe+Scz32VW7yV8Mi+2ZwsMiqLksZMpBJqPyxroGTksI
21+
p7bklWf1pOgQqh9XJqu3x4rceH0o3xHZ/wana4RnSWL7Q4N6TNinAjLzlLvDByW7
22+
JX9ivpCVpM0n6tIT+E7UbWVX6WoICjCJeDLNwq/jYVEDP80O3yjDYKpYIOqp/e7Q
23+
/BJy
24+
-----END CERTIFICATE-----

0 commit comments

Comments
 (0)