Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ def get_more_operation
if view.respond_to?(:options) && view.options.is_a?(Hash) && !view.options[:comment].nil?
spec[:comment] = view.options[:comment]
end
# A cursor built from a runCursorCommand response carries a getMore-specific
# maxTimeMS that is sent on getMore commands. Regular find/aggregate views
# do not expose this, so their getMores are unaffected.
if view.respond_to?(:max_time_ms_for_get_more) && view.max_time_ms_for_get_more
spec[:max_time_ms] = view.max_time_ms_for_get_more
end
Operation::GetMore.new(spec)
end

Expand Down
144 changes: 144 additions & 0 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

require 'mongo/database/view'
require 'mongo/database/cursor_command_view'

module Mongo
# Represents a database on the db server and operations that can execute on
Expand Down Expand Up @@ -259,6 +260,119 @@ def command(operation, opts = {})
end
end

# Run a command that returns a cursor and parse the response as a cursor.
#
# The command is sent to the server unmodified; the driver MUST NOT inspect
# or alter it. If the response does not contain a cursor field an error is
# raised. The command is never retried.
#
# Note: if a +maxTimeMS+ field is already set on the command document it is
# left as-is. The +max_time_ms+ option below applies only to getMore
# commands. Setting both +timeout_ms+ and +max_time_ms+ is not supported
# and has undefined behavior.
#
# @example Run a cursor-returning command.
# database.cursor_command(checkMetadataConsistency: 1)
#
# @param [ Hash ] command The command to execute.
# @param [ Hash ] options The command options.
#
# @option options [ Hash ] :read The read preference for this command,
# used for server selection and reused for subsequent getMores.
# @option options [ Session ] :session The session to use. If none is
# given an implicit session is created and reused for the cursor's
# lifetime.
# @option options [ Integer ] :timeout_ms The operation timeout in
# milliseconds.
# @option options [ Integer ] :batch_size The batchSize to send on getMore
# commands.
# @option options [ Integer ] :max_time_ms The maxTimeMS to send on getMore
# commands.
# @option options [ Object ] :comment A comment to attach to getMore
# commands.
# @option options [ Symbol ] :cursor_type The cursor type, :tailable or
# :tailable_await. Must match the flags set on the command document.
# @option options [ Symbol ] :timeout_mode :cursor_lifetime or :iteration.
#
# @return [ Mongo::Cursor ] A cursor over the command results.
#
# @raise [ Error::InvalidCursorOperation ] If the response does not contain
# a cursor.
def cursor_command(command, options = {})
options = options.dup
execution_opts = options.delete(:execution_options) || {}
view_options = extract_cursor_command_view_options(options)

txn_read_pref = (options[:session].txn_read_preference if options[:session] && options[:session].in_transaction?)
txn_read_pref ||= options[:read] || ServerSelector::PRIMARY
Lint.validate_underscore_read_preference(txn_read_pref)
selector = ServerSelector.get(txn_read_pref)

# The session is intentionally not wrapped in #with_session: an implicit
# session must outlive this method and is ended by the cursor when it is
# exhausted or closed. Until the cursor takes ownership, the session and
# any load-balanced connection are cleaned up here on every exit path.
session = client.get_session(options)
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(options)
)
op = Operation::CursorCommand.new(
selector: command,
db_name: name,
read: selector,
session: session
)

# Per the client-backpressure spec, retrying a generic command on
# overload errors requires both retryable reads and writes to be
# enabled, same as Database#command.
retry_enabled = client.options[:retry_reads] != false &&
client.options[:retry_writes] != false

server = nil
connection = nil
cursor = nil
begin
result = with_overload_retry(context: context, retry_enabled: retry_enabled) do
server = selector.select_server(cluster, nil, session)
if server.load_balancer?
# The connection is checked in by the cursor when it is drained.
connection = check_out_cursor_command_connection(server, context)
begin
op.execute_with_connection(connection, context: context, options: execution_opts)
rescue StandardError
# Release the connection before the error propagates so that
# a retried attempt checks out a fresh one.
connection.connection_pool.check_in(connection) unless connection.pinned?
connection = nil
raise
end
else
op.execute(server, context: context, options: execution_opts)
end
end

unless result.cursor?
raise Error::InvalidCursorOperation,
'The command response did not include a cursor. ' \
'Use Database#command for commands that do not return a cursor.'
end

view = CursorCommandView.new(self, view_options)
cursor = Cursor.new(view, result, server, session: session, context: context)
ensure
# If the cursor was created it owns the session and connection;
# otherwise (error or no cursor in the response) release them here.
unless cursor
connection.connection_pool.check_in(connection) if connection && !connection.pinned?
session.end_session if session && session.implicit?
end
end
cursor
end

