Skip to content

Commit 9c04b8d

Browse files
authored
Merge pull request #33378 from petrosagg/lazy-catalog-state
adapter: lazily clone and update the state
2 parents 2813d29 + e2e63d0 commit 9c04b8d

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
//! Logic related to executing catalog transactions.
1111
12+
use std::borrow::Cow;
1213
use std::collections::{BTreeMap, BTreeSet};
1314
use std::sync::Arc;
1415
use std::time::Duration;
@@ -425,10 +426,8 @@ impl Catalog {
425426
.transaction()
426427
.await
427428
.unwrap_or_terminate("starting catalog transaction");
428-
// Prepare a candidate catalog state.
429-
let mut state = self.state.clone();
430429

431-
Self::transact_inner(
430+
let new_state = Self::transact_inner(
432431
storage_collections,
433432
oracle_write_ts,
434433
session,
@@ -437,7 +436,7 @@ impl Catalog {
437436
&mut builtin_table_updates,
438437
&mut audit_events,
439438
&mut tx,
440-
&mut state,
439+
&self.state,
441440
)
442441
.await?;
443442

@@ -452,8 +451,10 @@ impl Catalog {
452451
// Dropping here keeps the mutable borrow on self, preventing us accidentally
453452
// mutating anything until after f is executed.
454453
drop(storage);
455-
self.state = state;
456-
self.transient_revision += 1;
454+
if let Some(new_state) = new_state {
455+
self.transient_revision += 1;
456+
self.state = new_state;
457+
}
457458

458459
// Drop in-memory planning metadata.
459460
let dropped_notices = self.drop_plans_and_metainfos(&dropped_global_ids);
@@ -472,7 +473,8 @@ impl Catalog {
472473
})
473474
}
474475

475-
/// Performs the transaction described by `ops`.
476+
/// Performs the transaction described by `ops` and returns the new state of the catalog, if
477+
/// it has changed. If `ops` don't result in a change in the state this method returns `None`.
476478
///
477479
/// # Panics
478480
/// - If `ops` contains [`Op::TransactionDryRun`] and the value is not the
@@ -490,8 +492,10 @@ impl Catalog {
490492
builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
491493
audit_events: &mut Vec<VersionedEvent>,
492494
tx: &mut Transaction<'_>,
493-
state: &mut CatalogState,
494-
) -> Result<(), AdapterError> {
495+
state: &CatalogState,
496+
) -> Result<Option<CatalogState>, AdapterError> {
497+
let mut state = Cow::Borrowed(state);
498+
495499
let dry_run_ops = match ops.last() {
496500
Some(Op::TransactionDryRun) => {
497501
// Remove dry run marker.
@@ -500,7 +504,7 @@ impl Catalog {
500504
ops.clone()
501505
}
502506
Some(_) => vec![],
503-
None => return Ok(()),
507+
None => return Ok(None),
504508
};
505509

506510
let mut storage_collections_to_create = BTreeSet::new();
@@ -515,7 +519,7 @@ impl Catalog {
515519
&temporary_ids,
516520
audit_events,
517521
tx,
518-
state,
522+
&*state,
519523
&mut storage_collections_to_create,
520524
&mut storage_collections_to_drop,
521525
&mut storage_collections_to_register,
@@ -550,10 +554,13 @@ impl Catalog {
550554

551555
let mut updates: Vec<_> = tx.get_and_commit_op_updates();
552556
updates.extend(temporary_item_updates);
553-
let op_builtin_table_updates = state.apply_updates(updates)?;
554-
let op_builtin_table_updates =
555-
state.resolve_builtin_table_updates(op_builtin_table_updates);
556-
builtin_table_updates.extend(op_builtin_table_updates);
557+
if !updates.is_empty() {
558+
let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
559+
let op_builtin_table_updates = state
560+
.to_mut()
561+
.resolve_builtin_table_updates(op_builtin_table_updates);
562+
builtin_table_updates.extend(op_builtin_table_updates);
563+
}
557564
}
558565

559566
if dry_run_ops.is_empty() {
@@ -569,16 +576,22 @@ impl Catalog {
569576
}
570577

571578
let updates = tx.get_and_commit_op_updates();
572-
let op_builtin_table_updates = state.apply_updates(updates)?;
573-
let op_builtin_table_updates =
574-
state.resolve_builtin_table_updates(op_builtin_table_updates);
575-
builtin_table_updates.extend(op_builtin_table_updates);
579+
if !updates.is_empty() {
580+
let op_builtin_table_updates = state.to_mut().apply_updates(updates)?;
581+
let op_builtin_table_updates = state
582+
.to_mut()
583+
.resolve_builtin_table_updates(op_builtin_table_updates);
584+
builtin_table_updates.extend(op_builtin_table_updates);
585+
}
576586

577-
Ok(())
587+
match state {
588+
Cow::Owned(state) => Ok(Some(state)),
589+
Cow::Borrowed(_) => Ok(None),
590+
}
578591
} else {
579592
Err(AdapterError::TransactionDryRun {
580593
new_ops: dry_run_ops,
581-
new_state: state.clone(),
594+
new_state: state.into_owned(),
582595
})
583596
}
584597
}

0 commit comments

Comments
 (0)