
Commit 416ab2f

Author: Carlos Vivar (committed)

Improve ETL methods, sketch of tests

Improve the ETL methods: add a difference transformation and a plot of the difference, plus a sketch of the testing methods. Spark is now started through the start_spark helper in dependencies.
1 parent 1c18d2d commit 416ab2f

File tree

7 files changed: +154, -31 lines


COVID19_project/__main__.py

Lines changed: 7 additions & 6 deletions
@@ -13,20 +13,21 @@
 from load import load_data
 from visualize import read_data, bokeh_app
 
+from dependencies import start_spark
+
 def main():
     """Main ETL script definition.
 
     :return: None
     """
 
     # start Spark application and get Spark session, logger and config
-    #spark, log, config = start_spark(
-    #    app_name='my_etl_job',
-    #    files=['configs/config.json'])
-    spark = SparkSession.builder.appName('COVID19').enableHiveSupport().getOrCreate()
+    spark, log, config = start_spark(
+        app_name='covid19',
+        files=['configs/config.json'])
 
     # log that main ETL job is starting
-    #log.warn('etl_job is up-and-running')
+    log.warn('covid19 job is up-and-running')
 
     # execute ETL pipeline
     data = extract_data(spark)
@@ -38,7 +39,7 @@ def main():
     bokeh_app(data_loaded)  # Here we can specify the path
 
     # log the success and terminate Spark application
-    #log.warn('test_etl_job is finished')
+    log.warn('covid19 job is finished')
     spark.stop()
 
     return None

COVID19_project/extract.py

Lines changed: 24 additions & 2 deletions
@@ -3,7 +3,27 @@
 from datetime import datetime
 import pandas as pd
 
-from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql import SparkSession, DataFrame
+
+
+def data_validation(df: DataFrame) -> bool:
+    """Validate the extracted data.
+
+    :param df: Input DataFrame.
+    :return: Validation output as a boolean.
+    """
+
+    if df.empty:
+        print('\n* No data were downloaded *\n')
+        return False
+
+    if not pd.Series(df["date"]).is_unique:
+        print('\n* Primary key check violated. Terminating extraction *\n')
+
+    if df.isnull().values.any():
+        raise Exception('\n* Null values found. Terminating extraction *\n')
+
+    return True
 
 def extract_data(spark) -> DataFrame:
     """Load data from Parquet file format.
@@ -27,7 +47,9 @@ def extract_data(spark) -> DataFrame:
         total_cases.append(item["cases"]["total"]["value"])
 
     # Improve this directly in spark.
-    pdf = pd.DataFrame({"date": dates, "total_cases": total_cases})
+    pdf = pd.DataFrame({"date": dates, "total_cases": total_cases}).dropna()
+
+    assert data_validation(pdf), '\n* Data validation not achieved *\n'
     sdf = spark.createDataFrame(pdf)
 
     return sdf
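
For reference, a minimal sketch of how the new data_validation helper behaves on a tiny in-memory frame. The import path follows the tests' COVID19_project.* convention, and the sample values are made up for illustration:

import pandas as pd

from COVID19_project.extract import data_validation

# a well-formed frame: non-empty, unique dates, no nulls -> returns True
good = pd.DataFrame({"date": ["2021-01-01", "2021-01-02"], "total_cases": [100, 130]})
print(data_validation(good))   # True

# an empty frame -> prints the "No data were downloaded" message and returns False
empty = pd.DataFrame({"date": [], "total_cases": []})
print(data_validation(empty))  # False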

COVID19_project/transform.py

Lines changed: 19 additions & 19 deletions
@@ -3,27 +3,9 @@
 from pyspark.sql import DataFrame
 from pyspark.sql.functions import udf, to_date
 from pyspark.sql.types import TimestampType
+import pyspark.pandas as ps
 
 
-def data_validation(df: DataFrame) -> bool:
-    """Transform original dataset.
-
-    :param df: Input DataFrame.
-    :return: Validation output in boolean
-    """
-
-    if df.empty:
-        print('\n* No data were downloaded \n*')
-        return False
-
-    if not pd.Series(df["date"]).is_unique:
-        print('\n* Primary key check violated. Terminating extraction *\n')
-
-    if df.isnull().values.any():
-        raise Exception('\n* Null values found. Terminating extraction *\n')
-
-    return True
-
 def calc_moving_average(df: DataFrame, temporal_window: int) -> DataFrame:
     """Calcultation of moving average
 
@@ -47,6 +29,21 @@ def transform_date_to_datetime(date: datetime.date) -> datetime.datetime:
 
     return min_datetime
 
+def calc_daily_difference(df: DataFrame) -> DataFrame:
+    """Calculation of the daily difference.
+
+    :param df: Input Spark DataFrame.
+    :return: Transformed DataFrame.
+    """
+
+    psdf = df.pandas_api()
+    diff_series = psdf["total_cases"].diff()
+    diff_series.name = "difference_total_cases"
+
+    diff_psdf = ps.merge(psdf, diff_series, left_index=True, right_index=True, how="left")
+    diff_df = diff_psdf.to_spark()
+
+    return diff_df
 
 
 def transform_data(df: DataFrame) -> DataFrame:
@@ -61,4 +58,7 @@ def transform_data(df: DataFrame) -> DataFrame:
     reg_transform_date_to_datetime = udf(lambda d: transform_date_to_datetime(d), TimestampType())
     df = df.withColumn("datetime", reg_transform_date_to_datetime("date"))
 
+    df = df.sort("datetime")
+    df = calc_daily_difference(df)
+
     return df
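
As a quick illustration, a minimal sketch of what calc_daily_difference produces. It assumes PySpark 3.2+ for the pandas-on-Spark API; the app name and the three sample rows are made up:

from dependencies.spark import start_spark
from COVID19_project.transform import calc_daily_difference

spark, *_ = start_spark(app_name='diff_demo')

# cumulative totals for three consecutive days (hypothetical numbers)
sdf = spark.createDataFrame(
    [("2021-01-01", 100), ("2021-01-02", 130), ("2021-01-03", 180)],
    ["date", "total_cases"])

# difference_total_cases is expected to read null, 30, 50 for these rows
calc_daily_difference(sdf).show()

spark.stop()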

COVID19_project/visualize.py

Lines changed: 2 additions & 2 deletions
@@ -15,7 +15,7 @@ def read_data(spark: SparkSession) -> DataFrame:
     """
     #with spark_timezone("Europe/Berlin"):
     data_loaded = spark.read.parquet("db.parquet")
-    data_loaded = data_loaded.sort("date", "total_cases")
+    data_loaded = data_loaded.sort("date")
 
     # Check datetime format
 
@@ -30,7 +30,7 @@ def bokeh_app(df: DataFrame) -> None:
     """
 
     x = df.select("datetime").rdd.flatMap(lambda x: x).collect()
-    y = df.select("total_cases").rdd.flatMap(lambda x: x).collect()
+    y = df.select("difference_total_cases").rdd.flatMap(lambda x: x).collect()
 
     p = figure(title="COVID 19", x_axis_label='Date', y_axis_label='Total Cases', x_axis_type='datetime')
     p.line(x, y, legend_label="Covid cases", line_width=2)
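
For context, a minimal sketch of how a figure like this one can be written to a standalone HTML file with Bokeh's output_file/save. The file name and the sample values are hypothetical:

from datetime import datetime, timedelta
from bokeh.plotting import figure, output_file, save

# three hypothetical daily-difference values on consecutive dates
x = [datetime(2021, 1, 1) + timedelta(days=i) for i in range(3)]
y = [0, 30, 50]

p = figure(title="COVID 19", x_axis_label='Date', y_axis_label='Total Cases', x_axis_type='datetime')
p.line(x, y, legend_label="Covid cases", line_width=2)

output_file("covid19.html")  # hypothetical output path
save(p)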

README.md

Lines changed: 16 additions & 0 deletions
@@ -271,6 +271,22 @@ parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
 
 After dealing with some problems related to the date/datetime format, I got the first MVP of the pipeline. Data is now extracted, dates are transformed into a proper datetime type, and the data is loaded into a Parquet database. The Bokeh app is able to read this data from the database and render a simple time-series plot in HTML. This is the first candidate for the first release.
 
+Useful links:
+- https://crontab.guru/
+- https://github.com/AlexIoannides/pyspark-example-project
+- https://www.youtube.com/watch?v=nVI4xEH7yU8&ab_channel=Intellipaat
+- https://github.com/rvilla87/ETL-PySpark/blob/master/jupyter/ETL.ipynb
+- https://github.com/Amaguk2023/Pyspark_Spotify_ETL
+- https://github.com/hbaflast/pyspark-project-template
+- https://github.com/vivek-bombatkar/Spark-with-Python---My-learning-notes-
+- https://github.com/hyunjoonbok/PySpark/blob/master/PySpark%20Dataframe%20Complete%20Guide%20(with%20COVID-19%20Dataset).ipynb
+- https://www.databricks.com/glossary/extract-transform-load#:~:text=ETL%2C%20which%20stands%20for%20extract,downstream%20to%20solve%20business%20problems.
+- https://www.youtube.com/watch?v=AHMm1wfGuHE&t=1s&ab_channel=TuanVu
+- https://www.revisitclass.com/hadoop/how-to-write-a-spark-dataframe-to-hive-table-in-pyspark/
+- https://hshirodkar.medium.com/apache-hive-on-docker-4d7280ac6f8e
+-
+
+
 ### Tutorial
 
 In order to build the docker image:

dependencies/spark.py

Lines changed: 4 additions & 2 deletions
@@ -15,7 +15,6 @@
 
 from dependencies import logging
 
-
 def start_spark(app_name='my_spark_app', master='local[*]', jar_packages=[],
                 files=[], spark_config={}):
     """Start Spark session, get Spark logger and load config files.
@@ -24,6 +23,7 @@ def start_spark(app_name='my_spark_app', master='local[*]', jar_packages=[],
     will apply when this is called from a script sent to spark-submit.
     All other arguments exist solely for testing the script from within
     an interactive Python console.
+
     This function also looks for a file ending in 'config.json' that
     can be sent with the Spark job. If it is found, it is opened,
     the contents parsed (assuming it contains valid JSON for the ETL job
@@ -32,6 +32,7 @@ def start_spark(app_name='my_spark_app', master='local[*]', jar_packages=[],
     this function. If the file cannot be found then the return tuple
     only contains the Spark session and Spark logger objects and None
     for config.
+
     The function checks the enclosing environment to see if it is being
     run from inside an interactive console session or from an
     environment which has a `DEBUG` environment variable set (e.g.
@@ -42,6 +43,7 @@ def start_spark(app_name='my_spark_app', master='local[*]', jar_packages=[],
     to using the spark-submit and Spark cluster defaults. This will also
     use local module imports, as opposed to those in the zip archive
     sent to spark via the --py-files flag in spark-submit.
+
     :param app_name: Name of Spark app.
     :param master: Cluster connection details (defaults to local[*]).
     :param jar_packages: List of Spark JAR package names.
@@ -100,4 +102,4 @@ def start_spark(app_name='my_spark_app', master='local[*]', jar_packages=[],
         spark_logger.warn('no config file found')
         config_dict = None
 
-    return spark_sess, spark_logger,
+    return spark_sess, spark_logger, config_dict
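
A minimal usage sketch of the repaired return value. Per the docstring, config comes back as None when no config.json is shipped with the job; the warning messages below are illustrative, not taken from the repo:

from dependencies.spark import start_spark

spark, log, config = start_spark(app_name='covid19', files=['configs/config.json'])

if config is None:
    # no *config.json was found alongside the job, so only the session and logger are usable
    log.warn('running without a config file')
else:
    log.warn('config loaded: ' + str(config))

spark.stop()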

tests/test_transform.py

Lines changed: 82 additions & 0 deletions
@@ -0,0 +1,82 @@
+"""
+test_transform.py
+~~~~~~~~~~~~~~~
+This module contains unit tests for the transformation steps of the ETL
+job defined in COVID19_project/transform.py. It makes use of a local
+Spark session started via dependencies.spark.start_spark.
+"""
+
+import unittest
+import json
+
+
+from pyspark.sql.functions import mean
+from dependencies.spark import start_spark
+from COVID19_project.transform import transform_data
+
+
+class SparkTransformTests(unittest.TestCase):
+    """Test suite for the transformations in transform.py.
+    """
+
+    def setUp(self):
+        """Start Spark, define config and path to test data.
+        """
+        self.config = json.loads("""{"steps_per_floor": 21}""")  # placeholder config kept from the template sketch
+        self.spark, *_ = start_spark()
+        self.test_data_path = 'tests/test_data/'
+
+    def tearDown(self):
+        """Stop Spark.
+        """
+        self.spark.stop()
+
+    def test_transform_data(self):
+        """Test the data transformer.
+        Using small chunks of input data and expected output data, we
+        test the transformation step to make sure it's working as
+        expected.
+        """
+        # assemble
+        input_data = (
+            self.spark
+            .read
+            .parquet(self.test_data_path + 'test_data'))
+
+        expected_data = (
+            self.spark
+            .read
+            .parquet(self.test_data_path + 'test_results'))
+
+        expected_cols = len(expected_data.columns)
+        expected_rows = expected_data.count()
+
+        expected_avg_steps = (
+            expected_data
+            .agg(mean('steps_to_desk').alias('avg_steps_to_desk'))  # placeholder column from the template test data
+            .collect()[0]
+            ['avg_steps_to_desk'])
+
+        # act
+        data_transformed = transform_data(input_data)
+
+        cols = len(data_transformed.columns)
+        rows = data_transformed.count()
+        avg_steps = (
+            data_transformed
+            .agg(mean('steps_to_desk').alias('avg_steps_to_desk'))
+            .collect()[0]
+            ['avg_steps_to_desk'])
+
+
+        # assert
+        self.assertEqual(expected_cols, cols)
+        self.assertEqual(expected_rows, rows)
+        self.assertEqual(expected_avg_steps, avg_steps)
+        self.assertTrue(all(col in expected_data.columns
+                            for col in data_transformed.columns))
+
+
+
+if __name__ == '__main__':
+    unittest.main()
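
Since the Parquet fixtures under tests/test_data/ are not among the files changed in this commit, the suite remains a sketch. Once they exist, a minimal way to run it from the repository root (standard unittest discovery, nothing project-specific) would be:

import unittest

# discover and run everything under tests/ matching test_*.py
suite = unittest.defaultTestLoader.discover('tests', pattern='test_*.py')
unittest.TextTestRunner(verbosity=2).run(suite)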
