Skip to content

core: Alternate ipV4 and ipV6 addresses for Happy Eyeballs in PickFirstLeafLoadBalancer #11624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 14, 2025
159 changes: 116 additions & 43 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.grpc.LoadBalancer;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -58,17 +60,17 @@
private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
@VisibleForTesting
static final int CONNECTION_DELAY_INTERVAL_MS = 250;
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private final Helper helper;
private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
private final Index addressIndex = new Index(ImmutableList.of());
private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
private int numTf = 0;
private boolean firstPass = true;
@Nullable
private ScheduledHandle scheduleConnectionTask = null;
private ConnectivityState rawConnectivityState = IDLE;
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs = !isSerializingRetries()
&& PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy
private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy reconnectPolicy;
Expand Down Expand Up @@ -610,27 +612,26 @@
}

/**
* Index as in 'i', the pointer to an entry. Not a "search index."
* This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
* All updates should be done in a synchronization context.
*/
@VisibleForTesting
static final class Index {
private List<EquivalentAddressGroup> addressGroups;
private int size;
private int groupIndex;
private int addressIndex;
private List<UnwrappedEag> orderedAddresses;
private int activeElement = 0;
private boolean enableHappyEyeballs;

public Index(List<EquivalentAddressGroup> groups) {
Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
this.enableHappyEyeballs = enableHappyEyeballs;
updateGroups(groups);
}

public boolean isValid() {
// Is invalid if empty or has incremented off the end
return groupIndex < addressGroups.size();
return activeElement < orderedAddresses.size();
}

public boolean isAtBeginning() {
return groupIndex == 0 && addressIndex == 0;
return activeElement == 0;
}

/**
Expand All @@ -642,79 +643,150 @@
return false;
}

EquivalentAddressGroup group = addressGroups.get(groupIndex);
addressIndex++;
if (addressIndex >= group.getAddresses().size()) {
groupIndex++;
addressIndex = 0;
return groupIndex < addressGroups.size();
}
activeElement++;

return true;
return isValid();
}

public void reset() {
groupIndex = 0;
addressIndex = 0;
activeElement = 0;
}

public SocketAddress getCurrentAddress() {
if (!isValid()) {
throw new IllegalStateException("Index is past the end of the address group list");
}
return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
return orderedAddresses.get(activeElement).address;
}

public Attributes getCurrentEagAttributes() {
if (!isValid()) {
throw new IllegalStateException("Index is off the end of the address group list");
}
return addressGroups.get(groupIndex).getAttributes();
return orderedAddresses.get(activeElement).attributes;
}

public List<EquivalentAddressGroup> getCurrentEagAsList() {
return Collections.singletonList(
new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
return Collections.singletonList(getCurrentEag());
}

private EquivalentAddressGroup getCurrentEag() {
if (!isValid()) {
throw new IllegalStateException("Index is past the end of the address group list");

Check warning on line 675 in core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java#L675

Added line #L675 was not covered by tests
}
return orderedAddresses.get(activeElement).asEag();
}

/**
* Update to new groups, resetting the current index.
*/
public void updateGroups(List<EquivalentAddressGroup> newGroups) {
addressGroups = checkNotNull(newGroups, "newGroups");
checkNotNull(newGroups, "newGroups");
orderedAddresses = enableHappyEyeballs
? updateGroupsHE(newGroups)
: updateGroupsNonHE(newGroups);
reset();
int size = 0;
for (EquivalentAddressGroup eag : newGroups) {
size += eag.getAddresses().size();
}
this.size = size;
}

/**
* Returns false if the needle was not found and the current index was left unchanged.
*/
public boolean seekTo(SocketAddress needle) {
for (int i = 0; i < addressGroups.size(); i++) {
EquivalentAddressGroup group = addressGroups.get(i);
int j = group.getAddresses().indexOf(needle);
if (j == -1) {
continue;
checkNotNull(needle, "needle");
for (int i = 0; i < orderedAddresses.size(); i++) {
if (orderedAddresses.get(i).address.equals(needle)) {
this.activeElement = i;
return true;
}
this.groupIndex = i;
this.addressIndex = j;
return true;
}
return false;
}

public int size() {
return size;
return orderedAddresses.size();
}

private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
List<UnwrappedEag> entries = new ArrayList<>();
for (int g = 0; g < newGroups.size(); g++) {
EquivalentAddressGroup eag = newGroups.get(g);
for (int a = 0; a < eag.getAddresses().size(); a++) {
SocketAddress addr = eag.getAddresses().get(a);
entries.add(new UnwrappedEag(eag.getAttributes(), addr));
}
}

return entries;
}

private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
Boolean firstIsV6 = null;
List<UnwrappedEag> v4Entries = new ArrayList<>();
List<UnwrappedEag> v6Entries = new ArrayList<>();
for (int g = 0; g < newGroups.size(); g++) {
EquivalentAddressGroup eag = newGroups.get(g);
for (int a = 0; a < eag.getAddresses().size(); a++) {
SocketAddress addr = eag.getAddresses().get(a);
boolean isIpV4 = addr instanceof InetSocketAddress
&& ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
if (isIpV4) {
if (firstIsV6 == null) {
firstIsV6 = false;
}
v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
} else {
if (firstIsV6 == null) {
firstIsV6 = true;
}
v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
}
}
}

return firstIsV6 != null && firstIsV6
? interleave(v6Entries, v4Entries)
: interleave(v4Entries, v6Entries);
}

private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
List<UnwrappedEag> secondFamily) {
if (firstFamily.isEmpty()) {
return secondFamily;
}
if (secondFamily.isEmpty()) {
return firstFamily;
}

List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
if (i < firstFamily.size()) {
result.add(firstFamily.get(i));
}
if (i < secondFamily.size()) {
result.add(secondFamily.get(i));
}
}
return result;
}

