Skip to content

Commit

Permalink
BufferWithCount operator and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mbonneau committed Feb 7, 2016
1 parent b59dc45 commit eee0a2f
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 0 deletions.
16 changes: 16 additions & 0 deletions lib/Rx/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Rx\Observable\TimerObservable;
use Rx\Observer\CallbackObserver;
use Rx\Operator\AsObservableOperator;
use Rx\Operator\BufferWithCountOperator;
use Rx\Operator\CombineLatestOperator;
use Rx\Operator\ConcatOperator;
use Rx\Operator\CountOperator;
Expand Down Expand Up @@ -883,4 +884,19 @@ public function timeout($timeout, ObservableInterface $timeoutObservable = null,
return new TimeoutOperator($timeout, $timeoutObservable, $scheduler);
});
}

/**
* Projects each element of an observable sequence into zero or more buffers which are produced based on
* element count information.
*
* @param $count
* @param int $skip
* @return AnonymousObservable
*/
public function bufferWithCount($count, $skip = null)
{
return $this->lift(function () use ($count, $skip) {
return new BufferWithCountOperator($count, $skip);
});
}
}
83 changes: 83 additions & 0 deletions lib/Rx/Operator/BufferWithCountOperator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace Rx\Operator;

use Rx\ObservableInterface;
use Rx\Observer\CallbackObserver;
use Rx\ObserverInterface;
use Rx\SchedulerInterface;

class BufferWithCountOperator implements OperatorInterface
{
/** @var int */
private $count;

/** @var */
private $skip;

/** @var int */
private $index = 0;

/**
* BufferOperator constructor.
* @param int $count
* @param int $skip
*/
public function __construct($count, $skip = null)
{
if (!is_int($count) || $count < 1) {
throw new \InvalidArgumentException("count must be an integer greater than or equal to 1");
}

if ($skip === null) {
$skip = $count;
}

if (!is_int($skip) || $skip < 1) {
throw new \InvalidArgumentException("skip must be an integer great than 0");
}

$this->count = $count;
$this->skip = $skip;
}

/**
* @param ObservableInterface $observable
* @param ObserverInterface $observer
* @param SchedulerInterface|null $scheduler
* @return mixed
*/
public function __invoke(
ObservableInterface $observable,
ObserverInterface $observer,
SchedulerInterface $scheduler = null
) {
$currentGroups = [];

return $observable->subscribe(new CallbackObserver(
function ($x) use (&$currentGroups, $observer) {
if ($this->index % $this->skip === 0) {
$currentGroups[] = [];
}
$this->index++;

foreach ($currentGroups as $key => &$group) {
$group[] = $x;
if (count($group) === $this->count) {
$observer->onNext($group);
unset($currentGroups[$key]);
}
}
},
function ($err) use ($observer) {
$observer->onError($err);
},
function () use (&$currentGroups, $observer) {
foreach ($currentGroups as &$group) {
$observer->onNext($group);
}
$observer->onCompleted();
}
), $scheduler);
}
}
226 changes: 226 additions & 0 deletions test/Rx/Functional/Operator/BufferWithCountTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
<?php

namespace Rx\Functional\Operator;

use Rx\Functional\FunctionalTestCase;

class BufferWithCountTest extends FunctionalTestCase
{
/**
* @test
*/
public function bufferWithCountpartialwindow()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(5);
});

$this->assertMessages([
onNext(250, [2, 3, 4, 5]),
onCompleted(250)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountfullwindows()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(2);
});

$this->assertMessages([
onNext(220, [2, 3]),
onNext(240, [4, 5]),
onCompleted(250)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountfullandpartialwindows()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(3);
});

$this->assertMessages([
onNext(230, [2, 3, 4]),
onNext(250, [5]),
onCompleted(250)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountError()
{
$error = new \Exception();

$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onError(250, $error)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(5);
});

$this->assertMessages([
onError(250, $error)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountskipless()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(3, 1);
});

$this->assertMessages([
onNext(230, [2, 3, 4]),
onNext(240, [3, 4, 5]),
onNext(250, [4, 5]),
onNext(250, [5]),
onCompleted(250)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountskipmore()
{
$xs = $this->createHotObservable([
onNext(150, 1),
onNext(210, 2),
onNext(220, 3),
onNext(230, 4),
onNext(240, 5),
onCompleted(250)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(2, 3);
});

$this->assertMessages([
onNext(220, [2, 3]),
onNext(250, [5]),
onCompleted(250)
], $results->getMessages());
}

/**
* @test
*/
public function bufferWithCountbasic()
{
$xs = $this->createHotObservable([
onNext(100, 1),
onNext(210, 2),
onNext(240, 3),
onNext(280, 4),
onNext(320, 5),
onNext(350, 6),
onNext(380, 7),
onNext(420, 8),
onNext(470, 9),
onCompleted(600)
]);

$results = $this->scheduler->startWithCreate(function () use ($xs) {
return $xs->bufferWithCount(3, 2);
});

$this->assertMessages([
onNext(280, [2, 3, 4]),
onNext(350, [4, 5, 6]),
onNext(420, [6, 7, 8]),
onNext(600, [8, 9]),
onCompleted(600)
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 600)
], $xs->getSubscriptions());
}

/**
* @test
*/
public function bufferWithCountdisposed()
{
$xs = $this->createHotObservable([
onNext(100, 1),
onNext(210, 2),
onNext(240, 3),
onNext(280, 4),
onNext(320, 5),
onNext(350, 6),
onNext(380, 7),
onNext(420, 8),
onNext(470, 9),
onCompleted(600)
]);

$results = $this->scheduler->startWithDispose(function () use ($xs) {
return $xs->bufferWithCount(3, 2);
}, 370);

$this->assertMessages([
onNext(280, [2, 3, 4]),
onNext(350, [4, 5, 6])
], $results->getMessages());

$this->assertSubscriptions([
subscribe(200, 370)
], $xs->getSubscriptions());
}
}

0 comments on commit eee0a2f

Please sign in to comment.