Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>com.pythian.opentsdb</groupId>
<artifactId>asyncbigtable</artifactId>
<version>0.4.3</version>
<version>0.4.4</version>
<packaging>jar</packaging>

<name>Async Bigtable library</name>
Expand Down Expand Up @@ -33,6 +33,25 @@
<url>https://github.com/OpenTSDB/asyncbigtable/issues</url>
</issueManagement>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.34.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.27.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<developers>
<developer>
<id>tsuna</id>
Expand Down Expand Up @@ -230,6 +249,10 @@
</build>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>com.stumbleupon</groupId>
<artifactId>async</artifactId>
Expand All @@ -242,28 +265,42 @@
<version>1.7.7</version>
</dependency>

<!-- OpenTSDB depends on guava 18.0 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<version>27.0-jre</version>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-2.x-hadoop</artifactId>
<version>1.23.0</version>
</dependency>

<!-- runtime dependencies -->
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-2.x-hadoop</artifactId>
<version>2.15.5</version>
<exclusions>
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>


<!-- runtime dependencies -->

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/hbase/async/AtomicIncrementRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
package org.hbase.async;

import java.nio.charset.StandardCharsets;

/**
* Atomically increments a value in BigTable.
*
Expand Down Expand Up @@ -101,8 +103,8 @@ public AtomicIncrementRequest(final String table,
final String family,
final String qualifier,
final long amount) {
this(table.getBytes(), key.getBytes(), family.getBytes(),
qualifier.getBytes(), amount);
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8),
qualifier.getBytes(StandardCharsets.UTF_8), amount);
}

/**
Expand All @@ -119,8 +121,8 @@ public AtomicIncrementRequest(final String table,
final String key,
final String family,
final String qualifier) {
this(table.getBytes(), key.getBytes(), family.getBytes(),
qualifier.getBytes(), 1);
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8),
qualifier.getBytes(StandardCharsets.UTF_8), 1);
}

/**
Expand Down
70 changes: 37 additions & 33 deletions src/main/java/org/hbase/async/BinaryPrefixComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
*/
package org.hbase.async;

import java.lang.reflect.Field;

import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.RegexStringComparator;

Expand Down Expand Up @@ -64,38 +62,44 @@ public String toString() {
Bytes.pretty(value));
}

// Helper method (can be shared or duplicated from KeyRegexpFilter)
private boolean isMetaCharacter(int b) {
// Metacharacters: \ * + ? | { [ ( ) ^ $ . #
return b == 92 || b == 42 || b == 43 || b == 63 || b == 124 ||
b == 123 || b == 91 || b == 40 || b == 41 || b == 94 ||
b == 36 || b == 46 || b == 35;
}

@Override
public ByteArrayComparable getBigtableFilter() {
// NOTE: Bigtable doesn't support the BinaryPrefixComparator (grrrr).
// Se https://cloud.google.com/bigtable/docs/hbase-differences#filters.
// So we have to convert it to a regex filter.
StringBuilder buf = new StringBuilder()
.append("(?s)\\Q");

for (int i = 0; i < value.length; i++) {
buf.append((char) (value[i] & 0xFF));
}
buf.append("\\E.*");
try {
// WARNING: This is some ugly ass code. It WILL break at some point.
// Bigtable uses RE2 and runs in raw byte mode. TSDB writes regex with
// byte values but when passing it through the HTable APIs it's converted
// to UTF and serializes differently than the old AsyncHBase client. The
// native BigTable client will pass the regex along properly BUT we need
// to bypass the RegexStringComparator methods and inject our ASCII regex
// directly into the underlying comparator object. Hopefully this is
// temporary (famous last words) until we get to a native Bigtable wrapper.
RegexStringComparator comparator = new RegexStringComparator(buf.toString());
final Field field = ByteArrayComparable.class.getDeclaredField("value");
field.setAccessible(true);
field.set(comparator, buf.toString().getBytes(HBaseClient.ASCII));
field.setAccessible(false);
// Convert the binary prefix into an ASCII-safe RE2 regex prefix match.
// Format: (?s)<sanitized_prefix>.*

StringBuilder sanitizer = new StringBuilder(value.length * 3 + 10);

// Use DOTALL flag (?s) as in the original implementation.
sanitizer.append("(?s)^");

// Sanitize the prefix (literal match logic).
for (byte b : value) {
int unsigned = b & 0xFF;

if (isMetaCharacter(unsigned)) {
sanitizer.append('\\').append((char) unsigned);
} else if (unsigned >= 32 && unsigned < 127) {
// Pass printable non-meta ASCII through.
sanitizer.append((char) unsigned);
} else {
// Hex escape others for binary safety.
sanitizer.append(String.format("\\x%02x", unsigned));
}
}

// Append wildcard to match the rest of the key
sanitizer.append(".*");

// Create the comparator. No reflection or setCharset needed.
RegexStringComparator comparator = new RegexStringComparator(sanitizer.toString());
return comparator;
} catch (NoSuchFieldException e) {
throw new RuntimeException("ByteArrayComparator must have changed, "
+ "can't find the field", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Access denied when hacking the "
+ "regex comparator field", e);
}
}
}
8 changes: 4 additions & 4 deletions src/main/java/org/hbase/async/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.hbase.async;


