Skip to content

Commit ba44745

Browse files
committed
Enhanced JetStreamManagement
Added ability to get KV/Management and OS/Management context directly from JetStreamManagement context
1 parent 2333181 commit ba44745

10 files changed

+91
-21
lines changed

src/main/java/io/nats/client/JetStreamManagement.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -375,4 +375,33 @@ public interface JetStreamManagement {
375375
* @return a JetStream instance.
376376
*/
377377
JetStream jetStream();
378-
}
378+
379+
/**
380+
* Gets a context for working with a Key Value bucket
381+
* @param bucketName the bucket name
382+
* @return a KeyValue instance.
383+
* @throws IOException various IO exception such as timeout or interruption
384+
*/
385+
KeyValue keyValue(String bucketName) throws IOException;
386+
387+
/**
388+
* Gets a context for managing Key Value buckets
389+
* @return a KeyValueManagement instance.
390+
* @throws IOException various IO exception such as timeout or interruption
391+
*/
392+
KeyValueManagement keyValueManagement() throws IOException;
393+
394+
/**
395+
* Gets a context for working with an Object Store.
396+
* @param bucketName the bucket name
397+
* @return an ObjectStore instance.
398+
* @throws IOException various IO exception such as timeout or interruption
399+
*/
400+
ObjectStore objectStore(String bucketName) throws IOException;
401+
402+
/**
403+
* Gets a context for managing Object Stores
404+
* @return an ObjectStoreManagement instance.
405+
* @throws IOException various IO exception such as timeout or interruption
406+
*/
407+
ObjectStoreManagement objectStoreManagement() throws IOException;}

