//! After completing the snapshot we use [`crate::inspect::get_changes_asc`] which will return
//! all changes between a `[lower, upper)` bound of [`Lsn`]s.

-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
@@ -73,6 +73,7 @@ use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tiberius::numeric::Numeric;

+use crate::desc::SqlServerTableRaw;
use crate::{Client, SqlServerError, TransactionIsolationLevel};

/// A stream of changes from a table in SQL Server that has CDC enabled.
@@ -143,39 +144,21 @@ impl<'a> CdcStream<'a> {
        self
    }

-    /// Takes a snapshot of the upstream table that the specified `capture_instance` is
-    /// replicating changes from.
-    ///
-    /// An optional `instances` parameter can be provided to only snapshot the specified instances.
+    /// Takes a snapshot of the upstream table.
    pub async fn snapshot<'b>(
        &'b mut self,
-        instances: Option<BTreeSet<Arc<str>>>,
+        table: &SqlServerTableRaw,
        worker_id: usize,
        source_id: GlobalId,
    ) -> Result<
        (
            Lsn,
-            BTreeMap<Arc<str>, usize>,
-            impl Stream<Item = (Arc<str>, Result<tiberius::Row, SqlServerError>)> + use<'b, 'a>,
+            usize,
+            impl Stream<Item = Result<tiberius::Row, SqlServerError>>,
        ),
        SqlServerError,
    > {
        static SAVEPOINT_NAME: &str = "_mz_snap_";
-
-        // Determine what table we need to snapshot.
-        let instances = self
-            .capture_instances
-            .keys()
-            .filter(|i| match instances.as_ref() {
-                // Only snapshot the instance if the filter includes it.
-                Some(filter) => filter.contains(i.as_ref()),
-                None => true,
-            })
-            .map(|i| i.as_ref());
-        let tables =
-            crate::inspect::get_tables_for_capture_instance(self.client, instances).await?;
-        tracing::info!(%source_id, ?tables, "timely-{worker_id} got table for capture instance");
-
        // Before starting a transaction where the LSN will not advance, ensure
        // the upstream DB is ready for CDC.
        self.wait_for_ready().await?;
@@ -189,22 +172,15 @@ impl<'a> CdcStream<'a> {
        // as it will just be locking the table(s).
        let mut fencing_client = self.client.new_connection().await?;
        let mut fence_txn = fencing_client.transaction().await?;
-
-        // TODO improve table locking: https://github.com/MaterializeInc/database-issues/issues/9512
-        for (_capture_instance, schema, table) in &tables {
-            tracing::trace!(%source_id, %schema, %table, "timely-{worker_id} locking table");
-            fence_txn.lock_table_shared(&*schema, &*table).await?;
-        }
-
-        // So we know that we locked that tables and roughly how long that took based on the time diff
-        // from the last message.
-        tracing::info!(%source_id, "timely-{worker_id} locked tables");
+        fence_txn
+            .lock_table_shared(&table.schema_name, &table.name)
+            .await?;
+        tracing::info!(%source_id, %table.schema_name, %table.name, "timely-{worker_id} locked table");

        self.client
            .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
            .await?;
        let mut txn = self.client.transaction().await?;
-
        // Creating a savepoint forces a write to the transaction log, which will
        // assign an LSN, but it does not force a transaction sequence number to be
        // assigned as far as I can tell. I have not observed any entries added to
@@ -223,14 +199,14 @@ impl<'a> CdcStream<'a> {
            ))?
        }

-        // Because the tables are locked, any write operation has either
+        // Because the table is locked, any write operation has either
        // completed, or is blocked. The LSN and XSN acquired now will represent a
        // consistent point-in-time view, such that any committed write will be
        // visible to this snapshot and the LSN of such a write will be less than
        // or equal to the LSN captured here. Creating the savepoint sets the LSN,
        // we can read it after rolling back the locks.
        txn.create_savepoint(SAVEPOINT_NAME).await?;
-        tracing::info!(%source_id, %SAVEPOINT_NAME, "timely-{worker_id} created savepoint");
+        tracing::info!(%source_id, %table.schema_name, %table.name, %SAVEPOINT_NAME, "timely-{worker_id} created savepoint");

        // Once the XSN is established and the LSN captured, the tables no longer
        // need to be locked. Any writes that happen to the upstream tables
@@ -241,42 +217,27 @@ impl<'a> CdcStream<'a> {

        tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");

-        // Get the size of each table we're about to snapshot.
-        //
-        // TODO(sql_server3): To expose a more "generic" interface it would be nice to
-        // make it configurable about whether or not we take a count first.
-        let mut snapshot_stats = BTreeMap::default();
-        for (capture_instance, schema, table) in &tables {
-            tracing::trace!(%capture_instance, %schema, %table, "snapshot stats start");
-            let size = crate::inspect::snapshot_size(txn.client, &*schema, &*table).await?;
-            snapshot_stats.insert(Arc::clone(capture_instance), size);
-            tracing::trace!(%source_id, %capture_instance, %schema, %table, "timely-{worker_id} snapshot stats end");
-        }
-
-        // Run a `SELECT` query to snapshot the entire table.
-        let stream = async_stream::stream! {
-            // TODO(sql_server3): A stream of streams would be better here than
-            // returning the name with each result, but the lifetimes are tricky.
-            for (capture_instance, schema_name, table_name) in tables {
-                tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot start");
-
-                let snapshot = crate::inspect::snapshot(txn.client, &*schema_name, &*table_name);
-                let mut snapshot = std::pin::pin!(snapshot);
-                while let Some(result) = snapshot.next().await {
-                    yield (Arc::clone(&capture_instance), result);
+        tracing::trace!(%source_id, %table.capture_instance, %table.schema_name, %table.name, "timely-{worker_id} snapshot stats start");
+        // Establish the consistent point-in-time and release the lock.
+        let size =
+            crate::inspect::snapshot_size(txn.client, &table.schema_name, &table.name).await?;
+        tracing::trace!(%source_id, %table.capture_instance, %table.schema_name, %table.name, "timely-{worker_id} snapshot stats end");
+        let schema_name = Arc::clone(&table.schema_name);
+        let table_name = Arc::clone(&table.name);
+        let rows = async_stream::try_stream! {
+            {
+                let snapshot_stream = crate::inspect::snapshot(txn.client, &*schema_name, &*table_name);
+                tokio::pin!(snapshot_stream);
+
+                while let Some(row) = snapshot_stream.next().await {
+                    yield row?;
                }
-
-                tracing::trace!(%source_id, %capture_instance, %schema_name, %table_name, "timely-{worker_id} snapshot end");
            }

-            // Slightly awkward, but if the rollback fails we need to conform to
-            // type of the stream.
-            if let Err(e) = txn.rollback().await {
-                yield ("rollback".into(), Err(e));
-            }
+            txn.rollback().await?
        };

-        Ok((lsn, snapshot_stats, stream))
+        Ok((lsn, size, rows))
    }

    /// Consume `self` returning a [`Stream`] of [`CdcEvent`]s.
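For context, here is a minimal caller-side sketch (not part of this PR) of how the reworked `snapshot` signature might be consumed. The helper name `drain_snapshot` and its body are illustrative only; it assumes an already-constructed `CdcStream`, a `SqlServerTableRaw` describing the table, the crate's `SqlServerError` and `GlobalId` types in scope, and `futures::StreamExt` providing `.next()`.

```rust
// Caller-side sketch only (hypothetical helper, not from this PR). Assumes the
// surrounding crate's types (`CdcStream`, `SqlServerTableRaw`, `SqlServerError`,
// `GlobalId`) are in scope.
use futures::StreamExt;

async fn drain_snapshot(
    cdc_stream: &mut CdcStream<'_>,
    table: &SqlServerTableRaw,
    worker_id: usize,
    source_id: GlobalId,
) -> Result<(), SqlServerError> {
    // One table per call: `size` is a plain row count and `rows` yields bare
    // `tiberius::Row`s instead of `(capture_instance, row)` pairs.
    let (snapshot_lsn, size, rows) = cdc_stream.snapshot(table, worker_id, source_id).await?;
    tracing::info!(%source_id, ?snapshot_lsn, size, "timely-{worker_id} draining snapshot rows");

    tokio::pin!(rows);
    while let Some(row) = rows.next().await {
        let _row = row?;
        // ... decode the row and emit it downstream ...
    }
    Ok(())
}
```

The design change the sketch reflects: the per-capture-instance `BTreeMap` of sizes and the `(capture_instance, row)` pairing are gone, so each call snapshots exactly one table and the caller receives a single row count plus a stream of plain rows.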