
Commit 62fcc83

F/thread pinning (#14)
* initial codebase
* new code
* thread pinning code
* final code?
* pinning
* materialization step
* register custom step
* dag race condition
* custom steps
* fixes
* fixes
* start nodes once
* better feb
* nicer code
* awesome feb
* node redefine code
* handle rerunning vis updates
* returns -> outputs refactor
* renames
* incremental
* pipeline state multitenancy
* updated unit tests
* version bump
* nice names in spark ui
* readme and 10.4 LTS support
1 parent 2b44124 commit 62fcc83

22 files changed (+1061, -221 lines)

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
       "request": "launch",
       "name": "Unit Tests (on Databricks)",
       "program": "${workspaceFolder}/pytest_databricks.py",
-      "args": ["."],
+      "args": [".", "-vv"],
       "env": {}
     }
   ]
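For context, `-vv` simply raises pytest's verbosity. Assuming `pytest_databricks.py` forwards its `args` to pytest (that forwarding is an assumption, not shown in this diff), the debug configuration above is roughly equivalent to this sketch:

```python
# Rough equivalent of what the launch configuration runs, assuming the wrapper
# script hands its args to pytest unchanged (an assumption).
import pytest

# "." = discover tests from the workspace root, "-vv" = extra-verbose output
pytest.main([".", "-vv"])
```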

.vscode/settings.json

Lines changed: 3 additions & 1 deletion
@@ -5,5 +5,7 @@
     "tests"
   ],
   "python.testing.unittestEnabled": false,
-  "python.testing.pytestEnabled": true
+  "python.testing.pytestEnabled": true,
+  "jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
+  "jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------"
 }
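For context, the two `jupyter.interactiveWindow.cellMarker.*` settings above let VS Code's interactive window split a Databricks notebook exported as a `.py` file into cells. A minimal sketch of such a file, using the markers that the regex above matches (the file contents are illustrative only):

```python
# Databricks notebook source
# With the settings above, VS Code treats this file as a notebook and splits it
# into interactive cells at each "# COMMAND ----------" marker.

print("first cell")

# COMMAND ----------

print("second cell, runnable as a separate cell in the interactive window")
```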

README.md

Lines changed: 83 additions & 24 deletions
@@ -1,7 +1,45 @@
 # What is bdq?
+
 BDQ stands for Big Data Quality, a set of tools/functions that help you assert, day to day, the quality of datasets you have processed or ingested. The library leverages the power of Spark, so your quality checks run at the scale offered by Spark and Databricks.

+Over time the library evolved into a DAG executor of arbitrary python/pyspark functions, which takes scaling execution with dependency tracking to the next level.
+
+## How to install?
+
+On Databricks run `%pip install bdq==x.y.z`. Make sure you pin a version number you are comfortable with to ensure API stability.
+
+This package is currently in an EXPERIMENTAL stage and newer releases might change APIs (function names, parameters, etc.).
+
+## Supported spark/databricks versions
+
+Development and testing have been performed on the 12.2 LTS Databricks runtime. Databricks runtime 10.4 LTS should also work, with the exception of `SparkPipeline.step_spark_for_each_batch` and `SparkPipeline.spark_metric`, which require the 12.2 LTS runtime.
+
+## Verbose output
+
+bdq uses the python `logging` module, so for the best logging experience configure a logger; example setup:
+
+```python
+import logging
+import sys
+
+# py4j is very chatty, no need to deal with it unless it's critical
+logging.getLogger('py4j').setLevel(logging.CRITICAL)
+
+# run everything else on DEBUG by default
+# DEBUG prints a lot of useful info, INFO is good in general use cases
+logging.basicConfig(
+  stream=sys.stderr,
+  level=logging.DEBUG,
+  format='%(asctime)s %(levelname)s %(threadName)s [%(name)s] %(message)s'
+)
+```
+
+### Examples
+
+See the examples below for a short listing of the major functionalities. See the `tests` folder for detailed examples; tests are meant to double as real use cases, so for now they serve as example documentation.
+
 ## Comparing dataframe schemas
+
 ```python
 import bdq
 from datetime import datetime
@@ -30,7 +68,7 @@ assert bdq.compare_schemas(df2.schema, df2_changed.schema) == {
 }
 ```

-## Comparing dataframe's data
+### Comparing dataframe's data

 ```python
 import bdq
@@ -77,7 +115,8 @@ bdq.display_compare_dataframes_results(df_diff)
 Not changed records count: 1
 ```

-## Surrogate key generation
+### Surrogate key generation
+
 with support of optional uppercasing and trimming of key columns

 ```python
@@ -115,7 +154,8 @@ sk_df = df1.select(
 >> +---+----+--------------+-----+-------------------------------------------------------------+---------------------------+
 ```

-## PK & FK integrity checks
+### PK & FK integrity checks
+
 with support of showing sample data from the fact table that did not satisfy integrity

 ```python
@@ -173,7 +213,8 @@ broken_sk_df = fact_dim_broken_relationship(
 >> +----+-------+------------------------------------------------------------------------------------+
 ```

-## Get latest records
+### Get latest records
+
 with optional primary key conflict resolution for cases where multiple records are candidates for the latest record, but they have different attributes and there is no way of determining which one is the latest.

 ```python
@@ -236,7 +277,8 @@ conflict_get_latest_df.show(truncate=True)

 ```

-## Find composite primary key candidates
+### Find composite primary key candidates
+
 Given a list of possible columns, constructs all possible combinations of composite primary keys and validates them concurrently to determine whether a given set of columns is a valid primary key. Uses the minimum possible number of queries against Spark by skipping validation paths that are based on already validated primary key combinations.

 ```python
@@ -257,7 +299,8 @@ bdq.validate_primary_key_candidate_combinations(df, all_combinations, max_worker
 >> [('id',), ('type', 'reminder')]
 ```

-## Execute DAG having nodes as python functions
+### Execute DAG having nodes as python functions
+
 ```python
 import bdq
 import time
@@ -350,36 +393,38 @@ for node in graph.nodes:
 >> Node(<function i at 0x7f50f4b53280>: {'state': 'SKIPPED', 'result': None, 'exception': None, 'completed': False} )
 ```

-## Execute spark pipeline of multiple steps
+### Execute spark pipeline of multiple steps
+
 using DAG component to handle parallel execution
+
 ```python
 import bdq
 from bdq import spark, table

-ppn = bdq.SparkPipeline(spark, "retail")
+ppn = bdq.SparkPipeline("sample")

 # returns dataframe, and creates spark view 'raw_data_single_source'
-@ppn.step()
-def raw_data_single_source(p):
+@ppn.step_spark_temp_view()
+def raw_data_single_source(step):
   return spark.range(1, 10)

 # returns dataframe, and creates spark view 'raw_nice_name'
-@ppn.step(returns=["raw_nice_name"])
-def raw_data_single_source_with_custom_name(p):
+@ppn.step_spark_temp_view(outputs="raw_nice_name")
+def raw_data_single_source_with_custom_name(step):
   return spark.range(100, 110)

 # returns two dataframes, and creates two spark views 'raw_data1', 'raw_data2'
-@ppn.step(returns=["raw_data1", "raw_data2"])
-def raw_data_multi_source(p):
+@ppn.step_spark_temp_view(outputs=["raw_data1", "raw_data2"])
+def raw_data_multi_source(step):
   df1 = spark.range(1000, 2000)
   df2 = spark.range(2000, 3000)

   return [df1, df2]

 # waits for raw data sources to finish, and combines the data into one unioned view `combine_data`
-# note that dependencies are python functions, not names of views (TODO: to handle view names as well)
-@ppn.step(depends_on=[raw_data_single_source, raw_data_single_source_with_custom_name, raw_data_multi_source])
-def combine_data(p):
+# note that dependencies are python functions or names of outputs
+@ppn.step_spark_temp_view(depends_on=[raw_data_single_source, raw_data_single_source_with_custom_name, 'raw_data1', 'raw_data2'])
+def combine_data(step):
   df = table('raw_data_single_source') \
     .union(table('raw_nice_name')) \
     .union(table('raw_data1')) \
@@ -388,16 +433,13 @@ def combine_data(p):
   return df

 # splits the combined_data into 'odd' and 'even' views
-@ppn.step(depends_on=[combine_data], returns=['odd', 'even'])
-def split_data(p):
+@ppn.step_spark_temp_view(depends_on=combine_data, outputs=['odd', 'even'])
+def split_data(step):
   df_odd = table('combine_data').filter('id % 2 == 1')
   df_even = table('combine_data').filter('id % 2 == 0')

   return [ df_odd, df_even ]

-# if you have ipydagred3 installed, you can see in real time state changes of each of the steps
-display(ppn.visualize())
-
 # executes the pipeline using concurrent threads, one per step, following the dependency DAG
 # the pipeline is a normal python callable object; calling it like a function returns a list of all steps
 pipeline_results = ppn(max_concurrent_steps=10)
@@ -411,6 +453,21 @@ print(table('even').limit(10).collect())
 print('odd numbers:')
 print(table('odd').limit(10).collect())

+# get skipped steps
+assert list(ppn.skipped_steps) == []
+
+# get errored steps (you would need to 'adjust' the code of one of the steps to make it fail to see something here)
+assert list(ppn.error_steps) == []
+
+# get successful steps
+assert set(ppn.success_steps.values()) == set([
+  raw_data_single_source_with_custom_name,
+  raw_data_multi_source,
+  split_data,
+  combine_data,
+  raw_data_single_source
+])
+
 >> Waiting for all tasks to finish...
 >> starting: Node(raw_data_single_source: {'state': 'RUNNING', 'result': None, 'exception': None, 'completed': False} )
 >> starting: Node(raw_data_single_source_with_custom_name: {'state': 'RUNNING', 'result': None, 'exception': None, 'completed': False} )
@@ -432,18 +489,20 @@ print(table('odd').limit(10).collect())
 ```

 pipeline steps are rerunnable like any ordinary function:
+
 ```python
 # to rerun a given step, just execute it as if it were a pure function
 # the return value is always a list of the dataframes that the given @ppn.step returns
 # note: the spark view 'raw_data_single_source' will be updated when this function finishes (as per the definition in @ppn.step above)
 raw_data_single_source()
 ```

-## Spark UI Stage descriptions
+### Spark UI Stage descriptions
+
 When running code using pyspark, the Spark UI gets very crowded. The `SparkUILogger` context manager and decorator assign human-readable names to Spark stages.

 ```python
-from bdq
+from bdq import SparkUILogger

 # usage of decorators
 # spark ui stages will have description of 'xyz'
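The `SparkUILogger` example above is cut off at the hunk boundary. A minimal sketch of the decorator and context-manager usage it describes, assuming a single description argument (only the class name, the import line, and the 'xyz' description come from this diff; the exact constructor signature is an assumption):

```python
from bdq import SparkUILogger, spark

# decorator usage: stages triggered inside the function should show up in the
# Spark UI with the description 'xyz' (argument shape is assumed)
@SparkUILogger('xyz')
def count_rows():
  return spark.range(1_000_000).count()

count_rows()

# context-manager usage with the same assumed description argument
with SparkUILogger('xyz'):
  spark.range(1_000_000).selectExpr('id % 10 as bucket').groupBy('bucket').count().collect()
```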

bdq/__init__.py

Lines changed: 6 additions & 3 deletions
@@ -1,7 +1,7 @@
-__version__ = "0.0.5"
+__version__ = "0.1.0"

 from pyspark.sql import SparkSession
-spark = SparkSession.builder.getOrCreate()
+spark: SparkSession = SparkSession.builder.getOrCreate()
 sc = spark.sparkContext
 table = spark.table
 sql = spark.sql
@@ -12,6 +12,9 @@
 from .dag import DAG
 from .schema import *
 from .dataframe import *
-from .spark_pipeline import SparkPipeline
+from .statestore import *
 from .spark_ui_logger import SparkUILogger
+from .spark_pipeline import *
+
+
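For context, the module re-exports a typed `spark` session plus the `table`/`sql` shortcuts defined above, which is what lets the README examples do `from bdq import spark, table`. A small usage sketch (the view name is made up for illustration):

```python
from bdq import spark, table, sql

# `spark` is the shared SparkSession created in bdq/__init__.py
spark.range(10).createOrReplaceTempView('numbers')

# `table` and `sql` are plain aliases for spark.table and spark.sql
print(table('numbers').count())
print(sql('SELECT MAX(id) AS max_id FROM numbers').collect())
```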
