forked from aws-samples/aws-glue-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathresolve_choice.py
47 lines (38 loc) · 2.61 KB
/
resolve_choice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Copyright 2016-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# catalog: database and table name
db_name = "payments"
tbl_name = "medicare"
# s3 output directories
medicare_cast = "s3://glue-sample-target/output-dir/medicare_json_cast"
medicare_project = "s3://glue-sample-target/output-dir/medicare_json_project"
medicare_cols = "s3://glue-sample-target/output-dir/medicare_json_make_cols"
medicare_struct = "s3://glue-sample-target/output-dir/medicare_json_make_struct"
medicare_sql = "s3://glue-sample-target/output-dir/medicare_json_sql"
# Read data into a dynamic frame
medicare_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name)
# The `provider id` field will be choice between long and string
# Cast choices into integers, those values that cannot cast result in null
medicare_res_cast = medicare_dyf.resolveChoice(specs = [('provider id','cast:long')])
medicare_res_project = medicare_dyf.resolveChoice(specs = [('provider id','project:long')])
medicare_res_make_cols = medicare_dyf.resolveChoice(specs = [('provider id','make_cols')])
medicare_res_make_struct = medicare_dyf.resolveChoice(specs = [('provider id','make_struct')])
# Spark SQL on a Spark dataframe
medicare_df = medicare_dyf.toDF()
medicare_df.createOrReplaceTempView("medicareTable")
medicare_sql_df = spark.sql("SELECT * FROM medicareTable WHERE `total discharges` > 30")
medicare_sql_dyf = DynamicFrame.fromDF(medicare_sql_df, glueContext, "medicare_sql_dyf")
# Write it out in Json
glueContext.write_dynamic_frame.from_options(frame = medicare_res_cast, connection_type = "s3", connection_options = {"path": medicare_cast}, format = "json")
glueContext.write_dynamic_frame.from_options(frame = medicare_res_project, connection_type = "s3", connection_options = {"path": medicare_project}, format = "json")
glueContext.write_dynamic_frame.from_options(frame = medicare_res_make_cols, connection_type = "s3", connection_options = {"path": medicare_cols}, format = "json")
glueContext.write_dynamic_frame.from_options(frame = medicare_res_make_struct, connection_type = "s3", connection_options = {"path": medicare_struct}, format = "json")
glueContext.write_dynamic_frame.from_options(frame = medicare_sql_dyf, connection_type = "s3", connection_options = {"path": medicare_sql}, format = "json")