feat(be): update glue script supporting luhn checksum #593

Draft · wants to merge 1 commit into base: main
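Note for reviewers: the visible part of this diff only shows formatting and quote-style changes to the two Glue scripts, so the Luhn checksum support mentioned in the title does not appear below. For context, the Luhn algorithm validates identifiers such as payment card numbers by doubling every second digit from the right and checking the total modulo 10. The following is a minimal, hypothetical sketch; the function name is_luhn_valid is illustrative and is not taken from this PR.

def is_luhn_valid(identifier: str) -> bool:
    """Return True if a digit string passes the Luhn checksum (illustrative sketch, not from this PR)."""
    digits = [int(ch) for ch in identifier if ch.isdigit()]
    if not digits or len(digits) != len(identifier):
        return False  # reject empty input or strings containing non-digit characters
    total = 0
    for position, digit in enumerate(reversed(digits)):
        if position % 2 == 1:  # double every second digit, counting from the right
            digit *= 2
            if digit > 9:
                digit -= 9  # same as summing the two digits of the doubled value
        total += digit
    return total % 10 == 0

# Example usage (hypothetical values):
# is_luhn_valid("79927398713")  -> True
# is_luhn_valid("79927398710")  -> False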
118 changes: 73 additions & 45 deletions source/constructs/config/job/script/glue-job-unstructured.py
@@ -1,4 +1,4 @@
'''
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,47 +12,65 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
"""

import os
import sys
import boto3
from functools import reduce

from pyspark.context import SparkContext
from pyspark.sql import DataFrame
import boto3
import pyspark.sql.functions as sf
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

from data_source.get_tables import get_tables
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from data_source.construct_dataframe import construct_dataframe
from data_source.get_tables import get_tables
from pyspark.context import SparkContext
from pyspark.sql import DataFrame
from template.template_utils import get_template
from unstructured_detection.detection_utils import add_metadata, get_table_info
from unstructured_detection.main_detection import detect_df


if __name__ == "__main__":
"""
This script is used to perform PII detection using the Glue Data Catalog.
"""

# Get all input arguments
s3 = boto3.client(service_name='s3')
glue = boto3.client(service_name='glue')
result_database = 'sdps_database'
result_table = 'job_detection_output_table'

args = getResolvedOptions(sys.argv, ["AccountId", 'Region','JOB_NAME', 'DatabaseName', 'JobId', 'RunId',
'RunDatabaseId', 'AdminBucketName', 'TemplateId', 'TemplateSnapshotNo', 'BaseTime',
'TableBegin', 'TableEnd', 'TableName', 'IncludeKeywords', 'ExcludeKeywords'])
args['DatabaseType'] = 's3_unstructured'
s3 = boto3.client(service_name="s3")
glue = boto3.client(service_name="glue")
result_database = "sdps_database"
result_table = "job_detection_output_table"

args = getResolvedOptions(
sys.argv,
[
"AccountId",
"Region",
"JOB_NAME",
"DatabaseName",
"JobId",
"RunId",
"RunDatabaseId",
"AdminBucketName",
"TemplateId",
"TemplateSnapshotNo",
"BaseTime",
"TableBegin",
"TableEnd",
"TableName",
"IncludeKeywords",
"ExcludeKeywords",
],
)
args["DatabaseType"] = "s3_unstructured"

full_database_name = f"SDPS-unstructured-{args['DatabaseName']}"
output_path = f"s3://{args['AdminBucketName']}/glue-database/{result_table}/"
error_path = f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
error_path = (
f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
)

# Create spark and glue context
sc = SparkContext()
@@ -66,60 +84,70 @@
num_crawler_tables = len(crawler_tables)

# Get template from s3 and broadcast it
template = get_template(s3, args['AdminBucketName'], args['TemplateId'], args['TemplateSnapshotNo'])
template = get_template(
s3, args["AdminBucketName"], args["TemplateId"], args["TemplateSnapshotNo"]
)
broadcast_template = sc.broadcast(template)

output = []
error = []
save_freq = 10
for table_index, table in enumerate(crawler_tables):
try:
# call detect_table to perform PII detection
# call detect_table to perform PII detection
print(f"Detecting table {table['Name']}")
raw_df = construct_dataframe(glueContext, glue, table, args)
# raw_df.show()
detection_result = detect_df(raw_df, glueContext, broadcast_template, table, args)
detection_result = detect_df(
raw_df, glueContext, broadcast_template, table, args
)
summarized_result = add_metadata(detection_result, table, args)
summarized_result.show()
output.append(summarized_result)

except Exception as e:
# Report error if failed
basic_table_info = get_table_info(table, args)
data = {
'account_id': args["AccountId"],
'region': args["Region"],
'job_id': args['JobId'],
'run_id': args['RunId'],
'run_database_id': args['RunDatabaseId'],
'database_name': args['DatabaseName'],
'database_type': args['DatabaseType'],
'table_name': table['Name'],
'location': basic_table_info['location'],
's3_location': basic_table_info['s3_location'],
's3_bucket': basic_table_info['s3_bucket'],
'rds_instance_id': basic_table_info['rds_instance_id'],
'error_message': str(e)
"account_id": args["AccountId"],
"region": args["Region"],
"job_id": args["JobId"],
"run_id": args["RunId"],
"run_database_id": args["RunDatabaseId"],
"database_name": args["DatabaseName"],
"database_type": args["DatabaseType"],
"table_name": table["Name"],
"location": basic_table_info["location"],
"s3_location": basic_table_info["s3_location"],
"s3_bucket": basic_table_info["s3_bucket"],
"rds_instance_id": basic_table_info["rds_instance_id"],
"error_message": str(e),
}
error.append(data)
print(f'Error occurred detecting table {table}')
print(f"Error occurred detecting table {table}")
print(e)

if (table_index + 1) % save_freq == 0 or (table_index + 1) == num_crawler_tables:

if (table_index + 1) % save_freq == 0 or (
table_index + 1
) == num_crawler_tables:
# Save detection result to s3.
if output:
df = reduce(DataFrame.unionAll, output)
df = df.repartition(1, 'year', 'month', 'day')
df = df.repartition(1, "year", "month", "day")
# df.show()
df.write.partitionBy('year', 'month', 'day').mode('append').parquet(output_path)
df.write.partitionBy("year", "month", "day").mode("append").parquet(
output_path
)

# If error in detect_table, save to error_path
if error:
df = spark.createDataFrame(error)
df.withColumn('update_time', sf.from_utc_timestamp(sf.current_timestamp(), 'UTC'))
df.withColumn(
"update_time", sf.from_utc_timestamp(sf.current_timestamp(), "UTC")
)
df = df.repartition(1)
df.write.mode('append').parquet(error_path)
df.write.mode("append").parquet(error_path)

output = []
error = []

119 changes: 74 additions & 45 deletions source/constructs/config/job/script/glue-job.py
@@ -1,4 +1,4 @@
'''
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,46 +12,67 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
"""

import os
import sys
import boto3
from functools import reduce

from pyspark.context import SparkContext
from pyspark.sql import DataFrame
import boto3
import pyspark.sql.functions as sf
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

from data_source.get_tables import get_tables
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from data_source.construct_dataframe import construct_dataframe
from template.template_utils import get_template
from data_source.get_tables import get_tables
from pyspark.context import SparkContext
from pyspark.sql import DataFrame
from structured_detection.detection_utils import add_metadata, get_table_info
from structured_detection.main_detection import detect_df

from template.template_utils import get_template

if __name__ == "__main__":
"""
This script is used to perform PII detection using the Glue Data Catalog.
"""

# Get all input arguments
s3 = boto3.client(service_name='s3')
glue = boto3.client(service_name='glue')
result_database = 'sdps_database'
result_table = 'job_detection_output_table'

args = getResolvedOptions(sys.argv, ["AccountId", 'Region', 'JOB_NAME', 'DatabaseName', 'GlueDatabaseName',
'DatabaseType', 'Depth', 'DetectionThreshold', 'JobId', 'RunId', 'RunDatabaseId',
'TemplateId', 'TemplateSnapshotNo', 'AdminBucketName', 'BaseTime', 'TableBegin', 'TableEnd',
'TableName', 'IncludeKeywords', 'ExcludeKeywords'])
s3 = boto3.client(service_name="s3")
glue = boto3.client(service_name="glue")
result_database = "sdps_database"
result_table = "job_detection_output_table"

args = getResolvedOptions(
sys.argv,
[
"AccountId",
"Region",
"JOB_NAME",
"DatabaseName",
"GlueDatabaseName",
"DatabaseType",
"Depth",
"DetectionThreshold",
"JobId",
"RunId",
"RunDatabaseId",
"TemplateId",
"TemplateSnapshotNo",
"AdminBucketName",
"BaseTime",
"TableBegin",
"TableEnd",
"TableName",
"IncludeKeywords",
"ExcludeKeywords",
],
)

output_path = f"s3://{args['AdminBucketName']}/glue-database/{result_table}/"
error_path = f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
error_path = (
f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
)

# Create spark and glue context
sc = SparkContext()
@@ -65,59 +86,67 @@
num_crawler_tables = len(crawler_tables)

# Get template from s3 and broadcast it
template = get_template(s3, args['AdminBucketName'], args['TemplateId'], args['TemplateSnapshotNo'])
template = get_template(
s3, args["AdminBucketName"], args["TemplateId"], args["TemplateSnapshotNo"]
)
broadcast_template = sc.broadcast(template)

output = []
error = []
save_freq = 10
for table_index, table in enumerate(crawler_tables):
try:
# call detect_table to perform PII detection
# call detect_table to perform PII detection
print(f"Detecting table {table['Name']}")
raw_df = construct_dataframe(glueContext, glue, table, args)
detection_result = detect_df(raw_df, glueContext, broadcast_template, args)
summarized_result = add_metadata(detection_result, table, args)
summarized_result.show()
output.append(summarized_result)

except Exception as e:
# Report error if failed
basic_table_info = get_table_info(table, args)
data = {
'account_id': args["AccountId"],
'region': args["Region"],
'job_id': args['JobId'],
'run_id': args['RunId'],
'run_database_id': args['RunDatabaseId'],
'database_name': args['DatabaseName'],
'database_type': args['DatabaseType'],
'table_name': table['Name'],
'location': basic_table_info['location'],
's3_location': basic_table_info['s3_location'],
's3_bucket': basic_table_info['s3_bucket'],
'rds_instance_id': basic_table_info['rds_instance_id'],
'error_message': str(e)
"account_id": args["AccountId"],
"region": args["Region"],
"job_id": args["JobId"],
"run_id": args["RunId"],
"run_database_id": args["RunDatabaseId"],
"database_name": args["DatabaseName"],
"database_type": args["DatabaseType"],
"table_name": table["Name"],
"location": basic_table_info["location"],
"s3_location": basic_table_info["s3_location"],
"s3_bucket": basic_table_info["s3_bucket"],
"rds_instance_id": basic_table_info["rds_instance_id"],
"error_message": str(e),
}
error.append(data)
print(f'Error occurred detecting table {table}')
print(f"Error occurred detecting table {table}")
print(e)

if (table_index + 1) % save_freq == 0 or (table_index + 1) == num_crawler_tables:

if (table_index + 1) % save_freq == 0 or (
table_index + 1
) == num_crawler_tables:
# Save detection result to s3.
if output:
df = reduce(DataFrame.unionAll, output)
df = df.repartition(1, 'year', 'month', 'day')
df = df.repartition(1, "year", "month", "day")
# df.show()
df.write.partitionBy('year', 'month', 'day').mode('append').parquet(output_path)
df.write.partitionBy("year", "month", "day").mode("append").parquet(
output_path
)

# If error in detect_table, save to error_path
if error:
df = spark.createDataFrame(error)
df.withColumn('update_time', sf.from_utc_timestamp(sf.current_timestamp(), 'UTC'))
df.withColumn(
"update_time", sf.from_utc_timestamp(sf.current_timestamp(), "UTC")
)
df = df.repartition(1)
df.write.mode('append').parquet(error_path)
df.write.mode("append").parquet(error_path)

output = []
error = []

Binary file modified source/constructs/config/job/script/job_extra_files.zip
Binary file not shown.