Skip to content
This repository was archived by the owner on Aug 1, 2023. It is now read-only.

Commit 4349b84

Browse files
committed
Support eth65. Add an integration test to connect to a node and send transactions once per second
1 parent e499d71 commit 4349b84

File tree

17 files changed

+487
-15
lines changed

17 files changed

+487
-15
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ if (file('.git').exists()) {
113113
'package-lock.json',
114114
'.github/pull_request_template.md',
115115
'devp2p-eth/src/test/resources/mainnet.json',
116+
'devp2p-eth/src/test/resources/besu-dev.json',
116117
'eth/src/test/resources/mainnet.json',
117118
'eth-client/src/main/resources/mainnet.json',
118119
'eth-client/src/main/resources/default.json',

devp2p-eth/src/integrationTest/kotlin/org/apache/tuweni/devp2p/eth/ConnectToAnotherNodeTest.kt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.tuweni.crypto.SECP256K1
2525
import org.apache.tuweni.eth.genesis.GenesisFile
2626
import org.apache.tuweni.eth.repository.BlockchainIndex
2727
import org.apache.tuweni.eth.repository.BlockchainRepository
28+
import org.apache.tuweni.eth.repository.MemoryTransactionPool
2829
import org.apache.tuweni.junit.BouncyCastleExtension
2930
import org.apache.tuweni.junit.LuceneIndexWriter
3031
import org.apache.tuweni.junit.LuceneIndexWriterExtension
@@ -75,7 +76,8 @@ class ConnectToAnotherNodeTest {
7576
blockchainInfo = SimpleBlockchainInformation(
7677
UInt256.valueOf(genesisFile.chainId.toLong()), genesisBlock.header.difficulty,
7778
genesisBlock.header.hash, UInt256.valueOf(42L), genesisBlock.header.hash, genesisFile.forks
78-
)
79+
),
80+
pendingTransactionsPool = MemoryTransactionPool()
7981
)
8082
),
8183
"Tuweni Experiment 0.1"
@@ -134,7 +136,8 @@ class ConnectToAnotherNodeTest {
134136
genesisBlock.header.hash, UInt256.valueOf(42L),
135137
genesisBlock.header.hash,
136138
emptyList()
137-
)
139+
),
140+
pendingTransactionsPool = MemoryTransactionPool()
138141
)
139142
),
140143
"Tuweni Experiment 0.1"
@@ -166,7 +169,8 @@ class ConnectToAnotherNodeTest {
166169
genesisBlock.header.hash, UInt256.valueOf(42L),
167170
genesisBlock.header.hash,
168171
emptyList()
169-
)
172+
),
173+
pendingTransactionsPool = MemoryTransactionPool()
170174
)
171175
),
172176
"Tuweni Experiment 0.1"
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.tuweni.devp2p.eth
18+
19+
import io.vertx.core.Vertx
20+
import kotlinx.coroutines.async
21+
import kotlinx.coroutines.delay
22+
import kotlinx.coroutines.runBlocking
23+
import org.apache.lucene.index.IndexWriter
24+
import org.apache.tuweni.bytes.Bytes
25+
import org.apache.tuweni.concurrent.coroutines.await
26+
import org.apache.tuweni.crypto.SECP256K1
27+
import org.apache.tuweni.eth.Address
28+
import org.apache.tuweni.eth.Transaction
29+
import org.apache.tuweni.eth.genesis.GenesisFile
30+
import org.apache.tuweni.eth.repository.BlockchainIndex
31+
import org.apache.tuweni.eth.repository.BlockchainRepository
32+
import org.apache.tuweni.eth.repository.MemoryTransactionPool
33+
import org.apache.tuweni.junit.BouncyCastleExtension
34+
import org.apache.tuweni.junit.LuceneIndexWriter
35+
import org.apache.tuweni.junit.LuceneIndexWriterExtension
36+
import org.apache.tuweni.junit.VertxExtension
37+
import org.apache.tuweni.junit.VertxInstance
38+
import org.apache.tuweni.kv.MapKeyValueStore
39+
import org.apache.tuweni.rlpx.vertx.VertxRLPxService
40+
import org.apache.tuweni.units.bigints.UInt256
41+
import org.apache.tuweni.units.ethereum.Gas
42+
import org.apache.tuweni.units.ethereum.Wei
43+
import org.junit.jupiter.api.Disabled
44+
import org.junit.jupiter.api.Test
45+
import org.junit.jupiter.api.extension.ExtendWith
46+
import java.net.InetSocketAddress
47+
48+
/**
49+
* This test sends continuously new transactions to all the peers of the service for 10 minutes.
50+
*
51+
* The test will connect to a live instance located at port 30303 on localhost.
52+
*/
53+
@Disabled
54+
@ExtendWith(LuceneIndexWriterExtension::class, VertxExtension::class, BouncyCastleExtension::class)
55+
class SendPendingTransactionsTest {
56+
57+
private val peerId = "b1c9e33ebfd9446151688f0abaf171dac6df31ea5205a200f2cbaf5f8be" +
58+
"d241c9f93732f25109e16badea1aa657a6078240657688cbbddb91a50aa8c7c34a9cc"
59+
60+
@Test
61+
fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
62+
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
63+
val genesisFile = GenesisFile.read(contents)
64+
val genesisBlock = genesisFile.toBlock()
65+
66+
val repository = BlockchainRepository.init(
67+
MapKeyValueStore(),
68+
MapKeyValueStore(),
69+
MapKeyValueStore(),
70+
MapKeyValueStore(),
71+
MapKeyValueStore(),
72+
MapKeyValueStore(),
73+
BlockchainIndex(writer),
74+
genesisBlock
75+
)
76+
val service = VertxRLPxService(
77+
vertx,
78+
30304,
79+
"127.0.0.1",
80+
30304,
81+
SECP256K1.KeyPair.random(),
82+
listOf(
83+
EthSubprotocol(
84+
repository = repository,
85+
blockchainInfo = SimpleBlockchainInformation(
86+
UInt256.valueOf(genesisFile.chainId.toLong()), genesisBlock.header.difficulty,
87+
genesisBlock.header.hash, UInt256.valueOf(42L), genesisBlock.header.hash, genesisFile.forks
88+
),
89+
pendingTransactionsPool = MemoryTransactionPool()
90+
)
91+
),
92+
"Tuweni Experiment 0.1"
93+
)
94+
service.start().await()
95+
service.connectTo(
96+
SECP256K1.PublicKey.fromHexString(peerId),
97+
InetSocketAddress("127.0.0.1", 30303)
98+
).await()
99+
100+
var loop = true
101+
async {
102+
delay(10 * 60 * 1000)
103+
loop = false
104+
}
105+
val client = service.getClient(EthSubprotocol.ETH65) as EthClient
106+
while (loop) {
107+
val tx = Transaction(
108+
UInt256.valueOf(1),
109+
Wei.valueOf(2),
110+
Gas.valueOf(2),
111+
Address.fromBytes(Bytes.random(20)),
112+
Wei.valueOf(2),
113+
Bytes.random(12),
114+
SECP256K1.KeyPair.random()
115+
)
116+
client.submitPooledTransaction(tx)
117+
delay(1000)
118+
}
119+
}
120+
}

devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthClient.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.tuweni.concurrent.CompletableAsyncCompletion
2323
import org.apache.tuweni.eth.BlockBody
2424
import org.apache.tuweni.eth.BlockHeader
2525
import org.apache.tuweni.eth.Hash
26+
import org.apache.tuweni.eth.Transaction
2627
import org.apache.tuweni.eth.TransactionReceipt
2728
import org.apache.tuweni.rlpx.RLPxService
2829
import org.apache.tuweni.rlpx.wire.SubProtocolClient
@@ -159,4 +160,17 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
159160
connection: WireConnection,
160161
transactionReceipts: List<List<TransactionReceipt>>
161162
): Request? = transactionReceiptRequests[connection.uri()]
163+
164+
override fun submitPooledTransaction(vararg tx: Transaction) {
165+
val hashes = tx.map { it.hash }
166+
val conns = service.repository().asIterable(EthSubprotocol.ETH65)
167+
conns.forEach { conn ->
168+
service.send(
169+
EthSubprotocol.ETH65,
170+
MessageType.NewPooledTransactionHashes.code,
171+
conn,
172+
GetBlockBodies(hashes).toBytes()
173+
)
174+
}
175+
}
162176
}

devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthController.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@ import org.apache.tuweni.eth.Hash
2424
import org.apache.tuweni.eth.Transaction
2525
import org.apache.tuweni.eth.TransactionReceipt
2626
import org.apache.tuweni.eth.repository.BlockchainRepository
27+
import org.apache.tuweni.eth.repository.TransactionPool
2728
import org.apache.tuweni.rlpx.wire.WireConnection
2829

2930
/**
3031
* Controller managing the state of the ETH or LES subprotocol handlers.
3132
*/
3233
class EthController(
3334
val repository: BlockchainRepository,
35+
val pendingTransactionsPool: TransactionPool,
3436
val requestsManager: EthRequestsManager,
3537
val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> }
3638
) {
@@ -178,4 +180,20 @@ class EthController(
178180
repository.storeTransaction(it)
179181
}
180182
}
183+
184+
suspend fun addNewPooledTransactions(transactions: List<Transaction>) {
185+
for (tx in transactions) {
186+
pendingTransactionsPool.add(tx)
187+
}
188+
}
189+
190+
suspend fun findPooledTransactions(hashes: List<Hash>): List<Transaction> {
191+
val result = ArrayList<Transaction>()
192+
for (hash in hashes) {
193+
pendingTransactionsPool.get(hash)?.let {
194+
result.add(it)
195+
}
196+
}
197+
return result
198+
}
181199
}

devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthHandler.kt

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@ import org.apache.tuweni.bytes.Bytes
2222
import org.apache.tuweni.concurrent.AsyncCompletion
2323
import org.apache.tuweni.concurrent.CompletableAsyncCompletion
2424
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
25+
import org.apache.tuweni.eth.Hash
2526
import org.apache.tuweni.rlpx.RLPxService
2627
import org.apache.tuweni.rlpx.wire.DisconnectReason
2728
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
2829
import org.apache.tuweni.rlpx.wire.WireConnection
2930
import org.slf4j.LoggerFactory
3031
import java.util.WeakHashMap
32+
import kotlin.collections.ArrayList
33+
import kotlin.collections.set
3134
import kotlin.coroutines.CoroutineContext
3235

3336
internal class EthHandler(
@@ -41,6 +44,8 @@ internal class EthHandler(
4144

4245
companion object {
4346
val logger = LoggerFactory.getLogger(EthHandler::class.java)!!
47+
val MAX_NEW_POOLED_TX_HASHES = 4096
48+
val MAX_POOLED_TX = 256
4449
}
4550

4651
override fun handle(connection: WireConnection, messageType: Int, message: Bytes) = asyncCompletion {
@@ -58,13 +63,35 @@ internal class EthHandler(
5863
MessageType.NodeData.code -> handleNodeData(connection, NodeData.read(message))
5964
MessageType.GetReceipts.code -> handleGetReceipts(connection, GetReceipts.read(message))
6065
MessageType.Receipts.code -> handleReceipts(connection, Receipts.read(message))
66+
MessageType.NewPooledTransactionHashes.code -> handleNewPooledTransactionHashes(
67+
connection,
68+
NewPooledTransactionHashes.read(message)
69+
)
70+
MessageType.GetPooledTransactions.code -> handleGetPooledTransactions(
71+
connection,
72+
GetPooledTransactions.read(message)
73+
)
74+
MessageType.PooledTransactions.code -> handlePooledTransactions(PooledTransactions.read(message))
6175
else -> {
6276
logger.warn("Unknown message type {}", messageType)
6377
service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
6478
}
6579
}
6680
}
6781

82+
private suspend fun handlePooledTransactions(read: PooledTransactions) {
83+
controller.addNewPooledTransactions(read.transactions)
84+
}
85+
86+
private suspend fun handleGetPooledTransactions(connection: WireConnection, read: GetPooledTransactions) {
87+
service.send(
88+
EthSubprotocol.ETH65,
89+
MessageType.PooledTransactions.code,
90+
connection,
91+
PooledTransactions(controller.findPooledTransactions(read.hashes)).toBytes()
92+
)
93+
}
94+
6895
private suspend fun handleTransactions(transactions: Transactions) {
6996
controller.addNewTransactions(transactions.transactions)
7097
}
@@ -88,6 +115,41 @@ internal class EthHandler(
88115
}
89116
}
90117

118+
private suspend fun handleNewPooledTransactionHashes(
119+
connection: WireConnection,
120+
newPooledTransactionHashes: NewPooledTransactionHashes
121+
) {
122+
if (newPooledTransactionHashes.hashes.size > MAX_NEW_POOLED_TX_HASHES) {
123+
service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
124+
return
125+
}
126+
var missingTx = ArrayList<Hash>()
127+
var message = GetPooledTransactions(missingTx)
128+
for (hash in newPooledTransactionHashes.hashes) {
129+
if (!controller.pendingTransactionsPool.contains(hash)) {
130+
missingTx.add(hash)
131+
}
132+
if (missingTx.size == MAX_POOLED_TX) {
133+
service.send(
134+
EthSubprotocol.ETH65,
135+
MessageType.GetPooledTransactions.code,
136+
connection,
137+
message.toBytes()
138+
)
139+
missingTx = ArrayList()
140+
message = GetPooledTransactions(missingTx)
141+
}
142+
}
143+
if (!missingTx.isEmpty()) {
144+
service.send(
145+
EthSubprotocol.ETH65,
146+
MessageType.GetPooledTransactions.code,
147+
connection,
148+
message.toBytes()
149+
)
150+
}
151+
}
152+
91153
private suspend fun handleReceipts(connection: WireConnection, receipts: Receipts) {
92154
controller.addNewTransactionReceipts(connection, receipts.transactionReceipts)
93155
}

devp2p-eth/src/main/kotlin/org/apache/tuweni/devp2p/eth/EthRequestsManager.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.tuweni.concurrent.CompletableAsyncCompletion
2222
import org.apache.tuweni.eth.BlockBody
2323
import org.apache.tuweni.eth.BlockHeader
2424
import org.apache.tuweni.eth.Hash
25+
import org.apache.tuweni.eth.Transaction
2526
import org.apache.tuweni.eth.TransactionReceipt
2627
import org.apache.tuweni.rlpx.wire.WireConnection
2728

@@ -108,6 +109,12 @@ interface EthRequestsManager {
108109
connection: WireConnection,
109110
transactionReceipts: List<List<TransactionReceipt>>
110111
): Request?
112+
113+
/**
114+
* Submits a new pending transaction to the transaction pool to be gossiped to peers.
115+
* @param tx a new transaction
116+
*/
117+
fun submitPooledTransaction(vararg tx: Transaction)
111118
}
112119

113120
/**

0 commit comments

Comments
 (0)