src/main/java/io/nats/client/Options.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -543,7 +543,8 @@ public class Options {
543543
* Property used to set class name for the Callback Executor Service (executor) class
544544
* {@link Builder#callbackExecutor(ExecutorService) callbackExecutor}.
545545
*/
546-
public static final String PROP_CALLBACK_EXECUTOR_SERVICE_CLASS = "callback.executor.service.class"; /**
546+
public static final String PROP_CALLBACK_EXECUTOR_SERVICE_CLASS = "callback.executor.service.class";
547+
/**
547548
* Property used to set class name for the Connect Thread Factory
548549
* {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}.
549550
*/

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2481,7 +2481,7 @@ public KeyValue keyValue(@NonNull String bucketName) throws IOException {
24812481
public KeyValue keyValue(@NonNull String bucketName, @Nullable KeyValueOptions options) throws IOException {
24822482
Validator.validateBucketName(bucketName, true);
24832483
ensureNotClosing();
2484-
return new NatsKeyValue(this, bucketName, options);
2484+
return new NatsKeyValue(bucketName, this, options, null);
24852485
}
24862486

24872487
/**
@@ -2500,7 +2500,7 @@ public KeyValueManagement keyValueManagement() throws IOException {
25002500
@NonNull
25012501
public KeyValueManagement keyValueManagement(@Nullable KeyValueOptions options) throws IOException {
25022502
ensureNotClosing();
2503-
return new NatsKeyValueManagement(this, options);
2503+
return new NatsKeyValueManagement(this, options, null);
25042504
}
25052505

25062506
/**
@@ -2520,7 +2520,7 @@ public ObjectStore objectStore(@NonNull String bucketName) throws IOException {
25202520
public ObjectStore objectStore(@NonNull String bucketName, @Nullable ObjectStoreOptions options) throws IOException {
25212521
Validator.validateBucketName(bucketName, true);
25222522
ensureNotClosing();
2523-
return new NatsObjectStore(this, bucketName, options);
2523+
return new NatsObjectStore(bucketName, this, options, null);
25242524
}
25252525

25262526
/**
@@ -2530,7 +2530,7 @@ public ObjectStore objectStore(@NonNull String bucketName, @Nullable ObjectStore
25302530
@NonNull
25312531
public ObjectStoreManagement objectStoreManagement() throws IOException {
25322532
ensureNotClosing();
2533-
return new NatsObjectStoreManagement(this, null);
2533+
return new NatsObjectStoreManagement(this, null, null);
25342534
}
25352535

25362536
/**
@@ -2540,7 +2540,7 @@ public ObjectStoreManagement objectStoreManagement() throws IOException {
25402540
@NonNull
25412541
public ObjectStoreManagement objectStoreManagement(@Nullable ObjectStoreOptions options) throws IOException {
25422542
ensureNotClosing();
2543-
return new NatsObjectStoreManagement(this, options);
2543+
return new NatsObjectStoreManagement(this, options, null);
25442544
}
25452545

25462546
private void ensureNotClosing() throws IOException {

src/main/java/io/nats/client/impl/NatsConnectionReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,9 @@ void encounteredProtocolError(Exception ex) throws IOException {
606606

607607
//For testing
608608
void fakeReadForTest(byte[] bytes) {
609+
for (int x = 0; x < this.buffer.length; x++) {
610+
this.buffer[x] = 0;
611+
}
609612
System.arraycopy(bytes, 0, this.buffer, 0, bytes.length);
610613
this.bufferPosition = 0;
611614
this.op = UNKNOWN_OP;

src/main/java/io/nats/client/impl/NatsFeatureBase.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,18 @@ public class NatsFeatureBase {
3232
protected final NatsJetStreamManagement jsm;
3333
protected String streamName;
3434

35-
NatsFeatureBase(NatsConnection connection, FeatureOptions fo) throws IOException {
36-
if (fo == null) {
35+
NatsFeatureBase(NatsConnection connection, FeatureOptions fo, NatsJetStreamManagement jsm) throws IOException {
36+
if (jsm != null) {
37+
this.jsm = jsm;
38+
js = (NatsJetStream)jsm.jetStream();
39+
}
40+
else if (fo == null) {
3741
js = new NatsJetStream(connection, null);
38-
jsm = new NatsJetStreamManagement(connection, null);
42+
this.jsm = new NatsJetStreamManagement(connection, null);
3943
}
4044
else {
4145
js = new NatsJetStream(connection, fo.getJetStreamOptions());
42-
jsm = new NatsJetStreamManagement(connection, fo.getJetStreamOptions());
46+
this.jsm = new NatsJetStreamManagement(connection, fo.getJetStreamOptions());
4347
}
4448
}
4549

src/main/java/io/nats/client/impl/NatsJetStreamManagement.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.nats.client.*;
1717
import io.nats.client.api.*;
1818
import io.nats.client.api.Error;
19+
import io.nats.client.support.Validator;
1920

2021
import java.io.IOException;
2122
import java.nio.charset.StandardCharsets;
@@ -386,4 +387,26 @@ public JetStream jetStream() {
386387
}
387388
return js;
388389
}
390+
391+
@Override
392+
public KeyValue keyValue(String bucketName) throws IOException {
393+
Validator.validateBucketName(bucketName, true);
394+
return new NatsKeyValue(bucketName, null, null, this);
395+
}
396+
397+
@Override
398+
public KeyValueManagement keyValueManagement() throws IOException {
399+
return new NatsKeyValueManagement(null, null, this);
400+
}
401+
402+
@Override
403+
public ObjectStore objectStore(String bucketName) throws IOException {
404+
Validator.validateBucketName(bucketName, true);
405+
return new NatsObjectStore(bucketName, null, null, this);
406+
}
407+
408+
@Override
409+
public ObjectStoreManagement objectStoreManagement() throws IOException {
410+
return new NatsObjectStoreManagement(null, null, this);
411+
}
389412
}

src/main/java/io/nats/client/impl/NatsKeyValue.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ public class NatsKeyValue extends NatsFeatureBase implements KeyValue {
4040
private final String readPrefix;
4141
private final String writePrefix;
4242

43-
NatsKeyValue(NatsConnection connection, String bucketName, KeyValueOptions kvo) throws IOException {
44-
super(connection, kvo);
43+
NatsKeyValue(String bucketName, NatsConnection connection, KeyValueOptions kvo, NatsJetStreamManagement jsm) throws IOException {
44+
super(connection, kvo, jsm);
4545
this.bucketName = Validator.validateBucketName(bucketName, true);
4646
streamName = toStreamName(bucketName);
4747
StreamInfo si;
4848
try {
49-
si = jsm.getStreamInfo(streamName);
49+
si = this.jsm.getStreamInfo(streamName);
5050
} catch (JetStreamApiException e) {
5151
// can't throw directly, that would be a breaking change
5252
throw new IOException(e);

src/main/java/io/nats/client/impl/NatsKeyValueManagement.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@ public class NatsKeyValueManagement implements KeyValueManagement {
3131
private final NatsJetStreamManagement jsm;
3232
private final boolean serverOlderThan272;
3333

34-
NatsKeyValueManagement(NatsConnection connection, KeyValueOptions kvo) throws IOException {
35-
jsm = new NatsJetStreamManagement(connection, kvo == null ? null : kvo.getJetStreamOptions());
36-
serverOlderThan272 = jsm.conn.getServerInfo().isOlderThanVersion("2.7.2");
34+
NatsKeyValueManagement(NatsConnection connection, KeyValueOptions kvo, NatsJetStreamManagement jsm) throws IOException {
35+
if (jsm == null) {
36+
this.jsm = new NatsJetStreamManagement(connection, kvo == null ? null : kvo.getJetStreamOptions());
37+
}
38+
else {
39+
this.jsm = jsm;
40+
}
41+
serverOlderThan272 = this.jsm.conn.getServerInfo().isOlderThanVersion("2.7.2");
3742
}
3843

3944
/**

src/main/java/io/nats/client/impl/NatsObjectStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ public class NatsObjectStore extends NatsFeatureBase implements ObjectStore {
3737
private final String rawChunkPrefix;
3838
private final String rawMetaPrefix;
3939

40-
NatsObjectStore(NatsConnection connection, String bucketName, ObjectStoreOptions oso) throws IOException {
41-
super(connection, oso);
40+
NatsObjectStore(String bucketName, NatsConnection connection, ObjectStoreOptions oso, NatsJetStreamManagement jsm) throws IOException {
41+
super(connection, oso, jsm);
4242
this.oso = oso;
4343
this.bucketName = Validator.validateBucketName(bucketName, true);
4444
streamName = toStreamName(bucketName);

src/main/java/io/nats/client/impl/NatsObjectStoreManagement.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@
3030
public class NatsObjectStoreManagement implements ObjectStoreManagement {
3131
private final NatsJetStreamManagement jsm;
3232

33-
NatsObjectStoreManagement(NatsConnection connection, ObjectStoreOptions oso) throws IOException {
34-
jsm = new NatsJetStreamManagement(connection, oso == null ? null : oso.getJetStreamOptions());
33+
NatsObjectStoreManagement(NatsConnection connection, ObjectStoreOptions oso, NatsJetStreamManagement jsm) throws IOException {
34+
if (jsm == null) {
35+
this.jsm = new NatsJetStreamManagement(connection, oso == null ? null : oso.getJetStreamOptions());
36+
}
37+
else {
38+
this.jsm = jsm;
39+
}
3540
}
3641

3742
/**

0 commit comments

Comments
 (0)