diff --git a/clickhouse/assets/configuration/spec.yaml b/clickhouse/assets/configuration/spec.yaml index 61c919dede3dc..6fa6be336598e 100644 --- a/clickhouse/assets/configuration/spec.yaml +++ b/clickhouse/assets/configuration/spec.yaml @@ -234,6 +234,51 @@ files: value: type: boolean example: false + - name: query_errors + description: Configure collection of query errors from system.query_log + options: + - name: enabled + description: | + Enable collection of query errors. Requires `dbm: true`. + Collects ExceptionBeforeStart and ExceptionWhileProcessing events, which include + exception message, error code, and stack trace. + value: + type: boolean + example: true + - name: collection_interval + description: | + Set the query errors collection interval (in seconds). + value: + type: number + example: 10 + - name: samples_per_hour_per_query + description: | + Set the maximum number of error samples to collect per hour per unique query signature. + Errors are high-signal events, so this defaults higher than query_completions. + value: + type: number + example: 60 + - name: seen_samples_cache_maxsize + hidden: true + description: | + Set the max size of the cache used for rate limiting error samples. + value: + type: number + default: 10000 + - name: max_samples_per_collection + hidden: true + description: | + Maximum number of error samples to collect in a single run (applies LIMIT in SQL). + value: + type: number + default: 1000 + - name: run_sync + hidden: true + description: | + Run the query errors collection synchronously. For testing only. 
+ value: + type: boolean + example: false - template: instances/db overrides: custom_queries.value.example: diff --git a/clickhouse/changelog.d/23041.added b/clickhouse/changelog.d/23041.added new file mode 100644 index 0000000000000..6800945cb6c60 --- /dev/null +++ b/clickhouse/changelog.d/23041.added @@ -0,0 +1 @@ +Add query error collection from system.query_log for DBM \ No newline at end of file diff --git a/clickhouse/datadog_checks/clickhouse/clickhouse.py b/clickhouse/datadog_checks/clickhouse/clickhouse.py index a18bdca159251..f010f73ab8376 100644 --- a/clickhouse/datadog_checks/clickhouse/clickhouse.py +++ b/clickhouse/datadog_checks/clickhouse/clickhouse.py @@ -18,6 +18,7 @@ from .config import build_config, sanitize from .health import ClickhouseHealth, HealthEvent, HealthStatus from .query_completions import ClickhouseQueryCompletions +from .query_errors import ClickhouseQueryErrors from .statement_samples import ClickhouseStatementSamples from .statements import ClickhouseStatementMetrics from .utils import ErrorSanitizer @@ -120,6 +121,12 @@ def _init_dbm_components(self): else: self.query_completions = None + # Initialize query errors (from system.query_log - failed queries) + if self._config.dbm and self._config.query_errors.enabled: + self.query_errors = ClickhouseQueryErrors(self, self._config.query_errors) + else: + self.query_errors = None + @property def tags(self) -> list[str]: """Return the current list of tags from the TagManager.""" @@ -244,6 +251,10 @@ def check(self, _): if self.query_completions: self.query_completions.run_job_loop(self.tags) + # Run query errors if DBM is enabled (from system.query_log - failed queries) + if self.query_errors: + self.query_errors.run_job_loop(self.tags) + @AgentCheck.metadata_entrypoint def collect_version(self): version = list(self.execute_query_raw('SELECT version()'))[0][0] @@ -461,6 +472,8 @@ def cancel(self): self.statement_samples.cancel() if self.query_completions: 
self.query_completions.cancel() + if self.query_errors: + self.query_errors.cancel() # Wait for job loops to finish if self.statement_metrics and self.statement_metrics._job_loop_future: @@ -469,6 +482,8 @@ def cancel(self): self.statement_samples._job_loop_future.result() if self.query_completions and self.query_completions._job_loop_future: self.query_completions._job_loop_future.result() + if self.query_errors and self.query_errors._job_loop_future: + self.query_errors._job_loop_future.result() # Close main client if self._client: diff --git a/clickhouse/datadog_checks/clickhouse/config.py b/clickhouse/datadog_checks/clickhouse/config.py index eccb0235b14f7..d3d9bb2e81416 100644 --- a/clickhouse/datadog_checks/clickhouse/config.py +++ b/clickhouse/datadog_checks/clickhouse/config.py @@ -120,6 +120,10 @@ def build_config(check: ClickhouseCheck) -> Tuple[InstanceConfig, ValidationResu **dict_defaults.instance_query_completions().model_dump(), **(instance.get('query_completions', {})), }, + "query_errors": { + **dict_defaults.instance_query_errors().model_dump(), + **(instance.get('query_errors', {})), + }, # Tags - ensure we have a list, not None "tags": list(instance.get('tags', [])), # Other settings @@ -188,6 +192,13 @@ def _apply_validated_defaults(args: dict, instance: dict, validation_result: Val f"query_completions.collection_interval must be greater than 0, defaulting to {default_value} seconds." ) + if _safefloat(args.get('query_errors', {}).get('collection_interval')) <= 0: + default_value = dict_defaults.instance_query_errors().collection_interval + args['query_errors']['collection_interval'] = default_value + validation_result.add_warning( + f"query_errors.collection_interval must be greater than 0, defaulting to {default_value} seconds." 
+ ) + def _validate_config(config: InstanceConfig, instance: dict, validation_result: ValidationResult): """Validate the configuration and add warnings/errors.""" @@ -203,6 +214,7 @@ def _validate_config(config: InstanceConfig, instance: dict, validation_result: 'query_completions', config.query_completions.enabled if config.query_completions else False, ), + ('query_errors', config.query_errors.enabled if config.query_errors else False), ] for feature_name, _is_enabled in dbm_features: if instance.get(feature_name, {}).get('enabled') and not config.dbm: @@ -234,6 +246,11 @@ def _apply_features(config: InstanceConfig, validation_result: ValidationResult) config.query_completions.enabled and config.dbm, None if config.dbm else "Requires `dbm: true`", ) + validation_result.add_feature( + FeatureKey.QUERY_ERRORS, + config.query_errors.enabled and config.dbm, + None if config.dbm else "Requires `dbm: true`", + ) validation_result.add_feature(FeatureKey.SINGLE_ENDPOINT_MODE, config.single_endpoint_mode) diff --git a/clickhouse/datadog_checks/clickhouse/config_models/dict_defaults.py b/clickhouse/datadog_checks/clickhouse/config_models/dict_defaults.py index 2e68c54576b91..e3a79e31217c1 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/dict_defaults.py +++ b/clickhouse/datadog_checks/clickhouse/config_models/dict_defaults.py @@ -42,3 +42,14 @@ def instance_query_completions(): max_samples_per_collection=1000, run_sync=False, ) + + +def instance_query_errors(): + return instance.QueryErrors( + enabled=True, + collection_interval=10, + samples_per_hour_per_query=60, + seen_samples_cache_maxsize=10000, + max_samples_per_collection=1000, + run_sync=False, + ) diff --git a/clickhouse/datadog_checks/clickhouse/config_models/instance.py b/clickhouse/datadog_checks/clickhouse/config_models/instance.py index 6ee5e35bafc66..92e50dc677a6a 100644 --- a/clickhouse/datadog_checks/clickhouse/config_models/instance.py +++ 
b/clickhouse/datadog_checks/clickhouse/config_models/instance.py @@ -62,6 +62,19 @@ class QueryCompletions(BaseModel): seen_samples_cache_maxsize: Optional[float] = None +class QueryErrors(BaseModel): + model_config = ConfigDict( + arbitrary_types_allowed=True, + frozen=True, + ) + collection_interval: Optional[float] = None + enabled: Optional[bool] = None + max_samples_per_collection: Optional[float] = None + run_sync: Optional[bool] = None + samples_per_hour_per_query: Optional[float] = None + seen_samples_cache_maxsize: Optional[float] = None + + class QueryMetrics(BaseModel): model_config = ConfigDict( arbitrary_types_allowed=True, @@ -106,6 +119,7 @@ class InstanceConfig(BaseModel): password: Optional[str] = None port: Optional[int] = None query_completions: Optional[QueryCompletions] = None + query_errors: Optional[QueryErrors] = None query_metrics: Optional[QueryMetrics] = None query_samples: Optional[QuerySamples] = None read_timeout: Optional[int] = None diff --git a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example index e95a97703afe4..2c614aa58d55e 100644 --- a/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example +++ b/clickhouse/datadog_checks/clickhouse/data/conf.yaml.example @@ -176,6 +176,28 @@ instances: # # samples_per_hour_per_query: 15 + ## Configure collection of query errors from system.query_log + # + # query_errors: + + ## @param enabled - boolean - optional - default: true + ## Enable collection of query errors. Requires `dbm: true`. + ## Collects ExceptionBeforeStart and ExceptionWhileProcessing events, which include + ## exception message, error code, and stack trace. + # + # enabled: true + + ## @param collection_interval - number - optional - default: 10 + ## Set the query errors collection interval (in seconds). 
+ # + # collection_interval: 10 + + ## @param samples_per_hour_per_query - number - optional - default: 60 + ## Set the maximum number of error samples to collect per hour per unique query signature. + ## Errors are high-signal events, so this defaults higher than query_completions. + # + # samples_per_hour_per_query: 60 + ## @param only_custom_queries - boolean - optional - default: false ## Set this parameter to `true` if you want to skip the integration's default metrics collection. ## Only metrics specified in `custom_queries` will be collected. diff --git a/clickhouse/datadog_checks/clickhouse/features.py b/clickhouse/datadog_checks/clickhouse/features.py index 84e3b58900872..bfa1009209ae4 100644 --- a/clickhouse/datadog_checks/clickhouse/features.py +++ b/clickhouse/datadog_checks/clickhouse/features.py @@ -21,6 +21,7 @@ class FeatureKey(Enum): QUERY_METRICS = "query_metrics" QUERY_SAMPLES = "query_samples" QUERY_COMPLETIONS = "query_completions" + QUERY_ERRORS = "query_errors" SINGLE_ENDPOINT_MODE = "single_endpoint_mode" @@ -29,6 +30,7 @@ class FeatureKey(Enum): FeatureKey.QUERY_METRICS: 'Query Metrics', FeatureKey.QUERY_SAMPLES: 'Query Samples', FeatureKey.QUERY_COMPLETIONS: 'Query Completions', + FeatureKey.QUERY_ERRORS: 'Query Errors', FeatureKey.SINGLE_ENDPOINT_MODE: 'Single Endpoint Mode', } diff --git a/clickhouse/datadog_checks/clickhouse/query_completions.py b/clickhouse/datadog_checks/clickhouse/query_completions.py index aca9e11d3a732..4a237982b344e 100644 --- a/clickhouse/datadog_checks/clickhouse/query_completions.py +++ b/clickhouse/datadog_checks/clickhouse/query_completions.py @@ -115,7 +115,7 @@ def _collect_and_submit(self): # Step 3: Submit payload payload_data = json.dumps(payload, default=default_json_event_encoding) num_completions = len(payload.get('clickhouse_query_completions', [])) - self._log.info( + self._log.debug( "Submitting query completions payload: %d bytes, %d completions", len(payload_data), num_completions, @@ -123,7 
+123,7 @@ def _collect_and_submit(self): self._check.database_monitoring_query_activity(payload_data) if self._current_checkpoint_microseconds is not None: - self._log.info( + self._log.debug( "Successfully submitted. Checkpoint: %d microseconds", self._current_checkpoint_microseconds ) @@ -162,7 +162,7 @@ def _collect_completed_queries(self): rows = self._execute_query(query, parameters=params) - self._log.info( + self._log.debug( "Loaded %d completed queries from %s [%s]", len(rows), query_log_table, diff --git a/clickhouse/datadog_checks/clickhouse/query_errors.py b/clickhouse/datadog_checks/clickhouse/query_errors.py new file mode 100644 index 0000000000000..6d70479deffaa --- /dev/null +++ b/clickhouse/datadog_checks/clickhouse/query_errors.py @@ -0,0 +1,324 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from datadog_checks.clickhouse import ClickhouseCheck + +try: + import datadog_agent +except ImportError: + from datadog_checks.base.stubs import datadog_agent + +from datadog_checks.base.utils.db.utils import RateLimitingTTLCache, default_json_event_encoding +from datadog_checks.base.utils.serialization import json +from datadog_checks.base.utils.tracking import tracked_method +from datadog_checks.clickhouse.query_log_job import ClickhouseQueryLogJob, agent_check_getter + +# Query to fetch failed queries from system.query_log. +# Collects ExceptionBeforeStart (type=3) and ExceptionWhileProcessing (type=4) events. +# Includes exception, exception_code, and stack_trace fields in addition to the standard query fields. 
+QUERY_ERRORS_QUERY = """ +SELECT + normalized_query_hash, + hostName() as server_node, + query, + user, + type as query_type, + databases, + tables, + query_duration_ms, + read_rows, + read_bytes, + written_rows, + written_bytes, + result_rows, + result_bytes, + memory_usage, + query_start_time_microseconds, + event_time_microseconds, + query_id, + initial_query_id, + query_kind, + is_initial_query, + exception, + exception_code, + stack_trace, + current_database, + address +FROM {query_log_table} +WHERE + {checkpoint_filter} + AND event_time_microseconds <= now64(6) + AND event_date >= toDate(fromUnixTimestamp64Micro({min_checkpoint_us:UInt64})) + AND type IN ('ExceptionBeforeStart', 'ExceptionWhileProcessing') + AND is_initial_query = 1 + AND query != '' + AND normalized_query_hash != 0 + {internal_user_filter} +ORDER BY event_time_microseconds ASC +LIMIT {max_samples:UInt64} +""" + + +class ClickhouseQueryErrors(ClickhouseQueryLogJob): + """Collects failed query samples from system.query_log""" + + CHECKPOINT_CACHE_KEY = "query_errors_last_checkpoint_microseconds" + + def __init__(self, check: ClickhouseCheck, config): + super().__init__( + check=check, + config=config, + job_name="query-errors", + ) + + self._seen_samples_ratelimiter = RateLimitingTTLCache( + maxsize=int(config.seen_samples_cache_maxsize), + ttl=60 * 60 / float(config.samples_per_hour_per_query), + ) + + self._max_samples_per_collection = int(config.max_samples_per_collection) + + @tracked_method(agent_check_getter=agent_check_getter) + def _collect_and_submit(self): + """ + Collect and submit failed query samples. + + Checkpoint is always advanced after collection to prefer dropped data over duplicates. 
+ """ + try: + self._current_checkpoint_microseconds = None + self._pending_node_checkpoints = {} + + rows = self._collect_query_errors() + + if not rows: + self._log.debug("No new query errors") + return + + payload = self._create_batched_payload(rows) + + if not payload or not payload.get('clickhouse_query_errors'): + self._log.debug("No query errors after rate limiting") + return + + payload_data = json.dumps(payload, default=default_json_event_encoding) + num_errors = len(payload.get('clickhouse_query_errors', [])) + self._log.debug( + "Submitting query errors payload: %d bytes, %d errors", + len(payload_data), + num_errors, + ) + self._check.database_monitoring_query_activity(payload_data) + + if self._current_checkpoint_microseconds is not None: + self._log.debug( + "Successfully submitted. Checkpoint: %d microseconds", self._current_checkpoint_microseconds + ) + + except Exception: + self._log.exception('Unable to collect query error samples due to an error') + finally: + self._advance_checkpoint() + + @tracked_method(agent_check_getter=agent_check_getter, track_result_length=True) + def _collect_query_errors(self): + """Load failed query samples using checkpoint-based collection.""" + try: + query_log_table = self._check.get_system_table('query_log') + checkpoint_filter, min_checkpoint, params = self._build_per_node_checkpoint_filter() + + query = ( + QUERY_ERRORS_QUERY.replace("{query_log_table}", query_log_table) + .replace("{checkpoint_filter}", checkpoint_filter) + .replace("{internal_user_filter}", self._get_internal_user_filter()) + ) + params["min_checkpoint_us"] = min_checkpoint + params["max_samples"] = self._max_samples_per_collection + + rows = self._execute_query(query, parameters=params) + + self._log.debug( + "Loaded %d query errors from %s [%s]", + len(rows), + query_log_table, + self.deployment_mode, + ) + + max_event_time = 0 + + result_rows = [] + for row in rows: + ( + normalized_query_hash, + server_node, + query_text, + user, + 
query_type, + databases, + tables, + query_duration_ms, + read_rows, + read_bytes, + written_rows, + written_bytes, + result_rows_count, + result_bytes, + memory_usage, + query_start_time_microseconds, + event_time_microseconds, + query_id, + initial_query_id, + query_kind, + is_initial_query, + exception, + exception_code, + stack_trace, + current_database, + address, + ) = row + + event_time_int = self.to_microseconds(event_time_microseconds) + if event_time_int > max_event_time: + max_event_time = event_time_int + + if server_node: + self._track_node_checkpoint(str(server_node), event_time_int) + + row_dict = { + 'normalized_query_hash': str(normalized_query_hash), + 'query': str(query_text) if query_text else '', + 'user': str(user) if user else '', + 'query_type': str(query_type) if query_type else '', + # For ExceptionBeforeStart errors, `databases` is empty because the query + # failed before table resolution. Fall back to `current_database` (the + # connection's default database) so the field is always populated. 
+ 'databases': ( + str(databases[0]) + if databases and len(databases) > 0 + else (str(current_database) if current_database else '') + ), + 'tables': tables if tables else [], + 'query_duration_ms': float(query_duration_ms) if query_duration_ms else 0.0, + 'read_rows': int(read_rows) if read_rows else 0, + 'read_bytes': int(read_bytes) if read_bytes else 0, + 'written_rows': int(written_rows) if written_rows else 0, + 'written_bytes': int(written_bytes) if written_bytes else 0, + 'result_rows': int(result_rows_count) if result_rows_count else 0, + 'result_bytes': int(result_bytes) if result_bytes else 0, + 'memory_usage': int(memory_usage) if memory_usage else 0, + 'query_start_time_microseconds': self.to_microseconds(query_start_time_microseconds), + 'event_time_microseconds': event_time_int, + 'query_id': str(query_id) if query_id else '', + 'initial_query_id': str(initial_query_id) if initial_query_id else '', + 'query_kind': str(query_kind) if query_kind else '', + 'is_initial_query': bool(is_initial_query) if is_initial_query is not None else True, + 'exception': str(exception) if exception else '', + 'exception_code': int(exception_code) if exception_code else 0, + 'stack_trace': str(stack_trace) if stack_trace else '', + 'client_ip': str(address) if address else '', + } + + obfuscated_row = self._normalize_query(row_dict) + if obfuscated_row: + result_rows.append(obfuscated_row) + + self._set_checkpoint_from_event_time(max_event_time) + + return result_rows + + except Exception as e: + self._log.exception("Failed to load query errors from system.query_log: %s", e) + + self._check.count( + "dd.clickhouse.query_errors.error", + 1, + tags=self.tags + ["error:query_log_load_failed"], + raw=True, + ) + + raise + + def _normalize_query(self, row: dict) -> dict | None: + """Normalize and obfuscate a single query error row.""" + obfuscation_result = self._obfuscate_query(row['query']) + if obfuscation_result is None: + return None + + row['statement'] = 
obfuscation_result['query'] + row['query_signature'] = obfuscation_result['query_signature'] + row['dd_tables'] = obfuscation_result['dd_tables'] + row['dd_commands'] = obfuscation_result['dd_commands'] + row['dd_comments'] = obfuscation_result['dd_comments'] + + return row + + def _create_batched_payload(self, rows: list) -> dict | None: + """Create a batched payload with rate limiting applied.""" + query_errors = [] + + for row in rows: + query_signature = row.get('query_signature') + if not query_signature: + continue + + if not self._seen_samples_ratelimiter.acquire(query_signature): + continue + + query_details = { + 'statement': row.get('statement'), + 'query_signature': query_signature, + 'duration_ms': row.get('query_duration_ms', 0), + 'database_name': row.get('databases', ''), + 'username': row.get('user', ''), + 'query_id': row.get('query_id', ''), + 'query_type': row.get('query_type', ''), + 'query_kind': row.get('query_kind', ''), + 'normalized_query_hash': row.get('normalized_query_hash', ''), + 'read_rows': row.get('read_rows', 0), + 'read_bytes': row.get('read_bytes', 0), + 'written_rows': row.get('written_rows', 0), + 'written_bytes': row.get('written_bytes', 0), + 'result_rows': row.get('result_rows', 0), + 'result_bytes': row.get('result_bytes', 0), + 'memory_usage': row.get('memory_usage', 0), + 'query_start_time_microseconds': row.get('query_start_time_microseconds', 0), + 'event_time_microseconds': row.get('event_time_microseconds', 0), + 'initial_query_id': row.get('initial_query_id', ''), + 'is_initial_query': row.get('is_initial_query', True), + 'exception': row.get('exception', ''), + 'exception_code': row.get('exception_code', 0), + 'stack_trace': row.get('stack_trace', ''), + 'client_ip': row.get('client_ip', ''), + 'metadata': { + 'tables': row.get('dd_tables'), + 'commands': row.get('dd_commands'), + 'comments': row.get('dd_comments'), + }, + } + + query_errors.append({'query_details': query_details}) + + if not query_errors: + return 
None + + payload = { + 'host': self._check.reported_hostname, + 'database_instance': self._check.database_identifier, + 'ddagentversion': datadog_agent.get_version(), + 'ddsource': 'clickhouse', + 'dbm_type': 'query_error', + 'collection_interval': self._collection_interval, + 'ddtags': self._tags_no_db, + 'timestamp': time.time() * 1000, + 'clickhouse_version': self._check.dbms_version, + 'service': getattr(self._check, 'service', None), + 'clickhouse_query_errors': query_errors, + } + + return payload diff --git a/clickhouse/tests/test_query_errors.py b/clickhouse/tests/test_query_errors.py new file mode 100644 index 0000000000000..0432ccbbfc323 --- /dev/null +++ b/clickhouse/tests/test_query_errors.py @@ -0,0 +1,258 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import time +from unittest import mock + +import pytest + +from datadog_checks.clickhouse import ClickhouseCheck +from datadog_checks.clickhouse.query_errors import ClickhouseQueryErrors + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def instance_with_dbm(): + """Return a ClickHouse instance configuration with DBM and query errors enabled""" + return { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': True, + 'query_errors': { + 'enabled': True, + 'collection_interval': 10, + 'samples_per_hour_per_query': 60, + 'seen_samples_cache_maxsize': 10000, + 'max_samples_per_collection': 1000, + 'run_sync': False, + }, + 'tags': ['test:clickhouse'], + } + + +@pytest.fixture +def check_with_dbm(instance_with_dbm): + """Return a ClickHouse check instance with DBM enabled""" + return ClickhouseCheck('clickhouse', {}, [instance_with_dbm]) + + +def test_query_errors_initialization(check_with_dbm): + """Test that query errors are properly initialized when DBM is enabled""" + assert check_with_dbm.query_errors is not None + assert isinstance(check_with_dbm.query_errors, 
ClickhouseQueryErrors) + assert check_with_dbm.query_errors._config.enabled is True + assert check_with_dbm.query_errors._config.collection_interval == 10 + assert check_with_dbm.query_errors._config.samples_per_hour_per_query == 60 + + +def test_query_errors_disabled(): + """Test that query errors are not initialized when disabled""" + instance = { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': True, + 'query_errors': { + 'enabled': False, + }, + 'tags': ['test:clickhouse'], + } + check = ClickhouseCheck('clickhouse', {}, [instance]) + assert check.query_errors is None + + +def test_dbm_disabled_no_query_errors(): + """Test that query errors are not initialized when DBM is disabled""" + instance = { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': False, + 'tags': ['test:clickhouse'], + } + check = ClickhouseCheck('clickhouse', {}, [instance]) + assert check.query_errors is None + + +def test_normalize_query(check_with_dbm): + """Test query obfuscation and normalization for error rows""" + query_errors = check_with_dbm.query_errors + + row = { + 'query_id': 'test-error-query-id-123', + 'query': 'SELECT * FROM nonexistent_table WHERE id = 12345', + 'event_time_microseconds': int(time.time() * 1_000_000), + 'query_start_time_microseconds': int(time.time() * 1_000_000) - 100000, + 'query_duration_ms': 0.0, + 'read_rows': 0, + 'read_bytes': 0, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 0, + 'result_bytes': 0, + 'memory_usage': 0, + 'user': 'default', + 'exception': 'Table default.nonexistent_table does not exist. 
(UNKNOWN_TABLE)', + 'exception_code': 60, + 'stack_trace': 'DB::Exception::Exception at 0x...', + } + + normalized_row = query_errors._normalize_query(row) + + assert normalized_row['statement'] is not None + assert normalized_row['query_signature'] is not None + assert 'dd_tables' in normalized_row + assert 'dd_commands' in normalized_row + assert 'dd_comments' in normalized_row + + +def test_create_batched_payload_error_fields(check_with_dbm): + """Test that batched payload includes exception, exception_code, and stack_trace""" + query_errors = check_with_dbm.query_errors + query_errors._tags_no_db = ['test:clickhouse'] + + rows = [ + { + 'query_id': 'err-query-id-456', + 'statement': 'SELECT * FROM nonexistent_table WHERE id = ?', + 'query_signature': 'err123def456', + 'query_duration_ms': 0.0, + 'databases': 'default', + 'user': 'default_user', + 'read_rows': 0, + 'read_bytes': 0, + 'written_rows': 0, + 'written_bytes': 0, + 'result_rows': 0, + 'result_bytes': 0, + 'memory_usage': 0, + 'event_time_microseconds': 1746205423150500, + 'query_start_time_microseconds': 1746205423000000, + 'initial_query_id': 'err-query-id-456', + 'query_kind': 'Select', + 'is_initial_query': True, + 'exception': 'Table default.nonexistent_table does not exist. (UNKNOWN_TABLE)', + 'exception_code': 60, + 'stack_trace': 'DB::Exception::Exception at 0x1234...', + 'dd_tables': ['nonexistent_table'], + 'dd_commands': ['SELECT'], + 'dd_comments': [], + } + ] + + with mock.patch('datadog_checks.clickhouse.query_errors.datadog_agent') as mock_agent: + mock_agent.get_version.return_value = '7.64.0' + payload = query_errors._create_batched_payload(rows) + + assert payload is not None + assert len(payload['clickhouse_query_errors']) == 1 + + query_details = payload['clickhouse_query_errors'][0]['query_details'] + assert query_details['exception'] == 'Table default.nonexistent_table does not exist. 
(UNKNOWN_TABLE)' + assert query_details['exception_code'] == 60 + assert query_details['stack_trace'] == 'DB::Exception::Exception at 0x1234...' + + +def test_create_batched_payload_structure(check_with_dbm): + """Test that payload uses dbm_type=query_error and key=clickhouse_query_errors""" + query_errors = check_with_dbm.query_errors + query_errors._tags_no_db = ['test:clickhouse', 'server:localhost'] + + rows = [ + { + 'statement': 'SELECT * FROM nonexistent_table', + 'query_signature': 'abc123', + 'query_duration_ms': 0.0, + 'databases': 'default', + 'user': 'default', + 'exception': 'Table does not exist.', + 'exception_code': 60, + 'stack_trace': '', + }, + ] + + with mock.patch('datadog_checks.clickhouse.query_errors.datadog_agent') as mock_agent: + mock_agent.get_version.return_value = '7.64.0' + payload = query_errors._create_batched_payload(rows) + + assert payload['ddsource'] == 'clickhouse' + assert payload['dbm_type'] == 'query_error' + assert 'clickhouse_query_errors' in payload + assert 'timestamp' in payload + assert 'host' in payload + assert 'database_instance' in payload + + +def test_query_errors_sql_query_format(): + """Test that the SQL query template uses correct filters for error types.""" + from datadog_checks.clickhouse.query_errors import QUERY_ERRORS_QUERY + + # Structural placeholders (str.replace()) + assert '{checkpoint_filter}' in QUERY_ERRORS_QUERY + assert '{query_log_table}' in QUERY_ERRORS_QUERY + + # Bound parameters (ClickHouse server-side) + assert '{min_checkpoint_us:UInt64}' in QUERY_ERRORS_QUERY + assert '{max_samples:UInt64}' in QUERY_ERRORS_QUERY + + # hostName() for per-node checkpoint tracking + assert 'hostName() as server_node' in QUERY_ERRORS_QUERY + + # Error type filter (not QueryFinish) + assert "type IN ('ExceptionBeforeStart', 'ExceptionWhileProcessing')" in QUERY_ERRORS_QUERY + + # Error-specific fields + assert 'exception' in QUERY_ERRORS_QUERY + assert 'exception_code' in QUERY_ERRORS_QUERY + assert 
'stack_trace' in QUERY_ERRORS_QUERY + + # Standard query fields + assert 'query_id' in QUERY_ERRORS_QUERY + assert 'memory_usage' in QUERY_ERRORS_QUERY + assert 'event_time_microseconds' in QUERY_ERRORS_QUERY + + +def test_rate_limiting(check_with_dbm): + """Test that query error rate limiting works correctly""" + query_errors = check_with_dbm.query_errors + + query_cache_key = ('error-signature-123', 'default', 'default_user') + + assert query_errors._seen_samples_ratelimiter.acquire(query_cache_key) is True + assert query_errors._seen_samples_ratelimiter.acquire(query_cache_key) is False + + +def test_default_config_values(): + """Test that default configuration values are applied correctly for query errors""" + instance = { + 'server': 'localhost', + 'port': 9000, + 'username': 'default', + 'password': '', + 'db': 'default', + 'dbm': True, + 'query_errors': { + 'enabled': True, + }, + 'tags': ['test:clickhouse'], + } + check = ClickhouseCheck('clickhouse', {}, [instance]) + + assert check.query_errors._config.collection_interval == 10 + assert check.query_errors._config.samples_per_hour_per_query == 60 + assert check.query_errors._config.seen_samples_cache_maxsize == 10000 + assert check.query_errors._config.max_samples_per_collection == 1000 + + +def test_separate_checkpoint_key(check_with_dbm): + """Test that query errors use a separate checkpoint cache key from completions""" + assert check_with_dbm.query_errors.CHECKPOINT_CACHE_KEY == "query_errors_last_checkpoint_microseconds" + assert check_with_dbm.query_errors.CHECKPOINT_CACHE_KEY != check_with_dbm.query_completions.CHECKPOINT_CACHE_KEY