Skip to content

Commit a484869

Browse files
evansandovalEvan Sandoval
andauthored
Add attempt number to activity logger (#1451)
* Add attempt # to structured Activity logging * nil checks * Updated tests and changed executeLocalActivityTask * Use getter for PollForActivityTaskResponse attempt --------- Co-authored-by: Evan Sandoval <[email protected]>
1 parent 3d6e75c commit a484869

File tree

6 files changed

+12
-1
lines changed

6 files changed

+12
-1
lines changed

internal/activity.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@ func WithActivityTask(
356356
zapcore.Field{Key: tagWorkflowType, Type: zapcore.StringType, String: *task.WorkflowType.Name},
357357
zapcore.Field{Key: tagWorkflowID, Type: zapcore.StringType, String: *task.WorkflowExecution.WorkflowId},
358358
zapcore.Field{Key: tagRunID, Type: zapcore.StringType, String: *task.WorkflowExecution.RunId},
359+
zapcore.Field{Key: tagAttempt, Type: zapcore.Int64Type, Integer: int64(task.GetAttempt())},
359360
)
360361

361362
return context.WithValue(ctx, activityEnvContextKey, &activityEnvironment{

internal/activity_task_handler_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func TestActivityTaskHandler_Execute_deadline(t *testing.T) {
8787
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(d.ScheduleDuration),
8888
StartedTimestamp: common.Int64Ptr(d.StartTS.UnixNano()),
8989
StartToCloseTimeoutSeconds: common.Int32Ptr(d.StartDuration),
90+
Attempt: common.Int32Ptr(0),
9091
WorkflowType: &s.WorkflowType{
9192
Name: common.StringPtr("wType"),
9293
},
@@ -139,6 +140,7 @@ func TestActivityTaskHandler_Execute_worker_stop(t *testing.T) {
139140
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
140141
StartedTimestamp: common.Int64Ptr(now.UnixNano()),
141142
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
143+
Attempt: common.Int32Ptr(0),
142144
WorkflowType: &s.WorkflowType{
143145
Name: common.StringPtr("wType"),
144146
},
@@ -193,6 +195,7 @@ func TestActivityTaskHandler_Execute_with_propagators(t *testing.T) {
193195
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
194196
StartedTimestamp: common.Int64Ptr(now.UnixNano()),
195197
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
198+
Attempt: common.Int32Ptr(0),
196199
WorkflowType: &s.WorkflowType{
197200
Name: common.StringPtr("wType"),
198201
},
@@ -245,6 +248,7 @@ func TestActivityTaskHandler_Execute_with_propagator_failure(t *testing.T) {
245248
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
246249
StartedTimestamp: common.Int64Ptr(now.UnixNano()),
247250
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
251+
Attempt: common.Int32Ptr(0),
248252
WorkflowType: &s.WorkflowType{
249253
Name: common.StringPtr("wType"),
250254
},
@@ -301,6 +305,7 @@ func TestActivityTaskHandler_Execute_with_auto_heartbeat(t *testing.T) {
301305
StartedTimestamp: common.Int64Ptr(now.UnixNano()),
302306
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
303307
HeartbeatTimeoutSeconds: common.Int32Ptr(1),
308+
Attempt: common.Int32Ptr(0),
304309
WorkflowType: &s.WorkflowType{
305310
Name: common.StringPtr("wType"),
306311
},

internal/internal_logging_tags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const (
2626
tagDomain = "Domain"
2727
tagEventID = "EventID"
2828
tagEventType = "EventType"
29+
tagAttempt = "Attempt"
2930
tagRunID = "RunID"
3031
tagTaskList = "TaskList"
3132
tagTimerID = "TimerID"

internal/internal_task_pollers.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,9 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
611611
zap.String(tagLocalActivityType, activityType),
612612
zap.String(tagWorkflowType, workflowType),
613613
zap.String(tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID),
614-
zap.String(tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID))
614+
zap.String(tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID),
615+
zap.Int64(tagAttempt, int64(task.attempt)),
616+
)
615617

616618
metricsScope.Counter(metrics.LocalActivityTotalCounter).Inc(1)
617619

internal/internal_workers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
156156
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(1),
157157
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
158158
StartToCloseTimeoutSeconds: common.Int32Ptr(1),
159+
Attempt: common.Int32Ptr(0),
159160
WorkflowType: &m.WorkflowType{
160161
Name: common.StringPtr("wType"),
161162
},

internal/internal_workflow_testsuite.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,6 +1699,7 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, domain
16991699
StartedTimestamp: common.Int64Ptr(time.Now().UnixNano()),
17001700
StartToCloseTimeoutSeconds: common.Int32Ptr(params.StartToCloseTimeoutSeconds),
17011701
HeartbeatTimeoutSeconds: common.Int32Ptr(params.HeartbeatTimeoutSeconds),
1702+
Attempt: common.Int32Ptr(0),
17021703
WorkflowType: &shared.WorkflowType{
17031704
Name: common.StringPtr(workflowTypeName),
17041705
},

0 commit comments

Comments
 (0)