Skip to content

Commit ce30bc6

Browse files
committed
feat: add serverless data pipeline scripts
0 parents  commit ce30bc6

File tree

6 files changed

+346
-0
lines changed

6 files changed

+346
-0
lines changed

.idea/.gitignore

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-extractor.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import json
2+
import boto3
3+
import urllib3
4+
import datetime
5+
import os
6+
7+
FIREHOSE_NAME = os.environ['FIREHOSE_NAME']
8+
9+
10+
def lambda_handler(event, context):
    """Fetch the latest CSE ASPI index datapoint and push it to Firehose.

    Pulls the current All Share Price Index record from the CSE public API,
    extracts the fields of interest, and writes one newline-delimited JSON
    record to the delivery stream named by the FIREHOSE_NAME environment
    variable.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda runtime context (unused).

    Returns:
        The record string that was sent (handy as Lambda test output).
    """
    http = urllib3.PoolManager()

    # The CSE endpoint expects a POST even though no payload is required.
    r = http.request("POST", "https://www.cse.lk/api/aspiData")

    # turn the response body into a dictionary
    r_dict = json.loads(r.data.decode(encoding='utf-8', errors='strict'))

    # extract only the fields we want to land in the raw bucket
    processed_dict = {
        'id': r_dict['id'],
        'aspi_value': r_dict['value'],
        'lowValue': r_dict['lowValue'],
        'highValue': r_dict['highValue'],
        'change': r_dict['change'],
        'sectorId': r_dict['sectorId'],
        'timestamp': r_dict['timestamp'],
    }

    # BUG FIX: str(dict) produces a Python repr with single quotes, which is
    # not valid JSON and breaks JSON-based downstream consumers (e.g. an
    # Athena JSON SerDe over the raw bucket). Serialize with json.dumps and
    # keep the trailing newline so records stay line-delimited.
    msg = json.dumps(processed_dict) + '\n'

    fh = boto3.client('firehose')

    fh.put_record(
        DeliveryStreamName=FIREHOSE_NAME,
        Record={
            'Data': msg
        }
    )

    return msg
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import sys
2+
import boto3
3+
4+
5+
ATHENA_CLIENT = boto3.client('athena')
6+
GLUE_CLIENT = boto3.client('glue')
7+
MY_DATABASE = 'cse-aspi'
8+
9+
SOURCE_TABLE_NAME = 'cse_aspi_index_raw_data_bucket'
10+
11+
STAGE_TABLE_NAME = 'stage_cse_aspi_index_parquet_data_table'
12+
STAGE_TABLE_S3_BUCKET = 's3://cse-aspi-index-parquet-data-bucket/'
13+
14+
15+
QUERY_RESULTS_S3_BUCKET = 's3://athena-query-results-output-data-bucket/'
16+
17+
QUERY_STATUS_CODES = ["FAILED", "SUCCEEDED", "CANCELLED"]
18+
19+
20+
def create_stage_table():
    """Create (CTAS) the Parquet stage table from the raw source table.

    Runs an Athena CREATE TABLE AS SELECT that converts the raw data into a
    Snappy-compressed Parquet table partitioned by year/month/day, then
    polls Athena until the query reaches a terminal state.

    Returns:
        The final get_query_execution response dict.
    """
    import time  # local import: only needed for the polling back-off

    queryStart = ATHENA_CLIENT.start_query_execution(
        QueryString=f"""
        CREATE TABLE "{STAGE_TABLE_NAME}" WITH
        (external_location='{STAGE_TABLE_S3_BUCKET}',
        format='PARQUET',
        write_compression='SNAPPY',
        partitioned_by = ARRAY['year', 'month', 'day']
        )
        AS
        SELECT
            id AS cse_aspi_index_id
            ,aspi_value as cse_aspi_value
            ,lowValue as cse_aspi_low_value
            ,highValue as cse_aspi_high_value
            ,change as cse_aspi_change
            ,sectorId as cse_aspi_sector_id
            ,timestamp as cse_aspi_timestamp
            ,cast(from_unixtime(timestamp / 1000) AT TIME ZONE 'Asia/Kolkata' as timestamp) AS ist_timestamp
            ,year(CAST(from_unixtime(timestamp / 1000) AT TIME ZONE 'Asia/Kolkata' AS timestamp)) AS year
            ,month(CAST(from_unixtime(timestamp / 1000) AT TIME ZONE 'Asia/Kolkata' AS timestamp)) AS month
            ,day(CAST(from_unixtime(timestamp / 1000) AT TIME ZONE 'Asia/Kolkata' AS timestamp)) AS day
        FROM "{MY_DATABASE}"."{SOURCE_TABLE_NAME}"
        ;
        """,
        QueryExecutionContext={
            'Database': f'{MY_DATABASE}'
        },
        ResultConfiguration={'OutputLocation': f'{QUERY_RESULTS_S3_BUCKET}'}
    )

    res = ATHENA_CLIENT.get_query_execution(QueryExecutionId=queryStart["QueryExecutionId"])

    # BUG FIX: the original polled in a tight busy-wait with no delay,
    # hammering the Athena API and risking throttling; back off between polls.
    while res["QueryExecution"]["Status"]["State"] not in QUERY_STATUS_CODES:
        time.sleep(1)
        res = ATHENA_CLIENT.get_query_execution(QueryExecutionId=queryStart["QueryExecutionId"])

    return res
