Skip to content
This repository was archived by the owner on Dec 26, 2022. It is now read-only.

Commit efa87d3

Browse files
authored
Merge pull request #514 from HowJMay/redis_cap
feat(cache): Check cache capacity
2 parents 2100122 + 415151f commit efa87d3

File tree

11 files changed

+251
-102
lines changed

11 files changed

+251
-102
lines changed

accelerator/cli_info.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef enum ta_cli_arg_value_e {
5454
BUFFER_LIST,
5555
DONE_LIST,
5656
HTTP_THREADS_CLI,
57+
CACHE_CAPACITY,
5758

5859
/** LOGGER */
5960
QUIET,
@@ -83,14 +84,15 @@ static struct ta_cli_argument_s {
8384
{"milestone_depth", optional_argument, NULL, MILESTONE_DEPTH_CLI, "IRI milestone depth"},
8485
{"mwm", optional_argument, NULL, MWM_CLI, "minimum weight magnitude"},
8586
{"seed", optional_argument, NULL, SEED_CLI, "IOTA seed"},
86-
{"cache", no_argument, NULL, CACHE, "Enable cache server"},
87+
{"cache", required_argument, NULL, CACHE, "Enable/Disable cache server. It defaults to off"},
8788
{"config", required_argument, NULL, CONF_CLI, "Read configuration file"},
8889
{"proxy_passthrough", no_argument, NULL, PROXY_API, "Pass proxy API directly to IRI without processing"},
8990
{"health_track_period", no_argument, NULL, HEALTH_TRACK_PERIOD,
9091
"The period for checking IRI host connection status"},
9192
{"no-gtta", no_argument, NULL, NO_GTTA, "Disable getTransactionToConfirm (gTTA) when sending transaction"},
9293
{"buffer_list", required_argument, NULL, BUFFER_LIST, "Set the value of `buffer_list_name`"},
9394
{"done_list", required_argument, NULL, DONE_LIST, "Set the value of `done_list_name`"},
95+
{"cache_capacity", required_argument, NULL, CACHE_CAPACITY, "Set the maximum capacity of caching server"},
9496
{"quiet", no_argument, NULL, QUIET, "Disable logger"},
9597
{NULL, 0, NULL, 0, NULL}};
9698

accelerator/config.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
6363
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
6464
ta_conf->port = (int)strtol_temp;
6565
} else {
66-
ta_log_error("Malformed input or illegal input character\n");
66+
ta_log_error("Malformed input\n");
6767
}
6868
break;
6969
case HTTP_THREADS_CLI:
@@ -79,7 +79,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
7979
ta_conf->http_tpool_size = (uint8_t)strtol_temp;
8080
}
8181
} else {
82-
ta_log_error("Malformed input or illegal input character\n");
82+
ta_log_error("Malformed input\n");
8383
}
8484
break;
8585

@@ -98,7 +98,7 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
9898
if (strtol_p != p && errno != ERANGE && strtol_temp >= 0 && strtol_temp <= USHRT_MAX) {
9999
ta_conf->iota_port_list[idx] = (uint16_t)strtol_temp;
100100
} else {
101-
ta_log_error("Malformed input or illegal input character\n");
101+
ta_log_error("Malformed input\n");
102102
}
103103
}
104104
iota_service->http.port = ta_conf->iota_port_list[0];
@@ -108,9 +108,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
108108
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
109109
ta_conf->health_track_period = (int)strtol_temp;
110110
} else {
111-
ta_log_error("Malformed input or illegal input character\n");
111+
ta_log_error("Malformed input\n");
112112
}
113113
break;
114+
case CACHE:
115+
ta_log_info("Initializing cache state\n");
116+
cache->state = !cache->state;
117+
if (cache->state) {
118+
if (cache_init(&cache->rwlock, cache->state, cache->host, cache->port)) {
119+
ta_log_error("%s\n", "Failed to initialize lock to caching service.");
120+
}
121+
}
122+
123+
break;
114124

115125
#ifdef MQTT_ENABLE
116126
// MQTT configuration
@@ -131,7 +141,19 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
131141
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
132142
cache->port = (int)strtol_temp;
133143
} else {
134-
ta_log_error("Malformed input or illegal input character\n");
144+
ta_log_error("Malformed input character\n");
145+
}
146+
break;
147+
case CACHE_MAX_CAPACITY:
148+
strtol_temp = strtol(value, NULL, 10);
149+
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
150+
if (strtol_temp <= 0) {
151+
ta_log_error("The capacity of caching service should greater then 0.\n");
152+
break;
153+
}
154+
cache->capacity = strtol_temp;
155+
} else {
156+
ta_log_error("Malformed input\n");
135157
}
136158
break;
137159

