From 9a0ce45b5d3b0956d86a92e97bf2b21695a7f8f3 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Tue, 9 Jun 2026 13:06:34 +0200 Subject: [PATCH 1/3] RUBY-3213 Add Database#cursor_command to create a cursor from a command response Implements the runCursorCommand API from the run-command spec. The command is run unmodified and its response is parsed as a cursor; if the response has no cursor field an Error::InvalidCursorOperation is raised. The implicit session is reused across getMores and ended when the cursor is exhausted or closed. getMore commands can be configured with batch_size, max_time_ms and comment options. The command is never retried. Adds the runCursorCommand unified test fixtures and the runCursorCommand / createCommandCursor operations to the unified test runner. --- lib/mongo/cursor.rb | 6 + lib/mongo/database.rb | 112 +++++ lib/mongo/database/cursor_command_view.rb | 95 +++++ lib/mongo/operation.rb | 1 + lib/mongo/operation/cursor_command.rb | 33 ++ lib/mongo/operation/cursor_command/op_msg.rb | 37 ++ lib/mongo/operation/cursor_command/result.rb | 60 +++ spec/runners/unified/ddl_operations.rb | 5 +- spec/runners/unified/support_operations.rb | 33 ++ .../run_command_unified/runCursorCommand.yml | 391 ++++++++++++++++++ 10 files changed, 772 insertions(+), 1 deletion(-) create mode 100644 lib/mongo/database/cursor_command_view.rb create mode 100644 lib/mongo/operation/cursor_command.rb create mode 100644 lib/mongo/operation/cursor_command/op_msg.rb create mode 100644 lib/mongo/operation/cursor_command/result.rb create mode 100644 spec/spec_tests/data/run_command_unified/runCursorCommand.yml diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 04c350479f..7d86d8665e 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -463,6 +463,12 @@ def get_more_operation if view.respond_to?(:options) && view.options.is_a?(Hash) && !view.options[:comment].nil? spec[:comment] = view.options[:comment] end + # A cursor built from a runCursorCommand response carries a getMore-specific + # maxTimeMS that is sent on getMore commands. Regular find/aggregate views + # do not expose this, so their getMores are unaffected. + if view.respond_to?(:max_time_ms_for_get_more) && view.max_time_ms_for_get_more + spec[:max_time_ms] = view.max_time_ms_for_get_more + end Operation::GetMore.new(spec) end diff --git a/lib/mongo/database.rb b/lib/mongo/database.rb index cb125b48e3..e5d7e199c6 100644 --- a/lib/mongo/database.rb +++ b/lib/mongo/database.rb @@ -15,6 +15,7 @@ # limitations under the License. require 'mongo/database/view' +require 'mongo/database/cursor_command_view' module Mongo # Represents a database on the db server and operations that can execute on @@ -259,6 +260,102 @@ def command(operation, opts = {}) end end + # Run a command that returns a cursor and parse the response as a cursor. + # + # The command is sent to the server unmodified; the driver MUST NOT inspect + # or alter it. If the response does not contain a cursor field an error is + # raised. The command is never retried. + # + # Note: if a +maxTimeMS+ field is already set on the command document it is + # left as-is. The +max_time_ms+ option below applies only to getMore + # commands. Setting both +timeout_ms+ and +max_time_ms+ is not supported + # and has undefined behavior. + # + # @example Run a cursor-returning command. + # database.cursor_command(checkMetadataConsistency: 1) + # + # @param [ Hash ] command The command to execute. + # @param [ Hash ] options The command options. + # + # @option options [ Hash ] :read The read preference for this command, + # used for server selection and reused for subsequent getMores. + # @option options [ Session ] :session The session to use. If none is + # given an implicit session is created and reused for the cursor's + # lifetime. + # @option options [ Integer ] :timeout_ms The operation timeout in + # milliseconds. + # @option options [ Integer ] :batch_size The batchSize to send on getMore + # commands. + # @option options [ Integer ] :max_time_ms The maxTimeMS to send on getMore + # commands. + # @option options [ Object ] :comment A comment to attach to getMore + # commands. + # @option options [ Symbol ] :cursor_type The cursor type, :tailable or + # :tailable_await. Must match the flags set on the command document. + # @option options [ Symbol ] :timeout_mode :cursor_lifetime or :iteration. + # + # @return [ Mongo::Cursor ] A cursor over the command results. + # + # @raise [ Error::InvalidCursorOperation ] If the response does not contain + # a cursor. + def cursor_command(command, options = {}) + options = options.dup + execution_opts = options.delete(:execution_options) || {} + view_options = extract_cursor_command_view_options(options) + + txn_read_pref = (options[:session].txn_read_preference if options[:session] && options[:session].in_transaction?) + txn_read_pref ||= options[:read] || ServerSelector::PRIMARY + Lint.validate_underscore_read_preference(txn_read_pref) + selector = ServerSelector.get(txn_read_pref) + + # The session is intentionally not wrapped in #with_session: an implicit + # session must outlive this method and is ended by the cursor when it is + # exhausted or closed. Until the cursor takes ownership, the session and + # any load-balanced connection are cleaned up here on every exit path. + session = client.get_session(options) + context = Operation::Context.new( + client: client, + session: session, + operation_timeouts: operation_timeouts(options) + ) + op = Operation::CursorCommand.new( + selector: command, + db_name: name, + read: selector, + session: session + ) + + connection = nil + cursor = nil + begin + server = selector.select_server(cluster, nil, session) + result = if server.load_balancer? + # The connection is checked in by the cursor when it is drained. + connection = server.pool.check_out(context: context) + op.execute_with_connection(connection, context: context, options: execution_opts) + else + op.execute(server, context: context, options: execution_opts) + end + + unless result.cursor? + raise Error::InvalidCursorOperation, + 'The command response did not include a cursor. ' \ + 'Use Database#command for commands that do not return a cursor.' + end + + view = CursorCommandView.new(self, view_options) + cursor = Cursor.new(view, result, server, session: session, context: context) + ensure + # If the cursor was created it owns the session and connection; + # otherwise (error or no cursor in the response) release them here. + unless cursor + connection.connection_pool.check_in(connection) if connection && !connection.pinned? + session.end_session if session && session.implicit? + end + end + cursor + end + # Execute a read command on the database, retrying the read if necessary. # # @param [ Hash ] operation The command to execute. @@ -570,5 +667,20 @@ def operation_timeouts(opts) end end end + + private + + # Removes the getMore and cursor options from the options hash and returns + # them as a separate hash for the CursorCommandView. The remaining options + # (e.g. :session, :read, :timeout_ms) are left for command execution. + # + # @param [ Hash ] options The cursor_command options (mutated). + # + # @return [ Hash ] The view options. + def extract_cursor_command_view_options(options) + %i[ batch_size max_time_ms comment cursor_type timeout_mode ].each_with_object({}) do |key, view_options| + view_options[key] = options.delete(key) if options.key?(key) + end + end end end diff --git a/lib/mongo/database/cursor_command_view.rb b/lib/mongo/database/cursor_command_view.rb new file mode 100644 index 0000000000..3cbe2474a9 --- /dev/null +++ b/lib/mongo/database/cursor_command_view.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +# Copyright (C) 2025 MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + class Database + # The minimal view a Cursor needs when it is built from an arbitrary + # command response rather than from a collection query. + # + # It carries the getMore-specific options (batchSize, maxTimeMS, comment) + # and the cursor type and timeout mode, and answers the few methods the + # Cursor reads from its view. + # + # @api private + class CursorCommandView + # @param [ Mongo::Database ] database The database the command ran on. + # @param [ Hash ] options The getMore and timeout options. + # + # @option options [ Integer ] :batch_size The batchSize for getMores. + # @option options [ Integer ] :max_time_ms The maxTimeMS for getMores. + # @option options [ Object ] :comment The comment for getMores. + # @option options [ Symbol ] :cursor_type :tailable or :tailable_await. + # @option options [ Symbol ] :timeout_mode :cursor_lifetime or :iteration. + def initialize(database, options = {}) + @database = database + @options = options + end + + # @return [ Mongo::Database ] The database the command ran on. + attr_reader :database + + # @return [ Hash ] The view options. Used by the Cursor to read the + # getMore comment. + attr_reader :options + + # @return [ Mongo::Client ] The client. + def client + database.client + end + + # A placeholder collection used only so the Cursor can reach the client + # and database. The actual namespace for getMore and killCursors is taken + # from the command response, not from this collection. + # + # @return [ Mongo::Collection ] The $cmd pseudo collection. + def collection + @collection ||= Collection.new(database, '$cmd') + end + + # @return [ Integer | nil ] The batchSize sent on getMore commands. + def batch_size + options[:batch_size] + end + + # @return [ Integer | nil ] The maxTimeMS sent on getMore commands. + def max_time_ms_for_get_more + options[:max_time_ms] + end + + # @return [ Symbol | nil ] The cursor type. + def cursor_type + options[:cursor_type] + end + + # @return [ Symbol | nil ] The timeout mode. + def timeout_mode + options[:timeout_mode] + end + + # @return [ Hash ] timeout values for the operation context. + def operation_timeouts(opts = {}) + database.operation_timeouts(opts) + end + + private + + # Cursors do not support a limit when built from a command response. + def limit + nil + end + end + end +end diff --git a/lib/mongo/operation.rb b/lib/mongo/operation.rb index 24db192397..7570bd7ae9 100644 --- a/lib/mongo/operation.rb +++ b/lib/mongo/operation.rb @@ -38,6 +38,7 @@ require 'mongo/operation/drop_database' require 'mongo/operation/get_more' require 'mongo/operation/find' +require 'mongo/operation/cursor_command' require 'mongo/operation/explain' require 'mongo/operation/kill_cursors' require 'mongo/operation/indexes' diff --git a/lib/mongo/operation/cursor_command.rb b/lib/mongo/operation/cursor_command.rb new file mode 100644 index 0000000000..70e899d3e7 --- /dev/null +++ b/lib/mongo/operation/cursor_command.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +# Copyright (C) 2025 MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'mongo/operation/cursor_command/op_msg' +require 'mongo/operation/cursor_command/result' + +module Mongo + module Operation + # A command operation whose response is parsed as a cursor. + # + # Unlike Command, the result exposes the firstBatch, namespace, and cursor + # id from the command response so that a Cursor can be built from it. + # + # @api private + class CursorCommand + include Specifiable + include OpMsgExecutable + end + end +end diff --git a/lib/mongo/operation/cursor_command/op_msg.rb b/lib/mongo/operation/cursor_command/op_msg.rb new file mode 100644 index 0000000000..b4fb736138 --- /dev/null +++ b/lib/mongo/operation/cursor_command/op_msg.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +# Copyright (C) 2025 MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Operation + class CursorCommand + # A cursor command operation sent as an op message. + # + # @api private + class OpMsg < OpMsgBase + include PolymorphicResult + + private + + # The user's command is sent verbatim. The driver MUST NOT inspect or + # modify it; $db, lsid and other internal fields are attached by the + # shared command building code. + def selector(_connection) + spec[:selector].dup + end + end + end + end +end diff --git a/lib/mongo/operation/cursor_command/result.rb b/lib/mongo/operation/cursor_command/result.rb new file mode 100644 index 0000000000..0be274786e --- /dev/null +++ b/lib/mongo/operation/cursor_command/result.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +# Copyright (C) 2025 MongoDB Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + module Operation + class CursorCommand + # Parses the cursor field of a command response so a Cursor can be built + # from the result. The parsing is identical to a find command result. + # + # @api private + class Result < Operation::Result + # @return [ true | false ] Whether the command response contained a + # cursor field. + def cursor? + !cursor_document.nil? + end + + # @return [ Integer | nil ] The cursor id from the cursor document. + def cursor_id + cursor? ? cursor_document[CURSOR_ID] : super + end + + # @return [ Array ] The first batch of documents. + def documents + cursor? ? cursor_document[FIRST_BATCH] : [] + end + + # @return [ String | nil ] The cursor namespace, "database.collection". + def namespace + cursor? ? cursor_document['ns'] : super + end + + private + + def cursor_document + return @cursor_document if defined?(@cursor_document) + + @cursor_document = first_document[CURSOR] + end + + def first_document + @first_document ||= reply.documents[0] + end + end + end + end +end diff --git a/spec/runners/unified/ddl_operations.rb b/spec/runners/unified/ddl_operations.rb index 5d49c8aace..bc95e4ca83 100644 --- a/spec/runners/unified/ddl_operations.rb +++ b/spec/runners/unified/ddl_operations.rb @@ -26,6 +26,7 @@ def list_dbs(op, name_only: false) def create_collection(op) database = entities.get(:database, op.use!('object')) + save_entity = op.use('saveResultAsEntity') use_arguments(op) do |args| opts = {} if session = args.use('session') @@ -59,7 +60,9 @@ def create_collection(op) if max = args.use('max') collection_opts[:max] = max end - database[args.use!('collection'), collection_opts].create(**opts) + collection = database[args.use!('collection'), collection_opts] + collection.create(**opts) + entities.set(:collection, save_entity, collection) if save_entity end end diff --git a/spec/runners/unified/support_operations.rb b/spec/runners/unified/support_operations.rb index 9a9e901593..1b183b3998 100644 --- a/spec/runners/unified/support_operations.rb +++ b/spec/runners/unified/support_operations.rb @@ -25,6 +25,39 @@ def run_command(op) end end + def run_cursor_command(op) + build_command_cursor(op).to_a + end + + def create_command_cursor(op) + cursor = build_command_cursor(op) + if name = op.use('saveResultAsEntity') + entities.set(:cursor, name, cursor) + end + cursor + end + + def build_command_cursor(op) + database = entities.get(:database, op.use!('object')) + + use_arguments(op) do |args| + args.use!('commandName') + cmd = args.use!('command') + + session = args.use('session') + read_preference = args.use('readPreference') + + opts = extract_options(args, 'batchSize', 'maxTimeMS', 'comment', + 'timeoutMS', 'cursorType', 'timeoutMode') + symbolize_options!(opts, :cursor_type, :timeout_mode) + + opts[:session] = entities.get(:session, session) if session + opts[:read] = ::Utils.snakeize_hash(read_preference) if read_preference + + database.cursor_command(cmd, **opts) + end + end + def fail_point(op) consume_test_runner(op) use_arguments(op) do |args| diff --git a/spec/spec_tests/data/run_command_unified/runCursorCommand.yml b/spec/spec_tests/data/run_command_unified/runCursorCommand.yml new file mode 100644 index 0000000000..1f9bf532c3 --- /dev/null +++ b/spec/spec_tests/data/run_command_unified/runCursorCommand.yml @@ -0,0 +1,391 @@ +description: runCursorCommand + +schemaVersion: '1.9' + +createEntities: + - client: + id: &client client + useMultipleMongoses: false + observeEvents: [commandStartedEvent, connectionReadyEvent, connectionCheckedOutEvent, connectionCheckedInEvent] + - session: + id: &session session + client: *client + - database: + id: &db db + client: *client + databaseName: *db + - collection: + id: &collection collection + database: *db + collectionName: *collection + +initialData: + - collectionName: collection + databaseName: *db + documents: &documents + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - { _id: 3, x: 33 } + - { _id: 4, x: 44 } + - { _id: 5, x: 55 } + +tests: + # This is what this API was invented to do. + - description: successfully executes checkMetadataConsistency cursor creating command + runOnRequirements: + - minServerVersion: '7.0' + topologies: [sharded] + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: checkMetadataConsistency + command: { checkMetadataConsistency: 1 } + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + checkMetadataConsistency: 1 + $db: *db + lsid: { $$exists: true } + commandName: checkMetadataConsistency + + - description: errors if the command response is not a cursor + operations: + - name: createCommandCursor + object: *db + arguments: + commandName: ping + command: { ping: 1 } + expectError: + isClientError: true + + + # Driver Sessions + - description: creates an implicit session that is reused across getMores + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: find + command: { find: *collection, batchSize: 2 } + expectResult: *documents + - name: assertSameLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + find: *collection + batchSize: 2 + $db: *db + lsid: { $$exists: true } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + $db: *db + lsid: { $$exists: true } + commandName: getMore + + - description: accepts an explicit session that is reused across getMores + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: find + session: *session + command: { find: *collection, batchSize: 2 } + expectResult: *documents + - name: assertSameLsidOnLastTwoCommands + object: testRunner + arguments: + client: *client + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + find: *collection + batchSize: 2 + $db: *db + lsid: { $$sessionLsid: *session } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + $db: *db + lsid: { $$sessionLsid: *session } + commandName: getMore + + # Load Balancers + - description: returns pinned connections to the pool when the cursor is exhausted + runOnRequirements: + - topologies: [ load-balanced ] + operations: + - name: createCommandCursor + object: *db + arguments: + commandName: find + batchSize: 2 + session: *session + command: { find: *collection, batchSize: 2 } + saveResultAsEntity: &cursor cursor + - name: assertNumberConnectionsCheckedOut + object: testRunner + arguments: + client: *client + connections: 1 + - name: iterateUntilDocumentOrError + object: *cursor + expectResult: { _id: 1, x: 11 } + - name: iterateUntilDocumentOrError + object: *cursor + expectResult: { _id: 2, x: 22 } + - name: iterateUntilDocumentOrError + object: *cursor + expectResult: { _id: 3, x: 33 } + - name: iterateUntilDocumentOrError + object: *cursor + expectResult: { _id: 4, x: 44 } + - name: iterateUntilDocumentOrError + object: *cursor + expectResult: { _id: 5, x: 55 } + - name: assertNumberConnectionsCheckedOut + object: testRunner + arguments: + client: *client + connections: 0 + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + find: *collection + batchSize: 2 + $db: *db + lsid: { $$sessionLsid: *session } + commandName: find # 2 documents + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + $db: *db + lsid: { $$sessionLsid: *session } + commandName: getMore # 2 documents + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + $db: *db + lsid: { $$sessionLsid: *session } + commandName: getMore # 1 document + # Total documents: 5 + - client: *client + eventType: cmap + events: + - connectionReadyEvent: {} + - connectionCheckedOutEvent: {} + - connectionCheckedInEvent: {} + + - description: returns pinned connections to the pool when the cursor is closed + runOnRequirements: + - topologies: [ load-balanced ] + operations: + - name: createCommandCursor + object: *db + arguments: + commandName: find + command: { find: *collection, batchSize: 2 } + saveResultAsEntity: *cursor + - name: assertNumberConnectionsCheckedOut + object: testRunner + arguments: + client: *client + connections: 1 + - name: close + object: *cursor + - name: assertNumberConnectionsCheckedOut + object: testRunner + arguments: + client: *client + connections: 0 + + # Iterating the Cursor / Executing GetMores + - description: supports configuring getMore batchSize + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: find + batchSize: 5 + command: { find: *collection, batchSize: 1 } + expectResult: *documents + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + find: *collection + batchSize: 1 + $db: *db + lsid: { $$exists: true } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + batchSize: 5 + $db: *db + lsid: { $$exists: true } + commandName: getMore + + - description: supports configuring getMore maxTimeMS + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: find + maxTimeMS: 300 + command: { find: *collection, maxTimeMS: 200, batchSize: 1 } + ignoreResultAndError: true + expectEvents: + - client: *client + eventType: command + # The getMore should receive an error here because we do not have the right kind of cursor + # So drivers should run a killCursors, but neither the error nor the killCursors command is relevant to this test + ignoreExtraEvents: true + events: + - commandStartedEvent: + command: + find: *collection + maxTimeMS: 200 + batchSize: 1 + $db: *db + lsid: { $$exists: true } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + $db: *db + maxTimeMS: 300 + lsid: { $$exists: true } + commandName: getMore + + - description: supports configuring getMore comment + runOnRequirements: + - minServerVersion: '4.4' + operations: + - name: runCursorCommand + object: *db + arguments: + commandName: find + comment: { hello: 'getMore' } + command: { find: *collection, batchSize: 1, comment: { hello: 'find' } } + expectResult: *documents + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + find: *collection + batchSize: 1 + comment: { hello: 'find' } + $db: *db + lsid: { $$exists: true } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *collection + comment: { hello: 'getMore' } + $db: *db + lsid: { $$exists: true } + commandName: getMore + + # Tailable cursor + - description: does not close the cursor when receiving an empty batch + runOnRequirements: + - serverless: forbid + operations: + - name: dropCollection + object: *db + arguments: + collection: &cappedCollection cappedCollection + - name: createCollection + object: *db + arguments: + collection: *cappedCollection + capped: true + size: 4096 + max: 3 + saveResultAsEntity: *cappedCollection + - name: insertMany + object: *cappedCollection + arguments: + documents: + - { _id: 1, x: 11 } + - { _id: 2, x: 22 } + - name: createCommandCursor + object: *db + arguments: + cursorType: tailable + commandName: find + batchSize: 2 + command: { find: *cappedCollection, tailable: true } + saveResultAsEntity: &cursor cursor + - name: iterateOnce + object: *cursor + - name: iterateOnce + object: *cursor + - name: iterateOnce + object: *cursor + - name: close + object: *cursor + expectEvents: + - client: *client + eventType: command + events: + - commandStartedEvent: + command: + drop: *cappedCollection + commandName: drop + - commandStartedEvent: + command: + create: *cappedCollection + commandName: create + - commandStartedEvent: + command: + insert: *cappedCollection + commandName: insert + - commandStartedEvent: + command: + find: *cappedCollection + $db: *db + lsid: { $$exists: true } + commandName: find + - commandStartedEvent: + command: + getMore: { $$type: [int, long] } + collection: *cappedCollection + $db: *db + lsid: { $$exists: true } + commandName: getMore + - commandStartedEvent: + command: + killCursors: *cappedCollection + cursors: { $$type: array } + commandName: killCursors From cbfc37f7dadc8da9c17e433f780dae4a5fd81e95 Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 11 Jun 2026 13:46:18 +0200 Subject: [PATCH 2/3] RUBY-3213 Retry cursor_command on overload errors and reuse pinned connections Wrap the initial cursor command in with_overload_retry, matching Database#command and Cursor#get_more. Retry requires both retryable reads and writes, per the client-backpressure spec. In load balanced mode a failed attempt checks its connection back in before retrying. Also reuse the session's pinned connection in load balanced mode when running inside a transaction, mirroring Collection::View::Iterable. --- lib/mongo/database.rb | 48 +++++++++-- .../cursor_command_overload_spec.rb | 82 +++++++++++++++++++ spec/mongo/database_spec.rb | 46 +++++++++++ 3 files changed, 168 insertions(+), 8 deletions(-) create mode 100644 spec/integration/cursor_command_overload_spec.rb diff --git a/lib/mongo/database.rb b/lib/mongo/database.rb index e5d7e199c6..b22dc5daeb 100644 --- a/lib/mongo/database.rb +++ b/lib/mongo/database.rb @@ -325,17 +325,34 @@ def cursor_command(command, options = {}) session: session ) + # Per the client-backpressure spec, retrying a generic command on + # overload errors requires both retryable reads and writes to be + # enabled, same as Database#command. + retry_enabled = client.options[:retry_reads] != false && + client.options[:retry_writes] != false + + server = nil connection = nil cursor = nil begin - server = selector.select_server(cluster, nil, session) - result = if server.load_balancer? - # The connection is checked in by the cursor when it is drained. - connection = server.pool.check_out(context: context) - op.execute_with_connection(connection, context: context, options: execution_opts) - else - op.execute(server, context: context, options: execution_opts) - end + result = with_overload_retry(context: context, retry_enabled: retry_enabled) do + server = selector.select_server(cluster, nil, session) + if server.load_balancer? + # The connection is checked in by the cursor when it is drained. + connection = check_out_cursor_command_connection(server, context) + begin + op.execute_with_connection(connection, context: context, options: execution_opts) + rescue StandardError + # Release the connection before the error propagates so that + # a retried attempt checks out a fresh one. + connection.connection_pool.check_in(connection) unless connection.pinned? + connection = nil + raise + end + else + op.execute(server, context: context, options: execution_opts) + end + end unless result.cursor? raise Error::InvalidCursorOperation, @@ -682,5 +699,20 @@ def extract_cursor_command_view_options(options) view_options[key] = options.delete(key) if options.key?(key) end end + + # Checks out a load balanced connection for a cursor command. If the + # session is pinned to a connection (e.g. in a transaction), that + # connection is reused. + # + # @param [ Server ] server The load balancer server. + # @param [ Operation::Context ] context The operation context. + # + # @return [ Server::Connection ] The checked out connection. + def check_out_cursor_command_connection(server, context) + connection = if context.connection_global_id + server.pool.check_out_pinned_connection(context.connection_global_id) + end + connection || server.pool.check_out(context: context) + end end end diff --git a/spec/integration/cursor_command_overload_spec.rb b/spec/integration/cursor_command_overload_spec.rb new file mode 100644 index 0000000000..cb73b19d52 --- /dev/null +++ b/spec/integration/cursor_command_overload_spec.rb @@ -0,0 +1,82 @@ +# frozen_string_literal: true + +require 'spec_helper' + +describe 'Database#cursor_command overload retry' do + require_topology :replica_set + min_server_version '4.4' + + let(:subscriber) { Mrss::EventSubscriber.new } + + let(:client_options) { {} } + + let(:client) do + authorized_client.with(client_options).tap do |client| + client.subscribe(Mongo::Monitoring::COMMAND, subscriber) + end + end + + let(:collection) { authorized_client['cursor_command_overload_spec'] } + + let(:find_started_events) do + subscriber.started_events.select { |e| e.command_name == 'find' } + end + + before do + collection.drop + collection.insert_many((1..4).map { |i| { _id: i } }) + authorized_client.use(:admin).command( + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + failCommands: %w[find], + errorCode: 6, + errorLabels: %w[RetryableError SystemOverloadedError] + } + ) + end + + after do + authorized_client.use(:admin).command( + configureFailPoint: 'failCommand', + mode: 'off' + ) + client.close + end + + context 'when retryable reads and writes are enabled' do + it 'retries the initial command' do + cursor = client.database.cursor_command( + { find: collection.name, batchSize: 2 } + ) + expect(cursor.to_a.length).to eq(4) + expect(find_started_events.length).to eq(2) + end + end + + context 'when retryable reads are disabled' do + let(:client_options) { { retry_reads: false } } + + it 'does not retry and raises the overload error' do + expect do + client.database.cursor_command({ find: collection.name, batchSize: 2 }) + end.to raise_error(Mongo::Error::OperationFailure) { |e| + expect(e.label?('SystemOverloadedError')).to be true + } + expect(find_started_events.length).to eq(1) + end + end + + context 'when retryable writes are disabled' do + let(:client_options) { { retry_writes: false } } + + it 'does not retry and raises the overload error' do + expect do + client.database.cursor_command({ find: collection.name, batchSize: 2 }) + end.to raise_error(Mongo::Error::OperationFailure) { |e| + expect(e.label?('SystemOverloadedError')).to be true + } + expect(find_started_events.length).to eq(1) + end + end +end diff --git a/spec/mongo/database_spec.rb b/spec/mongo/database_spec.rb index 091c18e09f..0343f6ed5b 100644 --- a/spec/mongo/database_spec.rb +++ b/spec/mongo/database_spec.rb @@ -1226,4 +1226,50 @@ end end end + + describe '#check_out_cursor_command_connection' do + let(:database) do + described_class.new(authorized_client, SpecConfig.instance.test_db) + end + + let(:pool) { instance_double(Mongo::Server::ConnectionPool) } + let(:server) { instance_double(Mongo::Server, pool: pool) } + let(:connection) { instance_double(Mongo::Server::Connection) } + + context 'when the context is pinned to a connection' do + let(:context) do + instance_double(Mongo::Operation::Context, connection_global_id: 42) + end + + it 'checks out the pinned connection' do + expect(pool).to receive(:check_out_pinned_connection).with(42).and_return(connection) + expect( + database.send(:check_out_cursor_command_connection, server, context) + ).to be(connection) + end + + context 'when the pinned connection is not checked out' do + it 'falls back to a regular check out' do + expect(pool).to receive(:check_out_pinned_connection).with(42).and_return(nil) + expect(pool).to receive(:check_out).with(context: context).and_return(connection) + expect( + database.send(:check_out_cursor_command_connection, server, context) + ).to be(connection) + end + end + end + + context 'when the context is not pinned to a connection' do + let(:context) do + instance_double(Mongo::Operation::Context, connection_global_id: nil) + end + + it 'checks out a new connection' do + expect(pool).to receive(:check_out).with(context: context).and_return(connection) + expect( + database.send(:check_out_cursor_command_connection, server, context) + ).to be(connection) + end + end + end end From 5e89265eb0fb5f8455daae963d1823f15ca3ef6e Mon Sep 17 00:00:00 2001 From: Dmitry Rybakov Date: Thu, 11 Jun 2026 16:51:00 +0200 Subject: [PATCH 3/3] RUBY-3213 Make cursor_command retry test deterministic on no-retry variants The 'retries the initial command' test inherited the suite-wide retry_writes setting via authorized_client, so it failed on the no-retry-writes Evergreen variant where overload retry is disabled. Force retry_reads and retry_writes on in that context. --- spec/integration/cursor_command_overload_spec.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/spec/integration/cursor_command_overload_spec.rb b/spec/integration/cursor_command_overload_spec.rb index cb73b19d52..5fbbaed82f 100644 --- a/spec/integration/cursor_command_overload_spec.rb +++ b/spec/integration/cursor_command_overload_spec.rb @@ -45,6 +45,10 @@ end context 'when retryable reads and writes are enabled' do + # Force both on so the test is deterministic on Evergreen variants that + # disable retryable reads or writes suite-wide (e.g. no-retry-writes). + let(:client_options) { { retry_reads: true, retry_writes: true } } + it 'retries the initial command' do cursor = client.database.cursor_command( { find: collection.name, batchSize: 2 }