Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 88 additions & 6 deletions superset/datasource/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@
from superset.extensions import cache_manager
from superset.superset_typing import FlaskResponse
from superset.utils import json
from superset.utils.core import apply_max_row_limit, DatasourceType, SqlExpressionType
from superset.utils.core import (
apply_max_row_limit,
DatasourceType,
parse_boolean_string,
SqlExpressionType,
)
from superset.views.base_api import BaseSupersetApi, statsd_metrics

logger = logging.getLogger(__name__)
Expand All @@ -51,8 +56,9 @@ class DatasourceRestApi(BaseSupersetApi):
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".get_column_values",
action=lambda self, *args, **kwargs: (
f"{self.__class__.__name__}.get_column_values"
),
log_to_statsd=False,
)
def get_column_values(
Expand Down Expand Up @@ -124,13 +130,63 @@ def get_column_values(

row_limit = apply_max_row_limit(app.config["FILTER_SELECT_ROW_LIMIT"])
denormalize_column = not datasource.normalize_columns

# Cache distinct column-value results so a dashboard with many filters
# backed by the same (often heavy) virtual dataset doesn't re-execute
# the wrapping query per filter (#39342).
#
# Key fields:
# - ``rls`` — full RLS fingerprint via
# ``security_manager.get_rls_cache_key`` (the canonical helper used
# by viz.py and query_context_processor.py). This is the sole
# security-isolation field — two users with identical effective
# RLS share a cache entry (intentional: they would see identical
# filtered values anyway), while users with different RLS, guest
# sessions with different guest-token RLS, and anonymous sessions
# with no RLS each get their own partition. We deliberately do
# NOT include the raw user id; doing so would defeat the
# intended cross-user cache sharing without adding any real
# security boundary beyond what the RLS fingerprint already
# provides.
# - ``changed_on`` — auto-busts cached entries when the dataset's
# underlying SQL is edited.
# - ``uid`` / ``col`` / ``limit`` / ``denorm`` — basic query-shape
# isolation so different inputs never collide.
force = parse_boolean_string(request.args.get("force"))
cache_key = (
"col_values:"
+ hashlib.sha256(
json.dumps(
{
"uid": datasource.uid,
"col": column_name,
"limit": row_limit,
"denorm": denormalize_column,
"rls": security_manager.get_rls_cache_key(datasource),
"changed_on": str(getattr(datasource, "changed_on", "")),
Comment on lines +163 to +166
},
Comment thread
Abdulrehman-PIAIC80387 marked this conversation as resolved.
sort_keys=True,
).encode()
).hexdigest()
)

if (
not force
and (cached := cache_manager.data_cache.get(cache_key)) is not None
):
logger.debug(
"column-values cache HIT: uid=%s col=%s", datasource.uid, column_name
)
response = self.response(200, result=cached)
response.headers["X-Cache-Status"] = "HIT"
return response

try:
payload = datasource.values_for_column(
column_name=column_name,
limit=row_limit,
denormalize_column=denormalize_column,
)
Comment thread
Abdulrehman-PIAIC80387 marked this conversation as resolved.
return self.response(200, result=payload)
except KeyError:
return self.response(
400, message=f"Column name {column_name} does not exist"
Expand All @@ -144,6 +200,31 @@ def get_column_values(
),
)

# Warn before caching very large payloads (high-cardinality columns)
# so operators can spot cache-memory pressure before Redis OOMs.
# Threshold is operator-tunable; defaults to 100k rows.
warn_threshold = app.config.get("FILTER_VALUES_CACHE_WARN_THRESHOLD", 100_000)
if (payload_size := len(payload)) > warn_threshold:
logger.warning(
"column-values payload exceeds cache-warn threshold: "
"uid=%s col=%s rows=%d threshold=%d",
datasource.uid,
column_name,
payload_size,
warn_threshold,
)

timeout = datasource.cache_timeout or app.config.get(
"CACHE_DEFAULT_TIMEOUT", 300
)
cache_manager.data_cache.set(cache_key, payload, timeout=timeout)
logger.debug(
"column-values cache MISS: uid=%s col=%s", datasource.uid, column_name
)
response = self.response(200, result=payload)
response.headers["X-Cache-Status"] = "MISS"
return response

@expose(
"/<datasource_type>/<int:datasource_id>/validate_expression/",
methods=("POST",),
Expand All @@ -152,8 +233,9 @@ def get_column_values(
@safe
@statsd_metrics
@event_logger.log_this_with_context(
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}"
f".validate_expression",
action=lambda self, *args, **kwargs: (
f"{self.__class__.__name__}.validate_expression"
),
log_to_statsd=False,
)
def validate_expression(
Expand Down
127 changes: 127 additions & 0 deletions tests/integration_tests/datasource/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from datetime import datetime
from unittest.mock import ANY, patch

import pytest
Expand All @@ -23,12 +24,21 @@
from superset import db, security_manager
from superset.connectors.sqla.models import SqlaTable
from superset.daos.exceptions import DatasourceTypeNotSupportedError
from superset.extensions import cache_manager
from superset.utils import json
Comment on lines 24 to 28
from tests.integration_tests.base_tests import SupersetTestCase
from tests.integration_tests.constants import ADMIN_USERNAME, GAMMA_USERNAME


class TestDatasourceApi(SupersetTestCase):
def setUp(self):
# Clear the column-values cache before every test so that
# ``get_column_values`` always re-runs ``values_for_column`` rather
# than returning a payload populated by a previous test. Prevents
# order-dependent flakes now that the endpoint caches its result.
super().setUp()
cache_manager.data_cache.clear()

def get_virtual_dataset(self):
return (
db.session.query(SqlaTable)
Expand Down Expand Up @@ -205,6 +215,123 @@ def test_get_column_values_with_rls_no_values(self):
response = json.loads(rv.data.decode("utf-8"))
assert response["result"] == []

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_cache_hit_skips_query(self, values_for_column_mock):
"""Regression test for #39342.

Two identical requests for the same column on the same datasource
should hit ``values_for_column`` exactly once — the second request
returns the cached payload.
"""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["a", "b", "c"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
url = f"api/v1/datasource/table/{table.id}/column/col2/values/"

rv1 = self.client.get(url)
rv2 = self.client.get(url)

assert rv1.status_code == 200
assert rv2.status_code == 200
assert json.loads(rv2.data.decode("utf-8"))["result"] == ["a", "b", "c"]
assert values_for_column_mock.call_count == 1

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_force_bypasses_cache(self, values_for_column_mock):
"""``?force=true`` should bypass the cache and re-query the source."""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["a", "b"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
url = f"api/v1/datasource/table/{table.id}/column/col2/values/"

self.client.get(url)
self.client.get(f"{url}?force=true")

assert values_for_column_mock.call_count == 2

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_cache_isolated_per_column(self, values_for_column_mock):
"""Different columns on the same datasource must not share a cache
entry — otherwise filter values would be silently swapped."""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["x"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()

self.client.get(f"api/v1/datasource/table/{table.id}/column/col1/values/")
self.client.get(f"api/v1/datasource/table/{table.id}/column/col2/values/")

assert values_for_column_mock.call_count == 2

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_cache_busts_on_changed_on(self, values_for_column_mock):
"""Editing the underlying virtual dataset SQL bumps ``changed_on``,
which is part of the cache key — so the next request must miss the
cache and re-run the query."""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["v"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
url = f"api/v1/datasource/table/{table.id}/column/col2/values/"

self.client.get(url)
# Simulate an edit to the dataset; ``changed_on`` is what the cache
# key reads, so any new value forces a miss.
table.changed_on = datetime(2030, 1, 1)
db.session.flush()
self.client.get(url)

assert values_for_column_mock.call_count == 2

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.datasource.api.security_manager.get_rls_cache_key")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_cache_isolated_per_rls_context(
self, values_for_column_mock, get_rls_cache_key_mock
):
"""RLS safety for guest/embedded sessions. ``get_user_id()`` returns
``None`` for guest users, so two embedded dashboards with different
guest-token RLS would otherwise collide on ``user=None``. Including
the RLS fingerprint in the cache key keeps them separate."""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["v"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
url = f"api/v1/datasource/table/{table.id}/column/col2/values/"

get_rls_cache_key_mock.return_value = ["dept='A'"]
self.client.get(url)
get_rls_cache_key_mock.return_value = ["dept='B'"]
self.client.get(url)

assert values_for_column_mock.call_count == 2

@pytest.mark.usefixtures("app_context", "virtual_dataset")
@patch("superset.models.helpers.ExploreMixin.values_for_column")
def test_get_column_values_response_advertises_cache_status(
self, values_for_column_mock
):
"""The ``X-Cache-Status`` response header should advertise MISS on
the populating request and HIT on the next identical request, so
operators can debug cache behavior from logs or browser devtools."""
cache_manager.data_cache.clear()
values_for_column_mock.return_value = ["v"]
self.login(ADMIN_USERNAME)
table = self.get_virtual_dataset()
url = f"api/v1/datasource/table/{table.id}/column/col2/values/"

rv_miss = self.client.get(url)
rv_hit = self.client.get(url)

assert rv_miss.headers.get("X-Cache-Status") == "MISS"
assert rv_hit.headers.get("X-Cache-Status") == "HIT"

@patch("superset.datasource.api.security_manager.can_access")
@patch("superset.datasource.api.GetCombinedDatasourceListCommand.run")
def test_combined_list_invalid_order_column(
Expand Down
Loading