Skip to content

Commit f2807c6

Browse files
Added shutdown functionality to connection handler and connection pool
1 parent 6a36063 commit f2807c6

File tree

2 files changed

+38
-6
lines changed

2 files changed

+38
-6
lines changed

src/Connection/ConnectionPoolInterface.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,10 @@ interface ConnectionPoolInterface
2020
* @param float $timeoutLoop The timeout value to use when waiting for activity on incoming connections
2121
*/
2222
public function operate(ConnectionHandlerFactoryInterface $connectionHandlerFactory, $timeoutLoop);
23+
24+
/**
25+
* Shutdown to connection pool cleanly. Usually triggered following a
26+
* SIGINT.
27+
*/
28+
public function shutdown();
2329
}

src/Connection/StreamSocketConnectionPool.php

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,8 @@ public function operate(ConnectionHandlerFactoryInterface $connectionHandlerFact
5858

5959
$read = array_merge(['pool' => $this->serverSocket], $this->clientSockets);
6060

61-
if (false === stream_select($read, $write, $except, $timeoutLoopSeconds, $timeoutLoopMicroseconds)) {
62-
// @codeCoverageIgnoreStart
61+
if (false === @stream_select($read, $write, $except, $timeoutLoopSeconds, $timeoutLoopMicroseconds)) {
6362
throw new \RuntimeException('stream_select failed');
64-
// @codeCoverageIgnoreEnd
6563
}
6664

6765
foreach (array_keys($read) as $id) {
@@ -72,7 +70,35 @@ public function operate(ConnectionHandlerFactoryInterface $connectionHandlerFact
7270
}
7371
}
7472

75-
$this->removeClosedConnections();
73+
$this->removeConnections();
74+
}
75+
76+
/**
77+
* {@inheritdoc}
78+
*/
79+
public function shutdown()
80+
{
81+
$this->acceptingNewConnections = false;
82+
83+
foreach ($this->connectionHandlers as $connectionHandler) {
84+
$connectionHandler->shutdown();
85+
}
86+
87+
$this->removeConnections();
88+
89+
while (count($this->connections) > 0) {
90+
$write = $except = [];
91+
92+
$read = $this->clientSockets;
93+
94+
stream_select($read, $write, $except, 1);
95+
96+
foreach (array_keys($read) as $id) {
97+
$this->connectionHandlers[$id]->ready();
98+
}
99+
100+
$this->removeConnections();
101+
}
76102
}
77103

78104
/**
@@ -97,9 +123,9 @@ protected function acceptConnection(ConnectionHandlerFactoryInterface $connectio
97123
}
98124

99125
/**
100-
* Remove closed connections.
126+
* Remove connections.
101127
*/
102-
protected function removeClosedConnections()
128+
protected function removeConnections()
103129
{
104130
foreach ($this->connections as $id => $connection) {
105131
if ($connection->isClosed()) {

0 commit comments

Comments
 (0)