diff --git a/internal/activity.go b/internal/activity.go index fada8ad7d..6f27f3358 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -286,8 +286,8 @@ func WithActivityTask( interceptors []WorkerInterceptor, client *WorkflowClient, ) (context.Context, error) { - scheduled := task.GetScheduledTime().AsTime() - started := task.GetStartedTime().AsTime() + scheduled := safeAsTime(task.GetScheduledTime()) + started := safeAsTime(task.GetStartedTime()) scheduleToCloseTimeout := task.GetScheduleToCloseTimeout().AsDuration() startToCloseTimeout := task.GetStartToCloseTimeout().AsDuration() heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration() diff --git a/internal/internal_deployment_client.go b/internal/internal_deployment_client.go index c3ca8f156..0d9a4a552 100644 --- a/internal/internal_deployment_client.go +++ b/internal/internal_deployment_client.go @@ -73,7 +73,7 @@ func deploymentToProto(deploymentID Deployment) *deployment.Deployment { func deploymentListEntryFromProto(deployment *deployment.DeploymentListInfo) *DeploymentListEntry { return &DeploymentListEntry{ Deployment: deploymentFromProto(deployment.GetDeployment()), - CreateTime: deployment.GetCreateTime().AsTime(), + CreateTime: safeAsTime(deployment.GetCreateTime()), IsCurrent: deployment.GetIsCurrent(), } } @@ -84,7 +84,7 @@ func deploymentTaskQueuesInfoFromProto(tqsInfo []*deployment.DeploymentInfo_Task result = append(result, DeploymentTaskQueueInfo{ Name: info.GetName(), Type: TaskQueueType(info.GetType()), - FirstPollerTime: info.GetFirstPollerTime().AsTime(), + FirstPollerTime: safeAsTime(info.GetFirstPollerTime()), }) } return result @@ -93,7 +93,7 @@ func deploymentTaskQueuesInfoFromProto(tqsInfo []*deployment.DeploymentInfo_Task func deploymentInfoFromProto(deploymentInfo *deployment.DeploymentInfo) DeploymentInfo { return DeploymentInfo{ Deployment: deploymentFromProto(deploymentInfo.GetDeployment()), - CreateTime: deploymentInfo.GetCreateTime().AsTime(), + CreateTime: safeAsTime(deploymentInfo.GetCreateTime()), IsCurrent: deploymentInfo.GetIsCurrent(), TaskQueuesInfos: deploymentTaskQueuesInfoFromProto(deploymentInfo.GetTaskQueueInfos()), Metadata: deploymentInfo.GetMetadata(), @@ -110,7 +110,7 @@ func deploymentReachabilityInfoFromProto(response *workflowservice.GetDeployment return DeploymentReachabilityInfo{ DeploymentInfo: deploymentInfoFromProto(response.GetDeploymentInfo()), Reachability: DeploymentReachability(response.GetReachability()), - LastUpdateTime: response.GetLastUpdateTime().AsTime(), + LastUpdateTime: safeAsTime(response.GetLastUpdateTime()), } } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 1c4574d97..92bd90ded 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1194,7 +1194,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED: // Set replay clock. - weh.SetCurrentReplayTime(event.GetEventTime().AsTime()) + weh.SetCurrentReplayTime(safeAsTime(event.GetEventTime())) // Update workflow info fields weh.workflowInfo.currentHistoryLength = int(event.EventId) weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew() diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index eb08ef907..bd22e9517 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -426,15 +426,8 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS skip := convertFromPBScheduleCalendarSpecList(scheduleSpec.GetExcludeStructuredCalendar()) - startAt := time.Time{} - if scheduleSpec.GetStartTime() != nil { - startAt = scheduleSpec.GetStartTime().AsTime() - } - - endAt := time.Time{} - if scheduleSpec.GetEndTime() != nil { - endAt = scheduleSpec.GetEndTime().AsTime() - } + startAt := safeAsTime(scheduleSpec.GetStartTime()) + endAt := safeAsTime(scheduleSpec.GetEndTime()) return &ScheduleSpec{ Calendars: calendars, @@ -468,7 +461,7 @@ func scheduleDescriptionFromPB( nextActionTimes := make([]time.Time, len(describeResponse.Info.GetFutureActionTimes())) for i, t := range describeResponse.Info.GetFutureActionTimes() { - nextActionTimes[i] = t.AsTime() + nextActionTimes[i] = safeAsTime(t) } actionDescription, err := convertFromPBScheduleAction(logger, dc, describeResponse.Schedule.Action) @@ -505,8 +498,8 @@ func scheduleDescriptionFromPB( RunningWorkflows: runningWorkflows, RecentActions: recentActions, NextActionTimes: nextActionTimes, - CreatedAt: describeResponse.Info.GetCreateTime().AsTime(), - LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(), + CreatedAt: safeAsTime(describeResponse.Info.GetCreateTime()), + LastUpdateAt: safeAsTime(describeResponse.Info.GetUpdateTime()), }, Memo: describeResponse.Memo, SearchAttributes: searchAttributes, @@ -553,7 +546,7 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch nextActionTimes := make([]time.Time, len(schedule.Info.GetFutureActionTimes())) for i, t := range schedule.Info.GetFutureActionTimes() { - nextActionTimes[i] = t.AsTime() + nextActionTimes[i] = safeAsTime(t) } return &ScheduleListEntry{ @@ -842,8 +835,8 @@ func convertFromPBScheduleActionResultList(aa []*schedulepb.ScheduleActionResult } } recentActions[i] = ScheduleActionResult{ - ScheduleTime: a.GetScheduleTime().AsTime(), - ActualTime: a.GetActualTime().AsTime(), + ScheduleTime: safeAsTime(a.GetScheduleTime()), + ActualTime: safeAsTime(a.GetActualTime()), StartWorkflowResult: workflowExecution, } } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 08c6c8508..575f99698 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -732,7 +732,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice. WorkflowTaskTimeout: attributes.GetWorkflowTaskTimeout().AsDuration(), Namespace: wth.namespace, Attempt: attributes.GetAttempt(), - WorkflowStartTime: startedEvent.GetEventTime().AsTime(), + WorkflowStartTime: safeAsTime(startedEvent.GetEventTime()), lastCompletionResult: attributes.LastCompletionResult, lastFailure: attributes.ContinuedFailure, CronSchedule: attributes.CronSchedule, diff --git a/internal/internal_versioning_client.go b/internal/internal_versioning_client.go index f5ab9c591..fac00dd68 100644 --- a/internal/internal_versioning_client.go +++ b/internal/internal_versioning_client.go @@ -350,10 +350,7 @@ func pollerInfoFromResponse(response *taskqueuepb.PollerInfo) TaskQueuePollerInf return TaskQueuePollerInfo{} } - lastAccessTime := time.Time{} - if response.GetLastAccessTime() != nil { - lastAccessTime = response.GetLastAccessTime().AsTime() - } + lastAccessTime := safeAsTime(response.GetLastAccessTime()) return TaskQueuePollerInfo{ LastAccessTime: lastAccessTime, @@ -429,7 +426,7 @@ func taskQueueVersioningInfoFromResponse(info *taskqueuepb.TaskQueueVersioningIn CurrentVersion: info.CurrentVersion, RampingVersion: info.RampingVersion, RampingVersionPercentage: info.RampingVersionPercentage, - UpdateTime: info.UpdateTime.AsTime(), + UpdateTime: safeAsTime(info.UpdateTime), } } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 589c5e896..9988ab30c 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -2263,7 +2263,7 @@ func (w *workflowClientInterceptor) DescribeWorkflow( Status: info.GetStatus(), ParentWorkflowExecution: parentWorkflowExecution, RootWorkflowExecution: rootWorkflowExecution, - WorkflowStartTime: info.GetStartTime().AsTime(), + WorkflowStartTime: safeAsTime(info.GetStartTime()), ExecutionTime: executionTime, WorkflowCloseTime: closeTime, HistoryLength: int(info.GetHistoryLength()),