Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wszystkie zmiany optymalizacyjne #3

Open
wants to merge 16 commits into
base: local
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
build
build.ninja

.idea
.project
.cproject
.settings
107 changes: 107 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#
# This file is open source software, licensed to you under the terms
# of the Apache License, Version 2.0 (the "License"). See the NOTICE file
# distributed with this work for additional information regarding copyright
# ownership. You may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

#
# Copyright (C) 2018 Scylladb, Ltd.
#

cmake_minimum_required (VERSION 3.5)
project (kafka4seastar)

find_package (Seastar REQUIRED)

set(HEADER_DIRECTORY include/kafka4seastar)

set(HEADERS
${HEADER_DIRECTORY}/connection/connection_manager.hh
${HEADER_DIRECTORY}/connection/kafka_connection.hh
${HEADER_DIRECTORY}/connection/tcp_connection.hh
${HEADER_DIRECTORY}/producer/batcher.hh
${HEADER_DIRECTORY}/producer/kafka_producer.hh
${HEADER_DIRECTORY}/producer/producer_properties.hh
${HEADER_DIRECTORY}/producer/sender.hh
${HEADER_DIRECTORY}/protocol/kafka_error_code.hh
${HEADER_DIRECTORY}/protocol/kafka_primitives.hh
${HEADER_DIRECTORY}/protocol/kafka_records.hh
${HEADER_DIRECTORY}/protocol/api_versions_request.hh
${HEADER_DIRECTORY}/protocol/api_versions_response.hh
${HEADER_DIRECTORY}/protocol/headers.hh
${HEADER_DIRECTORY}/protocol/metadata_request.hh
${HEADER_DIRECTORY}/protocol/metadata_response.hh
${HEADER_DIRECTORY}/protocol/produce_request.hh
${HEADER_DIRECTORY}/protocol/produce_response.hh
${HEADER_DIRECTORY}/utils/defaults.hh
${HEADER_DIRECTORY}/utils/metadata_manager.hh
${HEADER_DIRECTORY}/utils/partitioner.hh
${HEADER_DIRECTORY}/utils/retry_helper.hh)

set(SOURCES
src/connection/connection_manager.cc
src/connection/kafka_connection.cc
src/connection/tcp_connection.cc
src/producer/batcher.cc
src/producer/kafka_producer.cc
src/producer/sender.cc
src/protocol/kafka_error_code.cc
src/protocol/kafka_records.cc
src/protocol/api_versions_request.cc
src/protocol/api_versions_response.cc
src/protocol/headers.cc
src/protocol/metadata_request.cc
src/protocol/metadata_response.cc
src/protocol/produce_request.cc
src/protocol/produce_response.cc
src/utils/defaults.cc
src/utils/metadata_manager.cc
src/utils/partitioner.cc)

add_library(kafka4seastar STATIC ${SOURCES})

target_include_directories(kafka4seastar
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src)

add_subdirectory(tests/unit)

add_executable (kafka_demo
demo/kafka_demo.cc)

target_link_libraries (kafka4seastar
Seastar::seastar
Seastar::seastar_testing)

target_link_libraries(kafka_demo
kafka4seastar)

include(GNUInstallDirs)

install(TARGETS kafka4seastar
EXPORT kafka4seastar-export
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})

install(EXPORT kafka4seastar-export
FILE
Kafka4seastarTargets.cmake
NAMESPACE
Kafka4seastar::
DESTINATION
${CMAKE_INSTALL_LIBDIR}/cmake/kafka4seastar)
8 changes: 8 additions & 0 deletions Kafka4seastarConfig.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
get_filename_component(kafka4seastar_CMAKE_DIR "${CMAKE_CURRENT_LIST_FILE}" PATH)
include(CMakeFindDependencyMacro)

find_dependency(Seastar REQUIRED)

if(NOT TARGET Kafka4seastar::kafka4seastar)
include("${kafka4seastar_CMAKE_DIR}/Kafka4seastarTargets.cmake")
endif()
92 changes: 92 additions & 0 deletions demo/kafka_demo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2019 ScyllaDB Ltd.
*/

#include <iostream>
#include <seastar/core/app-template.hh>
#include <seastar/core/print.hh>
#include <seastar/core/thread.hh>
#include <kafka4seastar/producer/kafka_producer.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/log.hh>

using namespace seastar;

namespace bpo = boost::program_options;
namespace k4s = kafka4seastar;

seastar::future<sstring> async_stdin_read() {
return seastar::smp::submit_to(1, []{
sstring res;
std::cin >> res;
return res;
});
}

