
Commit 2588413
chore: moved modules
yanbasic committed Mar 26, 2024
1 parent 2d879b7 commit 2588413
Showing 6 changed files with 9,240 additions and 30 deletions.
Binary file not shown.
51 changes: 21 additions & 30 deletions source/containers/document-pii-detection/main.py
@@ -9,9 +9,9 @@
 
 import boto3
 import pandas as pd
+import psutil
 import pyarrow as pa
 import pyarrow.parquet as pq
-import psutil
 
 from parser_factory import ParserFactory
 
@@ -131,7 +131,7 @@ def batch_process_files(s3_client, bucket_name, file_info, file_category):
         "total_file_size": 421226,
         "total_file_count": 1,
         "sample_files": [
-            "工商营业执照.jpg"
+            "xxxxx.jpg"
         ]
     }
@@ -201,31 +201,29 @@ def create_glue_table(glue_client, database_name, table_name, glue_table_info):
         TableInput=glue_table_info
     )
 
-    print(response)
-
 
 # Worker function to process a detection_files item
 def worker(queue,
            lock,
            crawler_result_bucket_name,
            destination_database,
-           original_file_bucket_name):
+           original_file_bucket_name,
+           region_name):
     while not queue.empty():
         with lock:
             item = queue.get()
         if item is None:
             break
 
-        glue_client = boto3.client('glue', region_name='cn-northwest-1')
-        s3_client = boto3.client('s3', region_name='cn-northwest-1')
+        glue_client = boto3.client('glue', region_name=region_name)
+        s3_client = boto3.client('s3', region_name=region_name)
         file_category = item[0]
         file = item[1]
        file_path = file[0]
        file_info = file[1]
        try:
-            print('Worker is processing... ' + file_path)
+            print('Worker is processing ' + file_path)
            file_contents = batch_process_files(s3_client, original_file_bucket_name, file_info, file_category)
-            # print(file_contents)
            # split file_contents into several dictionaries, with which contain 100 files at most
            split_file_contents_list = split_dictionary(file_contents, chunk_size=100)
            len_split_file_contents_list = len(split_file_contents_list)
@@ -276,23 +274,21 @@ def worker(queue,
             create_glue_table(glue_client, destination_database, table_name, glue_table_info)
 
         except Exception as e:
-            print(f"Error occured processing {file_path}. Error message: {e}")
-            # print_system_usage()
+            print(f"Error occurred processing {file_path}. Error message: {e}")
         print('Worker is finished on ' + file_path)
 
 
 def main(param_dict):
-    original_bucket_name = 'yiyan-test-s6-100partition'  # param_dict['SourceBucketName']
-    crawler_result_bucket_name = 'sdps-agent-agents3bucket37b73ecb-veydzutbtwma'  # param_dict['ResultBucketName']
-    region_name = 'cn-northwest-1'  # param_dict['RegionName']
-    worker_num = psutil.cpu_count(logical=False)  # param_dict['WorkerNum']
-    print('worker_num:' + str(worker_num))
+    original_bucket_name = param_dict['SourceBucketName']
+    crawler_result_bucket_name = param_dict['ResultBucketName']
+    region_name = param_dict['RegionName']
+    worker_num = psutil.cpu_count(logical=False)
     crawler_result_object_key = f"crawler_results/{original_bucket_name}_info.json"
     destination_database = f"SDPS-unstructured-{original_bucket_name}"
 
-    s3_client = boto3.client('s3', region_name='cn-northwest-1')
-    glue_client = boto3.client('glue', region_name='cn-northwest-1')
-    print('++++++++++++')
+    s3_client = boto3.client('s3', region_name=region_name)
+    glue_client = boto3.client('glue', region_name=region_name)
 
     # 1. Create a Glue Database
     try:
         response = glue_client.create_database(
@@ -312,11 +308,9 @@
         )
         print(f"Re-created database '{destination_database}'")
 
-    # 2. Download the crawler result from S3 and
+    # 2. Download the crawler result from S3
     with tempfile.NamedTemporaryFile(mode='w') as temp:
         temp_file_path = temp.name
-        print('crawler_result_bucket_name:' + crawler_result_bucket_name)
-        print('crawler_result_object_key:' + crawler_result_object_key)
         s3_client.download_file(Bucket=crawler_result_bucket_name, Key=crawler_result_object_key,
                                 Filename=temp_file_path)
         with open(temp_file_path, 'r') as f:
@@ -334,7 +328,6 @@
             num_items += 1
 
     processes = []
-    # Create up to X processes
     worker_num = min(worker_num, num_items)
     for _ in range(worker_num):
         p = Process(
@@ -344,7 +337,8 @@
                 lock,
                 crawler_result_bucket_name,
                 destination_database,
-                original_file_bucket_name
+                original_file_bucket_name,
+                region_name
             )
         )
         processes.append(p)
@@ -361,17 +355,14 @@
 
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(...)
-    parser.add_argument('--SourceBucketName', type=str, default='icyxu-glue-assets-member-a',
+    parser.add_argument('--SourceBucketName', type=str, default='',
                         help='crawler_result_bucket_name')
-    parser.add_argument('--ResultBucketName', type=str, default='icyxu-glue-assets-member-a',
+    parser.add_argument('--ResultBucketName', type=str, default='',
                         help='crawler_result_bucket_name')
-    parser.add_argument('--RegionName', type=str, default='us-west-2',
+    parser.add_argument('--RegionName', type=str, default='',
                         help='crawler_result_object_key')
     parser.add_argument('--WorkerNum', type=int, default=10,
                         help='worker_num')
 
     args, _ = parser.parse_known_args()
     param_dict = copy.copy(vars(args))
 
     main(param_dict)
-#
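
Note on the change above: beyond reordering the imports, the previously hard-coded bucket names and 'cn-northwest-1' region are now read from param_dict and threaded through to each worker process, where each process builds its own boto3 clients from the shared region_name rather than inheriting a client across the fork. A minimal sketch of driving the refactored entry point; the bucket names and region below are placeholders, not values from this commit:

    # Hypothetical invocation; bucket names and region are placeholders.
    param_dict = {
        'SourceBucketName': 'my-source-bucket',
        'ResultBucketName': 'my-result-bucket',
        'RegionName': 'us-west-2',
    }
    main(param_dict)

The same parameters map onto the argparse flags at the bottom of the file, e.g. python main.py --SourceBucketName my-source-bucket --ResultBucketName my-result-bucket --RegionName us-west-2.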
Binary file not shown.
Binary file not shown.
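
The worker function in main.py above splits file_contents into chunks of at most 100 files before creating Glue tables. The body of split_dictionary is not shown in this diff; a sketch consistent with the call site split_dictionary(file_contents, chunk_size=100), offered as an assumption rather than the committed implementation:

    def split_dictionary(input_dict, chunk_size=100):
        # Assumed helper: break a dict into a list of dicts,
        # each holding at most chunk_size key/value pairs.
        items = list(input_dict.items())
        return [dict(items[i:i + chunk_size])
                for i in range(0, len(items), chunk_size)]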
