Skip to content
This repository was archived by the owner on Dec 26, 2022. It is now read-only.

Commit df53b0b

Browse files
YingHan-Chenjserv
authored andcommitted
feat(db): Implement transaction reattach service
Get all pending transactions from ScyllaDB every 5 mins. Update the status of transaction if it has been confirmed. Preform reattachment if the transaction has been pended since 30 minutes ago. Close #414
1 parent 15db7fb commit df53b0b

File tree

6 files changed

+244
-15
lines changed

6 files changed

+244
-15
lines changed

docs/reattacher.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Transaction reattacher
2+
3+
`Transaction reattacher` is a service that helps persistent pending transactions to be re-attached to the Tangle. A persistent transaction is a transaction that does not be confirmed more than 30 minutes.
4+
5+
When enabling the external database for transaction reattachment, `Tangle-Accelerator` will store transactions issued by API [Send Transfer Message](https://github.com/DLTcollab/tangle-accelerator/wiki/Send-Transfer-Message).
6+
7+
`Transaction reattacher` will periodically read pending transactions from a specific ScyllaDB cluster, and get the latest inclusion status of those transactions from an IOTA full node. `Reattacher` will update the newest inclusion status to the ScyllaDB cluster. For persistent transactions, `reattacher` performs reattachment, which will do tips selection and PoW for the original bundle, and reattach it to the Tangle. After reattachment, `reattacher` will update the new transaction hash to the ScyllaDB cluster.
8+
9+
10+
See [docs/build.md] for more information about enabling transaction reattachment.
11+
12+
## Build Instructions
13+
14+
`bazel build //reattacher`
15+
16+
The reattacher support following options :
17+
18+
* `DB_HOST`: binding address of ScyllDB cluster
19+
* `IRI_HOST`: binding address of IRI
20+
* `IRI_PORT`: port of IRI
21+
22+
If you do not specify `DB_HOST` or `IRI_HOST`, the address will be set as `localhost`.

reattacher/BUILD

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
cc_binary(
2+
name = "reattacher",
3+
srcs = ["reattacher_main.c"],
4+
deps = [
5+
"//accelerator:ta_config",
6+
"//storage",
7+
"@entangled//cclient/api",
8+
],
9+
)

reattacher/reattacher_main.c

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors
3+
* All Rights Reserved.
4+
* This is free software; you can redistribute it and/or modify it under the
5+
* terms of the MIT license. A copy of the license can be found in the file
6+
* "LICENSE" at the root of this distribution.
7+
*/
8+
#include <getopt.h>
9+
#include <sys/time.h>"
10+
#include "accelerator/config.h"
11+
#include "cclient/api/core/core_api.h"
12+
#include "cclient/api/extended/extended_api.h"
13+
#include "common/model/bundle.h"
14+
#include "storage/ta_storage.h"
15+
16+
#define PRESISTENT_PENDING_SECOND 1800 /**< 30 mins */
17+
#define DELAY_INTERVAL 300 /**< 5 mins */
18+
19+
#define logger_id scylladb_logger_id
20+
21+
static status_t init_iota_client_service(iota_client_service_t* const serv) {
22+
if (serv == NULL) {
23+
ta_log_error("Invalid NULL pointer\n");
24+
return SC_TA_NULL;
25+
}
26+
serv->http.path = "/";
27+
serv->http.content_type = "application/json";
28+
serv->http.accept = "application/json";
29+
serv->http.api_version = 1;
30+
serv->http.ca_pem = NULL;
31+
serv->serializer_type = SR_JSON;
32+
if (iota_client_core_init(serv) != RC_OK) {
33+
ta_log_error("Failed to connect to IRI.\n");
34+
return SC_TA_OOM;
35+
}
36+
return SC_OK;
37+
}
38+
39+
static status_t handle_pending_txn(iota_client_service_t* iota_service, db_client_service_t* db_service,
40+
db_identity_t* obj) {
41+
status_t ret = SC_OK;
42+
hash243_queue_t req_txn = NULL;
43+
get_inclusion_states_res_t* res = get_inclusion_states_res_new();
44+
flex_trit_t trit_hash[NUM_TRITS_HASH];
45+
char tryte_hash[NUM_TRYTES_HASH];
46+
flex_trits_from_trytes(trit_hash, NUM_TRITS_HASH, (const tryte_t*)db_ret_identity_hash(obj), NUM_TRYTES_HASH,
47+
NUM_TRYTES_HASH);
48+
49+
hash243_queue_push(&req_txn, trit_hash);
50+
51+
if (iota_client_get_latest_inclusion(iota_service, req_txn, res) != RC_OK ||
52+
get_inclusion_states_res_states_count(res) != 1) {
53+
ret = SC_CCLIENT_FAILED_RESPONSE;
54+
ta_log_error("Failed to get inclustion status\n");
55+
db_show_identity_info(obj);
56+
goto exit;
57+
}
58+
if (get_inclusion_states_res_states_at(res, 0)) {
59+
// confirmed transaction
60+
ta_log_info("Find confirmed transaction\n");
61+
db_set_identity_status(obj, CONFIRMED_TXN);
62+
ret = db_insert_identity_table(db_service, obj);
63+
if (ret != SC_OK) {
64+
ta_log_error("Failed to insert identity table\n");
65+
db_show_identity_info(obj);
66+
goto exit;
67+
}
68+
} else if (db_ret_identity_time_elapsed(obj) > PRESISTENT_PENDING_SECOND) {
69+
// reattach
70+
ta_log_info("Reattach pending transaction\n");
71+
db_show_identity_info(obj);
72+
bundle_transactions_t* res_bundle_txn;
73+
bundle_transactions_new(&res_bundle_txn);
74+
75+
if (iota_client_replay_bundle(iota_service, trit_hash, MILESTONE_DEPTH, MWM, NULL, res_bundle_txn) != RC_OK) {
76+
ta_log_error("Failed to reattach to Tangle\n");
77+
db_show_identity_info(obj);
78+
ret = SC_CCLIENT_FAILED_RESPONSE;
79+
goto reattach_done;
80+
}
81+
82+
/**
83+
* < get the second transaction in the bundle,
84+
* the first transaction is the original transaction before reattachment
85+
*/
86+
iota_transaction_t* txn = bundle_at(res_bundle_txn, 1);
87+
flex_trits_to_trytes((tryte_t*)tryte_hash, NUM_TRYTES_HASH, transaction_hash(txn), NUM_TRITS_HASH, NUM_TRITS_HASH);
88+
89+
db_set_identity_hash(obj, (cass_byte_t*)tryte_hash, NUM_TRYTES_HASH);
90+
db_set_identity_timestamp(obj, time(NULL));
91+
92+
ret = db_insert_identity_table(db_service, obj);
93+
if (ret != SC_OK) {
94+
ta_log_error("Failed to insert identity table\n");
95+
goto exit;
96+
}
97+
98+
reattach_done:
99+
bundle_transactions_free(&res_bundle_txn);
100+
}
101+
102+
exit:
103+
hash243_queue_free(&req_txn);
104+
get_inclusion_states_res_free(&res);
105+
106+
return ret;
107+
}
108+
109+
int main(int argc, char** argv) {
110+
int optIdx;
111+
db_client_service_t db_service;
112+
iota_client_service_t iota_service;
113+
db_service.host = strdup("localhost");
114+
iota_service.http.host = "localhost";
115+
iota_service.http.port = 14265;
116+
117+
const struct option longOpt[] = {{"iri_host", required_argument, NULL, 'h'},
118+
{"iri_port", required_argument, NULL, 'p'},
119+
{"db_host", required_argument, NULL, 'd'},
120+
{NULL, 0, NULL, 0}};
121+
122+
/* Parse the command line options */
123+
/* TODO: Support macOS since getopt_long() is GNU extension */
124+
while (1) {
125+
int cmdOpt = getopt_long(argc, argv, "b:", longOpt, &optIdx);
126+
if (cmdOpt == -1) break;
127+
128+
/* Invalid option */
129+
if (cmdOpt == '?') break;
130+
131+
if (cmdOpt == 'h') {
132+
iota_service.http.host = optarg;
133+
}
134+
if (cmdOpt == 'p') {
135+
iota_service.http.port = atoi(optarg);
136+
}
137+
if (cmdOpt == 'd') {
138+
free(db_service.host);
139+
db_service.host = strdup(optarg);
140+
}
141+
}
142+
if (ta_logger_init() != SC_OK) {
143+
ta_log_error("logger init fail\n");
144+
return EXIT_FAILURE;
145+
}
146+
scylladb_logger_init();
147+
if (db_client_service_init(&db_service, DB_USAGE_REATTACH) != SC_OK) {
148+
ta_log_error("Failed to init db client service\n");
149+
return EXIT_FAILURE;
150+
}
151+
if (init_iota_client_service(&iota_service) != SC_OK) {
152+
ta_log_error("Failed to init iota client service\n");
153+
return EXIT_FAILURE;
154+
}
155+
while (1) {
156+
db_identity_array_t* id_array = db_identity_array_new();
157+
db_get_identity_objs_by_status(&db_service, PENDING_TXN, id_array);
158+
db_identity_t* itr;
159+
IDENTITY_TABLE_ARRAY_FOREACH(id_array, itr) {
160+
if (handle_pending_txn(&iota_service, &db_service, itr) != SC_OK) {
161+
ta_log_warning("Failed to handle pending transaction\n");
162+
db_show_identity_info(itr);
163+
}
164+
}
165+
db_identity_array_free(&id_array);
166+
sleep(DELAY_INTERVAL);
167+
}
168+
169+
db_client_service_free(&db_service);
170+
iota_client_core_destroy(&iota_service);
171+
scylladb_logger_release();
172+
return 0;
173+
}

storage/scylladb_identity.c

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,24 @@ struct db_identity_s {
1717
CassUuid uuid;
1818
cass_int64_t timestamp;
1919
cass_int8_t status;
20-
cass_byte_t hash[NUM_FLEX_TRITS_BUNDLE];
20+
cass_byte_t hash[DB_NUM_TRYTES_HASH];
2121
};
2222

23+
status_t db_show_identity_info(db_identity_t* obj) {
24+
if (obj == NULL) {
25+
ta_log_error("Invaild NULL pointer : obj\n");
26+
return SC_TA_NULL;
27+
}
28+
char uuid_string[DB_UUID_STRING_LENGTH];
29+
char hash[DB_NUM_TRYTES_HASH + 1];
30+
db_get_identity_uuid_string(obj, uuid_string);
31+
memcpy(hash, obj->hash, DB_NUM_TRYTES_HASH);
32+
hash[DB_NUM_TRYTES_HASH] = 0;
33+
ta_log_info("identity info\n uuid string : %s\nhash :%s\ntimestamp : %ld\ntime eclapsed : %ld\nstatus : %d\n",
34+
uuid_string, hash, obj->timestamp, db_ret_identity_time_elapsed(obj), obj->status);
35+
return SC_OK;
36+
}
37+
2338
status_t db_identity_new(db_identity_t** obj) {
2439
*obj = (db_identity_t*)malloc(sizeof(struct db_identity_s));
2540
if (NULL == *obj) {
@@ -122,11 +137,11 @@ status_t db_set_identity_hash(db_identity_t* obj, const cass_byte_t* hash, size_
122137
if (hash == NULL) {
123138
ta_log_error("NULL pointer to hash to insert into identity table\n");
124139
}
125-
if (length != NUM_FLEX_TRITS_HASH) {
140+
if (length != DB_NUM_TRYTES_HASH) {
126141
ta_log_error("SC_STORAGE_INVAILD_INPUT\n");
127142
return SC_STORAGE_INVAILD_INPUT;
128143
}
129-
memcpy(obj->hash, hash, NUM_FLEX_TRITS_HASH);
144+
memcpy(obj->hash, hash, DB_NUM_TRYTES_HASH);
130145
return SC_OK;
131146
}
132147

@@ -187,7 +202,7 @@ status_t db_insert_tx_into_identity(db_client_service_t* service, const char* ha
187202
ta_log_error("db new identity failed\n");
188203
goto exit;
189204
}
190-
if ((ret = db_set_identity_hash(identity, (cass_byte_t*)hash, NUM_FLEX_TRITS_HASH)) != SC_OK) {
205+
if ((ret = db_set_identity_hash(identity, (cass_byte_t*)hash, DB_NUM_TRYTES_HASH)) != SC_OK) {
191206
ta_log_error("db set identity hash failed\n");
192207
goto exit;
193208
}
@@ -251,7 +266,7 @@ status_t db_init_identity_keyspace(db_client_service_t* service, bool need_drop,
251266
static CassStatement* ret_insert_identity_statement(const CassPrepared* prepared, const db_identity_t* obj) {
252267
CassStatement* statement = NULL;
253268
statement = cass_prepared_bind(prepared);
254-
cass_statement_bind_bytes_by_name(statement, "hash", obj->hash, NUM_FLEX_TRITS_HASH);
269+
cass_statement_bind_bytes_by_name(statement, "hash", obj->hash, DB_NUM_TRYTES_HASH);
255270
cass_statement_bind_int8_by_name(statement, "status", obj->status);
256271
cass_statement_bind_int64_by_name(statement, "ts", obj->timestamp);
257272
cass_statement_bind_uuid_by_name(statement, "id", obj->uuid);
@@ -262,9 +277,7 @@ status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* o
262277
status_t ret = SC_OK;
263278
const CassPrepared* insert_prepared = NULL;
264279
CassStatement* statement = NULL;
265-
const char* insert_query =
266-
"INSERT INTO identity (id, ts, status, hash)"
267-
"VALUES (?, ?, ?, ?);";
280+
const char* query = "UPDATE identity Set ts = ?, status = ?, hash =? Where id = ?";
268281
if (service == NULL) {
269282
ta_log_error("NULL pointer to ScyllaDB client service for connection endpoint(s)");
270283
return SC_TA_NULL;
@@ -274,7 +287,7 @@ status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* o
274287
return SC_TA_NULL;
275288
}
276289

277-
if (prepare_query(service->session, insert_query, &insert_prepared) != CASS_OK) {
290+
if (prepare_query(service->session, query, &insert_prepared) != CASS_OK) {
278291
ta_log_error("%s\n", "prepare INSERT query fail");
279292
return SC_STORAGE_CASSANDRA_QUREY_FAIL;
280293
}
@@ -327,7 +340,7 @@ static status_t get_identity_array(CassSession* session, CassStatement* statemen
327340
itr = (db_identity_t*)utarray_back(identity_array);
328341

329342
cass_value_get_bytes(cass_row_get_column_by_name(row, "hash"), &hash, &len);
330-
db_set_identity_hash(itr, hash, NUM_FLEX_TRITS_HASH);
343+
db_set_identity_hash(itr, hash, DB_NUM_TRYTES_HASH);
331344
cass_value_get_int8(cass_row_get_column_by_name(row, "status"), &value);
332345
db_set_identity_status(itr, value);
333346
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
@@ -402,7 +415,7 @@ status_t db_get_identity_objs_by_hash(db_client_service_t* service, const cass_b
402415
return SC_STORAGE_CASSANDRA_QUREY_FAIL;
403416
}
404417
statement = cass_prepared_bind(select_prepared);
405-
cass_statement_bind_bytes_by_name(statement, "hash", hash, NUM_FLEX_TRITS_HASH);
418+
cass_statement_bind_bytes_by_name(statement, "hash", hash, DB_NUM_TRYTES_HASH);
406419
ret = get_identity_array(service->session, statement, identity_array);
407420

408421
cass_prepared_free(select_prepared);

storage/scylladb_identity.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ typedef UT_array db_identity_array_t;
3131

3232
typedef enum { PENDING_TXN = 0, INSERTING_TXN, CONFIRMED_TXN, NUM_OF_TXN_STATUS } db_txn_status_t;
3333

34+
#define DB_NUM_TRYTES_HASH NUM_TRYTES_HASH
35+
3436
/**
3537
* Allocate memory of db_identity_array_t
3638
*/
@@ -253,6 +255,16 @@ status_t db_get_identity_objs_by_hash(db_client_service_t* service, const cass_b
253255
*/
254256
status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* obj);
255257

258+
/**
259+
* @brief show logger info for details of identity object
260+
*
261+
* @param[in] obj pointer to db_identity_t
262+
* @return
263+
* - SC_OK on success
264+
* - SC_TA_NULL/SC_STORAGE_INVAILD_INPUT on error
265+
*/
266+
status_t db_show_identity_info(db_identity_t* obj);
267+
256268
#ifdef __cplusplus
257269
}
258270
#endif

tests/test_scylladb.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,8 +178,8 @@ void test_db_get_identity_objs_by_status(db_client_service_t* db_client_service)
178178
db_get_identity_uuid_string(itr, uuid_string);
179179

180180
TEST_ASSERT_EQUAL_STRING(uuid_string, identities[idx].uuid_string);
181-
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (flex_trit_t*)identities[idx].hash,
182-
sizeof(flex_trit_t) * NUM_FLEX_TRITS_HASH);
181+
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (cass_byte_t*)identities[idx].hash,
182+
sizeof(cass_byte_t) * DB_NUM_TRYTES_HASH);
183183
idx++;
184184
}
185185
db_identity_array_free(&db_identity_array);
@@ -194,8 +194,8 @@ void test_db_get_identity_objs_by_uuid_string(db_client_service_t* db_client_ser
194194
db_identity_t* itr;
195195
int idx = 0;
196196
IDENTITY_TABLE_ARRAY_FOREACH(db_identity_array, itr) {
197-
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (flex_trit_t*)identities[idx].hash,
198-
sizeof(flex_trit_t) * NUM_FLEX_TRITS_HASH);
197+
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (cass_byte_t*)identities[idx].hash,
198+
sizeof(cass_byte_t) * DB_NUM_TRYTES_HASH);
199199
TEST_ASSERT_EQUAL_INT(db_ret_identity_status(itr), identities[idx].status);
200200
idx++;
201201
}

0 commit comments

Comments
 (0)