Skip to content
This repository was archived by the owner on Dec 26, 2022. It is now read-only.

Commit 415151f

Browse files
committed
feat(cache): Check cache capacity
`cache_occupied_space()` would return the used size in caching database. When the capacity of redis server exceeds the maximun capacity, the health_track thread will pop the oldest UUID and the corresponding bundle. To avoid data racing, at the moment that health track is cleaning redis server, all the other thread is not able to cache service. Add the Document for buffer service. close #488
1 parent 2100122 commit 415151f

File tree

11 files changed

+251
-102
lines changed

11 files changed

+251
-102
lines changed

accelerator/cli_info.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef enum ta_cli_arg_value_e {
5454
BUFFER_LIST,
5555
DONE_LIST,
5656
HTTP_THREADS_CLI,
57+
CACHE_CAPACITY,
5758

5859
/** LOGGER */
5960
QUIET,
@@ -83,14 +84,15 @@ static struct ta_cli_argument_s {
8384
{"milestone_depth", optional_argument, NULL, MILESTONE_DEPTH_CLI, "IRI milestone depth"},
8485
{"mwm", optional_argument, NULL, MWM_CLI, "minimum weight magnitude"},
8586
{"seed", optional_argument, NULL, SEED_CLI, "IOTA seed"},
86-
{"cache", no_argument, NULL, CACHE, "Enable cache server"},
87+
{"cache", required_argument, NULL, CACHE, "Enable/Disable cache server. It defaults to off"},
8788
{"config", required_argument, NULL, CONF_CLI, "Read configuration file"},
8889
{"proxy_passthrough", no_argument, NULL, PROXY_API, "Pass proxy API directly to IRI without processing"},
8990
{"health_track_period", no_argument, NULL, HEALTH_TRACK_PERIOD,
9091
"The period for checking IRI host connection status"},
9192
{"no-gtta", no_argument, NULL, NO_GTTA, "Disable getTransactionToConfirm (gTTA) when sending transaction"},
9293
{"buffer_list", required_argument, NULL, BUFFER_LIST, "Set the value of `buffer_list_name`"},
9394
{"done_list", required_argument, NULL, DONE_LIST, "Set the value of `done_list_name`"},
95+
{"cache_capacity", required_argument, NULL, CACHE_CAPACITY, "Set the maximum capacity of caching server"},
9496
{"quiet", no_argument, NULL, QUIET, "Disable logger"},
9597
{NULL, 0, NULL, 0, NULL}};
9698

accelerator/config.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
6363
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
6464
ta_conf->port = (int)strtol_temp;
6565
} else {
66-
ta_log_error("Malformed input or illegal input character\n");
66+
ta_log_error("Malformed input\n");
6767
}
6868
break;
6969
case HTTP_THREADS_CLI:
@@ -79,7 +79,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
7979
ta_conf->http_tpool_size = (uint8_t)strtol_temp;
8080
}
8181
} else {
82-
ta_log_error("Malformed input or illegal input character\n");
82+
ta_log_error("Malformed input\n");
8383
}
8484
break;
8585

@@ -98,7 +98,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
9898
if (strtol_p != p && errno != ERANGE && strtol_temp >= 0 && strtol_temp <= USHRT_MAX) {
9999
ta_conf->iota_port_list[idx] = (uint16_t)strtol_temp;
100100
} else {
101-
ta_log_error("Malformed input or illegal input character\n");
101+
ta_log_error("Malformed input\n");
102102
}
103103
}
104104
iota_service->http.port = ta_conf->iota_port_list[0];
@@ -108,9 +108,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
108108
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
109109
ta_conf->health_track_period = (int)strtol_temp;
110110
} else {
111-
ta_log_error("Malformed input or illegal input character\n");
111+
ta_log_error("Malformed input\n");
112112
}
113113
break;
114+
case CACHE:
115+
ta_log_info("Initializing cache state\n");
116+
cache->state = !cache->state;
117+
if (cache->state) {
118+
if (cache_init(&cache->rwlock, cache->state, cache->host, cache->port)) {
119+
ta_log_error("%s\n", "Failed to initialize lock to caching service.");
120+
}
121+
}
122+
123+
break;
114124

