Skip to content

Commit

Permalink
Adds transaction support
Browse files Browse the repository at this point in the history
  • Loading branch information
Lisio committed Apr 11, 2021
1 parent 24422d4 commit a81132c
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 5 deletions.
35 changes: 31 additions & 4 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Session as DriverSession;
use MongoDB\Driver\WriteConcern;
use MongoDB\Driver\WriteResult;
use Yii;
Expand Down Expand Up @@ -85,7 +86,10 @@ class Command extends BaseObject
* @var ReadConcern|string read concern to be used by this command
*/
private $_readConcern;

/**
* @var DriverSession session to be used by this command
*/
private $_session;

/**
* Returns read preference for this command.
Expand Down Expand Up @@ -167,6 +171,17 @@ public function setReadConcern($readConcern)
return $this;
}

/**
* Sets session for this command.
* @param DriverSession $session session
* @return $this self reference
*/
public function setSession($session)
{
$this->_session = $session;
return $this;
}

/**
* Executes this command.
* @return \MongoDB\Driver\Cursor result cursor.
Expand All @@ -183,7 +198,11 @@ public function execute()

$this->db->open();
$mongoCommand = new \MongoDB\Driver\Command($this->document);
$cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $this->getReadPreference());
$options = array_filter([
'readPreference' => $this->getReadPreference(),
'session' => $this->_session,
]);
$cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $options);
$cursor->setTypeMap($this->db->typeMap);

$this->endProfile($token, __METHOD__);
Expand Down Expand Up @@ -236,7 +255,11 @@ public function executeBatch($collectionName, $options = [])
}

$this->db->open();
$writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $this->getWriteConcern());
$options = array_filter([
'writeConcern' => $this->getWriteConcern(),
'session' => $this->_session,
]);
$writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $options);

$this->endProfile($token, __METHOD__);
} catch (RuntimeException $e) {
Expand Down Expand Up @@ -283,7 +306,11 @@ public function query($collectionName, $options = [])

$query = new \MongoDB\Driver\Query($this->document, $options);
$this->db->open();
$cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $this->getReadPreference());
$options = array_filter([
'readPreference' => $this->getReadPreference(),
'session' => $this->_session,
]);
$cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $options);
$cursor->setTypeMap($this->db->typeMap);

$this->endProfile($token, __METHOD__);
Expand Down
41 changes: 40 additions & 1 deletion src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ class Connection extends Component
*/
private $_fileStreamWrapperRegistered = false;

/**
* @var \MongoDB\Driver\Session session
*/
private $_session;

/**
* Sets default database name.
Expand Down Expand Up @@ -408,11 +412,17 @@ protected function initConnection()
*/
public function createCommand($document = [], $databaseName = null)
{
return new Command([
$command = new Command([
'db' => $this,
'databaseName' => $databaseName,
'document' => $document,
]);

if ($this->_session) {
$command->setSession($this->_session);
}

return $command;
}

/**
Expand All @@ -432,4 +442,33 @@ public function registerFileStreamWrapper($force = false)

return $this->fileStreamProtocol;
}

/**
* @see Session::startTransaction()
*/
public function startTransaction($options = [])
{
$this->_session = $this->manager->startSession();
$this->_session->startTransaction($options);
}

/**
* @see Session::abortTransaction()
*/
public function abortTransaction()
{
$this->_session->abortTransaction();
$this->_session->endSession();
$this->_session = null;
}

/**
* @see Session::commitTransaction()
*/
public function commitTransaction()
{
$this->_session->commitTransaction();
$this->_session->endSession();
$this->_session = null;
}
}

0 comments on commit a81132c

Please sign in to comment.