Example: Dataflow BigQuery to AlloyDB (#1035)
* initial commit

* update readme.md

* fix typos

* lint README

* fix lint

* fix lint flake8

* fix format

---------

Co-authored-by: Andrew Gold <[email protected]>
jogando and agold-rh authored Oct 10, 2023
1 parent df02b38 commit a96e4e7
Showing 5 changed files with 312 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -168,6 +168,8 @@ them to fit your particular use case.
from Cloud Pub/Sub, enhancing the document using metadata stored in Cloud
Bigtable and indexing those documents into
[Elasticsearch](https://www.elastic.co/).
* [Dataflow BigQuery to AlloyDB](examples/dataflow-bigquery-to-alloydb/) -
Example that shows how to move data from BigQuery to an AlloyDB table using Dataflow.
* [Dataflow Flex Template in Restricted Networking Env](examples/dataflow-flex-python/) -
Example that implements a Python Flex Template that can run in an environment
where workers cannot download Python packages due to egress traffic restrictions.
102 changes: 102 additions & 0 deletions examples/dataflow-bigquery-to-alloydb/.gitignore
@@ -0,0 +1,102 @@
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
110 changes: 110 additions & 0 deletions examples/dataflow-bigquery-to-alloydb/README.md
@@ -0,0 +1,110 @@
# dataflow-bigquery-to-alloydb

This example moves data from a public dataset stored in BigQuery into a
table created in AlloyDB.
The following BigQuery query generates the source data:

```sql
SELECT
from_address,
to_address,
CASE
WHEN SAFE_CAST(value AS NUMERIC) IS NULL THEN 0
ELSE SAFE_CAST(value AS NUMERIC)
END AS value,
block_timestamp
FROM
bigquery-public-data.crypto_ethereum.token_transfers
WHERE
DATE(block_timestamp) = DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY)
```
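
Before running the pipeline, you can sanity-check the source data by running the
same query (with a `LIMIT`) directly against BigQuery. Below is a minimal sketch
using the `google-cloud-bigquery` client, which is installed along with
`apache-beam[gcp]`; the project ID is a placeholder and the sketch assumes you
have already authenticated, for example with `gcloud auth application-default login`:

```python
# preview_source.py - hypothetical helper to peek at yesterday's token transfers
from google.cloud import bigquery

client = bigquery.Client(project="YOUR_PROJECT_ID")  # placeholder project ID
preview_query = """
SELECT
  from_address,
  to_address,
  CASE
    WHEN SAFE_CAST(value AS NUMERIC) IS NULL THEN 0
    ELSE SAFE_CAST(value AS NUMERIC)
  END AS value,
  block_timestamp
FROM `bigquery-public-data.crypto_ethereum.token_transfers`
WHERE DATE(block_timestamp) = DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY)
LIMIT 10
"""

# client.query() submits the job; iterating over it waits for and streams results.
for row in client.query(preview_query):
    print(row.from_address, row.to_address, row.value, row.block_timestamp)
```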

## Create the AlloyDB table in which we will store the BigQuery data

Create a database for the table in AlloyDB:

```sql
CREATE DATABASE ethereum;
```

Create the table into which the BigQuery data will be written:

```sql
CREATE TABLE token_transfers (
from_address VARCHAR,
to_address VARCHAR,
value NUMERIC,
block_timestamp TIMESTAMP
);
```

## Create the local environment

```
python3 -m venv env
source env/bin/activate
pip3 install -r requirements.txt
```

## Running the Dataflow pipeline

If the Python virtual environment is not yet activated, activate it:

```
source env/bin/activate
```

Running the Dataflow pipeline requires a Cloud Storage bucket for staging the
BigQuery data. If you don't have one, create a bucket in the same region in
which Dataflow will run, for example `southamerica-east1`:

```
gcloud storage buckets create gs://<BUCKET_NAME> --location=southamerica-east1
```

Configure the environment variables:

```
TMP_BUCKET=<name of the bucket used for staging>
PROJECT=<name of your GCP project>
REGION=<name of the GCP region in which Dataflow will run>
SUBNETWORK=<ID of the subnetwork in which Dataflow will run, for example:
https://www.googleapis.com/compute/v1/projects/<NAME_OF_THE_VPC_PROJECT>/regions/<REGION>/subnetworks/<NAME_OF_THE_SUBNET>>
ALLOYDB_IP=<IP address of AlloyDB>
ALLOYDB_USERNAME=<USERNAME used for connecting to AlloyDB>
ALLOYDB_PASSWORD=<PASSWORD used for connecting to AlloyDB>
ALLOYDB_DATABASE=ethereum
ALLOYDB_TABLE=token_transfers
BQ_QUERY="
SELECT
from_address,
to_address,
CASE
WHEN SAFE_CAST(value AS NUMERIC) IS NULL THEN 0
ELSE SAFE_CAST(value AS NUMERIC)
END AS value,
block_timestamp
FROM
bigquery-public-data.crypto_ethereum.token_transfers
WHERE
DATE(block_timestamp) = DATE_ADD(CURRENT_DATE(), INTERVAL -1 DAY)
"
```

Execute the pipeline:

```
python3 main.py \
--runner DataflowRunner \
--region ${REGION} \
--project ${PROJECT} \
--temp_location gs://${TMP_BUCKET}/tmp/ \
--alloydb_username ${ALLOYDB_USERNAME} \
--alloydb_password ${ALLOYDB_PASSWORD} \
--alloydb_ip ${ALLOYDB_IP} \
--alloydb_database ${ALLOYDB_DATABASE} \
--alloydb_table ${ALLOYDB_TABLE} \
--bq_query "${BQ_QUERY}" \
--no_use_public_ips \
--subnetwork=${SUBNETWORK}
```
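
Once the job finishes, a quick way to confirm the load is to count the rows that
landed in AlloyDB. A minimal sketch, again assuming `psycopg2-binary` is
installed and that the `ALLOYDB_*` variables above have been exported to the
environment:

```python
# validate_load.py - hypothetical check that yesterday's transfers were loaded
import os
import psycopg2

conn = psycopg2.connect(host=os.environ["ALLOYDB_IP"],
                        user=os.environ["ALLOYDB_USERNAME"],
                        password=os.environ["ALLOYDB_PASSWORD"],
                        dbname=os.environ["ALLOYDB_DATABASE"])
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*), MIN(block_timestamp), MAX(block_timestamp) "
                "FROM token_transfers")
    row_count, first_ts, last_ts = cur.fetchone()
    print(f"{row_count} rows loaded, covering {first_ts} to {last_ts}")
conn.close()
```

The row count should match the number of rows returned by the BigQuery query for
the same day.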
97 changes: 97 additions & 0 deletions examples/dataflow-bigquery-to-alloydb/main.py
@@ -0,0 +1,97 @@
"""This module shows how to move data from BigQuery to AlloyDB with Dataflow"""

# Copyright 2023 Google LLC All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import typing
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam import coders


class EthereumTokenTransfersRowType(typing.NamedTuple):
    """Class that defines the Row Type of the AlloyDB table"""
    from_address: str
    to_address: str
    value: float
    block_timestamp: str


def run(argv=None, save_main_session=True):
    """Runs the pipeline"""

    parser = argparse.ArgumentParser()
    parser.add_argument('--alloydb_username',
                        dest='alloydb_username',
                        required=True,
                        help='AlloyDB username')
    parser.add_argument('--alloydb_password',
                        dest='alloydb_password',
                        required=True,
                        help='AlloyDB password')
    parser.add_argument('--alloydb_ip',
                        dest='alloydb_ip',
                        required=True,
                        help='AlloyDB IP Address')
    parser.add_argument('--alloydb_port',
                        dest='alloydb_port',
                        default="5432",
                        help='AlloyDB Port')
    parser.add_argument('--alloydb_database',
                        dest='alloydb_database',
                        required=True,
                        help='AlloyDB Database name')
    parser.add_argument('--alloydb_table',
                        dest='alloydb_table',
                        required=True,
                        help='AlloyDB table name')
    parser.add_argument(
        '--bq_query',
        dest='bq_query',
        required=True,
        help='Query to be executed by BigQuery for extracting the source data')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # save_main_session makes top-level imports and definitions available to
    # the workers that execute the pipeline.
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        # WriteToJdbc is a cross-language (Java) transform; registering a
        # RowCoder for the NamedTuple lets Beam pass typed rows across the
        # language boundary.
        coders.registry.register_coder(EthereumTokenTransfersRowType,
                                       coders.RowCoder)

        (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
            query=known_args.bq_query, use_standard_sql=True) |
         # Convert each BigQuery result dict into a typed beam.Row; the
         # timestamp is serialized as an ISO-8601 string and cast back to
         # TIMESTAMP by PostgreSQL thanks to stringtype=unspecified below.
         'ConvertToRows' >> beam.Map(lambda bq_row: beam.Row(
             from_address=bq_row['from_address'],
             to_address=bq_row['to_address'],
             value=bq_row['value'],
             block_timestamp=bq_row['block_timestamp'].isoformat()
         )).with_output_types(EthereumTokenTransfersRowType) | 'Write to jdbc'
         >> WriteToJdbc(driver_class_name='org.postgresql.Driver',
                        table_name=known_args.alloydb_table,
                        jdbc_url=(f'jdbc:postgresql://{known_args.alloydb_ip}:'
                                  f'{known_args.alloydb_port}'
                                  f'/{known_args.alloydb_database}'),
                        username=known_args.alloydb_username,
                        password=known_args.alloydb_password,
                        connection_properties='stringtype=unspecified'))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
1 change: 1 addition & 0 deletions examples/dataflow-bigquery-to-alloydb/requirements.txt
@@ -0,0 +1 @@
apache-beam[gcp]==2.41.0
