Skip to content

CXX-3294 Relocate all remaining Manual Doc Page examples into the examples directory #1407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 21, 2025
2 changes: 1 addition & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ foreach(source ${mongocxx_examples_sources})
set(extra_libs "")
set(add_to_run_examples ON)

if (${source} MATCHES "pool")
if (${source} MATCHES "pool|change_streams_examples")
list(APPEND extra_libs Threads::Threads)
endif()

Expand Down
2 changes: 1 addition & 1 deletion examples/mongocxx/causal_consistency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int EXAMPLES_CDECL main() {
using namespace mongocxx;

instance inst{};
client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
client client{mongocxx::uri{}};

// Start Causal Consistency Example 1

Expand Down
2 changes: 1 addition & 1 deletion examples/mongocxx/client_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ int EXAMPLES_CDECL main() {
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};

mongocxx::client conn{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}};
mongocxx::client conn{mongocxx::uri{}};

// By default, a session is causally consistent. Pass options::client_session to override
// causal consistency.
Expand Down
220 changes: 220 additions & 0 deletions examples/mongocxx/mongodb.com/change_streams_examples.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2009-present MongoDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <thread>

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/document/value.hpp>
#include <bsoncxx/document/view.hpp>
#include <bsoncxx/stdx/optional.hpp>
#include <bsoncxx/string/view_or_value.hpp>

#include <mongocxx/change_stream.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/database.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/pipeline.hpp>
#include <mongocxx/pool.hpp>
#include <mongocxx/uri.hpp>

#include <examples/macros.hh>

namespace {

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::make_document;

using namespace mongocxx;

mongocxx::collection
make_test_coll(mongocxx::client& client, bsoncxx::stdx::string_view db_name, bsoncxx::stdx::string_view coll_name) {
write_concern wc_majority;
wc_majority.acknowledge_level(write_concern::level::k_majority);

read_concern rc_majority;
rc_majority.acknowledge_level(read_concern::level::k_majority);

auto db = client[db_name];
auto coll = db[coll_name];

coll.drop();
coll = db.create_collection(coll_name);

coll.write_concern(wc_majority);
coll.read_concern(rc_majority);

return coll;
}

void change_streams_example_1(mongocxx::collection inventory) {
// Start Changestream Example 1
change_stream stream = inventory.watch();
auto it = stream.begin();
while (it == stream.end()) {
// Server returned no new notifications. Restart iteration to poll server.
it = stream.begin();
}
bsoncxx::document::view next = *it;
// End Changestream Example 1

if (next["operationType"].get_string().value != "insert") {
throw std::logic_error{"expected operationType to be 'insert'"};
}
}

void change_streams_example_2(mongocxx::collection inventory) {
// Start Changestream Example 2
options::change_stream options;
options.full_document(bsoncxx::string::view_or_value{"updateLookup"});
change_stream stream = inventory.watch(options);
auto it = stream.begin();
while (it == stream.end()) {
// Server returned no new notifications. Restart iteration to poll server.
it = stream.begin();
}
bsoncxx::document::view next = *it;
// End Changestream Example 2

if (next["operationType"].get_string().value != "insert") {
throw std::logic_error{"expected operationType to be 'insert'"};
}
}

void change_streams_example_3(mongocxx::collection inventory) {
bsoncxx::stdx::optional<bsoncxx::document::value> next;

// Get one notification to set `next`.
{
change_stream stream = inventory.watch();
auto it = stream.begin();
while (it == stream.end()) {
// Server returned no new notifications. Restart iteration to poll server.
it = stream.begin();
}
next = bsoncxx::document::value(*it);
}

// Start Changestream Example 3
auto resume_token = (*next)["_id"].get_document().value;
options::change_stream options;
options.resume_after(resume_token);
change_stream stream = inventory.watch(options);
auto it = stream.begin();
while (it == stream.end()) {
// Server returned no new notifications. Restart iteration to poll server.
it = stream.begin();
}
// End Changestream Example 3

if ((*it)["operationType"].get_string().value != "insert") {
throw std::logic_error{"expected operationType to be 'insert'"};
}
}

void change_streams_example_4(mongocxx::collection inventory) {
// Create a pipeline with
// [{"$match": {"$or": [{"fullDocument.username": "alice"}, {"operationType": "delete"}]}}]

// Start Changestream Example 4
mongocxx::pipeline cs_pipeline;
cs_pipeline.match(make_document(
kvp("$or",
make_array(
make_document(kvp("fullDocument.username", "alice")), make_document(kvp("operationType", "delete"))))));

change_stream stream = inventory.watch(cs_pipeline);
auto it = stream.begin();
while (it == stream.end()) {
// Server returned no new notifications. Restart iteration to poll server.
it = stream.begin();
}
// End Changestream Example 4

if ((*it)["operationType"].get_string().value != "insert") {
throw std::logic_error{"expected operationType to be 'insert'"};
}
}

} // namespace

int EXAMPLES_CDECL main() {
if (char const* const topology_env = std::getenv("MONGOCXX_TEST_TOPOLOGY")) {
auto const topology = std::string(topology_env);
if (topology != "replica") {
std::cerr << "Skipping: change_streams example requires a replica set" << std::endl;
return 0;
}
}

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance const inst{};

std::function<void(mongocxx::collection)> const examples[] = {
change_streams_example_1,
change_streams_example_2,
change_streams_example_3,
change_streams_example_4,
};

mongocxx::pool pool{mongocxx::uri{}};

for (auto const& example : examples) {
auto client = pool.acquire();

collection inventory = make_test_coll(*client, "streams", "events");

std::atomic_bool insert_thread_done;
insert_thread_done.store(false);

// Start a thread to repeatedly insert documents to generate notifications.
auto insert_thread = std::thread{[&pool, &insert_thread_done] {
auto client = pool.acquire();
auto inventory = (*client)["streams"]["events"];
while (true) {
auto doc = make_document(kvp("username", "alice"));
inventory.insert_one(doc.view());
if (insert_thread_done) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}};

try {
example(std::move(inventory));
insert_thread_done = true;
insert_thread.join();
} catch (std::logic_error const& e) {
std::cerr << e.what() << std::endl;
insert_thread_done = true;
insert_thread.detach();
return EXIT_FAILURE;
} catch (...) {
insert_thread_done = true;
insert_thread.detach();
throw;
}
}

return EXIT_SUCCESS;
}
Loading