57+
58+
59+
def drop_table_if_exists():
    """Drop the stage table from the Glue catalog if it is present."""
    table_ref = {'DatabaseName': MY_DATABASE, 'Name': STAGE_TABLE_NAME}
    try:
        # get_table raises EntityNotFoundException when absent, so reaching
        # delete_table means the table existed.
        GLUE_CLIENT.get_table(**table_ref)
        GLUE_CLIENT.delete_table(**table_ref)
        print(f"Dropped existing table: {STAGE_TABLE_NAME}")
    except GLUE_CLIENT.exceptions.EntityNotFoundException:
        print(f"No existing table found: {STAGE_TABLE_NAME}")
66+
67+
68+
def response_handler(response):
    """Exit the job unless the Athena query succeeded; log otherwise.

    BUG FIX: the original only checked for FAILED, so a CANCELLED terminal
    state was reported as a successful table creation. Now anything other
    than SUCCEEDED aborts the job.

    Args:
        response: An Athena get_query_execution response dict.
    """
    status = response["QueryExecution"]["Status"]
    if status["State"] == 'SUCCEEDED':
        print(f"Stage table {STAGE_TABLE_NAME} created successfully")
    else:
        print(f"Error creating stage table: {response}")
        # StateChangeReason can be absent (e.g. for CANCELLED queries), so
        # fall back to the state name for the exit message.
        sys.exit(status.get("StateChangeReason", status["State"]))
74+
75+
76+
if __name__ == '__main__':
    # Rebuild the stage table from scratch: drop any previous version,
    # then run the CTAS query and surface the Athena result status.
    drop_table_if_exists()
    response_handler(create_stage_table())
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import sys
2+
import boto3
3+
import awswrangler as wr
4+
5+
ATHENA_CLIENT = wr.athena
6+
GLUE_CLIENT = boto3.client('glue')
7+
MY_DATABASE = 'cse-aspi'
8+
9+
STAGE_TABLE_NAME = 'stage_cse_aspi_index_parquet_data_table'
10+
11+
QUERY_RESULTS_S3_BUCKET = 's3://athena-query-results-output-data-bucket/'
12+
13+
QUERY_STATUS_CODES = ["FAILED", "SUCCEEDED", "CANCELLED"]
14+
15+
16+
def data_quality_evaluator() -> None:
    """Fail the job when any stage-table column contains NULL values.

    Sums the NULL counts across every stage-table column in a single
    Athena query; a non-zero total aborts the pipeline via sys.exit,
    otherwise the check passes.
    """
    null_check_sql = f"""
    SELECT
        COUNT(CASE WHEN cse_aspi_index_id IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_value IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_low_value IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_high_value IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_change IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_sector_id IS NULL THEN 1 END) +
        COUNT(CASE WHEN cse_aspi_timestamp IS NULL THEN 1 END) +
        COUNT(CASE WHEN ist_timestamp IS NULL THEN 1 END) AS total_null_values
    FROM {STAGE_TABLE_NAME};
    """

    result_df = ATHENA_CLIENT.read_sql_query(sql=null_check_sql, database=MY_DATABASE)

    # A strictly positive total means at least one NULL slipped through.
    total_nulls = result_df['total_null_values'][0]
    if total_nulls > 0:
        sys.exit('Results returned. Quality check failed.')
    print('Quality check passed.')