115125
#ifdef MQTT_ENABLE
116126
// MQTT configuration
@@ -131,7 +141,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
131141
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
132142
cache->port = (int)strtol_temp;
133143
} else {
134-
ta_log_error("Malformed input or illegal input character\n");
144+
ta_log_error("Malformed input character\n");
145+
}
146+
break;
147+
case CACHE_MAX_CAPACITY:
148+
strtol_temp = strtol(value, NULL, 10);
149+
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
150+
if (strtol_temp <= 0) {
151+
ta_log_error("The capacity of caching service should greater then 0.\n");
152+
break;
153+
}
154+
cache->capacity = strtol_temp;
155+
} else {
156+
ta_log_error("Malformed input\n");
135157
}
136158
break;
137159

@@ -149,23 +171,20 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
149171
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
150172
iota_conf->milestone_depth = (int)strtol_temp;
151173
} else {
152-
ta_log_error("Malformed input or illegal input character\n");
174+
ta_log_error("Malformed input\n");
153175
}
154176
break;
155177
case MWM_CLI:
156178
strtol_temp = strtol(value, NULL, 10);
157179
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
158180
iota_conf->mwm = (int)strtol_temp;
159181
} else {
160-
ta_log_error("Malformed input or illegal input character\n");
182+
ta_log_error("Malformed input\n");
161183
}
162184
break;
163185
case SEED_CLI:
164186
iota_conf->seed = value;
165187
break;
166-
case CACHE:
167-
cache->cache_state = true;
168-
break;
169188

