
Python Dependencies

You can create isolated Python virtual environments to package multiple Python libraries for a PySpark job. Here is an example of how you can package Great Expectations and profile a set of sample data.

Prerequisites

Important

This example is intended to be run in the us-east-1 region, as it reads data from the New York City Taxi dataset in the Registry of Open Data. If your EMR Serverless application is in a different region, you must configure networking.

Important

The default Dockerfile is configured to use linux/amd64. If you are using Graviton, update it to use linux/arm64 or pass --platform linux/arm64 to the docker build command. See the EMR Serverless architecture options for more detail. If you are using EMR 7.x, you must use Amazon Linux 2023 as the base image instead of Amazon Linux 2; a sample file is provided in Dockerfile.al2023.

Set the following variables according to your environment.

export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>

Profile data with EMR Serverless and Great Expectations

The example below builds a virtual environment with the necessary dependencies to use Great Expectations to profile a limited set of data from the New York City Taxi and Limousine Commission trip data.

All the commands below should be executed in this (examples/pyspark/dependencies) directory.

  1. Build your virtualenv archive

This command builds the included Dockerfile and exports the resulting pyspark_ge.tar.gz file to your local filesystem.

# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build --output . .
aws s3 cp pyspark_ge.tar.gz s3://${S3_BUCKET}/artifacts/pyspark/
  2. Copy your code

There's a sample ge_profile.py script included here; a rough sketch of what it does is shown after this step.

aws s3 cp ge_profile.py s3://${S3_BUCKET}/code/pyspark/
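The core of such a profiling script might look roughly like the sketch below. This is not the included sample verbatim: it assumes the pre-1.0 Great Expectations SparkDFDataset and BasicDatasetProfiler APIs, and the S3 source path is a placeholder for the actual NYC taxi data location.

import sys

from great_expectations.dataset import SparkDFDataset
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler
from great_expectations.render.renderer import ProfilingResultsPageRenderer
from great_expectations.render.view import DefaultJinjaPageView
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Output location passed via entryPointArguments (see the next step)
    output_path = sys.argv[1]

    spark = SparkSession.builder.appName("ge-profile").getOrCreate()

    # Placeholder path - substitute the actual NYC taxi dataset location
    df = spark.read.parquet("s3://<NYC_TAXI_DATA_PATH>/").limit(1000)

    # Profile the DataFrame and render the results as a standalone HTML page
    suite, validation_result = BasicDatasetProfiler.profile(SparkDFDataset(df))
    html = DefaultJinjaPageView().render(ProfilingResultsPageRenderer().render(validation_result))

    # A single-partition write produces the part-00000 file referenced below
    spark.sparkContext.parallelize([html]).coalesce(1).saveAsTextFile(output_path)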
  3. Run your job
  • entryPoint should point to your script on S3
  • entryPointArguments defines the output location of the Great Expectations profiler
  • The virtualenv archive is added via the --archives parameter
  • The driver and executor Python paths are configured via the various --conf spark.emr-serverless parameters; a quick runtime check for this is sketched after the command below
aws emr-serverless start-job-run \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/ge_profile.py",
            "entryPointArguments": ["s3://'${S3_BUCKET}'/tmp/ge-profile"],
            "sparkSubmitParameters": "--conf spark.archives=s3://'${S3_BUCKET}'/artifacts/pyspark/pyspark_ge.tar.gz#environment --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python --conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'
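Optionally, to confirm at runtime that the job picked up the packaged virtual environment, you can add a quick check like the following to your script (a hypothetical snippet, not part of the included sample):

import sys

import great_expectations as ge

# The interpreter path should point at the unpacked ./environment archive, and
# the Great Expectations version should match the one packaged in pyspark_ge.tar.gz
print("Python executable:", sys.executable)
print("Great Expectations version:", ge.__version__)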

When the job finishes, it will write a part-00000 file out to s3://${S3_BUCKET}/tmp/ge-profile.

  4. Copy and view the output
aws s3 cp s3://${S3_BUCKET}/tmp/ge-profile/part-00000 ./ge.html
open ./ge.html

PySpark jobs with Java dependencies

Sometimes you need to pull in Java dependencies like Kafka or PostgreSQL libraries. As of release label emr-6.7.0, you can use either spark.jars.packages or the --packages flag in your sparkSubmitParameters as shown below. Be sure to create your application within a VPC so that it can download the necessary dependencies.

# First create an application with release label emr-6.7.0 and a network configuration
aws emr-serverless create-application \
    --release-label emr-6.7.0 \
    --type SPARK \
    --name spark-packages \
    --network-configuration '{
        "subnetIds": ["subnet-abcdef01234567890", "subnet-abcdef01234567891"],
        "securityGroupIds": ["sg-abcdef01234567893"]
    }'

# Then submit a job (replacing the application id, arn, and your code/packages)
aws emr-serverless start-job-run \
    --name pg-query \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
            "sparkSubmitParameters": "--packages org.postgresql:postgresql:42.4.0"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'

Packaging dependencies into an uberjar

While --packages lets you easily specify additional dependencies for your job, these dependencies are not cached between job runs. In other words, each job run needs to re-fetch the dependencies, potentially leading to increased startup time. To mitigate this, and to create reproducible builds, you can create a dependency uberjar and upload it to S3.

This approach can also be used with EMR release label emr-6.6.0.

To do this, we'll create a pom.xml that specifies our dependencies and use a Maven Docker container to build the uberjar. In this example, we'll package org.postgresql:postgresql:42.4.0 and use the example script in ./pg_query.py to query a PostgreSQL database.

Tip

The code in pg_query.py is for demonstration purposes only - never store credentials directly in your code. 😁
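For reference, a minimal pg_query.py might look roughly like the sketch below; the connection details are placeholders, and the included sample may differ.

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("pg-query").getOrCreate()

    # Placeholder connection details - in a real job, pull credentials from a
    # secrets store rather than hard-coding them
    df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://<HOST>:5432/<DATABASE>")
        .option("dbtable", "<SCHEMA>.<TABLE>")
        .option("user", "<USERNAME>")
        .option("password", "<PASSWORD>")
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    df.show()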

  1. Build an uberjar with your dependencies
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build -f Dockerfile.jars --output . .

This creates an uber-jars-1.0-SNAPSHOT.jar file locally that you will copy to S3 in the next step.

  2. Copy your code and jar
aws s3 cp pg_query.py s3://${S3_BUCKET}/code/pyspark/
aws s3 cp uber-jars-1.0-SNAPSHOT.jar s3://${S3_BUCKET}/code/pyspark/jars/
  3. If you haven't already, set the following variables according to your environment.
export S3_BUCKET=<YOUR_S3_BUCKET_NAME>
export APPLICATION_ID=<EMR_SERVERLESS_APPLICATION_ID>
export JOB_ROLE_ARN=<EMR_SERVERLESS_IAM_ROLE>
  4. Start your job with --jars
aws emr-serverless start-job-run \
    --name pg-query \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/pg_query.py",
            "sparkSubmitParameters": "--jars s3://'${S3_BUCKET}'/code/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'
  5. See the output of your job!

Once your job finishes, you can copy the driver's stdout log locally to view the output.

export JOB_RUN_ID=<YOUR_JOB_RUN_ID>

aws s3 cp s3://${S3_BUCKET}/logs/applications/${APPLICATION_ID}/jobs/${JOB_RUN_ID}/SPARK_DRIVER/stdout.gz - | gunzip