import org.apache.hbase.thirdparty.io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;

import java.util.*;

Expand Down Expand Up @@ -288,12 +288,12 @@ public static byte[] fromLong(final long n) {

/** Transforms a string into an UTF-8 encoded byte array. */
public static byte[] UTF8(final String s) {
return s.getBytes(CharsetUtil.UTF_8);
return s.getBytes(StandardCharsets.UTF_8);
}

/** Transforms a string into an ISO-8859-1 encoded byte array. */
public static byte[] ISO88591(final String s) {
return s.getBytes(CharsetUtil.ISO_8859_1);
return s.getBytes(StandardCharsets.ISO_8859_1);
}

// ---------------------------- //
Expand Down Expand Up @@ -401,7 +401,7 @@ public static String hex(long v) {
for (/**/; i > 1; i--) {
buf[i] = '0';
}
return new String(buf);
return new String(buf, StandardCharsets.US_ASCII);
}

// ---------------------- //
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/hbase/async/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
package org.hbase.async;

import org.hbase.async.jsr166e.LongAdder;
import java.util.concurrent.atomic.LongAdder;

/**
* An atomic counter to replace {@link java.util.concurrent.atomic.AtomicLong}.
Expand Down
14 changes: 8 additions & 6 deletions src/main/java/org/hbase/async/DeleteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
package org.hbase.async;

import java.nio.charset.StandardCharsets;

/**
* Deletes some data into BigTable.
*
Expand Down Expand Up @@ -281,7 +283,7 @@ public DeleteRequest(final byte[] table,
* @throws IllegalArgumentException if any argument is malformed.
*/
public DeleteRequest(final String table, final String key) {
this(table.getBytes(), key.getBytes(), null, null,
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), null, null,
KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
}

Expand All @@ -295,7 +297,7 @@ public DeleteRequest(final String table, final String key) {
public DeleteRequest(final String table,
final String key,
final String family) {
this(table.getBytes(), key.getBytes(), family.getBytes(), null,
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), null,
KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
}

Expand All @@ -312,8 +314,8 @@ public DeleteRequest(final String table,
final String key,
final String family,
final String qualifier) {
this(table.getBytes(), key.getBytes(), family.getBytes(),
qualifier == null ? null : new byte[][] { qualifier.getBytes() },
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8),
qualifier == null ? null : new byte[][] { qualifier.getBytes(StandardCharsets.UTF_8) },
KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
}

Expand All @@ -333,8 +335,8 @@ public DeleteRequest(final String table,
final String family,
final String qualifier,
final RowLock lock) {
this(table.getBytes(), key.getBytes(), family.getBytes(),
qualifier == null ? null : new byte[][] { qualifier.getBytes() },
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8),
qualifier == null ? null : new byte[][] { qualifier.getBytes(StandardCharsets.UTF_8) },
KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/java/org/hbase/async/GetRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
*/
package org.hbase.async;

import java.nio.charset.StandardCharsets;

/**
* Reads something from BigTable.
*
Expand Down Expand Up @@ -71,7 +73,7 @@ public GetRequest(final byte[] table, final byte[] key) {
* <strong>This byte array will NOT be copied.</strong>
*/
public GetRequest(final String table, final byte[] key) {
this(table.getBytes(), key);
this(table.getBytes(StandardCharsets.UTF_8), key);
}

/**
Expand All @@ -80,7 +82,7 @@ public GetRequest(final String table, final byte[] key) {
* @param key The row key to get in that table.
*/
public GetRequest(final String table, final String key) {
this(table.getBytes(), key.getBytes());
this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down Expand Up @@ -201,7 +203,7 @@ public GetRequest family(final byte[] family) {

/** Specifies a particular column family to get. */
public GetRequest family(final String family) {
return family(family.getBytes());
return family(family.getBytes(StandardCharsets.UTF_8));
}

/**
Expand Down Expand Up @@ -238,7 +240,7 @@ public GetRequest qualifiers(final byte[][] qualifiers) {

/** Specifies a particular column qualifier to get. */
public GetRequest qualifier(final String qualifier) {
return qualifier(qualifier.getBytes());
return qualifier(qualifier.getBytes(StandardCharsets.UTF_8));
}

/** Specifies an explicit row lock to use with this request. */
Expand Down
Loading