diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 0fbb466..e9770db 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: elixir: ['1.16'] - otp: ['26.2'] + otp: ['25.3.2.9'] steps: - name: Cancel previous runs @@ -63,7 +63,7 @@ jobs: fail-fast: false matrix: elixir: ['1.16'] - otp: ['26.2'] + otp: ['25.3.2.9'] steps: - name: Cancel Previous Runs diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index 33ed4c2..182aebd 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -150,7 +150,6 @@ defmodule Kayrock.RecordBatch do @doc """ Direct deserializer - Supplied for compatibility with the Request protocol """ @spec deserialize(binary) :: nil | MessageSet.t() | t diff --git a/mix.exs b/mix.exs index 028a5b4..cc655fc 100644 --- a/mix.exs +++ b/mix.exs @@ -53,17 +53,9 @@ defmodule Kayrock.MixProject do {:excoveralls, "~> 0.18", only: :test}, {:kafka_protocol, "~> 2.4.1", only: [:dev, :test]}, {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}, - {:snappyer, "~> 1.2", only: [:dev, :test]} + {:snappyer, "~> 1.2", only: [:dev, :test]}, + {:testcontainers, "~> 1.6.0", only: [:dev, :test]} ] - |> integration_test_deps() - end - - defp integration_test_deps(deps_list) do - if Version.match?(System.version(), ">= 1.15.0") do - [{:testcontainers, "~> 1.6", only: :test} | deps_list] - else - deps_list - end end defp elixirc_paths(:test), do: ["lib", "test/support"] diff --git a/mix.lock b/mix.lock index 97146b6..798e1a9 100644 --- a/mix.lock +++ b/mix.lock @@ -7,27 +7,29 @@ "dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"}, "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, "ex_docker_engine_api": {:hex, :ex_docker_engine_api, "1.43.1", "1161e34b6bea5cef84d8fdc1d5d510fcb0c463941ce84c36f4a0f44a9096eb96", [:mix], [{:hackney, "~> 1.20", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "ec8fc499389aeef56ddca67e89e9e98098cff50587b56e8b4613279f382793b1"}, "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "fs": {:hex, :fs, "8.6.1", "7c9c0d0211e8c520e4e9eda63b960605c2711839f47285e6166c332d973be8ea", [:rebar3], [], "hexpm", "61ea2bdaedae4e2024d0d25c63e44dccf65622d4402db4a2df12868d1546503f"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, - "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "kafka_protocol": {:hex, :kafka_protocol, "2.4.1", "9e89afc740f57d17bec33f0f21dc23e7cbfd8c9ed3b0d6b9fc3a6bd4a827c088", [:rebar, :rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "86c9e1c6496273a6d1a02dc0e6ef7479d70441d526970abe667e2cf4deb1df21"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, - "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, - "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "snappy": {:git, "https://github.com/fdmanana/snappy-erlang-nif", "e8907ee8e37cfa07d933a070669a88798082c3d7", []}, "snappyer": {:hex, :snappyer, "1.2.6", "34181e3233f68a92044e176fe96e54fee7957acc2be554f0460d799c495166c2", [:rebar3], [], "hexpm", "d538d1e8892af09dc8b2771b2652c9d70f009cd1556246b3e22706df643f47b4"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, - "tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"}, + "tesla": {:hex, :tesla, "1.13.2", "85afa342eb2ac0fee830cf649dbd19179b6b359bec4710d02a3d5d587f016910", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:mox, "~> 1.0", [hex: :mox, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "960609848f1ef654c3cdfad68453cd84a5febecb6ed9fed9416e36cd9cd724f9"}, "testcontainers": {:hex, :testcontainers, "1.6.0", "14b3251f01ce0b1ada716130d371ba0b6cb1ce2904aa38bd58e5ff4194f4d88f", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.3", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:ex_docker_engine_api, "~> 1.43.1", [hex: :ex_docker_engine_api, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "3f812407f232954999a3a2e05b2802e1d8d1afba120533c42b32c7cc91d35daf"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, diff --git a/test/integration/producer_test.exs b/test/integration/producer_test.exs index 8d65558..d1d870b 100644 --- a/test/integration/producer_test.exs +++ b/test/integration/producer_test.exs @@ -331,6 +331,8 @@ defmodule Kayrock.Integration.ProducerTest do } do api_version = unquote(version) {:ok, client_pid} = build_client(kafka) + long_header = ?a..?z |> Enum.to_list() |> Enum.take_random(12) |> to_string() + message_content = ?a..?z |> Enum.to_list() |> Enum.take_random(50) |> to_string() # Create Topic topic_name = create_topic(client_pid, api_version) @@ -341,23 +343,23 @@ defmodule Kayrock.Integration.ProducerTest do records: [ %Kayrock.RecordBatch.Record{ key: "1", - value: "foo", + value: "#{message_content} 1", timestamp: timestamp, - headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}], attributes: 0 }, %Kayrock.RecordBatch.Record{ key: "1", - value: "bar", + value: "#{message_content} 2", timestamp: timestamp, - headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}], attributes: 0 }, %Kayrock.RecordBatch.Record{ key: "1", - value: "baz", + value: "#{message_content} 3", timestamp: timestamp, - headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}], attributes: 0 } ] @@ -387,20 +389,29 @@ defmodule Kayrock.Integration.ProducerTest do [message_one, message_two, message_three] = List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) - assert message_one.value == "foo" + assert message_one.value == "#{message_content} 1" assert message_one.offset == 0 assert message_one.timestamp == timestamp - assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] - assert message_two.value == "bar" + assert message_one.headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header} + ] + + assert message_two.value == "#{message_content} 2" assert message_two.offset == 1 assert message_two.timestamp == timestamp - assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] - assert message_three.value == "baz" + assert message_two.headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header} + ] + + assert message_three.value == "#{message_content} 3" assert message_three.offset == 2 assert message_three.timestamp == timestamp - assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_three.headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header} + ] # [THEN] Produce another message record = %Kayrock.RecordBatch.Record{ @@ -439,6 +450,58 @@ defmodule Kayrock.Integration.ProducerTest do assert message.value == "zab" assert message.offset == 3 assert message.timestamp == timestamp + + # [THEN] Fetch incomplete messages from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0]] + fetch_request = fetch_messages_request(partition_data, [max_bytes: 100], api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [%{records: records}] = List.first(response.partition_responses).record_set + assert length(records) == 3 + + assert List.first(records).value == "#{message_content} 1" + assert List.first(records).offset == 0 + + assert List.first(records).headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header} + ] + + assert List.last(records).value == "#{message_content} 3" + assert List.last(records).offset == 2 + + assert List.last(records).headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header} + ] + + # [THEN] Fetch complete messages from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [%{records: records}, %{records: records_two}] = + List.first(response.partition_responses).record_set + + assert length(records) == 3 + + assert List.first(records).value == "#{message_content} 1" + assert List.first(records).offset == 0 + + assert List.last(records).value == "#{message_content} 3" + assert List.last(records).offset == 2 + + assert length(records_two) == 1 + assert List.first(records_two).value == "zab" + assert List.first(records_two).offset == 3 end end end diff --git a/test/kayrock/fetch_test.exs b/test/kayrock/fetch_test.exs index c624887..b737108 100644 --- a/test/kayrock/fetch_test.exs +++ b/test/kayrock/fetch_test.exs @@ -1467,6 +1467,75 @@ defmodule Kayrock.FetchTest do assert got == expect end + test "deserialize incomplete records batch" do + full_data = + <<0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 12, 116, 101, 115, 116, 45, 116, 111, 112, 105, 99, + 45, 50, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 4, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 30, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 202, 0, 0, + 0, 0, 2, 124, 87, 184, 205, 0, 0, 0, 0, 0, 2, 0, 0, 1, 147, 120, 5, 42, 180, 0, 0, 1, 147, + 120, 5, 42, 180, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, + 0, 0, 3, 100, 0, 0, 0, 2, 49, 56, 110, 113, 120, 98, 100, 101, 99, 115, 105, 114, 104, + 103, 118, 112, 106, 116, 119, 121, 111, 117, 107, 108, 97, 122, 102, 109, 32, 49, 2, 2, + 49, 24, 113, 111, 106, 99, 100, 104, 102, 108, 122, 120, 97, 101, 100, 0, 0, 2, 2, 49, 56, + 110, 113, 120, 98, 100, 101, 99, 115, 105, 114, 104, 103, 118, 112, 106, 116, 119, 121, + 111, 117, 107, 108, 97, 122, 102, 109, 32, 50, 2, 2, 49, 24, 113, 111, 106, 99, 100, 104, + 102, 108, 122, 120, 97, 101, 100, 0, 0, 4, 2, 49, 56, 110, 113, 120, 98, 100, 101, 99, + 115, 105, 114, 104, 103, 118, 112, 106, 116, 119, 121, 111, 117, 107, 108, 97, 122, 102, + 109, 32, 51, 2, 2, 49, 24, 113, 111, 106, 99, 100, 104, 102, 108, 122, 120, 97, 101, 0, 0, + 0, 0, 0, 0, 0, 3, 0, 0, 0, 60, 0, 0, 0, 0, 2, 252, 254, 111, 122, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 147, 120, 5, 42, 180, 0, 0, 1, 147, 120, 5, 42, 180, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 20, 0, 0, 0, 2, 49, 6, 122, 97, 98, 0>> + + {response, ""} = Kayrock.Fetch.V5.Response.deserialize(full_data) + + [%{partition_responses: [partition_responses]}] = response.responses + %{record_set: record_set} = partition_responses + [%{records: batch_one}, %{records: batch_two}] = record_set + + # The first batch has 3 records, the second batch has 1 record + assert List.first(batch_one).value == "nqxbdecsirhgvpjtwyouklazfm 1" + assert List.first(batch_one).offset == 0 + + assert List.last(batch_one).value == "nqxbdecsirhgvpjtwyouklazfm 3" + assert List.last(batch_one).offset == 2 + + # The second batch has 1 record + assert List.first(batch_two).value == "zab" + assert List.first(batch_two).offset == 3 + + # Incomplete record batch, 30 => 29 in line 3 + incomplete_data = + <<0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 1, 0, 12, 116, 101, 115, 116, 45, 116, 111, 112, 105, 99, + 45, 50, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 4, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 29, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 202, 0, 0, + 0, 0, 2, 124, 87, 184, 205, 0, 0, 0, 0, 0, 2, 0, 0, 1, 147, 120, 5, 42, 180, 0, 0, 1, 147, + 120, 5, 42, 180, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, + 0, 0, 3, 100, 0, 0, 0, 2, 49, 56, 110, 113, 120, 98, 100, 101, 99, 115, 105, 114, 104, + 103, 118, 112, 106, 116, 119, 121, 111, 117, 107, 108, 97, 122, 102, 109, 32, 49, 2, 2, + 49, 24, 113, 111, 106, 99, 100, 104, 102, 108, 122, 120, 97, 101, 100, 0, 0, 2, 2, 49, 56, + 110, 113, 120, 98, 100, 101, 99, 115, 105, 114, 104, 103, 118, 112, 106, 116, 119, 121, + 111, 117, 107, 108, 97, 122, 102, 109, 32, 50, 2, 2, 49, 24, 113, 111, 106, 99, 100, 104, + 102, 108, 122, 120, 97, 101, 100, 0, 0, 4, 2, 49, 56, 110, 113, 120, 98, 100, 101, 99, + 115, 105, 114, 104, 103, 118, 112, 106, 116, 119, 121, 111, 117, 107, 108, 97, 122, 102, + 109, 32, 51, 2, 2, 49, 24, 113, 111, 106, 99, 100, 104, 102, 108, 122, 120, 97, 101, 0, 0, + 0, 0, 0, 0, 0, 3, 0, 0, 0, 60, 0, 0, 0, 0, 2, 252, 254, 111, 122, 0, 0, 0, 0, 0, 0, 0, 0, + 1, 147, 120, 5, 42, 180, 0, 0, 1, 147, 120, 5, 42, 180, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 1, 20, 0, 0, 0, 2, 49, 6, 122, 97, 98, 0>> + + {response, _} = Kayrock.Fetch.V5.Response.deserialize(incomplete_data) + + [%{partition_responses: [partition_responses]}] = response.responses + %{record_set: record_set} = partition_responses + [%{records: batch_one}] = record_set + + # The first batch has 3 records, the second batch has 1 record + assert List.first(batch_one).value == "nqxbdecsirhgvpjtwyouklazfm 1" + assert List.first(batch_one).offset == 0 + + assert List.last(batch_one).value == "nqxbdecsirhgvpjtwyouklazfm 3" + assert List.last(batch_one).offset == 2 + end + test "correctly handle timestamps for LogAppend" do data = <<0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 25, 116, 101, 115, 116, 95, 108, 111, 103, 95, 97, diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index b3b91d7..5bb8364 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -4,49 +4,23 @@ defmodule Kayrock.IntegrationCase do """ use ExUnit.CaseTemplate - if Code.ensure_compiled?(Testcontainers) do - using do - quote do - @moduletag :integration_v2 - import Testcontainers.ExUnit + using do + quote do + @moduletag :integration_v2 + import Testcontainers.ExUnit - alias Testcontainers.Container - alias Testcontainers.KafkaContainer - end + alias Testcontainers.Container + alias Testcontainers.KafkaContainer end + end - setup_all do - case Testcontainers.start_link() do - {:ok, _} -> - :ok - - {:error, {:already_started, _}} -> - :ok - end - end - else - defmodule TestcontainersStub do - @moduledoc false - - def container(_name, _config, _opts) do + setup_all do + case Testcontainers.start_link() do + {:ok, _} -> :ok - end - end - defmodule KafkaContainerStub do - @moduledoc false - - def new() do - end - end - - using do - quote do - @moduletag :integration_v2 - import TestcontainersStub - - alias Kayrock.IntegrationCase.KafkaContainerStub, as: KafkaContainer - end + {:error, {:already_started, _}} -> + :ok end end end