diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java index c8f9b1a16d8f..f6de574ba92d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java @@ -48,10 +48,38 @@ public class QueryLogger { private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogger.class); private static final QueryLogEntry[] QUERY_LOG_ENTRY_VALUES = QueryLogEntry.values(); + private static final String FINGERPRINT_FAILED_QUERY_REDACTED = "FINGERPRINT_FAILED_QUERY_REDACTED"; + private static final String FULLY_REDACTED = "REDACTED"; + + public enum SqlRedactionMode { + // Log the full SQL query text as-is. + // e.g. "SELECT name FROM users WHERE id = 42 AND status = 'active'" + NONE, + // Replace literal values with placeholders using the query fingerprint, preserving query structure. + // Requires query fingerprinting to be enabled (will be auto-enabled if not configured). + // e.g. "SELECT name FROM users WHERE id = ? AND status = ?" + LITERAL_VALUES, + // Omit the SQL text entirely from logs, replacing it with "[REDACTED]". + // Use when no part of the query should appear in logs. + FULL; + + public static SqlRedactionMode fromString(String value) { + try { + return valueOf(value.toUpperCase()); + } catch (IllegalArgumentException e) { + // The default config value is NONE. If the user intended to enable redaction but made a typo, + // it's safer to default to FULL instead of NONE to avoid accidentally logging sensitive information. + LOGGER.warn("Invalid SQL redaction mode '{}', defaulting to FULL", value); + return FULL; + } + } + } + private final int _maxQueryLengthToLog; private final RateLimiter _logRateLimiter; private final boolean _enableIpLogging; private final boolean _logBeforeProcessing; + private final SqlRedactionMode _sqlRedactionMode; private final Logger _logger; private final RateLimiter _droppedLogRateLimiter; private final AtomicLong _numDroppedLogs = new AtomicLong(0L); @@ -63,20 +91,24 @@ public QueryLogger(PinotConfiguration config) { config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING, Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING), config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_BEFORE_PROCESSING, - Broker.DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING), LOGGER, RateLimiter.create(1) + Broker.DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING), + SqlRedactionMode.fromString(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_SQL_REDACTION, + Broker.DEFAULT_BROKER_QUERY_LOG_SQL_REDACTION)), + LOGGER, RateLimiter.create(1) // log once a second for dropped log count ); } @VisibleForTesting QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean enableIpLogging, boolean logBeforeProcessing, - Logger logger, RateLimiter droppedLogRateLimiter) { + SqlRedactionMode sqlRedactionMode, Logger logger, RateLimiter droppedLogRateLimiter) { _logRateLimiter = logRateLimiter; _maxQueryLengthToLog = maxQueryLengthToLog; _enableIpLogging = enableIpLogging; _logger = logger; _droppedLogRateLimiter = droppedLogRateLimiter; _logBeforeProcessing = logBeforeProcessing; + _sqlRedactionMode = sqlRedactionMode; } /** @@ -86,15 +118,16 @@ public QueryLogger(PinotConfiguration config) { * * @param requestId the request ID * @param query the SQL query + * @param queryFingerprint the query fingerprint (used when redaction is enabled) * @return true if the rate limiter allowed this query (not rate-limited), false if rate-limited */ - public boolean logQueryReceived(long requestId, String query) { + public boolean logQueryReceived(long requestId, String query, @Nullable QueryFingerprint queryFingerprint) { if (!checkRateLimiter()) { return false; } if (_logBeforeProcessing) { - _logger.info("SQL query for request {}: {}", requestId, query); + _logger.info("SQL query for request {}: {}", requestId, redactQuery(query, queryFingerprint)); } tryLogDropped(); @@ -123,8 +156,8 @@ public void logQueryCompleted(QueryLogParams params, boolean wasLogged) { } // always log the query last - don't add this to the QueryLogEntry enum - queryLogBuilder.append("query=") - .append(StringUtils.substring(params._requestContext.getQuery(), 0, _maxQueryLengthToLog)); + String redacted = redactQuery(params._requestContext.getQuery(), params._requestContext.getQueryFingerprint()); + queryLogBuilder.append("query=").append(StringUtils.substring(redacted, 0, _maxQueryLengthToLog)); _logger.info(queryLogBuilder.toString()); tryLogDropped(); @@ -158,6 +191,26 @@ public double getLogRateLimit() { return _logRateLimiter.getRate(); } + public SqlRedactionMode getSqlRedactionMode() { + return _sqlRedactionMode; + } + + public String redactQuery(String query) { + return redactQuery(query, null); + } + + public String redactQuery(String query, @Nullable QueryFingerprint queryFingerprint) { + switch (_sqlRedactionMode) { + case FULL: + return FULLY_REDACTED; + case LITERAL_VALUES: + return queryFingerprint != null ? queryFingerprint.getFingerprint() : FINGERPRINT_FAILED_QUERY_REDACTED; + case NONE: + default: + return query; + } + } + private boolean shouldForceLog(@Nullable QueryLogParams params) { if (params == null) { return false; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index e343f89a49c5..3d6d2c61f5c7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -211,8 +211,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro _enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC); - _enableQueryFingerprinting = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING, + boolean fingerprintingConfigured = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING, Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING); + boolean redactionNeedsFingerprinting = + _queryLogger.getSqlRedactionMode() == QueryLogger.SqlRedactionMode.LITERAL_VALUES; + if (redactionNeedsFingerprinting && !fingerprintingConfigured) { + LOGGER.warn("SQL redaction mode 'literal_values' requires query fingerprinting. " + + "Enabling query fingerprinting automatically."); + } + _enableQueryFingerprinting = fingerprintingConfigured || redactionNeedsFingerprinting; if (_enableMultistageMigrationMetric) { _multistageCompileExecutor = Executors.newSingleThreadExecutor(); _multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000); @@ -300,7 +307,8 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H // we can get the cid from QueryThreadContext serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null)); } - LOGGER.debug("Cancelling the query: {} via server urls: {}", queryServers._query, serverUrls); + LOGGER.debug("Cancelling the query: {} via server urls: {}", _queryLogger.redactQuery(queryServers._query), + serverUrls); CompletionService completionService = new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", HttpDelete::new); List errMsgs = new ArrayList<>(serverUrls.size()); @@ -322,7 +330,7 @@ protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, H serverResponses.put(uri.getHost() + ":" + uri.getPort(), status); } } catch (Exception e) { - LOGGER.error("Failed to cancel query: {}", queryServers._query, e); + LOGGER.error("Failed to cancel query: {}", _queryLogger.redactQuery(queryServers._query), e); // Can't just throw exception from here as there is a need to release the other connections. // So just collect the error msg to throw them together after the for-loop. errMsgs.add(e.getMessage()); @@ -343,21 +351,23 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception { - boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query); - + QueryFingerprint queryFingerprint = null; String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH; if (_enableQueryFingerprinting) { try { - QueryFingerprint queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions); + queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions); if (queryFingerprint != null) { queryHash = queryFingerprint.getQueryHash(); requestContext.setQueryFingerprint(queryFingerprint); } } catch (Exception e) { - LOGGER.warn("Failed to generate query fingerprint for request {}: {}. {}", requestId, query, e.getMessage()); + LOGGER.warn("Failed to generate query fingerprint for request {}: {}. {}", requestId, + _queryLogger.redactQuery(query), e.getMessage()); } } + boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query, queryFingerprint); + String cid = extractClientRequestId(sqlNodeAndOptions); if (cid == null) { cid = Long.toString(requestId); @@ -486,14 +496,16 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn // Validate QPS if (!_queryQuotaManager.acquireDatabase(database)) { String errorMessage = - String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + String.format("Request %d: %s exceeds query quota for database: %s", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), database); LOGGER.info(errorMessage); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } if (!_queryQuotaManager.acquireLogicalTable(tableName)) { String errorMessage = - String.format("Request %d: %s exceeds query quota for table: %s.", requestId, query, tableName); + String.format("Request %d: %s exceeds query quota for table: %s.", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), tableName); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } @@ -543,14 +555,16 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn // Validate QPS quota if (!_queryQuotaManager.acquireDatabase(database)) { String errorMessage = - String.format("Request %d: %s exceeds query quota for database: %s", requestId, query, database); + String.format("Request %d: %s exceeds query quota for database: %s", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), database); LOGGER.info(errorMessage); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); return new BrokerResponseNative(QueryErrorCode.TOO_MANY_REQUESTS, errorMessage); } if (!_queryQuotaManager.acquire(tableName)) { String errorMessage = - String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); + String.format("Request %d: %s exceeds query quota for table: %s", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), tableName); LOGGER.info(errorMessage); requestContext.setErrorCode(QueryErrorCode.TOO_MANY_REQUESTS); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); @@ -597,13 +611,15 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn TableRouteInfo routeInfo = routeProvider.getTableRouteInfo(tableName, _tableCache, selectedRoutingManager); if (!routeInfo.isExists()) { - LOGGER.info("Table not found for request {}: {}", requestId, query); + LOGGER.info("Table not found for request {}: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST); return BrokerResponseNative.TABLE_DOES_NOT_EXIST; } if (!routeInfo.isRouteExists()) { - LOGGER.info("No table matches for request {}: {}", requestId, query); + LOGGER.info("No table matches for request {}: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.BROKER_RESOURCE_MISSING); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS, 1); return BrokerResponseNative.NO_TABLE_RESULT; @@ -627,7 +643,8 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn try { validateRequest(serverPinotQuery, _queryResponseLimit); } catch (Exception e) { - LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); + LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); return new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, e.getMessage()); @@ -643,7 +660,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn // Attempt to add the query to the compile queue; drop if queue is full if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) { LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full", - query); + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); } } @@ -750,7 +767,7 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn if (disabledTableNames != null) { for (String name : disabledTableNames) { String errorMessage = String.format("%s Table is disabled", name); - LOGGER.info("{}: {}", errorMessage, query); + LOGGER.info("{}: {}", errorMessage, _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); errorMsgs.add(new QueryProcessingException(QueryErrorCode.TABLE_IS_DISABLED, errorMessage)); } } @@ -779,7 +796,8 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn QueryProcessingException firstErrorMsg = errorMsgs.get(0); String logTail = errorMsgs.size() > 1 ? (errorMsgs.size()) + " errorMsgs found. Logging only the first one" : "1 exception found"; - LOGGER.info("No server found for request {}: {}. {} {}", requestId, query, logTail, firstErrorMsg); + LOGGER.info("No server found for request {}: {}. {} {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), logTail, firstErrorMsg); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1); return BrokerResponseNative.fromBrokerErrors(errorMsgs); } else { @@ -818,7 +836,8 @@ protected BrokerResponse doHandleRequest(long requestId, String query, SqlNodeAn } } catch (TimeoutException e) { String errorMessage = e.getMessage(); - LOGGER.info("{} {}: {}", errorMessage, requestId, query); + LOGGER.info("{} {}: {}", errorMessage, requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.REQUEST_TIMEOUT_BEFORE_SCATTERED_EXCEPTIONS, 1); errorMsgs.add(new QueryProcessingException(QueryErrorCode.BROKER_TIMEOUT, errorMessage)); return BrokerResponseNative.fromBrokerErrors(errorMsgs); @@ -1038,7 +1057,8 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt try { pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); } catch (Exception e) { - LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); + LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); requestContext.setErrorCode(QueryErrorCode.SQL_PARSING); // Check if the query is a v2 supported query @@ -1070,7 +1090,8 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt } if (isLiteralOnlyQuery(pinotQuery)) { - LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); + LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); try { if (pinotQuery.isExplain()) { // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. @@ -1080,25 +1101,28 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt } catch (Exception e) { // TODO: refine the exceptions here to early termination the queries won't requires to send to servers. LOGGER.warn("Unable to execute literal request {}: {} at broker, fallback to server query. {}", requestId, - query, e.getMessage()); + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); } } PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery); DataSource dataSource = serverPinotQuery.getDataSource(); if (dataSource == null) { - LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, query); + LOGGER.info("Data source (FROM clause) not found in request {}: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); return new CompileResult(new BrokerResponseNative( QueryErrorCode.QUERY_VALIDATION, "Data source (FROM clause) not found")); } if (dataSource.getJoin() != null) { - LOGGER.info("JOIN is not supported in request {}: {}", requestId, query); + LOGGER.info("JOIN is not supported in request {}: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); return new CompileResult(new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "JOIN is not supported")); } if (dataSource.getTableName() == null) { - LOGGER.info("Table name not found in request {}: {}", requestId, query); + LOGGER.info("Table name not found in request {}: {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); return new CompileResult(new BrokerResponseNative(QueryErrorCode.QUERY_VALIDATION, "Table name not found")); } @@ -1107,8 +1131,8 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt handleSubquery(serverPinotQuery, requestId, request, requesterIdentity, requestContext, httpHeaders, accessControl); } catch (Exception e) { - LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, - e.getMessage()); + LOGGER.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); requestContext.setErrorCode(QueryErrorCode.QUERY_EXECUTION); return new CompileResult( new BrokerResponseNative(QueryErrorCode.QUERY_EXECUTION, e.getMessage())); @@ -1121,7 +1145,8 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), _tableCache); } catch (DatabaseConflictException e) { - LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query); + LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint())); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); requestContext.setErrorCode(QueryErrorCode.QUERY_VALIDATION); return new CompileResult( @@ -1146,15 +1171,15 @@ private CompileResult compileRequest(long requestId, String query, SqlNodeAndOpt } catch (Exception e) { // Throw exceptions with column in-existence error. if (e instanceof BadQueryRequestException) { - LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, query, - e.getMessage()); + LOGGER.info("Caught exception while checking column names in request {}: {}, {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); requestContext.setErrorCode(QueryErrorCode.UNKNOWN_COLUMN); _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1); return new CompileResult( new BrokerResponseNative(QueryErrorCode.UNKNOWN_COLUMN, e.getMessage())); } - LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, query, - e.getMessage()); + LOGGER.warn("Caught exception while updating column names in request {}: {}, {}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), e.getMessage()); } if (_defaultHllLog2m > 0) { @@ -1353,7 +1378,8 @@ private void applyRlsFilters(long requestId, String query, PinotQuery serverPino private void throwAccessDeniedError(long requestId, String query, RequestContext requestContext, String tableName, AuthorizationResult authorizationResult) { _brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); - LOGGER.info("Access denied for request {}: {}, table: {}, reason :{}", requestId, query, tableName, + LOGGER.info("Access denied for request {}: {}, table: {}, reason :{}", requestId, + _queryLogger.redactQuery(query, requestContext.getQueryFingerprint()), tableName, authorizationResult.getFailureMessage()); requestContext.setErrorCode(QueryErrorCode.ACCESS_DENIED); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 25c268a6bac8..7462c85e5a42 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -205,9 +205,16 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId _config.containsKey(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES) ? Set.copyOf( _config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_MSE_PLANNER_DISABLED_RULES, List.of())) : CommonConstants.Broker.DEFAULT_DISABLED_RULES; - _enableQueryFingerprinting = _config.getProperty( + boolean fingerprintingConfigured = _config.getProperty( CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING, CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING); + boolean redactionNeedsFingerprinting = + _queryLogger.getSqlRedactionMode() == QueryLogger.SqlRedactionMode.LITERAL_VALUES; + if (redactionNeedsFingerprinting && !fingerprintingConfigured) { + LOGGER.warn("SQL redaction mode 'literal_values' requires query fingerprinting. " + + "Enabling query fingerprinting automatically."); + } + _enableQueryFingerprinting = fingerprintingConfigured || redactionNeedsFingerprinting; int streamingGroupByFlushThreshold = _config.getProperty( CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD, CommonConstants.Broker.DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD); @@ -368,21 +375,23 @@ private void onFailedRequest(List exs) { protected BrokerResponse handleRequestThrowing(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders) throws QueryException, WebApplicationException { - boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query); - + QueryFingerprint queryFingerprint = null; String queryHash = CommonConstants.Broker.DEFAULT_QUERY_HASH; if (_enableQueryFingerprinting) { try { - QueryFingerprint queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions); + queryFingerprint = QueryFingerprintUtils.generateFingerprint(sqlNodeAndOptions); if (queryFingerprint != null) { queryHash = queryFingerprint.getQueryHash(); requestContext.setQueryFingerprint(queryFingerprint); } } catch (Exception e) { - LOGGER.warn("Failed to generate query fingerprint for request {}: {}. {}", requestId, query, e.getMessage()); + LOGGER.warn("Failed to generate query fingerprint for request {}: {}. {}", requestId, + _queryLogger.redactQuery(query), e.getMessage()); } } + boolean queryWasLogged = _queryLogger.logQueryReceived(requestId, query, queryFingerprint); + String cid = extractClientRequestId(sqlNodeAndOptions); if (cid == null) { cid = Long.toString(requestId); @@ -660,14 +669,16 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI // these requests. if (!_queryThrottler.tryAcquire(estimatedNumQueryThreads, timer.getRemainingTimeMs(), TimeUnit.MILLISECONDS)) { - LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, query); + LOGGER.warn("Timed out waiting to execute request {}: {}", requestId, + _queryLogger.redactQuery(query.getTextQuery(), requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.EXECUTION_TIMEOUT); return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT); } _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ESTIMATED_MSE_SERVER_THREADS, _queryThrottler.currentQueryServerThreads()); } catch (InterruptedException e) { - LOGGER.warn("Interrupt received while waiting to execute request {}: {}", requestId, query); + LOGGER.warn("Interrupt received while waiting to execute request {}: {}", requestId, + _queryLogger.redactQuery(query.getTextQuery(), requestContext.getQueryFingerprint())); requestContext.setErrorCode(QueryErrorCode.EXECUTION_TIMEOUT); return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT); } @@ -691,7 +702,8 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI } catch (Throwable t) { QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION; String consolidatedMessage = ExceptionUtils.consolidateExceptionTraces(t); - LOGGER.error("Caught exception reducing all-leaf-empty request {}: {}, {}", requestId, query, + LOGGER.error("Caught exception reducing all-leaf-empty request {}: {}, {}", requestId, + _queryLogger.redactQuery(query.getTextQuery(), requestContext.getQueryFingerprint()), consolidatedMessage); requestContext.setErrorCode(queryErrorCode); return new BrokerResponseNative(queryErrorCode, consolidatedMessage); @@ -713,7 +725,9 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI } catch (Throwable t) { QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION; String consolidatedMessage = ExceptionUtils.consolidateExceptionTraces(t); - LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage); + LOGGER.error("Caught exception executing request {}: {}, {}", requestId, + _queryLogger.redactQuery(query.getTextQuery(), requestContext.getQueryFingerprint()), + consolidatedMessage); requestContext.setErrorCode(queryErrorCode); return new BrokerResponseNative(queryErrorCode, consolidatedMessage); } finally { @@ -733,7 +747,8 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI for (String table : tableNames) { _brokerMetrics.addMeteredTableValue(table, BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS, 1); } - LOGGER.warn("Timed out executing request {}: {}", requestId, query); + LOGGER.warn("Timed out executing request {}: {}", requestId, + _queryLogger.redactQuery(query.getTextQuery(), requestContext.getQueryFingerprint())); } requestContext.setErrorCode(errorCode); } else { @@ -839,17 +854,17 @@ private E callAsync(long requestId, String query, Callable queryPlannerRe return queryPlanResultFuture.get(timer.getRemainingTimeMs(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { String errorMsg = "Timed out while planning query"; - LOGGER.warn(errorMsg + " {}", query, e); + LOGGER.warn(errorMsg + " {}", _queryLogger.redactQuery(query), e); queryPlanResultFuture.cancel(true); throw QueryErrorCode.BROKER_TIMEOUT.asException(errorMsg); } catch (InterruptedException e) { - LOGGER.warn("Interrupt received while planning query {}: {}", requestId, query); + LOGGER.warn("Interrupt received while planning query {}: {}", requestId, _queryLogger.redactQuery(query)); throw QueryErrorCode.INTERNAL.asException("Interrupted while planning query"); } catch (ExecutionException e) { if (e.getCause() instanceof QueryException) { throw (QueryException) e.getCause(); } else { - LOGGER.warn("Error while planning query {}: {}", query, e.getCause()); + LOGGER.warn("Error while planning query {}: {}", _queryLogger.redactQuery(query), e.getCause()); throw QueryErrorCode.INTERNAL.asException("Error while planning query", e.getCause()); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java index 463ada97f22d..1900b039fc29 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java @@ -43,6 +43,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.apache.pinot.broker.querylog.QueryLogger.SqlRedactionMode; import static org.mockito.MockitoAnnotations.openMocks; @@ -98,7 +99,8 @@ public void tearDown() public void shouldFormatLogLineProperly() { // Given: QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, true); @@ -138,7 +140,8 @@ public void shouldFormatLogLineProperly() { public void shouldOmitClientId() { // Given: QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, true); @@ -153,7 +156,8 @@ public void shouldOmitClientId() { public void shouldNotLogCompletionWhenWasLoggedFalseAndNoForceLog() { // Given: wasLogged=false and no force-log conditions (no exceptions, not slow) QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, false); @@ -166,7 +170,8 @@ public void shouldNotLogCompletionWhenWasLoggedFalseAndNoForceLog() { public void shouldForceLogWhenNumGroupsLimitIsReached() { // Given: wasLogged=false but numGroupsLimitReached (force-log condition) QueryLogger.QueryLogParams params = generateParams(true, true, 0, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, false); @@ -179,7 +184,8 @@ public void shouldForceLogWhenNumGroupsLimitIsReached() { public void shouldForceLogWhenExceptionsExist() { // Given: wasLogged=false but exceptions exist (force-log condition) QueryLogger.QueryLogParams params = generateParams(false, false, 1, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, false); @@ -192,7 +198,8 @@ public void shouldForceLogWhenExceptionsExist() { public void shouldForceLogWhenTimeIsMoreThanOneSecond() { // Given: wasLogged=false but query took >1s (force-log condition) QueryLogger.QueryLogParams params = generateParams(false, false, 0, 1456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, false); @@ -205,10 +212,11 @@ public void shouldForceLogWhenTimeIsMoreThanOneSecond() { public void shouldLogQueryReceivedWhenAllowed() { // Given: rate limiter allows Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: - boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo"); + boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo", null); // Then: Assert.assertTrue(wasLogged); @@ -220,10 +228,11 @@ public void shouldLogQueryReceivedWhenAllowed() { public void shouldNotLogQueryReceivedWhenRateLimited() { // Given: rate limiter denies Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: - boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo"); + boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo", null); // Then: Assert.assertFalse(wasLogged); @@ -234,10 +243,11 @@ public void shouldNotLogQueryReceivedWhenRateLimited() { public void shouldReturnTrueButNotLogWhenLogBeforeProcessingIsDisabled() { // Given: rate limiter allows, but logBeforeProcessing=false Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, false, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, false, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: - boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo"); + boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo", null); // Then: returns true because rate limiter allowed, but no log because logBeforeProcessing=false Assert.assertTrue(wasLogged); @@ -248,7 +258,8 @@ public void shouldReturnTrueButNotLogWhenLogBeforeProcessingIsDisabled() { public void shouldLogCompletionWhenWasLoggedIsTrue() { // Given: QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, true); @@ -278,22 +289,25 @@ public void shouldHandleRaceConditionsWithDroppedQueries() return true; }); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); ExecutorService executorService = Executors.newSingleThreadExecutor(); // When: try { - Assert.assertFalse(queryLogger.logQueryReceived(123, "SELECT * FROM foo")); // 1 this one gets dropped + Assert.assertFalse(queryLogger.logQueryReceived(123, "SELECT * FROM foo", null)); // 1 this one gets dropped // 2 this one succeeds, but blocks when it checks whether to log the dropped count Future blockedLogger = - executorService.submit(() -> queryLogger.logQueryReceived(123, "SELECT * FROM foo")); + executorService.submit(() -> queryLogger.logQueryReceived(123, "SELECT * FROM foo", null)); Assert.assertTrue(firstDroppedLogAttempted.await(5, TimeUnit.SECONDS), "expected the first successful log to reach the dropped-log rate limiter"); - Assert.assertFalse(queryLogger.logQueryReceived(123, "SELECT * FROM foo")); // 3 this one gets dropped - Assert.assertTrue(queryLogger.logQueryReceived(123, "SELECT * FROM foo")); // 4 this one drains the dropped count + // 3 this one gets dropped + Assert.assertFalse(queryLogger.logQueryReceived(123, "SELECT * FROM foo", null)); + // 4 this one drains the dropped count + Assert.assertTrue(queryLogger.logQueryReceived(123, "SELECT * FROM foo", null)); releaseFirstDroppedLogAttempt.countDown(); Assert.assertTrue(blockedLogger.get(5, TimeUnit.SECONDS)); @@ -313,7 +327,8 @@ public void shouldEmitQueryHashWhenSet() { // Given: QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, new QueryFingerprint("abc", "SELECT * FROM foo")); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, true); @@ -329,7 +344,8 @@ public void shouldEmitQueryHashWhenSet() { public void shouldEmitEmptyQueryHashWhenNotSet() { // Given: QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); - QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, _logger, _droppedRateLimiter); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.NONE, _logger, _droppedRateLimiter); // When: queryLogger.logQueryCompleted(params, true); @@ -341,6 +357,117 @@ public void shouldEmitEmptyQueryHashWhenNotSet() { "Expected empty queryHash field. Got: " + logLine); } + @Test + public void shouldRedactQueryInLogQueryReceivedWhenEnabled() { + // Given: + Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.LITERAL_VALUES, _logger, _droppedRateLimiter); + QueryFingerprint fingerprint = new QueryFingerprint("abc", "SELECT * FROM foo WHERE id = ?"); + + // When: + boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo WHERE id = 42", fingerprint); + + // Then: + Assert.assertTrue(wasLogged); + Assert.assertEquals(_infoLog.size(), 1); + Assert.assertTrue(_infoLog.get(0).contains("SELECT * FROM foo WHERE id = ?"), + "Expected redacted query. Got: " + _infoLog.get(0)); + Assert.assertFalse(_infoLog.get(0).contains("42"), + "Raw literal should not appear. Got: " + _infoLog.get(0)); + } + + @Test + public void shouldLogSentinelInReceivedWhenFingerprintNullAndRedactionEnabled() { + // Given: + Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.LITERAL_VALUES, _logger, _droppedRateLimiter); + + // When: + boolean wasLogged = queryLogger.logQueryReceived(123L, "SELECT * FROM foo WHERE id = 42", null); + + // Then: + Assert.assertTrue(wasLogged); + Assert.assertEquals(_infoLog.size(), 1); + Assert.assertTrue(_infoLog.get(0).contains("FINGERPRINT_FAILED_QUERY_REDACTED"), + "Expected sentinel. Got: " + _infoLog.get(0)); + Assert.assertFalse(_infoLog.get(0).contains("42"), + "Raw literal should not appear. Got: " + _infoLog.get(0)); + } + + @Test + public void shouldRedactQueryInLogQueryCompletedWhenEnabled() { + // Given: + QueryFingerprint fingerprint = new QueryFingerprint("abc", "SELECT * FROM foo WHERE id = ?"); + QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, fingerprint); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.LITERAL_VALUES, _logger, _droppedRateLimiter); + + // When: + queryLogger.logQueryCompleted(params, true); + + // Then: + Assert.assertEquals(_infoLog.size(), 1); + String logLine = _infoLog.get(0); + Assert.assertTrue(logLine.contains("query=SELECT * FROM foo WHERE id = ?"), + "Expected redacted query in completion log. Got: " + logLine); + } + + @Test + public void shouldLogSentinelInCompletedWhenFingerprintNullAndRedactionEnabled() { + // Given: + QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.LITERAL_VALUES, _logger, _droppedRateLimiter); + + // When: + queryLogger.logQueryCompleted(params, true); + + // Then: + Assert.assertEquals(_infoLog.size(), 1); + String logLine = _infoLog.get(0); + Assert.assertTrue(logLine.contains("query=FINGERPRINT_FAILED_QUERY_REDACTED"), + "Expected sentinel in completion log. Got: " + logLine); + } + + @Test + public void shouldFullyRedactQueryInLogQueryReceived() { + // Given: + Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.FULL, _logger, _droppedRateLimiter); + + // When: + queryLogger.logQueryReceived(123L, "SELECT * FROM foo WHERE id = 42", null); + + // Then: + Assert.assertEquals(_infoLog.size(), 1); + Assert.assertTrue(_infoLog.get(0).contains("REDACTED"), + "Expected REDACTED. Got: " + _infoLog.get(0)); + Assert.assertFalse(_infoLog.get(0).contains("foo"), + "No part of SQL should appear. Got: " + _infoLog.get(0)); + } + + @Test + public void shouldFullyRedactQueryInLogQueryCompleted() { + // Given: + QueryLogger.QueryLogParams params = generateParams(false, false, 0, 456, null); + QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, true, + SqlRedactionMode.FULL, _logger, _droppedRateLimiter); + + // When: + queryLogger.logQueryCompleted(params, true); + + // Then: + Assert.assertEquals(_infoLog.size(), 1); + String logLine = _infoLog.get(0); + Assert.assertTrue(logLine.contains("query=REDACTED"), + "Expected fully redacted query. Got: " + logLine); + Assert.assertFalse(logLine.contains("SELECT"), + "No SQL should appear. Got: " + logLine); + } + private QueryLogger.QueryLogParams generateParams(boolean numGroupsLimitReached, boolean numGroupsWarningLimitReached, int numExceptions, long timeUsedMs) { return generateParams(numGroupsLimitReached, numGroupsWarningLimitReached, numExceptions, timeUsedMs, null); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 840dd97db515..147d49073c4f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -373,6 +373,9 @@ public static class Broker { public static final String CONFIG_OF_BROKER_QUERY_LOG_BEFORE_PROCESSING = "pinot.broker.query.log.logBeforeProcessing"; public static final boolean DEFAULT_BROKER_QUERY_LOG_BEFORE_PROCESSING = true; + public static final String CONFIG_OF_BROKER_QUERY_LOG_SQL_REDACTION = + "pinot.broker.query.log.sqlRedaction"; + public static final String DEFAULT_BROKER_QUERY_LOG_SQL_REDACTION = "none"; public static final String CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING = "pinot.broker.query.enable.null.handling"; /** * When true, the broker initializes the materialized view metadata cache and query rewrite