Skip to content

Commit fef2cc6

Browse files
committed
Geyser Timelock worker fixes to detect unlock state of unitialized accounts
1 parent 112b807 commit fef2cc6

File tree

5 files changed

+23
-23
lines changed

5 files changed

+23
-23
lines changed

pkg/code/async/geyser/backup.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ func (p *service) backupTimelockStateWorker(serviceCtx context.Context, state ti
3636
log.Debug("worker stopped")
3737
}()
3838

39-
delay := 0 * time.Second // Initially no delay, so we can run right after a deploy
39+
start := time.Now()
4040
cursor := query.EmptyCursor
41-
oldestRecordTs := time.Now()
41+
delay := 0 * time.Second // Initially no delay, so we can run right after a deploy
4242
for {
4343
select {
4444
case <-time.After(delay):
45-
start := time.Now()
45+
batchStart := time.Now()
4646

4747
func() {
4848
nr := serviceCtx.Value(metrics.NewRelicContextKey).(*newrelic.Application)
@@ -59,14 +59,14 @@ func (p *service) backupTimelockStateWorker(serviceCtx context.Context, state ti
5959
)
6060
if err == timelock.ErrTimelockNotFound {
6161
p.metricStatusLock.Lock()
62-
copiedTs := oldestRecordTs
63-
if p.oldestTimelockRecord == nil || p.oldestTimelockRecord.After(copiedTs) {
64-
p.oldestTimelockRecord = &copiedTs
62+
duration := time.Since(start)
63+
if p.backupTimelockStateWorkerDuration == nil || *p.backupTimelockStateWorkerDuration < duration {
64+
p.backupTimelockStateWorkerDuration = &duration
6565
}
6666
p.metricStatusLock.Unlock()
6767

68+
start = time.Now()
6869
cursor = query.EmptyCursor
69-
oldestRecordTs = time.Now()
7070
return
7171
} else if err != nil {
7272
log.WithError(err).Warn("failed to get timelock records")
@@ -77,10 +77,6 @@ func (p *service) backupTimelockStateWorker(serviceCtx context.Context, state ti
7777
for _, timelockRecord := range timelockRecords {
7878
wg.Add(1)
7979

80-
if timelockRecord.LastUpdatedAt.Before(oldestRecordTs) {
81-
oldestRecordTs = timelockRecord.LastUpdatedAt
82-
}
83-
8480
go func(timelockRecord *timelock.Record) {
8581
defer wg.Done()
8682

@@ -98,7 +94,7 @@ func (p *service) backupTimelockStateWorker(serviceCtx context.Context, state ti
9894
cursor = query.ToCursor(timelockRecords[len(timelockRecords)-1].Id)
9995
}()
10096

101-
delay = interval - time.Since(start)
97+
delay = interval - time.Since(batchStart)
10298
case <-serviceCtx.Done():
10399
return serviceCtx.Err()
104100
}

pkg/code/async/geyser/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ const (
2323
defaultProgramUpdateQueueSize = 1_000_000
2424

2525
BackupTimelockWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_TIMELOCK_WORKER_INTERVAL"
26-
defaultBackupTimelockWorkerInterval = 1 * time.Minute
26+
defaultBackupTimelockWorkerInterval = time.Second
2727

2828
BackupExternalDepositWorkerIntervalConfigEnvName = envConfigPrefix + "BACKUP_EXTERNAL_DEPOSIT_WORKER_INTERVAL"
29-
defaultBackupExternalDepositWorkerInterval = 15 * time.Second
29+
defaultBackupExternalDepositWorkerInterval = time.Second
3030
)
3131

3232
type conf struct {

pkg/code/async/geyser/metrics.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ func (p *service) recordBackupWorkerStatusPollingEvent(ctx context.Context) {
109109
"worker_type": timelockStateWorkerName,
110110
"is_active": p.backupTimelockStateWorkerStatus,
111111
}
112-
if p.oldestTimelockRecord != nil {
113-
oldestRecordAgeSeconds := time.Since(*p.oldestTimelockRecord) / time.Second
114-
timelockMetrics["oldest_record_age_s"] = int(oldestRecordAgeSeconds)
115-
p.oldestTimelockRecord = nil
112+
if p.backupTimelockStateWorkerDuration != nil {
113+
inSeconds := *p.backupTimelockStateWorkerDuration / time.Second
114+
timelockMetrics["duration_s"] = int(inSeconds)
115+
p.backupTimelockStateWorkerDuration = nil
116116
}
117117
metrics.RecordEvent(ctx, backupWorkerStatusEventName, timelockMetrics)
118118

pkg/code/async/geyser/service.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ type service struct {
4141
slotUpdateSubscriptionStatus bool
4242
highestObservedFinalizedSlot uint64
4343

44-
oldestTimelockRecord *time.Time
45-
backupTimelockStateWorkerStatus bool
44+
backupTimelockStateWorkerDuration *time.Duration
45+
backupTimelockStateWorkerStatus bool
4646

4747
backupExternalDepositWorkerStatus bool
4848
}
@@ -69,12 +69,12 @@ func (p *service) Start(ctx context.Context, _ time.Duration) error {
6969
p.log.WithError(err).Warn("timelock backup worker terminated unexpectedly")
7070
}
7171
}()
72-
/*go func() {
72+
go func() {
7373
err := p.backupTimelockStateWorker(ctx, timelock_token.StateUnknown, p.conf.backupTimelockWorkerInterval.Get(ctx))
7474
if err != nil && err != context.Canceled {
7575
p.log.WithError(err).Warn("timelock backup worker terminated unexpectedly")
7676
}
77-
}()*/
77+
}()
7878

7979
go func() {
8080
err := p.backupExternalDepositWorker(ctx, p.conf.backupExternalDepositWorkerInterval.Get(ctx))

pkg/code/async/geyser/timelock.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@ func updateTimelockAccountRecord(ctx context.Context, data code_data.Provider, t
2727
unlockAt := uint64(unlockState.UnlockAt)
2828
timelockRecord.UnlockAt = &unlockAt
2929
}
30+
31+
if timelockRecord.VaultState == timelock_token.StateUnknown {
32+
return nil
33+
}
34+
3035
timelockRecord.Block = slot
3136
timelockRecord.LastUpdatedAt = time.Now()
32-
3337
return data.SaveTimelock(ctx, timelockRecord)
3438
}
3539

0 commit comments

Comments
 (0)