Skip to content
This repository was archived by the owner on Dec 26, 2022. It is now read-only.

Commit dc67b3c

Browse files
authored
Merge pull request #526 from HowJMay/publish_txn
feat(core): Broadcast buffered transaction objects
2 parents cb1d24b + 5ab3ce5 commit dc67b3c

File tree

20 files changed

+521
-219
lines changed

20 files changed

+521
-219
lines changed

accelerator/config.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,8 @@ status_t ta_core_default_init(ta_core_t* const core) {
236236
cache->host = REDIS_HOST;
237237
cache->port = REDIS_PORT;
238238
cache->cache_state = false;
239+
cache->buffer_list_name = BUFFER_LIST_NAME;
240+
cache->done_list_name = DONE_LIST_NAME;
239241

240242
ta_log_info("Initializing IRI configuration\n");
241243
iota_conf->milestone_depth = MILESTONE_DEPTH;

accelerator/config.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ extern "C" {
5353
"AMRWQP9BUMJALJHBXUCHOD9HFFD9LGTGEAWMJWWXSDVOF9PI9YGJAPBQLQUOMNYEQCZPGCTHGV" \
5454
"NNAPGHA"
5555
#define MAM_FILE_PREFIX "/tmp/mam_bin_XXXXXX"
56+
#define BUFFER_LIST_NAME "txn_buff_list"
57+
#define DONE_LIST_NAME "done_txn_buff_list"
5658
#define IRI_HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
5759

5860
/** @name Redis connection config */
@@ -88,9 +90,12 @@ typedef struct iota_config_s {
8890

8991
/** struct type of accelerator cache */
9092
typedef struct ta_cache_s {
91-
char* host; /**< Binding address of redis server */
92-
uint16_t port; /**< Binding port of redis server */
93-
bool cache_state; /**< Set it true to turn on cache server */
93+
char* host; /**< Binding address of redis server */
94+
uint64_t timeout; /**< Timeout for keys in redis */
95+
char* buffer_list_name; /**< Name of the list to buffer transactions */
96+
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
97+
uint16_t port; /**< Binding port of redis server */
98+
bool cache_state; /**< Set it true to turn on cache server */
9499
} ta_cache_t;
95100

96101
/** struct type of accelerator core */

accelerator/core/BUILD

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ cc_library(
44
hdrs = ["apis.h"],
55
linkopts = [
66
"-lpthread",
7-
"-luuid",
87
],
98
visibility = ["//visibility:public"],
109
deps = [
@@ -35,6 +34,9 @@ cc_library(
3534
name = "core",
3635
srcs = ["core.c"],
3736
hdrs = ["core.h"],
37+
linkopts = [
38+
"-luuid",
39+
],
3840
visibility = ["//visibility:public"],
3941
deps = [
4042
"//accelerator:ta_config",

accelerator/core/apis.c

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ status_t api_recv_mam_message(const iota_config_t* const iconf, const iota_clien
334334
}
335335

336336
recv_mam_data_id_mam_v1_t* data_id = (recv_mam_data_id_mam_v1_t*)req->data_id;
337-
338337
if (mam_api_init(&mam, (tryte_t*)iconf->seed) != RC_OK) {
339338
ret = SC_MAM_FAILED_INIT;
340339
ta_log_error("%s\n", "SC_MAM_FAILED_INIT");
@@ -566,26 +565,10 @@ status_t api_send_transfer(const ta_core_t* const core, const char* const obj, c
566565
goto done;
567566
}
568567

569-
ret = ta_send_transfer(&core->ta_conf, &core->iota_conf, &core->iota_service, req, res);
568+
ret = ta_send_transfer(&core->ta_conf, &core->iota_conf, &core->iota_service, &core->cache, req, res);
570569
if (ret == SC_CCLIENT_FAILED_RESPONSE) {
571570
lock_handle_unlock(&cjson_lock);
572571
ta_log_info("%s\n", "Caching transaction");
573-
// TODO generate a UUID as redis key
574-
uuid_t binuuid;
575-
uuid_generate_random(binuuid);
576-
uuid_unparse(binuuid, res->uuid);
577-
if (!res->uuid[0]) {
578-
ta_log_error("%s\n", "Failed to generate UUID");
579-
goto done;
580-
}
581-
582-
// Cache the txn in redis
583-
// TODO use a timer to reattach these failed transactions
584-
ret = cache_set(res->uuid, UUID_STR_LEN - 1, obj, strlen(obj), CACHE_FAILED_TXN_TIMEOUT);
585-
if (ret) {
586-
ta_log_error("%s\n", ta_error_to_string(ret));
587-
goto done;
588-
}
589572

590573
// Cache the request and serialize UUID as response directly
591574
goto serialize;

accelerator/core/apis.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
#include "mam/mam/mam_channel_t_set.h"
1616
#include "serializer/serializer.h"
1717

18-
// TODO The temporary default timeout in cache server is 1 week. We should investigate the performance of redis to
19-
// design a better data structure and appropriate timeout period. And we should study the methodology to partially
20-
// release cached data.
21-
#define CACHE_FAILED_TXN_TIMEOUT (7 * 24 * 60 * 60)
22-
2318
#ifdef __cplusplus
2419
extern "C" {
2520
#endif
@@ -54,7 +49,7 @@ int apis_logger_release();
5449
* @param tangle[in] iota configuration variables
5550
* @param cache[in] redis configuration variables
5651
* @param service[in] IRI connection configuration variables
57-
* @param[out] json_result Result containing tangle accelerator information in json format
52+
* @param json_result[out] Result containing tangle accelerator information in json format
5853
*
5954
* @return
6055
* - SC_OK on success

accelerator/core/core.c

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ status_t ta_attach_to_tangle(const attach_to_tangle_req_t* const req, attach_to_
3131
iota_transaction_t tx;
3232
flex_trit_t* elt = NULL;
3333

34-
// TODO Save fetched transaction object in a set.
3534
// create bundle
3635
bundle_transactions_new(&bundle);
3736
HASH_ARRAY_FOREACH(req->trytes, elt) {
@@ -91,7 +90,9 @@ status_t ta_send_trytes(const ta_config_t* const info, const iota_config_t* cons
9190
HASH_ARRAY_FOREACH(trytes, elt) { attach_to_tangle_req_trytes_add(attach_req, elt); }
9291
attach_to_tangle_req_init(attach_req, get_transactions_to_approve_res_trunk(tx_approve_res),
9392
get_transactions_to_approve_res_branch(tx_approve_res), iconf->mwm);
94-
if (ta_attach_to_tangle(attach_req, attach_res) != SC_OK) {
93+
ret = ta_attach_to_tangle(attach_req, attach_res);
94+
if (ret != SC_OK) {
95+
ta_log_error("%s\n", ta_error_to_string(ret));
9596
goto done;
9697
}
9798

@@ -163,8 +164,8 @@ status_t ta_generate_address(const iota_config_t* const iconf, const iota_client
163164
}
164165

165166
status_t ta_send_transfer(const ta_config_t* const info, const iota_config_t* const iconf,
166-
const iota_client_service_t* const service, const ta_send_transfer_req_t* const req,
167-
ta_send_transfer_res_t* res) {
167+
const iota_client_service_t* const service, const ta_cache_t* const cache,
168+
const ta_send_transfer_req_t* const req, ta_send_transfer_res_t* res) {
168169
if (req == NULL || res == NULL) {
169170
ta_log_error("%s\n", "SC_TA_NULL");
170171
return SC_TA_NULL;
@@ -225,6 +226,9 @@ status_t ta_send_transfer(const ta_config_t* const info, const iota_config_t* co
225226

226227
ret = ta_send_trytes(info, iconf, service, raw_tx);
227228
if (ret) {
229+
ta_log_error("Error in ta_send_trytes. Push transaction trytes to buffer.\n");
230+
res->uuid = (char*)malloc(sizeof(char) * UUID_STR_LEN);
231+
push_txn_to_buffer(cache, raw_tx, res->uuid);
228232
goto done;
229233
}
230234

@@ -597,3 +601,95 @@ status_t ta_update_iri_conneciton(ta_config_t* const ta_conf, iota_client_servic
597601
done:
598602
return ret;
599603
}
604+
605+
status_t push_txn_to_buffer(const ta_cache_t* const cache, hash8019_array_p raw_txn_flex_trit_array, char* uuid) {
606+
status_t ret = SC_OK;
607+
if (!uuid) {
608+
ret = SC_CORE_NULL;
609+
ta_log_error("%s\n", ta_error_to_string(ret));
610+
goto done;
611+
}
612+
613+
uuid_t binuuid;
614+
uuid_generate_random(binuuid);
615+
uuid_unparse(binuuid, uuid);
616+
if (!uuid[0]) {
617+
ta_log_error("%s\n", "Failed to generate UUID");
618+
goto done;
619+
}
620+
621+
// TODO We push only one transaction raw trits into list, we need to solve this in future.
622+
ret = cache_set(uuid, UUID_STR_LEN - 1, hash_array_at(raw_txn_flex_trit_array, 0),
623+
NUM_FLEX_TRITS_SERIALIZED_TRANSACTION, cache->timeout);
624+
if (ret) {
625+
ta_log_error("%s\n", ta_error_to_string(ret));
626+
goto done;
627+
}
628+
629+
ret =
630+
cache_list_push(cache->buffer_list_name, strlen(cache->buffer_list_name), uuid, UUID_STR_LEN - 1, cache->timeout);
631+
if (ret) {
632+
ta_log_error("%s\n", ta_error_to_string(ret));
633+
}
634+
635+
done:
636+
return ret;
637+
}
638+
639+
status_t broadcast_buffered_txn(const ta_core_t* const core) {
640+
status_t ret = SC_OK;
641+
int uuid_list_len = 0;
642+
hash8019_array_p txn_trytes_array = hash8019_array_new();
643+
644+
do {
645+
char uuid[UUID_STR_LEN];
646+
flex_trit_t req_txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1];
647+
648+
ret = cache_list_size(core->cache.buffer_list_name, &uuid_list_len);
649+
if (ret) {
650+
ta_log_error("%s\n", ta_error_to_string(ret));
651+
goto done;
652+
}
653+
654+
ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid);
655+
if (ret) {
656+
ta_log_error("%s\n", ta_error_to_string(ret));
657+
goto done;
658+
}
659+
660+
// TODO Now we assume every time we call `cache_get()`, we would get a transaction object. However, in the future,
661+
// the returned result may be a bunlde.
662+
ret = cache_get(uuid, (char*)req_txn_flex_trits);
663+
if (ret) {
664+
ta_log_error("%s\n", ta_error_to_string(ret));
665+
goto done;
666+
}
667+
668+
hash_array_push(txn_trytes_array, req_txn_flex_trits);
669+
ret = ta_send_trytes(&core->ta_conf, &core->iota_conf, &core->iota_service, txn_trytes_array);
670+
if (ret) {
671+
ta_log_error("%s\n", ta_error_to_string(ret));
672+
goto done;
673+
}
674+
utarray_done(txn_trytes_array);
675+
676+
// Pop transaction from buffered list
677+
ret = cache_list_pop(core->cache.buffer_list_name, (char*)req_txn_flex_trits);
678+
if (ret) {
679+
ta_log_error("%s\n", ta_error_to_string(ret));
680+
goto done;
681+
}
682+
683+
// Transfer the transaction to another list in where we store all the successfully broadcasted transactions.
684+
ret = cache_list_push(core->cache.done_list_name, strlen(core->cache.done_list_name), uuid, UUID_STR_LEN - 1,
685+
core->cache.timeout);
686+
if (ret) {
687+
ta_log_error("%s\n", ta_error_to_string(ret));
688+
goto done;
689+
}
690+
} while (!uuid_list_len);
691+
692+
done:
693+
hash_array_free(txn_trytes_array);
694+
return ret;
695+
}

accelerator/core/core.h

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ status_t ta_generate_address(const iota_config_t* const iconf, const iota_client
8484
* @param[in] info Tangle-accelerator configuration variables
8585
* @param[in] iconf IOTA API parameter configurations
8686
* @param[in] service IRI node end point service
87+
* @param[in] cache redis configuration variables
8788
* @param[in] req Request containing address value, message, tag in
8889
* ta_send_transfer_req_t
8990
* @param[out] res Result containing transaction hash in ta_send_transfer_res_t
@@ -93,8 +94,8 @@ status_t ta_generate_address(const iota_config_t* const iconf, const iota_client
9394
* - non-zero on error
9495
*/
9596
status_t ta_send_transfer(const ta_config_t* const info, const iota_config_t* const iconf,
96-
const iota_client_service_t* const service, const ta_send_transfer_req_t* const req,
97-
ta_send_transfer_res_t* res);
97+
const iota_client_service_t* const service, const ta_cache_t* const cache,
98+
const ta_send_transfer_req_t* const req, ta_send_transfer_res_t* res);
9899

99100
/**
100101
* @brief Send trytes to tangle.
@@ -254,6 +255,37 @@ status_t ta_get_iri_status(const iota_client_service_t* const service);
254255
*/
255256
status_t ta_update_iri_conneciton(ta_config_t* const ta_conf, iota_client_service_t* const service);
256257

258+
/**
259+
* @brief Push failed transactions in raw trytes into transaction buffer
260+
*
261+
* Given raw trytes array would be pushed into buffer. An UUID will be returned for client to fetch the information of
262+
* their request. The UUIDs are stored in a list, so once reaching the capacity of the buffer, buffered transactions can
263+
* be popped from the buffer.
264+
*
265+
* @param cache[in] Redis configuration variables
266+
* @param raw_txn_flex_trit_array[in] Raw transction trytes array in flex_trit_t type
267+
* @param uuid[out] Returned UUID for fetching transaction status and information
268+
*
269+
* @return
270+
* - SC_OK on success
271+
* - non-zero on error
272+
*/
273+
status_t push_txn_to_buffer(const ta_cache_t* const cache, hash8019_array_p raw_txn_flex_trit_array, char* uuid);
274+
275+
/**
276+
* @brief Broadcast transactions in transaction buffer
277+
*
278+
* Failed transactions would be stored in transaciton buffer. Once tangle-accelerator retrieve the connetion with
279+
* Tangle, then tangle-accelerator will start to broadcast these failed transaction trytes.
280+
*
281+
* @param core[in] Pointer to Tangle-accelerator core configuration structure
282+
*
283+
* @return
284+
* - SC_OK on success
285+
* - non-zero on error
286+
*/
287+
status_t broadcast_buffered_txn(const ta_core_t* const core);
288+
257289
#ifdef __cplusplus
258290
}
259291
#endif

accelerator/core/response/ta_send_transfer.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
ta_send_transfer_res_t* ta_send_transfer_res_new() {
1212
ta_send_transfer_res_t* res = (ta_send_transfer_res_t*)malloc(sizeof(ta_send_transfer_res_t));
1313
res->hash = NULL;
14-
memset(res->uuid, 0, UUID_STR_LEN);
14+
res->uuid = NULL;
1515
return res;
1616
}
1717

1818
void ta_send_transfer_res_free(ta_send_transfer_res_t** res) {
1919
if ((*res)) {
20+
free((*res)->uuid);
2021
hash243_queue_free(&(*res)->hash);
2122
free((*res));
2223
*res = NULL;

accelerator/core/response/ta_send_transfer.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ typedef struct {
3030
/** Transaction address is a 243 long flex trits hash queue. */
3131
hash243_queue_t hash;
3232
transaction_array_t* txn_array;
33-
// If the value of `CASS_UUID_STRING_LENGTH` changes, then we may need to modify array length of uuid_string as well,
34-
// since UUID_STR_LEN is defined in `uuid/uuid.h`.
35-
char uuid[UUID_STR_LEN];
33+
char* uuid;
3634
#ifdef DB_ENABLE
3735
char uuid_string[DB_UUID_STRING_LENGTH];
3836
#endif

accelerator/core/serializer/serializer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -822,7 +822,7 @@ status_t ta_send_transfer_res_serialize(ta_send_transfer_res_t* res, char** obj)
822822
goto done;
823823
}
824824

825-
if (res->uuid[0]) {
825+
if (res->uuid) {
826826
cJSON_AddStringToObject(json_root, "uuid", res->uuid);
827827
} else {
828828
ret = iota_transaction_to_json_object(transaction_array_at(res->txn_array, 0), &json_root);

0 commit comments

Comments
 (0)