93 commits
6cd9f02
move criticality of rule into _validate_attributes
cornzyblack Jul 17, 2025
98371b7
since criticality is validated after creation, filter by criticality …
cornzyblack Jul 17, 2025
1a58e16
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Jul 18, 2025
98803bc
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Jul 23, 2025
acf3767
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Jul 24, 2025
0766f2b
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Aug 2, 2025
3d0fd34
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Aug 7, 2025
e5712fc
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Aug 7, 2025
9bf6d98
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Aug 13, 2025
1e4d783
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Sep 1, 2025
fcdb1ce
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Sep 8, 2025
2393404
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Sep 16, 2025
eddc874
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Sep 19, 2025
c378b6d
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Sep 25, 2025
cb6f9ef
Merge branch 'main' of github.com:cornzyblack/dqx
cornzyblack Oct 15, 2025
82c7a22
feat: add check for valid json
cornzyblack Oct 15, 2025
f1ec4af
feat: add checks for is_valid_json
cornzyblack Oct 15, 2025
dfa9649
feat: add is_valid_json
cornzyblack Oct 15, 2025
89f2811
feat: add has_json_keys
cornzyblack Oct 15, 2025
02466c1
refactor: change logic
cornzyblack Oct 15, 2025
ccb6e05
refactor: invert
cornzyblack Oct 15, 2025
156a9c2
refactor: negate
cornzyblack Oct 15, 2025
8d30ff6
refactor: update
cornzyblack Oct 15, 2025
1873d72
refactor: update
cornzyblack Oct 15, 2025
5109c27
refactor: update
cornzyblack Oct 15, 2025
ceecf7d
refactor: updates
cornzyblack Oct 15, 2025
0c94089
refactor: change and update
cornzyblack Oct 15, 2025
246833b
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Oct 16, 2025
05365e0
refactor: fix docs
cornzyblack Oct 16, 2025
7be64e6
refactor: updates
cornzyblack Oct 16, 2025
c7d8406
refactor: update logic
cornzyblack Oct 16, 2025
66cbb13
refactor: explicit True
cornzyblack Oct 16, 2025
70e19bd
refactor: remove repetition
cornzyblack Oct 16, 2025
a168d64
refactor: remove as it depends on spark
cornzyblack Oct 16, 2025
c3c23e7
feat: add perf test for 2 tests (remaining 1)
cornzyblack Oct 17, 2025
984bbb8
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Oct 18, 2025
3e63312
refactor: switch back to has_json_schema
cornzyblack Oct 21, 2025
9ed893a
Merge branch 'feat-add-json-validation-checks' of github.com:cornzybl…
cornzyblack Oct 21, 2025
a72bdb1
docs: document properly that function only checks outside keys
cornzyblack Oct 21, 2025
b8505e4
refactor: comment out to test
cornzyblack Oct 21, 2025
a177c01
refactor: try using transform for strict comparison
cornzyblack Oct 21, 2025
e0c3438
feat: implement changes
cornzyblack Oct 21, 2025
853c8c0
Merge branch 'main' into feat-add-json-validation-checks
cornzyblack Oct 22, 2025
3b0fd52
format and add tests
cornzyblack Oct 22, 2025
44881fe
Merge branch 'feat-add-json-validation-checks' of github.com:cornzybl…
cornzyblack Oct 22, 2025
7b19d00
refactor: add to markdown
cornzyblack Oct 22, 2025
96cbc8e
updates
cornzyblack Oct 22, 2025
0ff6ccb
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Oct 31, 2025
ebc9527
Merge branch 'main' of github.com:cornzyblack/dqx into feat-add-json-…
cornzyblack Nov 9, 2025
0be72ad
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Nov 11, 2025
fc7bd7a
refactor: add missing type in schema
cornzyblack Nov 12, 2025
f2dbbcb
Merge branch 'main' of github.com:cornzyblack/dqx into feat-add-json-…
cornzyblack Nov 13, 2025
a10ab22
refactor: update tests
cornzyblack Nov 13, 2025
b753071
feat: add has_valid_json_schema to perf
cornzyblack Nov 13, 2025
53a8f51
refactor: modify schema and dataframe
cornzyblack Nov 13, 2025
560ffd0
refactor: add note that this is not strict validation
cornzyblack Nov 13, 2025
ca62317
docs: update docs
cornzyblack Nov 13, 2025
37db749
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Nov 24, 2025
37d7803
chore: change to uppercase
cornzyblack Nov 25, 2025
465b7c1
Merge branch 'main' into feat-add-json-validation-checks
ghanse Dec 1, 2025
7ebbe2f
Merge branch 'main' into feat-add-json-validation-checks
ghanse Dec 1, 2025
9da02ec
Merge branch 'main' into feat-add-json-validation-checks
ghanse Dec 2, 2025
805e324
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 7, 2025
7225e6c
Apply suggestion from @Copilot
mwojtyczka Dec 7, 2025
4f56bb3
Apply suggestion from @Copilot
mwojtyczka Dec 7, 2025
072c1d9
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 7, 2025
2625797
Apply suggestion from @mwojtyczka
mwojtyczka Dec 7, 2025
640e883
Apply suggestion from @mwojtyczka
mwojtyczka Dec 7, 2025
afe2773
Apply suggestion from @Copilot
mwojtyczka Dec 7, 2025
946f271
Apply suggestion from @Copilot
mwojtyczka Dec 7, 2025
e140d51
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 8, 2025
75925e0
feat: add recursion limit of nested json to 15
cornzyblack Dec 8, 2025
e9ff2ae
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 9, 2025
a594015
fix tests, fmt
mwojtyczka Dec 10, 2025
21ad4e9
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 11, 2025
1a60d63
fmt
mwojtyczka Dec 11, 2025
202cd6f
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 12, 2025
914ac9f
docs: add comment explaining
cornzyblack Dec 12, 2025
3cb2dc4
refactor: add case of null in value of key
cornzyblack Dec 12, 2025
1dec651
allow nullable
cornzyblack Dec 12, 2025
2890c1f
Update src/databricks/labs/dqx/check_funcs.py
cornzyblack Dec 12, 2025
d1df4ee
updates
cornzyblack Dec 12, 2025
b2ff61a
Merge branch 'feat-add-json-validation-checks' of github.com:cornzybl…
cornzyblack Dec 12, 2025
84758b9
chore: fix fmt and build
cornzyblack Dec 12, 2025
8c416ee
refactor: make user specify null in json string
cornzyblack Dec 12, 2025
4048daf
Merge branch 'main' into feat-add-json-validation-checks
mwojtyczka Dec 12, 2025
aebc28d
updated docs
mwojtyczka Dec 12, 2025
1abdd85
chore: fmt and add docstring
cornzyblack Dec 12, 2025
bfa0d46
Merge branch 'feat-add-json-validation-checks' of github.com:cornzybl…
cornzyblack Dec 12, 2025
892f540
chore: add docs and fmt
cornzyblack Dec 12, 2025
382259b
chore: _get_normalized_column_and_expr is no longer private, fix it
cornzyblack Dec 12, 2025
1a5ad47
docs: some docs update
cornzyblack Dec 12, 2025
7c5f1f9
chore: docs update
cornzyblack Dec 12, 2025
130 changes: 100 additions & 30 deletions docs/dqx/docs/reference/quality_checks.mdx

Large diffs are not rendered by default.

193 changes: 193 additions & 0 deletions src/databricks/labs/dqx/check_funcs.py
@@ -1966,6 +1966,163 @@ def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) ->
return condition, apply


@register_rule("row")
def is_valid_json(column: str | Column) -> Column:
"""
Checks whether the values in the input column are valid JSON strings.

Args:
column: Column name (str) or Column expression to check for valid JSON.

Returns:
A Spark Column representing the condition for invalid JSON strings.
"""
col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column)
return make_condition(
~F.when(col_expr.isNotNull(), F.try_parse_json(col_expr_str).isNotNull()),
F.concat_ws(
"",
F.lit("Value '"),
col_expr.cast("string"),
F.lit(f"' in Column '{col_expr_str}' is not a valid JSON string"),
),
f"{col_str_norm}_is_not_valid_json",
)
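Note the `~F.when(...)` with no `otherwise` branch: for a NULL input the condition evaluates to NULL, so NULL values are never flagged as invalid. A pure-Python sketch of these semantics (the `is_valid_json_py` helper is illustrative only, not part of this PR):

```python
import json

def is_valid_json_py(value):
    # Mirror the Spark condition: NULL input yields no verdict (None),
    # matching F.when(...) without an otherwise branch.
    if value is None:
        return None
    try:
        json.loads(value)
        return True
    except (ValueError, TypeError):
        return False

print(is_valid_json_py('{"a": 1}'))  # True
print(is_valid_json_py("not json"))  # False
print(is_valid_json_py(None))        # None: nulls are not flagged
```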


@register_rule("row")
def has_json_keys(column: str | Column, keys: list[str], require_all: bool = True) -> Column:
"""
Checks whether the values in the input column contain specific keys in the outermost JSON object.

Args:
column: The name of the column or the column expression to check for JSON keys.
keys: A list of JSON keys to verify within the outermost JSON object.
require_all: If True, all specified keys must be present. If False, at least one key must be present.

Returns:
A Spark Column representing the condition for missing JSON keys.
"""
if not keys:
raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.")
if any(not isinstance(k, str) for k in keys):
raise InvalidParameterError("All keys must be of type string.")

col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column)
json_keys_array = F.json_object_keys(col_expr)
required_keys = F.array_distinct(F.array(*[F.lit(k) for k in keys]))

json_validation_error = is_valid_json(col_expr_str)
is_invalid_json = json_validation_error.isNotNull()

has_json_keys_msg = F.concat_ws(
"",
F.lit("Value '"),
F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")),
F.lit(f"' in Column '{col_expr_str}' is missing keys in the list: ["),
F.concat_ws(", ", F.lit(keys)),
F.lit("]"),
)
message = F.when(is_invalid_json, json_validation_error).otherwise(has_json_keys_msg)

if require_all:
missing = F.array_except(required_keys, json_keys_array)
condition_when_valid = F.size(missing) == 0
else:
condition_when_valid = F.arrays_overlap(json_keys_array, required_keys)

condition = F.when(~is_invalid_json, condition_when_valid).otherwise(F.lit(False))

return make_condition(
~condition,
message,
f"{col_str_norm}_does_not_have_json_keys",
)
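Because the check is built on `F.json_object_keys`, only keys of the outermost JSON object count; nested keys are invisible, and `require_all` toggles between all-of and any-of matching. A pure-Python sketch of that behavior (the `has_json_keys_py` helper is illustrative, not part of this PR):

```python
import json

def has_json_keys_py(value, keys, require_all=True):
    # Only outermost keys count, mirroring F.json_object_keys.
    try:
        obj = json.loads(value)
    except (ValueError, TypeError):
        return False  # invalid JSON fails the check, as in the Spark version
    if not isinstance(obj, dict):
        return False
    present, required = set(obj), set(keys)
    return required <= present if require_all else bool(required & present)

print(has_json_keys_py('{"a": 1, "b": {"c": 2}}', ["a", "b"]))      # True
print(has_json_keys_py('{"a": 1}', ["a", "c"], require_all=False))  # True: any-of
print(has_json_keys_py('{"a": 1}', ["a", "c"]))                     # False: all-of
print(has_json_keys_py('{"b": {"c": 2}}', ["c"]))                   # False: nested keys don't count
```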


@register_rule("row")
def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column:
"""
Validates that JSON strings in the specified column conform to an expected schema.

The validation utilizes standard Spark JSON parsing rules, specifically:

* **Type Coercion is Permitted:** Values that can be successfully cast to the target schema type
(e.g., a JSON number like `0.12` parsing into a field defined as `STRING`) are considered valid.
* **Extra Fields are Ignored:** Fields present in the JSON but missing from the schema are ignored.
* **Missing keys imply null:** If a key is missing from the JSON object, Spark treats it as a `null` value.
* **Strictness:** If a schema field is defined as `NOT NULL`, validation will fail if the key is missing (implicit null) or explicitly set to `null`.
* **Nested JSON behavior:** If a nullable parent field is explicitly `null` (e.g., `{"parent": null}`), its children are **not** validated.
However, if the parent exists (e.g., `{"parent": {}}`) but a required child is missing, validation fails.
* **Nested Depth Limit:** The validation logic supports a maximum nested depth of 10 levels.

Args:
column: Column name or Column expression containing JSON strings.
schema: Expected schema as a DDL string (e.g., "struct<id:string NOT NULL>", "id INT, name STRING")
or a generic StructType. To enforce strict presence of a field, you must explicitly set it to `nullable=False`
or use `NOT NULL` in the DDL string.

Returns:
A string Column containing the error message if the JSON does not conform to the schema,
or `null` if validation passes.

Raises:
InvalidParameterError: If the schema string is invalid/unparsable, or if the input schema is neither a string nor a StructType.
"""

_expected_schema = _get_schema(schema)
schema_str = _expected_schema.simpleString()
col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column)

json_validation_error = is_valid_json(col_str_norm)

is_invalid_json = json_validation_error.isNotNull()

# Add unique corrupt-record field to isolate parse errors
corrupt_record_name = f"{uuid.uuid4().hex[:8]}_dqx_corrupt_record"

extended_schema = types.StructType(
_expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)]
)

# Attempt to parse JSON using the extended schema
parsed_struct = F.from_json(
col_expr,
extended_schema,
options={"columnNameOfCorruptRecord": corrupt_record_name},
)

# Core conformity: must be valid JSON and not corrupt
is_not_corrupt = parsed_struct[corrupt_record_name].isNull()
base_conformity = ~is_invalid_json & is_not_corrupt

# Field presence checks (non-null + exists)
field_presence_checks = _generate_field_presence_checks(_expected_schema, parsed_struct)
has_missing_or_null_fields = F.array_contains(
F.array(*[F.coalesce(expr, F.lit(False)) for expr in field_presence_checks]),
False,
)

is_conforming = base_conformity & ~has_missing_or_null_fields
condition = is_conforming | col_expr.isNull()

error_msg = F.concat_ws(
"",
F.lit("Value '"),
F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")),
F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema: "),
F.lit(schema_str),
)

final_error_msg = F.when(is_invalid_json, json_validation_error).otherwise(error_msg)

return make_condition(
~condition,
final_error_msg,
f"{col_str_norm}_has_invalid_json_schema",
)
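The docstring rules (extra fields ignored, missing key implies null, NOT NULL fields fail on implicit or explicit null, NULL rows pass) can be sketched in pure Python for the flat case; the `conforms_py` helper below is illustrative only and ignores type coercion and nesting:

```python
import json

def conforms_py(value, required_fields):
    # required_fields: names of top-level NOT NULL fields in the expected schema.
    if value is None:
        return True  # null input passes, matching `condition | col_expr.isNull()`
    try:
        obj = json.loads(value)
    except (ValueError, TypeError):
        return False  # not valid JSON
    if not isinstance(obj, dict):
        return False
    # obj.get(f) is None covers both a missing key (implicit null) and `null`.
    return all(obj.get(f) is not None for f in required_fields)

print(conforms_py('{"id": "1", "extra": true}', ["id"]))  # True: extra field ignored
print(conforms_py('{"name": "x"}', ["id"]))               # False: missing key -> implicit null
print(conforms_py('{"id": null}', ["id"]))                # False: explicit null
print(conforms_py(None, ["id"]))                          # True: null row passes
```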


def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType:
"""
Normalize the input schema into a Spark StructType schema.
@@ -2191,6 +2348,42 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ
return False


def _generate_field_presence_checks(
expected_schema: types.StructType, parsed_struct_col: Column, max_depth: int = 10, current_depth: int = 0
) -> list[Column]:
"""
Recursively generate Spark Column expressions that verify each field defined in the expected
schema is present and non-null within a parsed struct column.

Args:
expected_schema: The StructType defining the expected JSON schema.
parsed_struct_col: The parsed struct column (e.g., from from_json) to validate.
max_depth: Maximum recursion depth to prevent excessive nesting. Default is 10.
current_depth: Current recursion depth.

Returns:
A list of Column expressions, one per field in the expected schema, that evaluate to True
if the corresponding field is non-null.
"""
if current_depth > max_depth:
return []

validations = []
for field in expected_schema.fields:
field_ref = parsed_struct_col[field.name]
if not field.nullable:
validations.append(field_ref.isNotNull())
if isinstance(field.dataType, types.StructType):
child_checks = _generate_field_presence_checks(
field.dataType, field_ref, max_depth=max_depth, current_depth=current_depth + 1
)
if field.nullable:
child_checks = [(field_ref.isNull() | check) for check in child_checks]
validations.extend(child_checks)

return validations
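The key step above is `field_ref.isNull() | check`: when a nullable parent is null, its children's checks become vacuously true, which implements the docstring rule that a null parent's children are not validated. A pure-Python analogue (the schema encoding and `presence_checks_py` helper are illustrative, not part of this PR):

```python
def presence_checks_py(schema, obj, max_depth=10, depth=0):
    """Flatten nested NOT NULL presence checks into one list of booleans.

    schema maps field name -> (nullable, child_schema_or_None); obj is the
    parsed JSON value. Mirrors how the Spark version ORs each child check
    with field_ref.isNull() when the parent is nullable.
    """
    if depth > max_depth:
        return []
    results = []
    for name, (nullable, children) in schema.items():
        value = obj.get(name) if isinstance(obj, dict) else None
        if not nullable:
            results.append(value is not None)
        if children is not None:
            child = presence_checks_py(children, value, max_depth, depth + 1)
            if nullable:
                # null/missing nullable parent: child checks pass vacuously
                child = [(value is None) or c for c in child]
            results.extend(child)
    return results

# nullable parent with a required child, as in the docstring example
schema = {"parent": (True, {"child": (False, None)})}
print(presence_checks_py(schema, {"parent": None}))  # [True]: null parent skips child
print(presence_checks_py(schema, {"parent": {}}))    # [False]: required child missing
```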


def _match_rows(
df: DataFrame,
ref_df: DataFrame,