Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cpp/src/ray/runtime/object/local_mode_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ namespace internal {
LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime)
: io_context_("LocalModeObjectStore"),
local_mode_ray_tuntime_(local_mode_ray_tuntime) {
memory_store_ = std::make_unique<CoreWorkerMemoryStore>(io_context_.GetIoService());
memory_store_ = std::make_unique<CoreWorkerMemoryStore>(io_context_.GetIoService(),
/*reference_counting=*/false);
}

void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
Expand All @@ -41,8 +42,11 @@ void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
const ObjectID &object_id) {
auto buffer = std::make_shared<::ray::LocalMemoryBuffer>(
reinterpret_cast<uint8_t *>(data->data()), data->size(), true);
// NOTE: An object cannot have a reference here, because reference counting is disabled in local mode.
memory_store_->Put(
::ray::RayObject(buffer, nullptr, std::vector<rpc::ObjectReference>()), object_id);
::ray::RayObject(buffer, nullptr, std::vector<rpc::ObjectReference>()),
object_id,
/*has_reference=*/false);
}

std::shared_ptr<msgpack::sbuffer> LocalModeObjectStore::GetRaw(const ObjectID &object_id,
Expand All @@ -61,7 +65,6 @@ std::vector<std::shared_ptr<msgpack::sbuffer>> LocalModeObjectStore::GetRaw(
(int)ids.size(),
timeout_ms,
local_mode_ray_tuntime_.GetWorkerContext(),
false,
&results);
if (!status.ok()) {
throw RayException("Get object error: " + status.ToString());
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/core_worker/memory_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DefaultCoreWorkerMemoryStoreWithThread : public CoreWorkerMemoryStore {
private:
explicit DefaultCoreWorkerMemoryStoreWithThread(
std::unique_ptr<InstrumentedIOContextWithThread> io_context)
: CoreWorkerMemoryStore(io_context->GetIoService()),
: CoreWorkerMemoryStore(io_context->GetIoService(), /*reference_counting=*/false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
: CoreWorkerMemoryStore(io_context->GetIoService(), /*reference_counting=*/false),
: CoreWorkerMemoryStore(io_context->GetIoService(), /*reference_counting=*/true),

There's no reason for the default core worker memory store to be created with reference counting disabled.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Mock Initialization Mismatch Causes Test Inconsistency

The DefaultCoreWorkerMemoryStoreWithThread mock is initialized with reference_counting=false, while the PR discussion suggested it should be true because there is no reason to disable it. This mismatch can cause the mock to behave differently from production code, potentially masking reference-counting bugs in tests.

Fix in Cursor Fix in Web

io_context_(std::move(io_context)) {}

std::unique_ptr<InstrumentedIOContextWithThread> io_context_;
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ ray_cc_library(
hdrs = ["store_provider/memory_store/memory_store.h"],
deps = [
":core_worker_context",
":reference_counter_interface",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:ray_config",
Expand All @@ -290,6 +289,7 @@ ray_cc_library(
name = "task_manager_interface",
hdrs = ["task_manager_interface.h"],
deps = [
":reference_counter_interface",
"//src/ray/common:id",
"//src/ray/common:status",
"//src/ray/common:task_common",
Expand Down Expand Up @@ -348,6 +348,7 @@ ray_cc_library(
hdrs = ["future_resolver.h"],
deps = [
":memory_store",
":reference_counter_interface",
"//src/ray/common:id",
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
],
Expand Down Expand Up @@ -408,7 +409,6 @@ ray_cc_library(
deps = [
":common",
":core_worker_context",
":reference_counter_interface",
"//src/ray/common:buffer",
"//src/ray/common:id",
"//src/ray/common:ray_config",
Expand Down
54 changes: 43 additions & 11 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,9 @@ Status CoreWorker::PutInLocalPlasmaStore(const RayObject &object,
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
}
}
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id,
reference_counter_->HasReference(object_id));
return Status::OK();
}

Expand All @@ -993,7 +995,7 @@ Status CoreWorker::Put(const RayObject &object,
RAY_RETURN_NOT_OK(WaitForActorRegistered(contained_object_ids));
if (options_.is_local_mode) {
RAY_LOG(DEBUG).WithField(object_id) << "Put object in memory store";
memory_store_->Put(object, object_id);
memory_store_->Put(object, object_id, reference_counter_->HasReference(object_id));
return Status::OK();
}
return PutInLocalPlasmaStore(object, object_id, pin_object);
Expand Down Expand Up @@ -1092,7 +1094,9 @@ Status CoreWorker::CreateOwnedAndIncrementLocalRef(
} else if (*data == nullptr) {
// Object already exists in plasma. Store the in-memory value so that the
// client will check the plasma store.
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), *object_id);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
*object_id,
reference_counter_->HasReference(*object_id));
}
}
return Status::OK();
Expand Down Expand Up @@ -1189,7 +1193,9 @@ Status CoreWorker::SealExisting(const ObjectID &object_id,
RAY_RETURN_NOT_OK(plasma_store_provider_->Release(object_id));
reference_counter_->FreePlasmaObjects({object_id});
}
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id,
reference_counter_->HasReference(object_id));
return Status::OK();
}

Expand Down Expand Up @@ -1361,13 +1367,20 @@ Status CoreWorker::GetObjects(const std::vector<ObjectID> &ids,
// If any of the objects have been promoted to plasma, then we retry their
// gets at the provider plasma. Once we get the objects from plasma, we flip
// the transport type again and return them for the original direct call ids.

// Prepare object ids vector and owner addresses vector
std::vector<ObjectID> object_ids =
std::vector<ObjectID>(plasma_object_ids.begin(), plasma_object_ids.end());
auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids);

int64_t local_timeout_ms = timeout_ms;
if (timeout_ms >= 0) {
local_timeout_ms = std::max(static_cast<int64_t>(0),
timeout_ms - (current_time_ms() - start_time));
}
RAY_LOG(DEBUG) << "Plasma GET timeout " << local_timeout_ms;
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(plasma_object_ids,
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(object_ids,
owner_addresses,
local_timeout_ms,
*worker_context_,
&result_map,
Expand Down Expand Up @@ -1515,8 +1528,14 @@ Status CoreWorker::Wait(const std::vector<ObjectID> &ids,
// num_objects ready since we want to at least make the request to start pulling
// these objects.
if (!plasma_object_ids.empty()) {
// Prepare object ids vector and owner addresses
std::vector<ObjectID> object_ids =
std::vector<ObjectID>(plasma_object_ids.begin(), plasma_object_ids.end());
auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids);

RAY_RETURN_NOT_OK(plasma_store_provider_->Wait(
plasma_object_ids,
object_ids,
owner_addresses,
std::min(static_cast<int>(plasma_object_ids.size()),
num_objects - static_cast<int>(ready.size())),
timeout_ms,
Expand Down Expand Up @@ -3003,8 +3022,12 @@ bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id,
reference_counter_->AddLocalReference(return_id, "<temporary (pin return object)>");
reference_counter_->AddBorrowedObject(return_id, ObjectID::Nil(), owner_address);

// Resolve owner address of return id
std::vector<ObjectID> object_ids = {return_id};
auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids);

auto status = plasma_store_provider_->Get(
{return_id}, 0, *worker_context_, &result_map, &got_exception);
object_ids, owner_addresses, 0, *worker_context_, &result_map, &got_exception);
// Remove the temporary ref.
RemoveLocalReference(return_id);

Expand Down Expand Up @@ -3222,7 +3245,8 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
// otherwise, the put is a no-op.
if (!options_.is_local_mode) {
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
task.ArgObjectId(i));
task.ArgObjectId(i),
reference_counter_->HasReference(task.ArgObjectId(i)));
}
} else {
// A pass-by-value argument.
Expand Down Expand Up @@ -3271,8 +3295,12 @@ Status CoreWorker::GetAndPinArgsForExecutor(const TaskSpecification &task,
RAY_RETURN_NOT_OK(memory_store_->Get(
by_ref_ids, -1, *worker_context_, &result_map, &got_exception));
} else {
// Resolve owner addresses of by-ref ids
std::vector<ObjectID> object_ids =
std::vector<ObjectID>(by_ref_ids.begin(), by_ref_ids.end());
auto owner_addresses = reference_counter_->GetOwnerAddresses(object_ids);
RAY_RETURN_NOT_OK(plasma_store_provider_->Get(
by_ref_ids, -1, *worker_context_, &result_map, &got_exception));
object_ids, owner_addresses, -1, *worker_context_, &result_map, &got_exception));
}
for (const auto &it : result_map) {
for (size_t idx : by_ref_indices[it.first]) {
Expand Down Expand Up @@ -4103,7 +4131,9 @@ Status CoreWorker::DeleteImpl(const std::vector<ObjectID> &object_ids, bool loca
memory_store_->Delete(object_ids);
for (const auto &object_id : object_ids) {
RAY_LOG(DEBUG).WithField(object_id) << "Freeing object";
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_FREED), object_id);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_FREED),
object_id,
reference_counter_->HasReference(object_id));
}

// We only delete from plasma, which avoids hangs (issue #7105). In-memory
Expand Down Expand Up @@ -4253,7 +4283,9 @@ void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request,
/*add_local_ref=*/false,
/*pinned_at_node_id=*/NodeID::FromBinary(borrower_address.node_id()));
reference_counter_->AddBorrowerAddress(object_id, borrower_address);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id,
reference_counter_->HasReference(object_id));
send_reply_callback(Status::OK(), nullptr, nullptr);
}