int main(int ac, char** av) {
app_template app;
app.add_options()
("host", bpo::value<std::string>()->default_value("172.13.0.1"), "Address of the Kafka broker")
("port", bpo::value<uint16_t>()->default_value(9092), "Port to connect through");

return app.run(ac, av, [&app] {
return seastar::async([&app] {
auto&& config = app.configuration();
std::string host = config["host"].as<std::string>();
uint16_t port = config["port"].as<uint16_t>();
(void) port;

k4s::producer_properties properties;
properties._client_id = "seastar-kafka-demo";
properties._request_timeout = 1000;
properties._servers = {
{host, port},
{"172.29.0.1", 9092}
};

k4s::kafka_producer producer(std::move(properties));
producer.init().wait();
fprint(std::cout, "Producer initialized and ready to send\n\n");

sstring topic, key, value;
while (true) {
fprint(std::cout,
"\nType the topic and the message you want to send below. If you want to quit type 'q'\n");
fprint(std::cout, "Enter topic: ");
topic = async_stdin_read().get0();

if (topic.empty() || topic == "q") {
producer.disconnect().wait();
fprint(std::cout, "Finished succesfully!\n");
break;
}

fprint(std::cout, "Enter key: ");
key = async_stdin_read().get0();
fprint(std::cout, "Enter value: ");
value = async_stdin_read().get0();

(void)producer.produce(topic, key, value).handle_exception([key, value](auto ep) {
fprint(std::cout, "Failure sending %s %s: %s.\n", key, value, ep);
});
}
});
});
}
152 changes: 152 additions & 0 deletions include/kafka4seastar/connection/connection_manager.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Copyright (C) 2019 ScyllaDB Ltd.
*/

#pragma once

#include <seastar/kafka4seastar/connection/kafka_connection.hh>
#include <seastar/kafka4seastar/protocol/metadata_response.hh>
#include <seastar/kafka4seastar/protocol/metadata_request.hh>

#include <map>

using namespace seastar;

namespace kafka4seastar {

struct metadata_refresh_exception : public std::runtime_error {
public:
explicit metadata_refresh_exception(const seastar::sstring& message) : runtime_error(message) {}
};

class connection_manager {
public:

using connection_id = std::pair<seastar::sstring, uint16_t>;
using connection_iterator = std::map<connection_id, std::unique_ptr<kafka_connection>>::iterator;

private:

std::map<connection_id, std::unique_ptr<kafka_connection>> _connections;
seastar::sstring _client_id;

semaphore _send_semaphore;

future<> _pending_queue;

future<connection_iterator> connect(const seastar::sstring& host, uint16_t port, uint32_t timeout);

template<typename RequestType>
future<future<typename RequestType::response_type>> perform_request(connection_iterator& conn, RequestType& request, bool with_response) {
auto send_future = with_response
? conn->second->send(std::move(request))
: conn->second->send_without_response(std::move(request));

promise<> promise;
auto f = promise.get_future();
_pending_queue = _pending_queue.then([f = std::move(f)] () mutable {
return std::move(f);
});

send_future = send_future.then([promise = std::move(promise)] (auto response) mutable {
promise.set_value();
return response;
});

return make_ready_future<decltype(send_future)>(std::move(send_future));
}

public:

explicit connection_manager(seastar::sstring client_id)
: _client_id(std::move(client_id)),
_send_semaphore(1),
_pending_queue(make_ready_future<>()) {}

future<> init(const std::set<connection_id>& servers, uint32_t request_timeout);
connection_iterator get_connection(const connection_id& connection);
future<> disconnect(const connection_id& connection);

template<typename RequestType>
future<typename RequestType::response_type> send(RequestType&& request, const seastar::sstring& host,
uint16_t port, uint32_t timeout, bool with_response=true) {
// In order to preserve ordering of sends, a semaphore with
// count = 1 is used due to its FIFO guarantees.
//
// It is important that connect() and send() are done
// with semaphore, as naive implementation
//
// connect(host, port).then([](auto conn) { conn.send(req1); });
// connect(host, port).then([](auto conn) { conn.send(req2); });
//
// could introduce reordering of requests: after both
// connects resolve as ready futures, the continuations (sends)
// are not guaranteed to run with any specific order.
//
// In order to not limit concurrency, send_future is
// returned as future<future<response>> and "unpacked"
// outside the semaphore - scheduling inside semaphore
// (only 1 at the time) and waiting for result outside it.
return with_semaphore(_send_semaphore, 1, [this, request = std::move(request), host, port, timeout, with_response] () mutable {
auto conn = get_connection({host, port});
if (conn != _connections.end()) {
return perform_request<RequestType>(conn, request, with_response);
} else {
return connect(host, port, timeout).then([this, request = std::move(request), with_response](connection_iterator conn) mutable {
return perform_request<RequestType>(conn, request, with_response);
});
}
}).then([](future<typename RequestType::response_type> send_future) {
return send_future;
}).then([this, host, port](typename RequestType::response_type response) {
if (response._error_code == error::kafka_error_code::REQUEST_TIMED_OUT ||
response._error_code == error::kafka_error_code::CORRUPT_MESSAGE ||
response._error_code == error::kafka_error_code::NETWORK_EXCEPTION) {
_pending_queue = _pending_queue.then([this, host, port] {
return disconnect({host, port});
});
}
return response;
}).handle_exception([this, host, port] (std::exception_ptr ep) {
try {
_pending_queue = _pending_queue.then([this, host, port] {
return disconnect({host, port});
});
std::rethrow_exception(ep);
} catch (seastar::timed_out_error& e) {
typename RequestType::response_type response;
response._error_code = error::kafka_error_code::REQUEST_TIMED_OUT;
return response;
} catch (...) {
typename RequestType::response_type response;
response._error_code = error::kafka_error_code::NETWORK_EXCEPTION;
return response;
}
});
}

future<metadata_response> ask_for_metadata(metadata_request&& request);

future<> disconnect_all();

};

}
Loading