Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package net.opentsdb.aura.metrics.meta;

import gnu.trove.iterator.TLongIntIterator;
import gnu.trove.map.TLongIntMap;
import gnu.trove.map.hash.TLongIntHashMap;
import net.opentsdb.utils.XXHash;
Expand All @@ -28,7 +29,7 @@
import java.nio.ByteBuffer;
import java.util.Arrays;

public class DefaultMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResult {
public class DefaultMetaTimeSeriesQueryResult implements MetaResultWithDictionary {
private static final Logger logger = LoggerFactory.getLogger(DefaultMetaTimeSeriesQueryResult.class);

private static final int DEFAULT_ARRAY_SIZE = 32;
Expand Down Expand Up @@ -107,6 +108,11 @@ public GroupResult getGroup(final int index) {
return groupResults[index];
}

@Override
public Dictionary getDictionary() {
return dictionary;
}

public static class DefaultGroupResult implements GroupResult {
private long[] tagHashes = new long[DEFAULT_ARRAY_SIZE];
public long[] hashes = new long[DEFAULT_ARRAY_SIZE];
Expand Down Expand Up @@ -208,14 +214,6 @@ public long next() {
}
}

public interface Dictionary {
void put(final long id, final byte[] value);

byte[] get(final long id);

int size();
}

public static class DefaultDictionary implements Dictionary {

private int nextIndex = 0;
Expand Down Expand Up @@ -267,6 +265,15 @@ public int size() {
return nextIndex;
}

@Override
public void mergeInto(Dictionary dictionary) {
final TLongIntIterator iterator = indexMap.iterator();
while (iterator.hasNext()) {
iterator.advance();
dictionary.put(iterator.key(), values[iterator.value()]);
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import java.util.Arrays;

public class MergedMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResult {
public class MergedMetaTimeSeriesQueryResult implements MetaResultWithDictionary {
private static final Logger logger = LoggerFactory.getLogger(MergedMetaTimeSeriesQueryResult.class);

// TODO: Converting this into a map will make look ups faster.
// TODO: Atleast we can maintain another array with the hashes to find the index faster.
private DefaultMetaTimeSeriesQueryResult.DefaultGroupResult[] groupResults =
new DefaultMetaTimeSeriesQueryResult.DefaultGroupResult[32];
private Throwable exception;
Expand All @@ -34,7 +36,7 @@ public class MergedMetaTimeSeriesQueryResult implements MetaTimeSeriesQueryResul
private int groupCount;
private int hashCount;

public void add(DefaultMetaTimeSeriesQueryResult result) {
public void add(MetaResultWithDictionary result) {
if (exception != null) {
return;
}
Expand Down Expand Up @@ -84,11 +86,7 @@ public void add(DefaultMetaTimeSeriesQueryResult result) {
}

// Dictionary
TLongIntIterator iterator = result.dictionary.indexMap.iterator();
while (iterator.hasNext()) {
iterator.advance();
dictionary.put(iterator.key(), result.dictionary.values[iterator.value()]);
}
result.getDictionary().mergeInto(dictionary);
}

@Override
Expand Down Expand Up @@ -125,4 +123,8 @@ public GroupResult getGroup(int index) {
return groupResults[index];
}

@Override
public Dictionary getDictionary() {
return dictionary;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package net.opentsdb.aura.metrics.meta;

public interface MetaResultWithDictionary extends MetaTimeSeriesQueryResult {

public interface Dictionary {
void put(final long id, final byte[] value);

byte[] get(final long id);

int size();

// Unorthodox way of doing things.
// Helps with not having to manage iterators.
// There may be a better way to do this.
void mergeInto(Dictionary dictionary);

}

Dictionary getDictionary();
}
9 changes: 9 additions & 0 deletions meta-grpc-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def grpcVersion = '1.40.0'
def protobufVersion = '3.15.6'
def protocVersion = protobufVersion




dependencies {

api project(':core')
Expand All @@ -58,6 +61,9 @@ dependencies {
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"

implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.14.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-1.2-api', version: '2.14.1'
implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.14.1'
implementation 'org.projectlombok:lombok:1.18.16'

testImplementation "io.grpc:grpc-testing:${grpcVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
Expand All @@ -72,6 +78,9 @@ dependencies {
testCommonImplementation group: 'org.roaringbitmap', name: 'RoaringBitmap', version: '0.9.3'
testCommonImplementation "io.grpc:grpc-stub:${grpcVersion}"

compile 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.13.0-rc2'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.0-rc2'

}

protobuf {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* This file is part of OpenTSDB.
* Copyright (C) 2021 Yahoo.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.opentsdb.aura.metrics.meta.endpoints;

public interface ShardEndPoint {

String getHost();

boolean equals(Object that);

int getPort();

int getShardIndex();

Protocol getProtocol();

boolean mtls();

enum Protocol {
http1_1, http2_0, https1_1, https_2_0
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* This file is part of OpenTSDB.
* Copyright (C) 2021 Yahoo.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.opentsdb.aura.metrics.meta.endpoints;

import java.util.List;
import java.util.Map;

public interface ShardedServiceRegistry {

Map<String, List<ShardEndPoint>> getEndpoints(String namespace);

Map<String, List<ShardEndPoint>> getEndpoints(String namespace, long epoch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.stumbleupon.async.Deferred;
import net.opentsdb.aura.execution.MockDiscoveryService.ShardEndPoint;
import net.opentsdb.aura.metrics.meta.endpoints.AuraMetricsStatefulSetRegistry;
import net.opentsdb.aura.metrics.meta.endpoints.ShardEndPoint;
import net.opentsdb.common.Const;
import net.opentsdb.core.BaseTSDBPlugin;
import net.opentsdb.core.TSDB;
Expand Down Expand Up @@ -57,6 +58,7 @@

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

/**
Expand Down Expand Up @@ -89,10 +91,12 @@ public class EphemeralAuraFactory
public static final String TYPE = "EphemeralAuraMetricsHttp";

protected AuraMetricsHttpFactory factory;
protected MockDiscoveryService discoveryService;
protected AuraMetricsStatefulSetRegistry discoveryService;
protected String serviceKey;

protected long relativeStart;
protected long relativeEnd;

private final Random random = new Random();
@Override
public Deferred<Object> initialize(final TSDB tsdb, final String id) {
this.id = Strings.isNullOrEmpty(id) ? TYPE : id;
Expand Down Expand Up @@ -149,6 +153,12 @@ public Deferred<Object> initialize(final TSDB tsdb, final String id) {

// could be null
String discoveryId = tsdb.getConfig().getString(getConfigKey(DISCOVERY_ID));

LOG.info("Discovery id: {} for AuraMetricsStatefulSetRegistry", discoveryId);

discoveryService = new AuraMetricsStatefulSetRegistry();

/**
discoveryService = tsdb.getRegistry().getPlugin(MockDiscoveryService.class, discoveryId);

if (discoveryService == null) {
Expand All @@ -158,7 +168,7 @@ public Deferred<Object> initialize(final TSDB tsdb, final String id) {
"No DiscoveryService found for source ID " +
(discoveryId == null ? "default" : discoveryId)));
}

*/
LOG.info("Successfully initialized Ephemeral Aura Source Factory with ID {}",
(id == null ? "default" : id));
return Deferred.fromResult(null);
Expand All @@ -174,7 +184,10 @@ public void setupGraph(final QueryPipelineContext context,
final long now = DateTime.currentTimeMillis() / 1000;
final Map<String, List<ShardEndPoint>> services = discoveryService.getEndpoints(
namespace, now - relativeStart);
final List<ShardEndPoint> shards = services.values().iterator().next();
final List<ShardEndPoint> shards = pickEndpoints(services);

LOG.info("Received end points: {} {}", services, shards);

if (shards == null || shards.isEmpty()) {
throw new IllegalStateException("Unable to find shards for namespace "
+ namespace);
Expand Down Expand Up @@ -251,8 +264,8 @@ public boolean supportsQuery(final QueryPipelineContext context,
final TimeSeriesDataSourceConfig config) {
final String namespace = config.getMetric().getMetric()
.substring(0, config.getMetric().getMetric().indexOf("."));
Map<String, List<ShardEndPoint>> services = discoveryService.getEndpoints(namespace);
List<ShardEndPoint> shards = services.values().iterator().next();
final List<ShardEndPoint> shards = pickEndpoints(discoveryService.getEndpoints(namespace));

if (shards == null || shards.isEmpty()) {
return false;
}
Expand Down Expand Up @@ -355,4 +368,18 @@ String getConfigKey(final String suffix) {
}
}

private List<ShardEndPoint> pickEndpoints(Map<String, List<ShardEndPoint>> endpointsMap) {

final int i = random.nextInt(endpointsMap.size());
int j = 0;
for(String key: endpointsMap.keySet()) {
if (j == i) {
return endpointsMap.get(key);
}
j++;
}

return null;
}

}
Loading