@@ -23,14 +23,12 @@ package replication
2323import (
2424 "context"
2525 "fmt"
26+ "strconv"
2627 "time"
2728
2829 "github.com/arangodb/arangosync-client/client"
29- "github.com/arangodb/arangosync-client/client/synccheck"
30- "github.com/arangodb/go-driver"
3130
3231 api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
33- "github.com/arangodb/kube-arangodb/pkg/deployment/features"
3432 "github.com/arangodb/kube-arangodb/pkg/util/errors"
3533)
3634
@@ -80,12 +78,6 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
8078 if err != nil {
8179 dr .log .Err (err ).Warn ("Failed to create destination syncmaster client" )
8280 } else {
83- destArangosyncVersion , err := destClient .Version (ctx )
84- if err != nil {
85- dr .log .Err (err ).Warn ("Failed to get destination arangosync version" )
86- hasError = true
87- }
88-
8981 // Fetch status of destination
9082 updateStatusNeeded := false
9183 configureSyncNeeded := false
@@ -108,8 +100,7 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
108100 // Destination is correctly configured
109101 dr .status .Conditions .Update (api .ConditionTypeConfigured , true , api .ConditionConfiguredReasonActive ,
110102 "Destination syncmaster is configured correctly and active" )
111- dr .status .IncomingSynchronization = dr .inspectIncomingSynchronizationStatus (ctx , destClient ,
112- driver .Version (destArangosyncVersion .Version ), destStatus .Shards )
103+ dr .status .IncomingSynchronization = dr .inspectIncomingSynchronizationStatus (destStatus )
113104 updateStatusNeeded = true
114105 } else {
115106 // Sync is active, but from different source
@@ -249,91 +240,30 @@ func (dr *DeploymentReplication) hasOutgoingEndpoint(status client.SyncInfo, epS
249240}
250241
251242// inspectIncomingSynchronizationStatus returns the synchronization status for the incoming sync
252- func (dr * DeploymentReplication ) inspectIncomingSynchronizationStatus (ctx context.Context , syncClient client.API , arangosyncVersion driver.Version , localShards []client.ShardSyncInfo ) api.SynchronizationStatus {
253- dataCentersResp , err := syncClient .Master ().GetDataCentersInfo (ctx )
254- if err != nil {
255- errMsg := "Failed to fetch data-centers info"
256- dr .log .Err (err ).Warn (errMsg )
257- return api.SynchronizationStatus {
258- Error : fmt .Sprintf ("%s: %s" , errMsg , err .Error ()),
259- }
260- }
243+ func (dr * DeploymentReplication ) inspectIncomingSynchronizationStatus (destStatus client.SyncInfo ) api.SynchronizationStatus {
244+ const maxReportedIncomingSyncErrorsPerDatabase = 10
261245
262- ch := synccheck .NewSynchronizationChecker (syncClient , time .Minute )
263- incomingSyncStatus , err := ch .CheckSync (ctx , & dataCentersResp , localShards )
264- if err != nil {
265- errMsg := "Failed to check synchronization status"
266- dr .log .Err (err ).Warn (errMsg )
267- return api.SynchronizationStatus {
268- Error : fmt .Sprintf ("%s: %s" , errMsg , err .Error ()),
246+ dbs := make (map [string ]api.DatabaseSynchronizationStatus , 0 )
247+ for _ , s := range destStatus .Shards {
248+ db := dbs [s .Database ]
249+ db .ShardsTotal ++
250+ if s .Status == client .SyncStatusRunning {
251+ db .ShardsInSync ++
252+ } else if s .Status == client .SyncStatusFailed && len (db .Errors ) < maxReportedIncomingSyncErrorsPerDatabase {
253+ db .Errors = append (db .Errors , api.DatabaseSynchronizationError {
254+ Collection : s .Collection ,
255+ Shard : strconv .Itoa (s .ShardIndex ),
256+ Message : fmt .Sprintf ("shard sync failed: %s" , s .StatusMessage ),
257+ })
269258 }
259+ dbs [s .Database ] = db
270260 }
271- return dr .createSynchronizationStatus (arangosyncVersion , incomingSyncStatus )
272- }
273261
274- // createSynchronizationStatus returns aggregated info about DCSyncStatus
275- func (dr * DeploymentReplication ) createSynchronizationStatus (arangosyncVersion driver.Version , dcSyncStatus * synccheck.DCSyncStatus ) api.SynchronizationStatus {
276- dbs := make (map [string ]api.DatabaseSynchronizationStatus , len (dcSyncStatus .Databases ))
277- i := 0
278- for dbName , dbSyncStatus := range dcSyncStatus .Databases {
279- i ++
280- db := dbName
281- if features .SensitiveInformationProtection ().Enabled () {
282- // internal IDs are not available in older versions
283- if arangosyncVersion .CompareTo ("2.12.0" ) >= 0 {
284- db = dbSyncStatus .ID
285- } else {
286- db = fmt .Sprintf ("<PROTECTED_INFO_%d>" , i )
287- }
288- }
289- dbs [db ] = dr .createDatabaseSynchronizationStatus (dbSyncStatus )
290- }
291262 return api.SynchronizationStatus {
292- AllInSync : dcSyncStatus . AllInSync () ,
263+ AllInSync : destStatus . Status == client . SyncStatusRunning ,
293264 Databases : dbs ,
294265 Error : "" ,
295266 }
296267}
297268
298- // createDatabaseSynchronizationStatus returns sync status for DB
299- func (dr * DeploymentReplication ) createDatabaseSynchronizationStatus (dbSyncStatus synccheck.DatabaseSyncStatus ) api.DatabaseSynchronizationStatus {
300- // use limit for errors because the resulting status object should not be too big
301- const maxReportedIncomingSyncErrors = 20
302-
303- var errs []api.DatabaseSynchronizationError
304- var shardsTotal , shardsInSync int
305- var errorsReportedToLog = 0
306- for colName , colSyncStatus := range dbSyncStatus .Collections {
307- if colSyncStatus .Error != "" && len (errs ) < maxReportedIncomingSyncErrors {
308- col := colName
309- if features .SensitiveInformationProtection ().Enabled () {
310- col = colSyncStatus .ID
311- }
312-
313- errs = append (errs , api.DatabaseSynchronizationError {
314- Collection : col ,
315- Shard : "" ,
316- Message : colSyncStatus .Error ,
317- })
318- }
319-
320- shardsTotal += len (colSyncStatus .Shards )
321- for shardIndex , shardSyncStatus := range colSyncStatus .Shards {
322- if shardSyncStatus .InSync {
323- shardsInSync ++
324- } else if errorsReportedToLog < maxReportedIncomingSyncErrors {
325- dr .log .Str ("db" , dbSyncStatus .ID ).
326- Str ("col" , colSyncStatus .ID ).
327- Int ("shard" , shardIndex ).
328- Debug ("incoming synchronization shard status is not in-sync: %s" , shardSyncStatus .Message )
329- errorsReportedToLog ++
330- }
331- }
332- }
333-
334- return api.DatabaseSynchronizationStatus {
335- ShardsTotal : shardsTotal ,
336- ShardsInSync : shardsInSync ,
337- Errors : errs ,
338- }
339269}
0 commit comments