21
21
22
22
import itertools
23
23
import logging
24
- from typing import TYPE_CHECKING , Collection , Mapping , Set
24
+ from typing import (
25
+ TYPE_CHECKING ,
26
+ Collection ,
27
+ Mapping ,
28
+ Set ,
29
+ )
25
30
26
31
from synapse .logging .context import nested_logging_context
27
32
from synapse .metrics .background_process_metrics import wrap_as_background_process
33
+ from synapse .storage .database import LoggingTransaction
28
34
from synapse .storage .databases import Databases
35
+ from synapse .types .storage import _BackgroundUpdates
29
36
30
37
if TYPE_CHECKING :
31
38
from synapse .server import HomeServer
@@ -44,6 +51,11 @@ def __init__(self, hs: "HomeServer", stores: Databases):
44
51
self ._delete_state_groups_loop , 60 * 1000
45
52
)
46
53
54
+ self .stores .state .db_pool .updates .register_background_update_handler (
55
+ _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE ,
56
+ self ._background_delete_unrefereneced_state_groups ,
57
+ )
58
+
47
59
async def purge_room (self , room_id : str ) -> None :
48
60
"""Deletes all record of a room"""
49
61
@@ -80,68 +92,6 @@ async def purge_history(
80
92
sg_to_delete
81
93
)
82
94
83
- async def _find_unreferenced_groups (
84
- self , state_groups : Collection [int ]
85
- ) -> Set [int ]:
86
- """Used when purging history to figure out which state groups can be
87
- deleted.
88
-
89
- Args:
90
- state_groups: Set of state groups referenced by events
91
- that are going to be deleted.
92
-
93
- Returns:
94
- The set of state groups that can be deleted.
95
- """
96
- # Set of events that we have found to be referenced by events
97
- referenced_groups = set ()
98
-
99
- # Set of state groups we've already seen
100
- state_groups_seen = set (state_groups )
101
-
102
- # Set of state groups to handle next.
103
- next_to_search = set (state_groups )
104
- while next_to_search :
105
- # We bound size of groups we're looking up at once, to stop the
106
- # SQL query getting too big
107
- if len (next_to_search ) < 100 :
108
- current_search = next_to_search
109
- next_to_search = set ()
110
- else :
111
- current_search = set (itertools .islice (next_to_search , 100 ))
112
- next_to_search -= current_search
113
-
114
- referenced = await self .stores .main .get_referenced_state_groups (
115
- current_search
116
- )
117
- referenced_groups |= referenced
118
-
119
- # We don't continue iterating up the state group graphs for state
120
- # groups that are referenced.
121
- current_search -= referenced
122
-
123
- edges = await self .stores .state .get_previous_state_groups (current_search )
124
-
125
- prevs = set (edges .values ())
126
- # We don't bother re-handling groups we've already seen
127
- prevs -= state_groups_seen
128
- next_to_search |= prevs
129
- state_groups_seen |= prevs
130
-
131
- # We also check to see if anything referencing the state groups are
132
- # also unreferenced. This helps ensure that we delete unreferenced
133
- # state groups, if we don't then we will de-delta them when we
134
- # delete the other state groups leading to increased DB usage.
135
- next_edges = await self .stores .state .get_next_state_groups (current_search )
136
- nexts = set (next_edges .keys ())
137
- nexts -= state_groups_seen
138
- next_to_search |= nexts
139
- state_groups_seen |= nexts
140
-
141
- to_delete = state_groups_seen - referenced_groups
142
-
143
- return to_delete
144
-
145
95
@wrap_as_background_process ("_delete_state_groups_loop" )
146
96
async def _delete_state_groups_loop (self ) -> None :
147
97
"""Background task that deletes any state groups that may be pending
@@ -203,3 +153,173 @@ async def _delete_state_groups(
203
153
room_id ,
204
154
groups_to_sequences ,
205
155
)
156
+
157
+ async def _background_delete_unrefereneced_state_groups (
158
+ self , progress : dict , batch_size : int
159
+ ) -> int :
160
+ """This background update will slowly delete any unreferenced state groups"""
161
+
162
+ last_checked_state_group = progress .get ("last_checked_state_group" )
163
+ max_state_group = progress .get ("max_state_group" )
164
+
165
+ if last_checked_state_group is None or max_state_group is None :
166
+ # This is the first run.
167
+ last_checked_state_group = 0
168
+
169
+ max_state_group = await self .stores .state .db_pool .simple_select_one_onecol (
170
+ table = "state_groups" ,
171
+ keyvalues = {},
172
+ retcol = "MAX(id)" ,
173
+ allow_none = True ,
174
+ desc = "get_max_state_group" ,
175
+ )
176
+ if max_state_group is None :
177
+ # There are no state groups so the background process is finished.
178
+ await self .stores .state .db_pool .updates ._end_background_update (
179
+ _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
180
+ )
181
+ return batch_size
182
+
183
+ (
184
+ last_checked_state_group ,
185
+ final_batch ,
186
+ ) = await self ._delete_unreferenced_state_groups_batch (
187
+ last_checked_state_group , batch_size , max_state_group
188
+ )
189
+
190
+ if not final_batch :
191
+ # There are more state groups to check.
192
+ progress = {
193
+ "last_checked_state_group" : last_checked_state_group ,
194
+ "max_state_group" : max_state_group ,
195
+ }
196
+ await self .stores .state .db_pool .updates ._background_update_progress (
197
+ _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE ,
198
+ progress ,
199
+ )
200
+ else :
201
+ # This background process is finished.
202
+ await self .stores .state .db_pool .updates ._end_background_update (
203
+ _BackgroundUpdates .DELETE_UNREFERENCED_STATE_GROUPS_BG_UPDATE
204
+ )
205
+
206
+ return batch_size
207
+
208
+ async def _delete_unreferenced_state_groups_batch (
209
+ self ,
210
+ last_checked_state_group : int ,
211
+ batch_size : int ,
212
+ max_state_group : int ,
213
+ ) -> tuple [int , bool ]:
214
+ """Looks for unreferenced state groups starting from the last state group
215
+ checked, and any state groups which would become unreferenced if a state group
216
+ was deleted, and marks them for deletion.
217
+
218
+ Args:
219
+ last_checked_state_group: The last state group that was checked.
220
+ batch_size: How many state groups to process in this iteration.
221
+
222
+ Returns:
223
+ (last_checked_state_group, final_batch)
224
+ """
225
+
226
+ # Look for state groups that can be cleaned up.
227
+ def get_next_state_groups_txn (txn : LoggingTransaction ) -> Set [int ]:
228
+ state_group_sql = "SELECT id FROM state_groups WHERE ? < id AND id <= ? ORDER BY id LIMIT ?"
229
+ txn .execute (
230
+ state_group_sql , (last_checked_state_group , max_state_group , batch_size )
231
+ )
232
+
233
+ next_set = {row [0 ] for row in txn }
234
+
235
+ return next_set
236
+
237
+ next_set = await self .stores .state .db_pool .runInteraction (
238
+ "get_next_state_groups" , get_next_state_groups_txn
239
+ )
240
+
241
+ final_batch = False
242
+ if len (next_set ) < batch_size :
243
+ final_batch = True
244
+ else :
245
+ last_checked_state_group = max (next_set )
246
+
247
+ if len (next_set ) == 0 :
248
+ return last_checked_state_group , final_batch
249
+
250
+ # Find all state groups that can be deleted if the original set is deleted.
251
+ # This set includes the original set, as well as any state groups that would
252
+ # become unreferenced upon deleting the original set.
253
+ to_delete = await self ._find_unreferenced_groups (next_set )
254
+
255
+ if len (to_delete ) == 0 :
256
+ return last_checked_state_group , final_batch
257
+
258
+ await self .stores .state_deletion .mark_state_groups_as_pending_deletion (
259
+ to_delete
260
+ )
261
+
262
+ return last_checked_state_group , final_batch
263
+
264
+ async def _find_unreferenced_groups (
265
+ self ,
266
+ state_groups : Collection [int ],
267
+ ) -> Set [int ]:
268
+ """Used when purging history to figure out which state groups can be
269
+ deleted.
270
+
271
+ Args:
272
+ state_groups: Set of state groups referenced by events
273
+ that are going to be deleted.
274
+
275
+ Returns:
276
+ The set of state groups that can be deleted.
277
+ """
278
+ # Set of events that we have found to be referenced by events
279
+ referenced_groups = set ()
280
+
281
+ # Set of state groups we've already seen
282
+ state_groups_seen = set (state_groups )
283
+
284
+ # Set of state groups to handle next.
285
+ next_to_search = set (state_groups )
286
+ while next_to_search :
287
+ # We bound size of groups we're looking up at once, to stop the
288
+ # SQL query getting too big
289
+ if len (next_to_search ) < 100 :
290
+ current_search = next_to_search
291
+ next_to_search = set ()
292
+ else :
293
+ current_search = set (itertools .islice (next_to_search , 100 ))
294
+ next_to_search -= current_search
295
+
296
+ referenced = await self .stores .main .get_referenced_state_groups (
297
+ current_search
298
+ )
299
+ referenced_groups |= referenced
300
+
301
+ # We don't continue iterating up the state group graphs for state
302
+ # groups that are referenced.
303
+ current_search -= referenced
304
+
305
+ edges = await self .stores .state .get_previous_state_groups (current_search )
306
+
307
+ prevs = set (edges .values ())
308
+ # We don't bother re-handling groups we've already seen
309
+ prevs -= state_groups_seen
310
+ next_to_search |= prevs
311
+ state_groups_seen |= prevs
312
+
313
+ # We also check to see if anything referencing the state groups are
314
+ # also unreferenced. This helps ensure that we delete unreferenced
315
+ # state groups, if we don't then we will de-delta them when we
316
+ # delete the other state groups leading to increased DB usage.
317
+ next_edges = await self .stores .state .get_next_state_groups (current_search )
318
+ nexts = set (next_edges .keys ())
319
+ nexts -= state_groups_seen
320
+ next_to_search |= nexts
321
+ state_groups_seen |= nexts
322
+
323
+ to_delete = state_groups_seen - referenced_groups
324
+
325
+ return to_delete
0 commit comments