Skip to content
This repository was archived by the owner on Aug 1, 2023. It is now read-only.

eth65 #152

Merged
merged 3 commits into from
Oct 10, 2020
Merged

eth65 #152

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ if (file('.git').exists()) {
'package-lock.json',
'.github/pull_request_template.md',
'devp2p-eth/src/test/resources/mainnet.json',
'devp2p-eth/src/test/resources/besu-dev.json',
'eth/src/test/resources/mainnet.json',
'eth-client/src/main/resources/mainnet.json',
'eth-client/src/main/resources/default.json',
'eth/src/test/resources/missing-difficulty.json',
'eth/src/test/resources/missing-nonce.json',
'eth/src/test/resources/valid-genesis.json',
'eth/src/test/resources/besu-dev.json',
'eth-client-app/src/main/resources/tuweni.txt',
'evm/src/test/resources/VMTests/**'
])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.eth.genesis.GenesisFile
import org.apache.tuweni.eth.repository.BlockchainIndex
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.eth.repository.MemoryTransactionPool
import org.apache.tuweni.junit.BouncyCastleExtension
import org.apache.tuweni.junit.LuceneIndexWriter
import org.apache.tuweni.junit.LuceneIndexWriterExtension
Expand Down Expand Up @@ -75,7 +76,8 @@ class ConnectToAnotherNodeTest {
blockchainInfo = SimpleBlockchainInformation(
UInt256.valueOf(genesisFile.chainId.toLong()), genesisBlock.header.difficulty,
genesisBlock.header.hash, UInt256.valueOf(42L), genesisBlock.header.hash, genesisFile.forks
)
),
pendingTransactionsPool = MemoryTransactionPool()
)
),
"Tuweni Experiment 0.1"
Expand Down Expand Up @@ -134,7 +136,8 @@ class ConnectToAnotherNodeTest {
genesisBlock.header.hash, UInt256.valueOf(42L),
genesisBlock.header.hash,
emptyList()
)
),
pendingTransactionsPool = MemoryTransactionPool()
)
),
"Tuweni Experiment 0.1"
Expand Down Expand Up @@ -166,7 +169,8 @@ class ConnectToAnotherNodeTest {
genesisBlock.header.hash, UInt256.valueOf(42L),
genesisBlock.header.hash,
emptyList()
)
),
pendingTransactionsPool = MemoryTransactionPool()
)
),
"Tuweni Experiment 0.1"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tuweni.devp2p.eth

import io.vertx.core.Vertx
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.apache.lucene.index.IndexWriter
import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.coroutines.await
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.eth.Address
import org.apache.tuweni.eth.Transaction
import org.apache.tuweni.eth.genesis.GenesisFile
import org.apache.tuweni.eth.repository.BlockchainIndex
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.eth.repository.MemoryTransactionPool
import org.apache.tuweni.junit.BouncyCastleExtension
import org.apache.tuweni.junit.LuceneIndexWriter
import org.apache.tuweni.junit.LuceneIndexWriterExtension
import org.apache.tuweni.junit.VertxExtension
import org.apache.tuweni.junit.VertxInstance
import org.apache.tuweni.kv.MapKeyValueStore
import org.apache.tuweni.rlpx.vertx.VertxRLPxService
import org.apache.tuweni.units.bigints.UInt256
import org.apache.tuweni.units.ethereum.Gas
import org.apache.tuweni.units.ethereum.Wei
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetSocketAddress

/**
* This test sends continuously new transactions to all the peers of the service for 10 minutes.
*
* The test will connect to a live instance located at port 30303 on localhost.
*/
@Disabled
@ExtendWith(LuceneIndexWriterExtension::class, VertxExtension::class, BouncyCastleExtension::class)
class SendPendingTransactionsTest {

private val peerId = "b1c9e33ebfd9446151688f0abaf171dac6df31ea5205a200f2cbaf5f8be" +
"d241c9f93732f25109e16badea1aa657a6078240657688cbbddb91a50aa8c7c34a9cc"

@Test
fun testSendPendingTransactions(@LuceneIndexWriter writer: IndexWriter, @VertxInstance vertx: Vertx) = runBlocking {
val contents = ConnectToAnotherNodeTest::class.java.getResourceAsStream("/besu-dev.json").readAllBytes()
val genesisFile = GenesisFile.read(contents)
val genesisBlock = genesisFile.toBlock()

val repository = BlockchainRepository.init(
MapKeyValueStore(),
MapKeyValueStore(),
MapKeyValueStore(),
MapKeyValueStore(),
MapKeyValueStore(),
MapKeyValueStore(),
BlockchainIndex(writer),
genesisBlock
)
val service = VertxRLPxService(
vertx,
30304,
"127.0.0.1",
30304,
SECP256K1.KeyPair.random(),
listOf(
EthSubprotocol(
repository = repository,
blockchainInfo = SimpleBlockchainInformation(
UInt256.valueOf(genesisFile.chainId.toLong()), genesisBlock.header.difficulty,
genesisBlock.header.hash, UInt256.valueOf(42L), genesisBlock.header.hash, genesisFile.forks
),
pendingTransactionsPool = MemoryTransactionPool()
)
),
"Tuweni Experiment 0.1"
)
service.start().await()
service.connectTo(
SECP256K1.PublicKey.fromHexString(peerId),
InetSocketAddress("127.0.0.1", 30303)
).await()

var loop = true
async {
delay(10 * 60 * 1000)
loop = false
}
val client = service.getClient(EthSubprotocol.ETH65) as EthClient
while (loop) {
val tx = Transaction(
UInt256.valueOf(1),
Wei.valueOf(2),
Gas.valueOf(2),
Address.fromBytes(Bytes.random(20)),
Wei.valueOf(2),
Bytes.random(12),
SECP256K1.KeyPair.random()
)
client.submitPooledTransaction(tx)
delay(100)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.eth.BlockBody
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.Transaction
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.eth.repository.TransactionPool
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.SubProtocolClient
import org.apache.tuweni.rlpx.wire.WireConnection
Expand All @@ -32,7 +34,8 @@ import org.apache.tuweni.units.bigints.UInt256
/**
* Client of the ETH subprotocol, allowing to request block and node data
*/
class EthClient(private val service: RLPxService) : EthRequestsManager, SubProtocolClient {
class EthClient(private val service: RLPxService, private val pendingTransactionsPool: TransactionPool) :
EthRequestsManager, SubProtocolClient {

private val headerRequests = HashMap<Bytes32, Request>()
private val bodiesRequests = HashMap<String, Request>()
Expand Down Expand Up @@ -159,4 +162,18 @@ class EthClient(private val service: RLPxService) : EthRequestsManager, SubProto
connection: WireConnection,
transactionReceipts: List<List<TransactionReceipt>>
): Request? = transactionReceiptRequests[connection.uri()]

override suspend fun submitPooledTransaction(vararg tx: Transaction) {
for (t in tx) { pendingTransactionsPool.add(t) }
val hashes = tx.map { it.hash }
val conns = service.repository().asIterable(EthSubprotocol.ETH65)
conns.forEach { conn ->
service.send(
EthSubprotocol.ETH65,
MessageType.NewPooledTransactionHashes.code,
conn,
GetBlockBodies(hashes).toBytes()
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.Transaction
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.eth.repository.BlockchainRepository
import org.apache.tuweni.eth.repository.TransactionPool
import org.apache.tuweni.rlpx.wire.WireConnection

/**
* Controller managing the state of the ETH or LES subprotocol handlers.
*/
class EthController(
val repository: BlockchainRepository,
val pendingTransactionsPool: TransactionPool,
val requestsManager: EthRequestsManager,
val connectionsListener: (WireConnection, Status) -> Unit = { _, _ -> }
) {
Expand Down Expand Up @@ -178,4 +180,20 @@ class EthController(
repository.storeTransaction(it)
}
}

suspend fun addNewPooledTransactions(transactions: List<Transaction>) {
for (tx in transactions) {
pendingTransactionsPool.add(tx)
}
}

suspend fun findPooledTransactions(hashes: List<Hash>): List<Transaction> {
val result = ArrayList<Transaction>()
for (hash in hashes) {
pendingTransactionsPool.get(hash)?.let {
result.add(it)
}
}
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.rlpx.RLPxService
import org.apache.tuweni.rlpx.wire.DisconnectReason
import org.apache.tuweni.rlpx.wire.SubProtocolHandler
import org.apache.tuweni.rlpx.wire.WireConnection
import org.slf4j.LoggerFactory
import java.util.WeakHashMap
import kotlin.collections.ArrayList
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext

internal class EthHandler(
Expand All @@ -41,6 +44,8 @@ internal class EthHandler(

companion object {
val logger = LoggerFactory.getLogger(EthHandler::class.java)!!
val MAX_NEW_POOLED_TX_HASHES = 4096
val MAX_POOLED_TX = 256
}

override fun handle(connection: WireConnection, messageType: Int, message: Bytes) = asyncCompletion {
Expand All @@ -58,13 +63,37 @@ internal class EthHandler(
MessageType.NodeData.code -> handleNodeData(connection, NodeData.read(message))
MessageType.GetReceipts.code -> handleGetReceipts(connection, GetReceipts.read(message))
MessageType.Receipts.code -> handleReceipts(connection, Receipts.read(message))
MessageType.NewPooledTransactionHashes.code -> handleNewPooledTransactionHashes(
connection,
NewPooledTransactionHashes.read(message)
)
MessageType.GetPooledTransactions.code -> handleGetPooledTransactions(
connection,
GetPooledTransactions.read(message)
)
MessageType.PooledTransactions.code -> handlePooledTransactions(PooledTransactions.read(message))
else -> {
logger.warn("Unknown message type {}", messageType)
service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
}
}
}

private suspend fun handlePooledTransactions(read: PooledTransactions) {
controller.addNewPooledTransactions(read.transactions)
}

private suspend fun handleGetPooledTransactions(connection: WireConnection, read: GetPooledTransactions) {
val tx = controller.findPooledTransactions(read.hashes)
logger.debug("Responding to GetPooledTransactions with {} transactions", tx.size)
service.send(
EthSubprotocol.ETH65,
MessageType.PooledTransactions.code,
connection,
PooledTransactions(tx).toBytes()
)
}

private suspend fun handleTransactions(transactions: Transactions) {
controller.addNewTransactions(transactions.transactions)
}
Expand All @@ -88,6 +117,41 @@ internal class EthHandler(
}
}

private suspend fun handleNewPooledTransactionHashes(
connection: WireConnection,
newPooledTransactionHashes: NewPooledTransactionHashes
) {
if (newPooledTransactionHashes.hashes.size > MAX_NEW_POOLED_TX_HASHES) {
service.disconnect(connection, DisconnectReason.SUBPROTOCOL_REASON)
return
}
var missingTx = ArrayList<Hash>()
var message = GetPooledTransactions(missingTx)
for (hash in newPooledTransactionHashes.hashes) {
if (!controller.pendingTransactionsPool.contains(hash)) {
missingTx.add(hash)
}
if (missingTx.size == MAX_POOLED_TX) {
service.send(
EthSubprotocol.ETH65,
MessageType.GetPooledTransactions.code,
connection,
message.toBytes()
)
missingTx = ArrayList()
message = GetPooledTransactions(missingTx)
}
}
if (!missingTx.isEmpty()) {
service.send(
EthSubprotocol.ETH65,
MessageType.GetPooledTransactions.code,
connection,
message.toBytes()
)
}
}

private suspend fun handleReceipts(connection: WireConnection, receipts: Receipts) {
controller.addNewTransactionReceipts(connection, receipts.transactionReceipts)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.tuweni.concurrent.CompletableAsyncCompletion
import org.apache.tuweni.eth.BlockBody
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.Transaction
import org.apache.tuweni.eth.TransactionReceipt
import org.apache.tuweni.rlpx.wire.WireConnection

Expand Down Expand Up @@ -108,6 +109,12 @@ interface EthRequestsManager {
connection: WireConnection,
transactionReceipts: List<List<TransactionReceipt>>
): Request?

/**
* Submits a new pending transaction to the transaction pool to be gossiped to peers.
* @param tx a new transaction
*/
suspend fun submitPooledTransaction(vararg tx: Transaction)
}

/**
Expand Down
Loading