Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>com.pythian.opentsdb</groupId>
<artifactId>asyncbigtable</artifactId>
<version>0.4.3</version>
<version>0.4.4</version>
<packaging>jar</packaging>

<name>Async Bigtable library</name>
Expand Down Expand Up @@ -33,6 +33,25 @@
<url>https://github.com/OpenTSDB/asyncbigtable/issues</url>
</issueManagement>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.34.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.27.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<developers>
<developer>
<id>tsuna</id>
Expand Down Expand Up @@ -230,6 +249,11 @@
</build>

<dependencies>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>com.stumbleupon</groupId>
<artifactId>async</artifactId>
Expand All @@ -242,28 +266,42 @@
<version>1.7.7</version>
</dependency>

<!-- OpenTSDB depends on guava 18.0 -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
<version>27.0-jre</version>
</dependency>

<dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-2.x-hadoop</artifactId>
<version>1.23.0</version>
</dependency>

<!-- runtime dependencies -->
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-hbase-2.x-hadoop</artifactId>
<version>2.15.5</version>
<exclusions>
<exclusion>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-client-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>


<!-- runtime dependencies -->

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/hbase/async/AppendRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public AppendRequest(final byte[] table,
* @param table The table to edit.
* @param kv The {@link KeyValue} to store.
*/
@SuppressWarnings("deprecation")
public AppendRequest(final byte[] table,
final KeyValue kv) {
super(table, kv.key(), kv.family(), kv.timestamp(), RowLock.NO_LOCK);
Expand All @@ -182,6 +183,7 @@ public AppendRequest(final byte[] table,
}

/** Private constructor. */
@SuppressWarnings("deprecation")
private AppendRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand Down Expand Up @@ -271,4 +273,4 @@ public boolean returnResult() {
return return_result;
}

}
}
1 change: 1 addition & 0 deletions src/main/java/org/hbase/async/CompareFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* operator (equal, greater, not equal, etc) and a filter comparator.
* @since 1.6
*/
@SuppressWarnings("deprecation")
public abstract class CompareFilter extends ScanFilter {

/** Comparison operators. */
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/hbase/async/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
package org.hbase.async;

import org.hbase.async.jsr166e.LongAdder;
import java.util.concurrent.atomic.LongAdder;

/**
* An atomic counter to replace {@link java.util.concurrent.atomic.AtomicLong}.
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/hbase/async/DeleteRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class DeleteRequest extends BatchableRpc
* @param key The key of the row to edit in that table.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table, final byte[] key) {
this(table, key, null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
}
Expand All @@ -78,6 +79,7 @@ public DeleteRequest(final byte[] table, final byte[] key) {
* @param timestamp The timestamp to set on this edit.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table, final byte[] key,
final long timestamp) {
this(table, key, null, null, timestamp, RowLock.NO_LOCK);
Expand All @@ -91,6 +93,7 @@ public DeleteRequest(final byte[] table, final byte[] key,
* @param family The column family to edit in that table.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family) {
Expand All @@ -106,6 +109,7 @@ public DeleteRequest(final byte[] table,
* @param timestamp The timestamp to set on this edit.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -123,6 +127,7 @@ public DeleteRequest(final byte[] table,
* Can be {@code null} since version 1.1.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -143,6 +148,7 @@ public DeleteRequest(final byte[] table,
* @param timestamp The timestamp to set on this edit.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -162,6 +168,7 @@ public DeleteRequest(final byte[] table,
* @param qualifiers The column qualifiers to delete in that family.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -180,6 +187,7 @@ public DeleteRequest(final byte[] table,
* @param timestamp The timestamp to set on this edit.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -199,6 +207,7 @@ public DeleteRequest(final byte[] table,
* @deprecated
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -221,6 +230,7 @@ public DeleteRequest(final byte[] table,
* @deprecated
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -244,6 +254,7 @@ public DeleteRequest(final byte[] table,
* @deprecated
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -265,6 +276,7 @@ public DeleteRequest(final byte[] table,
* @deprecated
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final byte[] key,
final byte[] family,
Expand All @@ -280,6 +292,7 @@ public DeleteRequest(final byte[] table,
* @param key The key of the row to edit in that table.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final String table, final String key) {
this(table.getBytes(), key.getBytes(), null, null,
KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK);
Expand All @@ -292,6 +305,7 @@ public DeleteRequest(final String table, final String key) {
* @param family The column family to edit in that table.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final String table,
final String key,
final String family) {
Expand All @@ -308,6 +322,7 @@ public DeleteRequest(final String table,
* Can be {@code null} since version 1.1.
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final String table,
final String key,
final String family,
Expand All @@ -328,6 +343,7 @@ public DeleteRequest(final String table,
* @deprecated
* @throws IllegalArgumentException if any argument is malformed.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final String table,
final String key,
final String family,
Expand All @@ -345,6 +361,7 @@ public DeleteRequest(final String table,
* {@link KeyValue} specifies a timestamp, then this specific timestamp only
* will be deleted.
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table, final KeyValue kv) {
this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() },
kv.timestamp(), RowLock.NO_LOCK);
Expand All @@ -359,6 +376,7 @@ public DeleteRequest(final byte[] table, final KeyValue kv) {
* @param lock Ignored
* @deprecated
*/
@SuppressWarnings("deprecation")
public DeleteRequest(final byte[] table,
final KeyValue kv,
final RowLock lock) {
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/org/hbase/async/HBaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.slf4j.LoggerFactory;

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.stumbleupon.async.Callback;
Expand Down Expand Up @@ -99,8 +100,7 @@ public final class HBaseClient {
* TODO(tsuna): Make the tick duration configurable?
*/
private final HashedWheelTimer
timer = new HashedWheelTimer(Threads.newDaemonThreadFactory("Flush-Timer"), 20, MILLISECONDS);

timer = new HashedWheelTimer(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HBaseClient-Timer-%d").build(),20, MILLISECONDS);
/** Up to how many milliseconds can we buffer an edit on the client side. */
private volatile short flush_interval = 1000; // ms

Expand Down Expand Up @@ -287,7 +287,21 @@ public HBaseClient(final Config config, final ExecutorService executor) {
public HBaseClient(final Configuration configuration, final ExecutorService executor) {
this.executor = executor;
num_connections_created.increment();
hbase_config = BigtableConfiguration.asyncConfigure(configuration);

// --- FIX: Extract project/instance IDs and call the correct 'configure' method ---
final String projectId = configuration.get("google.bigtable.project.id");
final String instanceId = configuration.get("google.bigtable.instance.id");

if (projectId == null || projectId.isEmpty()) {
throw new IllegalArgumentException("Missing required configuration: google.bigtable.project.id");
}
if (instanceId == null || instanceId.isEmpty()) {
throw new IllegalArgumentException("Missing required configuration: google.bigtable.instance.id");
}

hbase_config = BigtableConfiguration.configure(configuration, projectId, instanceId);
// --- END FIX ---

LOG.info("BigTable API: Connecting with config: {}", hbase_config);

try {
Expand Down Expand Up @@ -609,6 +623,7 @@ public Deferred<Object> ensureTableFamilyExists(final String table,
* @throws TableNotFoundException (deferred) if the table doesn't exist.
* @throws NoSuchColumnFamilyException (deferred) if the family doesn't exist.
*/
@SuppressWarnings("unchecked")
public Deferred<Object> ensureTableFamilyExists(final byte[] table, final byte[] family) {
if (LOG.isDebugEnabled()) {
LOG.debug("BigTable API: Checking if table [{}] and family [{}] exist", Bytes.pretty(table),
Expand Down Expand Up @@ -1177,6 +1192,7 @@ public Deferred<Object> put(final PutRequest request) {
* (think of it as {@code Deferred<Void>}). But you probably want to attach
* at least an errback to this {@code Deferred} to handle failures.
*/
@SuppressWarnings("deprecation")
public Deferred<Object> append(final AppendRequest request) {
num_appends.increment();

Expand Down Expand Up @@ -1217,6 +1233,7 @@ public Deferred<Object> append(final AppendRequest request) {
* the CAS failed because the value in BigTable didn't match the expected value
* of the CAS request.
*/
@SuppressWarnings("deprecation")
public Deferred<Boolean> compareAndSet(final PutRequest edit,
final byte[] expected) {

Expand Down Expand Up @@ -1271,6 +1288,7 @@ public Deferred<Boolean> compareAndSet(final PutRequest edit,
* inserted in BigTable, {@code false} if there was already a value in the
* given cell.
*/
@SuppressWarnings("deprecation")
public Deferred<Boolean> atomicCreate(final PutRequest edit) {
return compareAndSet(edit, EMPTY_ARRAY);
}
Expand Down
Loading