Skip to content

Commit d2ab550

Browse files
authored
Merge pull request #681 from openziti/accept-v2
Accept v2
2 parents b615243 + c365230 commit d2ab550

File tree

8 files changed

+43
-26
lines changed

8 files changed

+43
-26
lines changed

.github/workflows/draft-release.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name: Release Drafter
22
permissions:
3-
contents: read
3+
contents: write
44

55
on:
66
push:

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ download = "5.6.0"
1111

1212
# OpenZiti Edge API
1313
ziti-api = "0.26.42"
14-
ziti-cli = "1.3.3"
14+
ziti-cli = "1.5.4"
1515

1616
# third party
1717
lazysodium-java = "5.1.4"

ziti/src/main/kotlin/org/openziti/impl/ChannelImpl.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ internal class ChannelImpl(val addr: String, val sslContext: SSLContext, val api
5252
private val synchers = ConcurrentHashMap<Int, CompletableDeferred<Unit>>()
5353

5454
private val recMutex = Mutex()
55-
private val receivers = mutableMapOf<Int, Channel.MessageReceiver>()
55+
private val receivers = mutableMapOf<UInt, Channel.MessageReceiver>()
5656
private val chState = MutableStateFlow<Channel.State>(Channel.State.Initial)
5757
private val reconnectSignal = kotlinx.coroutines.channels.Channel<Unit>()
5858

@@ -73,13 +73,14 @@ internal class ChannelImpl(val addr: String, val sslContext: SSLContext, val api
7373
get() = chState.value
7474

7575

76-
override fun registerReceiver(id: Int, rec: Channel.MessageReceiver) = runBlocking{
76+
override fun registerReceiver(id: UInt, rec: Channel.MessageReceiver) = runBlocking{
7777
recMutex.withLock {
7878
receivers[id] = rec
7979
}
8080
}
8181

82-
override fun deregisterReceiver(id: Int): Unit = runBlocking {
82+
83+
override fun deregisterReceiver(id: UInt): Unit = runBlocking {
8384
recMutex.withLock { receivers.remove(id) }
8485
}
8586

@@ -243,7 +244,7 @@ internal class ChannelImpl(val addr: String, val sslContext: SSLContext, val api
243244
if (waiter != null) {
244245
waiter.complete(m)
245246
} else {
246-
val recId = m.getIntHeader(ZitiProtocol.Header.ConnId)
247+
val recId = m.getIntHeader(ZitiProtocol.Header.ConnId)?.toUInt()
247248
recId?.let {
248249
val receiver = recMutex.withLock { receivers[it] }
249250

ziti/src/main/kotlin/org/openziti/impl/ZitiContextImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
102102

103103
private val connCounter = AtomicInteger(0)
104104

105-
private val connections = sortedMapOf<Int, ZitiConnection>()
105+
private val connections = sortedMapOf<UInt, ZitiConnection>()
106106

107107
init {
108108
this._enabled = enabled
@@ -527,7 +527,7 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
527527
}.getOrElse { throw TimeoutException("failed to get service[$host:$port] in ${timeout}ms") }
528528
}
529529

530-
internal fun nextConnId() = connCounter.incrementAndGet()
530+
internal fun nextConnId() = connCounter.incrementAndGet().toUInt()
531531

532532
internal val channels = ConcurrentHashMap<String, Channel>()
533533

ziti/src/main/kotlin/org/openziti/net/Channel.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ internal interface Channel: Closeable {
4545
val name: String
4646
val state: State
4747

48-
fun deregisterReceiver(id: Int)
49-
fun registerReceiver(id: Int, rec: MessageReceiver)
48+
fun deregisterReceiver(id: UInt)
49+
fun registerReceiver(id: UInt, rec: MessageReceiver)
5050

5151
suspend fun Send(msg: Message)
5252
suspend fun SendSynch(msg: Message)

ziti/src/main/kotlin/org/openziti/net/Message.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ class Message(
118118
headers.put(header.id, v.toByteArray())
119119
}
120120

121+
fun setHeader(header: ZitiProtocol.Header, v: UInt): Message = this.setHeader(header, v.toInt())
122+
121123
fun setHeader(header: ZitiProtocol.Header, v: Int): Message = this.apply {
122124
val b = ByteArray(4)
123125
ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN).putInt(v)
@@ -128,6 +130,11 @@ class Message(
128130
headers.put(header.id, b)
129131
}
130132

133+
fun setHeader(header: ZitiProtocol.Header, b: Boolean) = this.apply {
134+
val v: Byte = if (b) 1 else 0
135+
headers.put(header.id, byteArrayOf(v))
136+
}
137+
131138
fun setHeader(headerId: Int, v: Boolean) = this.apply {
132139
val b: Byte = if (v) 1 else 0
133140
headers.put(headerId, byteArrayOf(b))

ziti/src/main/kotlin/org/openziti/net/ZitiServerSocketChannel.kt

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ internal class ZitiServerSocketChannel(val ctx: ZitiContextImpl):
5858
private val listenerId = ByteArray(32).apply {
5959
SecureRandom().nextBytes(this)
6060
}
61-
val connId: Int = ctx.nextConnId()
61+
val connId: UInt = ctx.nextConnId()
6262
var state: State = State.initial
6363
lateinit var incoming: Chan<Message>
6464
lateinit var token: String
@@ -135,6 +135,7 @@ internal class ZitiServerSocketChannel(val ctx: ZitiContextImpl):
135135
setHeader(Header.ConnId, connId)
136136
setHeader(Header.SeqHeader, 0)
137137
setHeader(Header.ListenerId, listenerId)
138+
setHeader(Header.RouterProvidedConnId, true)
138139

139140
val bindId = localAddr.identity ?: (if (localAddr.useEdgeId) ctx.getId()?.name else null)
140141

@@ -192,8 +193,13 @@ internal class ZitiServerSocketChannel(val ctx: ZitiContextImpl):
192193

193194
val child = ZitiSocketChannel(ctx)
194195
d{"accepting child conn[${child.connId}] on parent[$connId]"}
196+
req.getIntHeader(Header.RouterProvidedConnId)?.toUInt()?.let {
197+
d{"setting child[${child.connId}].rtConnId = $it (router provided)"}
198+
child.rtConnId = it
199+
}
200+
195201
val connIdBuf = ByteArray(4)
196-
ByteBuffer.wrap(connIdBuf).order(ByteOrder.LITTLE_ENDIAN).putInt(child.connId)
202+
ByteBuffer.wrap(connIdBuf).order(ByteOrder.LITTLE_ENDIAN).putInt(child.connId.toInt())
197203
val dialSuccess = Message(ZitiProtocol.ContentType.DialSuccess, connIdBuf)
198204
dialSuccess.setHeader(Header.SeqHeader, 0)
199205
dialSuccess.setHeader(Header.ConnId, connId)
@@ -206,21 +212,21 @@ internal class ZitiServerSocketChannel(val ctx: ZitiContextImpl):
206212
child.setupCrypto(sessKeys)
207213
} ?: child.setupCrypto(null)
208214

209-
val startMsg = ch.SendAndWait(dialSuccess)
210-
211-
if (startMsg.content == ZitiProtocol.ContentType.StateConnected) {
215+
runCatching {
216+
ch.SendSynch(dialSuccess)
217+
}.onSuccess {
212218
child.state.set(ZitiSocketChannel.State.connected)
213-
ch.registerReceiver(child.connId, child)
219+
ch.registerReceiver(child.rtConnId, child)
214220
child.channel.complete(ch)
215221
child.startCrypto(ch)
216222
child.local = localAddr
217223
child.remote = ZitiAddress.Session("$connId", localAddr.service,
218224
req.getStringHeader(Header.CallerIdHeader), req.getHeader(Header.AppDataHeader))
219225

220226
handler.completed(child, att)
221-
} else {
222-
val err = Charsets.UTF_8.decode(ByteBuffer.wrap(startMsg.body)).toString()
223-
handler.failed(IOException(err), att)
227+
}.onFailure { t ->
228+
val err = t.message
229+
handler.failed(IOException(err, t), att)
224230
}
225231
} catch (ex: Throwable) {
226232
when (ex) {

ziti/src/main/kotlin/org/openziti/net/ZitiSocketChannel.kt

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ import java.util.concurrent.atomic.AtomicReference
5757
import kotlin.text.Charsets.UTF_8
5858
import kotlinx.coroutines.channels.Channel as Chan
5959

60-
internal class ZitiSocketChannel private constructor(internal val ctx: ZitiContextImpl, val connId: Int):
60+
internal class ZitiSocketChannel private constructor(internal val ctx: ZitiContextImpl, val connId: UInt):
6161
AsynchronousSocketChannel(Provider),
6262
Channel.MessageReceiver,
6363
ZitiConnection,
@@ -104,6 +104,9 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
104104
override val inputSupport = InputChannel.InputSupport(receiveQueue)
105105
val crypto = CompletableDeferred<Crypto.SecretStream?>()
106106

107+
/** router provided connection id */
108+
internal var rtConnId: UInt = connId
109+
107110
override fun getLocalAddress(): SocketAddress? = local
108111

109112
override fun getRemoteAddress(): SocketAddress? = remote
@@ -220,7 +223,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
220223
private fun deregister() {
221224
ctx.launch {
222225
channel.runCatching {
223-
await().deregisterReceiver(connId)
226+
await().deregisterReceiver(rtConnId)
224227
}
225228
}
226229
}
@@ -239,7 +242,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
239242
override fun shutdownOutput(): AsynchronousSocketChannel {
240243
if (state.get() == State.connected && sentFin.compareAndSet(false, true)) {
241244
val finMsg = Message(ZitiProtocol.ContentType.Data).apply {
242-
setHeader(Header.ConnId, connId)
245+
setHeader(Header.ConnId, rtConnId)
243246
setHeader(Header.FlagsHeader, ZitiProtocol.EdgeFlags.FIN)
244247
setHeader(Header.SeqHeader, seq.getAndIncrement())
245248
}
@@ -264,7 +267,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
264267
state.set(State.closed)
265268
State.connecting, State.connected -> {
266269
val closeMsg = Message(ZitiProtocol.ContentType.StateClosed).apply {
267-
setHeader(Header.ConnId, connId)
270+
setHeader(Header.ConnId, rtConnId)
268271
}
269272
d("closing conn = ${this.connId}")
270273
ctx.async {
@@ -343,7 +346,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
343346
}
344347

345348
val dataMessage = Message(ZitiProtocol.ContentType.Data, data)
346-
dataMessage.setHeader(Header.ConnId, connId)
349+
dataMessage.setHeader(Header.ConnId, rtConnId)
347350
dataMessage.setHeader(Header.SeqHeader, seq.getAndIncrement())
348351
v("sending $dataMessage")
349352
channel.await().Send(dataMessage)
@@ -434,7 +437,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
434437

435438
internal suspend fun doZitiHandshake(ch: Channel, remote: ZitiAddress.Dial, ns: Session, kp: KeyPair?) {
436439
val connectMsg = Message(ZitiProtocol.ContentType.Connect, ns.token.toByteArray(UTF_8)).apply {
437-
setHeader(Header.ConnId, connId)
440+
setHeader(Header.ConnId, rtConnId)
438441
setHeader(Header.SeqHeader, 0)
439442
kp?.let {
440443
setHeader(Header.PublicKeyHeader, it.publicKey.asBytes)
@@ -502,7 +505,7 @@ internal class ZitiSocketChannel private constructor(internal val ctx: ZitiConte
502505
crypto.await()?.let {
503506
val header = it.header()
504507
val headerMessage = Message(ZitiProtocol.ContentType.Data, header)
505-
.setHeader(Header.ConnId, connId)
508+
.setHeader(Header.ConnId, rtConnId)
506509
.setHeader(Header.SeqHeader, seq.getAndIncrement())
507510
ch.Send(headerMessage)
508511
}

0 commit comments

Comments
 (0)