Skip to content

Commit 1070e00

Browse files
committed
chore: rollback catching Throwable in callback proxies
1 parent 57d26fe commit 1070e00

9 files changed

+53
-93
lines changed

src/RdKafka/FFI/ConsumeCallbackProxy.php

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,9 @@ class ConsumeCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $nativeMessage, ?CData $opaque = null): void
1313
{
14-
try {
15-
($this->callback)(
16-
new Message($nativeMessage),
17-
OpaqueMap::get($opaque)
18-
);
19-
} catch (\Throwable $exception) {
20-
error_log($exception->getMessage(), E_ERROR);
21-
}
14+
($this->callback)(
15+
new Message($nativeMessage),
16+
OpaqueMap::get($opaque)
17+
);
2218
}
2319
}

src/RdKafka/FFI/DrMsgCallbackProxy.php

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,10 @@ class DrMsgCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $producer, CData $nativeMessage, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($producer),
18-
new Message($nativeMessage),
19-
OpaqueMap::get($opaque)
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($producer),
17+
new Message($nativeMessage),
18+
OpaqueMap::get($opaque)
19+
);
2420
}
2521
}

src/RdKafka/FFI/ErrorCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@ class ErrorCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $consumerOrProducer, int $err, string $reason, ?CData $opaque = null): void
1313
{
14-
try {
15-
($this->callback)(
16-
RdKafka::resolveFromCData($consumerOrProducer),
17-
$err,
18-
$reason,
19-
OpaqueMap::get($opaque)
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
14+
($this->callback)(
15+
RdKafka::resolveFromCData($consumerOrProducer),
16+
$err,
17+
$reason,
18+
OpaqueMap::get($opaque)
19+
);
2420
}
2521
}

src/RdKafka/FFI/LogCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@ class LogCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $rdkafka, int $level, string $facility, string $message): void
1313
{
14-
try {
15-
($this->callback)(
16-
RdKafka::resolveFromCData($rdkafka),
17-
$level,
18-
$facility,
19-
$message
20-
);
21-
} catch (\Throwable $exception) {
22-
error_log($exception->getMessage(), E_ERROR);
23-
}
14+
($this->callback)(
15+
RdKafka::resolveFromCData($rdkafka),
16+
$level,
17+
$facility,
18+
$message
19+
);
2420
}
2521
}

src/RdKafka/FFI/NativePartitionerCallbackProxy.php

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,14 @@ public function __invoke(
2525
?CData $topic_opaque = null,
2626
?CData $msg_opaque = null
2727
): int {
28-
try {
29-
return (int) Library::{$this->partitionerMethod}(
30-
$topic,
31-
$keydata,
32-
$keylen,
33-
$partition_cnt,
34-
OpaqueMap::get($topic_opaque),
35-
OpaqueMap::get($msg_opaque)
36-
);
37-
} catch (\Throwable $exception) {
38-
error_log($exception->getMessage(), E_ERROR);
39-
}
40-
41-
return RD_KAFKA_PARTITION_UA;
28+
return (int) Library::{$this->partitionerMethod}(
29+
$topic,
30+
$keydata,
31+
$keylen,
32+
$partition_cnt,
33+
OpaqueMap::get($topic_opaque),
34+
OpaqueMap::get($msg_opaque)
35+
);
4236
}
4337

4438
public static function create(string $partitionerMethod): Closure

src/RdKafka/FFI/OffsetCommitCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@ class OffsetCommitCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumer),
18-
$err,
19-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumer),
17+
$err,
18+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19+
OpaqueMap::get($opaque)
20+
);
2521
}
2622
}

src/RdKafka/FFI/PartitionerCallbackProxy.php

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,11 @@ public function __invoke(
1717
?CData $topic_opaque = null,
1818
?CData $msg_opaque = null
1919
): int {
20-
try {
21-
return (int) ($this->callback)(
22-
$keydata === null ? null : FFI::string($keydata, $keylen),
23-
$partition_cnt,
24-
OpaqueMap::get($topic_opaque),
25-
OpaqueMap::get($msg_opaque)
26-
);
27-
} catch (\Throwable $exception) {
28-
error_log($exception->getMessage(), E_ERROR);
29-
}
30-
31-
return RD_KAFKA_PARTITION_UA;
20+
return (int) ($this->callback)(
21+
$keydata === null ? null : FFI::string($keydata, $keylen),
22+
$partition_cnt,
23+
OpaqueMap::get($topic_opaque),
24+
OpaqueMap::get($msg_opaque)
25+
);
3226
}
3327
}

src/RdKafka/FFI/RebalanceCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,11 @@ class RebalanceCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumer),
18-
$err,
19-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumer),
17+
$err,
18+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19+
OpaqueMap::get($opaque)
20+
);
2521
}
2622
}

src/RdKafka/FFI/StatsCallbackProxy.php

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,12 @@ class StatsCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumerOrProducer, CData $json, int $json_len, ?CData $opaque = null): int
1414
{
15-
try {
16-
($this->callback)(
17-
RdKafka::resolveFromCData($consumerOrProducer),
18-
FFI::string($json, $json_len),
19-
$json_len,
20-
OpaqueMap::get($opaque)
21-
);
22-
} catch (\Throwable $exception) {
23-
error_log($exception->getMessage(), E_ERROR);
24-
}
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumerOrProducer),
17+
FFI::string($json, $json_len),
18+
$json_len,
19+
OpaqueMap::get($opaque)
20+
);
2521

2622
return 0;
2723
}

0 commit comments

Comments
 (0)