Skip to content

Commit

Permalink
dt/xform: Add tests for 'from-offset' option
Browse files Browse the repository at this point in the history
- Consume records that were produced before the deploy
- Specify offsets that run off the end of the input topic
- Ill-formed offsets

Signed-off-by: Oren Leiman <[email protected]>
  • Loading branch information
oleiman committed Jun 25, 2024
1 parent 6aa6d82 commit 78e09c0
Showing 1 changed file with 70 additions and 2 deletions.
72 changes: 70 additions & 2 deletions tests/rptest/tests/data_transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def _deploy_wasm(self,
compression_type: TopicSpec.CompressionTypes
| None = None,
wait_running: bool = True,
retry_on_exc: bool = True):
retry_on_exc: bool = True,
from_offset: str | None = None):
"""
Deploy a wasm transform and wait for all processors to be running.
"""
Expand All @@ -74,7 +75,8 @@ def do_deploy():
input_topic.name,
[o.name for o in output_topic],
file=file,
compression_type=compression_type)
compression_type=compression_type,
from_offset=from_offset)
return True

wait_until(
Expand Down Expand Up @@ -474,6 +476,72 @@ def compression_set(compression_type: TopicSpec.CompressionTypes):
self.logger.info(f"{consumer_status}")
assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}"

@cluster(num_nodes=4)
@matrix(offset=["+0", "-1", f"@{int(time.time() * 1000)}"])
def test_consume_from_offset(self, offset):
    """
    Check that deploying with an offset-delta or timestamp 'from-offset'
    lets the transform reach records that already exist in the input topic.

    The input topic is populated first and the transform is deployed
    afterwards, so the output can only verify cleanly if those pre-deploy
    records were consumed under the given offset configuration.
    """
    src = self.topics[0]
    dst = self.topics[1]

    # Produce before deploying: these records predate the transform.
    produced = self._produce_input_topic(topic=src)

    self._deploy_wasm(
        name="identity-xform",
        input_topic=src,
        output_topic=dst,
        wait_running=True,
        from_offset=offset,
    )

    # Verify the output topic against what the producer reported.
    consumer_status = self._consume_output_topic(topic=dst, status=produced)
    self.logger.info(f"{consumer_status}")

    assert consumer_status.invalid_records == 0, f"transform verification failed with invalid records: {consumer_status}"

@cluster(num_nodes=4)
@matrix(offset=[
    None,  # No offset -> read from the end of the topic
    "@33276193569000",  # June 3024
])
def test_consume_off_end(self, offset):
    """
    Check that starting past the end of the input topic skips old records.

    Records are produced _before_ the transform is deployed; with no offset
    (or a timestamp far in the future) none of them should reach the
    transform, so verifying the output topic is expected to time out.
    """
    src = self.topics[0]
    dst = self.topics[1]

    # Everything produced here predates the deployment below.
    produced = self._produce_input_topic(topic=src)

    self._deploy_wasm(
        name="identity-xform",
        input_topic=src,
        output_topic=dst,
        wait_running=True,
        from_offset=offset,
    )

    # Any TimeoutError is accepted; the result of the consume is irrelevant.
    with expect_exception(TimeoutError, lambda _: True):
        self._consume_output_topic(topic=dst, status=produced)

@cluster(num_nodes=3)
@matrix(offset=[
    "@9223372036854775807",  # int64_max (out of range for millis)
    "+NaN",  # lexical cast error
    "-9223372036854775808",  # lexical cast error (int64 overflow)
    f"{time.time() * 1000}",  # lexical cast error (float value)
])
def test_consume_junk_off(self, offset):
    """
    Check that ill-formed 'from-offset' values are rejected at deploy time.

    Every case must surface as an RpkException whose message contains
    "Bad offset" (the original note expects a clean failure in the admin
    API). The deploy is attempted once, without retries and without
    waiting for the transform to run.
    """
    src = self.topics[0]
    dst = self.topics[1]

    def is_bad_offset(exc):
        # Only the specific "Bad offset" rejection counts as a pass.
        return "Bad offset" in str(exc)

    with expect_exception(RpkException, is_bad_offset):
        self._deploy_wasm(
            name="identity-xform",
            input_topic=src,
            output_topic=dst,
            from_offset=offset,
            retry_on_exc=False,
            wait_running=False,
        )


class DataTransformsChainingTest(BaseDataTransformsTest):
"""
Expand Down

0 comments on commit 78e09c0

Please sign in to comment.