38+
39+
40+
if __name__ == '__main__':
    # Run the NULL-value data quality gate against the stage table.
    data_quality_evaluator()

glue-scripts/03_create_prod_table.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import boto3
2+
from botocore.exceptions import ClientError
3+
import sys
4+
5+
ATHENA_CLIENT = boto3.client('athena')
6+
GLUE_CLIENT = boto3.client('glue')
7+
MY_DATABASE = 'cse-aspi'
8+
PROD_TABLE_NAME = 'prod_cse_aspi_index_data_table'
9+
PROD_TABLE_S3_BUCKET = 's3://cse-aspi-index-prod-table-data-bucket/'
10+
QUERY_RESULTS_S3_BUCKET = 's3://athena-query-results-output-data-bucket/'
11+
QUERY_STATUS_CODES = ["FAILED", "SUCCEEDED", "CANCELLED"]
12+
13+
14+
def check_table_exists():
    """Return True when the prod table is already in the Glue catalog."""
    try:
        GLUE_CLIENT.get_table(DatabaseName=MY_DATABASE, Name=PROD_TABLE_NAME)
    except ClientError as err:
        # Only "table not found" maps to False; anything else is an
        # unexpected failure and must propagate to the caller.
        if err.response['Error']['Code'] != 'EntityNotFoundException':
            raise
        return False
    return True
23+
24+
25+
def create_production_table():
    """Create the Iceberg prod table in Athena and wait for completion.

    Submits the CREATE TABLE DDL, then polls get_query_execution until the
    query reaches a terminal state. Exits the process on a boto3 client
    error.

    Returns:
        The final get_query_execution response dict.
    """
    import time  # local import: only needed for the polling back-off

    query = f"""
    CREATE TABLE {PROD_TABLE_NAME} (
        cse_aspi_index_id int,
        cse_aspi_value float,
        cse_aspi_low_value float,
        cse_aspi_high_value float,
        cse_aspi_change float,
        cse_aspi_sector_id int,
        ist_timestamp timestamp
    )
    LOCATION '{PROD_TABLE_S3_BUCKET}'
    TBLPROPERTIES ( 'table_type' = 'ICEBERG' );
    """

    try:
        query_start = ATHENA_CLIENT.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': MY_DATABASE
            },
            ResultConfiguration={'OutputLocation': QUERY_RESULTS_S3_BUCKET}
        )
        response = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

        # BUG FIX: the original polled in a tight busy-wait with no delay,
        # hammering the Athena API and risking throttling; back off between
        # polls.
        while response["QueryExecution"]["Status"]["State"] not in QUERY_STATUS_CODES:
            time.sleep(1)
            response = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

        return response
    except ClientError as e:
        print(f"Error creating production table: {e}")
        sys.exit(1)
57+
58+
59+
if __name__ == '__main__':
    # NOTE(review): the Athena result must only be inspected on the
    # creation path — when the table already exists no query is run and
    # `response` would be unbound. Nesting the check inside the else
    # branch makes that scoping explicit.
    if check_table_exists():
        print(f"Table {PROD_TABLE_NAME} already exists")
    else:
        print(f"Creating table {PROD_TABLE_NAME}")
        response = create_production_table()

        if response["QueryExecution"]["Status"]["State"] == 'FAILED':
            sys.exit(response["QueryExecution"]["Status"]["StateChangeReason"])

        print("Table creation successful")

