1
+ import time
2
+ import re
1
3
import sys
2
4
import json
5
+ import string
3
6
import random
4
7
import logging
5
8
import requests
6
9
import statistics
7
10
import subprocess
8
11
import argparse
12
+ import paho .mqtt .publish as publish
13
+ import paho .mqtt .subscribe as subscribe
14
+ from multiprocessing import Pool
15
+ from multiprocessing .context import TimeoutError
9
16
10
17
TIMES_TOTAL = 100
11
18
TIMEOUT = 100 # [sec]
19
+ MQTT_RECV_TIMEOUT = 30
12
20
STATUS_CODE_500 = "500"
13
21
STATUS_CODE_405 = "405"
14
22
STATUS_CODE_404 = "404"
15
23
STATUS_CODE_400 = "400"
16
24
STATUS_CODE_200 = "200"
25
+ STATUS_CODE_ERR = "-1"
17
26
EMPTY_REPLY = "000"
18
27
LEN_TAG = 27
19
28
LEN_ADDR = 81
20
29
LEN_MSG_SIGN = 2187
21
30
TRYTE_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ9"
22
31
URL = ""
32
+ DEVICE_ID = None
33
+ CONNECTION_METHOD = None
23
34
24
35
25
36
def parse_cli_arg ():
26
37
global URL
38
+ global CONNECTION_METHOD
39
+ global DEVICE_ID
40
+ rand_device_id = '' .join (
41
+ random .choice (string .printable [:62 ]) for _ in range (32 ))
27
42
parser = argparse .ArgumentParser ('Regression test runner program' )
28
43
parser .add_argument ('-u' ,
29
44
'--url' ,
30
45
dest = 'raw_url' ,
31
46
default = "localhost:8000" )
32
47
parser .add_argument ('-d' , '--debug' , dest = "debug" , action = "store_true" )
33
48
parser .add_argument ('--nostat' , dest = "no_stat" , action = "store_true" )
49
+ parser .add_argument ('--mqtt' , dest = "enable_mqtt" , action = "store_true" )
50
+ parser .add_argument ('--device_id' ,
51
+ dest = "device_id" ,
52
+ default = rand_device_id )
34
53
args = parser .parse_args ()
35
54
55
+ # Determine whether to use full time statistic or not
36
56
if args .no_stat :
37
57
global TIMES_TOTAL
38
58
TIMES_TOTAL = 2
59
+ # Determine connection method
60
+ if args .enable_mqtt :
61
+ CONNECTION_METHOD = "mqtt"
62
+ URL = "localhost"
63
+ else :
64
+ CONNECTION_METHOD = "http"
65
+ URL = "http://" + args .raw_url
66
+ # Run with debug mode or not
39
67
if args .debug :
40
68
logging .basicConfig (level = logging .DEBUG )
41
69
else :
42
70
logging .basicConfig (level = logging .INFO )
43
- URL = "http://" + args .raw_url
71
+ # Configure connection destination
72
+ DEVICE_ID = args .device_id
44
73
45
74
46
75
def eval_stat (time_cost , func_name ):
@@ -69,11 +98,15 @@ def test_logger(f):
69
98
logger = logging .getLogger (f .__module__ )
70
99
name = f .__name__
71
100
72
- def decorate (instance ):
73
- logger .debug (f"Testing case = { name } " )
74
- return instance
101
+ def decorate (* args , ** kwargs ):
102
+ bg_color = "\033 [48;5;38m"
103
+ fg_color = "\033 [38;5;16m"
104
+ clear_color = "\033 [0m"
105
+ logger .info (f"{ bg_color } { fg_color } { name } { clear_color } " )
106
+ res = f (* args , ** kwargs )
107
+ return res
75
108
76
- return decorate ( f )
109
+ return decorate
77
110
78
111
79
112
def valid_trytes (trytes , trytes_len ):
@@ -94,7 +127,98 @@ def map_field(key, value):
94
127
return json .dumps (ret )
95
128
96
129
130
+ # Simulate random field to mqtt since we cannot put the information in the route
131
+ def add_random_field (post ):
132
+ return data
133
+
134
+
135
+ def route_http_to_mqtt (query , get_data , post_data ):
136
+ data = {}
137
+ if get_data : query += get_data
138
+ if post_data :
139
+ data .update (json .loads (post_data ))
140
+ if query [- 1 ] == "/" : query = query [:- 1 ] # Remove trailing slash
141
+
142
+ # api_generate_address
143
+ r = re .search ("/address$" , query )
144
+ if r is not None :
145
+ return query , data
146
+
147
+ # api_find_transactions_by_tag
148
+ r = re .search (f"/tag/(?P<tag>[\x00 -\xff ]*?)/hashes$" , query )
149
+ if r is not None :
150
+ tag = r .group ("tag" )
151
+ data .update ({"tag" : tag })
152
+ query = "/tag/hashes"
153
+ return query , data
154
+
155
+ # api_find_transactions_object_by_tag
156
+ r = re .search (f"/tag/(?P<tag>[\x00 -\xff ]*?)$" , query )
157
+ if r is not None :
158
+ tag = r .group ("tag" )
159
+ data .update ({"tag" : tag })
160
+ query = "/tag/object"
161
+ return query , data
162
+
163
+ # api_find_transacion_object
164
+ r = re .search (f"/transaction/object$" , query )
165
+ if r is not None :
166
+ query = "/transaction/object"
167
+ return query , data
168
+
169
+ r = re .search (f"/transaction/(?P<hash>[\u0000 -\uffff ]*?)$" , query )
170
+ if r is not None :
171
+ hash = r .group ("hash" )
172
+ data .update ({"hash" : hash })
173
+ query = f"/transaction"
174
+ return query , data
175
+
176
+ # api_send_transfer
177
+ r = re .search (f"/transaction$" , query )
178
+ if r is not None :
179
+ query = "/transaction/send"
180
+ return query , data
181
+
182
+ # api_get_tips
183
+ r = re .search (f"/tips$" , query )
184
+ if r is not None :
185
+ query = "/tips/all"
186
+ return query , data
187
+
188
+ # api_get_tips_pair
189
+ r = re .search (f"/tips/pair$" , query )
190
+ if r is not None :
191
+ return query , data
192
+
193
+ # api_send_trytes
194
+ r = re .search (f"/tryte$" , query )
195
+ if r is not None :
196
+ return query , data
197
+
198
+ # Error, cannot identify route (return directly from regression test)
199
+ return None , None
200
+
201
+
97
202
def API (get_query , get_data = None , post_data = None ):
203
+ global CONNECTION_METHOD
204
+ assert CONNECTION_METHOD != None
205
+ if CONNECTION_METHOD == "http" :
206
+ return _API_http (get_query , get_data , post_data )
207
+ elif CONNECTION_METHOD == "mqtt" :
208
+ query , data = route_http_to_mqtt (get_query , get_data , post_data )
209
+ if (query , data ) == (None , None ):
210
+ msg = {
211
+ "message" :
212
+ "Cannot identify route, directly return from regression test" ,
213
+ "status_code" : STATUS_CODE_400
214
+ }
215
+ logging .debug (msg )
216
+ return msg
217
+
218
+ return _API_mqtt (query , data )
219
+
220
+
221
+ def _API_http (get_query , get_data , post_data ):
98
222
global URL
99
223
command = "curl {} -X POST -H 'Content-Type: application/json' -w \" , %{{http_code}}\" -d '{}'"
100
224
try :
@@ -130,3 +254,57 @@ def API(get_query, get_data=None, post_data=None):
130
254
logging .debug (f"Command = { command } , response = { response } " )
131
255
132
256
return response
257
+
258
+
259
+ def _subscribe (get_query ):
260
+ add_slash = ""
261
+ if get_query [- 1 ] != "/" : add_slash = "/"
262
+ topic = f"root/topics{ get_query } { add_slash } { DEVICE_ID } "
263
+ logging .debug (f"Subscribe topic: { topic } " )
264
+
265
+ return subscribe .simple (topics = topic , hostname = URL , qos = 1 ).payload
266
+
267
+
268
+ def _API_mqtt (get_query , data ):
269
+ global URL , DEVICE_ID
270
+ data .update ({"device_id" : DEVICE_ID })
271
+
272
+ # Put subscriber in a thread since it is a blocking function
273
+ with Pool () as p :
274
+ payload = p .apply_async (_subscribe , [get_query ])
275
+ topic = f"root/topics{ get_query } "
276
+ logging .debug (f"Publish topic: { topic } , data: { data } " )
277
+
278
+ # Prevents publish execute earlier than subscribe
279
+ time .sleep (0.1 )
280
+
281
+ # Publish requests
282
+ publish .single (topic , json .dumps (data ), hostname = URL , qos = 1 )
283
+ msg = {}
284
+ try :
285
+ res = payload .get (MQTT_RECV_TIMEOUT )
286
+ msg = json .loads (res )
287
+
288
+ if type (msg ) is dict and "message" in msg .keys ():
289
+ content = msg ["message" ]
290
+ if content == "Internal service error" :
291
+ msg .update ({"status_code" : STATUS_CODE_500 })
292
+ elif content == "Request not found" :
293
+ msg .update ({"status_code" : STATUS_CODE_404 })
294
+ elif content == "Invalid path" or content == "Invalid request header" :
295
+ msg .update ({"status_code" : STATUS_CODE_400 })
296
+ else :
297
+ msg .update ({"status_code" : STATUS_CODE_200 })
298
+ else :
299
+ msg = {
300
+ "content" : json .dumps (msg ),
301
+ "status_code" : STATUS_CODE_200
302
+ }
303
+ except TimeoutError :
304
+ msg = {
305
+ "content" : "Time limit exceed" ,
306
+ "status_code" : STATUS_CODE_ERR
307
+ }
308
+
309
+ logging .debug (f"Modified response: { msg } " )
310
+ return msg
0 commit comments