From 450985dfeea7700706a31b630df87a2cfc27f149 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 29 May 2018 20:26:14 +0200 Subject: [PATCH] mvcc: Use RAII to ensure that partition versions are merged Before this patch, maybe_merge_versions() had to be called manually before a partition snapshot goes away. That is error-prone and makes client code more complicated. Delegate that task to a new partition_snapshot_ptr object, through which all snapshots are published now. --- cache_flat_mutation_reader.hh | 9 +++---- memtable.cc | 8 +++--- memtable.hh | 2 +- partition_snapshot_reader.hh | 22 ++++------------ partition_version.cc | 20 +++++++++------ partition_version.hh | 32 +++++++++++++++++++++++- tests/cache_flat_mutation_reader_test.cc | 8 +++--- tests/mvcc_test.cc | 14 +++++------ 8 files changed, 68 insertions(+), 47 deletions(-) diff --git a/cache_flat_mutation_reader.hh b/cache_flat_mutation_reader.hh index 76fc757229..fa6ca5fad3 100644 --- a/cache_flat_mutation_reader.hh +++ b/cache_flat_mutation_reader.hh @@ -64,7 +64,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl { end_of_stream }; - lw_shared_ptr _snp; + partition_snapshot_ptr _snp; position_in_partition::tri_compare _position_cmp; query::clustering_key_filter_ranges _ck_ranges; @@ -129,7 +129,7 @@ public: dht::decorated_key dk, query::clustering_key_filter_ranges&& crr, lw_shared_ptr ctx, - lw_shared_ptr snp, + partition_snapshot_ptr snp, row_cache& cache) : flat_mutation_reader::impl(std::move(s)) , _snp(std::move(snp)) @@ -149,9 +149,6 @@ public: cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete; cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete; virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual ~cache_flat_mutation_reader() { - maybe_merge_versions(_snp); - } virtual void next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { @@ -667,7 +664,7 @@ inline 
flat_mutation_reader make_cache_flat_mutation_reader(schema_ptr s, query::clustering_key_filter_ranges crr, row_cache& cache, lw_shared_ptr ctx, - lw_shared_ptr snp) + partition_snapshot_ptr snp) { return make_flat_mutation_reader( std::move(s), std::move(dk), std::move(crr), std::move(ctx), std::move(snp), cache); diff --git a/memtable.cc b/memtable.cc index 459ae38a03..9b8adebaeb 100644 --- a/memtable.cc +++ b/memtable.cc @@ -322,7 +322,7 @@ public: _delegate = delegate_reader(*_delegate_range, _slice, _pc, streamed_mutation::forwarding::no, _fwd_mr); } else { auto key_and_snp = read_section()(region(), [&] { - return with_linearized_managed_bytes([&] () -> std::optional>> { + return with_linearized_managed_bytes([&] () -> std::optional> { memtable_entry *e = fetch_entry(); if (!e) { return { }; @@ -484,7 +484,7 @@ private: void get_next_partition() { uint64_t component_size = 0; auto key_and_snp = read_section()(region(), [&] { - return with_linearized_managed_bytes([&] () -> std::optional>> { + return with_linearized_managed_bytes([&] () -> std::optional> { memtable_entry* e = fetch_entry(); if (e) { auto dk = e->key(); @@ -550,7 +550,7 @@ public: } }; -lw_shared_ptr memtable_entry::snapshot(memtable& mtbl) { +partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) { return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker); } @@ -564,7 +564,7 @@ memtable::make_flat_reader(schema_ptr s, mutation_reader::forwarding fwd_mr) { if (query::is_single_partition(range)) { const query::ring_position& pos = range.start()->value(); - auto snp = _read_section(*this, [&] () -> lw_shared_ptr { + auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr { managed_bytes::linearization_context_guard lcg; auto i = partitions.find(pos, memtable_entry::compare(_schema)); if (i != partitions.end()) { diff --git a/memtable.hh b/memtable.hh index 1db112f534..0075c95681 100644 --- a/memtable.hh +++ b/memtable.hh @@ -66,7 +66,7 @@ public: partition_entry& 
partition() { return _pe; } const schema_ptr& schema() const { return _schema; } schema_ptr& schema() { return _schema; } - lw_shared_ptr snapshot(memtable& mtbl); + partition_snapshot_ptr snapshot(memtable& mtbl); size_t external_memory_usage_without_rows() const { return _key.key().external_memory_usage(); diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 3fbe9e9dc9..a52bdfa189 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -33,14 +33,6 @@ struct partition_snapshot_reader_dummy_accounter { }; extern partition_snapshot_reader_dummy_accounter no_accounter; -inline void maybe_merge_versions(lw_shared_ptr& snp_ptr) noexcept { - auto&& cleaner = snp_ptr->cleaner(); - auto snp = snp_ptr.release(); - if (snp) { - cleaner.merge_and_destroy(*snp.release()); - } -} - template class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public MemoryAccounter { struct rows_position { @@ -67,7 +59,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public position_in_partition::equal_compare _eq; heap_compare _heap_cmp; - lw_shared_ptr _snapshot; + partition_snapshot_ptr _snapshot; logalloc::region& _region; logalloc::allocating_section& _read_section; @@ -135,7 +127,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public return !_clustering_rows.empty(); } public: - explicit lsa_partition_reader(const schema& s, lw_shared_ptr snp, + explicit lsa_partition_reader(const schema& s, partition_snapshot_ptr snp, logalloc::region& region, logalloc::allocating_section& read_section, bool digest_requested) : _schema(s) @@ -148,10 +140,6 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public , _digest_requested(digest_requested) { } - ~lsa_partition_reader() { - maybe_merge_versions(_snapshot); - } - template decltype(auto) with_reserve(Function&& fn) { return _read_section.with_reserve(std::forward(fn)); @@ -278,7 +266,7 @@ 
private: } public: template - partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr snp, + partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, partition_snapshot_ptr snp, query::clustering_key_filter_ranges crr, bool digest_requested, logalloc::region& region, logalloc::allocating_section& read_section, boost::any pointer_to_container, Args&&... args) @@ -324,7 +312,7 @@ inline flat_mutation_reader make_partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, query::clustering_key_filter_ranges crr, - lw_shared_ptr snp, + partition_snapshot_ptr snp, bool digest_requested, logalloc::region& region, logalloc::allocating_section& read_section, @@ -345,7 +333,7 @@ inline flat_mutation_reader make_partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, query::clustering_key_filter_ranges crr, - lw_shared_ptr snp, + partition_snapshot_ptr snp, bool digest_requested, logalloc::region& region, logalloc::allocating_section& read_section, diff --git a/partition_version.cc b/partition_version.cc index f15415fc43..1e357bd7f8 100644 --- a/partition_version.cc +++ b/partition_version.cc @@ -486,16 +486,13 @@ coroutine partition_entry::apply_to_incomplete(const schema& s, bool can_move = !pe._snapshot; auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker); - lw_shared_ptr prev_snp; + partition_snapshot_ptr prev_snp; if (preemptible) { // Reads must see prev_snp until whole update completes so that writes // are not partially visible. prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1); } auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase); - auto merge_dst_snp = defer([preemptible, dst_snp] () mutable { - maybe_merge_versions(dst_snp); - }); // Once we start updating the partition, we must keep all snapshots until the update completes, // otherwise partial writes would be published. 
So the scope of snapshots must enclose the scope @@ -503,7 +500,6 @@ coroutine partition_entry::apply_to_incomplete(const schema& s, // give the caller a chance to store the coroutine object. The code inside coroutine below // runs outside allocating section. return coroutine([&tracker, &s, &alloc, &reg, &acc, can_move, preemptible, - merge_dst_snp = std::move(merge_dst_snp), // needs to go away last so that dst_snp is not owned by anyone else cur = partition_snapshot_row_cursor(s, *dst_snp), src_cur = partition_snapshot_row_cursor(s, *src_snp, can_move), dst_snp = std::move(dst_snp), @@ -610,7 +606,7 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner& remove_or_mark_as_unique_owner(old_version, &cleaner); } -lw_shared_ptr partition_entry::read(logalloc::region& r, +partition_snapshot_ptr partition_entry::read(logalloc::region& r, mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase) { if (_snapshot) { @@ -633,7 +629,7 @@ lw_shared_ptr partition_entry::read(logalloc::region& r, auto snp = make_lw_shared(entry_schema, r, cleaner, this, tracker, phase); _snapshot = snp.get(); - return snp; + return partition_snapshot_ptr(std::move(snp)); } std::vector @@ -697,3 +693,13 @@ void partition_entry::evict(mutation_cleaner& cleaner) noexcept { remove_or_mark_as_unique_owner(v, &cleaner); } } + +partition_snapshot_ptr::~partition_snapshot_ptr() { + if (_snp) { + auto&& cleaner = _snp->cleaner(); + auto snp = _snp.release(); + if (snp) { + cleaner.merge_and_destroy(*snp.release()); + } + } +} diff --git a/partition_version.hh b/partition_version.hh index a7bcfe5d18..1f44e548ab 100644 --- a/partition_version.hh +++ b/partition_version.hh @@ -379,6 +379,36 @@ public: std::vector range_tombstones(); }; +class partition_snapshot_ptr { + lw_shared_ptr _snp; +public: + using value_type = partition_snapshot; + partition_snapshot_ptr() = default; + partition_snapshot_ptr(partition_snapshot_ptr&&) 
= default; + partition_snapshot_ptr(const partition_snapshot_ptr&) = default; + partition_snapshot_ptr(lw_shared_ptr snp) : _snp(std::move(snp)) {} + ~partition_snapshot_ptr(); + partition_snapshot_ptr& operator=(partition_snapshot_ptr&& other) noexcept { + if (this != &other) { + this->~partition_snapshot_ptr(); + new (this) partition_snapshot_ptr(std::move(other)); + } + return *this; + } + partition_snapshot_ptr& operator=(const partition_snapshot_ptr& other) noexcept { + if (this != &other) { + this->~partition_snapshot_ptr(); + new (this) partition_snapshot_ptr(other); + } + return *this; + } + partition_snapshot& operator*() { return *_snp; } + const partition_snapshot& operator*() const { return *_snp; } + partition_snapshot* operator->() { return &*_snp; } + const partition_snapshot* operator->() const { return &*_snp; } + explicit operator bool() const { return bool(_snp); } +}; + class real_dirty_memory_accounter; // Represents mutation_partition with snapshotting support a la MVCC. @@ -534,7 +564,7 @@ public: void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*); // Snapshots with different values of phase will point to different partition_version objects. 
- lw_shared_ptr read(logalloc::region& region, + partition_snapshot_ptr read(logalloc::region& region, mutation_cleaner&, schema_ptr entry_schema, cache_tracker*, diff --git a/tests/cache_flat_mutation_reader_test.cc b/tests/cache_flat_mutation_reader_test.cc index edbdfdbe18..1578ad8771 100644 --- a/tests/cache_flat_mutation_reader_test.cc +++ b/tests/cache_flat_mutation_reader_test.cc @@ -103,7 +103,7 @@ mutation make_incomplete_mutation() { return mutation(SCHEMA, DK, mutation_partition::make_incomplete(*SCHEMA)); } -static void assert_single_version(lw_shared_ptr snp) { +static void assert_single_version(partition_snapshot_ptr snp) { BOOST_REQUIRE(snp->at_latest_version()); BOOST_REQUIRE_EQUAL(snp->version_count(), 1); } @@ -140,7 +140,7 @@ struct expected_row { } }; -static void assert_cached_rows(lw_shared_ptr snp, std::deque expected) { +static void assert_cached_rows(partition_snapshot_ptr snp, std::deque expected) { auto&& rows = snp->version()->partition().clustered_rows(); for (auto&& r : rows) { BOOST_REQUIRE(!expected.empty()); @@ -173,7 +173,7 @@ struct expected_tombstone { } }; -static void assert_cached_tombstones(lw_shared_ptr snp, std::deque expected) { +static void assert_cached_tombstones(partition_snapshot_ptr snp, std::deque expected) { const range_tombstone_list& rts = snp->version()->partition().row_tombstones(); for (auto&& rt : rts) { BOOST_REQUIRE(!expected.empty()); @@ -187,7 +187,7 @@ static void assert_cached_tombstones(lw_shared_ptr snp, std: class cache_tester { public: - static lw_shared_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) { + static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) { return rc._read_section(rc._tracker.region(), [&] { return with_linearized_managed_bytes([&] { cache_entry& e = rc.find_or_create(dk, {}, rc.phase_of(dk)); diff --git a/tests/mvcc_test.cc b/tests/mvcc_test.cc index e9e9cc6f9f..42127a8c42 100644 --- a/tests/mvcc_test.cc +++ 
b/tests/mvcc_test.cc @@ -170,7 +170,7 @@ public: partition_snapshot::phase_type phase() const { return _phase; } real_dirty_memory_accounter& accounter() { return _acc; } - mutation_partition squashed(lw_shared_ptr& snp) { + mutation_partition squashed(partition_snapshot_ptr& snp) { logalloc::allocating_section as; return as(_tracker.region(), [&] { return snp->squashed(); @@ -221,7 +221,7 @@ public: }); } - lw_shared_ptr read() { + partition_snapshot_ptr read() { logalloc::allocating_section as; return as(region(), [&] { return _e.read(region(), _container.cleaner(), schema(), &_container.tracker(), _container.phase()); @@ -467,7 +467,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) { m1.partition().make_fully_continuous(); e += m1; - lw_shared_ptr snap; + partition_snapshot_ptr snap; if (with_active_reader) { snap = e.read(); } @@ -870,8 +870,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { auto snap2 = e.read(r, cleaner, s, nullptr); - maybe_merge_versions(snap1); - maybe_merge_versions(snap2); + snap1 = {}; + snap2 = {}; cleaner.drain().get(); @@ -890,8 +890,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { auto snap2 = e.read(r, cleaner, s, nullptr); - maybe_merge_versions(snap2); - maybe_merge_versions(snap1); + snap2 = {}; + snap1 = {}; cleaner.drain().get();