
Commit c68b420

modified README.md and added etl pipeline code

1 parent 5c39335 commit c68b420

File tree

4 files changed: +178 -1 lines changed

README.md (+12 -1)
@@ -1 +1,12 @@
-# aws_streaming_pipeline
+## AWS Streaming Pipeline for real-time Cryptocurrency Price analysis
+
+This AWS streaming pipeline ingests real-time cryptocurrency price data from the CoinGecko API, transforms it for analysis, and stores it in a ready-to-use format. The pipeline leverages the following services:
+- **AWS Kinesis and Firehose**: Continuously stream data from the CoinGecko API in real time. I also implemented data partitioning within Firehose to improve performance.
+- **Amazon S3**: Serves as the data lake for storing the raw cryptocurrency price data.
+- **AWS Glue**: Provides a job to transform the raw data into a schema optimized for analytics.
+- **AWS Lambda**: Acts as an event-driven trigger, initiating the Glue job whenever new data arrives in the S3 raw layer.
+- **Amazon SNS**: Publishes notifications (to an SNS topic) about new data arrivals in S3, which are then picked up by the Lambda function.
+
+This architecture ensures that the pipeline automatically processes incoming data, keeping the analytical layer up to date with the latest information.
+
+![Pipeline Architecture](images/architecture.png)
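
The Lambda function that connects SNS to Glue is not part of this commit. As a point of reference only, here is a minimal sketch of what such an SNS-triggered handler might look like; the handler name, the `GLUE_JOB_NAME` environment variable, and the event parsing are assumptions, while `glue.start_job_run` is the standard boto3 call for starting a Glue job.

# Hypothetical SNS-triggered Lambda handler; names and environment
# variables are assumptions, not part of this commit.
import os
import json
import boto3

glue = boto3.client("glue")

def lambda_handler(event, context):
    for record in event["Records"]:
        # The S3 event notification arrives JSON-encoded inside the SNS message.
        s3_event = json.loads(record["Sns"]["Message"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            key = s3_record["s3"]["object"]["key"]
            print(f"New object s3://{bucket}/{key}, starting Glue job")
            glue.start_job_run(JobName=os.environ["GLUE_JOB_NAME"])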
(new file, +94)
import os
import json
import uuid
import asyncio

import boto3
from aiohttp import ClientSession
from dotenv import load_dotenv


def extract_subset(actual_json_dictionary, selected_columns):
    # Keep only the selected columns from each record.
    extract_json_dict = []
    for entry in actual_json_dictionary:
        # Build a fresh dict per entry so records do not overwrite each other.
        extracted_json = {column: entry[column] for column in selected_columns}
        extract_json_dict.append(extracted_json)
    return extract_json_dict


async def fetch_market_cap_data(currency, api_key, queue):
    try:  # wrapped in try/except so the task can be stopped cleanly via task.cancel()
        while True:
            print("Fetching data...")
            url = (
                "https://api.coingecko.com/api/v3/coins/markets"
                f"?vs_currency={currency}&x_cg_demo_api_key={api_key}"
            )
            async with ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        # Success: put the data in the queue.
                        market_cap_data = await response.json()
                        print("Adding item to the queue...")
                        await queue.put(market_cap_data)
                        print("Item added to the queue.")
                    else:
                        # aiohttp exposes the HTTP status as response.status
                        print("API request failed with status code:", response.status)

            # 70 seconds between calls: the cache/update frequency of the public API
            await asyncio.sleep(70)
    except asyncio.CancelledError:
        print("Task canceled. Exiting fetch_market_cap_data.")


async def send_batch_to_kinesis(stream_name, queue, kinesis_streams):
    while True:
        try:
            batch_data = await asyncio.wait_for(queue.get(), timeout=5)
        except asyncio.TimeoutError:
            if queue.empty():
                print("Queue is empty, stopping task.")
                break
        else:
            try:
                for data in batch_data:
                    encoded_data = json.dumps(data).encode('utf-8')
                    kinesis_streams.send_stream(stream_name, encoded_data, None)
            except Exception as e:
                print(f"Error while sending to Kinesis: {e}")


class KinesisStream:
    def __init__(self, region_name='eu-west-3'):
        self.kinesis_client = boto3.client('kinesis', region_name=region_name)

    def send_stream(self, stream_name, data, partition_key=None):
        if partition_key is None:
            # A random partition key spreads records evenly across shards.
            partition_key = str(uuid.uuid4())

        try:
            self.kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
        except self.kinesis_client.exceptions.ResourceNotFoundException:
            print(f"Kinesis stream '{stream_name}' not found")


async def main():
    load_dotenv()
    api_key = os.environ['api_key']
    queue = asyncio.Queue()
    stream_name = "api-to-kinesis-streams-coingecko"
    kinesis_streams = KinesisStream()

    producer_task = asyncio.create_task(fetch_market_cap_data("usd", api_key, queue))

    try:
        # Give the producer one fetch cycle; wait_for cancels the task and
        # raises TimeoutError once the timeout elapses.
        await asyncio.wait_for(producer_task, timeout=71)
    except asyncio.TimeoutError:
        pass

    sending_task = asyncio.create_task(send_batch_to_kinesis(stream_name, queue, kinesis_streams))
    await sending_task


if __name__ == "__main__":
    asyncio.run(main())
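
A note on the flow in `main()`: the producer gets one 70-second fetch cycle before `asyncio.wait_for` cancels it, and only then does the consumer drain the queue into Kinesis, so each run performs one fetch-then-ship pass. Also, `extract_subset` is defined but never called; it appears intended for trimming each API record down to a few columns before sending. A quick illustration with a made-up record shaped like the `/coins/markets` response:

# Hypothetical sample record; field values are made up for illustration.
sample = [
    {"id": "bitcoin", "symbol": "btc", "current_price": 64250.0, "market_cap": 1265000000000},
]
print(extract_subset(sample, ["id", "current_price"]))
# [{'id': 'bitcoin', 'current_price': 64250.0}]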

coingecko_etl_pipeline/glue_job.py (new file, +72)
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import expr

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)


def create_df_from_s3():
    # Read the raw JSON data from S3, as registered in the Glue Data Catalog.
    dyf = glueContext.create_dynamic_frame.from_catalog(database='coingecko_database', table_name='data')
    return dyf.toDF()


def remove_duplicate(df):
    # Remove duplicate records based on the coin id and its last update time.
    return df.dropDuplicates(["id", "last_updated"])


def drop_columns(df):
    # Drop struct-typed columns that are not needed for analytics.
    cols = ("roi", "image", "ath", "ath_change_percentage", "ath_date",
            "atl", "atl_change_percentage", "atl_date")
    return df.drop(*cols)


def clean_structure(df):
    # Numeric fields arrive as Glue "choice" structs (e.g. {int, long});
    # coalesce the variants of each column into a single numeric value.
    df = df.withColumn("market_cap", expr("coalesce(market_cap.int, market_cap.long)")) \
        .withColumn("current_price", expr("coalesce(current_price.double, current_price.int)")) \
        .withColumn("fully_diluted_valuation", expr("coalesce(fully_diluted_valuation.int, fully_diluted_valuation.long)")) \
        .withColumn("total_volume", expr("coalesce(total_volume.int, total_volume.long)")) \
        .withColumn("high_24h", expr("coalesce(high_24h.double, high_24h.int)")) \
        .withColumn("low_24h", expr("coalesce(low_24h.double, low_24h.int)")) \
        .withColumn("market_cap_change_24h", expr("coalesce(market_cap_change_24h.double, market_cap_change_24h.int, market_cap_change_24h.long)"))
    return df


if __name__ == "__main__":
    # Initialise the job so job.commit() can record bookmark state.
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    job.init(args['JOB_NAME'], args)

    df = create_df_from_s3()
    df_deduplicate = remove_duplicate(df)
    df_w_drop = drop_columns(df_deduplicate)
    df_final = clean_structure(df_w_drop)

    # Convert the Spark DataFrame back to a Glue DynamicFrame for writing.
    glue_dynamic_frame = DynamicFrame.fromDF(df_final, glueContext, "glue_etl")

    s3output = glueContext.getSink(
        path="s3://coingecko-clean-datalake/clean_coins_data/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="s3output",
    )

    s3output.setCatalogInfo(
        catalogDatabase="coingecko_database", catalogTableName="clean_marketcap_data"
    )

    s3output.setFormat("glueparquet")
    s3output.writeFrame(glue_dynamic_frame)
    job.commit()
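
The `coalesce` expressions in `clean_structure` flatten the choice structs that the Glue crawler creates when a JSON field mixes numeric types (for example, int in some records and long in others). A DynamicFrame-native alternative, shown here only as a sketch and not what this commit uses, would be to resolve the choices with `resolveChoice` before converting to a DataFrame:

# Sketch only: cast mixed numeric choice columns on the DynamicFrame itself
# instead of coalescing struct fields in Spark SQL. Not part of this commit.
dyf_resolved = dyf.resolveChoice(specs=[
    ("market_cap", "cast:long"),
    ("current_price", "cast:double"),
    ("total_volume", "cast:long"),
])
df = dyf_resolved.toDF()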

images/architecture.png (394 KB)