diff --git a/src/Command.php b/src/Command.php index e56b0b51c..ad03064fa 100644 --- a/src/Command.php +++ b/src/Command.php @@ -12,6 +12,7 @@ use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\ReadConcern; use MongoDB\Driver\ReadPreference; +use MongoDB\Driver\Session as DriverSession; use MongoDB\Driver\WriteConcern; use MongoDB\Driver\WriteResult; use Yii; @@ -85,7 +86,10 @@ class Command extends BaseObject * @var ReadConcern|string read concern to be used by this command */ private $_readConcern; - + /** + * @var DriverSession session to be used by this command + */ + private $_session; /** * Returns read preference for this command. @@ -167,6 +171,17 @@ public function setReadConcern($readConcern) return $this; } + /** + * Sets session for this command. + * @param DriverSession $session session + * @return $this self reference + */ + public function setSession($session) + { + $this->_session = $session; + return $this; + } + /** * Executes this command. * @return \MongoDB\Driver\Cursor result cursor. @@ -183,7 +198,11 @@ public function execute() $this->db->open(); $mongoCommand = new \MongoDB\Driver\Command($this->document); - $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $this->getReadPreference()); + $options = array_filter([ + 'readPreference' => $this->getReadPreference(), + 'session' => $this->_session, + ]); + $cursor = $this->db->manager->executeCommand($databaseName, $mongoCommand, $options); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); @@ -236,7 +255,11 @@ public function executeBatch($collectionName, $options = []) } $this->db->open(); - $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $this->getWriteConcern()); + $options = array_filter([ + 'writeConcern' => $this->getWriteConcern(), + 'session' => $this->_session, + ]); + $writeResult = $this->db->manager->executeBulkWrite($databaseName . '.' . $collectionName, $batch, $options); $this->endProfile($token, __METHOD__); } catch (RuntimeException $e) { @@ -283,7 +306,11 @@ public function query($collectionName, $options = []) $query = new \MongoDB\Driver\Query($this->document, $options); $this->db->open(); - $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $this->getReadPreference()); + $options = array_filter([ + 'readPreference' => $this->getReadPreference(), + 'session' => $this->_session, + ]); + $cursor = $this->db->manager->executeQuery($databaseName . '.' . $collectionName, $query, $options); $cursor->setTypeMap($this->db->typeMap); $this->endProfile($token, __METHOD__); diff --git a/src/Connection.php b/src/Connection.php index 590f29d82..2f37e1c51 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -181,6 +181,10 @@ class Connection extends Component */ private $_fileStreamWrapperRegistered = false; + /** + * @var \MongoDB\Driver\Session session + */ + private $_session; /** * Sets default database name. @@ -408,11 +412,17 @@ protected function initConnection() */ public function createCommand($document = [], $databaseName = null) { - return new Command([ + $command = new Command([ 'db' => $this, 'databaseName' => $databaseName, 'document' => $document, ]); + + if ($this->_session) { + $command->setSession($this->_session); + } + + return $command; } /** @@ -432,4 +442,33 @@ public function registerFileStreamWrapper($force = false) return $this->fileStreamProtocol; } + + /** + * @see Session::startTransaction() + */ + public function startTransaction($options = []) + { + $this->_session = $this->manager->startSession(); + $this->_session->startTransaction($options); + } + + /** + * @see Session::abortTransaction() + */ + public function abortTransaction() + { + $this->_session->abortTransaction(); + $this->_session->endSession(); + $this->_session = null; + } + + /** + * @see Session::commitTransaction() + */ + public function commitTransaction() + { + $this->_session->commitTransaction(); + $this->_session->endSession(); + $this->_session = null; + } }