From 9586cc69e5ecd7238f321839e17327ea9e68a816 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 18 Feb 2026 14:59:48 -0500 Subject: [PATCH 1/4] Set all vis stores to support csa field overrides --- temporal/fx.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/temporal/fx.go b/temporal/fx.go index 4cd2f9364d..176644533c 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -664,9 +664,7 @@ func ApplyClusterMetadataConfigProvider( } indexSearchAttributes := make(map[string]*persistencespb.IndexSearchAttributes) for _, ds := range visDataStores { - if ds.SQL != nil || ds.CustomDataStoreConfig != nil { - indexSearchAttributes[ds.GetIndexName()] = sadefs.GetDBIndexSearchAttributes(visCSAOverride) - } + indexSearchAttributes[ds.GetIndexName()] = sadefs.GetDBIndexSearchAttributes(visCSAOverride) } clusterMetadata := svc.ClusterMetadata From a5a6b3b4b96a0e156ff2a642ee69694390e3919d Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 25 Mar 2026 16:09:39 -0400 Subject: [PATCH 2/4] enable namespace mapper by default, fallback to cluster metadata per visibility store index --- .../visibility/store/query/converter_test.go | 12 ++- common/resource/fx.go | 5 +- common/searchattribute/mapper.go | 77 +++++++++++-------- common/searchattribute/mapper_test.go | 61 +++++++++++++++ common/searchattribute/test_provider.go | 6 +- 5 files changed, 122 insertions(+), 39 deletions(-) diff --git a/common/persistence/visibility/store/query/converter_test.go b/common/persistence/visibility/store/query/converter_test.go index 9347bedb28..de4e6e699c 100644 --- a/common/persistence/visibility/store/query/converter_test.go +++ b/common/persistence/visibility/store/query/converter_test.go @@ -23,6 +23,16 @@ const ( testNamespaceID = namespace.ID("test-namespace-id") ) +type identityMapper struct{} + +func (identityMapper) GetAlias(fieldName string, _ string) (string, error) { + return fieldName, nil +} + +func (identityMapper) GetFieldName(alias string, _ string) (string, error) { + return alias, nil +} + func TestWithSearchAttributeInterceptor(t *testing.T) { t.Parallel() r := require.New(t) @@ -1917,7 +1927,7 @@ func TestQueryConverter_ResolveSearchAttributeAlias(t *testing.T) { ) if tc.useNoopMapper { - queryConverter.saMapper = searchattribute.NewNoopMapper() + queryConverter.saMapper = identityMapper{} } fn, ft, err := queryConverter.resolveSearchAttributeAlias(tc.in) diff --git a/common/resource/fx.go b/common/resource/fx.go index f44359976b..a0b9a107bf 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -156,11 +156,14 @@ func SearchAttributeMapperProviderProvider( searchAttributeProvider searchattribute.Provider, persistenceConfig *config.Persistence, ) searchattribute.MapperProvider { + primaryVisibilityStoreConfig := persistenceConfig.GetVisibilityStoreConfig() + secondaryVisibilityStoreConfig := persistenceConfig.GetSecondaryVisibilityStoreConfig() return searchattribute.NewMapperProvider( saMapper, namespaceRegistry, searchAttributeProvider, - persistenceConfig.IsSQLVisibilityStore() || persistenceConfig.IsCustomVisibilityStore(), + primaryVisibilityStoreConfig.GetIndexName(), + secondaryVisibilityStoreConfig.GetIndexName(), ) } diff --git a/common/searchattribute/mapper.go b/common/searchattribute/mapper.go index 27ce0129b6..6c11642153 100644 --- a/common/searchattribute/mapper.go +++ b/common/searchattribute/mapper.go @@ -20,15 +20,13 @@ type ( GetFieldName(alias string, namespace string) (string, error) } - noopMapper struct{} - // This mapper is to be backwards compatible with versions before v1.20. // Users using standard visibility might have registered custom search attributes. // Those search attributes won't be searchable, as they weren't before version v1.20. // Thus, this mapper will allow those search attributes to be used without being alised. backCompMapper_v1_20 struct { - mapper Mapper - emptyStringNameTypeMap NameTypeMap + mapper Mapper + fallbackNameTypeMaps []NameTypeMap } MapperProvider interface { @@ -36,31 +34,21 @@ type ( } mapperProviderImpl struct { - customMapper Mapper - namespaceRegistry namespace.Registry - searchAttributesProvider Provider - enableMapperFromNamespace bool + customMapper Mapper + namespaceRegistry namespace.Registry + searchAttributesProvider Provider + fallbackIndexNames []string } ) -var _ Mapper = (*noopMapper)(nil) var _ Mapper = (*backCompMapper_v1_20)(nil) var _ Mapper = (*namespace.CustomSearchAttributesMapper)(nil) var _ MapperProvider = (*mapperProviderImpl)(nil) -func (m *noopMapper) GetAlias(fieldName string, _ string) (string, error) { - return fieldName, nil -} - -func (m *noopMapper) GetFieldName(alias string, _ string) (string, error) { - return alias, nil -} - func (m *backCompMapper_v1_20) GetAlias(fieldName string, namespaceName string) (string, error) { alias, firstErr := m.mapper.GetAlias(fieldName, namespaceName) if firstErr != nil { - _, err := m.emptyStringNameTypeMap.getType(fieldName, customCategory) - if err != nil { + if !m.isLegacyCustomSearchAttribute(fieldName) { return "", firstErr } // this is custom search attribute registered in pre-v1.20 @@ -72,8 +60,7 @@ func (m *backCompMapper_v1_20) GetAlias(fieldName string, namespaceName string) func (m *backCompMapper_v1_20) GetFieldName(alias string, namespaceName string) (string, error) { fieldName, firstErr := m.mapper.GetFieldName(alias, namespaceName) if firstErr != nil { - _, err := m.emptyStringNameTypeMap.getType(alias, customCategory) - if err != nil { + if !m.isLegacyCustomSearchAttribute(alias) { return "", firstErr } // this is custom search attribute registered in pre-v1.20 @@ -82,17 +69,26 @@ func (m *backCompMapper_v1_20) GetFieldName(alias string, namespaceName string) return fieldName, nil } +func (m *backCompMapper_v1_20) isLegacyCustomSearchAttribute(name string) bool { + for _, nameTypeMap := range m.fallbackNameTypeMaps { + if _, err := nameTypeMap.getType(name, customCategory); err == nil { + return true + } + } + return false +} + func NewMapperProvider( customMapper Mapper, namespaceRegistry namespace.Registry, searchAttributesProvider Provider, - enableMapperFromNamespace bool, + fallbackIndexNames ...string, ) MapperProvider { return &mapperProviderImpl{ - customMapper: customMapper, - namespaceRegistry: namespaceRegistry, - searchAttributesProvider: searchAttributesProvider, - enableMapperFromNamespace: enableMapperFromNamespace, + customMapper: customMapper, + namespaceRegistry: namespaceRegistry, + searchAttributesProvider: searchAttributesProvider, + fallbackIndexNames: fallbackIndexNames, } } @@ -100,21 +96,38 @@ func (m *mapperProviderImpl) GetMapper(nsName namespace.Name) (Mapper, error) { if m.customMapper != nil { return m.customMapper, nil } - if !m.enableMapperFromNamespace { - return &noopMapper{}, nil + if m.namespaceRegistry == nil { + return nil, nil } saMapper, err := m.namespaceRegistry.GetCustomSearchAttributesMapper(nsName) if err != nil { return nil, err } - // if there's an error, it returns an empty object, which is expected here - emptyStringNameTypeMap, _ := m.searchAttributesProvider.GetSearchAttributes("", false) + fallbackNameTypeMaps := make([]NameTypeMap, 0, len(m.fallbackIndexNames)+1) + for _, indexName := range uniqueFallbackIndexNames(m.fallbackIndexNames) { + // If there is an error, it returns an empty object, which is expected here. + nameTypeMap, _ := m.searchAttributesProvider.GetSearchAttributes(indexName, false) + fallbackNameTypeMaps = append(fallbackNameTypeMaps, nameTypeMap) + } return &backCompMapper_v1_20{ - mapper: &saMapper, - emptyStringNameTypeMap: emptyStringNameTypeMap, + mapper: &saMapper, + fallbackNameTypeMaps: fallbackNameTypeMaps, }, nil } +func uniqueFallbackIndexNames(indexNames []string) []string { + seen := map[string]struct{}{} + result := make([]string, 0, len(indexNames)+1) + for _, indexName := range append(indexNames, "") { + if _, ok := seen[indexName]; ok { + continue + } + seen[indexName] = struct{}{} + result = append(result, indexName) + } + return result +} + // AliasFields returns SearchAttributes struct where each custom search attribute name is replaced with alias. // If no replacement where made, it returns nil which means that original SearchAttributes struct should be used. func AliasFields( diff --git a/common/searchattribute/mapper_test.go b/common/searchattribute/mapper_test.go index 5367113be7..e46caabd52 100644 --- a/common/searchattribute/mapper_test.go +++ b/common/searchattribute/mapper_test.go @@ -5,7 +5,10 @@ import ( "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/namespace" + "go.uber.org/mock/gomock" ) func Test_AliasFields(t *testing.T) { @@ -137,3 +140,61 @@ func Test_UnaliasFields(t *testing.T) { require.NoError(t, err) require.Equal(t, sa, sb, "when there is nothin to unalias should return received attributes") } + +type staticSearchAttributesProvider struct { + nameTypeMaps map[string]NameTypeMap +} + +func (s staticSearchAttributesProvider) GetSearchAttributes(indexName string, _ bool) (NameTypeMap, error) { + if nameTypeMap, ok := s.nameTypeMaps[indexName]; ok { + return nameTypeMap, nil + } + return NameTypeMap{}, nil +} + +func Test_BackCompMapperFallsBackToClusterMetadataFields(t *testing.T) { + mapper := &backCompMapper_v1_20{ + mapper: &TestMapper{}, + fallbackNameTypeMaps: []NameTypeMap{TestNameTypeMap()}, + } + + alias, err := mapper.GetAlias("Keyword02", "error-namespace") + require.NoError(t, err) + require.Equal(t, "Keyword02", alias) + + fieldName, err := mapper.GetFieldName("Keyword02", "error-namespace") + require.NoError(t, err) + require.Equal(t, "Keyword02", fieldName) +} + +func TestMapperProviderUsesConfiguredVisibilityIndexForBackCompatFallback(t *testing.T) { + controller := gomock.NewController(t) + nsRegistry := namespace.NewMockRegistry(controller) + nsRegistry.EXPECT(). + GetCustomSearchAttributesMapper(namespace.Name("test-namespace")). + Return(namespace.CustomSearchAttributesMapper{}, nil) + + mapperProvider := NewMapperProvider( + nil, + nsRegistry, + staticSearchAttributesProvider{ + nameTypeMaps: map[string]NameTypeMap{ + "test-visibility-index": NewNameTypeMap(map[string]enumspb.IndexedValueType{ + "Keyword41": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + }), + }, + }, + "test-visibility-index", + ) + + mapper, err := mapperProvider.GetMapper(namespace.Name("test-namespace")) + require.NoError(t, err) + + alias, err := mapper.GetAlias("Keyword41", "error-namespace") + require.NoError(t, err) + require.Equal(t, "Keyword41", alias) + + fieldName, err := mapper.GetFieldName("Keyword41", "error-namespace") + require.NoError(t, err) + require.Equal(t, "Keyword41", fieldName) +} diff --git a/common/searchattribute/test_provider.go b/common/searchattribute/test_provider.go index 20a8a4749c..b5fbd45344 100644 --- a/common/searchattribute/test_provider.go +++ b/common/searchattribute/test_provider.go @@ -134,12 +134,8 @@ func (t *TestMapper) GetFieldName(alias string, namespace string) (string, error return "", serviceerror.NewInvalidArgument("unknown namespace") } -func NewNoopMapper() Mapper { - return &noopMapper{} -} - func NewTestMapperProvider(customMapper Mapper) MapperProvider { - return NewMapperProvider(customMapper, nil, NewTestProvider(), false) + return NewMapperProvider(customMapper, nil, NewTestProvider()) } func NewNameTypeMapStub(attributes map[string]enumspb.IndexedValueType) NameTypeMap { From b217dd623b7095abb13debc943eebe40f6768505 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 27 Mar 2026 13:49:15 -0400 Subject: [PATCH 3/4] Refactor functional test base cluster metadata setup --- common/searchattribute/test_provider.go | 4 ++ tests/testcore/functional_test_base.go | 58 +++++++++++++++++++++---- tests/testcore/test_cluster.go | 45 ++++++++++--------- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/common/searchattribute/test_provider.go b/common/searchattribute/test_provider.go index b5fbd45344..becb35ca40 100644 --- a/common/searchattribute/test_provider.go +++ b/common/searchattribute/test_provider.go @@ -61,6 +61,10 @@ func TestEsNameTypeMap() NameTypeMap { return NewNameTypeMap(csa) } +func TestSearchAttributesToRegister() map[string]enumspb.IndexedValueType { + return maps.Clone(esCustomSearchAttributes) +} + func TestEsNameTypeMapWithScheduleID() NameTypeMap { res := TestEsNameTypeMap() res.customSearchAttributes[sadefs.ScheduleID] = enumspb.INDEXED_VALUE_TYPE_KEYWORD diff --git a/tests/testcore/functional_test_base.go b/tests/testcore/functional_test_base.go index 5ca9061306..4ff2e9c7be 100644 --- a/tests/testcore/functional_test_base.go +++ b/tests/testcore/functional_test_base.go @@ -38,6 +38,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/rpc" + "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/telemetry" "go.temporal.io/server/common/testing/historyrequire" "go.temporal.io/server/common/testing/protorequire" @@ -470,6 +471,7 @@ func (s *FunctionalTestBase) RegisterNamespace( ) (namespace.ID, error) { currentClusterName := s.testCluster.testBase.ClusterMetadata.GetCurrentClusterName() nsID := namespace.ID(uuid.NewString()) + expectedSearchAttributes := searchattribute.TestSearchAttributesToRegister() namespaceRequest := &persistence.CreateNamespaceRequest{ Namespace: &persistencespb.NamespaceDetail{ Info: &persistencespb.NamespaceInfo{ @@ -485,14 +487,6 @@ func (s *FunctionalTestBase) RegisterNamespace( VisibilityArchivalState: archivalState, VisibilityArchivalUri: visibilityArchivalURI, BadBinaries: &namespacepb.BadBinaries{Binaries: map[string]*namespacepb.BadBinaryInfo{}}, - CustomSearchAttributeAliases: map[string]string{ - "Bool01": "CustomBoolField", - "Datetime01": "CustomDatetimeField", - "Double01": "CustomDoubleField", - "Int01": "CustomIntField", - "Keyword01": "CustomKeywordField", - "Text01": "CustomTextField", - }, }, ReplicationConfig: &persistencespb.NamespaceReplicationConfig{ ActiveClusterName: currentClusterName, @@ -511,6 +505,54 @@ func (s *FunctionalTestBase) RegisterNamespace( return namespace.EmptyID, err } + namespaceCacheDeadline := time.Now().Add(5 * NamespaceCacheRefreshInterval) + ticker := time.NewTicker(NamespaceCacheRefreshInterval / 2) + defer ticker.Stop() + for { + _, describeErr := s.FrontendClient().DescribeNamespace(NewContext(), &workflowservice.DescribeNamespaceRequest{ + Namespace: nsName.String(), + }) + if describeErr == nil { + break + } + if time.Now().After(namespaceCacheDeadline) { + return namespace.EmptyID, fmt.Errorf("namespace cache did not refresh for %q before deadline", nsName.String()) + } + <-ticker.C + } + + _, err = s.OperatorClient().AddSearchAttributes(NewContext(), &operatorservice.AddSearchAttributesRequest{ + Namespace: nsName.String(), + SearchAttributes: expectedSearchAttributes, + }) + if err != nil { + return namespace.EmptyID, err + } + + namespaceCacheDeadline = time.Now().Add(5 * NamespaceCacheRefreshInterval) + for { + listResp, listErr := s.OperatorClient().ListSearchAttributes(NewContext(), &operatorservice.ListSearchAttributesRequest{ + Namespace: nsName.String(), + }) + if listErr == nil { + customAttrs := listResp.GetCustomAttributes() + allFound := true + for saName := range expectedSearchAttributes { + if _, ok := customAttrs[saName]; !ok { + allFound = false + break + } + } + if allFound { + break + } + } + if time.Now().After(namespaceCacheDeadline) { + return namespace.EmptyID, fmt.Errorf("search attributes were not ready for %q before deadline", nsName.String()) + } + <-ticker.C + } + s.Logger.Info("Register namespace succeeded", tag.WorkflowNamespace(nsName.String()), tag.WorkflowNamespaceID(nsID.String()), diff --git a/tests/testcore/test_cluster.go b/tests/testcore/test_cluster.go index b84d973d40..4888633c4a 100644 --- a/tests/testcore/test_cluster.go +++ b/tests/testcore/test_cluster.go @@ -35,12 +35,14 @@ import ( "go.temporal.io/server/common/metrics/metricstest" "go.temporal.io/server/common/namespace/nsreplication" "go.temporal.io/server/common/persistence" + persistenceclient "go.temporal.io/server/common/persistence/client" persistencetests "go.temporal.io/server/common/persistence/persistence-tests" + "go.temporal.io/server/common/persistence/serialization" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" "go.temporal.io/server/common/pprof" "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/resolver" "go.temporal.io/server/common/rpc/encryption" - "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/telemetry" "go.temporal.io/server/common/testing/freeport" "go.temporal.io/server/temporal" @@ -227,17 +229,15 @@ func newClusterWithPersistenceTestBaseFactory( testBase.ExecutionManager, logger, ) + var err error pConfig := testBase.DefaultTestCluster.Config() pConfig.NumHistoryShards = clusterConfig.HistoryConfig.NumHistoryShards var ( - indexName string - esClient esclient.Client - saTypeMap searchattribute.NameTypeMap + esClient esclient.Client ) if !UseSQLVisibility() { - saTypeMap = searchattribute.TestEsNameTypeMap() clusterConfig.ESConfig = &esclient.Config{ Indices: map[string]string{ esclient.VisibilityAppName: RandomizeStr("temporal_visibility_v1_test"), @@ -250,7 +250,7 @@ func newClusterWithPersistenceTestBaseFactory( DisableGzip: true, // lowers memory and CPU usage significantly in tests } - err := setupIndex(clusterConfig.ESConfig, logger) + err = setupIndex(clusterConfig.ESConfig, logger) if err != nil { return nil, err } @@ -259,18 +259,12 @@ func newClusterWithPersistenceTestBaseFactory( pConfig.DataStores[pConfig.VisibilityStore] = config.DataStore{ Elasticsearch: clusterConfig.ESConfig, } - indexName = clusterConfig.ESConfig.GetVisibilityIndex() esClient, err = esclient.NewClient(clusterConfig.ESConfig, nil, logger) if err != nil { return nil, err } } else { - saTypeMap = searchattribute.TestNameTypeMap() clusterConfig.ESConfig = nil - storeConfig := pConfig.DataStores[pConfig.VisibilityStore] - if storeConfig.SQL != nil { - indexName = storeConfig.SQL.DatabaseName - } } clusterInfoMap := make(map[string]cluster.ClusterInformation) @@ -290,19 +284,28 @@ func newClusterWithPersistenceTestBaseFactory( ClusterAddress: clusterInfo.RPCAddress, HttpAddress: clusterInfo.HTTPAddress, InitialFailoverVersion: clusterInfo.InitialFailoverVersion, - }}) + }}, + ) if err != nil { return nil, err } } clusterMetadataConfig.ClusterInformation = clusterInfoMap - // This will save custom test search attributes to cluster metadata. - // Actual Elasticsearch fields are created in setupIndex. - err := testBase.SearchAttributesManager.SaveSearchAttributes( - context.Background(), - indexName, - saTypeMap.Custom(), + cfg := &config.Config{ + Persistence: pConfig, + ClusterMetadata: clusterMetadataConfig, + Visibility: config.Visibility{}, + } + clusterMetadataConfig, pConfig, err = temporal.ApplyClusterMetadataConfigProvider( + logger, + cfg, + resolver.NewNoopResolver(), + persistenceclient.FactoryProvider, + testBase.AbstractDataStoreFactory, + testBase.VisibilityStoreFactory, + metrics.NoopMetricsHandler, + serialization.NewSerializer(), ) if err != nil { return nil, err @@ -431,10 +434,6 @@ func setupIndex(esConfig *esclient.Config, logger log.Logger) error { logger.Info("Index created.", tag.ESIndex(esConfig.GetVisibilityIndex())) logger.Info("Add custom search attributes for tests.") - _, err = esClient.PutMapping(ctx, esConfig.GetVisibilityIndex(), searchattribute.TestEsNameTypeMap().Custom()) - if err != nil { - return err - } if err := waitForYellowStatus(esClient, esConfig.GetVisibilityIndex()); err != nil { return err } From bf08ccd1912df58e6667a468baad5e3df6ad2aaf Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 27 Mar 2026 15:33:36 -0400 Subject: [PATCH 4/4] Fix test verifications --- tests/advanced_visibility_test.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/advanced_visibility_test.go b/tests/advanced_visibility_test.go index a227706a29..bc0f83b21f 100644 --- a/tests/advanced_visibility_test.go +++ b/tests/advanced_visibility_test.go @@ -1635,23 +1635,13 @@ func (s *AdvancedVisibilitySuite) TestUpsertWorkflowExecution_InvalidKey() { WorkflowId: id, RunId: we.RunId, }) - if !testcore.UseSQLVisibility() { - s.ErrorContains(err, "BadSearchAttributes: search attribute INVALIDKEY is not defined") - s.EqualHistoryEvents(` - 1 WorkflowExecutionStarted - 2 WorkflowTaskScheduled - 3 WorkflowTaskStarted - 4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: search attribute INVALIDKEY is not defined"}} - 5 WorkflowTaskScheduled`, historyEvents) - } else { - s.ErrorContains(err, fmt.Sprintf("BadSearchAttributes: Namespace %s has no mapping defined for search attribute INVALIDKEY", s.Namespace().String())) - s.EqualHistoryEvents(fmt.Sprintf(` + s.ErrorContains(err, fmt.Sprintf("BadSearchAttributes: Namespace %s has no mapping defined for search attribute INVALIDKEY", s.Namespace().String())) + s.EqualHistoryEvents(fmt.Sprintf(` 1 WorkflowExecutionStarted 2 WorkflowTaskScheduled 3 WorkflowTaskStarted 4 WorkflowTaskFailed {"Cause":23,"Failure":{"Message":"BadSearchAttributes: Namespace %s has no mapping defined for search attribute INVALIDKEY"}} 5 WorkflowTaskScheduled`, s.Namespace().String()), historyEvents) - } } func (s *AdvancedVisibilitySuite) TestChildWorkflow_ParentWorkflow() {