private static final class UnwrappedEag {
private final Attributes attributes;
private final SocketAddress address;

public UnwrappedEag(Attributes attributes, SocketAddress address) {
this.attributes = attributes;
this.address = address;
}

private EquivalentAddressGroup asEag() {
return new EquivalentAddressGroup(address, attributes);
}
}
}

@VisibleForTesting
int getGroupIndex() {
return addressIndex.groupIndex;
int getIndexLocation() {
return addressIndex.activeElement;
}

@VisibleForTesting
Expand Down Expand Up @@ -778,4 +850,5 @@
this.randomSeed = randomSeed;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.assumeTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -67,6 +68,7 @@
import io.grpc.Status.Code;
import io.grpc.SynchronizationContext;
import io.grpc.internal.PickFirstLeafLoadBalancer.PickFirstLeafLoadBalancerConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -2618,7 +2620,7 @@ public void serialized_retries_two_passes() {
forwardTimeByBackoffDelay(); // should trigger retry again
for (int i = 0; i < subchannels.length; i++) {
inOrder.verify(subchannels[i]).requestConnection();
assertEquals(i, loadBalancer.getGroupIndex());
assertEquals(i, loadBalancer.getIndexLocation());
listeners[i].onSubchannelState(ConnectivityStateInfo.forTransientFailure(error)); // cascade
}
}
Expand All @@ -2637,7 +2639,7 @@ public void index_looping() {
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2), attr1),
new EquivalentAddressGroup(Arrays.asList(addr3), attr2),
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)));
new EquivalentAddressGroup(Arrays.asList(addr4, addr5), attr3)), enableHappyEyeballs);
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attr1);
assertThat(index.isAtBeginning()).isTrue();
Expand Down Expand Up @@ -2696,7 +2698,7 @@ public void index_updateGroups_resets() {
SocketAddress addr3 = new FakeSocketAddress("addr3");
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1)),
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))));
new EquivalentAddressGroup(Arrays.asList(addr2, addr3))), enableHappyEyeballs);
index.increment();
index.increment();
// We want to make sure both groupIndex and addressIndex are reset
Expand All @@ -2713,7 +2715,7 @@ public void index_seekTo() {
SocketAddress addr3 = new FakeSocketAddress("addr3");
PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1, addr2)),
new EquivalentAddressGroup(Arrays.asList(addr3))));
new EquivalentAddressGroup(Arrays.asList(addr3))), enableHappyEyeballs);
assertThat(index.seekTo(addr3)).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3);
assertThat(index.seekTo(addr1)).isTrue();
Expand All @@ -2725,6 +2727,83 @@ public void index_seekTo() {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
}

@Test
public void index_interleaving() {
InetSocketAddress addr1_6 = new InetSocketAddress("f38:1:1", 1234);
InetSocketAddress addr1_4 = new InetSocketAddress("10.1.1.1", 1234);
InetSocketAddress addr2_4 = new InetSocketAddress("10.1.1.2", 1234);
InetSocketAddress addr3_4 = new InetSocketAddress("10.1.1.3", 1234);
InetSocketAddress addr4_4 = new InetSocketAddress("10.1.1.4", 1234);
InetSocketAddress addr4_6 = new InetSocketAddress("f38:1:4", 1234);

Attributes attrs1 = Attributes.newBuilder().build();
Attributes attrs2 = Attributes.newBuilder().build();
Attributes attrs3 = Attributes.newBuilder().build();
Attributes attrs4 = Attributes.newBuilder().build();

PickFirstLeafLoadBalancer.Index index = new PickFirstLeafLoadBalancer.Index(Arrays.asList(
new EquivalentAddressGroup(Arrays.asList(addr1_4, addr1_6), attrs1),
new EquivalentAddressGroup(Arrays.asList(addr2_4), attrs2),
new EquivalentAddressGroup(Arrays.asList(addr3_4), attrs3),
new EquivalentAddressGroup(Arrays.asList(addr4_4, addr4_6), attrs4)), enableHappyEyeballs);

assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
assertThat(index.isAtBeginning()).isTrue();

index.increment();
assertThat(index.isValid()).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_6);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs1);
assertThat(index.isAtBeginning()).isFalse();

index.increment();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs2);

index.increment();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
}

index.increment();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr3_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs3);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
assertThat(index.getCurrentEagAttributes()).isSameInstanceAs(attrs4);
}

// Move to last entry
assertThat(index.increment()).isTrue();
assertThat(index.isValid()).isTrue();
if (enableHappyEyeballs) {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
} else {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_6);
}

// Move off of the end
assertThat(index.increment()).isFalse();
assertThat(index.isValid()).isFalse();
assertThrows(IllegalStateException.class, index::getCurrentAddress);

// Reset
index.reset();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr1_4);
assertThat(index.isAtBeginning()).isTrue();
assertThat(index.isValid()).isTrue();

// Seek to an address
assertThat(index.seekTo(addr4_4)).isTrue();
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr4_4);
}

private static class FakeSocketAddress extends SocketAddress {
final String name;

Expand Down
Loading