Skip to content

Commit

Permalink
Merge pull request #1 from n-hor/1.0.1
Browse files Browse the repository at this point in the history
fix common callback behavior
  • Loading branch information
n-hor authored Oct 16, 2024
2 parents 6ab123f + 87aad3e commit 81d67d0
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
6 changes: 2 additions & 4 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ class Channel

public Generator $reader;

protected string $buffer = '';

public function __construct(
protected Socket $socket,
protected int $bufferSize,
Expand All @@ -34,7 +32,7 @@ public function __construct(

public function __destruct()
{
socket_close($this->socket);
$this->close();
}

public function read(): Generator
Expand Down Expand Up @@ -93,7 +91,7 @@ protected function write(string $data): void
}
}

protected function socketSelect($isRead): bool
protected function socketSelect(bool $isRead): bool
{
$write = ! $isRead ? [$this->socket] : null;
$read = $isRead ? [$this->socket] : null;
Expand Down
2 changes: 1 addition & 1 deletion src/ParallelTasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected function getExecutableTask(Closure $task): Closure
{
return isset($this->commonBeforeExecutionCallback) ? function () use ($task) {
($this->commonBeforeExecutionCallback)();
$task();
return $task();
} : $task;
}
}
52 changes: 52 additions & 0 deletions tests/PcntlParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,56 @@ function () {
$pool->wait();
$result = $pool->pullWorkersOutput(); //[6,7,8,9]
expect(30)->toEqual(array_sum($result));
$pool->destroy();
});

test('common execution callback')->expect(function () {

$parallelTasks = ParallelTasks::add([
function () {
global $value;
return 1 + $value;
},
function () {
global $value;
return 2 + $value;
},
function () {
global $value;
return 3 + $value;
},
])
->setCommonBeforeExecutionCallback(function () {
global $value;
$value = 1;
})
->run();

$res = $parallelTasks->waitOutput();

expect(9)->toEqual(array_sum($res));
});

test('workers pool common execution callback')->expect(function () {
$pool = PersistenceWorkersPool::create(5)
->setCommonBeforeExecutionCallback(function () {
global $value;
$value = 1;
})
->run(function (mixed $job) {
global $value;
return $job + $value;
});

$tasks = [1,2,3,4,5,6,7,8,9];
foreach ($tasks as $task) {
$pool->dispatch($task, waitAvailableWorkerTimeout: 2_000_000);
}

$result = $pool->pullWorkersOutput();
expect(20)->toEqual(array_sum($result));
$pool->wait();
$result = $pool->pullWorkersOutput();
expect(34)->toEqual(array_sum($result));
$pool->destroy();
});

0 comments on commit 81d67d0

Please sign in to comment.