Skip to content
This repository was archived by the owner on Dec 26, 2022. It is now read-only.

Commit b073b8c

Browse files
authored
Merge pull request #265 from HowJMay/mqtt_request
feat(mqtt): Implement MQTT request handler
2 parents 5b38c01 + e5ab76d commit b073b8c

File tree

16 files changed

+324
-25
lines changed

16 files changed

+324
-25
lines changed

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,16 @@ Use the `--host_force_python=PY2` parameter to force the Bazel to use the Python
121121
$ make && bazel run //accelerator:push_docker --host_force_python=PY2
122122
```
123123

124+
### Optional: Enable MQTT connectivity
125+
MQTT connectivity is an optional feature allowing IoT endpoint devices to collaborate with `Tangle-Accelerator`.
126+
127+
```
128+
make && bazel run //accelerator_mqtt
129+
```
130+
131+
Note you may need to set up the `MQTT_HOST` and `TOPIC_ROOT` in `config.h` to connect to a MQTT broker.
132+
For more information for MQTT connectivity of `tangle-accelerator`, you could read `connectivity/mqtt/usage.md`.
133+
124134
## Developing
125135

126136
The codebase of this repository follows [Google's C++ guidelines](https://google.github.io/styleguide/cppguide.html):

accelerator/BUILD

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,23 @@ cc_binary(
2929

3030
cc_binary(
3131
name = "accelerator_mqtt",
32-
srcs = ["mqtt_interface.c"],
32+
srcs = [
33+
"config.c",
34+
"config.h",
35+
"conn_mqtt.c",
36+
],
3337
copts = [
3438
"-DLOGGER_ENABLE",
3539
"-DENABLE_MQTT",
3640
],
3741
deps = [
42+
":message",
3843
":ta_config",
3944
":ta_errors",
4045
"//connectivity/mqtt:mqtt_utils",
46+
"//utils:cache",
47+
"//utils:pow",
48+
"@entangled//cclient/api",
4149
"@entangled//utils/handles:signal",
4250
],
4351
)

accelerator/config.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ status_t ta_config_default_init(ta_config_t* const info, iota_config_t* const ic
9292
info->host = TA_HOST;
9393
info->port = TA_PORT;
9494
info->thread_count = TA_THREAD_COUNT;
95-
95+
#ifdef ENABLE_MQTT
96+
info->mqtt_host = MQTT_HOST;
97+
info->mqtt_topic_root = TOPIC_ROOT;
98+
#endif
9699
log_info(config_logger_id, "Initializing Redis information\n");
97100
cache->host = REDIS_HOST;
98101
cache->port = REDIS_PORT;

accelerator/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ extern "C" {
2828

2929
#define TA_VERSION "tangle-accelerator/0.5.0"
3030
#define TA_HOST "localhost"
31+
3132
#ifdef ENABLE_MQTT
3233
#define MQTT_HOST "localhost"
3334
#define TOPIC_ROOT "root/topics"
3435
#endif
36+
3537
#define TA_PORT "8000"
3638
#define TA_THREAD_COUNT 10
3739
#define IRI_HOST "localhost"
@@ -56,6 +58,10 @@ typedef struct ta_info_s {
5658
char* host; /**< Binding address of tangle-accelerator */
5759
char* port; /**< Binding port of tangle-accelerator */
5860
uint8_t thread_count; /**< Thread count of tangle-accelerator instance */
61+
#ifdef ENABLE_MQTT
62+
char* mqtt_host; /**< Address of MQTT broker host */
63+
char* mqtt_topic_root; /**< The topic root of MQTT topic */
64+
#endif
5965
} ta_config_t;
6066

6167
/** struct type of iota configuration */

accelerator/mqtt_interface.c renamed to accelerator/conn_mqtt.c

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
#include "accelerator/config.h"
12
#include "config.h"
23
#include "connectivity/mqtt/client_common.h"
34
#include "connectivity/mqtt/duplex_callback.h"
45
#include "connectivity/mqtt/duplex_utils.h"
56
#include "errors.h"
67

7-
#define MQTT_INTERFACE_LOGGER "mqtt-interface"
8+
#define CONN_MQTT_LOGGER "conn-mqtt"
89

10+
ta_core_t ta_core;
911
static logger_id_t mqtt_logger_id;
1012

1113
int main(int argc, char *argv[]) {
@@ -18,7 +20,22 @@ int main(int argc, char *argv[]) {
1820
return EXIT_FAILURE;
1921
}
2022

21-
mqtt_logger_id = logger_helper_enable(MQTT_INTERFACE_LOGGER, LOGGER_DEBUG, true);
23+
mqtt_logger_id = logger_helper_enable(CONN_MQTT_LOGGER, LOGGER_DEBUG, true);
24+
25+
// Initialize configurations with default value
26+
if (ta_config_default_init(&ta_core.info, &ta_core.iconf, &ta_core.cache, &ta_core.service) != SC_OK) {
27+
return EXIT_FAILURE;
28+
}
29+
30+
// Initialize configurations with CLI value
31+
if (ta_config_cli_init(&ta_core, argc, argv) != SC_OK) {
32+
return EXIT_FAILURE;
33+
}
34+
35+
if (ta_config_set(&ta_core.cache, &ta_core.service) != SC_OK) {
36+
log_critical(mqtt_logger_id, "[%s:%d] Configure failed %s.\n", __func__, __LINE__, CONN_MQTT_LOGGER);
37+
return EXIT_FAILURE;
38+
}
2239

2340
if (verbose_mode) {
2441
mqtt_utils_logger_init();
@@ -36,6 +53,7 @@ int main(int argc, char *argv[]) {
3653
// if we want to opertate this program under multi-threading, see https://github.com/eclipse/mosquitto/issues/450
3754
ret = duplex_config_init(&mosq, &cfg);
3855
if (ret != SC_OK) {
56+
log_critical(mqtt_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
3957
goto done;
4058
}
4159

@@ -47,18 +65,22 @@ int main(int argc, char *argv[]) {
4765
// gossip_channel_set(&cfg, MQTT_HOST, "NB/test/room1", "NB/test/room2");
4866

4967
// Set the configures and message for testing
50-
ret = gossip_api_channels_set(&cfg, MQTT_HOST, TOPIC_ROOT);
68+
ret = gossip_api_channels_set(&cfg, ta_core.info.mqtt_host, ta_core.info.mqtt_topic_root);
5169
if (ret != SC_OK) {
70+
log_critical(mqtt_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
5271
goto done;
5372
}
5473

5574
// Set cfg as `userdata` field of `mosq` which allows the callback functions to use `cfg`.
5675
mosquitto_user_data_set(mosq, &cfg);
5776

77+
log_info(mqtt_logger_id, "Starting...\n");
78+
5879
// Start listening subscribing topics, once we received a message from the listening topics, we can send corresponding
5980
// message.
60-
// if we need to take the above task forever, just put it in a infinite loop.
6181
do {
82+
// TODO Use logger to log some important steps in processing requests.
83+
log_info(mqtt_logger_id, "Listening new requests.\n");
6284
ret = duplex_client_start(mosq, &cfg);
6385
} while (!ret);
6486

connectivity/mqtt/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ cc_library(
88
"duplex_callback.h",
99
"duplex_utils.h",
1010
],
11+
copts = ["-DENABLE_MQTT"],
1112
visibility = ["//visibility:public"],
1213
deps = [
1314
":mqtt_common",
15+
"//accelerator:apis",
16+
"//accelerator:common_core",
1417
"//accelerator:ta_errors",
18+
"@entangled//common/model:transaction",
1519
],
1620
)
1721

@@ -27,6 +31,7 @@ cc_library(
2731
"pub_utils.h",
2832
"sub_utils.h",
2933
],
34+
copts = ["-DENABLE_MQTT"],
3035
deps = [
3136
"//accelerator:ta_config",
3237
"//accelerator:ta_errors",

connectivity/mqtt/client_common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ extern "C" {
2222
*/
2323

2424
#define ID_LEN 32
25-
#define API_NUM 9
25+
#define API_NUM 7
2626

2727
typedef enum client_type_s { client_pub, client_sub, client_duplex } client_type_t;
2828

connectivity/mqtt/duplex_callback.c

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88

99
#include "duplex_callback.h"
1010
#include <stdlib.h>
11-
#include "utils/logger_helper.h"
11+
#include <string.h>
1212

1313
#define MQTT_CALLBACK_LOGGER "mqtt-callback"
1414
static logger_id_t mqtt_callback_logger_id;
1515

16+
extern ta_core_t ta_core;
17+
1618
void mqtt_callback_logger_init() {
1719
mqtt_callback_logger_id = logger_helper_enable(MQTT_CALLBACK_LOGGER, LOGGER_DEBUG, true);
1820
}
@@ -28,18 +30,74 @@ int mqtt_callback_logger_release() {
2830
return 0;
2931
}
3032

31-
static status_t mqtt_request_handler(mosq_config_t *cfg, char *sub_topic, char *req) {
33+
static status_t mqtt_request_handler(mosq_config_t *cfg, char *subscribe_topic, char *req) {
34+
if (cfg == NULL || subscribe_topic == NULL || req == NULL) {
35+
return SC_MQTT_NULL;
36+
}
37+
3238
status_t ret = SC_OK;
33-
// TODO: process MQTT requests here. Deserialize the request and process it against corresponding api_* functions
39+
char *json_result = NULL;
40+
char device_id[ID_LEN];
41+
42+
// get the Device ID.
43+
ret = mqtt_device_id_deserialize(req, device_id);
44+
if (ret != SC_OK) {
45+
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
46+
goto done;
47+
}
3448

35-
// after finishing processing the request, set the response message with the following functions
49+
char *api_sub_topic = subscribe_topic + strlen(ta_core.info.mqtt_topic_root);
50+
char *p;
51+
if (!strncmp(api_sub_topic, "address", 7)) {
52+
ret = api_generate_address(&ta_core.iconf, &ta_core.service, &json_result);
53+
} else if (p = strstr(api_sub_topic, "tag")) {
54+
if (!strncmp(p + 4, "hashes", 6)) {
55+
char tag[NUM_TRYTES_TAG + 1];
56+
mqtt_tag_req_deserialize(req, tag);
57+
ret = api_find_transactions_by_tag(&ta_core.service, tag, &json_result);
58+
} else if (!strncmp(p + 4, "object", 6)) {
59+
char tag[NUM_TRYTES_TAG + 1];
60+
mqtt_tag_req_deserialize(req, tag);
61+
ret = api_find_transactions_obj_by_tag(&ta_core.service, tag, &json_result);
62+
}
63+
} else if (p = strstr(api_sub_topic, "transaction")) {
64+
if (!strncmp(p + 12, "object", 6)) {
65+
char hash[NUM_TRYTES_HASH + 1];
66+
mqtt_transaction_hash_req_deserialize(req, hash);
67+
ret = api_find_transaction_object_single(&ta_core.service, hash, &json_result);
68+
} else if (!strncmp(p + 12, "send", 4)) {
69+
ret = api_send_transfer(&ta_core.iconf, &ta_core.service, req, &json_result);
70+
}
71+
} else if (p = strstr(api_sub_topic, "tips")) {
72+
if (!strncmp(p + 5, "all", 3)) {
73+
ret = api_get_tips(&ta_core.service, &json_result);
74+
} else if (!strncmp(p + 5, "pair", 4)) {
75+
ret = api_get_tips_pair(&ta_core.iconf, &ta_core.service, &json_result);
76+
}
77+
}
78+
if (ret != SC_OK) {
79+
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
80+
goto done;
81+
}
3682

37-
// 1. Set response publishing topic with the topic we got message and the Device ID (client ID) we got in the message/
38-
// gossip_channel_set()
83+
// Set response publishing topic with the topic we got message and the Device ID (client ID) we got in the message
84+
int res_topic_len = strlen(subscribe_topic) + 1 + ID_LEN + 1;
85+
char *res_topic = (char *)malloc(res_topic_len);
86+
snprintf(res_topic, res_topic_len, "%s/%s", subscribe_topic, device_id);
87+
ret = gossip_channel_set(cfg, NULL, NULL, res_topic);
88+
if (ret != SC_OK) {
89+
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
90+
goto done;
91+
}
3992

40-
// 2. Set recv_message as publishing message
41-
// gossip_message_set(cfg, cfg->sub_config->recv_message);
93+
// Set recv_message as publishing message
94+
ret = gossip_message_set(cfg, json_result);
95+
if (ret != SC_OK) {
96+
log_error(mqtt_callback_logger_id, "[%s:%d:%d]\n", __func__, __LINE__, ret);
97+
}
4298

99+
done:
100+
free(json_result);
43101
return ret;
44102
}
45103

@@ -57,11 +115,6 @@ static void message_callback_duplex_func(struct mosquitto *mosq, void *obj, cons
57115
// Process received requests
58116
mqtt_request_handler(cfg, message->topic, cfg->sub_config->recv_message);
59117

60-
// After running `message_callback_duplex_func()`, the message will be sent by the `publish_loop()` in
61-
// `duplex_client_start()`
62-
// printf("listening: %s \n", message->topic);
63-
// printf("recv msg: %s \n", cfg->sub_config->recv_message);
64-
65118
// The following one line is used for testing if this server work fine with requests with given topics.
66119
// Uncomment it if it is necessitated
67120
// gossip_message_set(cfg, cfg->sub_config->recv_message);
@@ -88,7 +141,7 @@ static void subscribe_callback_duplex_func(struct mosquitto *mosq, void *obj, in
88141
}
89142

90143
static void log_callback_duplex_func(struct mosquitto *mosq, void *obj, int level, const char *str) {
91-
log_error(mqtt_callback_logger_id, "log: %s\n", str);
144+
log_info(mqtt_callback_logger_id, "log: %s\n", str);
92145
}
93146

94147
status_t duplex_callback_func_set(struct mosquitto *mosq) {
@@ -103,4 +156,6 @@ status_t duplex_callback_func_set(struct mosquitto *mosq) {
103156
mosquitto_disconnect_v5_callback_set(mosq, disconnect_callback_duplex_func);
104157
mosquitto_publish_v5_callback_set(mosq, publish_callback_duplex_func);
105158
mosquitto_message_v5_callback_set(mosq, message_callback_duplex_func);
159+
160+
return SC_OK;
106161
}

connectivity/mqtt/duplex_callback.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@
99
#ifndef DUPLEX_CALLBACK_H
1010
#define DUPLEX_CALLBACK_H
1111

12+
#include "accelerator/apis.h"
13+
#include "accelerator/common_core.h"
14+
#include "common/model/transaction.h"
1215
#include "duplex_utils.h"
16+
#include "serializer/serializer.h"
17+
#include "utils/logger_helper.h"
1318

1419
#ifdef __cplusplus
1520
extern "C" {

connectivity/mqtt/duplex_utils.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,8 @@ status_t gossip_api_channels_set(mosq_config_t *channel_cfg, char *host, char *r
104104
char *sub_topic = NULL;
105105
int sub_topic_len, api_name_len;
106106
int root_path_len = strlen(root_path);
107-
char *api_names[API_NUM] = {"generate_address", "get_tips_pair", "get_tips", "receive_mam_message",
108-
"mam_send_message", "send_transfer", "find_transactions", "find_transaction_objects",
109-
"send_trytes"};
107+
char *api_names[API_NUM] = {"address", "tag/hashes", "tag/object", "transaction/object",
108+
"transaction/send", "tips/all", "tips/pair"};
110109

111110
for (int i = 0; i < API_NUM; i++) {
112111
api_name_len = strlen(api_names[i]);

0 commit comments

Comments
 (0)