# Execute a read command on the database, retrying the read if necessary.
#
# @param [ Hash ] operation The command to execute.
Expand Down Expand Up @@ -570,5 +684,35 @@ def operation_timeouts(opts)
end
end
end

private

# Removes the getMore and cursor options from the options hash and returns
# them as a separate hash for the CursorCommandView. The remaining options
# (e.g. :session, :read, :timeout_ms) are left for command execution.
#
# @param [ Hash ] options The cursor_command options (mutated).
#
# @return [ Hash ] The view options.
def extract_cursor_command_view_options(options)
%i[ batch_size max_time_ms comment cursor_type timeout_mode ].each_with_object({}) do |key, view_options|
view_options[key] = options.delete(key) if options.key?(key)
end
end

# Checks out a load balanced connection for a cursor command. If the
# session is pinned to a connection (e.g. in a transaction), that
# connection is reused.
#
# @param [ Server ] server The load balancer server.
# @param [ Operation::Context ] context The operation context.
#
# @return [ Server::Connection ] The checked out connection.
def check_out_cursor_command_connection(server, context)
connection = if context.connection_global_id
server.pool.check_out_pinned_connection(context.connection_global_id)
end
connection || server.pool.check_out(context: context)
end
end
end
95 changes: 95 additions & 0 deletions lib/mongo/database/cursor_command_view.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# frozen_string_literal: true

# Copyright (C) 2025 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Database
# The minimal view a Cursor needs when it is built from an arbitrary
# command response rather than from a collection query.
#
# It carries the getMore-specific options (batchSize, maxTimeMS, comment)
# and the cursor type and timeout mode, and answers the few methods the
# Cursor reads from its view.
#
# @api private
class CursorCommandView
# @param [ Mongo::Database ] database The database the command ran on.
# @param [ Hash ] options The getMore and timeout options.
#
# @option options [ Integer ] :batch_size The batchSize for getMores.
# @option options [ Integer ] :max_time_ms The maxTimeMS for getMores.
# @option options [ Object ] :comment The comment for getMores.
# @option options [ Symbol ] :cursor_type :tailable or :tailable_await.
# @option options [ Symbol ] :timeout_mode :cursor_lifetime or :iteration.
def initialize(database, options = {})
@database = database
@options = options
end

# @return [ Mongo::Database ] The database the command ran on.
attr_reader :database

# @return [ Hash ] The view options. Used by the Cursor to read the
# getMore comment.
attr_reader :options

# @return [ Mongo::Client ] The client.
def client
database.client
end

# A placeholder collection used only so the Cursor can reach the client
# and database. The actual namespace for getMore and killCursors is taken
# from the command response, not from this collection.
#
# @return [ Mongo::Collection ] The $cmd pseudo collection.
def collection
@collection ||= Collection.new(database, '$cmd')
end

# @return [ Integer | nil ] The batchSize sent on getMore commands.
def batch_size
options[:batch_size]
end

# @return [ Integer | nil ] The maxTimeMS sent on getMore commands.
def max_time_ms_for_get_more
options[:max_time_ms]
end

# @return [ Symbol | nil ] The cursor type.
def cursor_type
options[:cursor_type]
end

# @return [ Symbol | nil ] The timeout mode.
def timeout_mode
options[:timeout_mode]
end

# @return [ Hash ] timeout values for the operation context.
def operation_timeouts(opts = {})
database.operation_timeouts(opts)
end

private

# Cursors do not support a limit when built from a command response.
def limit
nil
end
end
end
end
1 change: 1 addition & 0 deletions lib/mongo/operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
require 'mongo/operation/drop_database'
require 'mongo/operation/get_more'
require 'mongo/operation/find'
require 'mongo/operation/cursor_command'
require 'mongo/operation/explain'
require 'mongo/operation/kill_cursors'
require 'mongo/operation/indexes'
Expand Down
33 changes: 33 additions & 0 deletions lib/mongo/operation/cursor_command.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

# Copyright (C) 2025 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/operation/cursor_command/op_msg'
require 'mongo/operation/cursor_command/result'

module Mongo
module Operation
# A command operation whose response is parsed as a cursor.
#
# Unlike Command, the result exposes the firstBatch, namespace, and cursor
# id from the command response so that a Cursor can be built from it.
#
# @api private
class CursorCommand
include Specifiable
include OpMsgExecutable
end
end
end
37 changes: 37 additions & 0 deletions lib/mongo/operation/cursor_command/op_msg.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

# Copyright (C) 2025 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
module Operation
class CursorCommand
# A cursor command operation sent as an op message.
#
# @api private
class OpMsg < OpMsgBase
include PolymorphicResult

private

# The user's command is sent verbatim. The driver MUST NOT inspect or
# modify it; $db, lsid and other internal fields are attached by the
# shared command building code.
def selector(_connection)
spec[:selector].dup
end
end
end
end
end
Loading
Loading