From 3e6875ae86efcafb03dec803193826a51badbfa7 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Wed, 29 Sep 2021 17:37:13 -0700 Subject: [PATCH 01/11] skeleton myst shard discovery --- .../metrics/meta/endpoints/ShardEndPoint.java | 16 +++++ .../endpoints/ShardedServiceRegistry.java | 11 ++++ .../impl/KubernetesStatefulRegistry.java | 41 +++++++++++++ .../meta/endpoints/impl/MystEndpoint.java | 59 +++++++++++++++++++ .../impl/MystStatefulSetRegistry.java | 52 ++++++++++++++++ 5 files changed, 179 insertions(+) create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java new file mode 100644 index 00000000..8cd9d00c --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java @@ -0,0 +1,16 @@ +package net.opentsdb.aura.metrics.meta.endpoints; + +public interface ShardEndPoint { + + String getHost(); + + int getPort(); + + Protocol getProtocol(); + + boolean mtls(); + + enum Protocol { + http1_1, http2_0, https1_1, https_2_0 + } +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java new file mode 100644 index 00000000..b6e5fa3e --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java @@ -0,0 +1,11 @@ +package net.opentsdb.aura.metrics.meta.endpoints; + +import java.util.List; +import java.util.Map; + +public interface ShardedServiceRegistry { + + Map> getEndpoints(String namespace); + + Map> getEndpoints(String namespace, long epoch); +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java new file mode 100644 index 00000000..3ae5f22a --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java @@ -0,0 +1,41 @@ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import net.opentsdb.aura.metrics.meta.endpoints.ShardedServiceRegistry; + +//TODO: +// Parse the yaml file. +// Look at Plugin implementation. +// if it is too complicated, then parse the file yourself. +// Parsing the file should use ObjectMapper. +public abstract class KubernetesStatefulRegistry implements ShardedServiceRegistry { + + private String type; + private final String[] replicas = {"a", "b", "c", "d", "e"}; + protected static final String NUM_SHARDS = "num-shards"; + protected static final String REPLICAS = "replicas"; + protected static final String K8S_NAMESPACE = "k8s_namespace"; + + public KubernetesStatefulRegistry(final String type) { + this.type = type; + } + + protected int getNumShards(String namespace) { + return 0; + } + + protected int getNumReplicas(String namespace) { + return 0; + } + + protected String getReplica(int replicaId) { + return replicas[replicaId]; + } + + protected int getPort(String namespace) { + return 0; + } + + protected String getDomain(String namespace) { + return null; + } +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java new file mode 100644 index 00000000..fc997172 --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java @@ -0,0 +1,59 @@ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; +//TODO: Make a builder +public class MystEndpoint implements ShardEndPoint { + + private final String host; + private final int port; + + public MystEndpoint(String host, int port) { + this.host = host; + this.port = port; + } + + + @Override + public String getHost() { + return this.host; + } + + @Override + public int getPort() { + return this.port; + } + + @Override + public Protocol getProtocol() { + return Protocol.http2_0; + } + + @Override + public boolean mtls() { + return false; + } + + public static class Builder { + private String l_host; + private int l_port; + + public Builder withHost(String host) { + this.l_host = host; + return this; + } + + public Builder withPort(int port) { + this.l_port = port; + return this; + } + + public MystEndpoint build() { + return new MystEndpoint(l_host, l_port); + } + + public static Builder newBuilder() { + return new Builder(); + } + + } +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java new file mode 100644 index 00000000..c1b4b77b --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java @@ -0,0 +1,52 @@ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +//TODO: +// make a cache +// Refetch for a namespace only if things have changed +// Write testcases using Mocks (jmockit or mockito) +public class MystStatefulSetRegistry extends KubernetesStatefulRegistry { + + private static final String pod_name_pattern = "%s-myst-%s-%s.svc.%s"; + + public MystStatefulSetRegistry() { + super("myst"); + } + + @Override + public Map> getEndpoints(String namespace) { + Map> endpointsMap = new HashMap<>(); + + final int numShards = getNumShards(namespace); + final int numReplicas = getNumReplicas(namespace); + for(int i = 0; i < numReplicas; i++) { + String replica = getReplica(i); + List shardEndPoints = new ArrayList<>(); + endpointsMap.put(replica, shardEndPoints); + for (int j = 0; j < numShards; i++) { + final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + .withHost( + String.format( + pod_name_pattern, + namespace, + replica, + j, + getDomain(namespace))) + .withPort(getPort(namespace)) + .build(); + shardEndPoints.add(endpoint); + } + } + return endpointsMap; + } + + @Override + public Map> getEndpoints(String namespace, long epoch) { + return getEndpoints(namespace); + } +} From 40fb70c03be2702bf745b2074ee47e01050c22e4 Mon Sep 17 00:00:00 2001 From: ntippabhatla Date: Tue, 12 Oct 2021 10:45:26 -0700 Subject: [PATCH 02/11] GetListOfMystEndPoints --- meta-grpc-client/build.gradle | 7 + .../metrics/meta/endpoints/ShardEndPoint.java | 18 + .../endpoints/ShardedServiceRegistry.java | 16 + .../meta/endpoints/impl/Component.java | 107 +++++ .../meta/endpoints/impl/DeploymentConfig.java | 86 ++++ .../impl/KubernetesStatefulRegistry.java | 41 -- .../meta/endpoints/impl/MystEndpoint.java | 49 ++- .../impl/MystStatefulSetRegistry.java | 54 ++- .../meta/endpoints/impl/Namespace.java | 102 +++++ .../impl/MystStatefulSetRegistryTest.java | 384 ++++++++++++++++++ .../src/test/resources/ConfigTest1.yaml | 26 ++ .../src/test/resources/ConfigTest2.yaml | 23 ++ .../src/test/resources/ConfigTest3.yaml | 21 + .../src/test/resources/ConfigTest4.yaml | 22 + 14 files changed, 897 insertions(+), 59 deletions(-) create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java delete mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java create mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java create mode 100644 meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java create mode 100644 meta-grpc-client/src/test/resources/ConfigTest1.yaml create mode 100644 meta-grpc-client/src/test/resources/ConfigTest2.yaml create mode 100644 meta-grpc-client/src/test/resources/ConfigTest3.yaml create mode 100644 meta-grpc-client/src/test/resources/ConfigTest4.yaml diff --git a/meta-grpc-client/build.gradle b/meta-grpc-client/build.gradle index 68dd7eb0..b9a46fe8 100644 --- a/meta-grpc-client/build.gradle +++ b/meta-grpc-client/build.gradle @@ -45,6 +45,9 @@ def grpcVersion = '1.40.0' def protobufVersion = '3.15.6' def protocVersion = protobufVersion + + + dependencies { api project(':core') @@ -60,6 +63,7 @@ dependencies { implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.14.1' implementation group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.14.1' implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.14.1' + implementation 'org.projectlombok:lombok:1.18.16' testImplementation "io.grpc:grpc-testing:${grpcVersion}" testImplementation "io.grpc:grpc-testing:${grpcVersion}" @@ -76,6 +80,9 @@ dependencies { testCommonImplementation group: 'org.roaringbitmap', name: 'RoaringBitmap', version: '0.9.3' testCommonImplementation "io.grpc:grpc-stub:${grpcVersion}" + compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.0-rc2' + compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.0-rc2' + } protobuf { diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java index 8cd9d00c..b46a3e34 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java @@ -1,9 +1,27 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package net.opentsdb.aura.metrics.meta.endpoints; public interface ShardEndPoint { String getHost(); + boolean equals(Object that); + int getPort(); Protocol getProtocol(); diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java index b6e5fa3e..b30dfae2 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardedServiceRegistry.java @@ -1,3 +1,19 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package net.opentsdb.aura.metrics.meta.endpoints; import java.util.List; diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java new file mode 100644 index 00000000..802cf8f4 --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java @@ -0,0 +1,107 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Component { + + @JsonProperty("component") + private String name; + + @JsonProperty("num-shards") + private int numShards; + + @JsonProperty("replicas") + private int numReplicas; + + private static final String[] replicas = {"a", "b", "c", "d", "e"}; + private int port; + + + public Component() { + + } + + public String getName() { + + return name; + } + + public int getNumShards() { + + return numShards; + } + + public int getNumReplicas() { + if(numReplicas < replicas.length) { + return numReplicas; + }else{ + return 1; + } + } + + public void setNumReplicas(int numReplicas) { + if(numReplicas < replicas.length) { + this.numReplicas = numReplicas; + }else{ + this.numReplicas = 1; + } + } + + public String getReplica(int replicaId){ + return replicas[replicaId]; + } + + public int getPort() { + + return port; + } + + public void setPort(int port) { + + this.port = port; + } + + + public void setNumShards(int numShards) { + + this.numShards = numShards; + } + + public void setName(String name) { + + this.name = name; + } + + public void setDefaultComponent(Component defaultComponent) { + if (this.numShards == 0){ + this.numShards = defaultComponent.getNumShards(); + } + if (this.numReplicas == 0){ + this.numReplicas = defaultComponent.getNumReplicas(); + } + if (this.port == 0){ + this.port = defaultComponent.getPort(); + } + } + + @Override + public String toString() { + return "Components {name=" + name + ", num-shards=" + numShards + ", replicas=" + numReplicas + ", port=" + port + "}"; + } +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java new file mode 100644 index 00000000..de1d238a --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java @@ -0,0 +1,86 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DeploymentConfig { + + @JsonProperty("kubernetes.statefulset.registry.cluster") + private String clusterName; + + public String getClusterName() { + return clusterName; + } + + @JsonProperty("kubernetes.statefulset.registry.cluster.domain") + private String clusterDomain; + + @JsonProperty("kubernetes.statefulset.registry.namespaces") + private List namespaces; + + private final Map namespaceMap = new HashMap<>(); + + public DeploymentConfig(){ + + } + + public String getClusterDomain() { + return clusterDomain; + } + + public void setClusterDomain(String clusterDomain) { + this.clusterDomain=clusterDomain; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public List getListOfNamespaces() { + return namespaces; + } + + public void setListOfNamespaces(List namespaces) { + this.namespaces = namespaces; + } + + public Map toMap() { + Namespace defaultNamespace = null; + try { + for (Namespace namespace : namespaces) { + namespace.toMap(); + if (namespace.getName().equals("default")) { + defaultNamespace = namespace; + } else { + namespaceMap.put(namespace.getName(), namespace); + } + } + for (String key : namespaceMap.keySet()) { + namespaceMap.get(key).setDefaultNamespace(defaultNamespace); + } + } catch (Exception e){ + System.out.println(e); + } + + return namespaceMap; + } + +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java deleted file mode 100644 index 3ae5f22a..00000000 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/KubernetesStatefulRegistry.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.opentsdb.aura.metrics.meta.endpoints.impl; - -import net.opentsdb.aura.metrics.meta.endpoints.ShardedServiceRegistry; - -//TODO: -// Parse the yaml file. -// Look at Plugin implementation. -// if it is too complicated, then parse the file yourself. -// Parsing the file should use ObjectMapper. -public abstract class KubernetesStatefulRegistry implements ShardedServiceRegistry { - - private String type; - private final String[] replicas = {"a", "b", "c", "d", "e"}; - protected static final String NUM_SHARDS = "num-shards"; - protected static final String REPLICAS = "replicas"; - protected static final String K8S_NAMESPACE = "k8s_namespace"; - - public KubernetesStatefulRegistry(final String type) { - this.type = type; - } - - protected int getNumShards(String namespace) { - return 0; - } - - protected int getNumReplicas(String namespace) { - return 0; - } - - protected String getReplica(int replicaId) { - return replicas[replicaId]; - } - - protected int getPort(String namespace) { - return 0; - } - - protected String getDomain(String namespace) { - return null; - } -} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java index fc997172..b7584999 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java @@ -1,7 +1,26 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package net.opentsdb.aura.metrics.meta.endpoints.impl; import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; -//TODO: Make a builder + +import java.util.Objects; + + public class MystEndpoint implements ShardEndPoint { private final String host; @@ -28,6 +47,30 @@ public Protocol getProtocol() { return Protocol.http2_0; } + @Override + public boolean equals(Object that){ + if (Objects.isNull(that)){ + return false; + } + if (this == that){ + return true; + }else if (!(that instanceof ShardEndPoint)) { + return false; + + }else{ + if (Objects.equals(this.getHost(), ((ShardEndPoint) that).getHost()) && + Objects.equals(this.getPort(), ((ShardEndPoint) that).getPort()) + && Objects.equals(this.getProtocol(), ((ShardEndPoint) that).getProtocol()) && + ((ShardEndPoint) that).mtls() == false && this.mtls() == false ){ + + return true; + }else{ + return false; + } + } +} + + @Override public boolean mtls() { return false; @@ -56,4 +99,8 @@ public static Builder newBuilder() { } } + @Override + public String toString() { + return "Endpoints {host=" + host + ", port=" + port +"}"; + } } diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java index c1b4b77b..a5f70f70 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java @@ -1,34 +1,52 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package net.opentsdb.aura.metrics.meta.endpoints.impl; import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; +import net.opentsdb.aura.metrics.meta.endpoints.ShardedServiceRegistry; + +import java.util.*; + + +public class MystStatefulSetRegistry implements ShardedServiceRegistry { -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -//TODO: -// make a cache -// Refetch for a namespace only if things have changed -// Write testcases using Mocks (jmockit or mockito) -public class MystStatefulSetRegistry extends KubernetesStatefulRegistry { + private static final String pod_name_pattern = "%s-myst-%s-%s.%s.svc.%s"; - private static final String pod_name_pattern = "%s-myst-%s-%s.svc.%s"; + private DeploymentConfig config; + + public MystStatefulSetRegistry(DeploymentConfig config) { + this.config=config; + } public MystStatefulSetRegistry() { - super("myst"); } @Override public Map> getEndpoints(String namespace) { Map> endpointsMap = new HashMap<>(); - final int numShards = getNumShards(namespace); - final int numReplicas = getNumReplicas(namespace); + final int numShards = config.toMap().get(namespace).toMap().get("myst").getNumShards(); + final int numReplicas = config.toMap().get(namespace).toMap().get("myst").getNumReplicas(); for(int i = 0; i < numReplicas; i++) { - String replica = getReplica(i); + String replica = config.toMap().get(namespace).toMap().get("myst").getReplica(i); List shardEndPoints = new ArrayList<>(); endpointsMap.put(replica, shardEndPoints); - for (int j = 0; j < numShards; i++) { + for (int j = 0; j < numShards; j++) { final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() .withHost( String.format( @@ -36,10 +54,12 @@ public Map> getEndpoints(String namespace) { namespace, replica, j, - getDomain(namespace))) - .withPort(getPort(namespace)) + namespace, + config.getClusterDomain())) + .withPort(config.toMap().get(namespace).toMap().get("myst").getPort()) .build(); shardEndPoints.add(endpoint); + } } return endpointsMap; diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java new file mode 100644 index 00000000..5a1b3495 --- /dev/null +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java @@ -0,0 +1,102 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class Namespace { + + @JsonProperty("namespace") + private String name; + + @JsonProperty("k8s_namespace") + private String k8sNamespace; + + private List components; + private final Map componentMap = new HashMap<>(); + + public Namespace(){ + + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public void setDefaultNamespace(Namespace defaultNamespace) { + if (Objects.nonNull(this.k8sNamespace)) { + this.k8sNamespace = defaultNamespace.getk8sNamespace(); + } + if (Objects.nonNull(defaultNamespace)) { + + final Map defaultComponentMap = defaultNamespace.componentMap; + Component defaultComponent = null; + for (String key : defaultComponentMap.keySet()) { + defaultComponent = defaultComponentMap.get(key); + if (componentMap.containsKey(key)) { + componentMap.get(key).setDefaultComponent(defaultComponent); + } else { + componentMap.put(defaultComponent.getName(), defaultComponent); + } + } + } + } + + public String getk8sNamespace() { + return k8sNamespace; + } + + public void setk8sNamespace(String k8sNamespace) { + this.k8sNamespace = k8sNamespace; + } + + public List getComponents(){ + return components; + } + + public void setComponents(List components){ + this.components = components; + } + + public Map toMap() + { + if (components != null) { + for (Component component : components) { + componentMap.put(component.getName(), component); + } + } + return componentMap; + } + + @Override + public String toString() { + return "Namespaces {name=" + name + ", k8s_namespace=" + k8sNamespace + ", " + componentMap + "}"; + } + + + + + +} diff --git a/meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java b/meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java new file mode 100644 index 00000000..bdb0be66 --- /dev/null +++ b/meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java @@ -0,0 +1,384 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class MystStatefulSetRegistryTest { + + private static final String pod_name_pattern = "%s-myst-%s-%s.%s.svc.%s"; + private static final String[] replicas = {"a", "b", "c", "d", "e"}; + private static final String cluster_domain = "cluster.local"; + + @Test + public void testListOfEndpoints1() throws IOException { + + ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + + try { + DeploymentConfig cluster_config1 = mapper1.readValue(new File("src/test/resources/ConfigTest1.yaml"), DeploymentConfig.class); + Map namespaceMap = cluster_config1.toMap(); + MystStatefulSetRegistry myststs1 = new MystStatefulSetRegistry(cluster_config1); + HashMap> expectedEndpointsMap = new HashMap<>(); + List componentList1 = new ArrayList<>(); + List componentList2 = new ArrayList<>(); + Component a = new Component(); + a.setNumReplicas(1); + a.setNumShards(1); + a.setName("myst"); + a.setPort(9999); + Component b = new Component(); + b.setNumReplicas(1); + b.setNumShards(1); + b.setName("aura-metrics"); + b.setPort(4080); + componentList1.add(b); + componentList1.add(a); + Component m = new Component(); + m.setNumReplicas(1); + m.setNumShards(1); + m.setName("myst"); + m.setPort(9999); + componentList2.add(m); + Namespace n1 = new Namespace(); + n1.setName("default"); + n1.setk8sNamespace("default"); + n1.setComponents(componentList1); + Namespace n2 = new Namespace(); + n2.setName("ssp"); + n2.setk8sNamespace("ssp"); + n2.setComponents(componentList2); + List namespaceList1 = new ArrayList<>(); + namespaceList1.add(n1); + namespaceList1.add(n2); + DeploymentConfig expectedConfig = new DeploymentConfig(); + expectedConfig.setClusterName("kubernetes.default.GQ"); + expectedConfig.setClusterDomain("cluster.local"); + expectedConfig.setListOfNamespaces(namespaceList1); + + + System.out.println("Test Case 1: "); + for (String key : namespaceMap.keySet()) { + Map> result = myststs1.getEndpoints(key); + final int numShards = expectedConfig.toMap().get(key).toMap().get("myst").getNumShards();; + final int numReplicas = expectedConfig.toMap().get(key).toMap().get("myst").getNumReplicas(); + String namespace = key; + for (int i = 0; i < numReplicas; i++) { + String replica = replicas[i]; + List expectedShardEndPoints = new ArrayList<>(); + expectedEndpointsMap.put(replica, expectedShardEndPoints); + for (int j = 0; j < numShards; j++) { + MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + .withHost( + String.format( + pod_name_pattern, + namespace, + replica, + j, + namespace, + cluster_domain)) + .withPort(9999) + .build(); + expectedShardEndPoints.add(endpoint); + + } + } + + System.out.println("Result"+myststs1.getEndpoints(key)); + System.out.println("Expected"+expectedEndpointsMap); + assertEquals(expectedEndpointsMap,result); + } + + + } catch (Exception e) { + System.out.println(e); + } + + + } + + + @Test + public void testListOfEndpoints2() throws IOException { + + ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + try { + DeploymentConfig cluster_config1 = mapper1.readValue(new File("src/test/resources/ConfigTest2.yaml"), DeploymentConfig.class); + + Map namespaceMap = cluster_config1.toMap(); + MystStatefulSetRegistry myststs1 = new MystStatefulSetRegistry(cluster_config1); + Map> expectedEndpointsMap = new HashMap<>(); + List componentList1 = new ArrayList<>(); + List componentList2 = new ArrayList<>(); + Component a = new Component(); + a.setNumReplicas(1); + a.setNumShards(1); + a.setName("myst"); + a.setPort(9999); + Component b = new Component(); + b.setNumReplicas(1); + b.setNumShards(1); + b.setName("aura-metrics"); + b.setPort(4080); + componentList1.add(b); + componentList1.add(a); + Component m = new Component(); + m.setNumReplicas(2); + m.setNumShards(9); + m.setName("myst"); + m.setPort(9999); + componentList2.add(m); + Namespace n1 = new Namespace(); + n1.setName("default"); + n1.setk8sNamespace("default"); + n1.setComponents(componentList1); + Namespace n2 = new Namespace(); + n2.setName("Onevideo"); + n2.setk8sNamespace("Onevideo"); + n2.setComponents(componentList2); + List namespaceList1 = new ArrayList<>(); + namespaceList1.add(n1); + namespaceList1.add(n2); + DeploymentConfig expectedConfig = new DeploymentConfig(); + expectedConfig.setClusterName("kubernetes.default.GQ"); + expectedConfig.setClusterDomain("cluster.local"); + expectedConfig.setListOfNamespaces(namespaceList1); + + System.out.println("Test Case 2: "); + for (String key : namespaceMap.keySet()) { + Map> result = myststs1.getEndpoints(key); + + + final int numShards = expectedConfig.toMap().get(key).toMap().get("myst").getNumShards();; + final int numReplicas = expectedConfig.toMap().get(key).toMap().get("myst").getNumReplicas(); + String namespace = key; + + for (int i = 0; i < numReplicas; i++) { + String replica = replicas[i]; + List expectedShardEndPoints = new ArrayList<>(); + expectedEndpointsMap.put(replica, expectedShardEndPoints); + for (int j = 0; j < numShards; j++) { + MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + .withHost( + String.format( + pod_name_pattern, + namespace, + replica, + j, + namespace, + cluster_domain)) + .withPort(9999) + .build(); + expectedShardEndPoints.add(endpoint); + + } + } + + System.out.println("Result"+myststs1.getEndpoints(key)); + System.out.println("Expected"+expectedEndpointsMap); + assertEquals(expectedEndpointsMap, result); + } + + + } catch (Exception e) { + System.out.println(e); + } + + + } + + @Test + public void testListOfEndpoints3() throws IOException { + + ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + try { + DeploymentConfig cluster_config1 = mapper1.readValue(new File("src/test/resources/ConfigTest3.yaml"), DeploymentConfig.class); + + Map namespaceMap = cluster_config1.toMap(); + MystStatefulSetRegistry myststs1 = new MystStatefulSetRegistry(cluster_config1); + Map> expectedEndpointsMap = new HashMap<>(); + List componentList1 = new ArrayList<>(); + List componentList2 = new ArrayList<>(); + Component a = new Component(); + a.setNumReplicas(1); + a.setNumShards(1); + a.setName("myst"); + a.setPort(9999); + Component b = new Component(); + b.setNumReplicas(1); + b.setNumShards(1); + b.setName("aura-metrics"); + b.setPort(4080); + componentList1.add(b); + componentList1.add(a); + Component m = new Component(); + m.setNumShards(0); + m.setName("myst"); + componentList2.add(m); + Namespace n1 = new Namespace(); + n1.setName("default"); + n1.setk8sNamespace("default"); + n1.setComponents(componentList1); + Namespace n2 = new Namespace(); + n2.setName("Onevideo"); + n2.setk8sNamespace("Onevideo"); + n2.setComponents(componentList2); + List namespaceList1 = new ArrayList<>(); + namespaceList1.add(n1); + namespaceList1.add(n2); + DeploymentConfig expectedConfig = new DeploymentConfig(); + expectedConfig.setClusterName("kubernetes.default.GQ"); + expectedConfig.setClusterDomain("cluster.local"); + expectedConfig.setListOfNamespaces(namespaceList1); + + System.out.println("Test Case 3: "); + for (String key : namespaceMap.keySet()) { + Map> result = myststs1.getEndpoints(key); + final int numShards = expectedConfig.toMap().get(key).toMap().get("myst").getNumShards();; + final int numReplicas = expectedConfig.toMap().get(key).toMap().get("myst").getNumReplicas(); + String namespace = key; + for (int i = 0; i < numReplicas; i++) { + String replica = replicas[i]; + List expectedShardEndPoints = new ArrayList<>(); + expectedEndpointsMap.put(replica, expectedShardEndPoints); + for (int j = 0; j < numShards; j++) { + final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + .withHost( + String.format( + pod_name_pattern, + namespace, + replica, + j, + namespace, + cluster_domain)) + .withPort(9999) + .build(); + expectedShardEndPoints.add(endpoint); + + } + } + + System.out.println("Result"+myststs1.getEndpoints(key)); + System.out.println("Expected"+expectedEndpointsMap); + assertEquals(expectedEndpointsMap.toString(), result.toString()); + } + + + } catch (Exception e) { + System.out.println(e); + } + + } + + @Test + public void testListOfEndpoints4() throws IOException { + + ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + try { + DeploymentConfig cluster_config1 = mapper1.readValue(new File("src/test/resources/ConfigTest4.yaml"), DeploymentConfig.class); + + Map namespaceMap = cluster_config1.toMap(); + MystStatefulSetRegistry myststs1 = new MystStatefulSetRegistry(cluster_config1); + Map> expectedEndpointsMap = new HashMap<>(); + List componentList1 = new ArrayList<>(); + List componentList2 = new ArrayList<>(); + Component a = new Component(); + a.setNumReplicas(1); + a.setNumShards(1); + a.setName("myst"); + a.setPort(9999); + Component b = new Component(); + b.setNumReplicas(1); + b.setNumShards(1); + b.setName("aura-metrics"); + b.setPort(4080); + componentList1.add(b); + componentList1.add(a); + Component m = new Component(); + m.setNumReplicas(6); + m.setNumShards(10); + m.setName("myst"); + componentList2.add(m); + Namespace n1 = new Namespace(); + n1.setName("default"); + n1.setk8sNamespace("default"); + n1.setComponents(componentList1); + Namespace n2 = new Namespace(); + n2.setName("Onevideo"); + n2.setk8sNamespace("Onevideo"); + n2.setComponents(componentList2); + List namespaceList1 = new ArrayList<>(); + namespaceList1.add(n1); + namespaceList1.add(n2); + DeploymentConfig expectedConfig = new DeploymentConfig(); + expectedConfig.setClusterName("kubernetes.default.GQ"); + expectedConfig.setClusterDomain("cluster.local"); + expectedConfig.setListOfNamespaces(namespaceList1); + + System.out.println("Test Case 4: "); + for (String key : namespaceMap.keySet()) { + Map> result = myststs1.getEndpoints(key); + final int numShards = expectedConfig.toMap().get(key).toMap().get("myst").getNumShards(); + final int numReplicas = expectedConfig.toMap().get(key).toMap().get("myst").getNumReplicas(); + String namespace = key; + for (int i = 0; i < numReplicas; i++) { + String replica = replicas[i]; + List expectedShardEndPoints = new ArrayList<>(); + expectedEndpointsMap.put(replica, expectedShardEndPoints); + for (int j = 0; j < numShards; j++) { + final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + .withHost( + String.format( + pod_name_pattern, + namespace, + replica, + j, + namespace, + cluster_domain)) + .withPort(9999) + .build(); + expectedShardEndPoints.add(endpoint); + + } + } + + + System.out.println("Result"+myststs1.getEndpoints(key)); + System.out.println("Expected"+expectedEndpointsMap); + assertEquals(expectedEndpointsMap, result); + } + + + } catch (Exception e) { + System.out.println(e); + } + + } +} diff --git a/meta-grpc-client/src/test/resources/ConfigTest1.yaml b/meta-grpc-client/src/test/resources/ConfigTest1.yaml new file mode 100644 index 00000000..505a09e4 --- /dev/null +++ b/meta-grpc-client/src/test/resources/ConfigTest1.yaml @@ -0,0 +1,26 @@ +kubernetes.statefulset.registry.cluster: kubernetes.default.GQ +kubernetes.statefulset.registry.cluster.domain: cluster.local +kubernetes.statefulset.registry.namespaces: + - namespace: default + k8s_namespace: default + components: + - component: aura-metrics + num-shards: 1 + replicas: 1 + port: 4080 + - component: myst + num-shards: 1 + replicas: 1 + port: 9999 + - namespace: ssp + k8s_namespace: ssp + components: + - component: aura-metrics + num-shards: 10 + replicas: 1 + - component: myst + + + + + diff --git a/meta-grpc-client/src/test/resources/ConfigTest2.yaml b/meta-grpc-client/src/test/resources/ConfigTest2.yaml new file mode 100644 index 00000000..b67f1fad --- /dev/null +++ b/meta-grpc-client/src/test/resources/ConfigTest2.yaml @@ -0,0 +1,23 @@ +kubernetes.statefulset.registry.cluster: kubernetes.default.GQ +kubernetes.statefulset.registry.cluster.domain: cluster.local +kubernetes.statefulset.registry.namespaces: + - namespace: default + k8s_namespace: default + components: + - component: aura-metrics + num-shards: 1 + replicas: 1 + port: 4080 + - component: myst + num-shards: 1 + replicas: 1 + port: 9999 + - namespace: Onevideo + k8s_namespace: default + components: + - component: myst + num-shards: 9 + replicas: 2 + + + diff --git a/meta-grpc-client/src/test/resources/ConfigTest3.yaml b/meta-grpc-client/src/test/resources/ConfigTest3.yaml new file mode 100644 index 00000000..8be6d0b3 --- /dev/null +++ b/meta-grpc-client/src/test/resources/ConfigTest3.yaml @@ -0,0 +1,21 @@ +kubernetes.statefulset.registry.cluster: kubernetes.default.GQ +kubernetes.statefulset.registry.cluster.domain: cluster.local +kubernetes.statefulset.registry.namespaces: + - namespace: default + k8s_namespace: default + components: + - component: aura-metrics + num-shards: 1 + replicas: 1 + port: 4080 + - component: myst + num-shards: 1 + replicas: 1 + port: 9999 + - namespace: Onevideo + components: + - component: myst + num-shards: 0 + + + diff --git a/meta-grpc-client/src/test/resources/ConfigTest4.yaml b/meta-grpc-client/src/test/resources/ConfigTest4.yaml new file mode 100644 index 00000000..e33d8e03 --- /dev/null +++ b/meta-grpc-client/src/test/resources/ConfigTest4.yaml @@ -0,0 +1,22 @@ +kubernetes.statefulset.registry.cluster: kubernetes.default.GQ +kubernetes.statefulset.registry.cluster.domain: cluster.local +kubernetes.statefulset.registry.namespaces: + - namespace: default + k8s_namespace: default + components: + - component: aura-metrics + num-shards: 1 + replicas: 1 + port: 4080 + - component: myst + num-shards: 1 + replicas: 1 + port: 9999 + - namespace: Onevideo + components: + - component: myst + num-shards: 10 + replicas: 8 + + + From 7c70d4ebba75a4e18ab320d0934c97825b7f0c82 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Wed, 13 Oct 2021 15:00:52 -0700 Subject: [PATCH 03/11] sharded meta client stubs --- .../metrics/meta/SerialShardedMetaClient.java | 35 +++++++++++++++++++ .../aura/metrics/meta/ShardedMetaClient.java | 11 ++++++ .../meta/ShardedMetaClientFactory.java | 7 ++++ 3 files changed, 53 insertions(+) create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java new file mode 100644 index 00000000..70526ff9 --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java @@ -0,0 +1,35 @@ +package net.opentsdb.aura.metrics.meta; + +import com.stumbleupon.async.Deferred; +import net.opentsdb.core.BaseTSDBPlugin; +import net.opentsdb.core.TSDB; + +import java.util.Iterator; + +public class SerialShardedMetaClient extends BaseTSDBPlugin implements ShardedMetaClient { + + public static final String TYPE = "ShardedMetaClient"; + + @Override + public Iterator getTimeseriesAllShards() { + return null; + } + + @Override + public Iterator getTimeSeriesPerShard(int shardId) { + return null; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Deferred initialize(TSDB tsdb, String id) { + + + + return null; + } +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java new file mode 100644 index 00000000..b5013174 --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java @@ -0,0 +1,11 @@ +package net.opentsdb.aura.metrics.meta; + +import java.util.Iterator; + +public interface ShardedMetaClient { + + Iterator> getTimeseriesAllShards(); + + Iterator getTimeSeriesPerShard(int shardId); + +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java new file mode 100644 index 00000000..4bab788e --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java @@ -0,0 +1,7 @@ +package net.opentsdb.aura.metrics.meta; + +public interface ShardedMetaClientFactory { + + ShardedMetaClient getShardedMetaClient(String namespace); + +} From eabdcfa4174a681e53f1216c0a81a44a7eb3b448 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Fri, 15 Oct 2021 13:58:41 -0700 Subject: [PATCH 04/11] service discovery --- .../metrics/meta/endpoints/ShardEndPoint.java | 2 + .../meta/endpoints/impl/DeploymentConfig.java | 86 ------------- .../impl/MystStatefulSetRegistry.java | 72 ----------- .../meta/DefaultShardedMetaClient.java | 102 +++++++++++++++ .../metrics/meta/SerialShardedMetaClient.java | 35 ----- .../aura/metrics/meta/ShardedMetaClient.java | 4 +- .../meta/ShardedMetaClientFactory.java | 6 +- .../AuraMetricsStatefulSetRegistry.java | 39 ++++++ .../endpoints/BaseStatefulSetRegistry.java | 121 ++++++++++++++++++ .../endpoints/MystStatefulSetRegistry.java | 43 +++++++ .../meta/endpoints/impl/Component.java | 11 +- .../meta/endpoints/impl/DeploymentConfig.java | 89 +++++++++++++ .../meta/endpoints/impl/Namespace.java | 18 +-- .../meta/endpoints/impl/SimpleEndPoint.java | 39 ++++-- .../MystStatefulSetRegistryTest.java | 82 +++++++----- .../src/test/resources/ConfigTest1.yaml | 9 +- .../src/test/resources/ConfigTest2.yaml | 9 +- .../src/test/resources/ConfigTest3.yaml | 9 +- .../src/test/resources/ConfigTest4.yaml | 9 +- 19 files changed, 517 insertions(+), 268 deletions(-) delete mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java delete mode 100644 meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java delete mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistry.java rename {meta-grpc-client => opentsdb}/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java (91%) create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java rename {meta-grpc-client => opentsdb}/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java (91%) rename meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java => opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/SimpleEndPoint.java (65%) rename {meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl => opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints}/MystStatefulSetRegistryTest.java (83%) rename {meta-grpc-client => opentsdb}/src/test/resources/ConfigTest1.yaml (66%) rename {meta-grpc-client => opentsdb}/src/test/resources/ConfigTest2.yaml (65%) rename {meta-grpc-client => opentsdb}/src/test/resources/ConfigTest3.yaml (61%) rename {meta-grpc-client => opentsdb}/src/test/resources/ConfigTest4.yaml (63%) diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java index b46a3e34..24c4fd62 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java +++ b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/ShardEndPoint.java @@ -24,6 +24,8 @@ public interface ShardEndPoint { int getPort(); + int getShardIndex(); + Protocol getProtocol(); boolean mtls(); diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java deleted file mode 100644 index de1d238a..00000000 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * This file is part of OpenTSDB. - * Copyright (C) 2021 Yahoo. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package net.opentsdb.aura.metrics.meta.endpoints.impl; - -import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class DeploymentConfig { - - @JsonProperty("kubernetes.statefulset.registry.cluster") - private String clusterName; - - public String getClusterName() { - return clusterName; - } - - @JsonProperty("kubernetes.statefulset.registry.cluster.domain") - private String clusterDomain; - - @JsonProperty("kubernetes.statefulset.registry.namespaces") - private List namespaces; - - private final Map namespaceMap = new HashMap<>(); - - public DeploymentConfig(){ - - } - - public String getClusterDomain() { - return clusterDomain; - } - - public void setClusterDomain(String clusterDomain) { - this.clusterDomain=clusterDomain; - } - - public void setClusterName(String clusterName) { - this.clusterName = clusterName; - } - - public List getListOfNamespaces() { - return namespaces; - } - - public void setListOfNamespaces(List namespaces) { - this.namespaces = namespaces; - } - - public Map toMap() { - Namespace defaultNamespace = null; - try { - for (Namespace namespace : namespaces) { - namespace.toMap(); - if (namespace.getName().equals("default")) { - defaultNamespace = namespace; - } else { - namespaceMap.put(namespace.getName(), namespace); - } - } - for (String key : namespaceMap.keySet()) { - namespaceMap.get(key).setDefaultNamespace(defaultNamespace); - } - } catch (Exception e){ - System.out.println(e); - } - - return namespaceMap; - } - -} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java b/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java deleted file mode 100644 index a5f70f70..00000000 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistry.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * This file is part of OpenTSDB. - * Copyright (C) 2021 Yahoo. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package net.opentsdb.aura.metrics.meta.endpoints.impl; - -import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; -import net.opentsdb.aura.metrics.meta.endpoints.ShardedServiceRegistry; - -import java.util.*; - - -public class MystStatefulSetRegistry implements ShardedServiceRegistry { - - private static final String pod_name_pattern = "%s-myst-%s-%s.%s.svc.%s"; - - private DeploymentConfig config; - - public MystStatefulSetRegistry(DeploymentConfig config) { - this.config=config; - } - - public MystStatefulSetRegistry() { - } - - @Override - public Map> getEndpoints(String namespace) { - Map> endpointsMap = new HashMap<>(); - - final int numShards = config.toMap().get(namespace).toMap().get("myst").getNumShards(); - final int numReplicas = config.toMap().get(namespace).toMap().get("myst").getNumReplicas(); - for(int i = 0; i < numReplicas; i++) { - String replica = config.toMap().get(namespace).toMap().get("myst").getReplica(i); - List shardEndPoints = new ArrayList<>(); - endpointsMap.put(replica, shardEndPoints); - for (int j = 0; j < numShards; j++) { - final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() - .withHost( - String.format( - pod_name_pattern, - namespace, - replica, - j, - namespace, - config.getClusterDomain())) - .withPort(config.toMap().get(namespace).toMap().get("myst").getPort()) - .build(); - shardEndPoints.add(endpoint); - - } - } - return endpointsMap; - } - - @Override - public Map> getEndpoints(String namespace, long epoch) { - return getEndpoints(namespace); - } -} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java new file mode 100644 index 00000000..1911bc6e --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java @@ -0,0 +1,102 @@ +package net.opentsdb.aura.metrics.meta; + +import com.stumbleupon.async.Deferred; +import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; +import net.opentsdb.aura.metrics.meta.endpoints.ShardedServiceRegistry; +import net.opentsdb.aura.metrics.meta.grpc.MetaGrpcClient; +import net.opentsdb.configuration.Configuration; +import net.opentsdb.core.BaseTSDBPlugin; +import net.opentsdb.core.TSDB; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class DefaultShardedMetaClient + extends BaseTSDBPlugin + implements ShardedMetaClient { + + public static final String TYPE = "SerialShardedMetaClient"; + + private volatile ShardedServiceRegistry registry; + + private Map clients = new ConcurrentHashMap<>(); + private final String MUTEX = new String(); + private Random random = new Random(); + + private List setUpEndpoints(String namespace) { + final Map> endpointsMap = registry.getEndpoints(namespace); + List endPoints = getEndpoints(endpointsMap); + + createClients(endPoints); + return endPoints; + } + + @Override + public Iterator getTimeseriesAllShards(String namespace, String query) { + + final List shardEndPoints = setUpEndpoints(namespace); + List timeSeriesQueryResults = new ArrayList<>(); + for (ShardEndPoint endPoint : shardEndPoints) { + timeSeriesQueryResults.add(getResult(clients.get(endPoint), query)); + } + + return timeSeriesQueryResults.iterator(); + } + + private MergedMetaTimeSeriesQueryResult getResult(MetaGrpcClient metaGrpcClient, String query) { + MergedMetaTimeSeriesQueryResult mergedMetaTimeSeriesQueryResult = new MergedMetaTimeSeriesQueryResult(); + final Iterator timeseries = metaGrpcClient.getTimeseries(query); + while (timeseries.hasNext()) { + mergedMetaTimeSeriesQueryResult.add(timeseries.next()); + } + return mergedMetaTimeSeriesQueryResult; + } + + private void createClients(List endPoints) { + + for (ShardEndPoint endPoint : endPoints) { + if (!this.clients.containsKey(endPoint)) { + synchronized (MUTEX) { + if (!clients.containsKey(endPoint)) { + clients.put(endPoint, new MetaGrpcClient(endPoint.getHost(), endPoint.getPort())); + } + } + } + } + } + + private List getEndpoints(Map> endpointsMap) { + final int i = random.nextInt(endpointsMap.size()); + int j = 0; + for(String key: endpointsMap.keySet()) { + if (j == i) { + return endpointsMap.get(key); + } + j++; + } + + return null; + } + + @Override + public MergedMetaTimeSeriesQueryResult getTimeSeriesPerShard(String namespace, String query, int shardId) { + + final ShardEndPoint shardEndPoint = setUpEndpoints(namespace).get(shardId); + return getResult(clients.get(shardEndPoint), query); + } + + @Override + public String type() { + return TYPE; + } + + @Override + public Deferred initialize(TSDB tsdb, String id) { + + final Configuration config = tsdb.getConfig(); + + registry = config.getTyped("meta.service.registry", ShardedServiceRegistry.class); + + return null; + } +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java deleted file mode 100644 index 70526ff9..00000000 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/SerialShardedMetaClient.java +++ /dev/null @@ -1,35 +0,0 @@ -package net.opentsdb.aura.metrics.meta; - -import com.stumbleupon.async.Deferred; -import net.opentsdb.core.BaseTSDBPlugin; -import net.opentsdb.core.TSDB; - -import java.util.Iterator; - -public class SerialShardedMetaClient extends BaseTSDBPlugin implements ShardedMetaClient { - - public static final String TYPE = "ShardedMetaClient"; - - @Override - public Iterator getTimeseriesAllShards() { - return null; - } - - @Override - public Iterator getTimeSeriesPerShard(int shardId) { - return null; - } - - @Override - public String type() { - return TYPE; - } - - @Override - public Deferred initialize(TSDB tsdb, String id) { - - - - return null; - } -} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java index b5013174..fb8d3ee8 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java @@ -4,8 +4,8 @@ public interface ShardedMetaClient { - Iterator> getTimeseriesAllShards(); + Iterator getTimeseriesAllShards(String namespace, String query); - Iterator getTimeSeriesPerShard(int shardId); + ResT getTimeSeriesPerShard(String namespace, String query, int shardId); } diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java index 4bab788e..424bc8cd 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClientFactory.java @@ -1,7 +1,5 @@ package net.opentsdb.aura.metrics.meta; -public interface ShardedMetaClientFactory { - - ShardedMetaClient getShardedMetaClient(String namespace); - +public interface ShardedMetaClientFactory { + ShardedMetaClient getShardedMetaClient(String namespace); } diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java new file mode 100644 index 00000000..f613bf3c --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java @@ -0,0 +1,39 @@ +package net.opentsdb.aura.metrics.meta.endpoints; + +import net.opentsdb.aura.metrics.meta.endpoints.impl.Component; + +public class AuraMetricsStatefulSetRegistry extends BaseStatefulSetRegistry { + private static final String pod_name_prefix = "%s-aura-metrics-%s%s"; + private static final String TYPE = "AuraMetricsStatefulSetRegistry"; + private static final String COMPONENT_NAME = "aura-metrics"; + + @Override + protected String getComponentName() { + return COMPONENT_NAME; + } + + @Override + protected String getPrefix(String namespace, Component component, String replica, long epoch) { + final int epochPrefix = getEpochPrefix(epoch, component.getEpochLength()); + return String.format( + pod_name_prefix, + namespace, + epochPrefix, + replica + ); + } + + private int getEpochPrefix(long epoch, int epochLength) { + //Get index of hour in the day. + final long epochHr = epoch - epoch % 3600; + final long epochDay = epoch - epoch % 86400; + final int hrIndex = (int)(epochHr - epochDay) / 3600; + final int epochLengthInHr = epochLength / 3600; + return hrIndex / epochLengthInHr; + } + + @Override + public String type() { + return TYPE; + } +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java new file mode 100644 index 00000000..7787be3a --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java @@ -0,0 +1,121 @@ +package net.opentsdb.aura.metrics.meta.endpoints; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.stumbleupon.async.Deferred; +import net.opentsdb.aura.metrics.meta.endpoints.impl.Component; +import net.opentsdb.aura.metrics.meta.endpoints.impl.DeploymentConfig; +import net.opentsdb.aura.metrics.meta.endpoints.impl.Namespace; +import net.opentsdb.aura.metrics.meta.endpoints.impl.SimpleEndPoint; +import net.opentsdb.configuration.Configuration; +import net.opentsdb.core.BaseTSDBPlugin; +import net.opentsdb.core.TSDB; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class BaseStatefulSetRegistry extends BaseTSDBPlugin implements ShardedServiceRegistry { + + public static final String TYPE = "BaseStatefulSetRegistry"; + + public static final String DOMAIN = "statefulset.domain"; + + public static final String DEFAULT_NAMESPACE = "statefulset.default.namespace"; + + public static final String DEPLOYMENT_CONFIG = "statefulset.namespaces"; + + private static final String pod_name_pattern = "%s-%s.%s.svc.%s"; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private volatile Map>> endPoints = new ConcurrentHashMap<>(); + + private volatile DeploymentConfig deploymentConfig; + + @Override + public Map> getEndpoints(String namespace) { + return getEndpoints(namespace, -1); + } + + + public Map> getEndpoints(String namespace, long epoch) { + if (endPoints.get(namespace) != null) { + return endPoints.get(namespace); + } else { + return getAndCacheEndPoints(namespace, epoch); + } + } + + private Map> getAndCacheEndPoints(String namespace, long epoch) { + + final Map> endpoints = getEndpoints(namespace, deploymentConfig, epoch); + + this.endPoints.put(namespace, endpoints); + + return endpoints; + } + + public Map> getEndpoints(String namespace, DeploymentConfig deploymentConfig, long epoch) { + + final Namespace namespace1 = deploymentConfig.getNamespace(namespace); + final String clusterDomain = deploymentConfig.getClusterDomain(); + + final Component component = namespace1.getComponent(getComponentName()); + + final int numReplicas = component.getNumReplicas(); + final int numShards = component.getNumShards(); + + Map> endpointsMap = new HashMap<>(); + + for(int i = 0; i < numReplicas; i++) { + + String replica = component.getReplica(i); + List shardEndPoints = new ArrayList<>(); + endpointsMap.put(replica, shardEndPoints); + final String prefix = getPrefix(namespace, component, replica, epoch); + for (int j = 0; j < numShards; j++) { + final SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() + .withHost( + String.format( + prefix, + j, + prefix, + clusterDomain)) + .withPort(component.getPort()) + .build(); + shardEndPoints.add(endpoint); + } + } + + return endpointsMap; + + } + + protected abstract String getComponentName(); + + protected abstract String getPrefix(String namespace, Component component, String replica, long epoch); + + @Override + public abstract String type(); + + @Override + public Deferred initialize(TSDB tsdb, String id) { + final Configuration config = tsdb.getConfig(); + + try { + final Namespace[] namespaces = objectMapper.readValue(config.getString(DEPLOYMENT_CONFIG).getBytes(), Namespace[].class); + this.deploymentConfig = DeploymentConfig.Builder.newBuilder() + .withClusterDomain(config.getString(DOMAIN)) + .withDefaultNamespace(config.getString(DEFAULT_NAMESPACE)) + .withNamespaces(namespaces) + .build(); + return null; + } catch (IOException e) { + throw new RuntimeException( "Error deserializing deployment config for stateful set", e); + } + } + +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistry.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistry.java new file mode 100644 index 00000000..68b2bf40 --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistry.java @@ -0,0 +1,43 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package net.opentsdb.aura.metrics.meta.endpoints; + +import net.opentsdb.aura.metrics.meta.endpoints.impl.Component; + +public class MystStatefulSetRegistry extends BaseStatefulSetRegistry { + + + private static final String TYPE = "MystStatefulSetRegistry"; + private static final String COMPONENT_NAME = "myst"; + private static final String pod_name_prefix = "%s-myst-%s"; + + @Override + protected String getComponentName() { + return COMPONENT_NAME; + } + + @Override + protected String getPrefix(String namespace, Component component, String replica, long epoch) { + return String.format(pod_name_prefix, namespace, replica); + } + + @Override + public String type() { + return TYPE; + } +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java similarity index 91% rename from meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java rename to opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java index 802cf8f4..b5d47666 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Component.java @@ -29,10 +29,12 @@ public class Component { @JsonProperty("replicas") private int numReplicas; + @JsonProperty("epoch-length") + private int epochLength; + private static final String[] replicas = {"a", "b", "c", "d", "e"}; private int port; - public Component() { } @@ -67,6 +69,10 @@ public String getReplica(int replicaId){ return replicas[replicaId]; } + public int getEpochLength() { + return epochLength; + } + public int getPort() { return port; @@ -98,6 +104,9 @@ public void setDefaultComponent(Component defaultComponent) { if (this.port == 0){ this.port = defaultComponent.getPort(); } + if(epochLength == 0) { + this.epochLength = defaultComponent.getEpochLength(); + } } @Override diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java new file mode 100644 index 00000000..91615739 --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/DeploymentConfig.java @@ -0,0 +1,89 @@ +/* + * This file is part of OpenTSDB. + * Copyright (C) 2021 Yahoo. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.opentsdb.aura.metrics.meta.endpoints.impl; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class DeploymentConfig { + + private final String clusterDomain; + + private final Map namespaceMap; + + public DeploymentConfig(String clusterDomain, Map namespaceMap){ + this.clusterDomain = clusterDomain; + this.namespaceMap = namespaceMap; + } + + public String getClusterDomain() { + return clusterDomain; + } + + public Namespace getNamespace(String namespace) { + return namespaceMap.get(namespace); + } + + public static class Builder { + + private String l_clusterDomain; + private String l_defaultNamespace; + private Namespace[] namespaces; + + public static Builder newBuilder() { + return new Builder(); + } + + public Builder withClusterDomain(String clusterDomain) { + this.l_clusterDomain = clusterDomain; + return this; + } + + public Builder withDefaultNamespace(String defaultNamespace) { + this.l_defaultNamespace = l_defaultNamespace; + return this; + } + + public Builder withNamespaces(Namespace[] namespaces) { + this.namespaces = namespaces; + return this; + } + + + + public DeploymentConfig build() { + + final Optional defaultNamespace = Arrays.stream(namespaces) + .filter(namespace -> namespace.getName().equals(l_defaultNamespace)) + .findFirst(); + + final Map namespaceMap = Arrays.stream(namespaces) + .map(namespace -> { + namespace.toMap(); + namespace.setDefaultNamespace(defaultNamespace.orElse(null)); + return namespace; + }) + .collect(Collectors.toMap(Namespace::getName, Function.identity())); + + return new DeploymentConfig(l_clusterDomain, namespaceMap); + + } + + } + +} diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java similarity index 91% rename from meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java rename to opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java index 5a1b3495..702b5c1b 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/Namespace.java @@ -17,10 +17,8 @@ package net.opentsdb.aura.metrics.meta.endpoints.impl; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; + +import java.util.*; public class Namespace { @@ -46,11 +44,11 @@ public void setName(String name) { } public void setDefaultNamespace(Namespace defaultNamespace) { - if (Objects.nonNull(this.k8sNamespace)) { - this.k8sNamespace = defaultNamespace.getk8sNamespace(); - } - if (Objects.nonNull(defaultNamespace)) { + if (Objects.nonNull(defaultNamespace)) { + if (Objects.isNull(this.k8sNamespace)) { + this.k8sNamespace = defaultNamespace.getk8sNamespace(); + } final Map defaultComponentMap = defaultNamespace.componentMap; Component defaultComponent = null; for (String key : defaultComponentMap.keySet()) { @@ -80,6 +78,10 @@ public void setComponents(List components){ this.components = components; } + public Component getComponent(String name) { + return componentMap.get(name); + } + public Map toMap() { if (components != null) { diff --git a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/SimpleEndPoint.java similarity index 65% rename from meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java rename to opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/SimpleEndPoint.java index b7584999..9a018a94 100644 --- a/meta-grpc-client/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystEndpoint.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/impl/SimpleEndPoint.java @@ -21,14 +21,16 @@ import java.util.Objects; -public class MystEndpoint implements ShardEndPoint { +public class SimpleEndPoint implements ShardEndPoint { private final String host; private final int port; + private final int shardIndex; - public MystEndpoint(String host, int port) { + public SimpleEndPoint(String host, int port, int shardIndex) { this.host = host; this.port = port; + this.shardIndex = shardIndex; } @@ -42,27 +44,32 @@ public int getPort() { return this.port; } + @Override + public int getShardIndex() { + return this.shardIndex; + } + @Override public Protocol getProtocol() { return Protocol.http2_0; } @Override - public boolean equals(Object that){ - if (Objects.isNull(that)){ + public boolean equals(Object obj){ + if (Objects.isNull(obj)){ return false; } - if (this == that){ + if (this == obj){ return true; - }else if (!(that instanceof ShardEndPoint)) { + }else if (!(obj instanceof ShardEndPoint)) { return false; }else{ - if (Objects.equals(this.getHost(), ((ShardEndPoint) that).getHost()) && - Objects.equals(this.getPort(), ((ShardEndPoint) that).getPort()) - && Objects.equals(this.getProtocol(), ((ShardEndPoint) that).getProtocol()) && - ((ShardEndPoint) that).mtls() == false && this.mtls() == false ){ - + ShardEndPoint that = (ShardEndPoint) obj; + if (Objects.equals(this.getHost(), that.getHost()) && + Objects.equals(this.getPort(), that.getPort()) + && Objects.equals(this.getProtocol(), that.getProtocol()) && + Objects.equals(this.mtls(), that.mtls())){ return true; }else{ return false; @@ -79,6 +86,12 @@ public boolean mtls() { public static class Builder { private String l_host; private int l_port; + private int l_shardIndex; + + public Builder withShardIndex(int shardIndex) { + this.l_shardIndex = shardIndex; + return this; + } public Builder withHost(String host) { this.l_host = host; @@ -90,8 +103,8 @@ public Builder withPort(int port) { return this; } - public MystEndpoint build() { - return new MystEndpoint(l_host, l_port); + public SimpleEndPoint build() { + return new SimpleEndPoint(l_host, l_port, l_shardIndex); } public static Builder newBuilder() { diff --git a/meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java b/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java similarity index 83% rename from meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java rename to opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java index bdb0be66..1358f816 100644 --- a/meta-grpc-client/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/impl/MystStatefulSetRegistryTest.java +++ b/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java @@ -14,13 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package net.opentsdb.aura.metrics.meta.endpoints.impl; +package net.opentsdb.aura.metrics.meta.endpoints; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; -import org.junit.jupiter.api.Test; +import net.opentsdb.aura.metrics.meta.endpoints.impl.Component; +import net.opentsdb.aura.metrics.meta.endpoints.impl.DeploymentConfig; +import net.opentsdb.aura.metrics.meta.endpoints.impl.Namespace; +import net.opentsdb.aura.metrics.meta.endpoints.impl.SimpleEndPoint; +import net.opentsdb.core.DefaultRegistry; +import net.opentsdb.core.MockTSDB; +import net.opentsdb.core.TSDB; +import org.testng.Assert; +import org.testng.annotations.Test; import java.io.File; import java.io.IOException; @@ -29,23 +37,35 @@ import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertEquals; - public class MystStatefulSetRegistryTest { - private static final String pod_name_pattern = "%s-myst-%s-%s.%s.svc.%s"; + private static final String pod_name_pattern = "%s-myst-%s-%s.%s-myst-%s.svc.%s"; private static final String[] replicas = {"a", "b", "c", "d", "e"}; private static final String cluster_domain = "cluster.local"; + private MockTSDB tsdb; + @Test public void testListOfEndpoints1() throws IOException { ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + try { - DeploymentConfig cluster_config1 = mapper1.readValue(new File("src/test/resources/ConfigTest1.yaml"), DeploymentConfig.class); - Map namespaceMap = cluster_config1.toMap(); - MystStatefulSetRegistry myststs1 = new MystStatefulSetRegistry(cluster_config1); + final JsonNode jsonNode = mapper1.readTree(new File("src/test/resources/ConfigTest1.yaml")); + final String s = jsonNode.get("statefulset.namespaces").textValue(); + MockTSDB tsdb = new MockTSDB(); + tsdb.registry = new DefaultRegistry(tsdb); + tsdb.registry.initialize(true); + tsdb.getConfig().register(BaseStatefulSetRegistry.DOMAIN, jsonNode.get("statefulset.domain").asText(), false, "UT" ); + tsdb.getConfig().register(BaseStatefulSetRegistry.DEFAULT_NAMESPACE, jsonNode.get("statefulset.default.namespace").asText(), false, "UT" ); + tsdb.getConfig().register(BaseStatefulSetRegistry.DEPLOYMENT_CONFIG, jsonNode.get("statefulset.namespaces").asText(), false, "UT" ); + + + MystStatefulSetRegistry mystStatefulSetRegistry = new MystStatefulSetRegistry(); + + mystStatefulSetRegistry.initialize(tsdb, "id"); + HashMap> expectedEndpointsMap = new HashMap<>(); List componentList1 = new ArrayList<>(); List componentList2 = new ArrayList<>(); @@ -75,27 +95,27 @@ public void testListOfEndpoints1() throws IOException { n2.setName("ssp"); n2.setk8sNamespace("ssp"); n2.setComponents(componentList2); - List namespaceList1 = new ArrayList<>(); - namespaceList1.add(n1); - namespaceList1.add(n2); - DeploymentConfig expectedConfig = new DeploymentConfig(); - expectedConfig.setClusterName("kubernetes.default.GQ"); - expectedConfig.setClusterDomain("cluster.local"); - expectedConfig.setListOfNamespaces(namespaceList1); + + + Map namespaceMap = new HashMap<>(); + namespaceMap.put(n1.getName(), n1); + namespaceMap.put(n2.getName(), n2); + DeploymentConfig expectedConfig = new DeploymentConfig("cluster.local", namespaceMap); System.out.println("Test Case 1: "); for (String key : namespaceMap.keySet()) { - Map> result = myststs1.getEndpoints(key); - final int numShards = expectedConfig.toMap().get(key).toMap().get("myst").getNumShards();; - final int numReplicas = expectedConfig.toMap().get(key).toMap().get("myst").getNumReplicas(); + Map> result = mystStatefulSetRegistry.getEndpoints(key); + final Component myst = expectedConfig.getNamespace(key).getComponent("myst"); + final int numShards = myst.getNumShards();; + final int numReplicas = myst.getNumReplicas(); String namespace = key; for (int i = 0; i < numReplicas; i++) { String replica = replicas[i]; List expectedShardEndPoints = new ArrayList<>(); expectedEndpointsMap.put(replica, expectedShardEndPoints); for (int j = 0; j < numShards; j++) { - MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() .withHost( String.format( pod_name_pattern, @@ -103,6 +123,7 @@ public void testListOfEndpoints1() throws IOException { replica, j, namespace, + replica, cluster_domain)) .withPort(9999) .build(); @@ -111,9 +132,9 @@ public void testListOfEndpoints1() throws IOException { } } - System.out.println("Result"+myststs1.getEndpoints(key)); + System.out.println("Result"+mystStatefulSetRegistry.getEndpoints(key)); System.out.println("Expected"+expectedEndpointsMap); - assertEquals(expectedEndpointsMap,result); + Assert.assertEquals(expectedEndpointsMap, result); } @@ -121,10 +142,9 @@ public void testListOfEndpoints1() throws IOException { System.out.println(e); } - } - + /** @Test public void testListOfEndpoints2() throws IOException { @@ -185,7 +205,7 @@ public void testListOfEndpoints2() throws IOException { List expectedShardEndPoints = new ArrayList<>(); expectedEndpointsMap.put(replica, expectedShardEndPoints); for (int j = 0; j < numShards; j++) { - MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() .withHost( String.format( pod_name_pattern, @@ -203,7 +223,7 @@ public void testListOfEndpoints2() throws IOException { System.out.println("Result"+myststs1.getEndpoints(key)); System.out.println("Expected"+expectedEndpointsMap); - assertEquals(expectedEndpointsMap, result); + Assert.assertEquals(expectedEndpointsMap, result); } @@ -269,7 +289,7 @@ public void testListOfEndpoints3() throws IOException { List expectedShardEndPoints = new ArrayList<>(); expectedEndpointsMap.put(replica, expectedShardEndPoints); for (int j = 0; j < numShards; j++) { - final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + final SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() .withHost( String.format( pod_name_pattern, @@ -287,7 +307,7 @@ public void testListOfEndpoints3() throws IOException { System.out.println("Result"+myststs1.getEndpoints(key)); System.out.println("Expected"+expectedEndpointsMap); - assertEquals(expectedEndpointsMap.toString(), result.toString()); + Assert.assertEquals(expectedEndpointsMap.toString(), result.toString()); } @@ -353,7 +373,7 @@ public void testListOfEndpoints4() throws IOException { List expectedShardEndPoints = new ArrayList<>(); expectedEndpointsMap.put(replica, expectedShardEndPoints); for (int j = 0; j < numShards; j++) { - final MystEndpoint endpoint = MystEndpoint.Builder.newBuilder() + final SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() .withHost( String.format( pod_name_pattern, @@ -372,7 +392,7 @@ public void testListOfEndpoints4() throws IOException { System.out.println("Result"+myststs1.getEndpoints(key)); System.out.println("Expected"+expectedEndpointsMap); - assertEquals(expectedEndpointsMap, result); + Assert.assertEquals(expectedEndpointsMap, result); } @@ -380,5 +400,5 @@ public void testListOfEndpoints4() throws IOException { System.out.println(e); } - } + } */ } diff --git a/meta-grpc-client/src/test/resources/ConfigTest1.yaml b/opentsdb/src/test/resources/ConfigTest1.yaml similarity index 66% rename from meta-grpc-client/src/test/resources/ConfigTest1.yaml rename to opentsdb/src/test/resources/ConfigTest1.yaml index 505a09e4..62311d1f 100644 --- a/meta-grpc-client/src/test/resources/ConfigTest1.yaml +++ b/opentsdb/src/test/resources/ConfigTest1.yaml @@ -1,7 +1,8 @@ -kubernetes.statefulset.registry.cluster: kubernetes.default.GQ -kubernetes.statefulset.registry.cluster.domain: cluster.local -kubernetes.statefulset.registry.namespaces: - - namespace: default +statefulset.cluster: kubernetes.default.GQ +statefulset.default.namespace: ns.default +statefulset.cluster.domain: cluster.local +statefulset.namespaces: + - namespace: ns.default k8s_namespace: default components: - component: aura-metrics diff --git a/meta-grpc-client/src/test/resources/ConfigTest2.yaml b/opentsdb/src/test/resources/ConfigTest2.yaml similarity index 65% rename from meta-grpc-client/src/test/resources/ConfigTest2.yaml rename to opentsdb/src/test/resources/ConfigTest2.yaml index b67f1fad..5c49995c 100644 --- a/meta-grpc-client/src/test/resources/ConfigTest2.yaml +++ b/opentsdb/src/test/resources/ConfigTest2.yaml @@ -1,7 +1,8 @@ -kubernetes.statefulset.registry.cluster: kubernetes.default.GQ -kubernetes.statefulset.registry.cluster.domain: cluster.local -kubernetes.statefulset.registry.namespaces: - - namespace: default +statefulset.cluster: kubernetes.default.GQ +statefulset.default.namespace: ns.default +statefulset.cluster.domain: cluster.local +statefulset.namespaces: + - namespace: ns.default k8s_namespace: default components: - component: aura-metrics diff --git a/meta-grpc-client/src/test/resources/ConfigTest3.yaml b/opentsdb/src/test/resources/ConfigTest3.yaml similarity index 61% rename from meta-grpc-client/src/test/resources/ConfigTest3.yaml rename to opentsdb/src/test/resources/ConfigTest3.yaml index 8be6d0b3..35967cec 100644 --- a/meta-grpc-client/src/test/resources/ConfigTest3.yaml +++ b/opentsdb/src/test/resources/ConfigTest3.yaml @@ -1,7 +1,8 @@ -kubernetes.statefulset.registry.cluster: kubernetes.default.GQ -kubernetes.statefulset.registry.cluster.domain: cluster.local -kubernetes.statefulset.registry.namespaces: - - namespace: default +statefulset.cluster: kubernetes.default.GQ +statefulset.default.namespace: ns.default +statefulset.cluster.domain: cluster.local +statefulset.namespaces: + - namespace: ns.default k8s_namespace: default components: - component: aura-metrics diff --git a/meta-grpc-client/src/test/resources/ConfigTest4.yaml b/opentsdb/src/test/resources/ConfigTest4.yaml similarity index 63% rename from meta-grpc-client/src/test/resources/ConfigTest4.yaml rename to opentsdb/src/test/resources/ConfigTest4.yaml index e33d8e03..79c66a6f 100644 --- a/meta-grpc-client/src/test/resources/ConfigTest4.yaml +++ b/opentsdb/src/test/resources/ConfigTest4.yaml @@ -1,7 +1,8 @@ -kubernetes.statefulset.registry.cluster: kubernetes.default.GQ -kubernetes.statefulset.registry.cluster.domain: cluster.local -kubernetes.statefulset.registry.namespaces: - - namespace: default +statefulset.cluster: kubernetes.default.GQ +statefulset.default.namespace: ns.default +statefulset.cluster.domain: cluster.local +statefulset.namespaces: + - namespace: ns.default k8s_namespace: default components: - component: aura-metrics From a1a81a50ea802b53706a2579e2a37d061782ea0a Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Fri, 15 Oct 2021 15:21:23 -0700 Subject: [PATCH 05/11] Fix some bugs in the ephemeral node. Have the AuraMetricsHttpFactory use the HttpV3SourceFactory directly. Add a static host config to the AuraMetricsHttpFactory --- .../execution/AuraMetricsHttpFactory.java | 139 ++++++++++-------- .../aura/execution/EphemeralAuraFactory.java | 104 +++++++++---- .../aura/execution/MockDiscoveryService.java | 3 +- ....opentsdb.data.TimeSeriesDataSourceFactory | 2 + 4 files changed, 155 insertions(+), 93 deletions(-) diff --git a/opentsdb/src/main/java/net/opentsdb/aura/execution/AuraMetricsHttpFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/execution/AuraMetricsHttpFactory.java index 09997a27..7523c3ef 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/execution/AuraMetricsHttpFactory.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/execution/AuraMetricsHttpFactory.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -35,6 +36,7 @@ import net.opentsdb.query.QueryNodeConfig; import net.opentsdb.query.QueryPipelineContext; import net.opentsdb.query.TimeSeriesDataSourceConfig; +import net.opentsdb.query.execution.HttpQueryV3Factory; import net.opentsdb.query.execution.HttpQueryV3Source; import net.opentsdb.rollup.DefaultRollupConfig; import net.opentsdb.rollup.DefaultRollupInterval; @@ -60,7 +62,7 @@ * */ public class AuraMetricsHttpFactory - extends AuraHttpFactory + extends HttpQueryV3Factory implements TimerTask { private static final Logger LOG = LoggerFactory.getLogger(AuraMetricsHttpFactory.class); @@ -75,6 +77,7 @@ public class AuraMetricsHttpFactory public static final String RETENTION_KEY = "retention"; public static final String USE_UPTIME_KEY = "use_uptime"; public static final String HOST_MODE_KEY = "host.mode"; + public static final String STATIC_HOST_KEY = "static.host"; public static final String FALLBACK_KEY = "fallback"; public static final String BYPASS_METRIC = "aurametrics.query.bypassed"; @@ -91,8 +94,6 @@ public static enum HostMode { ROUND_ROBIN, } - private TSDB tsdb; - // TODO - singleton config. Waste of space having dupes. protected volatile AuraMetricsClusterConfig config; @@ -146,6 +147,11 @@ public boolean supportsQuery(final QueryPipelineContext context, return true; } + final String staticHost = tsdb.getConfig().getString(getConfigKey(STATIC_HOST_KEY)); + if (!Strings.isNullOrEmpty(staticHost)) { + return true; + } + final Host host = getHost(context, config); if (host == null) { return false; @@ -170,61 +176,65 @@ public RollupConfig rollupConfig() { @Override public Deferred initialize(final TSDB tsdb, final String id) { - this.id = Strings.isNullOrEmpty(id) ? TYPE : id; - this.tsdb = tsdb; - registerConfigs(tsdb); - - final String client_id = tsdb.getConfig().getString(getConfigKey(CLIENT_KEY)); - final SharedHttpClient shared_client = - tsdb.getRegistry().getPlugin(SharedHttpClient.class, client_id); - if (shared_client == null) { - throw new IllegalArgumentException( - "No shared HTTP client found " - + "for ID: " - + (Strings.isNullOrEmpty(client_id) ? "Default" : client_id)); - } + return super.initialize(tsdb, id).addCallback( + new Callback() { + @Override + public Object call(final Object ignored) throws Exception { + registerConfigsLocal(tsdb); + + final String client_id = tsdb.getConfig().getString(getConfigKey(CLIENT_KEY)); + final SharedHttpClient shared_client = + tsdb.getRegistry().getPlugin(SharedHttpClient.class, client_id); + if (shared_client == null) { + throw new IllegalArgumentException( + "No shared HTTP client found " + + "for ID: " + + (Strings.isNullOrEmpty(client_id) ? "Default" : client_id)); + } - // TODO - TEMP!! FIX ME!! USE CONFIG!!! - rollup_config = - DefaultRollupConfig.newBuilder() - .addAggregationId("sum", 0) - .addAggregationId("count", 1) - .addAggregationId("min", 2) - .addAggregationId("max", 3) - .addAggregationId("avg", 5) - .addAggregationId("first", 6) - .addAggregationId("last", 7) - .addInterval( - DefaultRollupInterval.builder() - .setInterval("1h") - .setRowSpan("1d") - .setTable("ignored") - .setPreAggregationTable("ignored")) - .build(); - - client = shared_client.getClient(); - - endpoint = tsdb.getConfig().getString(getConfigKey(ENDPOINT_KEY)); - host_mode = HostMode.valueOf(tsdb.getConfig().getString(getConfigKey(HOST_MODE_KEY))); - fallback = tsdb.getConfig().getBoolean(getConfigKey(FALLBACK_KEY)); - - // health checker singleton - String health_checker_id = "AuraMetricsHealthCheck"; - Object shared_health_checker = tsdb.getRegistry().getSharedObject(health_checker_id); - if (shared_health_checker != null) { - // all set! - health_checker = (AuraMetricsHealthChecker) shared_health_checker; - return Deferred.fromResult(null); - } + // TODO - TEMP!! FIX ME!! USE CONFIG!!! + rollup_config = + DefaultRollupConfig.newBuilder() + .addAggregationId("sum", 0) + .addAggregationId("count", 1) + .addAggregationId("min", 2) + .addAggregationId("max", 3) + .addAggregationId("avg", 5) + .addAggregationId("first", 6) + .addAggregationId("last", 7) + .addInterval( + DefaultRollupInterval.builder() + .setInterval("1m") + .setRowSpan("1h") + .setTable("ignored") + .setPreAggregationTable("ignored")) + .build(); + + client = shared_client.getClient(); + + endpoint = tsdb.getConfig().getString(getConfigKey(ENDPOINT_KEY)); + host_mode = HostMode.valueOf(tsdb.getConfig().getString(getConfigKey(HOST_MODE_KEY))); + fallback = tsdb.getConfig().getBoolean(getConfigKey(FALLBACK_KEY)); + + // health checker singleton + String health_checker_id = "AuraMetricsHealthCheck"; + Object shared_health_checker = tsdb.getRegistry().getSharedObject(health_checker_id); + if (shared_health_checker != null) { + // all set! + health_checker = (AuraMetricsHealthChecker) shared_health_checker; + return Deferred.fromResult(null); + } - health_checker = new AuraMetricsHealthChecker(tsdb, config); - if (tsdb.getRegistry().registerSharedObject(health_checker_id, health_checker) != null) { - // lost the race somehow. (e.g. if we load plugins in parallel) - health_checker.shutdown(); - return Deferred.fromResult(null); - } - owns_healthchecker = true; - return Deferred.fromResult(null); + health_checker = new AuraMetricsHealthChecker(tsdb, config); + if (tsdb.getRegistry().registerSharedObject(health_checker_id, health_checker) != null) { + // lost the race somehow. (e.g. if we load plugins in parallel) + health_checker.shutdown(); + return Deferred.fromResult(null); + } + owns_healthchecker = true; + return null; + } + }); } @Override @@ -245,7 +255,9 @@ public void run(final Timeout timeout) { LOG.warn("Removing an old possible host entry for: " + entry.getValue().getValue()); } } - LOG.warn("Removed " + removed + " entries from the posible hosts map."); + if (removed > 0) { + LOG.warn("Removed " + removed + " entries from the posible hosts map."); + } } catch (Exception e) { LOG.error("Failed to run the cleanup??", e); } @@ -297,8 +309,7 @@ public void update(final String key, final Object value) { * * @param tsdb A non-null TSDB. */ - protected void registerConfigs(final TSDB tsdb) { - super.registerConfigs(tsdb); + protected void registerConfigsLocal(final TSDB tsdb) { if (!tsdb.getConfig().hasProperty(KEY_PREFIX + ROUTER_KEY)) { tsdb.getConfig() .register( @@ -369,6 +380,11 @@ protected void registerConfigs(final TSDB tsdb) { + "two were present but out of rotation. If false, we throw an" + "exception."); } + if (!tsdb.getConfig().hasProperty(getConfigKey(STATIC_HOST_KEY))) { + tsdb.getConfig().register(getConfigKey(STATIC_HOST_KEY), null, true, + "An optional static host to query. Ignores the health " + + "checks and configs but honors overrides."); + } } Host getHost(final QueryPipelineContext context, @@ -386,6 +402,11 @@ Host getHost(final QueryPipelineContext context, } return new Host(host, host, host, null); } + + final String staticHost = tsdb.getConfig().getString(getConfigKey(STATIC_HOST_KEY)); + if (!Strings.isNullOrEmpty(staticHost)) { + return new Host(staticHost, staticHost, staticHost, null); + } TimeStamp timestamp = null; // check for time shifts diff --git a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java index ab50a19a..e59cbd7d 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.reflect.TypeToken; import com.stumbleupon.async.Deferred; import net.opentsdb.aura.execution.MockDiscoveryService.ShardEndPoint; @@ -37,7 +38,6 @@ import net.opentsdb.query.DefaultQueryResultId; import net.opentsdb.query.DefaultTimeSeriesDataSourceConfig; import net.opentsdb.query.QueryFillPolicy.FillWithRealPolicy; -import net.opentsdb.query.QueryNode; import net.opentsdb.query.QueryNodeConfig; import net.opentsdb.query.QueryPipelineContext; import net.opentsdb.query.TimeSeriesDataSourceConfig; @@ -48,6 +48,7 @@ import net.opentsdb.query.pojo.FillPolicy; import net.opentsdb.query.processor.groupby.GroupByConfig; import net.opentsdb.query.processor.merge.MergerConfig; +import net.opentsdb.query.router.RoutingUtils; import net.opentsdb.rollup.RollupConfig; import net.opentsdb.stats.Span; import net.opentsdb.utils.DateTime; @@ -56,6 +57,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; /** * Use the TimeRouter like this @@ -88,7 +90,6 @@ public class EphemeralAuraFactory protected AuraMetricsHttpFactory factory; protected MockDiscoveryService discoveryService; - protected String serviceKey; protected long relativeStart; protected long relativeEnd; @@ -106,10 +107,6 @@ public Deferred initialize(final TSDB tsdb, final String id) { tsdb.getConfig().register(getConfigKey(DISCOVERY_ID), null, false, "A non-null K8s discovery service plugin ID."); } - if (!tsdb.getConfig().hasProperty(getConfigKey(SERVICE_KEY))) { - tsdb.getConfig().register(getConfigKey(SERVICE_KEY), "AuraMetrics", false, - "The service key in the discoverability config for metrics."); - } if (!tsdb.getConfig().hasProperty(getConfigKey(START))) { tsdb.getConfig().register(getConfigKey(START), null, false, "The relative start time for data to be present in these " + @@ -141,28 +138,29 @@ public Deferred initialize(final TSDB tsdb, final String id) { "The " + getConfigKey(SOURCE_ID) + " must be set and valid.")); } - factory = tsdb.getRegistry().getPlugin(AuraMetricsHttpFactory.class, sourceId); - if (factory == null) { + TimeSeriesDataSourceFactory tsds = tsdb.getRegistry().getPlugin( + TimeSeriesDataSourceFactory.class, sourceId); + if (tsds == null) { LOG.error("No AuraMetricsHttpFactory found for source ID {}", sourceId); return Deferred.fromError(new IllegalArgumentException( "No AuraMetricsHttpFactory found for source ID " + sourceId)); } + factory = (AuraMetricsHttpFactory) tsds; // could be null String discoveryId = tsdb.getConfig().getString(getConfigKey(DISCOVERY_ID)); discoveryService = tsdb.getRegistry().getPlugin(MockDiscoveryService.class, discoveryId); + if (discoveryService == null) { - LOG.error("No MockDiscoveryService found for source ID {}", + LOG.error("No DiscoveryService found for source ID {}", discoveryId == null ? "default" : discoveryId); return Deferred.fromError(new IllegalArgumentException( - "No MockDiscoveryService found for source ID " + + "No DiscoveryService found for source ID " + (discoveryId == null ? "default" : discoveryId))); } - serviceKey = tsdb.getConfig().getString(getConfigKey(SERVICE_KEY)); - LOG.info("Successfully initialized Ephemeral Aura Source Factory with ID {} and service key {}", - (id == null ? "default" : id), - serviceKey); + LOG.info("Successfully initialized Ephemeral Aura Source Factory with ID {}", + (id == null ? "default" : id)); return Deferred.fromResult(null); } @@ -175,13 +173,30 @@ public void setupGraph(final QueryPipelineContext context, final long now = DateTime.currentTimeMillis() / 1000; final Map> services = discoveryService.getEndpoints( - namespace, now - relativeStart, now - relativeEnd); - final List shards = services.get(serviceKey); + namespace, now - relativeStart); + final List shards = services.values().iterator().next(); if (shards == null || shards.isEmpty()) { throw new IllegalStateException("Unable to find shards for namespace " - + namespace + " and service " + serviceKey); + + namespace); } - List sources = Lists.newArrayListWithExpectedSize(shards.size()); + + // TODO - note that for now averaging we'll average averages until we can tell + // the sources to return sum and count. + // TODO - we also need smarts to handle OTHER agg types like ptiles, etc. + String aggregator = groupBy(config, planner); + if (aggregator != null && aggregator.equalsIgnoreCase("count")) { + aggregator = "sum"; + } + + final List factories = + Lists.newArrayListWithExpectedSize(shards.size()); + for (int i = 0; i < shards.size(); i++) { + factories.add(factory); + } + final List> pushdowns = + RoutingUtils.getPotentialPushDowns(config, factories, planner); + List sources = + Lists.newArrayListWithExpectedSize(shards.size()); List ids = Lists.newArrayListWithExpectedSize(shards.size()); List timeouts = Lists.newArrayListWithExpectedSize(shards.size()); for (int i = 0; i < shards.size(); i++) { @@ -190,16 +205,21 @@ public void setupGraph(final QueryPipelineContext context, sources.add(builder); final String newId = config.getId() + "_shard_" + i; ids.add(newId); - builder.setSources(Lists.newArrayList(factory.id() + - ":https://" + shard.getHost() + ":" + shard.getPort())) + builder.setSourceId(factory.id() + + ":http://" + shard.getHost() + ":" + shard.getPort()) + .setDataSource(newId) .setId(newId) .setResultIds(Lists.newArrayList(new DefaultQueryResultId( newId, newId))); + List pds = pushdowns.get(i); + if (pds != null && !pds.isEmpty()) { + builder.setPushDownNodes(pds); + } timeouts.add("30s"); // TODO - config } MergerConfig merger = (MergerConfig) MergerConfig.newBuilder() - .setAggregator(groupBy(config)) + .setAggregator(aggregator) .setMode(MergerConfig.MergeMode.SHARD) .setTimeouts(timeouts) .setSortedDataSources(ids) @@ -212,9 +232,8 @@ public void setupGraph(final QueryPipelineContext context, .setId(config.getId()) .build(); planner.replace(config, merger); - for (final TimeSeriesDataSourceConfig.Builder builder : sources) { - planner.addEdge(merger, builder.build()); - } + + RoutingUtils.rebuildGraph(context, merger, false, pushdowns, sources, planner); } @Override @@ -233,7 +252,7 @@ public boolean supportsQuery(final QueryPipelineContext context, final String namespace = config.getMetric().getMetric() .substring(0, config.getMetric().getMetric().indexOf(".")); Map> services = discoveryService.getEndpoints(namespace); - List shards = services.get(serviceKey); + List shards = services.values().iterator().next(); if (shards == null || shards.isEmpty()) { return false; } @@ -285,16 +304,37 @@ public String type() { return TYPE; } - String groupBy(TimeSeriesDataSourceConfig config) { - List pushdowns = config.getPushDownNodes(); - if (pushdowns == null) { + String groupBy(QueryNodeConfig config, QueryPlanner planner) { + if (config instanceof TimeSeriesDataSourceConfig) { + List pds = ((TimeSeriesDataSourceConfig) config).getPushDownNodes(); + if (pds != null) { + for (int i = pds.size() - 1; i >= 0; i--) { + QueryNodeConfig pd = pds.get(i); + if (pd instanceof GroupByConfig) { + return ((GroupByConfig) pd).getAggregator(); + } + } + } + + } + + final Set predecessors = planner.configGraph() + .predecessors(config); + if (predecessors.isEmpty()) { return null; } - for (int i = 0; i < pushdowns.size(); i++) { - QueryNodeConfig nodeConfig = pushdowns.get(i); - if (nodeConfig instanceof GroupByConfig) { - return ((GroupByConfig) nodeConfig).getAggregator(); + // breadth first + for (final QueryNodeConfig predecessor : predecessors) { + if (predecessor instanceof GroupByConfig) { + return ((GroupByConfig) predecessor).getAggregator(); + } + } + + for (final QueryNodeConfig predecessor : predecessors) { + String gb = groupBy(predecessor, planner); + if (!Strings.isNullOrEmpty(gb)) { + return gb; } } return null; diff --git a/opentsdb/src/main/java/net/opentsdb/aura/execution/MockDiscoveryService.java b/opentsdb/src/main/java/net/opentsdb/aura/execution/MockDiscoveryService.java index f97d938f..b9e04a5c 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/execution/MockDiscoveryService.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/execution/MockDiscoveryService.java @@ -39,9 +39,8 @@ interface ShardEndPoint { * * @param namespace * @param start Start of the query in unix epoch seconds - * @param end End of the query in unix epoch seconds * @return A map of service name (Myst, Aura, etc.) to endpoints. */ - Map> getEndpoints(String namespace, long start, long end); + Map> getEndpoints(String namespace, long start); } diff --git a/opentsdb/src/main/resources/META-INF/services/net.opentsdb.data.TimeSeriesDataSourceFactory b/opentsdb/src/main/resources/META-INF/services/net.opentsdb.data.TimeSeriesDataSourceFactory index 25e9b66e..10f97cb1 100644 --- a/opentsdb/src/main/resources/META-INF/services/net.opentsdb.data.TimeSeriesDataSourceFactory +++ b/opentsdb/src/main/resources/META-INF/services/net.opentsdb.data.TimeSeriesDataSourceFactory @@ -1,2 +1,4 @@ net.opentsdb.aura.metrics.storage.AuraMetricsSourceFactory +net.opentsdb.aura.execution.AuraMetricsHttpFactory net.opentsdb.aura.metrics.storage.AerospikeBatchSourceFactory +net.opentsdb.aura.execution.EphemeralAuraFactory From 894e4a4b102f9d66d7d4eca39bd2396ef38a0b8f Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Mon, 18 Oct 2021 13:58:20 -0700 Subject: [PATCH 06/11] sharded meta client --- .../DefaultMetaTimeSeriesQueryResult.java | 25 +++--- .../meta/MergedMetaTimeSeriesQueryResult.java | 16 ++-- .../meta/MetaResultWithDictionary.java | 20 +++++ .../meta/DefaultShardedMetaClient.java | 79 ++++++++++++++++--- .../aura/metrics/meta/MetaFetchException.java | 8 ++ .../aura/metrics/meta/ShardedMetaClient.java | 4 +- .../MystStatefulSetRegistryTest.java | 27 ++++--- 7 files changed, 136 insertions(+), 43 deletions(-) create mode 100644 core/src/main/java/net/opentsdb/aura/metrics/meta/MetaResultWithDictionary.java create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/MetaFetchException.java diff --git a/core/src/main/java/net/opentsdb/aura/metrics/meta/DefaultMetaTimeSeriesQueryResult.java b/core/src/main/java/net/opentsdb/aura/metrics/meta/DefaultMetaTimeSeriesQueryResult.java index 70a80592..7d932f93 100644 --- a/core/src/main/java/net/opentsdb/aura/metrics/meta/DefaultMetaTimeSeriesQueryResult.java +++ b/core/src/main/java/net/opentsdb/aura/metrics/meta/DefaultMetaTimeSeriesQueryResult.java @@ -17,6 +17,7 @@ package net.opentsdb.aura.metrics.meta; +import gnu.trove.iterator.TLongIntIterator; import gnu.trove.map.TLongIntMap; import gnu.trove.map.hash.TLongIntHashMap; import net.opentsdb.utils.XXHash; @@ -28,7 +29,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; -public class DefaultMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResult { +public class DefaultMetaTimeSeriesQueryResult implements MetaResultWithDictionary { private static final Logger logger = LoggerFactory.getLogger(DefaultMetaTimeSeriesQueryResult.class); private static final int DEFAULT_ARRAY_SIZE = 32; @@ -107,6 +108,11 @@ public GroupResult getGroup(final int index) { return groupResults[index]; } + @Override + public Dictionary getDictionary() { + return dictionary; + } + public static class DefaultGroupResult implements GroupResult { private long[] tagHashes = new long[DEFAULT_ARRAY_SIZE]; public long[] hashes = new long[DEFAULT_ARRAY_SIZE]; @@ -208,14 +214,6 @@ public long next() { } } - public interface Dictionary { - void put(final long id, final byte[] value); - - byte[] get(final long id); - - int size(); - } - public static class DefaultDictionary implements Dictionary { private int nextIndex = 0; @@ -267,6 +265,15 @@ public int size() { return nextIndex; } + @Override + public void mergeInto(Dictionary dictionary) { + final TLongIntIterator iterator = indexMap.iterator(); + while (iterator.hasNext()) { + iterator.advance(); + dictionary.put(iterator.key(), values[iterator.value()]); + } + } + } } \ No newline at end of file diff --git a/core/src/main/java/net/opentsdb/aura/metrics/meta/MergedMetaTimeSeriesQueryResult.java b/core/src/main/java/net/opentsdb/aura/metrics/meta/MergedMetaTimeSeriesQueryResult.java index d8a8d4c4..b154ad78 100644 --- a/core/src/main/java/net/opentsdb/aura/metrics/meta/MergedMetaTimeSeriesQueryResult.java +++ b/core/src/main/java/net/opentsdb/aura/metrics/meta/MergedMetaTimeSeriesQueryResult.java @@ -23,9 +23,11 @@ import java.util.Arrays; -public class MergedMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResult { +public class MergedMetaTimeSeriesQueryResult implements MetaResultWithDictionary { private static final Logger logger = LoggerFactory.getLogger(MergedMetaTimeSeriesQueryResult.class); + // TODO: Converting this into a map will make look ups faster. + // TODO: Atleast we can maintain another array with the hashes to find the index faster. private DefaultMetaTimeSeriesQueryResult.DefaultGroupResult[] groupResults = new DefaultMetaTimeSeriesQueryResult.DefaultGroupResult[32]; private Throwable exception; @@ -34,7 +36,7 @@ public class MergedMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResul private int groupCount; private int hashCount; - public void add(DefaultMetaTimeSeriesQueryResult result) { + public void add(MetaResultWithDictionary result) { if (exception != null) { return; } @@ -84,11 +86,7 @@ public void add(DefaultMetaTimeSeriesQueryResult result) { } // Dictionary - TLongIntIterator iterator = result.dictionary.indexMap.iterator(); - while (iterator.hasNext()) { - iterator.advance(); - dictionary.put(iterator.key(), result.dictionary.values[iterator.value()]); - } + result.getDictionary().mergeInto(dictionary); } @Override @@ -125,4 +123,8 @@ public GroupResult getGroup(int index) { return groupResults[index]; } + @Override + public Dictionary getDictionary() { + return dictionary; + } } \ No newline at end of file diff --git a/core/src/main/java/net/opentsdb/aura/metrics/meta/MetaResultWithDictionary.java b/core/src/main/java/net/opentsdb/aura/metrics/meta/MetaResultWithDictionary.java new file mode 100644 index 00000000..0f218dcc --- /dev/null +++ b/core/src/main/java/net/opentsdb/aura/metrics/meta/MetaResultWithDictionary.java @@ -0,0 +1,20 @@ +package net.opentsdb.aura.metrics.meta; + +public interface MetaResultWithDictionary extends MetaTimeSeriesQueryResult { + + public interface Dictionary { + void put(final long id, final byte[] value); + + byte[] get(final long id); + + int size(); + + // Unorthodox way of doing things. + // Helps with not having to manage iterators. + // There may be a better way to do this. + void mergeInto(Dictionary dictionary); + + } + + Dictionary getDictionary(); +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java index 1911bc6e..446c8e81 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClient.java @@ -9,7 +9,7 @@ import net.opentsdb.core.TSDB; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; public class DefaultShardedMetaClient extends BaseTSDBPlugin @@ -19,6 +19,11 @@ public class DefaultShardedMetaClient private volatile ShardedServiceRegistry registry; + private volatile ExecutorService service; + + public static final String META_SERVICE_REGISTRY = "meta.service.registry"; + public static final String THREAD_POOL_SIZE = "default.sharded.meta.client.pool.size"; + private Map clients = new ConcurrentHashMap<>(); private final String MUTEX = new String(); private Random random = new Random(); @@ -31,25 +36,64 @@ private List setUpEndpoints(String namespace) { return endPoints; } + /** + * Naive parallelization. + * Better semantics maybe achieved by using a blocking queue. + * TODO: Object re-use - by using object pools ? + * Has to be ThreadSafe + * @param namespace + * @param query + * @return + * @throws MetaFetchException + */ @Override - public Iterator getTimeseriesAllShards(String namespace, String query) { + public Iterator getTimeseriesAllShards(String namespace, String query) throws MetaFetchException { final List shardEndPoints = setUpEndpoints(namespace); List timeSeriesQueryResults = new ArrayList<>(); + final List> futures = new ArrayList<>(); for (ShardEndPoint endPoint : shardEndPoints) { - timeSeriesQueryResults.add(getResult(clients.get(endPoint), query)); + futures.add(getResult(clients.get(endPoint), query)); + } + + for(int i = 0 ; i < futures.size(); i++) { + try { + timeSeriesQueryResults.add(futures.get(i).get()); + } catch (InterruptedException e) { + throw new RuntimeException("Meta thread interrupted when fetching", e); + } catch (ExecutionException e) { + throw new MetaFetchException("Meta fetch failure for: " + i, e); + } } return timeSeriesQueryResults.iterator(); } - private MergedMetaTimeSeriesQueryResult getResult(MetaGrpcClient metaGrpcClient, String query) { - MergedMetaTimeSeriesQueryResult mergedMetaTimeSeriesQueryResult = new MergedMetaTimeSeriesQueryResult(); - final Iterator timeseries = metaGrpcClient.getTimeseries(query); - while (timeseries.hasNext()) { - mergedMetaTimeSeriesQueryResult.add(timeseries.next()); + private Future getResult(MetaGrpcClient metaGrpcClient, String query) { + return this.service.submit(new MetaCall(metaGrpcClient, query, new MergedMetaTimeSeriesQueryResult())); + } + + private static class MetaCall implements Callable { + + private final MetaGrpcClient metaGrpcClient; + private final String query; + private final MergedMetaTimeSeriesQueryResult mergedMetaTimeSeriesQueryResult; + + public MetaCall(MetaGrpcClient metaGrpcClient, String query, MergedMetaTimeSeriesQueryResult mergedMetaTimeSeriesQueryResult) { + + this.metaGrpcClient = metaGrpcClient; + this.query = query; + this.mergedMetaTimeSeriesQueryResult = mergedMetaTimeSeriesQueryResult; + } + + @Override + public MergedMetaTimeSeriesQueryResult call() throws Exception { + final Iterator timeseries = metaGrpcClient.getTimeseries(query); + while (timeseries.hasNext()) { + mergedMetaTimeSeriesQueryResult.add(timeseries.next()); + } + return mergedMetaTimeSeriesQueryResult; } - return mergedMetaTimeSeriesQueryResult; } private void createClients(List endPoints) { @@ -79,10 +123,16 @@ private List getEndpoints(Map> endpoi } @Override - public MergedMetaTimeSeriesQueryResult getTimeSeriesPerShard(String namespace, String query, int shardId) { + public MergedMetaTimeSeriesQueryResult getTimeSeriesPerShard(String namespace, String query, int shardId) throws MetaFetchException { final ShardEndPoint shardEndPoint = setUpEndpoints(namespace).get(shardId); - return getResult(clients.get(shardEndPoint), query); + try { + return getResult(clients.get(shardEndPoint), query).get(); + } catch (InterruptedException e) { + throw new RuntimeException("Meta thread interrupted when fetching", e); + } catch (ExecutionException e) { + throw new MetaFetchException("Meta fetch failure", e); + } } @Override @@ -95,8 +145,13 @@ public Deferred initialize(TSDB tsdb, String id) { final Configuration config = tsdb.getConfig(); - registry = config.getTyped("meta.service.registry", ShardedServiceRegistry.class); + registry = config.getTyped(META_SERVICE_REGISTRY, ShardedServiceRegistry.class); + + config.register(THREAD_POOL_SIZE, 10, false, + "Thread pool size for sharded meta client"); + final int anInt = config.getInt(THREAD_POOL_SIZE); + service = Executors.newWorkStealingPool(anInt); return null; } } diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/MetaFetchException.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/MetaFetchException.java new file mode 100644 index 00000000..85208c56 --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/MetaFetchException.java @@ -0,0 +1,8 @@ +package net.opentsdb.aura.metrics.meta; + +public class MetaFetchException extends Exception { + + public MetaFetchException(String message, Exception e) { + super(message, e); + } +} diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java index fb8d3ee8..0820f880 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/ShardedMetaClient.java @@ -4,8 +4,8 @@ public interface ShardedMetaClient { - Iterator getTimeseriesAllShards(String namespace, String query); + Iterator getTimeseriesAllShards(String namespace, String query) throws MetaFetchException; - ResT getTimeSeriesPerShard(String namespace, String query, int shardId); + ResT getTimeSeriesPerShard(String namespace, String query, int shardId) throws MetaFetchException; } diff --git a/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java b/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java index 1358f816..505fa7e1 100644 --- a/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java +++ b/opentsdb/src/test/java/net/opentsdb/aura/metrics/meta/endpoints/MystStatefulSetRegistryTest.java @@ -44,24 +44,25 @@ public class MystStatefulSetRegistryTest { private static final String cluster_domain = "cluster.local"; private MockTSDB tsdb; + private ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + private void setUpForFile(String filename) throws IOException { - @Test - public void testListOfEndpoints1() throws IOException { - - ObjectMapper mapper1 = new ObjectMapper(new YAMLFactory()); + final JsonNode jsonNode = mapper1.readTree(new File(filename)); + tsdb = new MockTSDB(); + tsdb.registry = new DefaultRegistry(tsdb); + tsdb.registry.initialize(true); + tsdb.getConfig().register(BaseStatefulSetRegistry.DOMAIN, jsonNode.get("statefulset.domain").asText(), false, "UT" ); + tsdb.getConfig().register(BaseStatefulSetRegistry.DEFAULT_NAMESPACE, jsonNode.get("statefulset.default.namespace").asText(), false, "UT" ); + tsdb.getConfig().register(BaseStatefulSetRegistry.DEPLOYMENT_CONFIG, jsonNode.get("statefulset.namespaces").asText(), false, "UT" ); + } - try { - final JsonNode jsonNode = mapper1.readTree(new File("src/test/resources/ConfigTest1.yaml")); - final String s = jsonNode.get("statefulset.namespaces").textValue(); - MockTSDB tsdb = new MockTSDB(); - tsdb.registry = new DefaultRegistry(tsdb); - tsdb.registry.initialize(true); - tsdb.getConfig().register(BaseStatefulSetRegistry.DOMAIN, jsonNode.get("statefulset.domain").asText(), false, "UT" ); - tsdb.getConfig().register(BaseStatefulSetRegistry.DEFAULT_NAMESPACE, jsonNode.get("statefulset.default.namespace").asText(), false, "UT" ); - tsdb.getConfig().register(BaseStatefulSetRegistry.DEPLOYMENT_CONFIG, jsonNode.get("statefulset.namespaces").asText(), false, "UT" ); + @Test + public void testListOfEndpoints1() { + try { + setUpForFile("src/test/resources/ConfigTest1.yaml"); MystStatefulSetRegistry mystStatefulSetRegistry = new MystStatefulSetRegistry(); mystStatefulSetRegistry.initialize(tsdb, "id"); From 30f8a00c573f0c96c18c505d98b4ab59b7c7f7f0 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Mon, 18 Oct 2021 14:01:39 -0700 Subject: [PATCH 07/11] pod name pattern fix --- .../aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java | 1 + 1 file changed, 1 insertion(+) diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java index 7787be3a..d60bf141 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/BaseStatefulSetRegistry.java @@ -80,6 +80,7 @@ public Map> getEndpoints(String namespace, Deploymen final SimpleEndPoint endpoint = SimpleEndPoint.Builder.newBuilder() .withHost( String.format( + pod_name_pattern, prefix, j, prefix, From 4857031a8da5ada648fd33bac8e7bbfd5db262b0 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Mon, 18 Oct 2021 15:54:36 -0700 Subject: [PATCH 08/11] add the shard client factory --- .../metrics/meta/DefaultShardedMetaClientFactory.java | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClientFactory.java diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClientFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClientFactory.java new file mode 100644 index 00000000..d5289f3a --- /dev/null +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/DefaultShardedMetaClientFactory.java @@ -0,0 +1,8 @@ +package net.opentsdb.aura.metrics.meta; + +public class DefaultShardedMetaClientFactory implements ShardedMetaClientFactory { + @Override + public ShardedMetaClient getShardedMetaClient(String namespace) { + return new DefaultShardedMetaClient(); + } +} From 267aa7fd8d4b6250c8d5f3a5c0a1f763da124c8b Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Thu, 21 Oct 2021 14:39:21 -0700 Subject: [PATCH 09/11] integrate sharded aura metrics --- .../aura/execution/EphemeralAuraFactory.java | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java index ab50a19a..2a8d09ca 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java @@ -24,7 +24,8 @@ import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; import com.stumbleupon.async.Deferred; -import net.opentsdb.aura.execution.MockDiscoveryService.ShardEndPoint; +import net.opentsdb.aura.metrics.meta.endpoints.AuraMetricsStatefulSetRegistry; +import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint; import net.opentsdb.common.Const; import net.opentsdb.core.BaseTSDBPlugin; import net.opentsdb.core.TSDB; @@ -37,7 +38,6 @@ import net.opentsdb.query.DefaultQueryResultId; import net.opentsdb.query.DefaultTimeSeriesDataSourceConfig; import net.opentsdb.query.QueryFillPolicy.FillWithRealPolicy; -import net.opentsdb.query.QueryNode; import net.opentsdb.query.QueryNodeConfig; import net.opentsdb.query.QueryPipelineContext; import net.opentsdb.query.TimeSeriesDataSourceConfig; @@ -56,6 +56,7 @@ import java.util.List; import java.util.Map; +import java.util.Random; /** * Use the TimeRouter like this @@ -87,11 +88,11 @@ public class EphemeralAuraFactory public static final String TYPE = "EphemeralAuraMetricsHttp"; protected AuraMetricsHttpFactory factory; - protected MockDiscoveryService discoveryService; + protected AuraMetricsStatefulSetRegistry discoveryService; protected String serviceKey; protected long relativeStart; protected long relativeEnd; - + private final Random random = new Random(); @Override public Deferred initialize(final TSDB tsdb, final String id) { this.id = Strings.isNullOrEmpty(id) ? TYPE : id; @@ -150,12 +151,12 @@ public Deferred initialize(final TSDB tsdb, final String id) { // could be null String discoveryId = tsdb.getConfig().getString(getConfigKey(DISCOVERY_ID)); - discoveryService = tsdb.getRegistry().getPlugin(MockDiscoveryService.class, discoveryId); + discoveryService = tsdb.getRegistry().getPlugin(AuraMetricsStatefulSetRegistry.class, discoveryId); if (discoveryService == null) { - LOG.error("No MockDiscoveryService found for source ID {}", + LOG.error("No AuraMetricsDiscoveryService found for source ID {}", discoveryId == null ? "default" : discoveryId); return Deferred.fromError(new IllegalArgumentException( - "No MockDiscoveryService found for source ID " + + "No AuraMetricsDiscoveryService found for source ID " + (discoveryId == null ? "default" : discoveryId))); } serviceKey = tsdb.getConfig().getString(getConfigKey(SERVICE_KEY)); @@ -175,8 +176,11 @@ public void setupGraph(final QueryPipelineContext context, final long now = DateTime.currentTimeMillis() / 1000; final Map> services = discoveryService.getEndpoints( - namespace, now - relativeStart, now - relativeEnd); + namespace, now - relativeStart); final List shards = services.get(serviceKey); + + LOG.info("Received end points: {} {}", services, shards); + if (shards == null || shards.isEmpty()) { throw new IllegalStateException("Unable to find shards for namespace " + namespace + " and service " + serviceKey); @@ -232,8 +236,7 @@ public boolean supportsQuery(final QueryPipelineContext context, final TimeSeriesDataSourceConfig config) { final String namespace = config.getMetric().getMetric() .substring(0, config.getMetric().getMetric().indexOf(".")); - Map> services = discoveryService.getEndpoints(namespace); - List shards = services.get(serviceKey); + final List shards = pickEndpoints(discoveryService.getEndpoints(namespace)); if (shards == null || shards.isEmpty()) { return false; } @@ -315,4 +318,17 @@ String getConfigKey(final String suffix) { } } + private List pickEndpoints(Map> endpointsMap) { + final int i = random.nextInt(endpointsMap.size()); + int j = 0; + for(String key: endpointsMap.keySet()) { + if (j == i) { + return endpointsMap.get(key); + } + j++; + } + + return null; + } + } From 430bed83ff21b7b4fd8f5e612489cd41723d7c31 Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Fri, 29 Oct 2021 10:12:25 -0700 Subject: [PATCH 10/11] epoch fi --- .../meta/endpoints/AuraMetricsStatefulSetRegistry.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java index f613bf3c..72ea9c93 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/metrics/meta/endpoints/AuraMetricsStatefulSetRegistry.java @@ -27,9 +27,7 @@ private int getEpochPrefix(long epoch, int epochLength) { //Get index of hour in the day. final long epochHr = epoch - epoch % 3600; final long epochDay = epoch - epoch % 86400; - final int hrIndex = (int)(epochHr - epochDay) / 3600; - final int epochLengthInHr = epochLength / 3600; - return hrIndex / epochLengthInHr; + return (int)(epochHr - epochDay) / 3600; } @Override From 86e0521094449eac6c879d2f40b6639eb4ff50cb Mon Sep 17 00:00:00 2001 From: Ravi kiran Chiruvolu Date: Fri, 29 Oct 2021 10:13:24 -0700 Subject: [PATCH 11/11] aura discovery service --- .../aura/execution/EphemeralAuraFactory.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java index 44646652..c0f56fb2 100644 --- a/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java +++ b/opentsdb/src/main/java/net/opentsdb/aura/execution/EphemeralAuraFactory.java @@ -151,14 +151,10 @@ public Deferred initialize(final TSDB tsdb, final String id) { // could be null String discoveryId = tsdb.getConfig().getString(getConfigKey(DISCOVERY_ID)); - discoveryService = tsdb.getRegistry().getPlugin(AuraMetricsStatefulSetRegistry.class, discoveryId); - if (discoveryService == null) { - LOG.error("No AuraMetricsDiscoveryService found for source ID {}", - discoveryId == null ? "default" : discoveryId); - return Deferred.fromError(new IllegalArgumentException( - "No AuraMetricsDiscoveryService found for source ID " + - (discoveryId == null ? "default" : discoveryId))); - } + + LOG.info("Discovery id: {} for AuraMetricsStatefulSetRegistry", discoveryId); + + discoveryService = new AuraMetricsStatefulSetRegistry(); LOG.info("Successfully initialized Ephemeral Aura Source Factory with ID {}", (id == null ? "default" : id));