64 changes: 44 additions & 20 deletions beam/README.md
# Apache Beam Integration for ArrayRecord

## Installation

```sh
pip install apache-beam[gcp]==2.53.0
pip install array-record[beam]
# check that apache-beam is still at 2.53.0
cd array_record/beam/examples
# If using Dataflow, set pipeline_options as instructed in example_gcs_conversion.py
python example_gcs_conversion.py
```
If Dataflow is used, you can monitor the run from [the Dataflow job monitoring UI](https://cloud.google.com/dataflow/docs/guides/monitoring-overview).

## Summary

This submodule provides some Apache Beam components and lightweight pipelines for converting different file formats (TFRecord at present) into ArrayRecords. The intention is to provide a variety of fairly seamless tools for migrating existing TFRecord datasets, allowing a few different choices regarding sharding and write location.

In addition to these components, there are a number of simple pipelines included.

In addition to all of that, there are a handful of dummy data generation functions used for testing and validation.

## Usage

**Basics and 'Getting Started'**

Once installed, all of the Beam components are available to import from `array_record.beam`.

If you're familiar with Apache Beam and want to build a custom pipeline around its core constructs, you can import the native Beam objects and implement them as you see fit.

To import the PTransform with the disk-based sink, use:

```py
from array_record.beam.arrayrecordio import WriteToArrayRecord
```

You may then use it as a standard step in a Beam pipeline. It accepts a variety of different inputs, including:

- `file_path_prefix`
- `file_path_suffix`
- `coder`
- `num_shards`

For more detail, as well as options for extensibility, please refer to [Apache Beam's Documentation for FileBasedSink](https://beam.apache.org/releases/pydoc/current/apache_beam.io.filebasedsink.html).
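
For instance, a minimal sketch of plugging the sink into a pipeline might look like the following; the element values, output prefix, and shard count are illustrative, and the parameters mirror the list above:

```py
import apache_beam as beam
from apache_beam.coders import BytesCoder
from array_record.beam.arrayrecordio import WriteToArrayRecord

# Illustrative pipeline: write a few already-serialized records to
# ArrayRecord shards on local disk.
with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateRecords' >> beam.Create([b'record-1', b'record-2', b'record-3'])
      | 'WriteArrayRecord' >> WriteToArrayRecord(
          file_path_prefix='/tmp/arrayrecord/records',  # illustrative prefix
          file_path_suffix='.arrayrecord',
          coder=BytesCoder(),
          num_shards=2))
```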

To import the custom `DoFn`, use:

```py
from array_record.beam.dofns import ConvertToArrayRecordGCS
```

You may then use it as a parameter for a Beam `ParDo`. It takes a handful of side inputs as described below:

- **path:** REQUIRED (and positional). The intended path prefix for the GCS bucket, in `gs://...` format.
- **overwrite_extension:** `False` by default. Boolean; when `True`, the `DoFn` attempts to overwrite any file extension after the final `.`.
- **file_path_suffix:** `.arrayrecord` by default. The intended suffix for overwrite or append.

Note that by default, the DoFn will APPEND an existing filename/extension with ".arrayrecord". Setting `file_path_suffix` to `""` will leave the file names as-is and thus expect you to be passing in a different `path` than the source.
Note that by default, the `DoFn` will APPEND `.arrayrecord` to an existing filename/extension. Setting `file_path_suffix` to `""` will leave the file names as-is, which means you are expected to pass in a different `path` than the source.

You can see usage details for each of these implementations in `pipelines.py`.
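
As a rough illustration only (the wiring below is an assumption sketched from the description above, not a verbatim copy of `pipelines.py`; treat that file as the authoritative reference), the `DoFn` can be fed grouped `(filename, records)` pairs and handed its side inputs through `ParDo`:

```py
import apache_beam as beam
from apache_beam.coders import BytesCoder
from apache_beam.io.tfrecordio import ReadAllFromTFRecord
from array_record.beam.dofns import ConvertToArrayRecordGCS

# Illustrative bucket paths; substitute your own.
source_glob = 'gs://<YOUR_SOURCE_BUCKET>/records/*.tfrecord'
dest_prefix = 'gs://<YOUR_DEST_BUCKET>/arrayrecords/'

with beam.Pipeline() as p:
  _ = (
      p
      | 'MatchPattern' >> beam.Create([source_glob])
      | 'ReadWithFilename' >> ReadAllFromTFRecord(
          coder=BytesCoder(), with_filename=True)  # yields (filename, record) pairs
      | 'GroupByFilename' >> beam.GroupByKey()     # (filename, records) per source file
      | 'WriteToGCS' >> beam.ParDo(
          ConvertToArrayRecordGCS(),
          dest_prefix,                    # required positional `path`
          overwrite_extension=False,
          file_path_suffix='.arrayrecord'))
```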

**Using the Helper Functions**

Several helper functions have been packaged to make the functionality more accessible to those with less comfort building Apache Beam pipelines. All of these pipelines take `input` and `output` arguments, which are intended as the respective source and destination paths of the TFRecord files and the ArrayRecord files. Wildcards are accepted in these paths. By default, these parameters can either be passed as CLI arguments when executing a pipeline as `python -m <python_module> --input <path> --output <path>`, or as an override to the `args` argument if executing programmatically. Additionally, extra arguments can be passed via CLI or programmatically in the `pipeline_options` argument if you want to control the behavior of Beam. The likely reason for this would be altering the Runner to Google Cloud Dataflow, which these examples support (with caveats; see the section below on Dataflow).

There are slight variations in execution when running these either from an interpreter or the CLI, so familiarize yourself with the files in the `examples/` directory along with `demo.py`, which show the different invocation methods. The available functions can all be imported with:

```py
from array_record.beam.pipelines import *
```

and are as follows:

| Functions | Description |
| --- | --- |
| `convert_tf_to_arrayrecord_disk`, `convert_text_to_arrayrecord_disk` | Converts TFRecords/Text files at `input` path to ArrayRecords at `output` path for disk-based writes only. Accepts an extra `num_shards` argument for resharding ArrayRecords across an arbitrary number of files. |
| `convert_tf_to_arrayrecord_disk_match_shards`, `convert_text_to_arrayrecord_disk_match_shards` | Same as above, except it reads the number of source files and matches them to the destination. There is no `num_shards` argument. |
| `convert_tf_to_arrayrecord_gcs`, `convert_text_to_arrayrecord_gcs` | Converts TFRecords/Text files at `input` path to ArrayRecords at `output` path, where the `output` path **must** be a GCS bucket in `gs://` format. This function accepts the same `overwrite_extension` and `file_path_suffix` arguments as the `DoFn` itself, allowing for customization of file naming. |
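
For example, a minimal programmatic run of the disk-based TFRecord converter might look like the sketch below (paths are illustrative); per the paragraph above, the same pipeline can also be launched from the CLI with `python -m <python_module> --input <path> --output <path>`:

```py
from array_record.beam.pipelines import convert_tf_to_arrayrecord_disk

# Illustrative paths; wildcards are accepted in the input pattern.
convert_tf_to_arrayrecord_disk(
    args={'input': '/data/tfrecords/*.tfrecord',
          'output': '/data/arrayrecords/records'},
    num_shards=4,
).run()
```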

## Examples and Demos

See the examples in the `examples/` directory for different invocation techniques. One of the examples invokes `array_record.beam.demo` as a module, which is a simple pipeline that generates some TFRecords and then converts them to ArrayRecord in GCS. You can see the implementation in `demo.py`, which should serve as a guide for implementing your own CLI-triggered pipelines.

You'll also note commented sections in each example, which contain the configuration parameters for running the pipelines on Google Cloud Dataflow. There is also a `requirements.txt` in that directory, which at present is required for running these pipelines on Dataflow as-is. See below for more detail.

## Dataflow Usage

These pipelines have all been tested and are compatible with Google Cloud Dataflow. Uncomment the sections in the example files and set your own bucket/project information to see it in action.

Note, however, the `requirements.txt` file. This is necessary because the `array-record` PyPI installation does not install the Apache Beam or TensorFlow components by default, in order to keep the library lightweight. A `requirements.txt` passed as an argument to the Dataflow job is required to ensure everything is installed correctly on the runner.


Allow to simmer uncovered for 5 minutes. Plate, serve, and enjoy.
30 changes: 30 additions & 0 deletions beam/examples/example_text_conversion.py
"""Execute this to convert Text file to ArrayRecords using the disk Sink."""


from apache_beam.options import pipeline_options
from array_record.beam.pipelines import convert_text_to_array_record_disk

## Set input and output patterns as specified
input_pattern = 'gs://<YOUR_INPUT_BUCKET>/records/*.txt'
output_path = 'records/movies'

args = {'input': input_pattern, 'output': output_path}

## If running on Dataflow, set pipeline options here and uncomment the
## corresponding argument in main().
## If pipeline_options is not passed, the local runner is used.
pipeline_options = pipeline_options.PipelineOptions(
    runner='DataflowRunner',
    project='<YOUR_PROJECT>',
    region='<YOUR_REGION>',
    requirements_file='requirements.txt'
)


def main():
  convert_text_to_array_record_disk(
      args=args,
      # pipeline_options=pipeline_options,
  ).run()


if __name__ == '__main__':
  main()