@@ -122,18 +122,18 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
122122
123123 targetKeyspace := td .wd .ct .vde .thisTablet .Keyspace
124124 lockName := fmt .Sprintf ("%s/%s" , targetKeyspace , td .wd .ct .workflow )
125- log .Infof ("Locking workflow %s" , lockName )
125+ log .Infof ("Locking workflow %s for vdiff %s " , lockName , td . wd . ct . uuid )
126126 ctx , unlock , lockErr := td .wd .ct .ts .LockName (ctx , lockName , "vdiff" )
127127 if lockErr != nil {
128- log .Errorf ("Locking workfkow %s failed: %v" , lockName , lockErr )
128+ log .Errorf ("Locking workfkow %s for vdiff %s failed: %v" , lockName , td . wd . ct . uuid , lockErr )
129129 return lockErr
130130 }
131131
132132 var err error
133133 defer func () {
134134 unlock (& err )
135135 if err != nil {
136- log .Errorf ("Unlocking workflow %s failed: %v" , lockName , err )
136+ log .Errorf ("Unlocking workflow %s for vdiff %s failed: %v" , lockName , td . wd . ct . uuid , err )
137137 }
138138 }()
139139
@@ -143,12 +143,12 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
143143 defer func () {
144144 // We use a new context as we want to reset the state even
145145 // when the parent context has timed out or been canceled.
146- log .Infof ("Restarting the %q VReplication workflow on target tablets in keyspace %q" ,
147- td .wd .ct .workflow , targetKeyspace )
146+ log .Infof ("Restarting the %q VReplication workflow for vdiff %s on target tablets in keyspace %q" ,
147+ td .wd .ct .workflow , td . wd . ct . uuid , targetKeyspace )
148148 restartCtx , restartCancel := context .WithTimeout (context .Background (), BackgroundOperationTimeout )
149149 defer restartCancel ()
150150 if err := td .restartTargetVReplicationStreams (restartCtx ); err != nil {
151- log .Errorf ("error restarting target streams: %v" , err )
151+ log .Errorf ("error restarting target streams for vdiff %s : %v" , td . wd . ct . uuid , err )
152152 }
153153 }()
154154
@@ -175,7 +175,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
175175}
176176
177177func (td * tableDiffer ) stopTargetVReplicationStreams (ctx context.Context , dbClient binlogplayer.DBClient ) error {
178- log .Infof ("stopTargetVReplicationStreams" )
178+ log .Infof ("stopTargetVReplicationStreams for vdiff %s" , td . wd . ct . uuid )
179179 ct := td .wd .ct
180180 query := "update _vt.vreplication set state = 'Stopped', message='for vdiff' " + ct .workflowFilter
181181 if _ , err := ct .vde .vre .Exec (query ); err != nil {
@@ -332,7 +332,7 @@ func (td *tableDiffer) syncTargetStreams(ctx context.Context) error {
332332 return err
333333 }
334334 if err := ct .vde .vre .WaitForPos (waitCtx , source .vrID , source .snapshotPosition ); err != nil {
335- log .Errorf ("WaitForPosition error: %d: %s" , source .vrID , err )
335+ log .Errorf ("WaitForPosition for vdiff %s error: %d: %s" , td . wd . ct . uuid , source .vrID , err )
336336 return vterrors .Wrapf (err , "WaitForPosition for stream id %d" , source .vrID )
337337 }
338338 return nil
@@ -350,7 +350,8 @@ func (td *tableDiffer) startTargetDataStream(ctx context.Context) error {
350350 go td .streamOneShard (ctx , ct .targetShardStreamer , td .tablePlan .targetQuery , td .lastTargetPK , gtidch )
351351 gtid , ok := <- gtidch
352352 if ! ok {
353- log .Infof ("streaming error: %v" , ct .targetShardStreamer .err )
353+ log .Errorf ("VDiff %s streaming error on target tablet %s: %v" ,
354+ td .wd .ct .uuid , topoproto .TabletAliasString (ct .targetShardStreamer .tablet .Alias ), ct .targetShardStreamer .err )
354355 return ct .targetShardStreamer .err
355356 }
356357 ct .targetShardStreamer .snapshotPosition = gtid
@@ -366,6 +367,8 @@ func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error {
366367
367368 gtid , ok := <- gtidch
368369 if ! ok {
370+ log .Errorf ("VDiff %s streaming error on source tablet %s: %v" ,
371+ td .wd .ct .uuid , topoproto .TabletAliasString (source .tablet .Alias ), source .err )
369372 return source .err
370373 }
371374 source .snapshotPosition = gtid
@@ -381,7 +384,7 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err
381384 ct := td .wd .ct
382385 query := fmt .Sprintf ("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s" ,
383386 encodeString (ct .vde .dbName ), encodeString (ct .workflow ))
384- log .Infof ("Restarting the %q VReplication workflow using %q" , ct .workflow , query )
387+ log .Infof ("Restarting the %q VReplication workflow for vdiff %s using %q" , ct .workflow , td . wd . ct . uuid , query )
385388 var err error
386389 // Let's retry a few times if we get a retryable error.
387390 for i := 1 ; i <= 3 ; i ++ {
@@ -396,11 +399,11 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err
396399}
397400
398401func (td * tableDiffer ) streamOneShard (ctx context.Context , participant * shardStreamer , query string , lastPK * querypb.QueryResult , gtidch chan string ) {
399- log .Infof ("streamOneShard Start on %s using query: %s" , participant .tablet .Alias .String (), query )
402+ log .Infof ("streamOneShard Start for vdiff %s on %s using query: %s" , td . wd . ct . uuid , participant .tablet .Alias .String (), query )
400403 td .wgShardStreamers .Add (1 )
401404
402405 defer func () {
403- log .Infof ("streamOneShard End on %s (err: %v)" , participant .tablet .Alias .String (), participant .err )
406+ log .Infof ("streamOneShard for vdiff %s End on %s (err: %v)" , td . wd . ct . uuid , participant .tablet .Alias .String (), participant .err )
404407 select {
405408 case <- ctx .Done ():
406409 default :
@@ -542,7 +545,7 @@ func (td *tableDiffer) diff(ctx context.Context, coreOpts *tabletmanagerdatapb.V
542545 // Save our progress when we finish the run.
543546 defer func () {
544547 if err := td .updateTableProgress (dbClient , dr , lastProcessedRow ); err != nil {
545- log .Errorf ("Failed to update vdiff progress on %s table: %v" , td .table .Name , err )
548+ log .Errorf ("Failed to update vdiff %s progress on %s table: %v" , td . wd . ct . uuid , td .table .Name , err )
546549 }
547550 globalStats .RowsDiffedCount .Add (dr .ProcessedRows )
548551 }()
@@ -579,15 +582,15 @@ func (td *tableDiffer) diff(ctx context.Context, coreOpts *tabletmanagerdatapb.V
579582
580583 if ! mismatch && dr .MismatchedRows > 0 {
581584 mismatch = true
582- log .Infof ("Flagging mismatch for %s: %+v" , td .table .Name , dr )
585+ log .Infof ("Flagging mismatch in vdiff %s for %s: %+v" , td . wd . ct . uuid , td .table .Name , dr )
583586 if err := updateTableMismatch (dbClient , td .wd .ct .id , td .table .Name ); err != nil {
584587 return nil , err
585588 }
586589 }
587590
588591 rowsToCompare --
589592 if rowsToCompare < 0 {
590- log .Infof ("Stopping vdiff, specified row limit reached" )
593+ log .Infof ("Stopping vdiff %s , specified row limit reached" , td . wd . ct . uuid )
591594 return dr , nil
592595 }
593596 if advanceSource {
@@ -889,7 +892,7 @@ func (td *tableDiffer) adjustForSourceTimeZone(targetSelectExprs []sqlparser.Sel
889892 if td .wd .ct .sourceTimeZone == "" {
890893 return targetSelectExprs
891894 }
892- log .Infof ("source time zone specified: %s" , td .wd .ct .sourceTimeZone )
895+ log .Infof ("Source time zone specified for vdiff %s : %s" , td . wd . ct . uuid , td .wd .ct .sourceTimeZone )
893896 var newSelectExprs []sqlparser.SelectExpr
894897 var modified bool
895898 for _ , expr := range targetSelectExprs {
@@ -906,7 +909,7 @@ func (td *tableDiffer) adjustForSourceTimeZone(targetSelectExprs []sqlparser.Sel
906909 sqlparser .NewStrLiteral (td .wd .ct .targetTimeZone ),
907910 sqlparser .NewStrLiteral (td .wd .ct .sourceTimeZone ),
908911 )
909- log .Infof ("converting datetime column %s using convert_tz()" , colName )
912+ log .Infof ("Converting datetime column %s using convert_tz() for vdiff %s " , colName , td . wd . ct . uuid )
910913 newSelectExprs = append (newSelectExprs , & sqlparser.AliasedExpr {Expr : convertTZFuncExpr , As : colAs .Name })
911914 converted = true
912915 modified = true
@@ -918,7 +921,8 @@ func (td *tableDiffer) adjustForSourceTimeZone(targetSelectExprs []sqlparser.Sel
918921 }
919922 }
920923 if modified { // at least one datetime was found
921- log .Infof ("Found datetime columns when SourceTimeZone was set, resetting target SelectExprs after convert_tz()" )
924+ log .Infof ("Found datetime columns when SourceTimeZone was set, resetting target SelectExprs after convert_tz() for vdiff %s" ,
925+ td .wd .ct .uuid )
922926 return newSelectExprs
923927 }
924928 return targetSelectExprs
@@ -987,11 +991,11 @@ func (td *tableDiffer) getSourcePKCols() error {
987991 td .table .Name , topoproto .TabletAliasString (sourceTablet .Tablet .Alias ))
988992 }
989993 if len (pkeCols ) > 0 {
990- log .Infof ("Using primary key equivalent columns %+v for table %s" , pkeCols , td .table .Name )
994+ log .Infof ("Using primary key equivalent columns %+v for table %s in vdiff %s " , pkeCols , td .table .Name , td . wd . ct . uuid )
991995 sourceTable .PrimaryKeyColumns = pkeCols
992996 } else {
993997 // We use every column together as a substitute PK.
994- log .Infof ("Using all columns as a substitute primary key for table %s" , td .table .Name )
998+ log .Infof ("Using all columns as a substitute primary key for table %s in vdiff %s " , td .table .Name , td . wd . ct . uuid )
995999 sourceTable .PrimaryKeyColumns = append (sourceTable .PrimaryKeyColumns , td .table .Columns ... )
9961000 }
9971001 }
0 commit comments