From 17e9cd98deae24f9201338c14296a8586ad84738 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ingo=20M=C3=BCller?=
Date: Wed, 22 Feb 2023 08:51:35 +0000
Subject: [PATCH] XXX: Implement test case with artificial time delay.

---
 .../test/python/dialects/iterators/arrow.py   | 46 +++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/experimental/iterators/test/python/dialects/iterators/arrow.py b/experimental/iterators/test/python/dialects/iterators/arrow.py
index a232fb713892..2acb4e444b79 100644
--- a/experimental/iterators/test/python/dialects/iterators/arrow.py
+++ b/experimental/iterators/test/python/dialects/iterators/arrow.py
@@ -2,7 +2,10 @@
 
 import ctypes
 import os
+import sys
 from tempfile import NamedTemporaryFile
+import time
+from typing import Iterator
 
 import numpy as np
 import pandas as pd
@@ -249,3 +252,46 @@ def testArrowCSVInput():
   # CHECK-NEXT: (10, 510, 1010, 1510, 2510, 3010)
   # CHECK-NEXT: (35, 535, 1035, 1535, 2535, 3035)
   sum_batches_elementwise_with_iterators(reader)
+
+
+# Test case: Read from a sequence of Arrow arrays/record batches (produced by a
+# Python generator).
+
+
+# Create a generator that produces single-row record batches with increasing
+# numbers with an artificial delay of one second after each of them. Since each
+# generated record batch immediately produces output, this visually demonstrates
+# that the consumption by the MLIR-based iterators interleaves with the
+# Python-based production of the record batches in the stream.
+def generate_batches_with_delay(schema: pa.Schema) -> Iterator[pa.RecordBatch]:
+  for i in range(5):
+    arrays = [
+        pa.array(np.array([i], field.type.to_pandas_dtype()))
+        for field in schema
+    ]
+    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
+    yield batch
+    # Sleep only when a TTY is attached (in order not to delay unit tests).
+    if sys.stdout.isatty():
+      time.sleep(1)
+
+
+# CHECK-LABEL: TEST: testGeneratorInput
+@run
+def testGeneratorInput():
+  # Create an Arrow table in memory; only its schema is used below.
+  table = create_test_input()
+
+  # Lazily produce single-row record batches that match the table's schema.
+  generator = generate_batches_with_delay(table.schema)
+
+  # Create a RecordBatchReader that draws its batches from the generator.
+  reader = pa.RecordBatchReader.from_batches(table.schema, generator)
+
+  # Hand the reader as an Arrow array stream to the Iterators test program.
+  # CHECK-NEXT: (0, 0, 0, 0, 0, 0, 0)
+  # CHECK-NEXT: (1, 1, 1, 1, 1, 1, 1)
+  # CHECK-NEXT: (2, 2, 2, 2, 2, 2, 2)
+  # CHECK-NEXT: (3, 3, 3, 3, 3, 3, 3)
+  # CHECK-NEXT: (4, 4, 4, 4, 4, 4, 4)
+  sum_batches_elementwise_with_iterators(reader)