
Commit 6378c39

Carlos Vivar committed
Major update. Dockerfile and style corrections.
I added an output folder where the database and the HTML will be saved. The Dockerfile is working and ready for testing on mybinder. Requirements.txt has been updated with my versions.
1 parent d7bb116 commit 6378c39

22 files changed: +126 -119 lines changed

.vscode/settings.json

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+{
+    "python.linting.flake8Enabled": true,
+    "python.linting.enabled": true,
+    "python.analysis.typeCheckingMode": "basic"
+}

COVID19_project/__main__.py

Lines changed: 8 additions & 8 deletions
@@ -11,6 +11,7 @@
 
 from dependencies.spark import start_spark
 
+
 def main():
     """Main ETL script definition.
 
@@ -19,29 +20,28 @@ def main():
 
     # start Spark application and get Spark session, logger and config
     spark, log, config = start_spark(
-        app_name='covid19',
-        files=['configs/config.json'])
+        app_name="covid19",
+        files=["configs/config.json"])
 
     # log that main ETL job is starting
-    log.warn('covid19 job is up-and-running')
+    log.warn("covid19 job is up-and-running")
 
     # execute ETL pipeline
     data = extract_data(spark)
     data_transformed = transform_data(data)
     load_data(spark, data_transformed)
 
     # execute visualization pipeline
-    data_loaded = read_data(spark)  # Here we can specify the query
-    bokeh_app(data_loaded)  # Here we can specify the path
+    data_loaded = read_data(spark)
+    bokeh_app(data_loaded)
 
     # log the success and terminate Spark application
-    log.warn('covid19 job is finished')
+    log.warn("covid19 job is finished")
     spark.stop()
 
     return None
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     # Check if the api is online and that the version is compatible.
     main()
-

COVID19_project/extract.py

Lines changed: 9 additions & 10 deletions
@@ -11,25 +11,26 @@
 from pyspark.sql import SparkSession, DataFrame
 
 
-def data_validation(df: DataFrame) -> bool:
+def data_validation(df: pd.DataFrame) -> bool:
     """Validate data extracted
 
     :param df: Input DataFrame.
     :return: Validation output in boolean
     """
 
     if df.empty:
-        print('\n* No data were downloaded \n*')
+        print("\n* No data were downloaded \n*")
         return False
 
     if not pd.Series(df["date"]).is_unique:
-        print('\n* Primary key check violated. Terminating extraction *\n')
+        print("\n* Primary key check violated. Terminating extraction *\n")
 
     if df.isnull().values.any():
-        raise Exception('\n* Null values found. Terminating extraction *\n')
+        raise Exception("\n* Null values found. Terminating extraction *\n")
 
     return True
 
+
 def extract_data(spark: SparkSession) -> DataFrame:
     """Load data from Parquet file format.
 
@@ -40,24 +41,22 @@ def extract_data(spark: SparkSession) -> DataFrame:
     response = requests.get("https://api.covidtracking.com/v2/us/daily.json")
 
     if response.status_code != 200:
-        raise Exception(f'\n* Request failed with status code {response.status_code} *\n')
+        raise Exception(f"\n* Request failed with status code {response.status_code} *\n")
 
     js = response.json()
 
     dates = []
     total_cases = []
-
-    data = js['data']
+    data = js["data"]
 
     for item in data:
-        dates.append(item['date'])
+        dates.append(item["date"])
         total_cases.append(item["cases"]["total"]["value"])
 
-
     # Improve this directly in spark.
     pdf = pd.DataFrame({"date": dates, "total_cases": total_cases}).dropna()
 
-    assert data_validation(pdf), '\n* Data validation not achieved *\n'
+    assert data_validation(pdf), "\n* Data validation not achieved *\n"
 
     sdf = spark.createDataFrame(pdf)
 
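
As a quick sanity check of the revised data_validation signature (it now takes a pandas DataFrame), here is a minimal sketch; the sample rows are made up for illustration and are not part of the repo:

import pandas as pd

# Hypothetical sample mirroring the frame built in extract_data: one row per date.
sample = pd.DataFrame({"date": ["2021-10-01", "2021-10-02"],
                       "total_cases": [43_700_000, 43_780_000]})

assert data_validation(sample)                 # unique dates, no nulls -> True
assert not data_validation(sample.iloc[0:0])   # empty frame prints a warning and returns False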

COVID19_project/load.py

Lines changed: 16 additions & 5 deletions
@@ -8,21 +8,32 @@
 import os
 from pyspark.sql import SparkSession, DataFrame
 
+
 def load_data(spark: SparkSession, df: DataFrame, db_file: str = "db.parquet"):
     """Collect data locally and write to a parquet file.
 
+    :param spark: Spark session used.
     :param df: DataFrame to store.
+    :param db_file: Database filename.
     :return: None
     """
+
+    # First, check that the output folder exists
+    if not os.path.exists("output"):
+        os.mkdir("output")
 
-    if os.path.exists(db_file):
-        df_old = spark.read.parquet(db_file).cache()
-        df_updated = df.union(df_old).dropDuplicates()#.cache()
+    # If there's a previous version of the db, we need to update it and get
+    # rid of duplicates. This makes this loading process idempotent. The database
+    # will remain the same independently of how many times this process is run.
+    # This helps to make this ETL process async from others.
+    if os.path.exists(f"output/{db_file}"):
+        df_old = spark.read.parquet(f"output/{db_file}").cache()
+        df_updated = df.union(df_old).dropDuplicates()
 
         print(f"DB updated with {df_updated.count()} entries")
 
-        df_updated.write.mode('overwrite').parquet(db_file)
+        df_updated.write.mode('overwrite').parquet(f"output/{db_file}")
     else:
-        df.write.parquet(db_file)
+        df.write.parquet(f"output/{db_file}")
 
     return None
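
To illustrate the idempotent behaviour the new comments describe, a minimal sketch assuming a local SparkSession; the file name demo.parquet and the sample rows are made up for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("load-demo").getOrCreate()
df = spark.createDataFrame([("2021-10-01", 100), ("2021-10-02", 120)], ["date", "total_cases"])

load_data(spark, df, db_file="demo.parquet")   # first run writes output/demo.parquet
load_data(spark, df, db_file="demo.parquet")   # second run unions with the old data and drops duplicates

# The row count stays at 2 no matter how often load_data is re-run with the same input.
print(spark.read.parquet("output/demo.parquet").count())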

COVID19_project/transform.py

Lines changed: 6 additions & 3 deletions
@@ -26,6 +26,7 @@ def transform_item_date_to_datetime(date: datetime.date) -> datetime.datetime:
 
     return min_datetime
 
+
 def transform_col_date_to_datetime(df: DataFrame, input_name: str, output_name: str) -> DataFrame:
     """Transform column date into column in datetime type.
 
@@ -40,6 +41,7 @@ def transform_col_date_to_datetime(df: DataFrame, input_name: str, output_name:
 
     return df
 
+
 def transform_col_string_to_date(df: DataFrame, input_name: str, output_name: str) -> DataFrame:
     """Transform column date in string to column date in date type.
 
@@ -49,7 +51,7 @@ def transform_col_string_to_date(df: DataFrame, input_name: str, output_name: st
     :return: Transformed Spark DataFrame.
     """
 
-    df = df.withColumn(output_name, to_date(input_name, 'yyyy-MM-dd'))
+    df = df.withColumn(output_name, to_date(input_name, "yyyy-MM-dd"))
 
     return df
 
@@ -73,7 +75,8 @@ def calc_daily_difference(df: DataFrame, input_name: str , output_name: str) ->
 
     return diff_df
 
-def calc_rolling_mean(df: DataFrame, temporal_window:int, input_name: str, output_name: str) -> DataFrame:
+
+def calc_rolling_mean(df: DataFrame, temporal_window: int, input_name: str, output_name: str) -> DataFrame:
     """Calculation of rolling mean
 
     :param df: Input Spark DataFrame.
@@ -105,7 +108,7 @@ def transform_data(df: DataFrame) -> DataFrame:
 
     df = df.sort("datetime")
 
-    df = calc_daily_difference(df, input_name="total_cases" , output_name="difference_total_cases" )
+    df = calc_daily_difference(df, input_name="total_cases" , output_name="difference_total_cases")
 
     df = calc_rolling_mean(df, 7, input_name="difference_total_cases", output_name="rolling_mean_total_cases")
 
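
The body of calc_rolling_mean is not shown in this hunk; purely as an illustration, one possible way to compute a 7-day rolling mean over the daily differences in PySpark (not necessarily the implementation used in the repo):

from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import avg, col

def rolling_mean_sketch(df: DataFrame, temporal_window: int, input_name: str, output_name: str) -> DataFrame:
    # Average over the current row and the previous (temporal_window - 1) rows, ordered by datetime.
    # A window without partitionBy pulls all rows into one partition, which is fine for this small dataset.
    w = Window.orderBy("datetime").rowsBetween(-(temporal_window - 1), 0)
    return df.withColumn(output_name, avg(col(input_name)).over(w))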

COVID19_project/visualize.py

Lines changed: 11 additions & 9 deletions
@@ -8,39 +8,41 @@
 from bokeh.plotting import figure, output_file, save
 from bokeh.models import ColumnDataSource
 
-def read_data(spark: SparkSession) -> DataFrame:
+def read_data(spark: SparkSession, db_file: str = "db.parquet") -> DataFrame:
     """Read data from database
 
     :param spark: spark session
+    :param db_file: database file in output folder
     :return: Spark DataFrame
     """
-    #with spark_timezone("Europe/Berlin"):
-    data_loaded = spark.read.parquet("db.parquet")
+
+    data_loaded = spark.read.parquet(f"output/{db_file}")
     data_loaded = data_loaded.sort("date").dropna()
 
     # Check datetime format
 
     return data_loaded
 
 
-def bokeh_app(df: DataFrame) -> None:
+def bokeh_app(df: DataFrame, html_file: str = "covid19.html") -> None:
     """Bokeh Time-Series visualization
 
     :param df: Input Spark DataFrame
+    :param html_file: name of the output file
     :return: None
     """
 
     x = df.select("datetime").rdd.flatMap(lambda x: x).collect()
     y_diff = df.select("difference_total_cases").rdd.flatMap(lambda x: x).collect()
     y_roll = df.select("rolling_mean_total_cases").rdd.flatMap(lambda x: x).collect()
 
-    source = ColumnDataSource(data={'date': x, 'difference_total_cases': y_diff, 'rolling_mean_total_cases': y_roll})
+    source = ColumnDataSource(data={"date": x, "difference_total_cases": y_diff, "rolling_mean_total_cases": y_roll})
 
-    p = figure(title="COVID 19", x_axis_label='Date', y_axis_label='Cases', x_axis_type='datetime', plot_width=800)
-    p.line('date', 'difference_total_cases' , legend_label="Daily Difference", line_width=2, line_color='blue', source=source)
-    p.line('date', 'rolling_mean_total_cases', legend_label="Rolling Mean", line_width=2, line_color='red', source=source)
+    p = figure(title="COVID 19", x_axis_label="Date", y_axis_label="Cases", x_axis_type="datetime", plot_width=800)
+    p.line("date", "difference_total_cases", legend_label="Daily Difference", line_width=2, line_color="blue", source=source)
+    p.line("date", "rolling_mean_total_cases", legend_label="Rolling Mean", line_width=2, line_color="red", source=source)
 
-    output_file("covid19.html")
+    output_file(f"output/{html_file}")
     save(p)
 
     return None

Dockerfile

Lines changed: 8 additions & 6 deletions
@@ -1,8 +1,10 @@
-FROM continuumio/miniconda3:4.7.12
+##FROM continuumio/miniconda3:4.7.12
+FROM jupyter/pyspark-notebook:latest
 
-RUN mkdir /opt/SDSC/
-ADD covid19_project /opt/SDSC/covid_project/
+COPY covid19_project ./covid19_project
+COPY configs ./configs
+COPY dependencies ./dependencies
+COPY tests ./tests
+COPY requirements.txt ./requirements.txt
 
-WORKDIR /opt/SDSC/
-ENV PYTHONPATH /opt/SDSC
-RUN python setup.py install
+RUN pip install -r requirements.txt

README.md

Lines changed: 2 additions & 0 deletions
@@ -309,7 +309,9 @@ In order to test: `python -m unittest test/test_*.py`
 
 Still some work is required when using spark-submit with `$SPARK_HOME/bin/spark-submit --master local[*] --files configs/config.json covid19_project/__main__.py`
 
+#### 28/10
 
+Applied some style corrections with flake8 and configured the Docker container correctly for mybinder.
 
 ### Tutorial
 

docs/load.html

Lines changed: 5 additions & 1 deletion
@@ -91,7 +91,11 @@ <h2>load.py<a class="headerlink" href="#load-py" title="Permalink to this headin
 <dd><p>Collect data locally and write to a parquet file.</p>
 <dl class="field-list simple">
 <dt class="field-odd">Parameters</dt>
-<dd class="field-odd"><p><strong>df</strong> – DataFrame to store.</p>
+<dd class="field-odd"><ul class="simple">
+<li><p><strong>spark</strong> – Spark session used.</p></li>
+<li><p><strong>df</strong> – DataFrame to store.</p></li>
+<li><p><strong>db_file</strong> – Database filename.</p></li>
+</ul>
 </dd>
 <dt class="field-even">Returns</dt>
 <dd class="field-even"><p>None</p>
