diff --git a/docs/dqx/docs/reference/quality_checks.mdx b/docs/dqx/docs/reference/quality_checks.mdx index b06a6460d..a14c7ffc8 100644 --- a/docs/dqx/docs/reference/quality_checks.mdx +++ b/docs/dqx/docs/reference/quality_checks.mdx @@ -41,6 +41,9 @@ You can also define your own custom checks in Python (see [Creating custom check | `is_not_greater_than` | Checks whether the values in the input column are not greater than the provided limit. | `column`: column to check (can be a string column name or a column expression); `limit`: limit as number, date, timestamp, column name or sql expression | | `is_valid_date` | Checks whether the values in the input column have valid date formats. | `column`: column to check (can be a string column name or a column expression); `date_format`: optional date format (e.g. 'yyyy-mm-dd') | | `is_valid_timestamp` | Checks whether the values in the input column have valid timestamp formats. | `column`: column to check (can be a string column name or a column expression); `timestamp_format`: optional timestamp format (e.g. 'yyyy-mm-dd HH:mm:ss') | +| `is_valid_json` | Checks whether the values in the input column are valid JSON strings. | `column`: column to check (can be a string column name or a column expression) | +| `has_json_keys` | Checks whether the values in the input column contain specific keys in the outermost JSON object. | `column`: column to check (can be a string column name or a column expression); `keys`: A list of JSON keys to verify within the outermost JSON object; `require_all`: optional boolean flag to require all keys to be present | +| `has_valid_json_schema` | Checks whether the values in the specified column, which contain JSON strings, conform to the expected schema. This check allows extra fields and type coercion. Missing keys are treated as null; to fail validation on missing keys, you must explicitly use NOT NULL in the schema definition. | `column`: column to check (can be a string column name or a column expression); schema: the schema as a DDL string (e.g., "id INT NOT NULL, name STRING") or `StructType` | | `is_not_in_future` | Checks whether the values in the input column contain a timestamp that is not in the future, where 'future' is defined as current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_not_in_near_future` | Checks whether the values in the input column contain a timestamp that is not in the near future, where 'near future' is defined as greater than the current timestamp but less than the current_timestamp + offset (in seconds). | `column`: column to check (can be a string column name or a column expression); `offset`: offset to use; `curr_timestamp`: current timestamp, if not provided current_timestamp() function is used | | `is_older_than_n_days` | Checks whether the values in one input column are at least N days older than the values in another column. 
| `column`: column to check (can be a string column name or a column expression); `days`: number of days; `curr_date`: current date, if not provided current_date() function is used; `negate`: if the condition should be negated | @@ -382,6 +385,41 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column2: col6 days: 2 +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + +# has_json_keys check +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + +- criticality: error + name: col_json_str_does_not_have_json_keys2 + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + - key2 + require_all: False + +- criticality: error + name: col_json_str2_has_invalid_json_schema + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: "STRUCT" + # regex_match check - criticality: error check: @@ -543,42 +581,42 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen function: is_linestring arguments: column: linestring_geom - + # is_polygon check - criticality: error check: function: is_polygon arguments: column: polygon_geom - + # is_multipoint check - criticality: error check: function: is_multipoint arguments: column: multipoint_geom - + # is_multilinestring check - criticality: error check: function: is_multilinestring arguments: column: multilinestring_geom - + # is_multipolygon check - criticality: error check: function: is_multipolygon arguments: column: multipolygon_geom - + # is_geometrycollection check - criticality: error check: function: is_geometrycollection arguments: column: geometrycollection_geom - + # is_ogc_valid check - criticality: error check: @@ -607,7 +645,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom dimension: 2 - + # has_x_coordinate_between check - criticality: error check: @@ -616,7 +654,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 - + # has_y_coordinate_between check - criticality: error check: @@ -625,7 +663,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: polygon_geom min_value: 0.0 max_value: 10.0 - + # is_area_not_less_than check (geometry) - criticality: error check: @@ -633,7 +671,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 100.0 - + # is_area_not_less_than check (geography with geodesic area) - criticality: error check: @@ -642,7 +680,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen column: geography_geom value: 1000000.0 # 1 million square meters geodesic: true - + # is_area_not_greater_than check (geometry with SRID) - criticality: error check: @@ -659,7 +697,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 1.0 - + # is_area_not_equal_to check - criticality: error check: @@ -667,7 +705,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 0.0 - + # is_num_points_not_less_than check - criticality: error check: @@ -675,7 +713,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 10 - + # 
is_num_points_not_greater_than check - criticality: error check: @@ -683,7 +721,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 3 - + # is_num_points_equal_to check - criticality: error check: @@ -691,7 +729,7 @@ For brevity, the `name` field in the examples is omitted and it will be auto-gen arguments: column: polygon_geom value: 4 - + # is_num_points_not_equal_to check - criticality: error check: @@ -1002,6 +1040,38 @@ checks = [ check_func_kwargs={"column1": "col5", "column2": "col6", "days": 2} # or as expr: F.col("col5"), F.col("col6") ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str" + ), + + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1"]}, + name="col_json_str_has_json_keys" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", # or as expr: F.col("col_json_str") + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + name="col_json_str_has_json_keys" + ), + + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", # or as expr: F.col("col_json_str") + check_func_kwargs={"schema": "STRUCT"}, + name="col_json_str2_has_valid_json_schema" + ), + # regex_match check DQRowRule( criticality="error", @@ -1108,7 +1178,7 @@ checks = [ check_func=geo_check_funcs.is_multilinestring, column="multilinestring_geom" ), - + # is_multipolygon check DQRowRule( criticality="error", @@ -1143,7 +1213,7 @@ checks = [ check_func=geo_check_funcs.is_not_null_island, column="point_geom" ), - + # has_dimension check DQRowRule( criticality="error", @@ -1190,8 +1260,8 @@ checks = [ check_func=geo_check_funcs.is_area_not_greater_than, column="polygon_geom", check_func_kwargs={"value": 0.1, "srid": 3857} - ), - + ), + # is_area_equal_to check DQRowRule( criticality="error", @@ -2339,7 +2409,7 @@ checks = [ "strict": True, }, ), - + # has_no_outliers check DQDatasetRule( criticality="error", @@ -2504,7 +2574,7 @@ Using `count_distinct` with `group_by` performs multiple expensive operations. F #### Detecting Statistical Anomalies -Descriptive statistics are often used for process control, forecasting, and regression analysis. Variation from expected values could signal that a process has changed or that a model's features have drifted. Monitor dataset statistics (e.g. `avg`, `stddev`) to detect significant variations. +Descriptive statistics are often used for process control, forecasting, and regression analysis. Variation from expected values could signal that a process has changed or that a model's features have drifted. Monitor dataset statistics (e.g. `avg`, `stddev`) to detect significant variations. 
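For orientation, a minimal PySpark sketch of the kind of per-group statistic such a check monitors (the `df`, `machine_id`, and `temperature` names are illustrative assumptions, not part of the reference schema):

```python
# Sketch only: compute the per-machine temperature spread that a variance check would guard.
# `df`, `machine_id`, and `temperature` are assumed placeholders for your own dataset.
from pyspark.sql import functions as F

stats_df = df.groupBy("machine_id").agg(
    F.avg("temperature").alias("avg_temperature"),
    F.stddev("temperature").alias("stddev_temperature"),
)
stats_df.show()
```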
```yaml # Detect unusually high temperature variance per machine @@ -3427,7 +3497,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us function: does_not_contain_pii arguments: column: description - + # PII detection check with custom threshold and named entities - criticality: error check: @@ -3444,7 +3514,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us ```python from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii - + checks = [ # Basic PII detection check DQRowRule( @@ -3462,7 +3532,7 @@ The PII detection extras include a built-in `does_not_contain_pii` check that us check_func_kwargs={"threshold": 0.8, "entities": ["PERSON", "EMAIL_ADDRESS"]} ), ] - ``` + ``` @@ -3499,7 +3569,7 @@ These can be loaded using `NLPEngineConfig`: from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.pii.pii_detection_funcs import does_not_contain_pii from databricks.labs.dqx.pii.nlp_engine_config import NLPEngineConfig - + checks = [ # PII detection check using spacy as a named entity recognizer DQRowRule( @@ -3508,7 +3578,7 @@ These can be loaded using `NLPEngineConfig`: column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": NLPEngineConfig.SPACY_MEDIUM} - ), + ), ] ``` @@ -3528,7 +3598,7 @@ Using custom models for named-entity recognition may require you to install thes from databricks.labs.dqx.rule import DQRowRule from databricks.labs.dqx.engine import DQEngine from databricks.sdk import WorkspaceClient - + nlp_engine_config = { 'nlp_engine_name': 'transformers_stanford_deidentifier_base', 'models': [ @@ -3571,9 +3641,9 @@ Using custom models for named-entity recognition may require you to install thes column="description", check_func=does_not_contain_pii, check_func_kwargs={"nlp_engine_config": nlp_engine_config}, - ), + ), ] - + dq_engine = DQEngine(WorkspaceClient()) df = spark.read.table("main.default.table") valid_df, quarantine_df = dq_engine.apply_checks_and_split(df, checks) diff --git a/src/databricks/labs/dqx/check_funcs.py b/src/databricks/labs/dqx/check_funcs.py index b6b6fc382..fcf9b6a98 100644 --- a/src/databricks/labs/dqx/check_funcs.py +++ b/src/databricks/labs/dqx/check_funcs.py @@ -1966,6 +1966,163 @@ def apply(df: DataFrame, spark: SparkSession, ref_dfs: dict[str, DataFrame]) -> return condition, apply +@register_rule("row") +def is_valid_json(column: str | Column) -> Column: + """ + Checks whether the values in the input column are valid JSON strings. + + Args: + column: Column name (str) or Column expression to check for valid JSON. + + Returns: + A Spark Column representing the condition for invalid JSON strings. + """ + col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column) + return make_condition( + ~F.when(col_expr.isNotNull(), F.try_parse_json(col_expr_str).isNotNull()), + F.concat_ws( + "", + F.lit("Value '"), + col_expr.cast("string"), + F.lit(f"' in Column '{col_expr_str}' is not a valid JSON string"), + ), + f"{col_str_norm}_is_not_valid_json", + ) + + +@register_rule("row") +def has_json_keys(column: str | Column, keys: list[str], require_all: bool = True) -> Column: + """ + Checks whether the values in the input column contain specific keys in the outermost JSON object. + + Args: + column: The name of the column or the column expression to check for JSON keys. + keys: A list of JSON keys to verify within the outermost JSON object. 
+ require_all: If True, all specified keys must be present. If False, at least one key must be present. + + Returns: + A Spark Column representing the condition for missing JSON keys. + """ + if not keys: + raise InvalidParameterError("The 'keys' parameter must be a non-empty list of strings.") + if any(not isinstance(k, str) for k in keys): + raise InvalidParameterError("All keys must be of type string.") + + col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column) + json_keys_array = F.json_object_keys(col_expr) + required_keys = F.array_distinct(F.array(*[F.lit(k) for k in keys])) + + json_validation_error = is_valid_json(col_expr_str) + is_invalid_json = json_validation_error.isNotNull() + + has_json_keys_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' is missing keys in the list: ["), + F.concat_ws(", ", F.lit(keys)), + F.lit("]"), + ) + message = F.when(is_invalid_json, json_validation_error).otherwise(has_json_keys_msg) + + if require_all: + missing = F.array_except(required_keys, json_keys_array) + condition_when_valid = F.size(missing) == 0 + else: + condition_when_valid = F.arrays_overlap(json_keys_array, required_keys) + + condition = F.when(~is_invalid_json, condition_when_valid).otherwise(F.lit(False)) + + return make_condition( + ~condition, + message, + f"{col_str_norm}_does_not_have_json_keys", + ) + + +@register_rule("row") +def has_valid_json_schema(column: str | Column, schema: str | types.StructType) -> Column: + """ + Validates that JSON strings in the specified column conform to an expected schema. + + The validation utilizes standard Spark JSON parsing rules, specifically: + + * **Type Coercion is Permitted:** Values that can be successfully cast to the target schema type + (e.g., a JSON number like `0.12` parsing into a field defined as `STRING`) are considered valid. + * **Extra Fields are Ignored:** Fields present in the JSON but missing from the schema are ignored. + * **Missing keys imply null:** If a key is missing from the JSON object, Spark treats it as a `null` value. + * **Strictness:** If a schema field is defined as `NOT NULL`, validation will fail if the key is missing (implicit null) or explicitly set to `null`. + * **Nested JSON behavior:** If a nullable parent field is explicitly `null` (e.g., `{"parent": null}`), its children are **not** validated. + However, if the parent exists (e.g., `{"parent": {}}`) but a required child is missing, validation fails. + * **Nested Depth Limit:** The validation logic supports a maximum nested depth of 10 levels. + + Args: + column: Column name or Column expression containing JSON strings. + schema: Expected schema as a DDL string (e.g., "struct", "id INT, name STRING") + or a generic StructType. To enforce strict presence of a field, you must explicitly set it to `nullable=False` + or use `NOT NULL` in the DDL string. + + Returns: + A string Column containing the error message if the JSON does not conform to the schema, + or `null` if validation passes. + + Raises: + InvalidParameterError: If the schema string is invalid/unparsable, or if the input schema is neither a string nor a StructType. 
+ """ + + _expected_schema = _get_schema(schema) + schema_str = _expected_schema.simpleString() + col_str_norm, col_expr_str, col_expr = get_normalized_column_and_expr(column) + + json_validation_error = is_valid_json(col_str_norm) + + is_invalid_json = json_validation_error.isNotNull() + + # Add unique corrupt-record field to isolate parse errors + corrupt_record_name = f"{uuid.uuid4().hex[:8]}_dqx_corrupt_record" + + extended_schema = types.StructType( + _expected_schema.fields + [types.StructField(corrupt_record_name, types.StringType(), True)] + ) + + # Attempt to parse JSON using the extended schema + parsed_struct = F.from_json( + col_expr, + extended_schema, + options={"columnNameOfCorruptRecord": corrupt_record_name}, + ) + + # Core conformity: must be valid JSON and not corrupt + is_not_corrupt = parsed_struct[corrupt_record_name].isNull() + base_conformity = ~is_invalid_json & is_not_corrupt + + # Field presence checks (non-null + exists) + field_presence_checks = _generate_field_presence_checks(_expected_schema, parsed_struct) + has_missing_or_null_fields = F.array_contains( + F.array(*[F.coalesce(expr, F.lit(False)) for expr in field_presence_checks]), + False, + ) + + is_conforming = base_conformity & ~has_missing_or_null_fields + condition = is_conforming | col_expr.isNull() + + error_msg = F.concat_ws( + "", + F.lit("Value '"), + F.when(col_expr.isNull(), F.lit("null")).otherwise(col_expr.cast("string")), + F.lit(f"' in Column '{col_expr_str}' does not conform to expected JSON schema: "), + F.lit(schema_str), + ) + + final_error_msg = F.when(is_invalid_json, json_validation_error).otherwise(error_msg) + + return make_condition( + ~condition, + final_error_msg, + f"{col_str_norm}_has_invalid_json_schema", + ) + + def _get_schema(input_schema: str | types.StructType, columns: list[str] | None = None) -> types.StructType: """ Normalize the input schema into a Spark StructType schema. @@ -2191,6 +2348,42 @@ def _is_compatible_atomic_type(actual_type: types.AtomicType, expected_type: typ return False +def _generate_field_presence_checks( + expected_schema: types.StructType, parsed_struct_col: Column, max_depth: int = 10, current_depth: int = 0 +) -> list[Column]: + """ + Recursively generate Spark Column expressions that verify each field defined in the expected + schema is present and non-null within a parsed struct column. + + Args: + expected_schema: The StructType defining the expected JSON schema. + parsed_struct_col: The parsed struct column (e.g., from from_json) to validate. + max_depth: Maximum recursion depth to prevent excessive nesting. Default is 10. + current_depth: Current recursion depth. + + Returns: + A list of Column expressions, one per field in the expected schema, that evaluate to True + if the corresponding field is non-null. 
+ """ + if current_depth > max_depth: + return [] + + validations = [] + for field in expected_schema.fields: + field_ref = parsed_struct_col[field.name] + if not field.nullable: + validations.append(field_ref.isNotNull()) + if isinstance(field.dataType, types.StructType): + child_checks = _generate_field_presence_checks( + field.dataType, field_ref, max_depth=max_depth, current_depth=current_depth + 1 + ) + if field.nullable: + child_checks = [(field_ref.isNull() | check) for check in child_checks] + validations.extend(child_checks) + + return validations + + def _match_rows( df: DataFrame, ref_df: DataFrame, diff --git a/tests/integration/test_apply_checks.py b/tests/integration/test_apply_checks.py old mode 100644 new mode 100755 index ab53fb1cf..14f6fa154 --- a/tests/integration/test_apply_checks.py +++ b/tests/integration/test_apply_checks.py @@ -5235,7 +5235,7 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -5252,6 +5252,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5266,6 +5268,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5280,6 +5284,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5318,6 +5324,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.1", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5334,6 +5342,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.2", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5350,6 +5360,8 @@ def test_apply_checks_all_row_checks_as_yaml_with_streaming(ws, make_schema, mak "val2", "192.168.1.3", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], @@ -5503,7 +5515,7 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -5520,6 +5532,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -5534,6 +5548,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -5548,6 
+5564,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -5574,6 +5592,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.0", "2001:0db8:85a3:08d3:0000:0000:0000:0001", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -5590,6 +5610,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.1", "2001:0db8:85a3:08d3:0000:0000:0000:1", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -5606,6 +5628,8 @@ def test_apply_checks_all_checks_as_yaml(ws, spark): "val2", "192.168.1.2", "2001:0db8:85a3:08d3:0000::2", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], @@ -6318,6 +6342,31 @@ def test_apply_checks_all_checks_using_classes(ws, spark): column="col6", check_func_kwargs={"window_minutes": 1, "min_records_per_window": 1, "lookback_windows": 3}, ), + # is_valid_json check + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), + # has_json_keys check + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str2", + check_func_kwargs={"schema": "STRUCT"}, + ), ] dq_engine = DQEngine(ws) @@ -6325,7 +6374,7 @@ def test_apply_checks_all_checks_using_classes(ws, spark): schema = ( "col1: string, col2: int, col3: int, col4 array, col5: date, col6: timestamp, " "col7: map, col8: struct, col10: int, col11: string, " - "col_ipv4: string, col_ipv6: string" + "col_ipv4: string, col_ipv6: string, col_json_str: string, col_json_str2: string" ) test_df = spark.createDataFrame( [ @@ -6342,6 +6391,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', ], [ "val2", @@ -6356,6 +6407,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', ], [ "val3", @@ -6370,6 +6423,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', ], ], schema, @@ -6393,6 +6448,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.255", "2001:0db8:85a3:08d3:1319:8a2e:0370:7344", + '{"key1": "1"}', + '{"a" : 1, "b": 2}', None, None, ], @@ -6409,6 +6466,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.1", "2001:0db8:85a3:08d3:ffff:ffff:ffff:ffff", + '{"key1": "1", "key2": "2"}', + '{ "a" : 1, "b": 1000, "c": {"1": 8}}', None, None, ], @@ -6425,6 +6484,8 @@ def test_apply_checks_all_checks_using_classes(ws, spark): "val2", "255.255.255.2", "2001:db8:85a3:8d3:1319:8a2e:3.112.115.68", + '{"key1": "[1, 2, 3]"}', + '{ "a" : 1, "b": 1023455, "c": null }', None, None, ], diff --git a/tests/integration/test_row_checks.py b/tests/integration/test_row_checks.py index 9b29d7330..941711253 
100644 --- a/tests/integration/test_row_checks.py +++ b/tests/integration/test_row_checks.py @@ -25,6 +25,9 @@ is_not_null_and_is_in_list, is_not_null_and_not_empty_array, is_valid_date, + is_valid_json, + has_json_keys, + has_valid_json_schema, is_valid_timestamp, is_valid_ipv4_address, is_ipv4_address_in_cidr, @@ -2877,3 +2880,380 @@ def test_col_is_equal_to(spark, set_utc_timezone): ) assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_is_valid_json(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"key": "value"}', '{"key": value}'], + ['{"number": 123}', '{"number": 123}'], + ['{"array": [1, 2, 3]}', '{"array": [1, 2, 3}'], + ['Not a JSON string', 'Also not JSON'], + [None, None], + ['123', '"a string"'], + ['true', 'null'], + ['[]', '{}'], + ['{"a": 1,}', '{key: "value"}'], + ['[1, 2,', '{"a": "b"'], + ["{'a': 'b'}", ''], + [' {"a": 1} ', '{"b": 2}\n'], + ], + schema, + ) + + actual = test_df.select(is_valid_json("a"), is_valid_json("b")) + + expected_schema = "a_is_not_valid_json: string, b_is_not_valid_json: string" + + expected = spark.createDataFrame( + [ + [None, "Value '{\"key\": value}' in Column 'b' is not a valid JSON string"], + [None, None], + [None, "Value '{\"array\": [1, 2, 3}' in Column 'b' is not a valid JSON string"], + [ + "Value 'Not a JSON string' in Column 'a' is not a valid JSON string", + "Value 'Also not JSON' in Column 'b' is not a valid JSON string", + ], + [None, None], + [None, None], + [None, None], + [None, None], + [ + "Value '{\"a\": 1,}' in Column 'a' is not a valid JSON string", + "Value '{key: \"value\"}' in Column 'b' is not a valid JSON string", + ], + [ + "Value '[1, 2,' in Column 'a' is not a valid JSON string", + "Value '{\"a\": \"b\"' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{'a': 'b'}' in Column 'a' is not a valid JSON string", + "Value '' in Column 'b' is not a valid JSON string", + ], + [None, None], + ], + expected_schema, + ) + + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_all_true(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + [None, '{"key": "value"}'], + ['{"nested": {"inner_key": "inner_value"}}', '{"nested": {"inner_key": "inner_value"}}'], + ['{"key": null, "another_key": null}', '{"nested": {"key": null}}'], + ], + schema, + ) + + actual = test_df.select( + has_json_keys("a", ["key", "another_key"]), + has_json_keys("b", ["key"]), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' is missing keys in the list: [key]", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"array\": {1, 2, 3}]' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"missing_key\": \"value\"}' in Column 'b' is 
missing keys in the list: [key]", + ], + [None, None], + ["Value 'Not a JSON string' in Column 'a' is not a valid JSON string", None], + [ + "Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + ["Value '{\"key\": \"value\"}' in Column 'a' is missing keys in the list: [key, another_key]", None], + [None, None], + [ + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'a' is missing keys in the list: [key, another_key]", + "Value '{\"nested\": {\"inner_key\": \"inner_value\"}}' in Column 'b' is missing keys in the list: [key]", + ], + [ + None, + "Value '{\"nested\": {\"key\": null}}' in Column 'b' is missing keys in the list: [key]", + ], + ], + expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_json_keys_require_at_least_one(spark): + schema = "a: string, b: string" + required_keys = ["key", "another_key", "extra_key"] + + test_df = spark.createDataFrame( + [ + ['{"key": 1, "another_key": 2, "extra_key": 3}', '{"key": 1, "another_key": 2, "extra_key": 3}'], + ['{"key": 1}', '{"key": 1}'], + ['{"number": 123}', '{"random_sample": 1523}'], + ['{}', '{}'], + ['{"key": "value"', '{"key": "value"'], + [None, 'Not a JSON string'], + [None, None], + ['{"key": null}', '{"nested": {"key": null}}'], + ], + schema, + ) + + actual = test_df.select( + has_json_keys("a", required_keys, require_all=False), + has_json_keys("b", required_keys, require_all=False), + ) + + expected_schema = "a_does_not_have_json_keys: string, b_does_not_have_json_keys: string" + + expected = spark.createDataFrame( + [ + [None, None], + [None, None], + [ + "Value '{\"number\": 123}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{\"random_sample\": 1523}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{}' in Column 'a' is missing keys in the list: [key, another_key, extra_key]", + "Value '{}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + [ + "Value '{\"key\": \"value\"' in Column 'a' is not a valid JSON string", + "Value '{\"key\": \"value\"' in Column 'b' is not a valid JSON string", + ], + [None, "Value 'Not a JSON string' in Column 'b' is not a valid JSON string"], + [None, None], + [ + None, + "Value '{\"nested\": {\"key\": null}}' in Column 'b' is missing keys in the list: [key, another_key, extra_key]", + ], + ], + expected_schema, + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_valid_json_schema(spark): + schema = "a: string, b: string" + test_df = spark.createDataFrame( + [ + ['{"a": 1, "b": 2}', '{"a": 3, "b": 4}'], + ['{"key": "value", "another_key": 123}', '{"key": "value"}'], + ['{"number": 123}', '{"number": 123, "extra": true}'], + ['{"array": [1, 2, 3]}', '{"array": {1, 2, 3}]'], + ['{"key": "value"}', '{"missing_key": "value"}'], + [None, None], + ['Not a JSON string', '{"key": "value"}'], + ['{"key": "value"}', 'Not a JSON string'], + ['{"key": "value"}', None], + ], + schema, + ) + + json_schema = "STRUCT" + expected_schema = "a_has_invalid_json_schema: string, b_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None, None], + [ + "Value '{\"key\": \"value\", \"another_key\": 123}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ 
+ "Value '{\"number\": 123}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"number\": 123, \"extra\": true}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ + "Value '{\"array\": [1, 2, 3]}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"array\": {1, 2, 3}]' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + "Value '{\"missing_key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [None, None], + [ + "Value 'Not a JSON string' in Column 'a' is not a valid JSON string", + "Value '{\"key\": \"value\"}' in Column 'b' does not conform to expected JSON schema: struct", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + "Value 'Not a JSON string' in Column 'b' is not a valid JSON string", + ], + [ + "Value '{\"key\": \"value\"}' in Column 'a' does not conform to expected JSON schema: struct", + None, + ], + ], + expected_schema, + ) + actual = test_df.select( + has_valid_json_schema("a", json_schema), + has_valid_json_schema("b", json_schema), + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_valid_json_schema_with_nested_depth_5(spark): + """Test has_valid_json_schema with nested fields of depth 5.""" + schema = "json_data: string" + test_data = [ + ['{"level1": {"level2": {"level3": {"level4": {"level5": "value"}}}}}'], + ['{"level1": {"level2": {"level3": {"level4": {"level5": 0.12}}}}}'], + ['{"level1": {"level2": {"level3": {"level4": {"level5": null}}}}}'], + ['{"level1": {"level2": {"level3": {"level4": {"level5": "0.123"}}}}}'], + ['{"level1": {"level2": {"level3": {"level4": null}}}}'], + ['{"level1": {"level2": {"level3": {"level4": {"level6": "sample"}}}}}'], + [None], + ['{"level1": null}'], + ['Not a JSON string'], + ] + + test_df = spark.createDataFrame(test_data, schema) + + json_schema = "struct>>>>" + expected_schema = "json_data_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None], + [None], + [ + "Value '{\"level1\": {\"level2\": {\"level3\": {\"level4\": {\"level5\": null}}}}}' in Column 'json_data' does not conform to expected JSON schema: struct>>>>" + ], + [None], + [None], + [ + "Value '{\"level1\": {\"level2\": {\"level3\": {\"level4\": {\"level6\": \"sample\"}}}}}' in Column 'json_data' does not conform to expected JSON schema: struct>>>>", + ], + [None], + [None], + ["Value 'Not a JSON string' in Column 'json_data' is not a valid JSON string"], + ], + expected_schema, + ) + actual = test_df.select( + has_valid_json_schema("json_data", json_schema), + ) + assert_df_equality(actual, expected, ignore_nullable=True) + + +def test_has_valid_json_schema_nullability(spark): + schema = "json_data: string" + json_schema = "id int, name string not null" + + test_df = spark.createDataFrame( + [['{"id": 1, "name": "valid"}'], ['{"id": 1, "name": null}'], ['{"id": 1}'], [None], ["json_data string"]], + schema, + ) + + expected_schema = "json_data_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None], + [ + "Value '{\"id\": 1, \"name\": null}' in Column 'json_data' does not conform to expected JSON schema: struct" + ], + [ + "Value '{\"id\": 1}' in Column 'json_data' does not conform to expected JSON schema: struct" + ], + [None], + ["Value 'json_data string' in Column 'json_data' is not a valid JSON string"], + ], + 
expected_schema, + ) + + actual = test_df.select(has_valid_json_schema("json_data", json_schema)) + assert_df_equality(actual, expected) + + +def test_has_valid_json_schema_with_decimal_fields(spark): + schema = "json_data: string" + test_data = [ + ['{"price": 19.99, "discount": 0.15}'], + ['{"price": 99.99, "discount": 0.5}'], + ['{"price": 0.01, "discount": 0.0}'], + ['{"price": 0.01, "discount": null}'], + ['{"price": true, "discount": false}'], + [None], + ] + test_df = spark.createDataFrame(test_data, schema) + + json_schema = "STRUCT" + expected_schema = "json_data_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None], + [None], + [None], + [None], + [ + "Value '{\"price\": true, \"discount\": false}' in Column 'json_data' does not conform to expected JSON schema: struct" + ], + [None], + ], + expected_schema, + ) + actual = test_df.select( + has_valid_json_schema("json_data", json_schema), + ) + assert_df_equality(actual, expected) + + +def test_has_valid_json_schema_with_complex_nested_structure(spark): + """Test has_valid_json_schema with complex nested structure - VALID case.""" + schema = "json_data: string" + test_df = spark.createDataFrame( + [ + ['{"user": {"id": 1, "profile": {"name": "John", "age": 30}}, "tags": ["admin", "user"]}'], + ['{"user": {"id": 2, "profile": {"name": "Jane", "age": 25}}, "tags": []}'], + [None], + ['{"user": {"id": "invalid", "profile": {"name": "John", "age": 30}}, "tags": ["admin"]}'], + ['{"user": {"id": 1, "profile": {"name": 123, "age": "thirty"}}, "tags": ["admin"]}'], + ['{"user": {"id": 1, "profile": null}, "tags": ["admin"]}'], + ], + schema, + ) + + json_schema = "struct>,tags:array>" + expected_schema = "json_data_has_invalid_json_schema: string" + expected = spark.createDataFrame( + [ + [None], + [None], + [None], + [ + "Value '{\"user\": {\"id\": \"invalid\", \"profile\": {\"name\": \"John\", \"age\": 30}}, \"tags\": [\"admin\"]}' in Column 'json_data' does not conform to expected JSON schema: struct>,tags:array>" + ], + [ + "Value '{\"user\": {\"id\": 1, \"profile\": {\"name\": 123, \"age\": \"thirty\"}}, \"tags\": [\"admin\"]}' in Column 'json_data' does not conform to expected JSON schema: struct>,tags:array>" + ], + [None], + ], + expected_schema, + ) + actual = test_df.select( + has_valid_json_schema("json_data", json_schema), + ) + assert_df_equality(actual, expected, ignore_nullable=True) diff --git a/tests/perf/conftest.py b/tests/perf/conftest.py index 16b4437a7..eb29ff95e 100644 --- a/tests/perf/conftest.py +++ b/tests/perf/conftest.py @@ -31,7 +31,8 @@ SCHEMA_STR = ( "col1: int, col2: int, col3: int, col4: array, " "col5: date, col6: timestamp, col7: map, " - "col8: struct, col10: int, col_ipv4: string, col_ipv6: string" + "col8: struct, col10: int, col_ipv4: string, col_ipv6: string, " + "col_json_str: string" ) RUN_TIME = datetime(2025, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc) @@ -101,6 +102,7 @@ def generated_df(spark, rows=DEFAULT_ROWS): .withColumnSpec("col10") .withColumnSpec("col_ipv4", template=r"\n.\n.\n.\n") .withColumnSpec("col_ipv6", template="XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX:XXXX") + .withColumnSpec("col_json_str", template=r"{'key1': '\w', 'key2': 'd\w'}") ) return spec.build() diff --git a/tests/perf/test_apply_checks.py b/tests/perf/test_apply_checks.py index 75ccce303..346a706d2 100644 --- a/tests/perf/test_apply_checks.py +++ b/tests/perf/test_apply_checks.py @@ -1764,6 +1764,69 @@ def test_benchmark_foreach_has_valid_schema(benchmark, ws, generated_string_df): assert 
actual_count == EXPECTED_ROWS +@pytest.mark.benchmark(group="test_benchmark_is_valid_json") +def test_benchmark_is_valid_json(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.is_valid_json, + column="col_json_str", + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_at_least_one(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_json_keys") +def test_benchmark_has_json_keys_require_all_true(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"]}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + +@pytest.mark.benchmark(group="test_benchmark_has_valid_json_schema") +def test_benchmark_has_valid_json_schema(benchmark, ws, generated_df): + dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) + checks = [ + DQRowRule( + criticality="error", + check_func=check_funcs.has_valid_json_schema, + column="col_json_str", + check_func_kwargs={"schema": "STRUCT"}, + ), + ] + checked = dq_engine.apply_checks(generated_df, checks) + actual_count = benchmark(lambda: checked.count()) + assert actual_count == EXPECTED_ROWS + + def test_benchmark_is_aggr_count_distinct_with_group_by(benchmark, ws, generated_df): """Benchmark count_distinct with group_by (uses two-stage aggregation: groupBy + join).""" dq_engine = DQEngine(workspace_client=ws, extra_params=EXTRA_PARAMS) diff --git a/tests/resources/all_row_checks.yaml b/tests/resources/all_row_checks.yaml index a7e8f47d8..eb81d444b 100644 --- a/tests/resources/all_row_checks.yaml +++ b/tests/resources/all_row_checks.yaml @@ -240,6 +240,39 @@ arguments: column: col6 +# is_valid_json check +- criticality: error + check: + function: is_valid_json + arguments: + column: col_json_str + +# has_json_keys check +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + +- criticality: error + check: + function: has_json_keys + arguments: + column: col_json_str + keys: + - key1 + - key2 + require_all: false + +- criticality: error + check: + function: has_valid_json_schema + arguments: + column: col_json_str2 + schema: 'STRUCT' + # is_not_in_future check - criticality: error check: diff --git a/tests/unit/test_build_rules.py b/tests/unit/test_build_rules.py index a34e26dfd..d21532872 100644 --- a/tests/unit/test_build_rules.py +++ b/tests/unit/test_build_rules.py @@ -26,6 +26,8 @@ is_not_less_than, is_not_greater_than, is_valid_date, + is_valid_json, + has_json_keys, regex_match, compare_datasets, ) @@ -1143,6 +1145,20 @@ 
def test_convert_dq_rules_to_metadata(): DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule(criticality="error", check_func=is_valid_json, column="col_json_str"), + DQRowRule( + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1"]}, + ), + DQRowRule( + name="col_json_str_has_no_json_key1_key2", + criticality="error", + check_func=has_json_keys, + column="col_json_str", + check_func_kwargs={"keys": ["key1", "key2"], "require_all": False}, + ), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error", @@ -1302,6 +1318,27 @@ def test_convert_dq_rules_to_metadata(): "arguments": {"column": "b", "date_format": "yyyy-MM-dd"}, }, }, + { + "name": "col_json_str_is_not_valid_json", + "criticality": "error", + "check": { + "function": "is_valid_json", + "arguments": {"column": "col_json_str"}, + }, + }, + { + "name": "col_json_str_does_not_have_json_keys", + "criticality": "error", + "check": {"function": "has_json_keys", "arguments": {"column": "col_json_str", "keys": ["key1"]}}, + }, + { + "name": "col_json_str_has_no_json_key1_key2", + "criticality": "error", + "check": { + "function": "has_json_keys", + "arguments": {"column": "col_json_str", "keys": ["key1", "key2"], "require_all": False}, + }, + }, { "name": "struct_col1_col2_is_not_unique", "criticality": "error", @@ -1479,6 +1516,7 @@ def test_metadata_round_trip_conversion_preserves_rules() -> None: DQRowRule( criticality="error", check_func=is_valid_date, column="b", check_func_kwargs={"date_format": "yyyy-MM-dd"} ), + DQRowRule(criticality="error", check_func=is_valid_json, column="b"), DQDatasetRule(criticality="error", check_func=is_unique, columns=["col1", "col2"]), DQDatasetRule( criticality="error",
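Taken together, the new JSON checks plug into the engine exactly like the existing row checks. A minimal end-to-end sketch, following the same pattern as the documentation and test examples above (the table name, the `col_json_str` column, and the DDL schema string are illustrative assumptions):

```python
# Sketch: apply the new JSON row checks and split valid rows from quarantined ones.
# The table name, column name, and schema string below are placeholders for illustration.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.rule import DQRowRule

checks = [
    # Flag rows whose value is not parseable JSON at all.
    DQRowRule(criticality="error", check_func=check_funcs.is_valid_json, column="col_json_str"),
    # Require at least one of the listed keys in the outermost JSON object.
    DQRowRule(
        criticality="error",
        check_func=check_funcs.has_json_keys,
        column="col_json_str",
        check_func_kwargs={"keys": ["key1", "key2"], "require_all": False},
    ),
    # Enforce a schema; NOT NULL makes a missing or explicitly null key fail the check.
    DQRowRule(
        criticality="error",
        check_func=check_funcs.has_valid_json_schema,
        column="col_json_str",
        check_func_kwargs={"schema": "key1 STRING NOT NULL, key2 STRING"},
    ),
]

dq_engine = DQEngine(WorkspaceClient())
df = spark.read.table("main.default.table")  # placeholder table, as in the PII example above
valid_df, quarantine_df = dq_engine.apply_checks_and_split(df, checks)
```

Rows that fail land in `quarantine_df` under the auto-generated check names (e.g. `col_json_str_is_not_valid_json`).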