mvcc: Use RAII to ensure that partition versions are merged

Before this patch, maybe_merge_versions() had to be manually called
before partition snapshot goes away. That is error prone and makes
client code more complicated. Delegate that task to a new
partition_snapshot_ptr object, through which all snapshots are
published now.
This commit is contained in:
Tomasz Grabiec
2018-05-29 20:26:14 +02:00
parent c26a304fbb
commit 450985dfee
8 changed files with 68 additions and 47 deletions

View File

@@ -64,7 +64,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
end_of_stream
};
lw_shared_ptr<partition_snapshot> _snp;
partition_snapshot_ptr _snp;
position_in_partition::tri_compare _position_cmp;
query::clustering_key_filter_ranges _ck_ranges;
@@ -129,7 +129,7 @@ public:
dht::decorated_key dk,
query::clustering_key_filter_ranges&& crr,
lw_shared_ptr<read_context> ctx,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
row_cache& cache)
: flat_mutation_reader::impl(std::move(s))
, _snp(std::move(snp))
@@ -149,9 +149,6 @@ public:
cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete;
cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual ~cache_flat_mutation_reader() {
maybe_merge_versions(_snp);
}
virtual void next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
@@ -667,7 +664,7 @@ inline flat_mutation_reader make_cache_flat_mutation_reader(schema_ptr s,
query::clustering_key_filter_ranges crr,
row_cache& cache,
lw_shared_ptr<cache::read_context> ctx,
lw_shared_ptr<partition_snapshot> snp)
partition_snapshot_ptr snp)
{
return make_flat_mutation_reader<cache::cache_flat_mutation_reader>(
std::move(s), std::move(dk), std::move(crr), std::move(ctx), std::move(snp), cache);

View File

@@ -322,7 +322,7 @@ public:
_delegate = delegate_reader(*_delegate_range, _slice, _pc, streamed_mutation::forwarding::no, _fwd_mr);
} else {
auto key_and_snp = read_section()(region(), [&] {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, lw_shared_ptr<partition_snapshot>>> {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
memtable_entry *e = fetch_entry();
if (!e) {
return { };
@@ -484,7 +484,7 @@ private:
void get_next_partition() {
uint64_t component_size = 0;
auto key_and_snp = read_section()(region(), [&] {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, lw_shared_ptr<partition_snapshot>>> {
return with_linearized_managed_bytes([&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
memtable_entry* e = fetch_entry();
if (e) {
auto dk = e->key();
@@ -550,7 +550,7 @@ public:
}
};
lw_shared_ptr<partition_snapshot> memtable_entry::snapshot(memtable& mtbl) {
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker);
}
@@ -564,7 +564,7 @@ memtable::make_flat_reader(schema_ptr s,
mutation_reader::forwarding fwd_mr) {
if (query::is_single_partition(range)) {
const query::ring_position& pos = range.start()->value();
auto snp = _read_section(*this, [&] () -> lw_shared_ptr<partition_snapshot> {
auto snp = _read_section(*this, [&] () -> partition_snapshot_ptr {
managed_bytes::linearization_context_guard lcg;
auto i = partitions.find(pos, memtable_entry::compare(_schema));
if (i != partitions.end()) {

View File

@@ -66,7 +66,7 @@ public:
partition_entry& partition() { return _pe; }
const schema_ptr& schema() const { return _schema; }
schema_ptr& schema() { return _schema; }
lw_shared_ptr<partition_snapshot> snapshot(memtable& mtbl);
partition_snapshot_ptr snapshot(memtable& mtbl);
size_t external_memory_usage_without_rows() const {
return _key.key().external_memory_usage();

View File

@@ -33,14 +33,6 @@ struct partition_snapshot_reader_dummy_accounter {
};
extern partition_snapshot_reader_dummy_accounter no_accounter;
inline void maybe_merge_versions(lw_shared_ptr<partition_snapshot>& snp_ptr) noexcept {
auto&& cleaner = snp_ptr->cleaner();
auto snp = snp_ptr.release();
if (snp) {
cleaner.merge_and_destroy(*snp.release());
}
}
template <typename MemoryAccounter = partition_snapshot_reader_dummy_accounter>
class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public MemoryAccounter {
struct rows_position {
@@ -67,7 +59,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
position_in_partition::equal_compare _eq;
heap_compare _heap_cmp;
lw_shared_ptr<partition_snapshot> _snapshot;
partition_snapshot_ptr _snapshot;
logalloc::region& _region;
logalloc::allocating_section& _read_section;
@@ -135,7 +127,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
return !_clustering_rows.empty();
}
public:
explicit lsa_partition_reader(const schema& s, lw_shared_ptr<partition_snapshot> snp,
explicit lsa_partition_reader(const schema& s, partition_snapshot_ptr snp,
logalloc::region& region, logalloc::allocating_section& read_section,
bool digest_requested)
: _schema(s)
@@ -148,10 +140,6 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
, _digest_requested(digest_requested)
{ }
~lsa_partition_reader() {
maybe_merge_versions(_snapshot);
}
template<typename Function>
decltype(auto) with_reserve(Function&& fn) {
return _read_section.with_reserve(std::forward<Function>(fn));
@@ -278,7 +266,7 @@ private:
}
public:
template <typename... Args>
partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_flat_reader(schema_ptr s, dht::decorated_key dk, partition_snapshot_ptr snp,
query::clustering_key_filter_ranges crr, bool digest_requested,
logalloc::region& region, logalloc::allocating_section& read_section,
boost::any pointer_to_container, Args&&... args)
@@ -324,7 +312,7 @@ inline flat_mutation_reader
make_partition_snapshot_flat_reader(schema_ptr s,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,
@@ -345,7 +333,7 @@ inline flat_mutation_reader
make_partition_snapshot_flat_reader(schema_ptr s,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,

View File

@@ -486,16 +486,13 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
bool can_move = !pe._snapshot;
auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker);
lw_shared_ptr<partition_snapshot> prev_snp;
partition_snapshot_ptr prev_snp;
if (preemptible) {
// Reads must see prev_snp until whole update completes so that writes
// are not partially visible.
prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1);
}
auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase);
auto merge_dst_snp = defer([preemptible, dst_snp] () mutable {
maybe_merge_versions(dst_snp);
});
// Once we start updating the partition, we must keep all snapshots until the update completes,
// otherwise partial writes would be published. So the scope of snapshots must enclose the scope
@@ -503,7 +500,6 @@ coroutine partition_entry::apply_to_incomplete(const schema& s,
// give the caller a chance to store the coroutine object. The code inside coroutine below
// runs outside allocating section.
return coroutine([&tracker, &s, &alloc, &reg, &acc, can_move, preemptible,
merge_dst_snp = std::move(merge_dst_snp), // needs to go away last so that dst_snp is not owned by anyone else
cur = partition_snapshot_row_cursor(s, *dst_snp),
src_cur = partition_snapshot_row_cursor(s, *src_snp, can_move),
dst_snp = std::move(dst_snp),
@@ -610,7 +606,7 @@ void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&
remove_or_mark_as_unique_owner(old_version, &cleaner);
}
lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
partition_snapshot_ptr partition_entry::read(logalloc::region& r,
mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase)
{
if (_snapshot) {
@@ -633,7 +629,7 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(logalloc::region& r,
auto snp = make_lw_shared<partition_snapshot>(entry_schema, r, cleaner, this, tracker, phase);
_snapshot = snp.get();
return snp;
return partition_snapshot_ptr(std::move(snp));
}
std::vector<range_tombstone>
@@ -697,3 +693,13 @@ void partition_entry::evict(mutation_cleaner& cleaner) noexcept {
remove_or_mark_as_unique_owner(v, &cleaner);
}
}
partition_snapshot_ptr::~partition_snapshot_ptr() {
if (_snp) {
auto&& cleaner = _snp->cleaner();
auto snp = _snp.release();
if (snp) {
cleaner.merge_and_destroy(*snp.release());
}
}
}

View File

@@ -379,6 +379,36 @@ public:
std::vector<range_tombstone> range_tombstones();
};
class partition_snapshot_ptr {
lw_shared_ptr<partition_snapshot> _snp;
public:
using value_type = partition_snapshot;
partition_snapshot_ptr() = default;
partition_snapshot_ptr(partition_snapshot_ptr&&) = default;
partition_snapshot_ptr(const partition_snapshot_ptr&) = default;
partition_snapshot_ptr(lw_shared_ptr<partition_snapshot> snp) : _snp(std::move(snp)) {}
~partition_snapshot_ptr();
partition_snapshot_ptr& operator=(partition_snapshot_ptr&& other) noexcept {
if (this != &other) {
this->~partition_snapshot_ptr();
new (this) partition_snapshot_ptr(std::move(other));
}
return *this;
}
partition_snapshot_ptr& operator=(const partition_snapshot_ptr& other) noexcept {
if (this != &other) {
this->~partition_snapshot_ptr();
new (this) partition_snapshot_ptr(other);
}
return *this;
}
partition_snapshot& operator*() { return *_snp; }
const partition_snapshot& operator*() const { return *_snp; }
partition_snapshot* operator->() { return &*_snp; }
const partition_snapshot* operator->() const { return &*_snp; }
explicit operator bool() const { return bool(_snp); }
};
class real_dirty_memory_accounter;
// Represents mutation_partition with snapshotting support a la MVCC.
@@ -534,7 +564,7 @@ public:
void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*);
// Snapshots with different values of phase will point to different partition_version objects.
lw_shared_ptr<partition_snapshot> read(logalloc::region& region,
partition_snapshot_ptr read(logalloc::region& region,
mutation_cleaner&,
schema_ptr entry_schema,
cache_tracker*,

View File

@@ -103,7 +103,7 @@ mutation make_incomplete_mutation() {
return mutation(SCHEMA, DK, mutation_partition::make_incomplete(*SCHEMA));
}
static void assert_single_version(lw_shared_ptr<partition_snapshot> snp) {
static void assert_single_version(partition_snapshot_ptr snp) {
BOOST_REQUIRE(snp->at_latest_version());
BOOST_REQUIRE_EQUAL(snp->version_count(), 1);
}
@@ -140,7 +140,7 @@ struct expected_row {
}
};
static void assert_cached_rows(lw_shared_ptr<partition_snapshot> snp, std::deque<expected_row> expected) {
static void assert_cached_rows(partition_snapshot_ptr snp, std::deque<expected_row> expected) {
auto&& rows = snp->version()->partition().clustered_rows();
for (auto&& r : rows) {
BOOST_REQUIRE(!expected.empty());
@@ -173,7 +173,7 @@ struct expected_tombstone {
}
};
static void assert_cached_tombstones(lw_shared_ptr<partition_snapshot> snp, std::deque<range_tombstone> expected) {
static void assert_cached_tombstones(partition_snapshot_ptr snp, std::deque<range_tombstone> expected) {
const range_tombstone_list& rts = snp->version()->partition().row_tombstones();
for (auto&& rt : rts) {
BOOST_REQUIRE(!expected.empty());
@@ -187,7 +187,7 @@ static void assert_cached_tombstones(lw_shared_ptr<partition_snapshot> snp, std:
class cache_tester {
public:
static lw_shared_ptr<partition_snapshot> snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) {
return rc._read_section(rc._tracker.region(), [&] {
return with_linearized_managed_bytes([&] {
cache_entry& e = rc.find_or_create(dk, {}, rc.phase_of(dk));

View File

@@ -170,7 +170,7 @@ public:
partition_snapshot::phase_type phase() const { return _phase; }
real_dirty_memory_accounter& accounter() { return _acc; }
mutation_partition squashed(lw_shared_ptr<partition_snapshot>& snp) {
mutation_partition squashed(partition_snapshot_ptr& snp) {
logalloc::allocating_section as;
return as(_tracker.region(), [&] {
return snp->squashed();
@@ -221,7 +221,7 @@ public:
});
}
lw_shared_ptr<partition_snapshot> read() {
partition_snapshot_ptr read() {
logalloc::allocating_section as;
return as(region(), [&] {
return _e.read(region(), _container.cleaner(), schema(), &_container.tracker(), _container.phase());
@@ -467,7 +467,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
m1.partition().make_fully_continuous();
e += m1;
lw_shared_ptr<partition_snapshot> snap;
partition_snapshot_ptr snap;
if (with_active_reader) {
snap = e.read();
}
@@ -870,8 +870,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
auto snap2 = e.read(r, cleaner, s, nullptr);
maybe_merge_versions(snap1);
maybe_merge_versions(snap2);
snap1 = {};
snap2 = {};
cleaner.drain().get();
@@ -890,8 +890,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
auto snap2 = e.read(r, cleaner, s, nullptr);
maybe_merge_versions(snap2);
maybe_merge_versions(snap1);
snap2 = {};
snap1 = {};
cleaner.drain().get();