Skip to content

Commit 33636a8

Browse files
author
Alyona Chalkina
committed
Add truncate method support
The tube_truncate method was created to clean tubes, as well as the truncate method on the router to call it
1 parent 16add83 commit 33636a8

File tree

8 files changed

+86
-0
lines changed

8 files changed

+86
-0
lines changed

sharded_queue/api.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ return {
8989
kick = queue_action_wrapper('kick'),
9090
peek = queue_action_wrapper('peek'),
9191
drop = queue_action_wrapper('drop'),
92+
trancate = queue_action_wrapper('trancate'),
9293

9394
cfg = setmetatable({}, {
9495
__index = config,

sharded_queue/drivers/fifo.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ function method.peek(args)
181181
return normalize_task(get_space(args):get { args.task_id })
182182
end
183183

184+
function method.truncate(args)
185+
update_stat(args.tube_name, "truncate")
186+
return get_space(args):truncate()
187+
end
188+
184189
return {
185190
create = tube_create,
186191
drop = tube_drop,

sharded_queue/drivers/fifottl.lua

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,11 @@ function method.kick(args)
456456
return args.count
457457
end
458458

459+
function method.truncate(args)
460+
update_stat(args.tube_name, "truncate")
461+
return box.space[args.tube_name]:truncate()
462+
end
463+
459464
return {
460465
create = tube_create,
461466
drop = tube_drop,

sharded_queue/router/tube.lua

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,28 @@ function drop(self)
350350
cartridge.config_patch_clusterwide({ tubes = tubes })
351351
end
352352

353+
function truncate(self, options)
354+
options = table.deepcopy(options or {})
355+
options.tube_name = self.tube_name
356+
357+
options.extra = {
358+
log_request = utils.normalize.log_request(options.log_request) or self.log_request,
359+
}
360+
361+
local _, err, alias = vshard.router.map_callrw('tube_truncate', {
362+
options
363+
})
364+
-- Re-raise storage errors.
365+
if err then
366+
if alias then
367+
error("Error occurred on replicaset \"" .. alias .. "\": " .. err.message)
368+
else
369+
error("Error occurred: " .. err.message)
370+
end
371+
return
372+
end
373+
end
374+
353375
local methods = {
354376
put = put,
355377
take = take,
@@ -360,6 +382,7 @@ local methods = {
360382
bury = bury,
361383
kick = kick,
362384
peek = peek,
385+
truncate = truncate,
363386
}
364387

365388
-- The Tarantool 3.0 does not support to update dinamically a configuration, so

sharded_queue/stats/storage.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ local actions = {
1515
touch = 8,
1616
ack = 9,
1717
release = 10,
18+
truncate = 11,
1819
}
1920

2021
function statistics.init()
@@ -32,6 +33,7 @@ function statistics.init()
3233
{ 'touch', 'unsigned' },
3334
{ 'ack', 'unsigned' },
3435
{ 'release', 'unsigned' },
36+
{ 'truncate', 'unsigned' },
3537
})
3638

3739
space_stat:create_index('primary', {
@@ -65,6 +67,7 @@ function statistics.reset(tube_name)
6567
touch = 0,
6668
ack = 0,
6769
release = 0,
70+
truncate = 0,
6871
})
6972
box.space._queue_statistics:replace(default_stat)
7073
end

sharded_queue/storage/methods.lua

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ local methods = {
1414
'release',
1515
'bury',
1616
'kick',
17+
'truncate',
1718
}
1819

1920
local function init(metrics, tubes)

test/metrics_test.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ g.test_metrics_router = function()
150150
touch = 0,
151151
ack = 0,
152152
release = 0,
153+
truncate = 0,
153154
})
154155
assert_metric(metrics, "tnt_sharded_queue_router_statistics_tasks", "state", {
155156
ready = 0,
@@ -177,6 +178,7 @@ g.test_metrics_router = function()
177178
touch = 0,
178179
ack = 0,
179180
release = 0,
181+
truncate = 0,
180182
})
181183
assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", {
182184
ready = 0,
@@ -268,6 +270,7 @@ g.test_metrics_storage = function()
268270
'release',
269271
'bury',
270272
'kick',
273+
'truncate',
271274
}
272275

273276
-- Some of them will fail, some of them not - it does not metter. We just
@@ -301,6 +304,7 @@ g.test_metrics_storage = function()
301304
touch = 0,
302305
ack = 0,
303306
release = 0,
307+
truncate = 1,
304308
})
305309
assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", {
306310
ready = 0,

test/simple_test.lua

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,3 +250,47 @@ function g.test_bury_kick()
250250
t.assert_equals(cur_stat.calls.kick, bury_task_count)
251251
t.assert_equals(cur_stat.tasks.ready, task_count)
252252
end
253+
254+
function g.test_trancate()
255+
local tube_name = 'trancate_test'
256+
helper.create_tube(tube_name)
257+
258+
-- task data for putting
259+
local task_count = 20
260+
local tasks_data = {}
261+
262+
for i = 1, task_count do
263+
table.insert(tasks_data, {
264+
name = 'task_' .. i,
265+
raw = '*'
266+
})
267+
end
268+
269+
local task_ids = {}
270+
271+
-- returned tasks
272+
for _, data in pairs(tasks_data) do
273+
local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data })
274+
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
275+
task[utils.index.task_id]
276+
})
277+
t.assert_equals(peek_task[utils.index.status], utils.state.READY)
278+
table.insert(task_ids, task[utils.index.task_id])
279+
end
280+
281+
-- trancate the tube
282+
g.queue_conn:call(utils.shape_cmd(tube_name, 'truncate'))
283+
284+
-- check that created tasks don't exist
285+
for _, taskId in pairs(task_ids) do
286+
local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {
287+
taskId
288+
})
289+
t.assert_equals(peek_task, nil)
290+
end
291+
292+
-- check stats
293+
local stat = g.queue_conn:call('queue.statistics', { tube_name })
294+
t.assert_equals(stat.calls.put, task_count)
295+
t.assert_equals(stat.calls.truncate, 2)
296+
end

0 commit comments

Comments
 (0)