Skip to content

Commit c40d256

Browse files
mmasiukevichzloyuser
authored andcommitted
Consumer tag assignment fix; timeout consistency (#7)
1 parent 65cdbd7 commit c40d256

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

src/Channel.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ public function consume
224224
/** @var Protocol\BasicConsumeOkFrame $result */
225225
$result = yield $this->await(Protocol\BasicConsumeOkFrame::class);
226226

227-
$consumerTag = $result->consumerTag;
227+
if('' === $consumerTag) {
228+
$consumerTag = $result->consumerTag;
229+
}
228230
}
229231

230232
$this->startConsuming();

src/Client.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@ public function connect(): Promise
8383

8484
$this->connection = new Connection($this->config->uri());
8585

86+
$timeout = $this->config->timeout() * 1000;
87+
8688
yield $this->connection->open(
87-
$this->config->timeout(),
89+
$timeout,
8890
$this->config->tcpAttempts(),
8991
$this->config->tcpNoDelay()
9092
);

src/Connection.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public function send(Protocol\AbstractFrame $frame): Promise
8888
// payload already supplied
8989
} elseif ($frame instanceof MethodFrame || $frame instanceof ContentHeaderFrame) {
9090
$buffer = $frame->pack();
91-
91+
9292
$frame->size = $buffer->size();
9393
$frame->payload = $buffer;
9494
} elseif ($frame instanceof Protocol\ContentBodyFrame) {
@@ -156,10 +156,11 @@ public function cancel(int $channel): void
156156
public function open(int $timeout, int $maxAttempts, bool $noDelay): Promise
157157
{
158158
return call(function () use ($timeout, $maxAttempts, $noDelay) {
159-
$context = (new ClientConnectContext)
160-
->withConnectTimeout($timeout)
161-
->withMaxAttempts($maxAttempts)
162-
;
159+
$context = (new ClientConnectContext)->withMaxAttempts($maxAttempts);
160+
161+
if($timeout > 0) {
162+
$context = $context->withConnectTimeout($timeout);
163+
}
163164

164165
if ($noDelay) {
165166
$context->withTcpNoDelay();
@@ -182,7 +183,7 @@ public function open(int $timeout, int $maxAttempts, bool $noDelay): Promise
182183
}
183184
}
184185

185-
unset($this->socket);
186+
$this->socket = null;
186187
});
187188
});
188189
}
@@ -197,7 +198,7 @@ public function heartbeat(int $interval): void
197198
$milliseconds = $interval * 1000;
198199

199200
$this->heartbeat = Loop::repeat($milliseconds, function($watcher) use ($milliseconds) {
200-
if (!$this->socket) {
201+
if (null === $this->socket) {
201202
Loop::cancel($watcher);
202203

203204
return;

0 commit comments

Comments
 (0)