diff --git a/source/constructs/config/job/script/glue-job-unstructured.py b/source/constructs/config/job/script/glue-job-unstructured.py
index 8d3626c8..a5a63129 100644
--- a/source/constructs/config/job/script/glue-job-unstructured.py
+++ b/source/constructs/config/job/script/glue-job-unstructured.py
@@ -1,4 +1,4 @@
-'''
+"""
 Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 
 Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,47 +12,65 @@
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-'''
+"""
 import os
 import sys
-import boto3
 from functools import reduce
-from pyspark.context import SparkContext
-from pyspark.sql import DataFrame
+
+import boto3
 import pyspark.sql.functions as sf
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
 from awsglue.context import GlueContext
 from awsglue.job import Job
-
-from data_source.get_tables import get_tables
+from awsglue.transforms import *
+from awsglue.utils import getResolvedOptions
 from data_source.construct_dataframe import construct_dataframe
+from data_source.get_tables import get_tables
+from pyspark.context import SparkContext
+from pyspark.sql import DataFrame
 from template.template_utils import get_template
 from unstructured_detection.detection_utils import add_metadata, get_table_info
 from unstructured_detection.main_detection import detect_df
 
-
 if __name__ == "__main__":
     """
    This script is used to perform PII detection on using Glue Data Catalog.
    """
     # Get all input arguments
-    s3 = boto3.client(service_name='s3')
-    glue = boto3.client(service_name='glue')
-    result_database = 'sdps_database'
-    result_table = 'job_detection_output_table'
-
-    args = getResolvedOptions(sys.argv, ["AccountId", 'Region','JOB_NAME', 'DatabaseName', 'JobId', 'RunId',
-                                         'RunDatabaseId', 'AdminBucketName', 'TemplateId', 'TemplateSnapshotNo', 'BaseTime',
-                                         'TableBegin', 'TableEnd', 'TableName', 'IncludeKeywords', 'ExcludeKeywords'])
-    args['DatabaseType'] = 's3_unstructured'
+    s3 = boto3.client(service_name="s3")
+    glue = boto3.client(service_name="glue")
+    result_database = "sdps_database"
+    result_table = "job_detection_output_table"
+
+    args = getResolvedOptions(
+        sys.argv,
+        [
+            "AccountId",
+            "Region",
+            "JOB_NAME",
+            "DatabaseName",
+            "JobId",
+            "RunId",
+            "RunDatabaseId",
+            "AdminBucketName",
+            "TemplateId",
+            "TemplateSnapshotNo",
+            "BaseTime",
+            "TableBegin",
+            "TableEnd",
+            "TableName",
+            "IncludeKeywords",
+            "ExcludeKeywords",
+        ],
+    )
+    args["DatabaseType"] = "s3_unstructured"
 
     full_database_name = f"SDPS-unstructured-{args['DatabaseName']}"
     output_path = f"s3://{args['AdminBucketName']}/glue-database/{result_table}/"
-    error_path = f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
+    error_path = (
+        f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/"
+    )
 
     # Create spark and glue context
     sc = SparkContext()
@@ -66,7 +84,9 @@
     num_crawler_tables = len(crawler_tables)
 
     # Get template from s3 and broadcast it
-    template = get_template(s3, args['AdminBucketName'], args['TemplateId'], args['TemplateSnapshotNo'])
+    template = get_template(
+        s3, args["AdminBucketName"], args["TemplateId"], args["TemplateSnapshotNo"]
+    )
     broadcast_template = sc.broadcast(template)
 
     output = []
@@ -74,52 +94,60 @@
     save_freq = 10
     for table_index, table in enumerate(crawler_tables):
         try:
-            # call detect_table to perform PII detection 
+            # call detect_table to perform PII detection
             print(f"Detecting table {table['Name']}")
             raw_df = construct_dataframe(glueContext, glue, table, args)
             # raw_df.show()
-            detection_result = detect_df(raw_df, glueContext, broadcast_template, table, args)
+            detection_result = detect_df(
+                raw_df, glueContext, broadcast_template, table, args
+            )
             summarized_result = add_metadata(detection_result, table, args)
             summarized_result.show()
             output.append(summarized_result)
-            
+
         except Exception as e:
             # Report error if failed
             basic_table_info = get_table_info(table, args)
             data = {
-                'account_id': args["AccountId"],
-                'region': args["Region"],
-                'job_id': args['JobId'],
-                'run_id': args['RunId'],
-                'run_database_id': args['RunDatabaseId'],
-                'database_name': args['DatabaseName'],
-                'database_type': args['DatabaseType'],
-                'table_name': table['Name'],
-                'location': basic_table_info['location'],
-                's3_location': basic_table_info['s3_location'],
-                's3_bucket': basic_table_info['s3_bucket'],
-                'rds_instance_id': basic_table_info['rds_instance_id'],
-                'error_message': str(e)
+                "account_id": args["AccountId"],
+                "region": args["Region"],
+                "job_id": args["JobId"],
+                "run_id": args["RunId"],
+                "run_database_id": args["RunDatabaseId"],
+                "database_name": args["DatabaseName"],
+                "database_type": args["DatabaseType"],
+                "table_name": table["Name"],
+                "location": basic_table_info["location"],
+                "s3_location": basic_table_info["s3_location"],
+                "s3_bucket": basic_table_info["s3_bucket"],
+                "rds_instance_id": basic_table_info["rds_instance_id"],
+                "error_message": str(e),
             }
             error.append(data)
-            print(f'Error occured detecting table {table}')
+            print(f"Error occurred detecting table {table}")
             print(e)
-        
-        if (table_index + 1) % save_freq == 0 or (table_index + 1) == num_crawler_tables:
+
+        if (table_index + 1) % save_freq == 0 or (
+            table_index + 1
+        ) == num_crawler_tables:
             # Save detection result to s3.
             if output:
                 df = reduce(DataFrame.unionAll, output)
-                df = df.repartition(1, 'year', 'month', 'day')
+                df = df.repartition(1, "year", "month", "day")
                 # df.show()
-                df.write.partitionBy('year', 'month', 'day').mode('append').parquet(output_path)
+                df.write.partitionBy("year", "month", "day").mode("append").parquet(
+                    output_path
+                )
 
             # If error in detect_table, save to error_path
             if error:
                 df = spark.createDataFrame(error)
-                df.withColumn('update_time', sf.from_utc_timestamp(sf.current_timestamp(), 'UTC'))
+                df = df.withColumn(
+                    "update_time", sf.from_utc_timestamp(sf.current_timestamp(), "UTC")
+                )
                 df = df.repartition(1)
-                df.write.mode('append').parquet(error_path)
-            
+                df.write.mode("append").parquet(error_path)
+
             output = []
             error = []
diff --git a/source/constructs/config/job/script/glue-job.py b/source/constructs/config/job/script/glue-job.py
index 7fa8d347..91a9b2ea 100644
--- a/source/constructs/config/job/script/glue-job.py
+++ b/source/constructs/config/job/script/glue-job.py
@@ -1,4 +1,4 @@
-'''
+"""
 Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 
 Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,27 +12,25 @@
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-''' +""" import os import sys -import boto3 from functools import reduce -from pyspark.context import SparkContext -from pyspark.sql import DataFrame +import boto3 import pyspark.sql.functions as sf -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions from awsglue.context import GlueContext from awsglue.job import Job - -from data_source.get_tables import get_tables +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions from data_source.construct_dataframe import construct_dataframe -from template.template_utils import get_template +from data_source.get_tables import get_tables +from pyspark.context import SparkContext +from pyspark.sql import DataFrame from structured_detection.detection_utils import add_metadata, get_table_info from structured_detection.main_detection import detect_df - +from template.template_utils import get_template if __name__ == "__main__": """ @@ -40,18 +38,41 @@ """ # Get all input arguments - s3 = boto3.client(service_name='s3') - glue = boto3.client(service_name='glue') - result_database = 'sdps_database' - result_table = 'job_detection_output_table' - - args = getResolvedOptions(sys.argv, ["AccountId", 'Region', 'JOB_NAME', 'DatabaseName', 'GlueDatabaseName', - 'DatabaseType', 'Depth', 'DetectionThreshold', 'JobId', 'RunId', 'RunDatabaseId', - 'TemplateId', 'TemplateSnapshotNo', 'AdminBucketName', 'BaseTime', 'TableBegin', 'TableEnd', - 'TableName', 'IncludeKeywords', 'ExcludeKeywords']) + s3 = boto3.client(service_name="s3") + glue = boto3.client(service_name="glue") + result_database = "sdps_database" + result_table = "job_detection_output_table" + + args = getResolvedOptions( + sys.argv, + [ + "AccountId", + "Region", + "JOB_NAME", + "DatabaseName", + "GlueDatabaseName", + "DatabaseType", + "Depth", + "DetectionThreshold", + "JobId", + "RunId", + "RunDatabaseId", + "TemplateId", + "TemplateSnapshotNo", + "AdminBucketName", + "BaseTime", + "TableBegin", + "TableEnd", + "TableName", + "IncludeKeywords", + "ExcludeKeywords", + ], + ) output_path = f"s3://{args['AdminBucketName']}/glue-database/{result_table}/" - error_path = f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/" + error_path = ( + f"s3://{args['AdminBucketName']}/glue-database/job_detection_error_table/" + ) # Create spark and glue context sc = SparkContext() @@ -65,7 +86,9 @@ num_crawler_tables = len(crawler_tables) # Get template from s3 and broadcast it - template = get_template(s3, args['AdminBucketName'], args['TemplateId'], args['TemplateSnapshotNo']) + template = get_template( + s3, args["AdminBucketName"], args["TemplateId"], args["TemplateSnapshotNo"] + ) broadcast_template = sc.broadcast(template) output = [] @@ -73,51 +96,57 @@ save_freq = 10 for table_index, table in enumerate(crawler_tables): try: - # call detect_table to perform PII detection + # call detect_table to perform PII detection print(f"Detecting table {table['Name']}") raw_df = construct_dataframe(glueContext, glue, table, args) detection_result = detect_df(raw_df, glueContext, broadcast_template, args) summarized_result = add_metadata(detection_result, table, args) summarized_result.show() output.append(summarized_result) - + except Exception as e: # Report error if failed basic_table_info = get_table_info(table, args) data = { - 'account_id': args["AccountId"], - 'region': args["Region"], - 'job_id': args['JobId'], - 'run_id': args['RunId'], - 'run_database_id': args['RunDatabaseId'], - 'database_name': args['DatabaseName'], - 'database_type': 
args['DatabaseType'], - 'table_name': table['Name'], - 'location': basic_table_info['location'], - 's3_location': basic_table_info['s3_location'], - 's3_bucket': basic_table_info['s3_bucket'], - 'rds_instance_id': basic_table_info['rds_instance_id'], - 'error_message': str(e) + "account_id": args["AccountId"], + "region": args["Region"], + "job_id": args["JobId"], + "run_id": args["RunId"], + "run_database_id": args["RunDatabaseId"], + "database_name": args["DatabaseName"], + "database_type": args["DatabaseType"], + "table_name": table["Name"], + "location": basic_table_info["location"], + "s3_location": basic_table_info["s3_location"], + "s3_bucket": basic_table_info["s3_bucket"], + "rds_instance_id": basic_table_info["rds_instance_id"], + "error_message": str(e), } error.append(data) - print(f'Error occured detecting table {table}') + print(f"Error occured detecting table {table}") print(e) - - if (table_index + 1) % save_freq == 0 or (table_index + 1) == num_crawler_tables: + + if (table_index + 1) % save_freq == 0 or ( + table_index + 1 + ) == num_crawler_tables: # Save detection result to s3. if output: df = reduce(DataFrame.unionAll, output) - df = df.repartition(1, 'year', 'month', 'day') + df = df.repartition(1, "year", "month", "day") # df.show() - df.write.partitionBy('year', 'month', 'day').mode('append').parquet(output_path) + df.write.partitionBy("year", "month", "day").mode("append").parquet( + output_path + ) # If error in detect_table, save to error_path if error: df = spark.createDataFrame(error) - df.withColumn('update_time', sf.from_utc_timestamp(sf.current_timestamp(), 'UTC')) + df.withColumn( + "update_time", sf.from_utc_timestamp(sf.current_timestamp(), "UTC") + ) df = df.repartition(1) - df.write.mode('append').parquet(error_path) - + df.write.mode("append").parquet(error_path) + output = [] error = [] diff --git a/source/constructs/config/job/script/job_extra_files.zip b/source/constructs/config/job/script/job_extra_files.zip index b895bc51..cd078400 100644 Binary files a/source/constructs/config/job/script/job_extra_files.zip and b/source/constructs/config/job/script/job_extra_files.zip differ