diff --git a/beam/README.md b/beam/README.md
index 5304e27..54c2753 100644
--- a/beam/README.md
+++ b/beam/README.md
@@ -1,9 +1,8 @@
-## Apache Beam Integration for ArrayRecord
+# Apache Beam Integration for ArrayRecord
 
-### Quickstart
+## Installation
 
-#### Convert TFRecord in a GCS bucket to ArrayRecord
-```
+```sh
 pip install apache-beam[gcp]==2.53.0
 pip install array-record[beam]
 # check that apache-beam is still at 2.53.0
@@ -14,9 +13,10 @@ cd array_record/beam/examples
 # If use DataFlow, set pipeline_options as instructed in example_gcs_conversion.py
 python example_gcs_conversion.py
 ```
-If DataFlow is used, you can monitor the run from the DataFlow job monitoring UI (https://cloud.google.com/dataflow/docs/guides/monitoring-overview)
-### Summary
+If Dataflow is used, you can monitor the run from [the Dataflow job monitoring UI](https://cloud.google.com/dataflow/docs/guides/monitoring-overview).
+
+## Summary
 
 This submodule provides some Apache Beam components and lightweight pipelines for converting different file formats (TFRecord at present) into ArrayRecords. The intention is to provide a variety of fairly seamless tools for migrating existing TFRecord datasets, allowing a few different choices regarding sharding and write location.
 
@@ -36,7 +36,7 @@ In addition to these components, there are a number of simple pipelines included
 In addition to all of that, there are a handful of dummy data generation functions used for testing and validation.
 
-### Usage
+## Usage
 
 **Basics and 'Getting Started'**
@@ -48,16 +48,33 @@ Once installed, all of the Beam components are available to import from `array_r
 
 If you're familiar with Apache Beam and want to build a custom pipeline around its core constructs, you can import the native Beam objects and implement them as you see fit.
 
-To import the PTransform with the disk-based sink, use `from array_record.beam.arrayrecordio import WriteToArrayRecord`. You may then use it as a standard step in Beam Pipeline. It accepts a variety of different inputs including `file_path_prefix`, `file_path_suffix`, `coder`, and `num_shards`. For more detail, as well as options for extensibility, please refer to [Apache Beam's Documentation for FileBasedSink](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filebasedsink.html)
+To import the PTransform with the disk-based sink, use:
+
+```py
+from array_record.beam.arrayrecordio import WriteToArrayRecord
+```
+
+You may then use it as a standard step in a Beam pipeline. It accepts a variety of different inputs, including:
+- `file_path_prefix`
+- `file_name_suffix`
+- `coder`
+- `num_shards`
+
+For more detail, as well as options for extensibility, please refer to [Apache Beam's Documentation for FileBasedSink](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filebasedsink.html).
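+
+For example, a minimal local-runner pipeline using the sink might look like the following sketch (the output prefix and the records themselves are illustrative placeholders; the call mirrors the `WriteToArrayRecord` usage in `pipelines.py`):
+
+```py
+import apache_beam as beam
+from apache_beam.coders import coders
+from array_record.beam.arrayrecordio import WriteToArrayRecord
+
+with beam.Pipeline() as p:
+  _ = (
+      p
+      | beam.Create([b'record-1', b'record-2', b'record-3'])  # placeholder records
+      | WriteToArrayRecord(
+          '/tmp/arrayrecord-output/movies',  # placeholder output prefix
+          coder=coders.ToBytesCoder(),
+          num_shards=2,
+          file_name_suffix='.arrayrecord',
+      )
+  )
+```
+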
+To import the custom `DoFn`, use:
 
-To import the custom DoFn, use `from array_record.beam.dofns import ConvertToArrayRecordGCS`. You may then use it as a parameter for a Beam `ParDo`. It takes a handful of side inputs as described below:
+```py
+from array_record.beam.dofns import ConvertToArrayRecordGCS
+```
+
+You may then use it as a parameter for a Beam `ParDo`. It takes a handful of side inputs as described below:
 
-- **path:** REQUIRED (and positional). The intended path prefix for the GCS bucket in "gs://..." format
-- **overwrite_extension:** FALSE by default. Boolean making the DoFn attempt to overwrite any file extension after "."
-- **file_path_suffix:** ".arrayrecord" by default. Intended suffix for overwrite or append
+- **path:** REQUIRED (and positional). The intended path prefix for the GCS bucket in `gs://...` format.
+- **overwrite_extension:** FALSE by default. Boolean making the DoFn attempt to overwrite any file extension after `.`.
+- **file_path_suffix:** `.arrayrecord` by default. Intended suffix for overwrite or append.
 
-Note that by default, the DoFn will APPEND an existing filename/extension with ".arrayrecord". Setting `file_path_suffix` to `""` will leave the file names as-is and thus expect you to be passing in a different `path` than the source.
+Note that by default, the `DoFn` will APPEND an existing filename/extension with `.arrayrecord`. Setting `file_path_suffix` to `""` will leave the file names as-is and thus expect you to be passing in a different `path` than the source.
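+
+As a sketch of the same flow that `convert_tf_to_arrayrecord_gcs` in `pipelines.py` uses (the bucket paths below are placeholders):
+
+```py
+import apache_beam as beam
+from array_record.beam.dofns import ConvertToArrayRecordGCS
+
+with beam.Pipeline() as p:
+  _ = (
+      p
+      | beam.Create(['gs://source-bucket/records/*.tfrecord'])  # placeholder input pattern
+      | beam.io.ReadAllFromTFRecord(with_filename=True)
+      | beam.GroupByKey()
+      | beam.ParDo(
+          ConvertToArrayRecordGCS(),
+          'gs://destination-bucket/records/',  # placeholder output prefix
+          file_path_suffix='.arrayrecord',
+          overwrite_extension=False,
+      )
+  )
+```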
 
 You can see usage details for each of these implementations in `pipelines.py`.
@@ -65,23 +82,30 @@ You can see usage details for each of these implementations in `pipelines.py`.
 
 Several helper functions have been packaged to make the functionality more accessible to those with less comfort building Apache Beam pipelines. All of these pipelines take `input` and `output` arguments, which are intended as the respective source and destination paths of the TFRecord files and the ArrayRecord files. Wildcards are accepted in these paths. By default, these parameters can either be passed as CLI arguments when executing a pipeline as `python -m <pipeline_module> --input <input_path> --output <output_path>`, or as an override to the `args` argument if executing programmatically. Additionally, extra arguments can be passed via CLI or programmatically in the `pipeline_options` argument if you want to control the behavior of Beam. The likely reason for this would be altering the Runner to Google Cloud Dataflow, which these examples support (with caveats; see the section below on Dataflow).
 
-There are slight variations in execution when running these either from an interpreter or the CLI, so familiarize yourself with the files in the `examples/` directory along with `demo.py`, which show the different invocation methods. The available functions can all be imported `from array_record.beam.pipelines import *` and are as follows:
+There are slight variations in execution when running these either from an interpreter or the CLI, so familiarize yourself with the files in the `examples/` directory along with `demo.py`, which show the different invocation methods. The available functions can all be imported with:
+
+```py
+from array_record.beam.pipelines import *
+```
+
+and are as follows:
 
-- **convert_tf_to_arrayrecord_disk:** Converts TFRecords at `input` path to ArrayRecords at `output` path for disk-based writes only. Accepts an extra `num_shards` argument for resharding ArrayRecords across an arbitrary number of files.
-- **convert_tf_to_arrayrecord_disk_match_shards:** Same as above, except it reads the number of source files and matches them to the destination. There is no `num_shards` argument.
-- **convert_tf_to_arrayrecord_gcs:** Converts TFRecords at `input` path to ArrayRecords at `output` path, where the `output` path **must** be a GCS bucket in "gs://" format. This function accepts the same `overwrite_extension` and `file_path_suffix` arguments as the DoFn itself, allowing for customization of file naming.
+| Functions | Description |
+| --- | --- |
+| `convert_tf_to_arrayrecord_disk`, `convert_text_to_array_record_disk` | Converts TFRecords/Text files at `input` path to ArrayRecords at `output` path for disk-based writes only. Accepts an extra `num_shards` argument for resharding ArrayRecords across an arbitrary number of files. |
+| `convert_tf_to_arrayrecord_disk_match_shards`, `convert_text_to_arrayrecord_disk_match_shards` | Same as above, except it reads the number of source files and matches them to the destination. There is no `num_shards` argument. |
+| `convert_tf_to_arrayrecord_gcs`, `convert_text_to_arrayrecord_gcs` | Converts TFRecords/Text files at `input` path to ArrayRecords at `output` path, where the `output` path **must** be a GCS bucket in `gs://` format. This function accepts the same `overwrite_extension` and `file_path_suffix` arguments as the `DoFn` itself, allowing for customization of file naming. |
 
-### Examples and Demos
+## Examples and Demos
 
 See the examples in the `examples/` directory for different invocation techniques. One of the examples invokes `array_record.beam.demo` as a module, which is a simple pipeline that generates some TFRecords and then converts them to ArrayRecord in GCS. You can see the implementation in `demo.py`, which should serve as a guide for implementing your own CLI-triggered pipelines.
 
 You'll also note commented sections in each example, which are the configuration parameters for running the pipelines on Google Cloud Dataflow. There is also a `requirements.txt` in there, which at present is a requirement for running these on Dataflow as is. See below for more detail.
 
-### Dataflow Usage
+## Dataflow Usage
 
 These pipelines have all been tested and are compatible with Google Cloud Dataflow. Uncomment the sections in the example files and set your own bucket/project information to see it in action.
 
 Note, however, the `requirements.txt` file. This is necessary because the `array-record` PyPI installation does not install the Apache Beam or Tensorflow components by default to keep the library lightweight. A `requirements.txt` passed as an argument to the Dataflow job is required to ensure everything is installed correctly on the runner.
 
- Allow to simmer uncovered for 5 minutes. Plate, serve, and enjoy.
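+
+A Dataflow run can also be started programmatically by passing `pipeline_options`, roughly mirroring `examples/example_gcs_conversion.py`; the sketch below assumes placeholder project, region, and bucket names:
+
+```py
+from apache_beam.options import pipeline_options
+from array_record.beam.pipelines import convert_tf_to_arrayrecord_gcs
+
+## Placeholders: substitute your own project, region, and buckets.
+options = pipeline_options.PipelineOptions(
+    runner='DataflowRunner',
+    project='my-project',
+    region='us-central1',
+    requirements_file='requirements.txt',
+)
+
+convert_tf_to_arrayrecord_gcs(
+    args={'input': 'gs://source-bucket/records/*.tfrecord',
+          'output': 'gs://destination-bucket/records/'},
+    pipeline_options=options,
+).run()
+```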
diff --git a/beam/examples/example_text_conversion.py b/beam/examples/example_text_conversion.py
new file mode 100644
index 0000000..5786e99
--- /dev/null
+++ b/beam/examples/example_text_conversion.py
@@ -0,0 +1,30 @@
+"""Execute this to convert Text files to ArrayRecords using the disk Sink."""
+
+
+from apache_beam.options import pipeline_options
+from array_record.beam.pipelines import convert_text_to_array_record_disk
+
+## Set input and output patterns as specified
+input_pattern = 'gs:///records/*.txt'
+output_path = 'records/movies'
+
+args = {'input': input_pattern, 'output': output_path}
+
+## If run in Dataflow, set pipeline options and uncomment in main()
+## If pipeline_options is not set, the pipeline runs on the local runner
+pipeline_options = pipeline_options.PipelineOptions(
+    runner='DataflowRunner',
+    project='',
+    region='',
+    requirements_file='requirements.txt'
+)
+
+
+def main():
+  convert_text_to_array_record_disk(
+      args=args,
+      # pipeline_options=pipeline_options,
+  ).run()
+
+if __name__ == '__main__':
+  main()
diff --git a/beam/pipelines.py b/beam/pipelines.py
index effffbe..3f212ab 100644
--- a/beam/pipelines.py
+++ b/beam/pipelines.py
@@ -1,5 +1,7 @@
 """Various opinionated Beam pipelines for testing different functionality."""
 
+from typing import Any, Callable
+
 import apache_beam as beam
 from apache_beam.coders import coders
 from . import arrayrecordio
@@ -17,7 +19,8 @@ def example_to_tfrecord(
     num_shards=1, args=def_args, pipeline_options=def_pipeline_options):
-  """Beam pipeline for creating example TFRecord data.
+  """
+  Beam pipeline for creating example TFRecord data.
 
   Args:
     num_shards: Number of files
@@ -31,13 +34,13 @@
   p1 = beam.Pipeline(options=pipeline_options)
   _ = (
       p1
-      | 'Create' >> beam.Create(example.generate_movie_examples())
-      | 'Write'
+      | "Create" >> beam.Create(example.generate_movie_examples())
+      | "Write"
       >> beam.io.WriteToTFRecord(
-          args['output'],
+          args["output"],
           coder=coders.ToBytesCoder(),
           num_shards=num_shards,
-          file_name_suffix='.tfrecord',
+          file_name_suffix=".tfrecord",
       )
   )
   return p1
@@ -60,22 +63,41 @@ def example_to_arrayrecord(
   p1 = beam.Pipeline(options=pipeline_options)
   _ = (
       p1
-      | 'Create' >> beam.Create(example.generate_movie_examples())
-      | 'Write'
+      | "Create" >> beam.Create(example.generate_movie_examples())
+      | "Write"
       >> arrayrecordio.WriteToArrayRecord(
-          args['output'],
+          args["output"],
           coder=coders.ToBytesCoder(),
           num_shards=num_shards,
-          file_name_suffix='.arrayrecord',
+          file_name_suffix=".arrayrecord",
+      )
+  )
+  return p1
+
+
+def _convert_to_array_record_disk(
+    num_shards, args, pipeline_options, file_type, beam_fn
+):
+  p1 = beam.Pipeline(options=pipeline_options)
+  _ = (
+      p1
+      | f"Read {file_type}" >> beam_fn(args["input"])
+      | "Write ArrayRecord"
+      >> arrayrecordio.WriteToArrayRecord(
+          args["output"],
+          coder=coders.ToBytesCoder(),
+          num_shards=num_shards,
+          file_name_suffix=".arrayrecord",
       )
   )
   return p1
 
 
 def convert_tf_to_arrayrecord_disk(
-    num_shards=1, args=def_args, pipeline_options=def_pipeline_options
+    *, num_shards=1, args=def_args, pipeline_options=def_pipeline_options
 ):
-  """Convert TFRecords to ArrayRecords using sink/sharding functionality.
+  """
+  Convert TFRecords to ArrayRecords using sink/sharding functionality.
THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES @@ -88,29 +110,21 @@ def convert_tf_to_arrayrecord_disk( Beam Pipeline object """ - p1 = beam.Pipeline(options=pipeline_options) - _ = ( - p1 - | 'Read TFRecord' >> beam.io.ReadFromTFRecord(args['input']) - | 'Write ArrayRecord' - >> arrayrecordio.WriteToArrayRecord( - args['output'], - coder=coders.ToBytesCoder(), - num_shards=num_shards, - file_name_suffix='.arrayrecord', - ) + return _convert_to_array_record_disk( + num_shards, args, pipeline_options, "TFRecord", beam.io.ReadFromTFRecord ) - return p1 -def convert_tf_to_arrayrecord_disk_match_shards( - args=def_args, pipeline_options=def_pipeline_options +def convert_text_to_array_record_disk( + *, num_shards=1, args=def_args, pipeline_options=def_pipeline_options ): - """Convert TFRecords to matching number of ArrayRecords. + """ + Convert Text files to ArrayRecords using sink/sharding functionality. THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES Args: + num_shards: Number of files args: Custom arguments pipeline_options: Beam arguments in dict format @@ -118,43 +132,48 @@ def convert_tf_to_arrayrecord_disk_match_shards( Beam Pipeline object """ + return _convert_to_array_record_disk( + num_shards, args, pipeline_options, "Text file", beam.io.ReadFromText + ) + + +def _convert_to_arrayrecord_disk_match_shards(args, pipeline_options, beam_fn): p1 = beam.Pipeline(options=pipeline_options) initial = ( p1 - | 'Start' >> beam.Create([args['input']]) - | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True) + | "Start" >> beam.Create([args["input"]]) + | "Read" >> beam_fn(with_filename=True) ) file_count = ( initial - | 'Group' >> beam.GroupByKey() - | 'Count Shards' >> beam.combiners.Count.Globally() + | "Group" >> beam.GroupByKey() + | "Count Shards" >> beam.combiners.Count.Globally() ) _ = ( initial - | 'Drop Filename' >> beam.Map(lambda x: x[1]) - | 'Write ArrayRecord' + | "Drop Filename" >> beam.Map(lambda x: x[1]) + | "Write ArrayRecord" >> arrayrecordio.WriteToArrayRecord( - args['output'], + args["output"], coder=coders.ToBytesCoder(), num_shards=beam.pvalue.AsSingleton(file_count), - file_name_suffix='.arrayrecord', + file_name_suffix=".arrayrecord", ) ) return p1 -def convert_tf_to_arrayrecord_gcs( - overwrite_extension=False, - file_path_suffix='.arrayrecord', - args=def_args, - pipeline_options=def_pipeline_options): - """Convert TFRecords to ArrayRecords in GCS 1:1. - +def convert_tf_to_arrayrecord_disk_match_shards( + *, args=def_args, pipeline_options=def_pipeline_options +): + """ + Convert TFRecords to matching number of ArrayRecords. + + THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES + Args: - overwrite_extension: Boolean making DoFn attempt to overwrite extension - file_path_suffix: Intended suffix for overwrite or append args: Custom arguments pipeline_options: Beam arguments in dict format @@ -162,18 +181,97 @@ def convert_tf_to_arrayrecord_gcs( Beam Pipeline object """ + return _convert_to_arrayrecord_disk_match_shards( + args, pipeline_options, beam.io.ReadAllFromTFRecord + ) + + +def convert_text_to_arrayrecord_disk_match_shards( + *, args=def_args, pipeline_options=def_pipeline_options +): + """ + Convert Text files to matching number of ArrayRecords. 
+ + THIS ONLY WORKS FOR DISK ARRAYRECORD WRITES + + Args: + args: Custom arguments + pipeline_options: Beam arguments in dict format + + Returns: + Beam Pipeline object + """ + + return _convert_to_arrayrecord_disk_match_shards( + args, pipeline_options, beam.io.ReadAllFromText + ) + + +def _convert_to_arrayrecord_gcs( + overwrite_extension, file_path_suffix, args, pipeline_options, beam_fn +): p1 = beam.Pipeline(options=pipeline_options) _ = ( p1 - | 'Start' >> beam.Create([args['input']]) - | 'Read' >> beam.io.ReadAllFromTFRecord(with_filename=True) - | 'Group' >> beam.GroupByKey() - | 'Write to ArrayRecord in GCS' + | "Start" >> beam.Create([args["input"]]) + | "Read" >> beam_fn(with_filename=True) + | "Group" >> beam.GroupByKey() + | "Write to ArrayRecord in GCS" >> beam.ParDo( dofns.ConvertToArrayRecordGCS(), - args['output'], + args["output"], file_path_suffix=file_path_suffix, overwrite_extension=overwrite_extension, ) ) return p1 + + +def convert_tf_to_arrayrecord_gcs( + *, + overwrite_extension=False, + file_path_suffix=".arrayrecord", + args=def_args, + pipeline_options=def_pipeline_options +): + """ + Convert TFRecords to ArrayRecords in GCS 1:1. + + Args: + overwrite_extension: Boolean making DoFn attempt to overwrite extension + file_path_suffix: Intended suffix for overwrite or append + args: Custom arguments + pipeline_options: Beam arguments in dict format + + Returns: + Beam Pipeline object + """ + + return _convert_to_arrayrecord_gcs( + overwrite_extension, file_path_suffix, args, pipeline_options, beam.io.ReadAllFromTFRecord + ) + + +def convert_text_to_arrayrecord_gcs( + *, + overwrite_extension=False, + file_path_suffix=".arrayrecord", + args=def_args, + pipeline_options=def_pipeline_options +): + """ + Convert Text files to ArrayRecords in GCS 1:1. + + Args: + overwrite_extension: Boolean making DoFn attempt to overwrite extension + file_path_suffix: Intended suffix for overwrite or append + args: Custom arguments + pipeline_options: Beam arguments in dict format + + Returns: + Beam Pipeline object + """ + + return _convert_to_arrayrecord_gcs( + overwrite_extension, file_path_suffix, args, pipeline_options, beam.io.ReadAllFromText + )
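
The new text-conversion entry points follow the same calling convention as their TFRecord counterparts. A rough usage sketch for the GCS variant (bucket paths are placeholders; the pipeline runs on the local runner when `pipeline_options` is omitted):

```py
from array_record.beam.pipelines import convert_text_to_arrayrecord_gcs

## Placeholder paths; substitute your own buckets.
convert_text_to_arrayrecord_gcs(
    args={'input': 'gs://source-bucket/lines/*.txt',
          'output': 'gs://destination-bucket/lines/'},
).run()
```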