diff --git a/pom.xml b/pom.xml index aaefbdd..b6beced 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ com.pythian.opentsdb asyncbigtable - 0.4.3 + 0.4.4 jar Async Bigtable library @@ -33,6 +33,25 @@ https://github.com/OpenTSDB/asyncbigtable/issues + + + + io.opentelemetry + opentelemetry-bom + 1.34.1 + pom + import + + + com.google.cloud + libraries-bom + 26.27.0 + pom + import + + + + tsuna @@ -230,6 +249,10 @@ + + io.opentelemetry + opentelemetry-api + com.stumbleupon async @@ -242,28 +265,42 @@ 1.7.7 - com.google.guava guava - 18.0 + 27.0-jre - com.google.cloud.bigtable - bigtable-hbase-2.x-hadoop - 1.23.0 - - - + com.google.cloud.bigtable + bigtable-hbase-2.x-hadoop + 2.15.5 + + + com.google.cloud.bigtable + bigtable-client-core + + + - org.slf4j - log4j-over-slf4j - 1.7.7 - runtime + org.apache.zookeeper + zookeeper + 3.4.14 + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + - + + + commons-lang commons-lang diff --git a/src/main/java/org/hbase/async/AtomicIncrementRequest.java b/src/main/java/org/hbase/async/AtomicIncrementRequest.java index 79640ca..fd550a3 100644 --- a/src/main/java/org/hbase/async/AtomicIncrementRequest.java +++ b/src/main/java/org/hbase/async/AtomicIncrementRequest.java @@ -26,6 +26,8 @@ */ package org.hbase.async; +import java.nio.charset.StandardCharsets; + /** * Atomically increments a value in BigTable. * @@ -101,8 +103,8 @@ public AtomicIncrementRequest(final String table, final String family, final String qualifier, final long amount) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier.getBytes(), amount); + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier.getBytes(StandardCharsets.UTF_8), amount); } /** @@ -119,8 +121,8 @@ public AtomicIncrementRequest(final String table, final String key, final String family, final String qualifier) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier.getBytes(), 1); + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier.getBytes(StandardCharsets.UTF_8), 1); } /** diff --git a/src/main/java/org/hbase/async/BinaryPrefixComparator.java b/src/main/java/org/hbase/async/BinaryPrefixComparator.java index 1b45a6b..0813c71 100644 --- a/src/main/java/org/hbase/async/BinaryPrefixComparator.java +++ b/src/main/java/org/hbase/async/BinaryPrefixComparator.java @@ -26,8 +26,6 @@ */ package org.hbase.async; -import java.lang.reflect.Field; - import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.RegexStringComparator; @@ -64,38 +62,44 @@ public String toString() { Bytes.pretty(value)); } +// Helper method (can be shared or duplicated from KeyRegexpFilter) + private boolean isMetaCharacter(int b) { + // Metacharacters: \ * + ? | { [ ( ) ^ $ . # + return b == 92 || b == 42 || b == 43 || b == 63 || b == 124 || + b == 123 || b == 91 || b == 40 || b == 41 || b == 94 || + b == 36 || b == 46 || b == 35; + } + + @Override public ByteArrayComparable getBigtableFilter() { - // NOTE: Bigtable doesn't support the BinaryPrefixComparator (grrrr). - // Se https://cloud.google.com/bigtable/docs/hbase-differences#filters. - // So we have to convert it to a regex filter. - StringBuilder buf = new StringBuilder() - .append("(?s)\\Q"); - - for (int i = 0; i < value.length; i++) { - buf.append((char) (value[i] & 0xFF)); - } - buf.append("\\E.*"); - try { - // WARNING: This is some ugly ass code. It WILL break at some point. - // Bigtable uses RE2 and runs in raw byte mode. TSDB writes regex with - // byte values but when passing it through the HTable APIs it's converted - // to UTF and serializes differently than the old AsyncHBase client. The - // native BigTable client will pass the regex along properly BUT we need - // to bypass the RegexStringComparator methods and inject our ASCII regex - // directly into the underlying comparator object. Hopefully this is - // temporary (famous last words) until we get to a native Bigtable wrapper. - RegexStringComparator comparator = new RegexStringComparator(buf.toString()); - final Field field = ByteArrayComparable.class.getDeclaredField("value"); - field.setAccessible(true); - field.set(comparator, buf.toString().getBytes(HBaseClient.ASCII)); - field.setAccessible(false); + // Convert the binary prefix into an ASCII-safe RE2 regex prefix match. + // Format: (?s).* + + StringBuilder sanitizer = new StringBuilder(value.length * 3 + 10); + + // Use DOTALL flag (?s) as in the original implementation. + sanitizer.append("(?s)^"); + + // Sanitize the prefix (literal match logic). + for (byte b : value) { + int unsigned = b & 0xFF; + + if (isMetaCharacter(unsigned)) { + sanitizer.append('\\').append((char) unsigned); + } else if (unsigned >= 32 && unsigned < 127) { + // Pass printable non-meta ASCII through. + sanitizer.append((char) unsigned); + } else { + // Hex escape others for binary safety. + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } + + // Append wildcard to match the rest of the key + sanitizer.append(".*"); + + // Create the comparator. No reflection or setCharset needed. + RegexStringComparator comparator = new RegexStringComparator(sanitizer.toString()); return comparator; - } catch (NoSuchFieldException e) { - throw new RuntimeException("ByteArrayComparator must have changed, " - + "can't find the field", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Access denied when hacking the " - + "regex comparator field", e); - } } } diff --git a/src/main/java/org/hbase/async/Bytes.java b/src/main/java/org/hbase/async/Bytes.java index bada7ea..530c2bf 100644 --- a/src/main/java/org/hbase/async/Bytes.java +++ b/src/main/java/org/hbase/async/Bytes.java @@ -27,7 +27,7 @@ package org.hbase.async; -import org.apache.hbase.thirdparty.io.netty.util.CharsetUtil; +import java.nio.charset.StandardCharsets; import java.util.*; @@ -288,12 +288,12 @@ public static byte[] fromLong(final long n) { /** Transforms a string into an UTF-8 encoded byte array. */ public static byte[] UTF8(final String s) { - return s.getBytes(CharsetUtil.UTF_8); + return s.getBytes(StandardCharsets.UTF_8); } /** Transforms a string into an ISO-8859-1 encoded byte array. */ public static byte[] ISO88591(final String s) { - return s.getBytes(CharsetUtil.ISO_8859_1); + return s.getBytes(StandardCharsets.ISO_8859_1); } // ---------------------------- // @@ -401,7 +401,7 @@ public static String hex(long v) { for (/**/; i > 1; i--) { buf[i] = '0'; } - return new String(buf); + return new String(buf, StandardCharsets.US_ASCII); } // ---------------------- // diff --git a/src/main/java/org/hbase/async/Counter.java b/src/main/java/org/hbase/async/Counter.java index d29875c..2d24a61 100644 --- a/src/main/java/org/hbase/async/Counter.java +++ b/src/main/java/org/hbase/async/Counter.java @@ -26,7 +26,7 @@ */ package org.hbase.async; -import org.hbase.async.jsr166e.LongAdder; +import java.util.concurrent.atomic.LongAdder; /** * An atomic counter to replace {@link java.util.concurrent.atomic.AtomicLong}. diff --git a/src/main/java/org/hbase/async/DeleteRequest.java b/src/main/java/org/hbase/async/DeleteRequest.java index 2a9567b..a8c060d 100644 --- a/src/main/java/org/hbase/async/DeleteRequest.java +++ b/src/main/java/org/hbase/async/DeleteRequest.java @@ -26,6 +26,8 @@ */ package org.hbase.async; +import java.nio.charset.StandardCharsets; + /** * Deletes some data into BigTable. * @@ -281,7 +283,7 @@ public DeleteRequest(final byte[] table, * @throws IllegalArgumentException if any argument is malformed. */ public DeleteRequest(final String table, final String key) { - this(table.getBytes(), key.getBytes(), null, null, + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -295,7 +297,7 @@ public DeleteRequest(final String table, final String key) { public DeleteRequest(final String table, final String key, final String family) { - this(table.getBytes(), key.getBytes(), family.getBytes(), null, + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -312,8 +314,8 @@ public DeleteRequest(final String table, final String key, final String family, final String qualifier) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier == null ? null : new byte[][] { qualifier.getBytes(StandardCharsets.UTF_8) }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -333,8 +335,8 @@ public DeleteRequest(final String table, final String family, final String qualifier, final RowLock lock) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier == null ? null : new byte[][] { qualifier.getBytes(StandardCharsets.UTF_8) }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } diff --git a/src/main/java/org/hbase/async/GetRequest.java b/src/main/java/org/hbase/async/GetRequest.java index 2ae57ef..a428516 100644 --- a/src/main/java/org/hbase/async/GetRequest.java +++ b/src/main/java/org/hbase/async/GetRequest.java @@ -26,6 +26,8 @@ */ package org.hbase.async; +import java.nio.charset.StandardCharsets; + /** * Reads something from BigTable. * @@ -71,7 +73,7 @@ public GetRequest(final byte[] table, final byte[] key) { * This byte array will NOT be copied. */ public GetRequest(final String table, final byte[] key) { - this(table.getBytes(), key); + this(table.getBytes(StandardCharsets.UTF_8), key); } /** @@ -80,7 +82,7 @@ public GetRequest(final String table, final byte[] key) { * @param key The row key to get in that table. */ public GetRequest(final String table, final String key) { - this(table.getBytes(), key.getBytes()); + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8)); } /** @@ -201,7 +203,7 @@ public GetRequest family(final byte[] family) { /** Specifies a particular column family to get. */ public GetRequest family(final String family) { - return family(family.getBytes()); + return family(family.getBytes(StandardCharsets.UTF_8)); } /** @@ -238,7 +240,7 @@ public GetRequest qualifiers(final byte[][] qualifiers) { /** Specifies a particular column qualifier to get. */ public GetRequest qualifier(final String qualifier) { - return qualifier(qualifier.getBytes()); + return qualifier(qualifier.getBytes(StandardCharsets.UTF_8)); } /** Specifies an explicit row lock to use with this request. */ diff --git a/src/main/java/org/hbase/async/HBaseClient.java b/src/main/java/org/hbase/async/HBaseClient.java index fd25d97..f032890 100644 --- a/src/main/java/org/hbase/async/HBaseClient.java +++ b/src/main/java/org/hbase/async/HBaseClient.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -71,6 +72,7 @@ import org.slf4j.LoggerFactory; import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; import com.stumbleupon.async.Callback; @@ -99,7 +101,7 @@ public final class HBaseClient { * TODO(tsuna): Make the tick duration configurable? */ private final HashedWheelTimer - timer = new HashedWheelTimer(Threads.newDaemonThreadFactory("Flush-Timer"), 20, MILLISECONDS); + timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HBaseClient-Timer-%d").build(),20, MILLISECONDS); /** Up to how many milliseconds can we buffer an edit on the client side. */ private volatile short flush_interval = 1000; // ms @@ -287,7 +289,21 @@ public HBaseClient(final Config config, final ExecutorService executor) { public HBaseClient(final Configuration configuration, final ExecutorService executor) { this.executor = executor; num_connections_created.increment(); - hbase_config = BigtableConfiguration.asyncConfigure(configuration); + + // --- FIX: Extract project/instance IDs and call the correct 'configure' method --- + final String projectId = configuration.get("google.bigtable.project.id"); + final String instanceId = configuration.get("google.bigtable.instance.id"); + + if (projectId == null || projectId.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: google.bigtable.project.id"); + } + if (instanceId == null || instanceId.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: google.bigtable.instance.id"); + } + + hbase_config = BigtableConfiguration.configure(configuration, projectId, instanceId); + // --- END FIX --- + LOG.info("BigTable API: Connecting with config: {}", hbase_config); try { @@ -590,7 +606,7 @@ public Deferred shutdown() { */ public Deferred ensureTableFamilyExists(final String table, final String family) { - return ensureTableFamilyExists(table.getBytes(), family.getBytes()); + return ensureTableFamilyExists(table.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8)); } /** @@ -642,7 +658,7 @@ public Deferred ensureTableFamilyExists(final byte[] table, final byte[] * @throws TableNotFoundException (deferred) if the table doesn't exist. */ public Deferred ensureTableExists(final String table) { - return ensureTableFamilyExists(table.getBytes(), EMPTY_ARRAY); + return ensureTableFamilyExists(table.getBytes(StandardCharsets.UTF_8), EMPTY_ARRAY); } /** @@ -914,7 +930,7 @@ public Scanner newScanner(final byte[] table) { * @return A new scanner for this table. */ public Scanner newScanner(final String table) { - return new Scanner(this, table.getBytes()); + return new Scanner(this, table.getBytes(StandardCharsets.UTF_8)); } /** @@ -1254,7 +1270,7 @@ public Deferred compareAndSet(final PutRequest edit, */ public Deferred compareAndSet(final PutRequest edit, final String expected) { - return compareAndSet(edit, expected.getBytes()); + return compareAndSet(edit, expected.getBytes(StandardCharsets.UTF_8)); } /** @@ -1363,7 +1379,7 @@ public Deferred prefetchMeta(final byte[] table, * @return An empty list */ public Deferred> locateRegions(final String table) { - return locateRegions(table.getBytes()); + return locateRegions(table.getBytes(StandardCharsets.UTF_8)); } /** diff --git a/src/main/java/org/hbase/async/KeyRegexpFilter.java b/src/main/java/org/hbase/async/KeyRegexpFilter.java index 68847c2..d804c3f 100644 --- a/src/main/java/org/hbase/async/KeyRegexpFilter.java +++ b/src/main/java/org/hbase/async/KeyRegexpFilter.java @@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; -import org.apache.hbase.thirdparty.io.netty.util.CharsetUtil; +import org.apache.hadoop.hbase.CompareOperator; -import java.lang.reflect.Field; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Arrays; /** @@ -80,7 +80,7 @@ public final class KeyRegexpFilter extends ScanFilter { * @param regexp The regular expression with which to filter the row keys. */ public KeyRegexpFilter(final String regexp) { - this(regexp, CharsetUtil.ISO_8859_1); + this(regexp, StandardCharsets.ISO_8859_1); } /** @@ -105,7 +105,7 @@ public KeyRegexpFilter(final String regexp, final Charset charset) { * the row keys. */ public KeyRegexpFilter(final byte[] regexp) { - this(regexp, CharsetUtil.ISO_8859_1); + this(regexp, StandardCharsets.ISO_8859_1); } /** @@ -129,8 +129,8 @@ byte[] name() { } public String toString() { - return "KeyRegexpFilter(\"" + new String(regexp, CharsetUtil.UTF_8) - + "\", " + new String(charset, CharsetUtil.UTF_8) + ')'; + return "KeyRegexpFilter(\"" + new String(regexp, StandardCharsets.UTF_8) + + "\", " + new String(charset, StandardCharsets.UTF_8) + ')'; } /** @return the regular expression, a copy of the byte array @@ -142,34 +142,73 @@ public byte[] getRegexp() { /** @return the character set for this regular expression * @since 1.8 */ public Charset getCharset() { - return Charset.forName(new String(charset)); + return Charset.forName(new String(charset, StandardCharsets.UTF_8)); } - + @Override Filter getBigtableFilter() { - RegexStringComparator comparator = new RegexStringComparator(regexp_string); - comparator.setCharset(charset_object); - try { - // WARNING: This is some ugly ass code. It WILL break at some point. - // Bigtable uses RE2 and runs in raw byte mode. TSDB writes regex with - // byte values but when passing it through the HTable APIs it's converted - // to UTF and serializes differently than the old AsyncHBase client. The - // native BigTable client will pass the regex along properly BUT we need - // to bypass the RegexStringComparator methods and inject our ASCII regex - // directly into the underlying comparator object. Hopefully this is - // temporary (famous last words) until we get to a native Bigtable wrapper. - final Field field = ByteArrayComparable.class.getDeclaredField("value"); - field.setAccessible(true); - field.set(comparator, regexp_string.getBytes(HBaseClient.ASCII)); - field.setAccessible(false); - return new RowFilter(CompareFilter.CompareOp.EQUAL, comparator); - } catch (NoSuchFieldException e) { - throw new RuntimeException("ByteArrayComparator must have changed, " - + "can't find the field", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Access denied when hacking the " - + "regex comparator field", e); - } + // SANITIZER V5: "Strip, Escape Meta, and Optimize ASCII" + + StringBuilder sanitizer = new StringBuilder(this.regexp.length * 2); + boolean inQuote = false; + + for (int i = 0; i < this.regexp.length; i++) { + int unsigned = this.regexp[i] & 0xFF; + + // CHECK FOR \Q (Start Quote) -> byte 92 followed by 81 + if (unsigned == 92 && i + 1 < this.regexp.length && (this.regexp[i+1] & 0xFF) == 81) { + inQuote = true; + i++; // Skip the 'Q' + continue; // Skip the '\' + } + + // CHECK FOR \E (End Quote) -> byte 92 followed by 69 + if (unsigned == 92 && i + 1 < this.regexp.length && (this.regexp[i+1] & 0xFF) == 69) { + inQuote = false; + i++; // Skip the 'E' + continue; // Skip the '\' + } + + if (inQuote) { + // Inside "Literal" block. + if (isMetaCharacter(unsigned)) { + // Escape metacharacters (e.g. '.' -> '\.') + sanitizer.append('\\').append((char) unsigned); + } else if (unsigned >= 32 && unsigned < 127) { + // Optimization: Pass printable non-meta ASCII through as-is. + sanitizer.append((char) unsigned); + } else { + // Hex-escape everything else (control chars and high-bit bytes) + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } else { + // Structural Regex (e.g. ^, ., *) + if (unsigned >= 32 && unsigned < 127) { + sanitizer.append((char) unsigned); + } else { + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } + } + + RegexStringComparator comparator = new RegexStringComparator(sanitizer.toString()); + + // setCharset() is ignored by the Bigtable adapter and causes compilation issues. + // It is omitted. + + // Use the modern CompareOperator (HBase 2.x) + return new RowFilter(CompareOperator.EQUAL, comparator); + } + + /** + * Checks if a byte corresponds to a Regex Metacharacter that requires escaping + * when found inside a Literal (\Q...\E) block. + */ + private boolean isMetaCharacter(int b) { + // Metacharacters: \ * + ? | { [ ( ) ^ $ . # + return b == 92 || b == 42 || b == 43 || b == 63 || b == 124 || + b == 123 || b == 91 || b == 40 || b == 41 || b == 94 || + b == 36 || b == 46 || b == 35; } } diff --git a/src/main/java/org/hbase/async/PutRequest.java b/src/main/java/org/hbase/async/PutRequest.java index fbcf072..9467bfb 100644 --- a/src/main/java/org/hbase/async/PutRequest.java +++ b/src/main/java/org/hbase/async/PutRequest.java @@ -26,6 +26,8 @@ */ package org.hbase.async; +import java.nio.charset.StandardCharsets; + /** * Puts some data into BigTable. * @@ -262,8 +264,8 @@ public PutRequest(final String table, final String family, final String qualifier, final String value) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier.getBytes(), value.getBytes(), + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -289,8 +291,8 @@ public PutRequest(final String table, final String qualifier, final String value, final RowLock lock) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier.getBytes(), value.getBytes(), + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), family.getBytes(StandardCharsets.UTF_8), + qualifier.getBytes(StandardCharsets.UTF_8), value.getBytes(StandardCharsets.UTF_8), KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } diff --git a/src/main/java/org/hbase/async/RegexStringComparator.java b/src/main/java/org/hbase/async/RegexStringComparator.java index b80368e..dfebde9 100644 --- a/src/main/java/org/hbase/async/RegexStringComparator.java +++ b/src/main/java/org/hbase/async/RegexStringComparator.java @@ -27,10 +27,11 @@ package org.hbase.async; import java.nio.charset.Charset; +// Use StandardCharsets for modern compatibility +import java.nio.charset.StandardCharsets; import org.apache.hadoop.hbase.filter.ByteArrayComparable; -import com.google.common.base.Charsets; /** * A regular expression comparator used in comparison filters such as RowFilter, @@ -68,7 +69,7 @@ public final class RegexStringComparator extends FilterComparator { * @param expr The regular expression with which to filter. */ public RegexStringComparator(String expr) { - this(expr, Charsets.UTF_8); + this(expr, StandardCharsets.UTF_8); } /** @@ -106,6 +107,66 @@ public String toString() { @Override ByteArrayComparable getBigtableFilter() { - return new org.apache.hadoop.hbase.filter.RegexStringComparator(expr); + // Modified for Bigtable: Implement Strategy A (Sanitization) + // Convert the regex string to bytes using the specified charset, then sanitize + // it into an ASCII-safe RE2 regex string. This avoids UTF-8 corruption when + // passing binary data through the standard HBase API. + + byte[] raw = expr.getBytes(charset); + StringBuilder sanitizer = new StringBuilder(raw.length * 2); + boolean inQuote = false; + + for (int i = 0; i < raw.length; i++) { + int unsigned = raw[i] & 0xFF; + + // CHECK FOR \Q (Start Quote) -> byte 92 followed by 81 + if (unsigned == 92 && i + 1 < raw.length && (raw[i+1] & 0xFF) == 81) { + inQuote = true; + i++; // Skip the 'Q' + continue; // Skip the '\' + } + + // CHECK FOR \E (End Quote) -> byte 92 followed by 69 + if (unsigned == 92 && i + 1 < raw.length && (raw[i+1] & 0xFF) == 69) { + inQuote = false; + i++; // Skip the 'E' + continue; // Skip the '\' + } + + if (inQuote) { + // Inside "Literal" block. + if (isMetaCharacter(unsigned)) { + // Escape metacharacters (e.g. '.' -> '\.') + sanitizer.append('\\').append((char) unsigned); + } else if (unsigned >= 32 && unsigned < 127) { + // Optimization: Pass printable non-meta ASCII through as-is. + sanitizer.append((char) unsigned); + } else { + // Hex-escape everything else (control chars and high-bit bytes) + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } else { + // Structural Regex (e.g. ^, ., *) + if (unsigned >= 32 && unsigned < 127) { + sanitizer.append((char) unsigned); + } else { + sanitizer.append(String.format("\\x%02x", unsigned)); + } + } + } + + // Use the standard constructor with the sanitized string. + return new org.apache.hadoop.hbase.filter.RegexStringComparator(sanitizer.toString()); + } + + /** + * Checks if a byte corresponds to a Regex Metacharacter that requires escaping + * when found inside a Literal (\Q...\E) block. + */ + private boolean isMetaCharacter(int b) { + // Metacharacters: \ * + ? | { [ ( ) ^ $ . # + return b == 92 || b == 42 || b == 43 || b == 63 || b == 124 || + b == 123 || b == 91 || b == 40 || b == 41 || b == 94 || + b == 36 || b == 46 || b == 35; } } diff --git a/src/main/java/org/hbase/async/RowLockRequest.java b/src/main/java/org/hbase/async/RowLockRequest.java index 28065cf..98ca7d0 100644 --- a/src/main/java/org/hbase/async/RowLockRequest.java +++ b/src/main/java/org/hbase/async/RowLockRequest.java @@ -26,6 +26,8 @@ */ package org.hbase.async; +import java.nio.charset.StandardCharsets; + /** * A leftover from early AsyncHBase days. This class should not be used and all * methods throw unsupported operation exceptions. @@ -53,7 +55,7 @@ public RowLockRequest(final byte[] table, final byte[] key) { * @throws UnsupportedOperationException all the time */ public RowLockRequest(final String table, final String key) { - this(table.getBytes(), key.getBytes()); + this(table.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8)); } @Override diff --git a/src/main/java/org/hbase/async/Scanner.java b/src/main/java/org/hbase/async/Scanner.java index 479593e..9e81832 100644 --- a/src/main/java/org/hbase/async/Scanner.java +++ b/src/main/java/org/hbase/async/Scanner.java @@ -30,12 +30,13 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.NavigableMap; +import com.google.cloud.bigtable.hbase.BigtableExtendedScan; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,9 +82,6 @@ public final class Scanner { private static final Logger LOG = LoggerFactory.getLogger(Scanner.class); - /** HBase API scan instance */ - private final Scan hbase_scan; - /** * HBase API ResultScanner. After the scan is submitted is must not be null. */ @@ -187,8 +185,6 @@ public Scanner(final HBaseClient client, final byte[] table) { KeyValue.checkTable(table); this.client = client; this.table = table; - - hbase_scan = new Scan(); } /** @@ -210,8 +206,6 @@ public void setStartKey(final byte[] start_key) { KeyValue.checkKey(start_key); checkScanningNotStarted(); this.start_key = start_key; - - hbase_scan.setStartRow(start_key); } /** @@ -220,7 +214,7 @@ public void setStartKey(final byte[] start_key) { * @throws IllegalStateException if scanning already started. */ public void setStartKey(final String start_key) { - setStartKey(start_key.getBytes()); + setStartKey(start_key.getBytes(StandardCharsets.UTF_8)); } /** @@ -235,8 +229,6 @@ public void setStopKey(final byte[] stop_key) { KeyValue.checkKey(stop_key); checkScanningNotStarted(); this.stop_key = stop_key; - - hbase_scan.setStopRow(stop_key); } /** @@ -245,7 +237,7 @@ public void setStopKey(final byte[] stop_key) { * @throws IllegalStateException if scanning already started. */ public void setStopKey(final String stop_key) { - setStopKey(stop_key.getBytes()); + setStopKey(stop_key.getBytes(StandardCharsets.UTF_8)); } /** @@ -258,13 +250,11 @@ public void setFamily(final byte[] family) { KeyValue.checkFamily(family); checkScanningNotStarted(); families = new byte[][] { family }; - - hbase_scan.addFamily(family); } /** Specifies a particular column family to scan. */ public void setFamily(final String family) { - setFamily(family.getBytes()); + setFamily(family.getBytes(StandardCharsets.UTF_8)); } /** @@ -291,15 +281,6 @@ public void setFamilies(byte[][] families, byte[][][] qualifiers) { } this.families = families; this.qualifiers = qualifiers; - - for (int i = 0; i < families.length; i++) { - KeyValue.checkFamily(families[i]); - if (qualifiers != null && qualifiers[i] != null) { - for (byte[] qualifier : qualifiers[i]) { - hbase_scan.addColumn(families[i], qualifier); - } - } - } } /** @@ -309,11 +290,9 @@ public void setFamilies(final String... families) { checkScanningNotStarted(); this.families = new byte[families.length][]; for (int i = 0; i < families.length; i++) { - this.families[i] = families[i].getBytes(); + this.families[i] = families[i].getBytes(StandardCharsets.UTF_8); KeyValue.checkFamily(this.families[i]); qualifiers[i] = null; - - hbase_scan.addFamily(this.families[i]); } } @@ -330,14 +309,11 @@ public void setQualifier(final byte[] qualifier) { KeyValue.checkQualifier(qualifier); checkScanningNotStarted(); this.qualifiers = new byte[][][] { { qualifier } }; - - if (families.length > 0) - hbase_scan.addColumn(families[0], qualifier); } /** Specifies a particular column qualifier to scan. */ public void setQualifier(final String qualifier) { - setQualifier(qualifier.getBytes()); + setQualifier(qualifier.getBytes(StandardCharsets.UTF_8)); } /** @@ -379,7 +355,6 @@ public ScanFilter getFilter() { */ public void clearFilter() { filter = null; - hbase_scan.setFilter(null); } /** @@ -427,7 +402,6 @@ public void setKeyRegexp(final String regexp, final Charset charset) { public void setServerBlockCache(final boolean populate_blockcache) { checkScanningNotStarted(); this.populate_blockcache = populate_blockcache; - hbase_scan.setCacheBlocks(populate_blockcache); } /** @@ -493,7 +467,6 @@ public void setMaxNumKeyValues(final int max_num_kvs) { } checkScanningNotStarted(); this.max_num_kvs = max_num_kvs; - hbase_scan.setBatch(max_num_kvs); } /** @@ -521,8 +494,6 @@ public void setMaxVersions(final int versions) { } checkScanningNotStarted(); this.versions = versions; - - hbase_scan.setMaxVersions(versions); } /** @@ -552,8 +523,6 @@ public void setMaxNumBytes(final long max_num_bytes) { } checkScanningNotStarted(); this.max_num_bytes = max_num_bytes; - - hbase_scan.setMaxResultSize(max_num_bytes); } /** @@ -585,12 +554,6 @@ public void setMinTimestamp(final long timestamp) { } checkScanningNotStarted(); min_timestamp = timestamp; - - try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid timestamp: " + timestamp, e); - } } /** @@ -622,12 +585,6 @@ public void setMaxTimestamp(final long timestamp) { } checkScanningNotStarted(); max_timestamp = timestamp; - - try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid timestamp: " + timestamp, e); - } } /** @@ -664,37 +621,49 @@ public void setTimeRange(final long min_timestamp, final long max_timestamp) { // We now have the guarantee that max_timestamp >= 0, no need to check it. this.min_timestamp = min_timestamp; this.max_timestamp = max_timestamp; + } + + BigtableExtendedScan getHbaseScan() { + // Instantiate a NEW object to guarantee a clean state + BigtableExtendedScan scan = new BigtableExtendedScan(); + + // Apply Ranges (BigtableExtendedScan treats empty byte[] as unbounded) + scan.addRange(this.start_key, this.stop_key); + // Apply Configuration from local Scanner fields try { - hbase_scan.setTimeRange(getMinTimestamp(), getMaxTimestamp()); + scan.setTimeRange(this.min_timestamp, this.max_timestamp); } catch (IOException e) { - throw new IllegalArgumentException("Invalid time range", e); + // This technically shouldn't happen as setters validate, + // but we must handle the checked exception. + throw new RuntimeException("Invalid time range configuration", e); } - } - - /** @return the HTable scan object */ - Scan getHbaseScan() { - // setup the filters - if (filter == null) { - return hbase_scan; + + scan.readVersions(this.versions); + scan.setBatch(this.max_num_kvs); + scan.setMaxResultSize(this.max_num_bytes); + scan.setCacheBlocks(this.populate_blockcache); + + // Apply Families and Qualifiers + if (this.families != null) { + for (int i = 0; i < this.families.length; i++) { + byte[] family = this.families[i]; + if (this.qualifiers != null && i < this.qualifiers.length && this.qualifiers[i] != null) { + for (byte[] qualifier : this.qualifiers[i]) { + scan.addColumn(family, qualifier); + } + } else { + scan.addFamily(family); + } + } + } + + // Apply Filter - This invokes your new KeyRegexpFilter logic + if (this.filter != null) { + scan.setFilter(this.filter.getBigtableFilter()); } - - // TODO - right now we ONLY push the regex filter to Bigtable. The fuzzy - // filter isn't setup properly yet and we're not using column filters at this - // time. -// if (filter instanceof FilterList) { -// for (final ScanFilter sf : ((FilterList)filter).getFilters()) { -// if (sf instanceof KeyRegexpFilter) { -// hbase_scan.setFilter(((KeyRegexpFilter)sf).getRegexFilterForBigtable()); -// return hbase_scan; -// } -// } -// } else if (filter instanceof KeyRegexpFilter) { -// hbase_scan.setFilter(((KeyRegexpFilter)filter).getRegexFilterForBigtable()); -// return hbase_scan; -// } - hbase_scan.setFilter(filter.getBigtableFilter()); - return hbase_scan; + + return scan; } /** @return the scanner results to work with */ diff --git a/src/main/java/org/hbase/async/jsr166e/LongAdder.java b/src/main/java/org/hbase/async/jsr166e/LongAdder.java deleted file mode 100644 index 6ce53b6..0000000 --- a/src/main/java/org/hbase/async/jsr166e/LongAdder.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package org.hbase.async.jsr166e; -import java.util.concurrent.atomic.AtomicLong; -import java.io.IOException; -import java.io.Serializable; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -/** - * One or more variables that together maintain an initially zero - * {@code long} sum. When updates (method {@link #add}) are contended - * across threads, the set of variables may grow dynamically to reduce - * contention. Method {@link #sum} (or, equivalently, {@link - * #longValue}) returns the current total combined across the - * variables maintaining the sum. - * - *