Expand Down
3 changes: 1 addition & 2 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
auto plasma_store_provider = std::make_shared<CoreWorkerPlasmaStoreProvider>(
options.store_socket,
raylet_ipc_client,
*reference_counter,
options.check_signals,
/*warmup=*/
(options.worker_type != WorkerType::SPILL_WORKER &&
Expand All @@ -367,7 +366,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
});
auto memory_store = std::make_shared<CoreWorkerMemoryStore>(
io_service_,
reference_counter.get(),
/*reference_counting=*/reference_counter != nullptr,
raylet_ipc_client,
options.check_signals,
[this](const RayObject &obj) {
Expand Down
11 changes: 8 additions & 3 deletions src/ray/core_worker/future_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,18 @@ void FutureResolver::ProcessResolvedObject(const ObjectID &object_id,
if (!status.ok()) {
// The owner is unreachable. Store an error so that an exception will be
// thrown immediately when the worker tries to get the value.
in_memory_store_->Put(RayObject(rpc::ErrorType::OWNER_DIED), object_id);
in_memory_store_->Put(RayObject(rpc::ErrorType::OWNER_DIED),
object_id,
reference_counter_->HasReference(object_id));
} else if (reply.status() == rpc::GetObjectStatusReply::OUT_OF_SCOPE) {
// The owner replied that the object has gone out of scope (this is an edge
// case in the distributed ref counting protocol where a borrower dies
// before it can notify the owner of another borrower). Store an error so
// that an exception will be thrown immediately when the worker tries to
// get the value.
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED), object_id);
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_DELETED),
object_id,
reference_counter_->HasReference(object_id));
} else if (reply.status() == rpc::GetObjectStatusReply::CREATED) {
// The object is either an indicator that the object is in Plasma, or
// the object has been returned directly in the reply. In either
Expand Down Expand Up @@ -106,7 +110,8 @@ void FutureResolver::ProcessResolvedObject(const ObjectID &object_id,
inlined_ref.owner_address());
}
in_memory_store_->Put(RayObject(data_buffer, metadata_buffer, inlined_refs),
object_id);
object_id,
reference_counter_->HasReference(object_id));
}
}

Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/future_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <utility>

#include "ray/common/id.h"
#include "ray/core_worker/reference_counter_interface.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker_rpc_client/core_worker_client_pool.h"
#include "src/ray/protobuf/core_worker.pb.h"
Expand Down
7 changes: 5 additions & 2 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
// (core_worker.cc removes the object from memory store before calling this method),
// we need to add it back to indicate that it's available.
// If the object is already in the memory store then the put is a no-op.
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id,
reference_counter_.HasReference(object_id));
}
return true;
}
Expand Down Expand Up @@ -121,7 +123,8 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
const Status &status, const rpc::PinObjectIDsReply &reply) mutable {
if (status.ok() && reply.successes(0)) {
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id);
object_id,
reference_counter_.HasReference(object_id));
reference_counter_.UpdateObjectPinnedAtRaylet(object_id, node_id);
} else {
RAY_LOG(INFO).WithField(object_id)
Expand Down
Loading