@@ -149,23 +171,20 @@ status_t cli_core_set(ta_core_t* const core, int key, char* const value) {
149171
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
150172
iota_conf->milestone_depth = (int)strtol_temp;
151173
} else {
152-
ta_log_error("Malformed input or illegal input character\n");
174+
ta_log_error("Malformed input\n");
153175
}
154176
break;
155177
case MWM_CLI:
156178
strtol_temp = strtol(value, NULL, 10);
157179
if (strtol_p != value && errno != ERANGE && strtol_temp >= INT_MIN && strtol_temp <= INT_MAX) {
158180
iota_conf->mwm = (int)strtol_temp;
159181
} else {
160-
ta_log_error("Malformed input or illegal input character\n");
182+
ta_log_error("Malformed input\n");
161183
}
162184
break;
163185
case SEED_CLI:
164186
iota_conf->seed = value;
165187
break;
166-
case CACHE:
167-
cache->cache_state = true;
168-
break;
169188

170189
// Quiet mode configuration
171190
case QUIET:
@@ -234,7 +253,7 @@ status_t ta_core_default_init(ta_core_t* const core) {
234253
}
235254
ta_conf->http_tpool_size = DEFAULT_HTTP_TPOOL_SIZE;
236255
ta_conf->proxy_passthrough = false;
237-
ta_conf->health_track_period = IRI_HEALTH_TRACK_PERIOD;
256+
ta_conf->health_track_period = HEALTH_TRACK_PERIOD;
238257
ta_conf->gtta = true;
239258
#ifdef MQTT_ENABLE
240259
ta_conf->mqtt_host = MQTT_HOST;
@@ -243,9 +262,10 @@ status_t ta_core_default_init(ta_core_t* const core) {
243262
ta_log_info("Initializing Redis information\n");
244263
cache->host = REDIS_HOST;
245264
cache->port = REDIS_PORT;
246-
cache->cache_state = false;
265+
cache->state = false;
247266
cache->buffer_list_name = BUFFER_LIST_NAME;
248267
cache->done_list_name = DONE_LIST_NAME;
268+
cache->capacity = CACHE_MAX_CAPACITY;
249269

250270
ta_log_info("Initializing IRI configuration\n");
251271
iota_conf->milestone_depth = MILESTONE_DEPTH;
@@ -415,7 +435,6 @@ status_t ta_core_cli_init(ta_core_t* const core, int argc, char** argv) {
415435
status_t ta_core_set(ta_core_t* core) {
416436
status_t ret = SC_OK;
417437

418-
ta_cache_t* const cache = &core->cache;
419438
iota_client_service_t* const iota_service = &core->iota_service;
420439
#ifdef DB_ENABLE
421440
db_client_service_t* const db_service = &core->db_service;
@@ -430,8 +449,6 @@ status_t ta_core_set(ta_core_t* core) {
430449
ta_log_info("Initializing PoW implementation context\n");
431450
pow_init();
432451

433-
ta_log_info("Initializing cache state\n");
434-
cache_init(cache->cache_state, cache->host, cache->port);
435452
#ifdef DB_ENABLE
436453
ta_log_info("Initializing db client service\n");
437454
if ((ret = db_client_service_init(db_service, DB_USAGE_REATTACH)) != SC_OK) {
@@ -452,7 +469,7 @@ void ta_core_destroy(ta_core_t* const core) {
452469
db_client_service_free(&core->db_service);
453470
#endif
454471
pow_destroy();
455-
cache_stop();
472+
cache_stop(&core->cache.rwlock);
456473
logger_helper_release(logger_id);
457474
br_logger_release();
458475
}

accelerator/config.h

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "cclient/serialization/json/json_serializer.h"
2424
#include "common/logger.h"
2525
#include "utils/cache/cache.h"
26+
#include "utils/handles/lock.h"
2627

2728
#define FILE_PATH_SIZE 128
2829

@@ -59,7 +60,8 @@ extern "C" {
5960
#define MAM_FILE_PREFIX "/tmp/mam_bin_XXXXXX"
6061
#define BUFFER_LIST_NAME "txn_buff_list"
6162
#define DONE_LIST_NAME "done_txn_buff_list"
62-
#define IRI_HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
63+
#define CACHE_MAX_CAPACITY 170 * 1024 * 1024 // default to 170MB
64+
#define HEALTH_TRACK_PERIOD 1800 // Check every half hour in default
6365

6466
/** @name Redis connection config */
6567
/** @{ */
@@ -94,12 +96,14 @@ typedef struct iota_config_s {
9496

9597
/** struct type of accelerator cache */
9698
typedef struct ta_cache_s {
97-
char* host; /**< Binding address of redis server */
98-
uint64_t timeout; /**< Timeout for keys in redis */
99-
char* buffer_list_name; /**< Name of the list to buffer transactions */
100-
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
101-
uint16_t port; /**< Binding port of redis server */
102-
bool cache_state; /**< Set it true to turn on cache server */
99+
char* host; /**< Binding address of redis server */
100+
uint64_t timeout; /**< Timeout for keys in cache server */
101+
char* buffer_list_name; /**< Name of the list to buffer transactions */
102+
char* done_list_name; /**< Name of the list to store successfully broadcast transactions from buffer */
103+
uint16_t port; /**< Binding port of redis server */
104+
bool state; /**< Set it true to turn on cache server */
105+
long int capacity; /**< The maximum capacity of cache server */
106+
pthread_rwlock_t* rwlock; /**< Read/Write lock to avoid data racing in buffering */
103107
} ta_cache_t;
104108

105109
/** struct type of accelerator core */
@@ -108,6 +112,7 @@ typedef struct ta_core_s {
108112
ta_cache_t cache; /**< redis configuration structure */
109113
iota_config_t iota_conf; /**< iota configuration structure */
110114
iota_client_service_t iota_service; /**< iota connection structure */
115+
111116
#ifdef DB_ENABLE
112117
db_client_service_t db_service; /**< db connection structure */
113118
#endif

accelerator/core/apis.h

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,24 +56,6 @@ int apis_logger_release();
5656
status_t api_get_ta_info(ta_config_t* const info, iota_config_t* const tangle, ta_cache_t* const cache,
5757
char** json_result);
5858

59-
/**
60-
* Initialize lock
61-
*
62-
* @return
63-
* - zero on success
64-
* - SC_CONF_LOCK_INIT on error
65-
*/
66-
status_t apis_lock_init();
67-
68-
/**
69-
* Destroy lock
70-
*
71-
* @return
72-
* - zero on success
73-
* - SC_CONF_LOCK_DESTROY on error
74-
*/
75-
status_t apis_lock_destroy();
76-
7759
/**
7860
* @brief Generate an unused address.
7961
*

accelerator/core/core.c

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
682682
goto done;
683683
}
684684

685+
if (uuid_list_len == 0) {
686+
ta_log_debug("No buffered requests\n");
687+
goto done;
688+
}
689+
685690
ret = cache_list_peek(core->cache.buffer_list_name, UUID_STR_LEN, uuid);
686691
if (ret) {
687692
ta_log_error("%s\n", ta_error_to_string(ret));
@@ -755,6 +760,11 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
755760
}
756761
}
757762

763+
if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
764+
ret = SC_CACHE_LOCK_FAILURE;
765+
ta_log_error("%s\n", ta_error_to_string(ret));
766+
goto done;
767+
}
758768
// Pop transaction from buffered list
759769
ret = cache_list_pop(core->cache.buffer_list_name, (char*)uuid);
760770
if (ret) {
@@ -768,6 +778,12 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
768778
ta_log_error("%s\n", ta_error_to_string(ret));
769779
goto done;
770780
}
781+
if (pthread_rwlock_unlock(core->cache.rwlock)) {
782+
ret = SC_CACHE_LOCK_FAILURE;
783+
ta_log_error("%s\n", ta_error_to_string(ret));
784+
goto done;
785+
}
786+
771787
get_trytes_req_free(&req);
772788
get_trytes_res_free(&res);
773789
} while (!uuid_list_len);
@@ -782,15 +798,26 @@ status_t broadcast_buffered_txn(const ta_core_t* const core) {
782798
status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const uuid,
783799
ta_fetch_txn_with_uuid_res_t* res) {
784800
status_t ret = SC_OK;
785-
char pop_uuid[UUID_STR_LEN];
801+
if (pthread_rwlock_tryrdlock(cache->rwlock)) {
802+
ret = SC_CACHE_LOCK_FAILURE;
803+
ta_log_error("%s\n", ta_error_to_string(ret));
804+
goto done;
805+
}
806+
786807
bool exist = false;
787808
ret = cache_list_exist(cache->buffer_list_name, uuid, UUID_STR_LEN - 1, &exist);
788809
if (ret) {
789810
ta_log_error("%s\n", ta_error_to_string(ret));
811+
if (pthread_rwlock_unlock(cache->rwlock)) {
812+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
813+
}
790814
goto done;
791815
}
792816
if (exist) {
793817
res->status = UNSENT;
818+
if (pthread_rwlock_unlock(cache->rwlock)) {
819+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
820+
}
794821
goto done;
795822
}
796823

@@ -808,6 +835,9 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
808835
ta_log_error("%s\n", ta_error_to_string(ret));
809836
goto done;
810837
}
838+
if (pthread_rwlock_unlock(cache->rwlock)) {
839+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
840+
}
811841

812842
for (int i = 0; i < len; ++i) {
813843
flex_trit_t txn_flex_trits[NUM_FLEX_TRITS_SERIALIZED_TRANSACTION + 1];
@@ -833,11 +863,16 @@ status_t ta_fetch_txn_with_uuid(const ta_cache_t* const cache, const char* const
833863
goto done;
834864
}
835865

866+
char pop_uuid[UUID_STR_LEN];
836867
ret = cache_list_pop(cache->done_list_name, pop_uuid);
837868
if (ret) {
838869
ta_log_error("%s\n", ta_error_to_string(ret));
839870
goto done;
840871
}
872+
} else {
873+
if (pthread_rwlock_unlock(cache->rwlock)) {
874+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
875+
}
841876
}
842877

843878
done:

accelerator/main.c

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ static void ta_stop(int signal) {
2222

2323
static void* health_track(void* arg) {
2424
ta_core_t* core = (ta_core_t*)arg;
25-
while (true) {
25+
while (core->cache.state) {
2626
status_t ret = ta_get_iri_status(&core->iota_service);
2727
if (ret == SC_CORE_IRI_UNSYNC || ret == SC_CCLIENT_FAILED_RESPONSE) {
2828
ta_log_error("IRI status error %d. Try to connect to another IRI host on priority list\n", ret);
@@ -40,6 +40,28 @@ static void* health_track(void* arg) {
4040
}
4141
}
4242

43+
char uuid[UUID_STR_LEN] = {};
44+
// The usage exceeds the maximum redis capacity
45+
while (core->cache.capacity < cache_occupied_space()) {
46+
if (pthread_rwlock_trywrlock(core->cache.rwlock)) {
47+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
48+
break;
49+
}
50+
51+
ret = cache_list_pop(core->cache.done_list_name, uuid);
52+
if (ret) {
53+
ta_log_error("%s\n", ta_error_to_string(ret));
54+
}
55+
ret = cache_del(uuid);
56+
if (ret) {
57+
ta_log_error("%s\n", ta_error_to_string(ret));
58+
}
59+
60+
if (pthread_rwlock_unlock(core->cache.rwlock)) {
61+
ta_log_error("%s\n", ta_error_to_string(SC_CACHE_LOCK_FAILURE));
62+
}
63+
}
64+
4365
sleep(core->ta_conf.health_track_period);
4466
}
4567
return ((void*)NULL);
@@ -78,7 +100,7 @@ int main(int argc, char* argv[]) {
78100
}
79101

80102
pthread_t thread;
81-
pthread_create(&thread, NULL, health_track, &ta_core);
103+
pthread_create(&thread, NULL, health_track, (void*)&ta_core);
82104

83105
if (ta_http_init(&ta_http, &ta_core) != SC_OK) {
84106
ta_log_error("HTTP initialization failed %s.\n", MAIN_LOGGER);

common/ta_errors.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ typedef enum {
124124
/**< Failed in cache operations */
125125
SC_CACHE_OFF = 0x03 | SC_MODULE_CACHE | SC_SEVERITY_MINOR,
126126
/**< Cache server is not turned on */
127+
SC_CACHE_INIT_FINI = 0x04 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
128+
/**< Failed to initialize or destroy lock in cache */
129+
SC_CACHE_LOCK_FAILURE = 0x05 | SC_MODULE_CACHE | SC_SEVERITY_FATAL,
130+
/**< Failed to lock or unlock cache operations */
127131

128132
// MAM module
129133
SC_MAM_NULL = 0x01 | SC_MODULE_MAM | SC_SEVERITY_FATAL,

0 commit comments

Comments
 (0)