170189
// Quiet mode configuration
171190
case QUIET:
@@ -234,7 +253,7 @@ status_t ta_core_default_init(ta_core_t* const core) {
234253
}
235254
ta_conf->http_tpool_size = DEFAULT_HTTP_TPOOL_SIZE;
236255
ta_conf->proxy_passthrough = false;
237-
ta_conf->health_track_period = IRI_HEALTH_TRACK_PERIOD;
256+
ta_conf->health_track_period = HEALTH_TRACK_PERIOD;
238257
ta_conf->gtta = true;
239258
#ifdef MQTT_ENABLE
240259
ta_conf->mqtt_host = MQTT_HOST;
@@ -243,9 +262,10 @@ status_t ta_core_default_init(ta_core_t* const core) {
243262
ta_log_info("Initializing Redis information\n");
244263
cache->host = REDIS_HOST;
245264
cache->port = REDIS_PORT;
246-
cache->cache_state = false;
265+
cache->state = false;
247266
cache->buffer_list_name = BUFFER_LIST_NAME;
248267
cache->done_list_name = DONE_LIST_NAME;
268+
cache->capacity = CACHE_MAX_CAPACITY;
249269

250270
ta_log_info("Initializing IRI configuration\n");
251271
iota_conf->milestone_depth = MILESTONE_DEPTH;
@@ -415,7 +435,6 @@ status_t ta_core_cli_init(ta_core_t* const core, int argc, char** argv) {
415435
status_t ta_core_set(ta_core_t* core) {
416436
status_t ret = SC_OK;
417437

418-
ta_cache_t* const cache = &core->cache;
419438
iota_client_service_t* const iota_service = &core->iota_service;
420439
#ifdef DB_ENABLE
421440
db_client_service_t* const db_service = &core->db_service;
@@ -430,8 +449,6 @@ status_t ta_core_set(ta_core_t* core) {
430449
ta_log_info("Initializing PoW implementation context\n");
431450
pow_init();
432451

433-
ta_log_info("Initializing cache state\n");
434-
cache_init(cache->cache_state, cache->host, cache->port);
435452
#ifdef DB_ENABLE
436453
ta_log_info("Initializing db client service\n");
437454
if ((ret = db_client_service_init(db_service, DB_USAGE_REATTACH)) != SC_OK) {
@@ -452,7 +469,7 @@ void ta_core_destroy(ta_core_t* const core) {
452469
db_client_service_free(&core->db_service);
453470
#endif
454471
pow_destroy();
455-
cache_stop();
472+
cache_stop(&core->cache.rwlock);
456473
logger_helper_release(logger_id);
457474
br_logger_release();
458475
}

accelerator/config.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "cclient/serialization/json/json_serializer.h"
2424
#include "common/logger.h"
2525
#include "utils/cache/cache.h"
26+
#include "utils/handles/lock.h"
2627

2728
#define FILE_PATH_SIZE 128
2829

@@ -59,7 +60,8 @@ extern "C" {
5960
#define MAM_FILE_PREFIX "/tmp/mam_bin_XXXXXX"
6061
#define BUFFER_LIST_NAME "txn_buff_list"
6162
#define DONE_LIST_NAME "done_txn_buff_list"
62-
#define IRI_HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
63+
#define CACHE_MAX_CAPACITY 170 * 1024 * 1024 // default to 170MB
64+
#define HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
6365

6466
/** @name Redis connection config */
6567
/** @{ */
@@ -94,12 +96,14 @@ typedef struct iota_config_s {
9496

9597
/** struct type of accelerator cache */
9698
typedef struct ta_cache_s {
97-
char* host; /**< Binding address of redis server */
98-
uint64_t timeout; /**< Timeout for keys in redis */
99-
char* buffer_list_name; /**< Name of the list to buffer transactions */
100-
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
101-
uint16_t port; /**< Binding port of redis server */
102-
bool cache_state; /**< Set it true to turn on cache server */
99+
char* host; /**< Binding address of redis server */
100+
uint64_t timeout; /**< Timeout for keys in cache server */
101+
char* buffer_list_name; /**< Name of the list to buffer transactions */
102+
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
103+
uint16_t port; /**< Binding port of redis server */
104+
bool state; /**< Set it true to turn on cache server */
105+
long int capacity; /**< The maximum capacity of cache server */
106+
pthread_rwlock_t* rwlock; /**< Read/Write lock to avoid data racing in buffering */
103107
} ta_cache_t;
104108

105109
/** struct type of accelerator core */
@@ -108,6 +112,7 @@ typedef struct ta_core_s {
108112
ta_cache_t cache; /**< redis configuration structure */
109113
iota_config_t iota_conf; /**< iota configuration structure */
110114
iota_client_service_t iota_service; /**< iota connection structure */
115+
111116
#ifdef DB_ENABLE
112117
db_client_service_t db_service; /**< db connection structure */
113118
#endif

accelerator/core/apis.h

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,6 @@ int apis_logger_release();
5656
status_t api_get_ta_info(ta_config_t* const info, iota_config_t* const tangle, ta_cache_t* const cache,
5757
char** json_result);
5858

59-
/**
60-
* Initialize lock
61-
*
62-
* @return
63-
* - zero on success
64-
* - SC_CONF_LOCK_INIT on error
65-
*/
66-
status_t apis_lock_init();
67-
68-
/**
69-
* Destroy lock
70-
*
71-
* @return
72-
* - zero on success
73-
* - SC_CONF_LOCK_DESTROY on error
74-
*/
75-
status_t apis_lock_destroy();
76-
7759
/**
7860
* @brief Generate an unused address.
7961
*

accelerator/core/core.c

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
682682
goto done;
683683
}
684684

685+
if (uuid_list_len == 0) {
686+
ta_log_debug("No buffered requests\n");
687+
goto done;
688+
}
689+
685690
ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid);
686691
if (ret) {
687692
ta_log_error("%s\n", ta_error_to_string(ret));
@@ -755,6 +760,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
755760
}
756761
}
757762

763+
if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
764+
ret = SC_CACHE_LOCK_FAILURE;
765+
ta_log_error("%s\n", ta_error_to_string(ret));
766+
goto done;
767+
}
758768
// Pop transaction from buffered list
759769
ret = cache_list_pop(core->cache.buffer_list_name, (char*)uuid);
760770
if (ret) {
@@ -768,6 +778,12 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
768778
ta_log_error("%s\n", ta_error_to_string(ret));
769779
goto done;
770780
}
781+
if (pthread_rwlock_unlock(core->cache.rwlock)) {
782+
ret = SC_CACHE_LOCK_FAILURE;
783+
ta_log_error("%s\n", ta_error_to_string(ret));
784+
goto done;
785+
}
786+
771787
get_trytes_req_free(&req);
772788
get_trytes_res_free(&res);
773789
} while (!uuid_list_len);
@@ -782,15 +798,26 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
782798
status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const uuid,
783799
ta_fetch_txn_with_uuid_res_t* res) {
784800
status_t ret = SC_OK;
785-
char pop_uuid[UUID_STR_LEN];
801+
if (pthread_rwlock_tryrdlock(cache->rwlock)) {
802+
ret = SC_CACHE_LOCK_FAILURE;
803+
ta_log_error("%s\n", ta_error_to_string(ret));
804+
goto done;
805+
}
806+
786807
bool exist = false;
787808
ret = cache_list_exist(cache->buffer_list_name, uuid, UUID_STR_LEN - 1, &exist);
788809
if (ret) {
789810
ta_log_error("%s\n", ta_error_to_string(ret));
811+
if (pthread_rwlock_unlock(cache->rwlock)) {
812+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
813+
}
790814
goto done;
791815
}
792816
if (exist) {
793817
res->status = UNSENT;
818+
if (pthread_rwlock_unlock(cache->rwlock)) {
819+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
820+
}
794821
goto done;
795822
}
796823

@@ -808,6 +835,9 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
808835
ta_log_error("%s\n", ta_error_to_string(ret));
809836
goto done;
810837
}
838+
if (pthread_rwlock_unlock(cache->rwlock)) {
839+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
840+
}
811841

812842
for (int i = 0; i < len; ++i) {
813843
flex_trit_t txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1];
@@ -833,11 +863,16 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
833863
goto done;
834864
}
835865

866+
char pop_uuid[UUID_STR_LEN];
836867
ret = cache_list_pop(cache->done_list_name, pop_uuid);
837868
if (ret) {
838869
ta_log_error("%s\n", ta_error_to_string(ret));
839870
goto done;
840871
}
872+
} else {
873+
if (pthread_rwlock_unlock(cache->rwlock)) {
874+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
875+
}
841876
}
842877

843878
done:

accelerator/main.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ static void ta_stop(int signal) {
2222

2323
static void* health_track(void* arg) {
2424
ta_core_t* core = (ta_core_t*)arg;
25-
while (true) {
25+
while (core->cache.state) {
2626
status_t ret = ta_get_iri_status(&core->iota_service);
2727
if (ret == SC_CORE_IRI_UNSYNC || ret == SC_CCLIENT_FAILED_RESPONSE) {
2828
ta_log_error("IRI status error %d. Try to connect to another IRI host on priority list\n", ret);
@@ -40,6 +40,28 @@ static void* health_track(void* arg) {
4040
}
4141
}
4242

43+
char uuid[UUID_STR_LEN] = {};
44+
// The usage exceeds the maximum redis capacity
45+
while (core->cache.capacity < cache_occupied_space()) {
46+
if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
47+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
48+
break;
49+
}
50+
51+
ret = cache_list_pop(core->cache.done_list_name, uuid);
52+
if (ret) {
53+
ta_log_error("%s\n", ta_error_to_string(ret));
54+
}
55+
ret = cache_del(uuid);
56+
if (ret) {
57+
ta_log_error("%s\n", ta_error_to_string(ret));
58+
}
59+
60+
if (pthread_rwlock_unlock(core->cache.rwlock)) {
61+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
62+
}
63+
}
64+
4365
sleep(core->ta_conf.health_track_period);
4466
}
4567
return ((void*)NULL);
@@ -78,7 +100,7 @@ int main(int argc, char* argv[]) {
78100
}
79101

80102
pthread_t thread;
81-
pthread_create(&thread, NULL, health_track, &ta_core);
103+
pthread_create(&thread, NULL, health_track, (void*)&ta_core);
82104

83105
if (ta_http_init(&ta_http, &ta_core) != SC_OK) {
84106
ta_log_error("HTTP initialization failed %s.\n", MAIN_LOGGER);

common/ta_errors.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ typedef enum {
124124
/**< Failed in cache operations */
125125
SC_CACHE_OFF = 0x03 | SC_MODULE_CACHE | SC_SEVERITY_MINOR,
126126
/**< Cache server is not turned on */
127+
SC_CACHE_INIT_FINI = 0x04 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
128+
/**< Failed to initialize or destroy lock in cache */
129+
SC_CACHE_LOCK_FAILURE = 0x05 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
130+
/**< Failed to lock or unlock cache operations */
127131

128132
// MAM module
129133
SC_MAM_NULL = 0x01 | SC_MODULE_MAM | SC_SEVERITY_FATAL,

0 commit comments

Comments
 (0)