22
22
import static io .grpc .xds .client .XdsLogger .XdsLogLevel .DEBUG ;
23
23
24
24
import com .google .common .annotations .VisibleForTesting ;
25
+ import com .google .common .collect .ImmutableList ;
25
26
import com .google .common .collect .Sets ;
26
27
import io .grpc .InternalLogId ;
27
28
import io .grpc .NameResolver ;
42
43
import java .util .Collections ;
43
44
import java .util .HashMap ;
44
45
import java .util .HashSet ;
46
+ import java .util .LinkedHashSet ;
45
47
import java .util .List ;
46
48
import java .util .Map ;
47
49
import java .util .Objects ;
48
50
import java .util .Set ;
49
51
import java .util .concurrent .ScheduledExecutorService ;
50
- import java .util .stream .Collectors ;
51
52
import javax .annotation .Nullable ;
52
53
53
54
/**
@@ -101,7 +102,7 @@ public Closeable subscribeToCluster(String clusterName) {
101
102
subscription .closed = true ;
102
103
return ; // shutdown() called
103
104
}
104
- addClusterWatcher (clusterName , subscription , 1 );
105
+ addClusterWatcher (clusterName , subscription );
105
106
});
106
107
107
108
return subscription ;
@@ -164,7 +165,7 @@ private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
164
165
CdsWatcher cdsWatcher = (CdsWatcher ) watcher ;
165
166
if (!cdsWatcher .parentContexts .isEmpty ()) {
166
167
String msg = String .format ("CdsWatcher %s has parent contexts %s" ,
167
- cdsWatcher .resourceName (), cdsWatcher .parentContexts . keySet () );
168
+ cdsWatcher .resourceName (), cdsWatcher .parentContexts );
168
169
throw new IllegalStateException (msg );
169
170
}
170
171
} else if (watcher instanceof EdsWatcher ) {
@@ -309,24 +310,14 @@ StatusOr<XdsConfig> buildUpdate() {
309
310
}
310
311
builder .setVirtualHost (activeVirtualHost );
311
312
312
- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers =
313
- getWatchers (ENDPOINT_RESOURCE );
314
- Map <String , XdsWatcherBase <XdsClusterResource .CdsUpdate >> cdsWatchers =
315
- getWatchers (CLUSTER_RESOURCE );
316
-
317
- // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
318
- List <String > topLevelClusters =
319
- cdsWatchers .values ().stream ()
320
- .filter (XdsDependencyManager ::isTopLevelCluster )
321
- .map (XdsWatcherBase <?>::resourceName )
322
- .distinct ()
323
- .collect (Collectors .toList ());
324
-
325
- // Flatten multi-level aggregates into lists of leaf clusters
326
- Set <String > leafNames =
327
- addTopLevelClustersToBuilder (builder , edsWatchers , cdsWatchers , topLevelClusters );
328
-
329
- addLeavesToBuilder (builder , edsWatchers , leafNames );
313
+ Map <String , StatusOr <XdsConfig .XdsClusterConfig >> clusters = new HashMap <>();
314
+ LinkedHashSet <String > ancestors = new LinkedHashSet <>();
315
+ for (String cluster : getWatchers (CLUSTER_RESOURCE ).keySet ()) {
316
+ addConfigForCluster (clusters , cluster , ancestors );
317
+ }
318
+ for (Map .Entry <String , StatusOr <XdsConfig .XdsClusterConfig >> me : clusters .entrySet ()) {
319
+ builder .addCluster (me .getKey (), me .getValue ());
320
+ }
330
321
331
322
return StatusOr .fromValue (builder .build ());
332
323
}
@@ -344,111 +335,81 @@ private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
344
335
return tTypeWatchers .watchers ;
345
336
}
346
337
347
- private void addLeavesToBuilder (
348
- XdsConfig .XdsConfigBuilder builder ,
349
- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers ,
350
- Set <String > leafNames ) {
351
- for (String clusterName : leafNames ) {
352
- CdsWatcher cdsWatcher = getCluster (clusterName );
353
- StatusOr <XdsClusterResource .CdsUpdate > cdsUpdateOr = cdsWatcher .getData ();
338
+ private void addConfigForCluster (
339
+ Map <String , StatusOr <XdsConfig .XdsClusterConfig >> clusters ,
340
+ String clusterName ,
341
+ @ SuppressWarnings ("NonApiType" ) // Need order-preserving set for errors
342
+ LinkedHashSet <String > ancestors ) {
343
+ if (clusters .containsKey (clusterName )) {
344
+ return ;
345
+ }
346
+ if (ancestors .contains (clusterName )) {
347
+ clusters .put (clusterName , StatusOr .fromStatus (
348
+ Status .INTERNAL .withDescription (
349
+ "Aggregate cluster cycle detected: " + ancestors )));
350
+ return ;
351
+ }
352
+ if (ancestors .size () > MAX_CLUSTER_RECURSION_DEPTH ) {
353
+ clusters .put (clusterName , StatusOr .fromStatus (
354
+ Status .INTERNAL .withDescription ("Recursion limit reached: " + ancestors )));
355
+ return ;
356
+ }
354
357
355
- if (!cdsUpdateOr .hasValue ()) {
356
- builder .addCluster (clusterName , StatusOr .fromStatus (cdsUpdateOr .getStatus ()));
357
- continue ;
358
- }
358
+ CdsWatcher cdsWatcher = (CdsWatcher ) getWatchers (CLUSTER_RESOURCE ).get (clusterName );
359
+ StatusOr <XdsClusterResource .CdsUpdate > cdsWatcherDataOr = cdsWatcher .getData ();
360
+ if (!cdsWatcherDataOr .hasValue ()) {
361
+ clusters .put (clusterName , StatusOr .fromStatus (cdsWatcherDataOr .getStatus ()));
362
+ return ;
363
+ }
359
364
360
- XdsClusterResource .CdsUpdate cdsUpdate = cdsUpdateOr .getValue ();
361
- if (cdsUpdate .clusterType () == ClusterType .EDS ) {
365
+ XdsClusterResource .CdsUpdate cdsUpdate = cdsWatcherDataOr .getValue ();
366
+ XdsConfig .XdsClusterConfig .ClusterChild child ;
367
+ switch (cdsUpdate .clusterType ()) {
368
+ case AGGREGATE :
369
+ // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
370
+ // preserves the priority across all aggregate clusters
371
+ LinkedHashSet <String > leafNames = new LinkedHashSet <String >();
372
+ ancestors .add (clusterName );
373
+ for (String childCluster : cdsUpdate .prioritizedClusterNames ()) {
374
+ addConfigForCluster (clusters , childCluster , ancestors );
375
+ StatusOr <XdsConfig .XdsClusterConfig > config = clusters .get (childCluster );
376
+ if (!config .hasValue ()) {
377
+ clusters .put (clusterName , StatusOr .fromStatus (Status .INTERNAL .withDescription (
378
+ "Unable to get leaves for " + clusterName + ": "
379
+ + config .getStatus ().getDescription ())));
380
+ return ;
381
+ }
382
+ XdsConfig .XdsClusterConfig .ClusterChild children = config .getValue ().getChildren ();
383
+ if (children instanceof AggregateConfig ) {
384
+ leafNames .addAll (((AggregateConfig ) children ).getLeafNames ());
385
+ } else {
386
+ leafNames .add (childCluster );
387
+ }
388
+ }
389
+ ancestors .remove (clusterName );
390
+
391
+ child = new AggregateConfig (ImmutableList .copyOf (leafNames ));
392
+ break ;
393
+ case EDS :
362
394
XdsWatcherBase <XdsEndpointResource .EdsUpdate > edsWatcher =
363
- edsWatchers .get (cdsWatcher .getEdsServiceName ());
364
- EndpointConfig child ;
395
+ getWatchers (ENDPOINT_RESOURCE ).get (cdsWatcher .getEdsServiceName ());
365
396
if (edsWatcher != null ) {
366
397
child = new EndpointConfig (edsWatcher .getData ());
367
398
} else {
368
399
child = new EndpointConfig (StatusOr .fromStatus (Status .INTERNAL .withDescription (
369
400
"EDS resource not found for cluster " + clusterName )));
370
401
}
371
- builder . addCluster ( clusterName , StatusOr . fromValue (
372
- new XdsConfig . XdsClusterConfig ( clusterName , cdsUpdate , child )));
373
- } else if ( cdsUpdate . clusterType () == ClusterType . LOGICAL_DNS ) {
374
- builder . addCluster ( clusterName , StatusOr .fromStatus (
402
+ break ;
403
+ case LOGICAL_DNS :
404
+ // TODO get the resolved endpoint configuration
405
+ child = new EndpointConfig ( StatusOr .fromStatus (
375
406
Status .INTERNAL .withDescription ("Logical DNS in dependency manager unsupported" )));
376
- }
377
- }
378
- }
379
-
380
- // Adds the top-level clusters to the builder and returns the leaf cluster names
381
- private Set <String > addTopLevelClustersToBuilder (
382
- XdsConfig .XdsConfigBuilder builder ,
383
- Map <String , XdsWatcherBase <XdsEndpointResource .EdsUpdate >> edsWatchers ,
384
- Map <String , XdsWatcherBase <XdsClusterResource .CdsUpdate >> cdsWatchers ,
385
- List <String > topLevelClusters ) {
386
-
387
- Set <String > leafClusterNames = new HashSet <>();
388
- for (String clusterName : topLevelClusters ) {
389
- CdsWatcher cdsWatcher = (CdsWatcher ) cdsWatchers .get (clusterName );
390
- StatusOr <XdsClusterResource .CdsUpdate > cdsWatcherDataOr = cdsWatcher .getData ();
391
- if (!cdsWatcher .hasDataValue ()) {
392
- builder .addCluster (clusterName , StatusOr .fromStatus (cdsWatcherDataOr .getStatus ()));
393
- continue ;
394
- }
395
-
396
- XdsClusterResource .CdsUpdate cdsUpdate = cdsWatcherDataOr .getValue ();
397
- XdsConfig .XdsClusterConfig .ClusterChild child ;
398
- switch (cdsUpdate .clusterType ()) {
399
- case AGGREGATE :
400
- Set <String > leafNames = new HashSet <>();
401
- addLeafNames (leafNames , cdsUpdate );
402
- child = new AggregateConfig (leafNames );
403
- leafClusterNames .addAll (leafNames );
404
- break ;
405
- case EDS :
406
- XdsWatcherBase <XdsEndpointResource .EdsUpdate > edsWatcher =
407
- edsWatchers .get (cdsWatcher .getEdsServiceName ());
408
- if (edsWatcher != null ) {
409
- child = new EndpointConfig (edsWatcher .getData ());
410
- } else {
411
- child = new EndpointConfig (StatusOr .fromStatus (Status .INTERNAL .withDescription (
412
- "EDS resource not found for cluster " + clusterName )));
413
- }
414
- break ;
415
- case LOGICAL_DNS :
416
- // TODO get the resolved endpoint configuration
417
- child = new EndpointConfig (StatusOr .fromStatus (
418
- Status .INTERNAL .withDescription ("Logical DNS in dependency manager unsupported" )));
419
- break ;
420
- default :
421
- throw new IllegalStateException ("Unexpected value: " + cdsUpdate .clusterType ());
422
- }
423
- builder .addCluster (clusterName , StatusOr .fromValue (
424
- new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
425
- }
426
-
427
- return leafClusterNames ;
428
- }
429
-
430
- private void addLeafNames (Set <String > leafNames , XdsClusterResource .CdsUpdate cdsUpdate ) {
431
- for (String cluster : cdsUpdate .prioritizedClusterNames ()) {
432
- if (leafNames .contains (cluster )) {
433
- continue ;
434
- }
435
- StatusOr <XdsClusterResource .CdsUpdate > data = getCluster (cluster ).getData ();
436
- if (data == null || !data .hasValue () || data .getValue () == null ) {
437
- leafNames .add (cluster );
438
- continue ;
439
- }
440
- if (data .getValue ().clusterType () == ClusterType .AGGREGATE ) {
441
- addLeafNames (leafNames , data .getValue ());
442
- } else {
443
- leafNames .add (cluster );
444
- }
407
+ break ;
408
+ default :
409
+ throw new IllegalStateException ("Unexpected value: " + cdsUpdate .clusterType ());
445
410
}
446
- }
447
-
448
- private static boolean isTopLevelCluster (
449
- XdsWatcherBase <XdsClusterResource .CdsUpdate > cdsWatcher ) {
450
- return ((CdsWatcher )cdsWatcher ).parentContexts .values ().stream ()
451
- .anyMatch (depth -> depth == 1 );
411
+ clusters .put (clusterName , StatusOr .fromValue (
412
+ new XdsConfig .XdsClusterConfig (clusterName , cdsUpdate , child )));
452
413
}
453
414
454
415
@ Override
@@ -467,14 +428,14 @@ private void addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
467
428
addWatcher (new EdsWatcher (edsServiceName , parentContext ));
468
429
}
469
430
470
- private void addClusterWatcher (String clusterName , Object parentContext , int depth ) {
431
+ private void addClusterWatcher (String clusterName , Object parentContext ) {
471
432
CdsWatcher watcher = (CdsWatcher ) getWatchers (CLUSTER_RESOURCE ).get (clusterName );
472
433
if (watcher != null ) {
473
- watcher .parentContexts .put (parentContext , depth );
434
+ watcher .parentContexts .add (parentContext );
474
435
return ;
475
436
}
476
437
477
- addWatcher (new CdsWatcher (clusterName , parentContext , depth ));
438
+ addWatcher (new CdsWatcher (clusterName , parentContext ));
478
439
}
479
440
480
441
private void updateRoutes (List <VirtualHost > virtualHosts , Object newParentContext ,
@@ -494,9 +455,9 @@ private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContex
494
455
495
456
deletedClusters .forEach (watcher ->
496
457
cancelClusterWatcherTree (getCluster (watcher ), newParentContext ));
497
- addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
458
+ addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext ));
498
459
} else {
499
- newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext , 1 ));
460
+ newClusters .forEach ((cluster ) -> addClusterWatcher (cluster , newParentContext ));
500
461
}
501
462
}
502
463
@@ -805,11 +766,11 @@ public StatusOr<RdsUpdate> getRdsUpdate() {
805
766
}
806
767
807
768
private class CdsWatcher extends XdsWatcherBase <XdsClusterResource .CdsUpdate > {
808
- Map <Object , Integer > parentContexts = new HashMap <>();
769
+ Set <Object > parentContexts = new HashSet <>();
809
770
810
- CdsWatcher (String resourceName , Object parentContext , int depth ) {
771
+ CdsWatcher (String resourceName , Object parentContext ) {
811
772
super (CLUSTER_RESOURCE , checkNotNull (resourceName , "resourceName" ));
812
- this .parentContexts .put (checkNotNull (parentContext , "parentContext" ), depth );
773
+ this .parentContexts .add (checkNotNull (parentContext , "parentContext" ));
813
774
}
814
775
815
776
@ Override
@@ -829,14 +790,6 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
829
790
break ;
830
791
case AGGREGATE :
831
792
Object parentContext = this ;
832
- int depth = parentContexts .values ().stream ().max (Integer ::compare ).orElse (0 ) + 1 ;
833
- if (depth > MAX_CLUSTER_RECURSION_DEPTH ) {
834
- logger .log (XdsLogger .XdsLogLevel .WARNING ,
835
- "Cluster recursion depth limit exceeded for cluster {0}" , resourceName ());
836
- Status error = Status .UNAVAILABLE .withDescription (
837
- "aggregate cluster graph exceeds max depth at " + resourceName () + nodeInfo ());
838
- setDataAsStatus (error );
839
- }
840
793
if (hasDataValue ()) {
841
794
Set <String > oldNames = getData ().getValue ().clusterType () == ClusterType .AGGREGATE
842
795
? new HashSet <>(getData ().getValue ().prioritizedClusterNames ())
@@ -847,21 +800,18 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
847
800
deletedClusters .forEach ((cluster )
848
801
-> cancelClusterWatcherTree (getCluster (cluster ), parentContext ));
849
802
850
- if (depth <= MAX_CLUSTER_RECURSION_DEPTH ) {
851
- setData (update );
852
- Set <String > addedClusters = Sets .difference (newNames , oldNames );
853
- addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , parentContext , depth ));
854
- }
855
-
856
- } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH ) {
803
+ setData (update );
804
+ Set <String > addedClusters = Sets .difference (newNames , oldNames );
805
+ addedClusters .forEach ((cluster ) -> addClusterWatcher (cluster , parentContext ));
806
+ } else {
857
807
setData (update );
858
808
update .prioritizedClusterNames ()
859
- .forEach (name -> addClusterWatcher (name , parentContext , depth ));
809
+ .forEach (name -> addClusterWatcher (name , parentContext ));
860
810
}
861
811
break ;
862
812
default :
863
813
Status error = Status .UNAVAILABLE .withDescription (
864
- "aggregate cluster graph exceeds max depth at " + resourceName () + nodeInfo ());
814
+ "unknown cluster type in " + resourceName () + " " + update . clusterType ());
865
815
setDataAsStatus (error );
866
816
}
867
817
maybePublishConfig ();
0 commit comments