This challenge was provided by Jobsity for its Data Engineering position and tests candidates on both data engineering and software engineering skills.
Your task is to build an automatic process to ingest data on an on-demand basis. The data represents trips taken by different vehicles, and includes a city, a point of origin and a destination.
This CSV file gives you a small sample of the data your solution will have to handle. We would like to have some visual reports of this data, but in order to do that, we need the following features.
We do not need a graphical interface. Your application should preferably be a REST API, or a console application.
This is a Python application with a Flask backend and PySpark for data processing. It was developed to run locally; however, I also give instructions on how to set it up in cloud environments.
src/
migrations/ - migrations that create the tables and load the seed data
models/ - the ORM models used for querying data
pipelines/ - the ETL pipeline scripts written with PySpark
utils/ - miscellaneous helper functions
ingestion.py - ingests data from the SQL database using the JDBC driver
transformation.py - transforms the ingested data into a ready-to-use format
group_trips_by_route.py - uses a K-means ML model to group trips into routes
report_weekly_average.py - creates the data for the weekly average number of trips report and writes it to the SQL database
This is a Python application that requires the following dependencies to be installed in your environment:
Spark and PySpark
MySQL database running locally
Flask
The remaining libraries can be installed from requirements.txt (for example with pip install -r requirements.txt).
Once all dependencies are installed, create a MySQL database called jobsity_db and a user with the following credentials:
user: jobsity_user
password: password
If needed, you can change these values in the YAML configuration files.
After that, run the migration scripts in src/migrations to load the initial data and, only then, run the pipelines in the following order:
ingestion -> transformation -> group_trips_by_route -> report_weekly_average
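The required order can be expressed as a small sequential runner, sketched here under assumptions: each pipeline is assumed to be launched with `spark-submit` from a hypothetical `src/pipelines/<name>.py` path, and `run_in_order` and `build_command` are illustrative names, not part of the project. In practice an orchestrator such as Airflow would take this role.

```python
import subprocess
from typing import Callable, List, Sequence

# Required order: each pipeline reads what the previous one wrote
PIPELINES = ["ingestion", "transformation", "group_trips_by_route", "report_weekly_average"]


def build_command(name: str) -> List[str]:
    # Hypothetical layout: one PySpark script per pipeline under src/pipelines/
    return ["spark-submit", f"src/pipelines/{name}.py"]


def run_in_order(pipelines: Sequence[str],
                 run: Callable[[List[str]], int] = subprocess.call) -> List[str]:
    """Run the pipelines one after another, stopping at the first failure.

    Returns the names of the pipelines that finished with exit code 0.
    """
    completed: List[str] = []
    for name in pipelines:
        if run(build_command(name)) != 0:
            break  # later steps depend on this one, so do not continue
        completed.append(name)
    return completed
```

Calling `run_in_order(PIPELINES)` launches the four scripts in turn; passing a different `run` callable (for example a stub) makes the ordering logic testable without Spark.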
Use a SQL database.
The application uses a MySQL database instance running locally and also provides a YAML configuration file for deploying it on a Kubernetes cluster. The tables used are the following:
trips
- id
- region
- origin_x, origin_y
- destination_x, destination_y
- datetime
- datasource
bouding_boxes
- id
- x_min, y_min
- x_max, y_max
report_weekly_average
- id
- region
- route
- bouding_box_id
- weekly_average
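As a rough illustration of this schema, the sketch below creates the three tables in an in-memory SQLite database. SQLite is only a stand-in for MySQL here, and the column types are assumptions; the actual migrations may use different types (for example `DOUBLE` and `DATETIME` in MySQL).

```python
import sqlite3

# Column names follow the tables listed above; the types are assumptions.
SCHEMA = """
CREATE TABLE trips (
    id INTEGER PRIMARY KEY,
    region TEXT NOT NULL,
    origin_x REAL, origin_y REAL,
    destination_x REAL, destination_y REAL,
    datetime TEXT NOT NULL,
    datasource TEXT
);
CREATE TABLE bouding_boxes (
    id INTEGER PRIMARY KEY,
    x_min REAL, y_min REAL,
    x_max REAL, y_max REAL
);
CREATE TABLE report_weekly_average (
    id INTEGER PRIMARY KEY,
    region TEXT,
    route TEXT,
    bouding_box_id INTEGER REFERENCES bouding_boxes(id),
    weekly_average REAL
);
"""


def create_schema(conn: sqlite3.Connection) -> None:
    # executescript runs several semicolon-separated statements at once
    conn.executescript(SCHEMA)
```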
There are two Python scripts that create and seed the tables so the application can start working with some sample data:
src/migrations/
001_create_trips_table.py
002_create_bouding_boxes_table.py
You should run them before running anything else.
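The seed step can be sketched as converting CSV rows into `trips` rows. This assumes, hypothetically, that the sample CSV has `region`, `origin_coord`, `destination_coord`, `datetime` and `datasource` columns, with coordinates stored as WKT strings such as `POINT (14.43 50.07)`; adjust the parsing to the real file. `parse_point` and `csv_to_trip_rows` are illustrative helpers, not the migration scripts' code.

```python
import csv
import re
from typing import Dict, Iterable, List, Tuple

# Assumed WKT format for a coordinate column, e.g. "POINT (14.43 50.07)"
_POINT = re.compile(r"POINT\s*\(\s*([-\d.eE+]+)\s+([-\d.eE+]+)\s*\)")


def parse_point(wkt: str) -> Tuple[float, float]:
    match = _POINT.fullmatch(wkt.strip())
    if match is None:
        raise ValueError(f"not a WKT point: {wkt!r}")
    return float(match.group(1)), float(match.group(2))


def csv_to_trip_rows(lines: Iterable[str]) -> List[Dict[str, object]]:
    """Convert CSV lines (header first) into dicts matching the trips columns."""
    rows = []
    for record in csv.DictReader(lines):
        ox, oy = parse_point(record["origin_coord"])
        dx, dy = parse_point(record["destination_coord"])
        rows.append({
            "region": record["region"],
            "origin_x": ox, "origin_y": oy,
            "destination_x": dx, "destination_y": dy,
            "datetime": record["datetime"],
            "datasource": record["datasource"],
        })
    return rows
```

The resulting dicts can then be inserted with a parameterised `INSERT` through the database driver.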
There must be an automated process to ingest and store the data.
The automated ingestion process is built from pipelines written as Python scripts, which could be orchestrated with a tool such as Airflow.
Ingestion reads the tables from the MySQL database through Spark's JDBC driver and saves them to a data lake in append mode, so no data is lost across multiple runs of the pipeline. Here is the architecture:
The SQL database was chosen as the entry point because it is easily accessible by any backend application or data job. It can also be scaled, either by replicating it on the Kubernetes cluster or by choosing a managed solution from any cloud provider.
The data on the data lake is saved with two extra columns, ingestion_timestamp and ingestion_date, which can be used as partitions to speed up further processing.
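The append-only layout can be illustrated without Spark: each run writes a new file into a Hive-style `ingestion_date=YYYY-MM-DD` directory and stamps every record with `ingestion_timestamp`. In PySpark the same idea is typically expressed with `withColumn` plus `write.mode("append").partitionBy("ingestion_date")`. The sketch below uses JSON Lines and a hypothetical helper, `append_batch`, purely for illustration.

```python
import json
import os
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional


def append_batch(rows: Iterable[Dict], lake_root: str,
                 now: Optional[datetime] = None) -> str:
    """Append one ingestion run to its date partition and return the new file path."""
    now = now or datetime.now(timezone.utc)
    # The partition value lives in the directory name, as with Spark's partitionBy
    partition = os.path.join(lake_root, f"ingestion_date={now.date().isoformat()}")
    os.makedirs(partition, exist_ok=True)
    path = os.path.join(partition, f"part-{now.strftime('%H%M%S%f')}.jsonl")
    # Mode "x" fails instead of overwriting, so earlier runs are never lost
    with open(path, "x", encoding="utf-8") as handle:
        for row in rows:
            record = dict(row, ingestion_timestamp=now.isoformat())
            handle.write(json.dumps(record) + "\n")
    return path
```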
The transformation runs entirely on the cluster as a Spark job after ingestion. It reads all of the raw ingested data, de-duplicates it, and saves it, properly partitioned, to another location in the data lake, so only the latest version of the data is available to the following processes.
The data is partitioned so that trips with the same region and date are stored together, which speeds up later use and queries.
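A minimal sketch of the de-duplication and partitioning logic, assuming `id` identifies a trip and that the row with the most recent `ingestion_timestamp` wins. In Spark this is commonly done with a `row_number()` window over `partitionBy("id").orderBy(desc("ingestion_timestamp"))`; the plain-Python helpers below (hypothetical names) show the same semantics on small lists of dicts.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def latest_per_id(rows: Iterable[Dict]) -> List[Dict]:
    """Keep, for each trip id, only the row with the newest ingestion_timestamp."""
    latest: Dict[object, Dict] = {}
    for row in rows:
        current = latest.get(row["id"])
        # ISO-8601 timestamps in the same timezone compare correctly as strings
        if current is None or row["ingestion_timestamp"] > current["ingestion_timestamp"]:
            latest[row["id"]] = row
    return list(latest.values())


def partition_by_region_and_date(rows: Iterable[Dict]) -> Dict[Tuple[str, str], List[Dict]]:
    """Group rows into one partition per (region, trip date)."""
    partitions: Dict[Tuple[str, str], List[Dict]] = defaultdict(list)
    for row in rows:
        trip_date = row["datetime"][:10]  # assumes "YYYY-MM-DD HH:MM:SS" strings
        partitions[(row["region"], trip_date)].append(row)
    return dict(partitions)
```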
The K-means ML model that clusters similar trips into routes runs as a Spark job and saves its results to the data lake as datasets. Also, the report used to answer the question proposed about week
Trips with similar origin, destination, and time of day should be grouped together.
To group trips with similar origin, destination and time of day, I used K-means, a clustering algorithm that, given a number of clusters (k = 5), groups the data points by similarity. To improve the quality of the clusters, I trained one model per region, since the data is location-specific. This way we can analyze statistics for similar locations within each city and use these insights to improve the service.
As the image below shows, the trips are clustered by their origin and destination coordinates, which are the features passed to the algorithm, producing the following predictions.
The model is created in the pipeline script and saved to disk on the cluster as follows:
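```python
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# `df`, `regions`, DATALAKE_PATH and save_model are defined elsewhere in the pipeline script
for region in regions:
    # One model per region, since routes are location-specific
    df_region = df.filter(col("region") == region)
    # Combine the origin/destination coordinates into a single feature vector
    assembler = VectorAssembler(
        inputCols=["origin_x", "origin_y", "destination_x", "destination_y"],
        outputCol="features",
    )
    kmeans = KMeans().setK(5).setSeed(42)
    pipeline = Pipeline(stages=[assembler, kmeans])
    model = pipeline.fit(df_region)
    # Persist the fitted pipeline so it can be reused without retraining
    model_path = f"{DATALAKE_PATH}/jobsity/models/routes_kmeans_{region}"
    save_model(model, model_path)
    # Adds a `prediction` column holding the cluster (route) index of each trip
    predictions = model.transform(df_region)
```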
As you can see, the models are saved to disk so they can be used later without having to train them again.
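To show what `KMeans` does with these four coordinate features, here is a plain-Python version of Lloyd's algorithm. It is a simplified stand-in, not Spark's implementation: Spark's `KMeans` uses k-means|| initialisation by default, while this sketch picks random initial centroids with a fixed seed. Like the pipeline above, it uses only the origin and destination coordinates as features.

```python
import math
import random
from typing import List, Sequence, Tuple

Point = Tuple[float, ...]


def kmeans(points: Sequence[Point], k: int, seed: int = 42,
           max_iter: int = 100) -> Tuple[List[Point], List[int]]:
    """Lloyd's algorithm; returns (centroids, cluster index for each point)."""
    rng = random.Random(seed)
    centroids = [tuple(p) for p in rng.sample(list(points), k)]  # random initial centres
    labels: List[int] = []
    for _ in range(max_iter):
        # Assignment step: each trip goes to its nearest centroid
        new_labels = [min(range(k), key=lambda c: math.dist(p, centroids[c])) for p in points]
        if new_labels == labels:
            break  # converged: assignments no longer change
        labels = new_labels
        # Update step: move each centroid to the mean of its members
        for c in range(k):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:  # keep the old centroid if a cluster is empty
                centroids[c] = tuple(sum(dim) / len(members) for dim in zip(*members))
    return centroids, labels
```

Calling `kmeans(points, k=5)` once per region, with points of the form `(origin_x, origin_y, destination_x, destination_y)`, mirrors the per-region models trained in the pipeline.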
The resulting data should look like this:
The predictions are then saved to a dataset in the data lake, partitioned by region, route and date.
Develop a way to obtain the weekly average number of trips for an area, defined by a bounding box (given by coordinates) or by a region.
To further automate obtaining this insight, I created a Spark job that performs all the necessary calculations and saves the results to a SQL database, which can then be accessed by backend applications and analytics users. The bounding boxes are stored in a SQL table called bouding_boxes, whose coordinates are used to filter the trips that occurred inside them and calculate the weekly average.
The pipeline is called report_weekly_average and calculates three metrics:
region_weekly_average - the weekly average of each region
region_route_weekly_average - the weekly average by route (from the ML model)
bouding_box_weekly_average - the weekly average inside a bounding box defined by the user
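The weekly average itself can be sketched in plain Python. Two definitions here are assumptions, not taken from the pipeline: the average is taken over ISO weeks that contain at least one trip, and a trip counts towards a bounding box only if both its origin and destination fall inside it. `weekly_average` and `in_bbox` are hypothetical helpers.

```python
from collections import Counter
from datetime import datetime
from typing import Dict, Iterable, Optional


def in_bbox(x: float, y: float, box: Dict[str, float]) -> bool:
    return box["x_min"] <= x <= box["x_max"] and box["y_min"] <= y <= box["y_max"]


def weekly_average(trips: Iterable[Dict], region: Optional[str] = None,
                   box: Optional[Dict[str, float]] = None) -> float:
    """Average number of trips per ISO week, optionally filtered by region or bounding box."""
    per_week: Counter = Counter()
    for trip in trips:
        if region is not None and trip["region"] != region:
            continue
        # Assumption: a trip belongs to a box only if it starts and ends inside it
        if box is not None and not (
            in_bbox(trip["origin_x"], trip["origin_y"], box)
            and in_bbox(trip["destination_x"], trip["destination_y"], box)
        ):
            continue
        year, week, _ = datetime.fromisoformat(trip["datetime"]).isocalendar()
        per_week[(year, week)] += 1
    # Assumption: only weeks with at least one trip count towards the average
    return sum(per_week.values()) / len(per_week) if per_week else 0.0
```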
The data can be accessed through an API served by the Flask application at the following route:
GET /api/report/weekly_average
param report - ex: region_weekly_average
param region - ex: Prague
param route - ex: route-1
param bbox_id - ex: 1
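A sketch of how the endpoint could translate its query parameters into a parameterised SQL query. How each report type is stored in `report_weekly_average` is an assumption (see `REPORT_FILTERS`), as is the helper name `build_report_query`. Placeholders use the `?` style of Python's `sqlite3`; MySQL drivers such as PyMySQL use `%s` instead.

```python
from typing import List, Mapping, Tuple

# Assumption: how each report type is distinguished in report_weekly_average
REPORT_FILTERS = {
    "region_weekly_average": "route IS NULL AND bouding_box_id IS NULL",
    "region_route_weekly_average": "route IS NOT NULL",
    "bouding_box_weekly_average": "bouding_box_id IS NOT NULL",
}
# Optional query parameters mapped to table columns
PARAM_COLUMNS = {"region": "region", "route": "route", "bbox_id": "bouding_box_id"}


def build_report_query(args: Mapping[str, str]) -> Tuple[str, List[str]]:
    report = args.get("report")
    if report not in REPORT_FILTERS:
        raise ValueError(f"unknown report: {report!r}")
    clauses, values = [REPORT_FILTERS[report]], []
    for param, column in PARAM_COLUMNS.items():
        if param in args:
            clauses.append(f"{column} = ?")  # values are bound, never interpolated
            values.append(args[param])
    sql = ("SELECT region, route, bouding_box_id, weekly_average "
           "FROM report_weekly_average WHERE " + " AND ".join(clauses))
    return sql, values
```

In a Flask view, `build_report_query(request.args)` would produce the SQL and the bound values to pass to the database cursor.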
Develop a way to inform the user about the status of the data ingestion without using a polling solution.
In progress.
The solution should be scalable to 100 million entries. It is encouraged to simplify the data by a data model. Please add proof that the solution is scalable.
Since the SQL database is only used to receive data from client-facing applications, it could be periodically cleaned so that it does not become a bottleneck. Because the downstream processing runs in independent Spark jobs, it can leverage the cluster's compute power to process large amounts of data. In addition, all reports and metrics are calculated on the data side and only served to the backend through SQL tables, which are then displayed via APIs and dashboards.
For these reasons, the solution is designed to scale to 100 million entries.
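The periodic clean-up of the SQL database mentioned above can be sketched as deleting rows that a successful ingestion run has already copied to the data lake. This assumes a hypothetical watermark, the largest `trips.id` included in the last successful run; an id watermark is used rather than `datetime` so that late-arriving rows with old timestamps are not deleted before being ingested. SQLite stands in for MySQL, and `purge_ingested_trips` is an illustrative name.

```python
import sqlite3


def purge_ingested_trips(conn: sqlite3.Connection, watermark_id: int) -> int:
    """Delete trips already copied to the data lake and return how many were removed.

    `watermark_id` is assumed to be the largest trips.id that a successful
    ingestion run wrote to the data lake.
    """
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute("DELETE FROM trips WHERE id <= ?", (watermark_id,))
    return cursor.rowcount
```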
Containerize your solution.
Both the application and the MySQL database have YAML files for deploying them on a Kubernetes cluster. First, however, you need to build the Docker image with:
docker build . -t jobsity-app
Sketch up how you would set up the application using any cloud provider (AWS, Google Cloud, etc).
On AWS, the pipelines could easily be turned into Spark jobs on Amazon EMR by storing the scripts in S3 and setting up the jobs to execute them. S3 could also serve as the data lake, and the database could run either on a managed SQL service or on an EC2 instance, the managed option being preferable. Since the Python application is a simple Flask app, it could easily be deployed to Elastic Beanstalk.
On Azure, the pipelines could be managed by Azure Data Factory (ADF), running them as Databricks notebooks on standalone clusters or on spot clusters for lower cost. The SQL database could also be migrated to Azure's managed database service, and the data lake could use Blob Storage connected to Databricks for access control.
Include a .sql file with queries to answer these questions:
Both queries were run against the MySQL database and are available in the sql_query folder.
From the two most commonly appearing regions, which is the latest datasource?
```sql
WITH top_regions AS (
    SELECT
        region,
        COUNT(*) as occurrences
    FROM trips
    GROUP BY region
    ORDER BY occurrences DESC
    LIMIT 2
)
SELECT
    t.region,
    t.datasource,
    top_regions.occurrences
FROM trips t
INNER JOIN top_regions ON t.region = top_regions.region
WHERE t.datetime = ( SELECT MAX(datetime) FROM trips WHERE region = t.region );
```
In which regions has the "cheap_mobile" datasource appeared?
```sql
SELECT
    region,
    COUNT(*) occurrences
FROM trips
WHERE datasource = 'cheap_mobile'
GROUP BY region
ORDER BY occurrences DESC;
```
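Both queries can be checked on a handful of made-up rows using an in-memory SQLite database, which supports the same `WITH` clause and correlated subquery. The helper `run_queries` and the three-column table it creates are illustrative assumptions; the real `trips` table has more columns, and in MySQL `datetime` would be a proper `DATETIME` rather than text.

```python
import sqlite3
from typing import List, Sequence, Tuple

TOP_REGIONS_LATEST_DATASOURCE = """
WITH top_regions AS (
    SELECT region, COUNT(*) AS occurrences
    FROM trips
    GROUP BY region
    ORDER BY occurrences DESC
    LIMIT 2
)
SELECT t.region, t.datasource, top_regions.occurrences
FROM trips t
INNER JOIN top_regions ON t.region = top_regions.region
WHERE t.datetime = (SELECT MAX(datetime) FROM trips WHERE region = t.region)
"""

CHEAP_MOBILE_REGIONS = """
SELECT region, COUNT(*) AS occurrences
FROM trips
WHERE datasource = 'cheap_mobile'
GROUP BY region
ORDER BY occurrences DESC
"""


def run_queries(rows: Sequence[Tuple[str, str, str]]) -> Tuple[List[tuple], List[tuple]]:
    """Load (region, datetime, datasource) rows and run both queries."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE trips (region TEXT, datetime TEXT, datasource TEXT)")
    conn.executemany("INSERT INTO trips VALUES (?, ?, ?)", rows)
    latest = sorted(conn.execute(TOP_REGIONS_LATEST_DATASOURCE).fetchall())
    cheap = conn.execute(CHEAP_MOBILE_REGIONS).fetchall()
    conn.close()
    return latest, cheap
```

`run_queries` returns the latest datasource for each of the two most common regions and the per-region counts for `cheap_mobile`.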