Maurizio Idini
This document briefly describes the repository. The aim of the use case implemented here is to perform table transformations based on the Slowly Changing Dimension Type 2 (SCD2) pattern.
The module is written in Python 3.8 and runs in a Docker container. The library is built on the google-cloud BigQuery client library, documented with docstrings and tested with pytest.
The code covers only the transformation module: it is assumed that the BigQuery tables already exist.
Before running the code, please check that the `application_default_credentials.json` credential file is already on your machine, in the path `~/.config/gcloud/`. If not, please run `gcloud auth application-default login` in the terminal and follow the steps.
```
transformation_scd2
┣ bin
┃ ┣ down.sh
┃ ┣ test.sh
┃ ┣ test.up.sh
┃ ┗ up.sh
┣ lib
┃ ┣ data
┃ ┃ ┣ comparer
┃ ┃ ┃ ┣ TableComparer.py
┃ ┃ ┃ ┗ __init__.py
┃ ┃ ┣ ingestor
┃ ┃ ┃ ┣ DataIngestor.py
┃ ┃ ┃ ┗ __init__.py
┃ ┃ ┗ __init__.py
┃ ┗ dbmanagement
┃ ┃ ┣ connector
┃ ┃ ┃ ┣ BigQueryConnector.py
┃ ┃ ┃ ┗ __init__.py
┃ ┃ ┣ tablemanagement
┃ ┃ ┃ ┣ BigQueryManager.py
┃ ┃ ┃ ┗ __init__.py
┃ ┃ ┣ transaction
┃ ┃ ┃ ┣ BigquerySession.py
┃ ┃ ┃ ┣ BigqueryTransaction.py
┃ ┃ ┃ ┗ __init__.py
┃ ┃ ┗ __init__.py
┣ sql_example
┃ ┣ setup_tables
┃ ┃ ┣ create_populate_Table2_Partners_Output.sql
┃ ┃ ┗ create_populate_Table_1_Partners_Input.sql
┃ ┣ simulate_update_source.sql
┃ ┗ update_table_2_partners_output.sql
┣ tests
┃ ┣ integration
┃ ┃ ┣ data
┃ ┃ ┃ ┣ comparer
┃ ┃ ┃ ┃ ┗ TableComparer_test.py
┃ ┃ ┃ ┗ ingestor
┃ ┃ ┃ ┃ ┗ DataIngestor_test.py
┃ ┃ ┗ dbmanagement
┃ ┃ ┃ ┣ connector
┃ ┃ ┃ ┃ ┗ BigQueryConnector_test.py
┃ ┃ ┃ ┗ tablemanagement
┃ ┃ ┃ ┃ ┗ BigQueryManager_test.py
┃ ┗ unit
┃ ┃ ┗ data
┃ ┃ ┃ ┣ comparer
┃ ┃ ┃ ┃ ┗ TableComparer_test.py
┃ ┃ ┃ ┗ ingestor
┃ ┃ ┃ ┃ ┗ DataIngestor_test.py
┣ .gitignore
┣ Dockerfile
┣ README.md
┣ app.py
┣ docker-compose.yml
┗ requirements.txt
```
The main folder contains:
- `requirements.txt` with the libraries used in the project
- `Dockerfile` and `docker-compose.yml` for the Docker container
- `lib`, which contains the code
- `bin`, a folder with bash scripts used to run the Docker environments
- `app.py`, which contains the Flask API code to trigger the process
- `README.md`, which contains this description
The `lib` code is composed of:
- a `data` folder that contains the code for the comparer and the ingestor
- a `dbmanagement` folder that contains the code for the connector, table management and transactions
Furthermore, the repository contains a `sql_example` folder with SQL files to:
- create and populate the table Table_1_Partners_Input
- create and populate the table Table2_Partners_Output
- simulate an update of the source table and the corresponding update of the output table
The `docs` folder contains the class documentation generated with the pdoc library. The documentation is accessible through the Documentation Main Page.
The Transformation_SCD2 process is described as follows:
A process (in this use case, the `/trigger/` POST call) calls `DataIngestor.ingest_data` on the source and destination tables with a specific primary key. This primary key is also the surrogate key on the destination table.
The `DataIngestor` object is initialized with an instance of `BigQueryConnector` to connect to BigQuery; the connection is already instantiated on a specific `project_id`.
```python
project_id = 'my-prj'
bq_client = BigQueryConnector(project_id).get_client()  # BigQuery client bound to the project
ingestor = DataIngestor(bq_client)
ingestor.ingest_data(src_table, dest_table, pkey)        # run the SCD2 ingestion on the two tables
```
The `ingest_data` method calls `TableComparer.compare_tables` on the two tables and checks for new/updated/deleted records in the source table with respect to the destination table.
Since the destination table is a DWH table, it should contain technical fields such as `TechnicalKey`, `Date_From`, `Date_To` and `Is_valid`. Furthermore, the schemas of the source and destination tables should be the same, except for the technical fields of the destination table.
The check is performed with a SQL query, to delegate the computation to the BigQuery engine, excluding non-valid records (`Is_valid = 'no'`) and the technical fields of the destination table.
```sql
with src_table as (
select *
from `src_table`
),
dest_table as (
select * except(TechnicalKey, Date_From, Date_To, Is_valid)
from `dest_table`
where Is_valid = 'yes'
),
rows_to_update as (
select *
from src_table
except distinct
select *
from dest_table
),
rows_to_delete as (
select *
from dest_table
except distinct
select *
from src_table
)
select *, 1 as operation # new/updated
from rows_to_update
union all
select *, 2 as operation # deleted
from rows_to_delete;
```
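A minimal sketch of how the comparison might be executed (an assumption for illustration; the real logic lives in `lib/data/comparer/TableComparer.py`): run the query with the google-cloud-bigquery client and return the result as a DataFrame.

```python
# build_compare_query is a hypothetical helper that renders the SQL shown above
# for the given source and destination tables.
query = build_compare_query(src_table, dest_table)

# Execute the query on BigQuery and return the result as a pandas DataFrame
# (requires the pandas and db-dtypes packages).
df = bq_client.query(query).to_dataframe()
```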
The result of `TableComparer.compare_tables` is a `pandas.DataFrame` containing all fields from the source and destination tables and a column `operation`, where:
- value `1` means new or updated records (to insert in the destination table)
- value `2` means deleted records (to invalidate in the destination table)
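For illustration, such a DataFrame (with made-up values, assuming the business columns are `id` and `name`) can be split as follows:

```python
import pandas as pd

# Hypothetical result of TableComparer.compare_tables; values are invented.
df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob (updated)", "Carol"],
        "operation": [1, 1, 2],  # 1 = new/updated, 2 = deleted
    }
)

to_insert = df[df["operation"] == 1]    # rows to insert as new valid versions
keys_to_invalidate = df["id"].unique()  # primary keys whose current rows get Is_valid = 'no'
```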
Using that DataFrame, the `ingest_data` method updates the destination table by invalidating the existing records (setting `Is_valid = 'no'`) for every primary key stored in the DataFrame. This ensures that every record to be updated or deleted is invalidated. Then, the method inserts the records of the DataFrame whose `operation` value is equal to 1, i.e. the new and updated records.
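As a rough illustration (the real statements are built inside `DataIngestor` and may differ), the two steps could look like this, where `pkey` is the primary key column and `df` the DataFrame above:

```python
# Hypothetical sketch of the invalidation and insert steps.
keys = ", ".join(f"'{k}'" for k in df[pkey].unique())

invalidate_sql = f"""
UPDATE `{dest_table}`
SET Is_valid = 'no'
WHERE {pkey} IN ({keys}) AND Is_valid = 'yes'
"""

# Only rows flagged as new/updated (operation = 1) become new valid versions.
rows_to_insert = df[df["operation"] == 1].drop(columns=["operation"])
```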
The insert and update operations are performed using `BigQueryTransaction`, instantiated on a `BigQuerySession`, in order to guarantee consistency during transactions.
The `BigQueryTransaction` works in this way (a sketch follows the list):
- an instance of `BigQuerySession` creates a session on BigQuery
- `BigQueryTransaction.begin_transaction` runs `BEGIN TRANSACTION;` on the already created session and returns the `job_config` object as a reference to the transaction
- the inserts and updates run on the `job_config` previously initialized
- if the run executes without errors, `BigQueryTransaction.commit_transaction` performs the commit by running `COMMIT TRANSACTION;`
- otherwise, `BigQueryTransaction.rollback_transaction` is called, running `ROLLBACK TRANSACTION;` on BigQuery
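A minimal sketch of this flow, assuming the class and method names above (the exact constructor signatures and query calls are assumptions):

```python
session = BigQuerySession(bq_client)          # creates a session on BigQuery
transaction = BigQueryTransaction(session)    # transaction bound to that session
job_config = transaction.begin_transaction()  # runs BEGIN TRANSACTION;

try:
    # run the invalidation update and the inserts within the same session
    bq_client.query(invalidate_sql, job_config=job_config).result()
    bq_client.query(insert_sql, job_config=job_config).result()
    transaction.commit_transaction()          # runs COMMIT TRANSACTION;
except Exception:
    transaction.rollback_transaction()        # runs ROLLBACK TRANSACTION;
    raise
```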
You can run the code in two ways:
- using `python app.py` on your local machine
- using Docker, running `./bin/up.sh`

Verify the deployment by navigating to 127.0.0.1:5001 in your preferred browser.
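Once the service is up, you can trigger the process with a POST call to `/trigger/`. The payload below is only an assumption; check `app.py` for the parameters the endpoint actually expects.

```python
import requests

# Hypothetical call to the Flask endpoint; parameter names and values are assumptions.
resp = requests.post(
    "http://127.0.0.1:5001/trigger/",
    json={
        "src_table": "my-prj.my_dataset.Table_1_Partners_Input",
        "dest_table": "my-prj.my_dataset.Table2_Partners_Output",
        "pkey": "partner_id",
    },
)
print(resp.status_code, resp.text)
```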
You can run the tests in two ways:
- inside the `test_transformation_scd2` Docker container, by running `./bin/test.up.sh` and then `./bin/test.sh`
- with `./bin/test.sh` on your local machine
Possible future improvements:
- check that the schemas of the source and destination tables match
- implement the Factory pattern in `Connector`, `TableManagement` and `Transaction`, in order to extend `DataIngestor` to other DBs (see the sketch below)
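A minimal sketch of what such a factory could look like for the connector (every class name other than `BigQueryConnector` is hypothetical):

```python
class ConnectorFactory:
    """Returns a connector for the requested database engine."""

    @staticmethod
    def create(engine: str, project_id: str):
        if engine == "bigquery":
            return BigQueryConnector(project_id)
        # e.g. elif engine == "postgres": return PostgresConnector(dsn)
        raise ValueError(f"Unsupported engine: {engine}")


# DataIngestor would then depend only on the factory, not on BigQuery directly.
bq_client = ConnectorFactory.create("bigquery", "my-prj").get_client()
```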