Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v1.26.x #6999

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5dca81a
Bump Server version to 1.26.2-125.0
temporal-data Dec 2, 2024
84671af
Not reapply cancellation on reset (#6926)
hai719 Dec 3, 2024
1b8605f
Fix Update-with-Start for closed workflow (#6921)
stephanos Dec 4, 2024
f2c8a3b
Fix ES docID when workflowID contains multi-bytes unicode (#6929)
yiminc Dec 4, 2024
ced1897
Fix Update-with-Start serviceerrors (#6925)
stephanos Dec 4, 2024
209f706
Bump Server version to 1.26.2-125.1
temporal-data Dec 5, 2024
d38fc06
Test eager activity in versioned workflow (#6920)
ShahabT Dec 9, 2024
ab1e677
[Nexus operations] Cancel only after Operation started (#6931)
rodrigozhou Dec 6, 2024
a088471
add functional tests for pause/unpause activity API (#6948)
ychebotarev Dec 10, 2024
643e261
add functional tests for reset activity API (#6949)
ychebotarev Dec 10, 2024
e2ae164
Refresh user data from db periodically (#6962)
dnr Dec 13, 2024
f1e6420
Remove rejected update from update limit (#6963)
stephanos Dec 10, 2024
6f1bf37
Tweak Update rate limit error (#6964)
stephanos Dec 16, 2024
7241825
Nexus error rehydration (#6967)
bergundy Dec 13, 2024
87d62c1
some code improvements for activity api tests (#6971)
ychebotarev Dec 12, 2024
03af27f
Use requested PageSize and return correct NextPageToken in ListDeploy…
carlydf Dec 12, 2024
345b3c4
add more activity timeout tests (#6979)
ychebotarev Dec 12, 2024
355fe5b
Fix Nexus endpoint registry min wait (#6980)
pdoerner Dec 12, 2024
224ef51
Various small nexus fixes (#6990)
bergundy Dec 13, 2024
16db6f0
Handle missing Update `acceptedRequest` (#6993)
stephanos Dec 16, 2024
b8b6cf4
Refresh Nexus endpoints from db periodically (#6997)
pdoerner Dec 17, 2024
4ba79a3
Remove TODO from merge conflict
bergundy Dec 17, 2024
f6b8d02
Remove activity tests that could not be cherry picked
bergundy Dec 17, 2024
8edae6b
Fix more merge issues
bergundy Dec 17, 2024
82d237f
Fix a couple of issues in the slog adapter (#7000)
bergundy Dec 17, 2024
e16204c
enable ExecuteMultiOperation (#7007)
stephanos Dec 18, 2024
7fabbab
Fix MultiOperation: unlock terminated WF
stephanos Dec 19, 2024
d07d0a5
Enable Nexus by default (#7021)
bergundy Dec 20, 2024
4151e25
Bump Server version to 1.26.2
temporal-data Dec 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
- id: generate_output
run: |
shards=3
timeout=20 # update this to TEST_TIMEOUT if you update the Makefile
timeout=25 # update this to TEST_TIMEOUT if you update the Makefile
runs_on='["ubuntu-20.04"]'
if [[ "${{ inputs.run_single_functional_test }}" == "true" || "${{ inputs.run_single_unit_test }}" == "true" ]]; then
shards=1
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ issues:
linters:
- revive
- path: _test\.go|tests/.+\.go
text: "dot-imports" # helpful in tests
text: "(dot-imports|unchecked-type-assertion)" # helpful in tests
linters:
- revive
- path: ^tools\/.+\.go
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ TEST_TAG_FLAG := -tags $(ALL_TEST_TAGS)
# If you change this, also change .github/workflows/run-tests.yml!
# The timeout in the GH workflow must be larger than this to avoid GH timing out the action,
# which causes the job run to not produce any logs and hurts the debugging experience.
TEST_TIMEOUT ?= 20m
TEST_TIMEOUT ?= 25m

# Number of retries for *-coverage targets.
FAILED_TEST_RETRIES ?= 2
Expand Down
18 changes: 14 additions & 4 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,9 +861,9 @@ and deployment interaction in matching and history.`,
)
EnableNexus = NewGlobalBoolSetting(
"system.enableNexus",
false,
`EnableNexus toggles all Nexus functionality on the server. Note that toggling this requires restarting
server hosts for it to take effect.`,
true,
`Toggles all Nexus functionality on the server. Note that toggling this requires restarting server hosts for it
to take effect.`,
)
RefreshNexusEndpointsLongPollTimeout = NewGlobalDurationSetting(
"system.refreshNexusEndpointsLongPollTimeout",
Expand Down Expand Up @@ -936,7 +936,7 @@ used when the first cache layer has a miss. Requires server restart for change t

FrontendEnableExecuteMultiOperation = NewNamespaceBoolSetting(
"frontend.enableExecuteMultiOperation",
false,
true,
`FrontendEnableExecuteMultiOperation enables the ExecuteMultiOperation API in the frontend.
The API is under active development.`,
)
Expand Down Expand Up @@ -1178,6 +1178,11 @@ This can help reduce effects of task queue movement.`,
5*time.Minute-10*time.Second,
`MatchingGetUserDataLongPollTimeout is the max length of long polls for GetUserData calls between partitions.`,
)
MatchingGetUserDataRefresh = NewGlobalDurationSetting(
"matching.getUserDataRefresh",
5*time.Minute,
`MatchingGetUserDataRefresh is how often the user data owner refreshes data from persistence.`,
)
MatchingBacklogNegligibleAge = NewTaskQueueDurationSetting(
"matching.backlogNegligibleAge",
5*time.Second,
Expand All @@ -1200,6 +1205,11 @@ duration since last poll exceeds this threshold.`,
5*time.Minute-10*time.Second,
`MatchingListNexusEndpointsLongPollTimeout is the max length of long polls for ListNexusEndpoints calls.`,
)
MatchingNexusEndpointsRefreshInterval = NewGlobalDurationSetting(
"matching.nexusEndpointsRefreshInterval",
10*time.Second,
`Time to wait between calls to check that the in-memory view of Nexus endpoints matches the persisted state.`,
)
MatchingMembershipUnloadDelay = NewGlobalDurationSetting(
"matching.membershipUnloadDelay",
500*time.Millisecond,
Expand Down
2 changes: 1 addition & 1 deletion common/headers/version_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (

// ServerVersion value can be changed by the create-tag Github workflow.
// If you change the var name or move it, be sure to update the workflow.
ServerVersion = "1.26.0"
ServerVersion = "1.26.2"

// SupportedServerVersions is used by CLI and inter role communication.
SupportedServerVersions = ">=1.0.0 <2.0.0"
Expand Down
16 changes: 13 additions & 3 deletions common/log/slog.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,17 @@ type handler struct {

var _ slog.Handler = (*handler)(nil)

// SLogWrapper allows extracting an underlying slog logger.
// A Logger that also implements this interface is unwrapped by NewSlogLogger instead of being adapted.
type SLogWrapper interface {
// SLog returns the wrapped *slog.Logger.
SLog() *slog.Logger
}

// NewSlogLogger returns an *slog.Logger backed by the given logger.
//
// If the logger implements SLogWrapper, the slog logger it wraps is returned as-is (e.g. for Temporal CLI).
// Otherwise the logger is adapted through this package's slog handler, with its caller skip raised by 3.
func NewSlogLogger(logger Logger) *slog.Logger {
	// Prefer an underlying slog logger when one is available.
	wrapper, isWrapper := logger.(SLogWrapper)
	if isWrapper {
		return wrapper.SLog()
	}

	logger = withIncreasedSkip(logger, 3)
	adapter := &handler{
		logger:    logger,
		zapLogger: extractZapLogger(logger),
		group:     "",
		tags:      nil,
	}
	return slog.New(adapter)
}
Expand Down Expand Up @@ -89,7 +98,7 @@ func (h *handler) WithAttrs(attrs []slog.Attr) slog.Handler {
for _, attr := range attrs {
tags = append(tags, tag.NewZapTag(convertAttrToField(h.prependGroup(attr))))
}
return &handler{logger: h.logger, tags: tags, group: h.group}
return &handler{logger: h.logger, zapLogger: h.zapLogger, tags: tags, group: h.group}
}

// WithGroup implements slog.Handler.
Expand All @@ -98,7 +107,7 @@ func (h *handler) WithGroup(name string) slog.Handler {
if h.group != "" {
group = h.group + "." + name
}
return &handler{logger: h.logger, tags: h.tags, group: group}
return &handler{logger: h.logger, zapLogger: h.zapLogger, tags: h.tags, group: group}
}

func (h *handler) prependGroup(attr slog.Attr) slog.Attr {
Expand Down Expand Up @@ -136,7 +145,8 @@ func withIncreasedSkip(logger Logger, skip int) Logger {
logger: withIncreasedSkip(l.logger, skip),
}
}
return nil
// Default to not increasing the skip, it's better to have a logger than not having one.
return logger
}

// convertSlogToZapLevel maps slog Levels to zap Levels.
Expand Down
2 changes: 1 addition & 1 deletion common/nexus/endpoint_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ func (r *EndpointRegistryImpl) waitUntilInitialized(ctx context.Context) error {

func (r *EndpointRegistryImpl) refreshEndpointsLoop(ctx context.Context, dataReady *dataReady) error {
hasLoadedEndpointData := false
minWaitTime := r.config.refreshMinWait()

for ctx.Err() == nil {
minWaitTime := r.config.refreshMinWait()
start := time.Now()
if !hasLoadedEndpointData {
// Loading endpoints for the first time after being (re)enabled, so load with fallback to persistence
Expand Down
134 changes: 99 additions & 35 deletions common/nexus/failure.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package nexus

import (
"encoding/json"
"errors"
"net/http"

Expand All @@ -33,12 +34,14 @@ import (
"go.temporal.io/api/serviceerror"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
)

var failureTypeString = string((&failurepb.Failure{}).ProtoReflect().Descriptor().FullName())

// ProtoFailureToNexusFailure converts a proto Nexus Failure to a Nexus SDK Failure.
// Always returns a non-nil value.
func ProtoFailureToNexusFailure(failure *nexuspb.Failure) *nexus.Failure {
return &nexus.Failure{
func ProtoFailureToNexusFailure(failure *nexuspb.Failure) nexus.Failure {
return nexus.Failure{
Message: failure.GetMessage(),
Metadata: failure.GetMetadata(),
Details: failure.GetDetails(),
Expand All @@ -47,65 +50,126 @@ func ProtoFailureToNexusFailure(failure *nexuspb.Failure) *nexus.Failure {

// NexusFailureToProtoFailure converts a Nexus SDK Failure to a proto Nexus Failure.
// Always returns a non-nil value.
func NexusFailureToProtoFailure(failure *nexus.Failure) *nexuspb.Failure {
if failure == nil {
return &nexuspb.Failure{Message: "unknown error"}
}
func NexusFailureToProtoFailure(failure nexus.Failure) *nexuspb.Failure {
return &nexuspb.Failure{
Message: failure.Message,
Metadata: failure.Metadata,
Details: failure.Details,
}
}

// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure taking only the failure message to
// avoid leaking too many details to 3rd party callers.
// Always returns a non-nil value.
func APIFailureToNexusFailure(failure *failurepb.Failure) *nexus.Failure {
return &nexus.Failure{
Message: failure.GetMessage(),
// APIFailureToNexusFailure converts an API proto Failure to a Nexus SDK Failure setting the metadata "type" field to
// the proto fullname of the temporal API Failure message.
// Mutates the failure temporarily, unsetting the Message field to avoid duplicating the information in the serialized
// failure. Mutating was chosen over cloning for performance reasons since this function may be called frequently.
func APIFailureToNexusFailure(failure *failurepb.Failure) (nexus.Failure, error) {
// Unset message so it's not serialized in the details.
var message string
message, failure.Message = failure.Message, ""
data, err := protojson.Marshal(failure)
failure.Message = message

if err != nil {
return nexus.Failure{}, err
}
return nexus.Failure{
Message: failure.GetMessage(),
Metadata: map[string]string{
"type": failureTypeString,
},
Details: data,
}, nil
}

func UnsuccessfulOperationErrorToTemporalFailure(err *nexus.UnsuccessfulOperationError) *failurepb.Failure {
failure := &failurepb.Failure{
Message: err.Failure.Message,
}
if err.State == nexus.OperationStateCanceled {
failure.FailureInfo = &failurepb.Failure_CanceledFailureInfo{
CanceledFailureInfo: &failurepb.CanceledFailureInfo{
Details: nexusFailureMetadataToPayloads(err.Failure),
},
// NexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure.
// If the failure metadata "type" field is set to the fullname of the temporal API Failure message, the failure is
// reconstructed using protojson.Unmarshal on the failure details field.
func NexusFailureToAPIFailure(failure nexus.Failure, retryable bool) (*failurepb.Failure, error) {
apiFailure := &failurepb.Failure{}

if failure.Metadata != nil && failure.Metadata["type"] == failureTypeString {
if err := protojson.Unmarshal(failure.Details, apiFailure); err != nil {
return nil, err
}
} else {
failure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
payloads, err := nexusFailureMetadataToPayloads(failure)
if err != nil {
return nil, err
}
apiFailure.FailureInfo = &failurepb.Failure_ApplicationFailureInfo{
ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{
// Make up a type here, it's not part of the Nexus Failure spec.
Type: "NexusOperationFailure",
Details: nexusFailureMetadataToPayloads(err.Failure),
NonRetryable: true,
Type: "NexusFailure",
Details: payloads,
NonRetryable: !retryable,
},
}
}
return failure
// Ensure this always gets written.
apiFailure.Message = failure.Message
return apiFailure, nil
}

func nexusFailureMetadataToPayloads(failure nexus.Failure) *commonpb.Payloads {
// UnsuccessfulOperationErrorToTemporalFailure converts a Nexus UnsuccessfulOperationError into an API proto Failure.
//
// The Nexus failure is taken from the error's cause when that cause is a *nexus.FailureError; any other non-nil
// cause contributes only its error message. A canceled operation always yields a failure carrying
// CanceledFailureInfo; every other state is delegated to NexusFailureToAPIFailure with retryable set to false.
func UnsuccessfulOperationErrorToTemporalFailure(opErr *nexus.UnsuccessfulOperationError) (*failurepb.Failure, error) {
	var nexusFailure nexus.Failure
	switch cause := opErr.Cause.(type) {
	case nil:
		// No cause available: keep the zero-value failure.
	case *nexus.FailureError:
		nexusFailure = cause.Failure
	default:
		nexusFailure = nexus.Failure{Message: cause.Error()}
	}

	if opErr.State != nexus.OperationStateCanceled {
		return NexusFailureToAPIFailure(nexusFailure, false)
	}

	// Canceled must be translated into a CanceledFailure to match the SDK expectation.
	carriesAPIFailure := nexusFailure.Metadata != nil && nexusFailure.Metadata["type"] == failureTypeString
	if carriesAPIFailure {
		rehydrated, err := NexusFailureToAPIFailure(nexusFailure, false)
		if err != nil {
			return nil, err
		}
		if rehydrated.GetCanceledFailureInfo() != nil {
			// The rehydrated failure is already a CanceledFailure, use it.
			return rehydrated, nil
		}
		// Otherwise fall back to encoding the Nexus failure into a Temporal canceled failure below; operations
		// that end up as canceled are expected to have a CanceledFailureInfo object.
	}

	details, err := nexusFailureMetadataToPayloads(nexusFailure)
	if err != nil {
		return nil, err
	}
	canceledInfo := &failurepb.CanceledFailureInfo{Details: details}
	return &failurepb.Failure{
		Message:     nexusFailure.Message,
		FailureInfo: &failurepb.Failure_CanceledFailureInfo{CanceledFailureInfo: canceledInfo},
	}, nil
}

func nexusFailureMetadataToPayloads(failure nexus.Failure) (*commonpb.Payloads, error) {
if len(failure.Metadata) == 0 && len(failure.Details) == 0 {
return nil
return nil, nil
}
metadata := make(map[string][]byte, len(failure.Metadata))
for k, v := range failure.Metadata {
metadata[k] = []byte(v)
// Delete before serializing.
failure.Message = ""
data, err := json.Marshal(failure)
if err != nil {
return nil, err
}
return &commonpb.Payloads{
Payloads: []*commonpb.Payload{
{
Metadata: metadata,
Data: failure.Details,
Metadata: map[string][]byte{
"encoding": []byte("json/plain"),
},
Data: data,
},
},
}
}, err
}

// ConvertGRPCError converts either a serviceerror or a gRPC status error into a Nexus HandlerError if possible.
Expand Down
7 changes: 5 additions & 2 deletions common/nexus/nexustest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ func NewNexusServer(t *testing.T, listenAddr string, handler nexus.Handler) {
// Graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, srv.Shutdown(ctx))
require.ErrorIs(t, <-errCh, http.ErrServerClosed)
err = srv.Shutdown(ctx)
if ctx.Err() != nil {
require.NoError(t, err)
require.ErrorIs(t, <-errCh, http.ErrServerClosed)
}
})
}

Expand Down
14 changes: 9 additions & 5 deletions common/persistence/sql/nexus_endpoint_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,13 @@ func (s *sqlNexusEndpointStore) ListNexusEndpoints(
return p.ErrNexusTableVersionConflict
}

rows, err = tx.ListNexusEndpoints(ctx, &sqlplugin.ListNexusEndpointsRequest{
LastID: lastID,
Limit: request.PageSize,
})
if request.PageSize > 0 {
// PageSize could be zero when fetching just the table version.
rows, err = tx.ListNexusEndpoints(ctx, &sqlplugin.ListNexusEndpointsRequest{
LastID: lastID,
Limit: request.PageSize,
})
}

return err
})
Expand All @@ -172,7 +175,8 @@ func (s *sqlNexusEndpointStore) ListNexusEndpoints(
}

var nextPageToken []byte
if len(rows) == request.PageSize {
if len(rows) > 0 && len(rows) == request.PageSize {
// len(rows) could be zero when fetching just the table version.
nextPageToken, retErr = serializePageTokenJson(&listEndpointsNextPageToken{
LastID: rows[request.PageSize-1].ID,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/persistence/visibility/store/query"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
)

const (
Expand Down Expand Up @@ -295,14 +296,11 @@ func GetDocID(workflowID string, runID string) string {
const maxDocIDLength = 512
// Generally runID is guid and this should never be the case.
if len(runID)+len(delimiter) >= maxDocIDLength {
if len(runID) >= maxDocIDLength {
return runID[0:maxDocIDLength]
}
return runID[0 : maxDocIDLength-len(delimiter)]
return util.TruncateUTF8(runID, maxDocIDLength)
}

if len(workflowID)+len(runID)+len(delimiter) > maxDocIDLength {
workflowID = workflowID[0 : maxDocIDLength-len(runID)-len(delimiter)]
workflowID = util.TruncateUTF8(workflowID, maxDocIDLength-len(runID)-len(delimiter))
}

return workflowID + delimiter + runID
Expand Down
Loading
Loading