glue-scripts/04_publish_prod_table.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import boto3
2+
from botocore.exceptions import ClientError
3+
import sys
4+
5+
ATHENA_CLIENT = boto3.client('athena')
6+
GLUE_CLIENT = boto3.client('glue')
7+
MY_DATABASE = 'cse-aspi'
8+
9+
PROD_TABLE_NAME = 'prod_cse_aspi_index_data_table'
10+
PROD_TABLE_S3_BUCKET = 's3://cse-aspi-index-prod-table-data-bucket/'
11+
12+
STAGE_TABLE_NAME = 'stage_cse_aspi_index_parquet_data_table'
13+
STAGE_TABLE_S3_BUCKET = 's3://cse-aspi-index-parquet-data-bucket/'
14+
15+
16+
QUERY_RESULTS_S3_BUCKET = 's3://athena-query-results-output-data-bucket/'
17+
QUERY_STATUS_CODES = ["FAILED", "SUCCEEDED", "CANCELLED"]
18+
19+
20+
def merge_data_operation():
    """Upsert the latest stage-table rows into the Iceberg prod table.

    Deduplicates the stage table per cse_aspi_index_id (keeping the row
    with the newest cse_aspi_timestamp), MERGEs it into the prod table,
    then polls Athena until the query reaches a terminal state. Exits the
    process on a boto3 client error.

    Returns:
        The final get_query_execution response dict.
    """
    import time  # local import: only needed for the polling back-off

    # BUG FIX: the original query ended with `);` followed by a stray bare
    # `;`, i.e. two statement terminators — Athena accepts only a single
    # statement per start_query_execution call.
    query = f"""
    MERGE INTO {PROD_TABLE_NAME} AS target
    USING (
        -- Select the unique records with the latest timestamp
        WITH LatestRecords AS (
            SELECT
                cse_aspi_index_id,
                cse_aspi_value,
                cse_aspi_low_value,
                cse_aspi_high_value,
                cse_aspi_change,
                cse_aspi_sector_id,
                CAST(ist_timestamp AS TIMESTAMP(6)) AS ist_timestamp,
                ROW_NUMBER() OVER (PARTITION BY cse_aspi_index_id ORDER BY cse_aspi_timestamp DESC) AS rn
            FROM {STAGE_TABLE_NAME}
        )
        SELECT
            cse_aspi_index_id,
            cse_aspi_value,
            cse_aspi_low_value,
            cse_aspi_high_value,
            cse_aspi_change,
            cse_aspi_sector_id,
            ist_timestamp
        FROM LatestRecords
        WHERE rn = 1
    ) AS source
    ON target.cse_aspi_index_id = source.cse_aspi_index_id
    WHEN MATCHED THEN
        UPDATE SET
            cse_aspi_value = source.cse_aspi_value,
            cse_aspi_low_value = source.cse_aspi_low_value,
            cse_aspi_high_value = source.cse_aspi_high_value,
            cse_aspi_change = source.cse_aspi_change,
            cse_aspi_sector_id = source.cse_aspi_sector_id,
            ist_timestamp = source.ist_timestamp
    WHEN NOT MATCHED THEN
        INSERT (
            cse_aspi_index_id,
            cse_aspi_value,
            cse_aspi_low_value,
            cse_aspi_high_value,
            cse_aspi_change,
            cse_aspi_sector_id,
            ist_timestamp
        ) VALUES (
            source.cse_aspi_index_id,
            source.cse_aspi_value,
            source.cse_aspi_low_value,
            source.cse_aspi_high_value,
            source.cse_aspi_change,
            source.cse_aspi_sector_id,
            source.ist_timestamp
        );
    """
    try:
        query_start = ATHENA_CLIENT.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': MY_DATABASE
            },
            ResultConfiguration={'OutputLocation': QUERY_RESULTS_S3_BUCKET}
        )
        response = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

        # BUG FIX: back off between polls instead of busy-waiting on the
        # Athena API.
        while response["QueryExecution"]["Status"]["State"] not in QUERY_STATUS_CODES:
            time.sleep(1)
            response = ATHENA_CLIENT.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

        return response
    except ClientError as e:
        # BUG FIX: message was copy-pasted from the create-table script and
        # claimed a table-creation failure during a MERGE.
        print(f"Error merging data into production table: {e}")
        sys.exit(1)
94+
95+
96+
if __name__ == '__main__':
    # Merge the latest stage-table rows into the Iceberg prod table and
    # abort the job with Athena's failure reason if the MERGE failed.
    response = merge_data_operation()

    if response["QueryExecution"]["Status"]["State"] == 'FAILED':
        sys.exit(response["QueryExecution"]["Status"]["StateChangeReason"])

    print(f"Table {PROD_TABLE_NAME} Refreshed successfully")

0 commit comments

Comments
 (0)