[DE-2344] Use KubernetesPodOperator to merge data into BigQuery (sing…
mryorik authored Aug 15, 2022
1 parent 268c2b5 commit 7589369
Showing 22 changed files with 1,637 additions and 230 deletions.
6 changes: 5 additions & 1 deletion .env
@@ -4,8 +4,12 @@ AIRFLOW__SECRETS__BACKEND=airflow.providers.google.cloud.secrets.secret_manager.
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=10
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=60
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=TRUE
COMPOSER_ENV_NAME=data-composer-staging
COMPOSER_PROJECT=analytics-warehouse-dev
COMPOSER_LOCATION=us-east1

AIRFLOW_VAR_EXAMPLE_VAR="example_var value"
AIRFLOW_CONN_EXAMPLE_CONNECTION="postgresql://user:pass@db_host:5432/db_name"

DWH_PROJECT=airflow-warehouse-dev
DWH_PROJECT=analytics-warehouse-dev
DOCKER_REGISTRY_PROJECT=analytics-warehouse-dev
135 changes: 132 additions & 3 deletions .github/workflows/development.yaml
@@ -13,15 +13,16 @@ on:

env:
DWH_PROJECT: analytics-warehouse-dev
DOCKER_REGISTRY_PROJECT: analytics-warehouse-dev

jobs:
create-composer-env:
runs-on: [ self-hosted, Linux, X64 ]
if: ${{ inputs.create_env }}
concurrency: ${{ github.event.pull_request.head.ref || github.ref_name }}
outputs:
name: ${{ steps.env-name.outputs.lowercase }}
bucket: ${{ steps.env-bucket.outputs.bucket }}
exists: ${{ steps.env-exists.outputs.env-exists }}
steps:
- name: 'Checkout'
uses: actions/checkout@v3
@@ -39,29 +39,60 @@ jobs:

- name: 'Set up Cloud SDK'
uses: 'google-github-actions/[email protected]'
with:
install_components: 'kubectl'

- name: "Check if environment exists"
id: env-exists
run: |
echo "::set-output name=env-exists::$(gcloud composer environments list --locations us-east1 | grep ' ${{ steps.env-name.outputs.lowercase }} ' | wc -l | xargs)"
- name: 'Create Composer environment'
if: ${{ steps.env-exists.outputs.env-exists == '0' }}
if: ${{ steps.env-exists.outputs.env-exists == '0' && inputs.create_env }}
run: |
gcloud composer environments create ${{ steps.env-name.outputs.lowercase }} \
--location us-east1 \
--image-version composer-2.0.21-airflow-2.2.5 \
--environment-size small \
--max-workers 1 \
--airflow-configs=scheduler-min_file_process_interval=10,scheduler-dag_dir_list_interval=60,core-dags_are_paused_at_creation=True,secrets-backend=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend \
--env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }}
--env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }},DOCKER_REGISTRY_PROJECT=${{ env.DOCKER_REGISTRY_PROJECT }},JOB_IMAGE_TAG=${{ github.ref_name }} \
--network default --subnetwork composer --cluster-secondary-range-name pods --services-secondary-range-name services
- name: 'Get GKE cluster credentials'
if: ${{ steps.env-exists.outputs.env-exists == '1' || inputs.create_env }}
run: |
gcloud container clusters get-credentials \
$(gcloud composer environments describe ${{ steps.env-name.outputs.lowercase }} \
--format 'get(config.gkeCluster)' --location us-east1) \
--zone us-east1
- name: 'Enable Workload Identity for kube-public/default service account'
if: ${{ steps.env-exists.outputs.env-exists == '1' || inputs.create_env }}
run: |
kubectl annotate serviceaccount default \
--namespace kube-public --overwrite \
iam.gke.io/gcp-service-account=data-composer-github@analytics-warehouse-dev.iam.gserviceaccount.com
gcloud iam service-accounts add-iam-policy-binding data-composer-github@analytics-warehouse-dev.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:analytics-warehouse-dev.svc.id.goog[kube-public/default]"
- name: 'Grant cluster-admin role to composer-2-*/default service account'
if: ${{ steps.env-exists.outputs.env-exists == '1' || inputs.create_env }}
run: |
kubectl delete clusterrolebinding composer-cluster-admin | true
kubectl create clusterrolebinding composer-cluster-admin \
--namespace kube-public --clusterrole cluster-admin \
--serviceaccount=$(kubectl get namespaces --output=jsonpath='{.items[].metadata.name}' | grep composer-2-):default
- name: 'Get Composer environment bucket'
if: ${{ steps.env-exists.outputs.env-exists == '1' || inputs.create_env }}
id: env-bucket
run: |
echo "::set-output name=bucket::$(gcloud composer environments describe ${{ steps.env-name.outputs.lowercase }} --location us-east1 --format="get(config.dagGcsPrefix)" | sed -E 's/^.+\/(.+)\/.*$/\1/')"
- name: 'Install Python dependencies'
if: ${{ steps.env-exists.outputs.env-exists == '1' || inputs.create_env }}
run: |
gcloud composer environments update ${{ steps.env-name.outputs.lowercase }} \
--location us-east1 \
@@ -71,6 +103,7 @@ jobs:
runs-on: [ self-hosted, Linux, X64 ]
needs:
- create-composer-env
if: ${{ needs.create-composer-env.outputs.exists == '1' || inputs.create_env }}
steps:
- name: 'Checkout'
uses: actions/checkout@v3
@@ -109,3 +142,99 @@ jobs:
- name: 'Upload DAGs'
run: |
gsutil rsync -rd dags gs://${{ needs.create-composer-env.outputs.bucket }}/dags
merge-into-bq:
runs-on: [ self-hosted, Linux, X64 ]
concurrency: merge-into-bq-${{ github.event.pull_request.head.ref || github.ref_name }}
steps:
- name: 'Checkout'
uses: actions/checkout@v3
with:
fetch-depth: 0

- uses: docker/setup-buildx-action@v2
- uses: docker/login-action@v1
with:
registry: gcr.io
username: _json_key
password: ${{ secrets.DATA_COMPOSER_SA_DEV_KEY }}

- name: 'Get master HEAD commit SHA'
id: master-sha
run: |
git show-ref master -s
echo "::set-output name=value::$(git show-ref master -s)"
- name: 'Get changed files'
id: changed-files
uses: tj-actions/changed-files@v24
with:
base_sha: ${{ steps.master-sha.outputs.value }}
files: |
jobs/merge-into-bq/**/*
- name: 'Setup Python'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
uses: actions/setup-python@v4
with:
python-version: 3.8
- name: 'Get pip cache dir'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
id: pip-cache-dir
run: |
echo "::set-output name=value::$(pip cache dir)"
- name: 'Cache pip'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
uses: actions/cache@v2
with:
path: ${{ steps.pip-cache-dir.outputs.value }}
key: ${{ runner.os }}-poetry-${{ hashFiles('jobs/merge-into-bq/poetry.lock') }}

- name: 'Setup Poetry'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
uses: Gr1N/setup-poetry@v7

- name: 'Poetry install'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
working-directory: jobs/merge-into-bq
run: poetry install

- name: 'Pytest'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
working-directory: jobs/merge-into-bq
run: poetry run pytest -vv

- name: 'Get image tag'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
id: image-tag
run: |
if [ '${{ github.ref_name }}' == 'master' ]; then
echo "::set-output name=value::gcr.io/toptal-hub/data-composer/jobs/merge-into-bq:latest"
else
echo "::set-output name=value::gcr.io/analytics-warehouse-dev/data-composer/jobs/merge-into-bq:${{ github.ref_name }}"
fi
- name: 'Cache Docker layers'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache-staging
key: ${{ runner.os }}-buildx-staging-${{ github.ref_name }}
restore-keys: ${{ runner.os }}-buildx-staging-master

- name: 'Docker build and push'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
uses: docker/build-push-action@v3
with:
context: jobs/merge-into-bq
build-contexts: pip_cache=${{ steps.pip-cache-dir.outputs.value }}
push: true
tags: ${{ steps.image-tag.outputs.value }}
cache-from: type=local,src=/tmp/.buildx-cache-staging
cache-to: type=local,dest=/tmp/.buildx-cache-new-staging

- name: 'Update docker cache'
if: ${{ steps.changed-files.outputs.any_modified == 'true' }}
run: |
rm -rf /tmp/.buildx-cache-staging
mv /tmp/.buildx-cache-new-staging /tmp/.buildx-cache-staging
3 changes: 2 additions & 1 deletion .github/workflows/production.yaml
@@ -8,6 +8,7 @@ concurrency: production

env:
DWH_PROJECT: toptal.com:api-project-726361118046
DOCKER_REGISTRY_PROJECT: toptal-hub

jobs:
update-composer-env:
@@ -60,7 +61,7 @@ jobs:
run: |
gcloud composer environments update ${{ steps.env-name.outputs.lowercase }} \
--location us-east1 \
--update-env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }} || true
--update-env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }},DOCKER_REGISTRY_PROJECT=${{ env.DOCKER_REGISTRY_PROJECT }} || true
upload-dags:
runs-on: [ self-hosted, Linux, X64 ]
3 changes: 2 additions & 1 deletion .github/workflows/staging.yaml
@@ -8,6 +8,7 @@ concurrency: staging

env:
DWH_PROJECT: analytics-warehouse-dev
DOCKER_REGISTRY_PROJECT: analytics-warehouse-dev

jobs:
update-composer-env:
@@ -47,7 +48,7 @@ jobs:
--environment-size small \
--max-workers 1 \
--airflow-configs=scheduler-min_file_process_interval=10,scheduler-dag_dir_list_interval=60,core-dags_are_paused_at_creation=True,secrets-backend=airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend,sentry-sentry_dsn=https://[email protected]/6564231 \
--env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }}
--env-variables=DWH_PROJECT=${{ env.DWH_PROJECT }},DOCKER_REGISTRY_PROJECT=${{ env.DOCKER_REGISTRY_PROJECT }}
- name: 'Get Composer environment bucket'
id: env-bucket
1 change: 1 addition & 0 deletions .gitignore
@@ -132,3 +132,4 @@ dmypy.json

# Docker
.env.local
credentials.json
26 changes: 24 additions & 2 deletions Dockerfile
@@ -1,10 +1,32 @@
FROM gcr.io/analytics-warehouse-dev/composer-airflow/v2-2-test/prod/python3.8:2.2.5

COPY requirements.txt .
RUN pip install -r requirements.txt
ARG GOOGLE_APPLICATION_CREDENTIALS
ARG COMPOSER_PROJECT
ARG COMPOSER_LOCATION
ARG COMPOSER_ENV_NAME

RUN curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-cli-396.0.0-linux-x86_64.tar.gz && \
tar -zxf google-cloud-cli-396.0.0-linux-x86_64.tar.gz && \
./google-cloud-sdk/install.sh && \
./google-cloud-sdk/bin/gcloud components install kubectl --quiet

COPY webserver_config.py .

COPY requirements.txt .
RUN pip install --upgrade pip && pip install -r requirements.txt

COPY --chmod=755 $GOOGLE_APPLICATION_CREDENTIALS* /credentials.json

RUN if [ -f /credentials.json ]; then \
./google-cloud-sdk/bin/gcloud auth activate-service-account --key-file /credentials.json && \
./google-cloud-sdk/bin/gcloud config set project $COMPOSER_PROJECT && \
./google-cloud-sdk/bin/gcloud config set composer/location $COMPOSER_LOCATION && \
./google-cloud-sdk/bin/gcloud container clusters get-credentials --zone $COMPOSER_LOCATION \
$(./google-cloud-sdk/bin/gcloud composer environments describe $COMPOSER_ENV_NAME \
--format 'get(config.gkeCluster)'); \
fi

ENV PATH=$PATH:/opt/airflow/google-cloud-sdk/bin
ENV _AIRFLOW_DB_UPGRADE=1
ENV AIRFLOW__WEBSERVER__RBAC=false
ENV AIRFLOW__CORE__EXECUTOR=LocalExecutor
63 changes: 61 additions & 2 deletions README.md
@@ -3,16 +3,34 @@ Data team ETL for Cloud Composer

## Development

### GCP

#### GKE cluster credentials

A connection from a local machine to the GKE cluster used by
a Composer environment can be configured in the following steps:

- get GKE cluster id from the Composer environment:
```bash
gcloud composer environments describe <Composer env name> \
--format 'get(config.gkeCluster)' --location us-east1
```
- add cluster credentials to the Kubernetes client configuration:
```bash
gcloud container clusters get-credentials <GKE cluster id> --zone us-east1
```
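
These two steps can also be combined into a single command. Below is a minimal sketch that assumes the staging environment `data-composer-staging`; substitute your own environment name:

```bash
# Fetch GKE credentials for the cluster behind a Composer environment in one go
# ("data-composer-staging" is used here only as an example environment name).
gcloud container clusters get-credentials \
  "$(gcloud composer environments describe data-composer-staging \
       --format 'get(config.gkeCluster)' --location us-east1)" \
  --zone us-east1

# Quick sanity check that kubectl can reach the cluster
kubectl get namespaces
```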


### Local

In order to test your changes locally you can run a docker-compose
In order to test your changes locally you can run a `docker-compose`
environment with this command:

```bash
docker-compose up --build -d
```

In a while it will launch an airflow instance with webserver
In a while it will launch an Airflow instance with webserver
available at http://localhost:8080.

The directories `dags` and `plugins` are mounted to the Airflow scheduler
@@ -22,3 +40,44 @@ scheduler immediately.
It takes `AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL` seconds for
the scheduler to see changes in existing DAGs
and `AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL` seconds to see new DAGs.
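
If you want to watch the scheduler pick up your changes (or skip the wait entirely), you can tail its logs or restart it. The service name below is an assumption; check `docker-compose.yaml` for the actual one:

```bash
# Assumes the scheduler service is named "scheduler" in docker-compose.yaml
docker-compose logs -f scheduler

# Restart the scheduler instead of waiting for the parse intervals
docker-compose restart scheduler
```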

#### Environment configuration

By default, `docker-compose` loads environment variables from the `.env` file.
If you need values that only make sense for a specific user or task
and should not be committed to the repo, you can create a copy
of the `.env` file and redefine the values as needed.
Usually, this local copy is named `.env.local` and is ignored by Git.
When using `.env.local` to configure the environment, the command to start `docker-compose`
looks as follows:
```bash
docker-compose --env-file .env.local up -d --build
```
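
For reference, a hypothetical `.env.local` could start as a copy of `.env` and override only what differs for your setup; the values below are purely illustrative:

```bash
# .env.local -- example per-user overrides (not committed to the repo)
DWH_PROJECT=analytics-warehouse-dev
AIRFLOW_VAR_EXAMPLE_VAR="my local value"
AIRFLOW_CONN_EXAMPLE_CONNECTION="postgresql://user:pass@db_host:5432/db_name"
```
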
#### KubernetesPodOperator

It's possible to configure the local environment to run pods
in your Composer environment's GKE cluster using the `KubernetesPodOperator`.
To do so, specify values for the following environment variables in your `.env.local` file:
```
COMPOSER_ENV_NAME=data-composer-staging
COMPOSER_PROJECT=analytics-warehouse-dev
COMPOSER_LOCATION=us-east1
```
- `COMPOSER_ENV_NAME` defines which Composer environment's GKE cluster to reuse;
- `COMPOSER_PROJECT` is the project where the Composer environment is running;
  it should be `analytics-warehouse-dev` in most cases;
- `COMPOSER_LOCATION` is the location of the Composer environment; `us-east1` is a reasonable default.

After that, you need to get a service account key that will __only__ be used by
Airflow running __on your machine__ to interact with the GKE cluster.
It makes sense to use your "personal" service account as explained here -
https://toptal-core.atlassian.net/wiki/spaces/AN/pages/2808316004/0002+Accepted+Personal+service+accounts
Create a key for the service account and put it under the name `credentials.json`
in the root of the repository (it will be ignored by Git).
Now you can start `docker-compose` and check that your pods are spawned in the right cluster.
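
One way to do that check is sketched below with standard `kubectl` commands (the namespace the operator uses depends on how the DAG is defined, so `--all-namespaces` avoids guessing):

```bash
# Confirm kubectl points at the Composer environment's GKE cluster
kubectl config current-context

# Watch for pods created by KubernetesPodOperator while the DAG is running
kubectl get pods --all-namespaces --watch
```
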
11 changes: 0 additions & 11 deletions dags/masterdata/failing_dag.py

This file was deleted.