This class is usually preferable to {@link AtomicLong} when - * multiple threads update a common sum that is used for purposes such - * as collecting statistics, not for fine-grained synchronization - * control. Under low update contention, the two classes have similar - * characteristics. But under high contention, expected throughput of - * this class is significantly higher, at the expense of higher space - * consumption. - * - *

This class extends {@link Number}, but does not define - * methods such as {@code hashCode} and {@code compareTo} because - * instances are expected to be mutated, and so are not useful as - * collection keys. - * - *

jsr166e note: This class is targeted to be placed in - * java.util.concurrent.atomic - * - * @since 1.8 - * @author Doug Lea - */ -public class LongAdder extends Striped64 implements Serializable { - private static final long serialVersionUID = 7249069246863182397L; - - /** - * Version of plus for use in retryUpdate - */ - final long fn(long v, long x) { return v + x; } - - /** - * Creates a new adder with initial sum of zero. - */ - public LongAdder() { - } - - /** - * Adds the given value. - * - * @param x the value to add - */ - public void add(long x) { - Cell[] as; long b, v; HashCode hc; Cell a; int n; - if ((as = cells) != null || !casBase(b = base, b + x)) { - boolean uncontended = true; - int h = (hc = threadHashCode.get()).code; - if (as == null || (n = as.length) < 1 || - (a = as[(n - 1) & h]) == null || - !(uncontended = a.cas(v = a.value, v + x))) - retryUpdate(x, hc, uncontended); - } - } - - /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Returns the current sum. The returned value is NOT an - * atomic snapshot: Invocation in the absence of concurrent - * updates returns an accurate result, but concurrent updates that - * occur while the sum is being calculated might not be - * incorporated. - * - * @return the sum - */ - public long sum() { - long sum = base; - Cell[] as = cells; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - sum += a.value; - } - } - return sum; - } - - /** - * Resets variables maintaining the sum to zero. This method may - * be a useful alternative to creating a new adder, but is only - * effective if there are no concurrent updates. Because this - * method is intrinsically racy, it should only be used when it is - * known that no threads are concurrently updating. - */ - public void reset() { - internalReset(0L); - } - - /** - * Equivalent in effect to {@link #sum} followed by {@link - * #reset}. This method may apply for example during quiescent - * points between multithreaded computations. If there are - * updates concurrent with this method, the returned value is - * not guaranteed to be the final value occurring before - * the reset. - * - * @return the sum - */ - public long sumThenReset() { - long sum = base; - Cell[] as = cells; - base = 0L; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) { - sum += a.value; - a.value = 0L; - } - } - } - return sum; - } - - /** - * Returns the String representation of the {@link #sum}. - * @return the String representation of the {@link #sum} - */ - public String toString() { - return Long.toString(sum()); - } - - /** - * Equivalent to {@link #sum}. - * - * @return the sum - */ - public long longValue() { - return sum(); - } - - /** - * Returns the {@link #sum} as an {@code int} after a narrowing - * primitive conversion. - */ - public int intValue() { - return (int)sum(); - } - - /** - * Returns the {@link #sum} as a {@code float} - * after a widening primitive conversion. - */ - public float floatValue() { - return (float)sum(); - } - - /** - * Returns the {@link #sum} as a {@code double} after a widening - * primitive conversion. - */ - public double doubleValue() { - return (double)sum(); - } - - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - s.writeLong(sum()); - } - - private void readObject(ObjectInputStream s) - throws IOException, ClassNotFoundException { - s.defaultReadObject(); - busy = 0; - cells = null; - base = s.readLong(); - } - -} diff --git a/src/main/java/org/hbase/async/jsr166e/README b/src/main/java/org/hbase/async/jsr166e/README deleted file mode 100644 index c1449fb..0000000 --- a/src/main/java/org/hbase/async/jsr166e/README +++ /dev/null @@ -1,14 +0,0 @@ -The contents of this directory contains code from JSR 166e. -** THIS IS NOT PART OF THE PUBLIC INTERFACE OF ASYNCHBASE ** - -Code was downloaded from: - http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ - -The code was released to the public domain, as explained at - http://creativecommons.org/publicdomain/zero/1.0/ - -The code is bundled in asynchbase as it is not available until JDK8 -becomes a reality, which is expected to take years (at time of writing). -Ideally another library, such as Google Guava, would provide this -code until JDK8 becomes the norm. But in the mean time it's here -for asynchbase's internal use. diff --git a/src/main/java/org/hbase/async/jsr166e/Striped64.java b/src/main/java/org/hbase/async/jsr166e/Striped64.java deleted file mode 100644 index e5a6dee..0000000 --- a/src/main/java/org/hbase/async/jsr166e/Striped64.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package org.hbase.async.jsr166e; -import java.util.Random; - -/** - * A package-local class holding common representation and mechanics - * for classes supporting dynamic striping on 64bit values. The class - * extends Number so that concrete subclasses must publicly do so. - */ -abstract class Striped64 extends Number { - /* - * This class maintains a lazily-initialized table of atomically - * updated variables, plus an extra "base" field. The table size - * is a power of two. Indexing uses masked per-thread hash codes. - * Nearly all declarations in this class are package-private, - * accessed directly by subclasses. - * - * Table entries are of class Cell; a variant of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines (with a huge negative performance impact) without - * this precaution. - * - * In part because Cells are relatively large, we avoid creating - * them until they are needed. When there is no contention, all - * updates are made to the base field. Upon first contention (a - * failed CAS on base update), the table is initialized to size 2. - * The table size is doubled upon further contention until - * reaching the nearest power of two greater than or equal to the - * number of CPUS. Table slots remain empty (null) until they are - * needed. - * - * A single spinlock ("busy") is used for initializing and - * resizing the table, as well as populating slots with new Cells. - * There is no need for a blocking lock: When the lock is not - * available, threads try other slots (or the base). During these - * retries, there is increased contention and reduced locality, - * which is still better than alternatives. - * - * Per-thread hash codes are initialized to random values. - * Contention and/or table collisions are indicated by failed - * CASes when performing an update operation (see method - * retryUpdate). Upon a collision, if the table size is less than - * the capacity, it is doubled in size unless some other thread - * holds the lock. If a hashed slot is empty, and lock is - * available, a new Cell is created. Otherwise, if the slot - * exists, a CAS is tried. Retries proceed by "double hashing", - * using a secondary hash (Marsaglia XorShift) to try to find a - * free slot. - * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and collisions - * only become known via CAS failures, convergence can be slow, - * and because threads are typically not bound to CPUS forever, - * may not occur at all. However, despite these limitations, - * observed contention rates are typically low in these cases. - * - * It is possible for a Cell to become unused when threads that - * once hashed to it terminate, as well as in the case where - * doubling the table causes no thread to hash to it under - * expanded mask. We do not try to detect or remove such cells, - * under the assumption that for long-running instances, observed - * contention levels will recur, so the cells will eventually be - * needed again; and for short-lived ones, it does not matter. - */ - - /** - * Padded variant of AtomicLong supporting only raw accesses plus CAS. - * The value field is placed between pads, hoping that the JVM doesn't - * reorder them. - * - * JVM intrinsics note: It would be possible to use a release-only - * form of CAS here, if it were provided. - */ - static final class Cell { - volatile long p0, p1, p2, p3, p4, p5, p6; - volatile long value; - volatile long q0, q1, q2, q3, q4, q5, q6; - Cell(long x) { value = x; } - - final boolean cas(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long valueOffset; - static { - try { - UNSAFE = getUnsafe(); - Class ak = Cell.class; - valueOffset = UNSAFE.objectFieldOffset - (ak.getDeclaredField("value")); - } catch (Exception e) { - throw new Error(e); - } - } - - } - - /** - * Holder for the thread-local hash code. The code is initially - * random, but may be set to a different value upon collisions. - */ - static final class HashCode { - static final Random rng = new Random(); - int code; - HashCode() { - int h = rng.nextInt(); // Avoid zero to allow xorShift rehash - code = (h == 0) ? 1 : h; - } - } - - /** - * The corresponding ThreadLocal class - */ - static final class ThreadHashCode extends ThreadLocal { - public HashCode initialValue() { return new HashCode(); } - } - - /** - * Static per-thread hash codes. Shared across all instances to - * reduce ThreadLocal pollution and because adjustments due to - * collisions in one table are likely to be appropriate for - * others. - */ - static final ThreadHashCode threadHashCode = new ThreadHashCode(); - - /** Number of CPUS, to place bound on table size */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * Table of cells. When non-null, size is a power of 2. - */ - transient volatile Cell[] cells; - - /** - * Base value, used mainly when there is no contention, but also as - * a fallback during table initialization races. Updated via CAS. - */ - transient volatile long base; - - /** - * Spinlock (locked via CAS) used when resizing and/or creating Cells. - */ - transient volatile int busy; - - /** - * Package-private default constructor - */ - Striped64() { - } - - /** - * CASes the base field. - */ - final boolean casBase(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); - } - - /** - * CASes the busy field from 0 to 1 to acquire lock. - */ - final boolean casBusy() { - return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); - } - - /** - * Computes the function of current and new value. Subclasses - * should open-code this update function for most uses, but the - * virtualized form is needed within retryUpdate. - * - * @param currentValue the current value (of either base or a cell) - * @param newValue the argument from a user update call - * @return result of the update function - */ - abstract long fn(long currentValue, long newValue); - - /** - * Handles cases of updates involving initialization, resizing, - * creating new Cells, and/or contention. See above for - * explanation. This method suffers the usual non-modularity - * problems of optimistic retry code, relying on rechecked sets of - * reads. - * - * @param x the value - * @param hc the hash code holder - * @param wasUncontended false if CAS failed before call - */ - final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { - int h = hc.code; - boolean collide = false; // True if last slot nonempty - for (;;) { - Cell[] as; Cell a; int n; long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (busy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (busy == 0 && casBusy()) { - boolean created = false; - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - created = true; - } - } finally { - busy = 0; - } - if (created) - break; - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, fn(v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (busy == 0 && casBusy()) { - try { - if (cells == as) { // Expand table unless stale - Cell[] rs = new Cell[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - cells = rs; - } - } finally { - busy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; - } - else if (busy == 0 && cells == as && casBusy()) { - boolean init = false; - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - init = true; - } - } finally { - busy = 0; - } - if (init) - break; - } - else if (casBase(v = base, fn(v, x))) - break; // Fall back on using base - } - hc.code = h; // Record index for next time - } - - - /** - * Sets base and all cells to the given value. - */ - final void internalReset(long initialValue) { - Cell[] as = cells; - base = initialValue; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - a.value = initialValue; - } - } - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long baseOffset; - private static final long busyOffset; - static { - try { - UNSAFE = getUnsafe(); - Class sk = Striped64.class; - baseOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("base")); - busyOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("busy")); - } catch (Exception e) { - throw new Error(e); - } - } - - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ - private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException se) { - try { - return java.security.AccessController.doPrivileged - (new java.security - .PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - java.lang.reflect.Field f = sun.misc - .Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (sun.misc.Unsafe) f.get(null); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } - } - -} diff --git a/src/main/java/org/hbase/async/jsr166e/package-info.java b/src/main/java/org/hbase/async/jsr166e/package-info.java deleted file mode 100644 index 4538865..0000000 --- a/src/main/java/org/hbase/async/jsr166e/package-info.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. - * This file is part of Async HBase. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - Neither the name of the StumbleUpon nor the names of its contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - *

This package is not part of asynchbase's public interface.

- * Use {@link org.hbase.async.Counter} instead. - */ -package org.hbase.async.jsr166e; diff --git a/src/test/java/org/hbase/async/FiltersTest.java b/src/test/java/org/hbase/async/FiltersTest.java index 88f0d0e..4923692 100644 --- a/src/test/java/org/hbase/async/FiltersTest.java +++ b/src/test/java/org/hbase/async/FiltersTest.java @@ -26,6 +26,12 @@ */ package org.hbase.async; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos; import org.junit.Assert; import org.junit.Test; @@ -35,35 +41,154 @@ @RunWith(JUnit4.class) public class FiltersTest { - @Test - public void ensureKeyOnlyFilterIsCorrectlyCreated() { - KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(); - org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(false)); - } - - @Test - public void ensureKeyOnlyFilterIsCorrectlyCreatedWithArgs() { - KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(true); - org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(true)); - } - - @Test - public void ensureColumnPrefixFilterIsCorrectlyCreated() { - final byte[] prefix = Bytes.UTF8("aoeu"); - ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(prefix); - org.apache.hadoop.hbase.filter.ColumnPrefixFilter filter = (org.apache.hadoop.hbase.filter.ColumnPrefixFilter) columnPrefixFilter.getBigtableFilter(); - Assert.assertNotNull(filter); - Assert.assertArrayEquals(filter.getPrefix(), prefix); - } - - private byte[] keyOnlyToByteArray(boolean value) { - FilterProtos.KeyOnlyFilter.Builder builder = org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos.KeyOnlyFilter.newBuilder(); - builder.setLenAsVal(value); - return builder.build().toByteArray(); - } + @Test + public void ensureKeyOnlyFilterIsCorrectlyCreated() { + KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(); + org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(false)); + } + @Test + public void ensureKeyOnlyFilterIsCorrectlyCreatedWithArgs() { + KeyOnlyFilter keyOnlyFilter = new KeyOnlyFilter(true); + org.apache.hadoop.hbase.filter.KeyOnlyFilter filter = (org.apache.hadoop.hbase.filter.KeyOnlyFilter) keyOnlyFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.toByteArray(), keyOnlyToByteArray(true)); + } + + @Test + public void ensureColumnPrefixFilterIsCorrectlyCreated() { + final byte[] prefix = Bytes.UTF8("aoeu"); + ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(prefix); + org.apache.hadoop.hbase.filter.ColumnPrefixFilter filter = (org.apache.hadoop.hbase.filter.ColumnPrefixFilter) columnPrefixFilter.getBigtableFilter(); + Assert.assertNotNull(filter); + Assert.assertArrayEquals(filter.getPrefix(), prefix); + } + + // ---------------------------------------------------------------------- + // RE2/Bigtable Sanitization Tests (Verifying NEW Sanitized Behavior) + // ---------------------------------------------------------------------- + + @Test + public void ensureKeyRegexpFilterSanitization() throws Exception { + // 1. Basic Structural Regex (No Quotes) + // Should pass ASCII through as-is. + byte[] regex = Bytes.UTF8("^sys\\.cpu"); + KeyRegexpFilter keyRegex = new KeyRegexpFilter(regex); + + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: ^sys\.cpu + Assert.assertEquals("^sys\\.cpu", pattern); + } + + @Test + public void ensureKeyRegexpFilterStripAndEscape() throws Exception { + // 2. The OpenTSDB "Quoting" Conflict Scenario + // Input: \Q\E + // Bytes: [92, 81, 0, 1, 92, 69] + // NEW BEHAVIOR: Strip quotes, hex escape binaries. + + byte[] quotedBinary = new byte[] { + 92, 81, // \Q + 0, 1, // Binary 0x00, 0x01 + 92, 69 // \E + }; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(quotedBinary); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: \x00\x01 + Assert.assertEquals("\\x00\\x01", pattern); + } + + @Test + public void ensureKeyRegexpFilterMixedContent() throws Exception { + // 3. Mixed Structural and Binary + // Input: ^metric\Q<0x00><0xFF>\E$ + // This simulates a real UID query: Start Anchor + Metric Name + Binary ID + End Anchor + // NEW BEHAVIOR: Strip quotes, hex escape binaries. + + byte[] mixed = new byte[14]; + int p = 0; + mixed[p++] = '^'; + mixed[p++] = 'm'; + mixed[p++] = 'e'; + mixed[p++] = 't'; + mixed[p++] = 'r'; + mixed[p++] = 'i'; + mixed[p++] = 'c'; + mixed[p++] = 92; mixed[p++] = 81; // \Q + mixed[p++] = 0; // 0x00 + mixed[p++] = (byte) 0xFF; // 0xFF (High bit char) + mixed[p++] = 92; mixed[p++] = 69; // \E + mixed[p++] = '$'; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(mixed); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: ^metric\x00\xff$ + Assert.assertEquals("^metric\\x00\\xff$", pattern); + } + + @Test + public void ensureDoubleEscapingLogic() throws Exception { + // 4. Edge Case: Escaped Quote literal inside a quote? + // Note: The current logic is simple state-stripping. + // If OpenTSDB sends \Q.\E, we want \. (literal dot), NOT \x2e. + // RE2 interprets \x2e as a regex wildcard "." (match any), but \. as a literal dot. + // NEW BEHAVIOR: Metacharacters inside quotes get Backslash Escape. + + byte[] input = new byte[] { + 92, 81, // \Q + 46, // '.' (dot) - normally a regex wildcard + 92, 69 // \E + }; + + KeyRegexpFilter keyRegex = new KeyRegexpFilter(input); + RowFilter rowFilter = (RowFilter) keyRegex.getBigtableFilter(); + String pattern = extractRegexPattern(rowFilter); + + // Expect: \. (Literal Dot) + Assert.assertEquals("\\.", pattern); + } + + // ---------------------------------------------------------------------- + // Helpers + // ---------------------------------------------------------------------- + + private byte[] keyOnlyToByteArray(boolean value) { + FilterProtos.KeyOnlyFilter.Builder builder = org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos.KeyOnlyFilter.newBuilder(); + builder.setLenAsVal(value); + return builder.build().toByteArray(); + } + + /** + * Uses reflection to extract the internal Regex pattern string from the RowFilter. + * Corrected to use ByteArrayComparable.getValue() which works across HBase versions. + */ + private String extractRegexPattern(RowFilter rowFilter) throws Exception { + // 1. RowFilter contains a comparator. In some HBase versions this field is protected, + // so we use reflection to ensure access. + Field comparatorField = CompareFilter.class.getDeclaredField("comparator"); + comparatorField.setAccessible(true); + + // 2. The comparator is an instance of RegexStringComparator. + // RegexStringComparator extends ByteArrayComparable. + // ByteArrayComparable stores the raw bytes of the comparator value (the regex string) + // in a field accessible via the public getValue() method. + ByteArrayComparable comparator = (ByteArrayComparable) comparatorField.get(rowFilter); + + if (comparator == null) { + throw new RuntimeException("Filter comparator was null"); + } + + // 3. Convert the raw bytes back to a String for verification. + // Use ISO-8859-1 to match KeyRegexpFilter's internal behavior and preserve all bytes 1-to-1. + return new String(comparator.getValue(), StandardCharsets.ISO_8859_1); + } }