diff --git a/db/view/view.cc b/db/view/view.cc index 20588eed9e..672447e60d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -487,7 +487,7 @@ mutation_partition& view_updates::partition_for(partition_key&& key) { if (it != _updates.end()) { return it->second; } - return _updates.emplace(std::move(key), mutation_partition(_view)).first->second; + return _updates.emplace(std::move(key), mutation_partition(*_view)).first->second; } size_t view_updates::op_count() const { diff --git a/mutation/mutation.cc b/mutation/mutation.cc index 242eee1960..5145e5d121 100644 --- a/mutation/mutation.cc +++ b/mutation/mutation.cc @@ -13,13 +13,13 @@ mutation::data::data(dht::decorated_key&& key, schema_ptr&& schema) : _schema(std::move(schema)) , _dk(std::move(key)) - , _p(_schema) + , _p(*_schema) { } mutation::data::data(partition_key&& key_, schema_ptr&& schema) : _schema(std::move(schema)) , _dk(dht::decorate_key(*_schema, std::move(key_))) - , _p(_schema) + , _p(*_schema) { } mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp) diff --git a/mutation/mutation_fragment.hh b/mutation/mutation_fragment.hh index d2e853cbad..b549c801e4 100644 --- a/mutation/mutation_fragment.hh +++ b/mutation/mutation_fragment.hh @@ -90,6 +90,9 @@ public: void apply(const schema& s, const deletable_row& r) { _row.apply(s, r); } + void apply(const schema& our_schema, const schema& their_schema, const deletable_row& r) { + _row.apply(our_schema, their_schema, r); + } position_in_partition_view position() const; diff --git a/mutation/mutation_partition.cc b/mutation/mutation_partition.cc index d4bed0753b..14f6d720bc 100644 --- a/mutation/mutation_partition.cc +++ b/mutation/mutation_partition.cc @@ -243,21 +243,6 @@ void mutation_partition::ensure_last_dummy(const schema& s) { } } -void mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema, - mutation_application_stats& app_stats) { - apply_weak(s, p, p_schema, app_stats); -} - -void mutation_partition::apply(const schema& s, mutation_partition&& p, - mutation_application_stats& app_stats) { - apply_weak(s, std::move(p), app_stats); -} - -void mutation_partition::apply(const schema& s, mutation_partition_view p, const schema& p_schema, - mutation_application_stats& app_stats) { - apply_weak(s, p, p_schema, app_stats); -} - struct mutation_fragment_applier { const schema& _s; mutation_partition& _mp; @@ -489,7 +474,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation if (s.version() == p_schema.version()) { return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, preemptible, res); } else { - mutation_partition p2(s, p); + mutation_partition p2(p_schema, p); p2.upgrade(p_schema, s); return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::no, res); // FIXME: make preemptible } @@ -508,7 +493,7 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation } void -mutation_partition::apply_weak(const schema& s, mutation_partition_view p, +mutation_partition::apply(const schema& s, mutation_partition_view p, const schema& p_schema, mutation_application_stats& app_stats) { // FIXME: Optimize mutation_partition p2(*this, copy_comparators_only{}); @@ -517,13 +502,13 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p, apply_monotonically(s, std::move(p2), p_schema, app_stats); } -void mutation_partition::apply_weak(const schema& s, const mutation_partition& p, +void mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema, mutation_application_stats& app_stats) { // FIXME: Optimize - apply_monotonically(s, mutation_partition(s, p), p_schema, app_stats); + apply_monotonically(s, mutation_partition(p_schema, p), p_schema, app_stats); } -void mutation_partition::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) { +void mutation_partition::apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) { apply_monotonically(s, std::move(p), no_cache_tracker, app_stats); } @@ -1161,22 +1146,42 @@ deletable_row::equal(column_kind kind, const schema& s, const deletable_row& oth return _cells.equal(kind, s, other._cells, other_schema); } -void deletable_row::apply(const schema& s, const deletable_row& src) { - apply_monotonically(s, src); -} - void deletable_row::apply(const schema& s, deletable_row&& src) { apply_monotonically(s, std::move(src)); } -void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) { - _cells.apply(s, column_kind::regular_column, src._cells); +void deletable_row::apply(const schema& s, const deletable_row& src) { + apply_monotonically(s, src); +} + +void deletable_row::apply(const schema& our_schema, const schema& their_schema, deletable_row&& src) { + apply_monotonically(our_schema, their_schema, std::move(src)); +} + +void deletable_row::apply(const schema& our_schema, const schema& their_schema, const deletable_row& src) { + apply_monotonically(our_schema, their_schema, src); +} + +void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) { + _cells.apply_monotonically(s, column_kind::regular_column, std::move(src._cells)); _marker.apply(src._marker); _deleted_at.apply(src._deleted_at, _marker); } -void deletable_row::apply_monotonically(const schema& s, deletable_row&& src) { - _cells.apply(s, column_kind::regular_column, std::move(src._cells)); +void deletable_row::apply_monotonically(const schema& s, const deletable_row& src) { + _cells.apply_monotonically(s, column_kind::regular_column, src._cells); + _marker.apply(src._marker); + _deleted_at.apply(src._deleted_at, _marker); +} + +void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src) { + _cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, std::move(src._cells)); + _marker.apply(src._marker); + _deleted_at.apply(src._deleted_at, _marker); +} + +void deletable_row::apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src) { + _cells.apply_monotonically(our_schema, their_schema, column_kind::regular_column, src._cells); _marker.apply(src._marker); _deleted_at.apply(src._deleted_at, _marker); } @@ -1576,6 +1581,16 @@ row::row(const schema& s, column_kind kind, const row& o) : _size(o._size) _cells.clone_from(o._cells, clone_cell_and_hash); } +row row::construct(const schema& our_schema, const schema& their_schema, column_kind kind, const row& o) { + if (our_schema.version() == their_schema.version()) { + return row(our_schema, kind, o); + } else { + row r; + r.apply(our_schema, their_schema, kind, o); + return r; + } +} + row::~row() { } @@ -1640,7 +1655,32 @@ row& row::operator=(row&& other) noexcept { return *this; } +void row::apply(const schema& s, column_kind kind, row&& other) { + apply_monotonically(s, kind, std::move(other)); +} + void row::apply(const schema& s, column_kind kind, const row& other) { + apply_monotonically(s, kind, other); +} + +void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) { + apply_monotonically(our_schema, their_schema, kind, std::move(other)); +}; + +void row::apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) { + apply_monotonically(our_schema, their_schema, kind, other); +}; + +void row::apply_monotonically(const schema& s, column_kind kind, row&& other) { + if (other.empty()) { + return; + } + other.consume_with([&] (column_id id, cell_and_hash& c_a_h) { + apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash)); + }); +} + +void row::apply_monotonically(const schema& s, column_kind kind, const row& other) { if (other.empty()) { return; } @@ -1649,16 +1689,29 @@ void row::apply(const schema& s, column_kind kind, const row& other) { }); } -void row::apply(const schema& s, column_kind kind, row&& other) { - apply_monotonically(s, kind, std::move(other)); -} - -void row::apply_monotonically(const schema& s, column_kind kind, row&& other) { - if (other.empty()) { - return; +void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other) { + if (our_schema.version() == their_schema.version()) { + return apply_monotonically(our_schema, kind, std::move(other)); } other.consume_with([&] (column_id id, cell_and_hash& c_a_h) { - apply_monotonically(s.column_at(kind, id), std::move(c_a_h.cell), std::move(c_a_h.hash)); + const column_definition& their_col = their_schema.column_at(kind, id); + const column_definition* our_col = our_schema.get_column_definition(their_col.name()); + if (our_col) { + converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell); + } + }); +} + +void row::apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other) { + if (our_schema.version() == their_schema.version()) { + return apply_monotonically(our_schema, kind, other); + } + other.for_each_cell([&] (column_id id, const cell_and_hash& c_a_h) { + const column_definition& their_col = their_schema.column_at(kind, id); + const column_definition* our_col = our_schema.get_column_definition(their_col.name()); + if (our_col) { + converting_mutation_partition_applier::append_cell(*this, kind, *our_col, their_col, c_a_h.cell); + } }); } @@ -1873,19 +1926,19 @@ bool row_marker::compact_and_expire(tombstone tomb, gc_clock::time_point now, return !is_missing() && _ttl != dead; } -mutation_partition mutation_partition::difference(schema_ptr s, const mutation_partition& other) const +mutation_partition mutation_partition::difference(const schema& s, const mutation_partition& other) const { - check_schema(*s); + check_schema(s); mutation_partition mp(s); if (_tombstone > other._tombstone) { mp.apply(_tombstone); } - mp._static_row = _static_row.difference(*s, column_kind::static_column, other._static_row); + mp._static_row = _static_row.difference(s, column_kind::static_column, other._static_row); - mp._row_tombstones = _row_tombstones.difference(*s, other._row_tombstones); + mp._row_tombstones = _row_tombstones.difference(s, other._row_tombstones); auto it_r = other._rows.begin(); - rows_entry::compare cmp_r(*s); + rows_entry::compare cmp_r(s); for (auto&& r : _rows) { if (r.dummy()) { continue; @@ -1893,12 +1946,12 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p while (it_r != other._rows.end() && (it_r->dummy() || cmp_r(*it_r, r))) { ++it_r; } - if (it_r == other._rows.end() || !it_r->key().equal(*s, r.key())) { - mp.insert_row(*s, r.key(), r.row()); + if (it_r == other._rows.end() || !it_r->key().equal(s, r.key())) { + mp.insert_row(s, r.key(), r.row()); } else { - auto dr = r.row().difference(*s, column_kind::regular_column, it_r->row()); + auto dr = r.row().difference(s, column_kind::regular_column, it_r->row()); if (!dr.empty()) { - mp.insert_row(*s, r.key(), std::move(dr)); + mp.insert_row(s, r.key(), std::move(dr)); } } } @@ -1936,7 +1989,7 @@ void mutation_partition::accept(const schema& s, mutation_partition_visitor& v) void mutation_partition::upgrade(const schema& old_schema, const schema& new_schema) { // We need to copy to provide strong exception guarantees. - mutation_partition tmp(new_schema.shared_from_this()); + mutation_partition tmp(new_schema); tmp.set_static_row_continuous(_static_row_continuous); converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp); accept(old_schema, v); diff --git a/mutation/mutation_partition.hh b/mutation/mutation_partition.hh index 7fcfdd391c..f07ad921cb 100644 --- a/mutation/mutation_partition.hh +++ b/mutation/mutation_partition.hh @@ -97,6 +97,7 @@ public: row(); ~row(); row(const schema&, column_kind, const row&); + static row construct(const schema& our_schema, const schema& their_schema, column_kind, const row&); row(row&& other) noexcept; row& operator=(row&& other) noexcept; size_t size() const { return _size; } @@ -178,10 +179,15 @@ public: // Weak exception guarantees void apply(const schema&, column_kind, const row& src); - // Weak exception guarantees void apply(const schema&, column_kind, row&& src); + void apply(const schema& our_schema, const schema& their_schema, column_kind kind, const row& other); + void apply(const schema& our_schema, const schema& their_schema, column_kind kind, row&& other); + // Monotonic exception guarantees void apply_monotonically(const schema&, column_kind, row&& src); + void apply_monotonically(const schema&, column_kind, const row& src); + void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, row&& src); + void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind, const row& src); // Expires cells based on query_time. Expires tombstones based on gc_before // and max_purgeable. Removes cells covered by tomb. @@ -416,6 +422,14 @@ public: get_existing().apply_monotonically(s, kind, std::move(src.get_existing())); } + // Monotonic exception guarantees + void apply_monotonically(const schema& our_schema, const schema& their_schema, column_kind kind, lazy_row&& src) { + if (src.empty()) { + return; + } + maybe_create().apply_monotonically(our_schema, their_schema, kind, std::move(src.get_existing())); + } + // Expires cells based on query_time. Expires tombstones based on gc_before // and max_purgeable. Removes cells covered by tomb. // Returns true iff there are any live cells left. @@ -820,6 +834,11 @@ public: , _marker(other._marker) , _cells(s, column_kind::regular_column, other._cells) { } + deletable_row(const schema& our_schema, const schema& their_schema, const deletable_row& other) + : _deleted_at(other._deleted_at) + , _marker(other._marker) + , _cells(row::construct(our_schema, their_schema, column_kind::regular_column, other._cells)) + { } deletable_row(row_tombstone&& tomb, row_marker&& marker, row&& cells) : _deleted_at(std::move(tomb)), _marker(std::move(marker)), _cells(std::move(cells)) {} @@ -852,10 +871,15 @@ public: // Weak exception guarantees. After exception, both src and this will commute to the same value as // they would should the exception not happen. - void apply(const schema& s, const deletable_row& src); - void apply(const schema& s, deletable_row&& src); - void apply_monotonically(const schema& s, const deletable_row& src); - void apply_monotonically(const schema& s, deletable_row&& src); + void apply(const schema&, deletable_row&& src); + void apply(const schema&, const deletable_row& src); + void apply(const schema& our_schema, const schema& their_schema, const deletable_row& src); + void apply(const schema& our_schema, const schema& their_schema, deletable_row&& src); + + void apply_monotonically(const schema&, deletable_row&& src); + void apply_monotonically(const schema&, const deletable_row& src); + void apply_monotonically(const schema& our_schema, const schema& their_schema, deletable_row&& src); + void apply_monotonically(const schema& our_schema, const schema& their_schema, const deletable_row& src); public: row_tombstone deleted_at() const { return _deleted_at; } api::timestamp_type created_at() const { return _marker.timestamp(); } @@ -956,6 +980,12 @@ public: , _range_tombstone(e._range_tombstone) , _flags(e._flags) { } + rows_entry(const schema& our_schema, const schema& their_schema, const rows_entry& e) + : _key(e._key) + , _row(our_schema, their_schema, e._row) + , _range_tombstone(e._range_tombstone) + , _flags(e._flags) + { } // Valid only if !dummy() clustering_key& key() { return _key; @@ -989,7 +1019,10 @@ public: _row.apply(t); } void apply_monotonically(const schema& s, rows_entry&& e) { - _row.apply(s, std::move(e._row)); + _row.apply_monotonically(s, std::move(e._row)); + } + void apply_monotonically(const schema& our_schema, const schema& their_schema, rows_entry&& e) { + _row.apply_monotonically(our_schema, their_schema, std::move(e._row)); } bool empty() const { return _row.empty(); @@ -1193,11 +1226,11 @@ public: static mutation_partition make_incomplete(const schema& s, tombstone t = {}) { return mutation_partition(incomplete_tag(), s, t); } - mutation_partition(schema_ptr s) + mutation_partition(const schema& s) : _rows() - , _row_tombstones(*s) + , _row_tombstones(s) #ifdef SEASTAR_DEBUG - , _schema_version(s->version()) + , _schema_version(s.version()) #endif { } mutation_partition(mutation_partition& other, copy_comparators_only) @@ -1279,14 +1312,14 @@ public: // is not representable in this_schema is dropped, thus apply() loses commutativity. // // Weak exception guarantees. + // Assumes this and p are not owned by a cache_tracker. void apply(const schema& this_schema, const mutation_partition& p, const schema& p_schema, mutation_application_stats& app_stats); - // Use in case this instance and p share the same schema. - // Same guarantees as apply(const schema&, mutation_partition&&, const schema&); - void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats); - // Same guarantees and constraints as for apply(const schema&, const mutation_partition&, const schema&). void apply(const schema& this_schema, mutation_partition_view p, const schema& p_schema, mutation_application_stats& app_stats); + // Use in case this instance and p share the same schema. + // Same guarantees and constraints as for other variants of apply(). + void apply(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats); // Applies p to this instance. // @@ -1321,15 +1354,6 @@ public: stop_iteration apply_monotonically(const schema& s, mutation_partition&& p, const schema& p_schema, mutation_application_stats& app_stats); - // Weak exception guarantees. - // Assumes this and p are not owned by a cache_tracker. - void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema, - mutation_application_stats& app_stats); - void apply_weak(const schema& s, mutation_partition&&, - mutation_application_stats& app_stats); - void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema, - mutation_application_stats& app_stats); - // Converts partition to the new schema. When succeeds the partition should only be accessed // using the new schema. // @@ -1397,7 +1421,7 @@ public: // Returns the minimal mutation_partition that when applied to "other" will // create a mutation_partition equal to the sum of other and this one. // This and other must both be governed by the same schema s. - mutation_partition difference(schema_ptr s, const mutation_partition& other) const; + mutation_partition difference(const schema& s, const mutation_partition& other) const; // Returns a subset of this mutation holding only information relevant for given clustering ranges. // Range tombstones will be trimmed to the boundaries of the clustering ranges. diff --git a/mutation/mutation_partition_v2.cc b/mutation/mutation_partition_v2.cc index 4bce368258..ebf8f500f2 100644 --- a/mutation/mutation_partition_v2.cc +++ b/mutation/mutation_partition_v2.cc @@ -66,7 +66,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition auto&& tombstones = x.mutable_row_tombstones(); if (!tombstones.empty()) { try { - mutation_partition_v2 p(s.shared_from_this()); + mutation_partition_v2 p(s); for (auto&& t: tombstones) { range_tombstone & rt = t.tombstone(); @@ -75,8 +75,7 @@ mutation_partition_v2::mutation_partition_v2(const schema& s, mutation_partition .set_range_tombstone(rt.tomb); } - mutation_application_stats app_stats; - apply_monotonically(s, std::move(p), s, app_stats); + apply(s, std::move(p)); } catch (...) { _rows.clear_and_dispose(current_deleter()); throw; @@ -118,20 +117,28 @@ struct fmt::formatter : fmt::formatter { } }; -stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, - mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) { - return apply_monotonically(s, std::move(p), tracker, app_stats, - preemptible ? default_preemption_check() : never_preempt(), res, evictable); +void mutation_partition_v2::apply(const schema& s, mutation_partition&& p) { + apply(s, mutation_partition_v2(s, std::move(p))); +} +void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, is_evictable evictable) { + mutation_application_stats app_stats; + apply_resume res; + apply_monotonically(s, s, std::move(p), tracker, app_stats, never_preempt(), res, evictable); } -stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, +stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, const schema& p_s, mutation_partition_v2&& p, cache_tracker* tracker, mutation_application_stats& app_stats, preemption_check need_preempt, apply_resume& res, is_evictable evictable) { #ifdef SEASTAR_DEBUG - assert(s.version() == _schema_version); - assert(p._schema_version == _schema_version); + assert(_schema_version == s.version()); + assert(p._schema_version == p_s.version()); #endif + bool same_schema = s.version() == p_s.version(); _tombstone.apply(p._tombstone); - _static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row)); + if (same_schema) [[likely]] { + _static_row.apply_monotonically(s, column_kind::static_column, std::move(p._static_row)); + } else { + _static_row.apply_monotonically(s, p_s, column_kind::static_column, std::move(p._static_row)); + } _static_row_continuous |= p._static_row_continuous; rows_entry::tri_compare cmp(s); @@ -215,7 +222,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat alloc_strategy_unique_ptr p_sentinel; alloc_strategy_unique_ptr this_sentinel; auto insert_sentinel_back = defer([&] { - // Insert this_sentinel before sentinel so that the former lands before the latter in LRU. if (this_sentinel) { assert(p_i != p._rows.end()); auto rt = this_sentinel->range_tombstone(); @@ -307,8 +313,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat if (need_preempt()) { auto s1 = alloc_strategy_unique_ptr( - current_allocator().construct(s, - position_in_partition::after_key(s, lb_i->position()), is_dummy::yes, is_continuous::no)); + current_allocator().construct(p_s, + position_in_partition::after_key(p_s, lb_i->position()), is_dummy::yes, is_continuous::no)); alloc_strategy_unique_ptr s2; if (lb_i->position().is_clustering_row()) { s2 = alloc_strategy_unique_ptr( @@ -347,8 +353,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat if (next_interval_loaded) { // FIXME: Avoid reallocation s1 = alloc_strategy_unique_ptr( - current_allocator().construct(s, - position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no)); + current_allocator().construct(p_s, + position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no)); if (src_e.position().is_clustering_row()) { s2 = alloc_strategy_unique_ptr( current_allocator().construct(s, @@ -361,31 +367,41 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat } } - rows_type::key_grabber pi_kg(p_i); - lb_i = _rows.insert_before(i, std::move(pi_kg)); + if (same_schema) [[likely]] { + rows_type::key_grabber pi_kg(p_i); + lb_i = _rows.insert_before(i, std::move(pi_kg)); + } else { + // FIXME: avoid cell reallocation. + // We are copying the row to make exception safety simpler, + // but it's not inherently necessary and could be avoided. + auto new_e = alloc_strategy_unique_ptr(current_allocator().construct(s, p_s, src_e)); + lb_i = _rows.insert_before(i, std::move(new_e)); + lb_i->swap(src_e); + p_i = p._rows.erase_and_dispose(p_i, del); + } p_sentinel = std::move(s1); this_sentinel = std::move(s2); - // Check if src_e falls into a continuous range. + // Check if src_e (now: lb_i) fell into a continuous range. // The range past the last entry is also always implicitly continuous. if (i == _rows.end() || i->continuous()) { tombstone i_rt = i != _rows.end() ? i->range_tombstone() : tombstone(); // Cannot apply only-row range tombstone falling into a continuous range without inserting extra entry. // Should not occur in practice due to the "older versions are evicted first" rule. // Never occurs in non-evictable snapshots because they are continuous. - if (!src_e.continuous() && src_e.range_tombstone() > i_rt) { - if (src_e.dummy()) { + if (!lb_i->continuous() && lb_i->range_tombstone() > i_rt) { + if (lb_i->dummy()) { lb_i->set_range_tombstone(i_rt); } else { position_in_partition_view i_pos = i != _rows.end() ? i->position() : position_in_partition_view::after_all_clustered_rows(); // See the "no singular tombstones" rule. mplog.error("Cannot merge entry {} with rt={}, cont=0 into continuous range before {} with rt={}", - src_e.position(), src_e.range_tombstone(), i_pos, i_rt); + lb_i->position(), lb_i->range_tombstone(), i_pos, i_rt); abort(); } } else { - lb_i->set_range_tombstone(src_e.range_tombstone() + i_rt); + lb_i->set_range_tombstone(lb_i->range_tombstone() + i_rt); } lb_i->set_continuous(true); } @@ -397,8 +413,8 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat if (next_interval_loaded) { // FIXME: Avoid reallocation s1 = alloc_strategy_unique_ptr( - current_allocator().construct(s, - position_in_partition::after_key(s, src_e.position()), is_dummy::yes, is_continuous::no)); + current_allocator().construct(p_s, + position_in_partition::after_key(p_s, src_e.position()), is_dummy::yes, is_continuous::no)); if (src_e.position().is_clustering_row()) { s2 = alloc_strategy_unique_ptr( current_allocator().construct(s, s1->position(), is_dummy::yes, is_continuous::yes)); @@ -436,18 +452,26 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat } } if (tracker) { - // Newer evictable versions store complete rows - i->row() = std::move(src_e.row()); - // Need to preserve the LRU link of the later version in case it's - // the last dummy entry which holds the partition entry linked in LRU. - i->swap(src_e); + if (same_schema) [[likely]] { + // Newer evictable versions store complete rows + i->row() = std::move(src_e.row()); + // Need to preserve the LRU link of the later version in case it's + // the last dummy entry which holds the partition entry linked in LRU. + i->swap(src_e); + } else { + i->apply_monotonically(s, p_s, std::move(src_e)); + } tracker->remove(src_e); } else { // Avoid row compaction if no newer range tombstone. do_compact = (src_e.range_tombstone() + src_e.row().deleted_at().regular()) > (i->range_tombstone() + i->row().deleted_at().regular()); memory::on_alloc_point(); - i->apply_monotonically(s, std::move(src_e)); + if (same_schema) [[likely]] { + i->apply_monotonically(s, std::move(src_e)); + } else { + i->apply_monotonically(s, p_s, std::move(src_e)); + } } ++app_stats.row_hits; p_i = p._rows.erase_and_dispose(p_i, del); @@ -491,59 +515,6 @@ stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutat return stop_iteration::yes; } -stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema, - mutation_application_stats& app_stats, is_preemptible preemptible, apply_resume& res, is_evictable evictable) { - if (s.version() == p_schema.version()) { - return apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, - preemptible ? default_preemption_check() : never_preempt(), res, evictable); - } else { - mutation_partition_v2 p2(s, p); - p2.upgrade(p_schema, s); - return apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, never_preempt(), res, evictable); // FIXME: make preemptible - } -} - -stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker *tracker, - mutation_application_stats& app_stats, is_evictable evictable) { - apply_resume res; - return apply_monotonically(s, std::move(p), tracker, app_stats, is_preemptible::no, res, evictable); -} - -stop_iteration mutation_partition_v2::apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema, - mutation_application_stats& app_stats) { - apply_resume res; - return apply_monotonically(s, std::move(p), p_schema, app_stats, is_preemptible::no, res, is_evictable::no); -} - -void mutation_partition_v2::apply(const schema& s, const mutation_partition_v2& p, const schema& p_schema, - mutation_application_stats& app_stats) { - apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p)), p_schema, app_stats); -} - -void mutation_partition_v2::apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats) { - apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no); -} - -void -mutation_partition_v2::apply_weak(const schema& s, mutation_partition_view p, - const schema& p_schema, mutation_application_stats& app_stats) { - // FIXME: Optimize - mutation_partition p2(p_schema.shared_from_this()); - partition_builder b(p_schema, p2); - p.accept(p_schema, b); - apply_monotonically(s, mutation_partition_v2(p_schema, std::move(p2)), p_schema, app_stats); -} - -void mutation_partition_v2::apply_weak(const schema& s, const mutation_partition& p, - const schema& p_schema, mutation_application_stats& app_stats) { - // FIXME: Optimize - apply_monotonically(s, mutation_partition_v2(s, p), p_schema, app_stats); -} - -void mutation_partition_v2::apply_weak(const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) { - apply_monotonically(s, mutation_partition_v2(s, std::move(p)), no_cache_tracker, app_stats, is_evictable::no); -} - void mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) { check_schema(schema); @@ -555,10 +526,9 @@ mutation_partition_v2::apply_row_tombstone(const schema& schema, clustering_key_ void mutation_partition_v2::apply_row_tombstone(const schema& schema, range_tombstone rt) { check_schema(schema); - mutation_partition mp(schema.shared_from_this()); + mutation_partition mp(schema); mp.apply_row_tombstone(schema, std::move(rt)); - mutation_application_stats stats; - apply_weak(schema, std::move(mp), stats); + apply(schema, std::move(mp)); } void @@ -965,7 +935,7 @@ void mutation_partition_v2::accept(const schema& s, mutation_partition_visitor& void mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schema) { // We need to copy to provide strong exception guarantees. - mutation_partition tmp(new_schema.shared_from_this()); + mutation_partition tmp(new_schema); tmp.set_static_row_continuous(_static_row_continuous); converting_mutation_partition_applier v(old_schema.get_column_mapping(), new_schema, tmp); accept(old_schema, v); @@ -973,7 +943,7 @@ mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schem } mutation_partition mutation_partition_v2::as_mutation_partition(const schema& s) const { - mutation_partition tmp(s.shared_from_this()); + mutation_partition tmp(s); tmp.set_static_row_continuous(_static_row_continuous); partition_builder v(s, tmp); accept(s, v); diff --git a/mutation/mutation_partition_v2.hh b/mutation/mutation_partition_v2.hh index 033f4f12d3..f5c74971cc 100644 --- a/mutation/mutation_partition_v2.hh +++ b/mutation/mutation_partition_v2.hh @@ -89,10 +89,10 @@ public: static mutation_partition_v2 make_incomplete(const schema& s, tombstone t = {}) { return mutation_partition_v2(incomplete_tag(), s, t); } - mutation_partition_v2(schema_ptr s) + mutation_partition_v2(const schema& s) : _rows() #ifdef SEASTAR_DEBUG - , _schema_version(s->version()) + , _schema_version(s.version()) #endif { } mutation_partition_v2(mutation_partition_v2& other, copy_comparators_only) @@ -167,18 +167,11 @@ public: // prefix must not be full void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); void apply_row_tombstone(const schema& schema, range_tombstone rt); - // // Applies p to current object. - // - // Commutative when this_schema == p_schema. If schemas differ, data in p which - // is not representable in this_schema is dropped, thus apply() loses commutativity. - // // Weak exception guarantees. - void apply(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema, - mutation_application_stats& app_stats); - // Use in case this instance and p share the same schema. - // Same guarantees as apply(const schema&, mutation_partition_v2&&, const schema&); - void apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats); + // Assumes this and p are not owned by a cache_tracker and non-evictable. + void apply(const schema& s, mutation_partition&&); + void apply(const schema& s, mutation_partition_v2&& p, cache_tracker* = nullptr, is_evictable evictable = is_evictable::no); // Applies p to this instance. // @@ -204,25 +197,8 @@ public: // // If is_preemptible::yes is passed, apply_resume must also be passed, // same instance each time until stop_iteration::yes is returned. - stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*, - mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable); - stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema, - mutation_application_stats& app_stats, is_preemptible, apply_resume&, is_evictable); - stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker, - mutation_application_stats& app_stats, is_evictable); - stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema, - mutation_application_stats& app_stats); - stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*, - mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable); - - // Weak exception guarantees. - // Assumes this and p are not owned by a cache_tracker and non-evictable. - void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema, - mutation_application_stats& app_stats); - void apply_weak(const schema& s, mutation_partition&&, - mutation_application_stats& app_stats); - void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema, - mutation_application_stats& app_stats); + stop_iteration apply_monotonically(const schema& this_schema, const schema& p_schema, mutation_partition_v2&& p, + cache_tracker*, mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable); // Converts partition to the new schema. When succeeds the partition should only be accessed // using the new schema. diff --git a/mutation/partition_version.cc b/mutation/partition_version.cc index 84ae5606d0..e860682756 100644 --- a/mutation/partition_version.cc +++ b/mutation/partition_version.cc @@ -34,6 +34,8 @@ static void remove_or_mark_as_unique_owner(partition_version* current, mutation_ partition_version::partition_version(partition_version&& pv) noexcept : anchorless_list_base_hook(std::move(pv)) , _backref(pv._backref) + , _schema(std::move(pv._schema)) + , _is_being_upgraded(pv._is_being_upgraded) , _partition(std::move(pv._partition)) { if (_backref) { @@ -56,15 +58,20 @@ partition_version::~partition_version() if (_backref) { _backref->_version = nullptr; } + with_allocator(standard_allocator(), [&] { + // Destroying the schema_ptr can cause a destruction of the schema, + // so it has to happen in the allocator which schemas are allocated in. + _schema = nullptr; + }); } stop_iteration partition_version::clear_gently(cache_tracker* tracker) noexcept { return _partition.clear_gently(tracker); } -size_t partition_version::size_in_allocator(const schema& s, allocation_strategy& allocator) const { +size_t partition_version::size_in_allocator(allocation_strategy& allocator) const { return allocator.object_memory_size_in_allocator(this) + - partition().external_memory_usage(s); + partition().external_memory_usage(*_schema); } namespace { @@ -111,15 +118,20 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc } ::static_row partition_snapshot::static_row(bool digest_requested) const { - return ::static_row(::squashed(version(), - [&] (const mutation_partition_v2& mp) -> const row& { - if (digest_requested) { - mp.static_row().prepare_hash(*_schema, column_kind::static_column); - } - return mp.static_row().get(); - }, - [this] (const row& r) { return row(*_schema, column_kind::static_column, r); }, - [this] (row& a, const row& b) { a.apply(*_schema, column_kind::static_column, b); })); + const partition_version* this_v = &*version(); + partition_version* it = this_v->last(); + if (digest_requested) { + it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column); + } + row r = row::construct(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get()); + while (it != this_v) { + it = it->prev(); + if (digest_requested) { + it->partition().static_row().prepare_hash(*it->get_schema(), column_kind::static_column); + } + r.apply(*this_v->get_schema(), *it->get_schema(), column_kind::static_column, it->partition().static_row().get()); + } + return ::static_row(std::move(r)); } bool partition_snapshot::static_row_continuous() const { @@ -133,15 +145,16 @@ tombstone partition_snapshot::partition_tombstone() const { } mutation_partition partition_snapshot::squashed() const { - return ::squashed(version(), - [this] (const mutation_partition_v2& mp) -> mutation_partition { - return mp.as_mutation_partition(*_schema); - }, - [] (mutation_partition&& mp) { return std::move(mp); }, - [this] (mutation_partition& a, const mutation_partition& b) { - mutation_application_stats app_stats; - a.apply(*_schema, b, *_schema, app_stats); - }); + const partition_version* this_v = &*version(); + mutation_partition mp(*this_v->get_schema()); + for (auto it = this_v->last();; it = it->prev()) { + mutation_application_stats app_stats; + mp.apply(*this_v->get_schema(), it->partition().as_mutation_partition(*it->get_schema()), *it->get_schema(), app_stats); + if (it == this_v) { + break; + } + } + return mp; } tombstone partition_entry::partition_tombstone() const { @@ -166,19 +179,43 @@ partition_snapshot::~partition_snapshot() { } void merge_versions(const schema& s, mutation_partition_v2& newer, mutation_partition_v2&& older, cache_tracker* tracker, is_evictable evictable) { - mutation_application_stats app_stats; - older.apply_monotonically(s, std::move(newer), tracker, app_stats, evictable); + older.apply(s, std::move(newer), tracker, evictable); newer = std::move(older); } +// Inserts a new version after pv. +// Used only when upgrading the schema of pv. +static partition_version& append_version(partition_version& pv, const schema& s, cache_tracker* tracker) { + // Every evictable version must have a dummy entry at the end so that + // it can be tracked in the LRU. It is also needed to allow old versions + // to stay around (with tombstones and static rows) after fully evicted. + // Such versions must be fully discontinuous, and thus have a dummy at the end. + auto new_version = tracker + ? current_allocator().construct(mutation_partition_v2::make_incomplete(s), s.shared_from_this()) + : current_allocator().construct(mutation_partition_v2(s), s.shared_from_this()); + new_version->partition().set_static_row_continuous(pv.partition().static_row_continuous()); + new_version->insert_after(pv); + if (tracker) { + tracker->insert(*new_version); + } + return *new_version; +} + stop_iteration partition_snapshot::merge_partition_versions(mutation_application_stats& app_stats) { partition_version_ref& v = version(); if (!v.is_unique_owner()) { // Shift _version to the oldest unreferenced version and then keep merging left hand side into it. // This is good for performance because in case we were at the latest version // we leave it for incoming writes and they don't have to create a new one. + // + // If `current->next()` has a different schema than `current`, it will have + // to be upgraded before being merged with `current`. + // If its upgrade is already in progress, it would be wasteful (though legal) + // to initiate its upgrade again, so we stop shifting. + // + // See the documentation in partition_version.hh for additional info about upgrades. partition_version* current = &*v; - while (current->next() && !current->next()->is_referenced()) { + while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) { current = current->next(); _version = partition_version_ref(*current); _version_merging_state.reset(); @@ -190,8 +227,32 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application if (!_version_merging_state) { _version_merging_state = apply_resume(); } - const auto do_stop_iteration = current->partition().apply_monotonically(*schema(), - std::move(prev->partition()), _tracker, local_app_stats, is_preemptible::yes, *_version_merging_state, + if (!prev->_is_being_upgraded && prev->get_schema()->version() != current->get_schema()->version()) { + // The versions we are attempting to merge have different schemas. + // In this scenario the older version has to be upgraded before + // being merged with the newer one. + // + // This is done by adding a fresh empty version (with the newer + // schema) after `current` and merging `current` into the new + // version. + // + // While the upgrade is happening, `_is_being_upgraded` is set + // in the version which is being upgraded, to mark it as having + // older schema than its `next()` (and therefore violating the + // normal chronological schema order). This is necessary + // precisely for the above `if`, so that after resuming a + // preempted upgrade we can simply continue, instead of + // (illegally) initiating an upgrade of the special fresh + // version back to the old schema. + // + // See the documentation in partition_version.hh for additional info about upgrades. + current = &append_version(*current, *prev->get_schema(), _tracker); + _version = partition_version_ref(*current); + prev = current->prev(); + prev->_is_being_upgraded = true; + } + const auto do_stop_iteration = current->partition().apply_monotonically(*current->get_schema(), + *prev->get_schema(), std::move(prev->partition()), _tracker, local_app_stats, default_preemption_check(), *_version_merging_state, is_evictable(bool(_tracker))); app_stats.row_hits += local_app_stats.row_hits; if (do_stop_iteration == stop_iteration::no) { @@ -200,6 +261,7 @@ stop_iteration partition_snapshot::merge_partition_versions(mutation_application // If do_stop_iteration is yes, we have to remove the previous version. // It now appears as fully continuous because it is empty. _version_merging_state.reset(); + prev->_is_being_upgraded = false; if (prev->is_referenced()) { _version.release(); prev->back_reference() = partition_version_ref(*current, prev->back_reference().is_unique_owner()); @@ -222,7 +284,7 @@ stop_iteration partition_snapshot::slide_to_oldest() noexcept { _entry = nullptr; } partition_version* current = &*v; - while (current->next() && !current->next()->is_referenced()) { + while (current->next() && !current->next()->is_referenced() && !current->next()->_is_being_upgraded) { current = current->next(); _version = partition_version_ref(*current); } @@ -239,18 +301,18 @@ unsigned partition_snapshot::version_count() return count; } -partition_entry::partition_entry(mutation_partition_v2 mp) +partition_entry::partition_entry(const schema& s, mutation_partition_v2 mp) { - auto new_version = current_allocator().construct(std::move(mp)); + auto new_version = current_allocator().construct(std::move(mp), s.shared_from_this()); _version = partition_version_ref(*new_version); } partition_entry::partition_entry(const schema& s, mutation_partition mp) - : partition_entry(mutation_partition_v2(s, std::move(mp))) + : partition_entry(s, mutation_partition_v2(s, std::move(mp))) { } partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, mutation_partition&& mp) - : partition_entry([&] { + : partition_entry(s, [&] { mp.ensure_last_dummy(s); return mutation_partition_v2(s, std::move(mp)); }()) @@ -329,8 +391,8 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker* // to stay around (with tombstones and static rows) after fully evicted. // Such versions must be fully discontinuous, and thus have a dummy at the end. auto new_version = tracker - ? current_allocator().construct(mutation_partition_v2::make_incomplete(s)) - : current_allocator().construct(mutation_partition_v2(s.shared_from_this())); + ? current_allocator().construct(mutation_partition_v2::make_incomplete(s), s.shared_from_this()) + : current_allocator().construct(mutation_partition_v2(s), s.shared_from_this()); new_version->partition().set_static_row_continuous(_version->partition().static_row_continuous()); new_version->insert_before(*_version); set_version(new_version); @@ -363,24 +425,24 @@ void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, cons if (s.version() != mp_schema.version()) { mp.upgrade(mp_schema, s); } - auto new_version = current_allocator().construct(std::move(mp)); + auto new_version = current_allocator().construct(std::move(mp), s.shared_from_this()); partition_snapshot_ptr snp; // Should die after new_version is inserted if (!_snapshot) { try { apply_resume res; auto notify = cleaner.make_region_space_guard(); - if (_version->partition().apply_monotonically(s, + if (_version->partition().apply_monotonically(s, s, std::move(new_version->partition()), no_cache_tracker, app_stats, - is_preemptible::yes, + default_preemption_check(), res, is_evictable::no) == stop_iteration::yes) { current_allocator().destroy(new_version); return; } else { // Apply was preempted. Let the cleaner finish the job when snapshot dies - snp = read(r, cleaner, s.shared_from_this(), no_cache_tracker); + snp = read(r, cleaner, no_cache_tracker); // FIXME: Store res in the snapshot as an optimization to resume from where we left off. } } catch (...) { @@ -418,14 +480,14 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s, // So we cannot allow erasing when preemptible. bool can_move = !preemptible && !pe._snapshot; - auto src_snp = pe.read(reg, pe_cleaner, s.shared_from_this(), no_cache_tracker); + auto src_snp = pe.read(reg, pe_cleaner, no_cache_tracker); partition_snapshot_ptr prev_snp; if (preemptible) { // Reads must see prev_snp until whole update completes so that writes // are not partially visible. - prev_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase - 1); + prev_snp = read(reg, tracker.cleaner(), &tracker, phase - 1); } - auto dst_snp = read(reg, tracker.cleaner(), s.shared_from_this(), &tracker, phase); + auto dst_snp = read(reg, tracker.cleaner(), &tracker, phase); dst_snp->lock(); // Once we start updating the partition, we must keep all snapshots until the update completes, @@ -547,39 +609,41 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s, }); } -mutation_partition_v2 partition_entry::squashed(schema_ptr from, schema_ptr to, is_evictable evictable) +mutation_partition_v2 partition_entry::squashed_v2(const schema& to, is_evictable evictable) { mutation_partition_v2 mp(to); mp.set_static_row_continuous(_version->partition().static_row_continuous()); for (auto&& v : _version->all_elements()) { - auto older = mutation_partition_v2(*from, v.partition()); - if (from->version() != to->version()) { - older.upgrade(*from, *to); + auto older = mutation_partition_v2(*v.get_schema(), v.partition()); + if (v.get_schema()->version() != to.version()) { + older.upgrade(*v.get_schema(), to); } - merge_versions(*to, mp, std::move(older), no_cache_tracker, evictable); + merge_versions(to, mp, std::move(older), no_cache_tracker, evictable); } return mp; } mutation_partition partition_entry::squashed(const schema& s, is_evictable evictable) { - return squashed(s.shared_from_this(), s.shared_from_this(), evictable) - .as_mutation_partition(s); + return squashed_v2(s, evictable).as_mutation_partition(s); } -void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker) +void partition_entry::upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker) { - auto new_version = current_allocator().construct(squashed(from, to, is_evictable(bool(tracker)))); - auto old_version = &*_version; - set_version(new_version); - if (tracker) { - tracker->insert(*new_version); - } - remove_or_mark_as_unique_owner(old_version, &cleaner); + with_allocator(r.allocator(), [&] { + auto phase = partition_snapshot::max_phase; + if (_snapshot) { + phase = _snapshot->_phase; + } + // The destruction of this snapshot pointer will trigger a background merge + // of the old version into the new version. + partition_snapshot_ptr snp = read(r, cleaner, tracker, phase); + add_version(*to, tracker); + }); } partition_snapshot_ptr partition_entry::read(logalloc::region& r, - mutation_cleaner& cleaner, schema_ptr entry_schema, cache_tracker* tracker, partition_snapshot::phase_type phase) + mutation_cleaner& cleaner, cache_tracker* tracker, partition_snapshot::phase_type phase) { if (_snapshot) { if (_snapshot->_phase == phase) { @@ -594,12 +658,12 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r, return snp; } else { // phase > _snapshot->_phase with_allocator(r.allocator(), [&] { - add_version(*entry_schema, tracker); + add_version(*get_schema(), tracker); }); } } - auto snp = make_lw_shared(entry_schema, r, cleaner, this, tracker, phase); + auto snp = make_lw_shared(r, cleaner, this, tracker, phase); _snapshot = snp.get(); return partition_snapshot_ptr(std::move(snp)); } @@ -639,7 +703,7 @@ std::ostream& operator<<(std::ostream& out, const partition_entry::printer& p) { } out << ") "; } - out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(p._schema, v->partition()); + out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(*v->get_schema(), v->partition()); v = v->next(); first = false; } diff --git a/mutation/partition_version.hh b/mutation/partition_version.hh index 855462a4b1..09ee1a7c3a 100644 --- a/mutation/partition_version.hh +++ b/mutation/partition_version.hh @@ -80,21 +80,7 @@ class static_row; // When the partition_snapshot is destroyed partition_versions are squashed // together to minimize the amount of elements on the list. // -// Scene IV. Schema upgrade -// pv pv --- pv -// ^ ^ ^ -// | | | -// pe ps(u) ps -// When there is a schema upgrade the list of partition versions pointed to -// by partition_entry is replaced by a new single partition_version that is a -// result of squashing and upgrading the old versions. -// Old versions not used by any partition snapshot are removed. The first -// partition snapshot on the list is marked as unique which means that upon -// its destruction it won't attempt to squash versions but instead remove -// the unused ones and pass the "unique owner" mark the next snapshot on the -// list (if there is any). -// -// Scene V. partition_entry eviction +// Scene IV. partition_entry eviction // pv // ^ // | @@ -104,11 +90,110 @@ class static_row; // upgrade scenario. The unused ones are destroyed right away and the first // snapshot on the list is marked as unique owner so that on its destruction // it continues removal of the partition versions. +// + +// Schema upgrades +// +// After a schema change (e.g. a column is removed), the layout of existing +// rows in memory becomes outdated and has to be adjusted before they are +// emitted by a query expecting the newer schema. +// +// Rows can be upgraded on the fly during queries. But upgrades have a high CPU +// cost, so we want them to happen only once. The upgraded row should be saved +// in memory so that future queries don't have to upgrade it again. +// And it should replace the old row as soon as possible (when there +// the row is no longer reachable through the old schema) to conserve memory. +// +// This behavior is akin to MVCC. A schema upgrade can be thought of as a +// special kind of update which affects all rows, and the MVCC machinery can be +// naturally hijacked to implement it. +// +// Currently, we do it as follows: +// +// - Each MVCC version has its own schema pointer. Versions in the same chain +// can be of different schemas. +// +// - The schema of a partition entry is defined as the schema of the newest version. +// A partition entry upgrade is performed simply by inserting a new empty version with +// the new schema. (And triggering a background version merge by creating and immediately +// destroying a snapshot pointing at the previous newest version). +// Due to this, schemas of versions in the chain are ordered chronologically. +// (The order is important because it's forbidden to upgrade to an older version, +// because that's lossy -- e.g. a new column can be lost). +// +// - On read, the cursor upgrades rows on the fly to the cursor's schema. +// If the cursor reads the latest version, the upgraded rows are written to the latest +// version. +// +// - When versions are merged, rows are upgraded to the newer schema, the result of the +// merge has the newer schema. +// +// This one is tricky. A natural idea is to merge older versions into the newer version, +// (upgrading rows when moving/copying them between versions), so that after a merge +// only the new version is left. But usually we want to merge in the other direction. +// +// (When an database write arrives, we want to merge it into the existing +// older version, so that it has a cost proportional to the size of the +// write, not to the size of the existing version, which can be arbitrarily +// large. Doing otherwise would invite quadratic behaviour) +// +// The merging algorithm is already very complicated and making it work in both +// directions (or adding a separate algorithm specifically for upgrades) would +// complicate things even further. +// +// So instead, when two versions of different schema are merged, the older version +// (which also has the older schema) is first upgraded to the newer schema in a special +// upgrade process which only uses regular newer-into-older merging. +// This is done by appending a fresh empty version with the newer schema after +// the version-to-be-upgraded, and merging the version-to-be-upgraded into the new one. +// In the end, only the new version with the newer schema is left. +// +// Technically the above procedure temporarily violates the rule that schema versions +// in the chain are ordered chronologically (which is needed for correctness). +// So while the above is happening, the version-to-be-upgraded has _is_being_upgraded set. +// A version with _is_being_upgraded is understood to be special in that its +// schema is older than its next neighbour's, and care is taken so that the +// neighbour isn't recursively downgraded back to the older schema. +// A version with _is_being_upgraded can be viewed together with its next() as +// conceptually a single version with the schema of next(). +// +// The typical upgrade sequence, illustrated: +// 1. Initial state: +// pv1 (s1) +// ^ +// | +// pe +// 2. partition_entry::upgrade(s2) is called. Empty pv2 is added. +// pv2 (s2) -- pv1 (s1) +// ^ ^ +// | | +// pe ps1 (created and instantly dropped, so that merging is initiated) +// 3. Some time later, mutation_cleaner calls merge_partition_versions(ps1). +// Merge of pv2 and pv1 is attempted. +// Schemas differ, so instead an upgrade of pv1 is initiated. Empty pv1' is added. +// pv1 is now conceptually "owned" by pv1', and no snapshot is allowed to point to it +// after this point. +// pv2 (s2) -- pv1 (s1, _is_being_upgraded) -- pv1' (s2) +// ^ ^ +// | | +// pe ps1 +// 4. Eventually pv1 is fully upgrade-merged into pv1' and destroyed. +// pv2 (s2) -- pv1' (s2) +// ^ ^ +// | | +// pe ps1 +// 5. Upgrade over, further merge proceeds as usual. Eventually pv2 is fully merged into pv1'. +// pv1' (s2) +// ^ +// | +// pe class partition_version_ref; class partition_version : public anchorless_list_base_hook { partition_version_ref* _backref = nullptr; + schema_ptr _schema; + bool _is_being_upgraded = false; mutation_partition_v2 _partition; friend class partition_version_ref; @@ -120,9 +205,18 @@ public: } explicit partition_version(schema_ptr s) noexcept - : _partition(std::move(s)) { } - explicit partition_version(mutation_partition_v2 mp) noexcept - : _partition(std::move(mp)) { } + : _schema(std::move(s)) + , _partition(*_schema) + { + assert(_schema); + } + explicit partition_version(mutation_partition_v2 mp, schema_ptr s) noexcept + : _schema(std::move(s)) + , _partition(std::move(mp)) + { + assert(_schema); + } + partition_version(partition_version&& pv) noexcept; partition_version& operator=(partition_version&& pv) noexcept; ~partition_version(); @@ -138,7 +232,9 @@ public: bool is_referenced_from_entry() const; partition_version_ref& back_reference() const { return *_backref; } - size_t size_in_allocator(const schema& s, allocation_strategy& allocator) const; + size_t size_in_allocator(allocation_strategy& allocator) const; + + const schema_ptr& get_schema() const noexcept { return _schema; } }; using partition_version_range = anchorless_list_base_hook::range; @@ -256,7 +352,6 @@ public: } }; private: - schema_ptr _schema; // Either _version or _entry is non-null. partition_version_ref _version; partition_entry* _entry; @@ -270,13 +365,12 @@ private: friend class partition_entry; friend class mutation_cleaner_impl; public: - explicit partition_snapshot(schema_ptr s, - logalloc::region& region, + explicit partition_snapshot(logalloc::region& region, mutation_cleaner& cleaner, partition_entry* entry, cache_tracker* tracker, // non-null for evictable snapshots phase_type phase = default_phase) - : _schema(std::move(s)), _entry(entry), _phase(phase), _region(®ion), _cleaner(&cleaner), _tracker(tracker) { } + : _entry(entry), _phase(phase), _region(®ion), _cleaner(&cleaner), _tracker(tracker) { } partition_snapshot(const partition_snapshot&) = delete; partition_snapshot(partition_snapshot&&) = delete; partition_snapshot& operator=(const partition_snapshot&) = delete; @@ -358,7 +452,7 @@ public: return !version()->next(); } - const schema_ptr& schema() const { return _schema; } + const schema_ptr& schema() const { return version()->get_schema(); } logalloc::region& region() const { return *_region; } cache_tracker* tracker() const { return _tracker; } mutation_cleaner& cleaner() { return *_cleaner; } @@ -439,7 +533,7 @@ public: // Constructs a non-evictable entry holding empty partition partition_entry() = default; // Constructs a non-evictable entry - explicit partition_entry(mutation_partition_v2); + partition_entry(const schema&, mutation_partition_v2); partition_entry(const schema&, mutation_partition); // Returns a reference to partition_entry containing given pv, // assuming pv.is_referenced_from_entry(). @@ -523,24 +617,28 @@ public: const schema& mp_schema, mutation_application_stats& app_stats); - // Adds mutation_partition represented by "other" to the one represented + // Adds mutation_partition represented by "pe" to the one represented // by this entry. // This entry must be evictable. + // "pe" must be fully-continuous. + // (Alternatively: applies the "pe" memtable entry to "this" cache entry.) // - // The argument must be fully-continuous. - // - // The continuity of this entry remains unchanged. Information from "other" + // The continuity of this entry remains unchanged. Information from "pe" // which is incomplete in this instance is dropped. In other words, this // performs set intersection on continuity information, drops information // which falls outside of the continuity range, and applies regular merging // rules for the rest. + // (Rationale: updates from the memtable are only applied to intervals + // which were already in cache. The cache treats the entire sstable set as a + // single source -- it isn't able to store partial information only from a + // single sstable.) // // Weak exception guarantees. - // If an exception is thrown this and pe will be left in some valid states + // If an exception is thrown, "this" and "pe" will be left in some valid states // such that if the operation is retried (possibly many times) and eventually // succeeds the result will be as if the first attempt didn't fail. // - // The schema of pe must conform to s. + // The schema of "pe" must conform to "s". // // Returns a coroutine object representing the operation. // The coroutine must be resumed with the region being unlocked. @@ -581,27 +679,27 @@ public: return *_version; } - mutation_partition_v2 squashed(schema_ptr from, schema_ptr to, is_evictable); + mutation_partition_v2 squashed_v2(const schema& to, is_evictable); mutation_partition squashed(const schema&, is_evictable); tombstone partition_tombstone() const; // needs to be called with reclaiming disabled // Must not be called when is_locked(). - void upgrade(schema_ptr from, schema_ptr to, mutation_cleaner&, cache_tracker*); + void upgrade(logalloc::region& r, schema_ptr to, mutation_cleaner&, cache_tracker*); + + const schema_ptr& get_schema() const noexcept { return _version->get_schema(); } // Snapshots with different values of phase will point to different partition_version objects. // When is_locked(), read() can only be called with a phase which is <= the phase of the current snapshot. partition_snapshot_ptr read(logalloc::region& region, mutation_cleaner&, - schema_ptr entry_schema, cache_tracker*, partition_snapshot::phase_type phase = partition_snapshot::default_phase); class printer { - const schema& _schema; const partition_entry& _partition_entry; public: - printer(const schema& s, const partition_entry& pe) : _schema(s), _partition_entry(pe) { } + printer(const partition_entry& pe) : _partition_entry(pe) { } printer(const printer&) = delete; printer(printer&&) = delete; diff --git a/partition_snapshot_row_cursor.hh b/partition_snapshot_row_cursor.hh index ee853764a3..149fc2b5c1 100644 --- a/partition_snapshot_row_cursor.hh +++ b/partition_snapshot_row_cursor.hh @@ -111,6 +111,7 @@ class partition_snapshot_row_cursor final { mutation_partition::rows_type::iterator it; utils::immutable_collection rows; int version_no; + const schema* schema; bool unique_owner = false; is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain). @@ -202,7 +203,7 @@ class partition_snapshot_row_cursor final { position_in_version& v = _heap.back(); rows_entry& e = *v.it; if (_digest_requested) { - e.row().cells().prepare_hash(_schema, column_kind::regular_column); + e.row().cells().prepare_hash(*v.schema, column_kind::regular_column); } _dummy &= bool(e.dummy()); _continuous |= bool(v.continuous); @@ -280,7 +281,7 @@ class partition_snapshot_row_cursor final { rt = pos->range_tombstone(); } if (pos) [[likely]] { - _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont, rt}); + _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, cont, rt}); } } else { if (_reversed) [[unlikely]] { @@ -293,7 +294,7 @@ class partition_snapshot_row_cursor final { _background_continuity = true; // Default continuity past the last entry } if (pos) [[likely]] { - _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, is_continuous::yes}); + _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, is_continuous::yes}); } } ++version_no; @@ -469,7 +470,7 @@ public: } } else if (match) { _current_row.insert(_current_row.begin(), position_in_version{ - it, std::move(rows), 0, _unique_owner, cont, rt}); + it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt}); if (heap_i != _heap.end()) { _heap.erase(heap_i); boost::range::make_heap(_heap, heap_less); @@ -482,7 +483,7 @@ public: boost::range::make_heap(_heap, heap_less); } else { _heap.push_back(position_in_version{ - it, std::move(rows), 0, _unique_owner, cont, rt}); + it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt}); boost::range::push_heap(_heap, heap_less); } } @@ -556,9 +557,9 @@ public: clustering_row row() const { // Note: if the precondition ("cursor is valid and pointing at a row") is fulfilled // then _current_row is not empty, so the below is valid. - clustering_row cr(key(), deletable_row(_schema, _current_row[0].it->row())); + clustering_row cr(key(), deletable_row(_schema, *_current_row[0].schema, _current_row[0].it->row())); for (size_t i = 1; i < _current_row.size(); ++i) { - cr.apply(_schema, _current_row[i].it->row()); + cr.apply(_schema, *_current_row[i].schema, _current_row[i].it->row()); } return cr; } @@ -571,32 +572,23 @@ public: // Can be called only when cursor is valid and pointing at a row. // Monotonic exception guarantees. template - requires std::is_invocable_v + requires std::is_invocable_v void consume_row(Consumer&& consumer) { for (position_in_version& v : _current_row) { - if (v.unique_owner) { + if (v.unique_owner && (_schema.version() == v.schema->version())) [[likely]] { consumer(std::move(v.it->row())); } else { - consumer(deletable_row(_schema, v.it->row())); + consumer(deletable_row(_schema, *v.schema, v.it->row())); } } } - // Can be called only when cursor is valid and pointing at a row. - template - requires std::is_invocable_v - void consume_row(Consumer&& consumer) const { - for (const position_in_version& v : _current_row) { - consumer(v.it->row()); - } - } - // Returns memory footprint of row entries under the cursor. // Can be called only when cursor is valid and pointing at a row. size_t memory_usage() const { size_t result = 0; for (const position_in_version& v : _current_row) { - result += v.it->memory_usage(_schema); + result += v.it->memory_usage(*v.schema); } return result; } @@ -631,7 +623,7 @@ public: is_dummy(!_position.is_clustering_row()), is_continuous::no)); } else { return alloc_strategy_unique_ptr( - current_allocator().construct(*_snp.schema(), *_current_row[0].it)); + current_allocator().construct(*_snp.schema(), *_current_row[0].schema, *_current_row[0].it)); } }(); rows_entry& re = *e; @@ -707,7 +699,7 @@ public: } } auto e = alloc_strategy_unique_ptr( - current_allocator().construct(_schema, pos, + current_allocator().construct(*_snp.version()->get_schema(), pos, is_dummy(!pos.is_clustering_row()), is_continuous::no)); if (latest_i && latest_i->continuous()) { diff --git a/replica/memtable.cc b/replica/memtable.cc index 0f0c51628c..0511fff5f7 100644 --- a/replica/memtable.cc +++ b/replica/memtable.cc @@ -225,7 +225,7 @@ memtable::find_or_create_partition(const dht::decorated_key& key) { if (i == partitions.end() || !hint.match) { partitions_type::iterator entry = partitions.emplace_before(i, key.token().raw(), hint, - _schema, dht::decorated_key(key), mutation_partition(_schema)); + _schema, dht::decorated_key(key), mutation_partition(*_schema)); ++nr_partitions; ++_table_stats.memtable_partition_insertions; if (!hint.emplace_keeps_iterators()) { @@ -696,7 +696,7 @@ public: }; partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) { - return _pe.read(mtbl.region(), mtbl.cleaner(), _schema, no_cache_tracker); + return _pe.read(mtbl.region(), mtbl.cleaner(), no_cache_tracker); } flat_mutation_reader_v2_opt @@ -793,7 +793,7 @@ memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_han with_allocator(allocator(), [this, &m, &m_schema] { _allocating_section(*this, [&, this] { auto& p = find_or_create_partition_slow(m.key()); - mutation_partition mp(m_schema); + mutation_partition mp(*m_schema); partition_builder pb(*m_schema, mp); m.partition().accept(*m_schema, pb); _stats_collector.update(*m_schema, mp); @@ -821,8 +821,7 @@ mutation_source memtable::as_data_source() { } memtable_entry::memtable_entry(memtable_entry&& o) noexcept - : _schema(std::move(o._schema)) - , _key(std::move(o._key)) + : _key(std::move(o._key)) , _pe(std::move(o._pe)) , _flags(o._flags) { } @@ -839,19 +838,16 @@ bool memtable::is_flushed() const noexcept { return bool(_underlying); } -void memtable_entry::upgrade_schema(const schema_ptr& s, mutation_cleaner& cleaner) { - if (_schema != s) { - partition().upgrade(_schema, s, cleaner, no_cache_tracker); - _schema = s; +void memtable_entry::upgrade_schema(logalloc::region& r, const schema_ptr& s, mutation_cleaner& cleaner) { + if (schema() != s) { + partition().upgrade(r, s, cleaner, no_cache_tracker); } } void memtable::upgrade_entry(memtable_entry& e) { - if (e._schema != _schema) { + if (e.schema() != _schema) { assert(!reclaiming_enabled()); - with_allocator(allocator(), [this, &e] { - e.upgrade_schema(_schema, cleaner()); - }); + e.upgrade_schema(region(), _schema, cleaner()); } } @@ -870,7 +866,7 @@ std::ostream& operator<<(std::ostream& out, memtable& mt) { } std::ostream& operator<<(std::ostream& out, const memtable_entry& mt) { - return out << "{" << mt.key() << ": " << partition_entry::printer(*mt.schema(), mt.partition()) << "}"; + return out << "{" << mt.key() << ": " << partition_entry::printer(mt.partition()) << "}"; } } diff --git a/replica/memtable.hh b/replica/memtable.hh index 11ebd50710..c02a9f8340 100644 --- a/replica/memtable.hh +++ b/replica/memtable.hh @@ -33,7 +33,6 @@ namespace bi = boost::intrusive; namespace replica { class memtable_entry { - schema_ptr _schema; dht::decorated_key _key; partition_entry _pe; struct { @@ -52,9 +51,8 @@ public: friend class memtable; memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p) - : _schema(std::move(s)) - , _key(std::move(key)) - , _pe(*_schema, std::move(p)) + : _key(std::move(key)) + , _pe(*s, std::move(p)) { } memtable_entry(memtable_entry&& o) noexcept; @@ -65,13 +63,12 @@ public: dht::decorated_key& key() { return _key; } const partition_entry& partition() const { return _pe; } partition_entry& partition() { return _pe; } - const schema_ptr& schema() const { return _schema; } - schema_ptr& schema() { return _schema; } + const schema_ptr& schema() const { return _pe.get_schema(); } partition_snapshot_ptr snapshot(memtable& mtbl); // Makes the entry conform to given schema. // Must be called under allocating section of the region which owns the entry. - void upgrade_schema(const schema_ptr&, mutation_cleaner&); + void upgrade_schema(logalloc::region&, const schema_ptr&, mutation_cleaner&); size_t external_memory_usage_without_rows() const { return _key.key().external_memory_usage(); @@ -86,7 +83,7 @@ public: size_t size_in_allocator(allocation_strategy& allocator) { auto size = size_in_allocator_without_rows(allocator); for (auto&& v : _pe.versions()) { - size += v.size_in_allocator(*_schema, allocator); + size += v.size_in_allocator(allocator); } return size; } diff --git a/row_cache.cc b/row_cache.cc index bd9675138a..817e02051e 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -846,7 +846,7 @@ cache_entry& row_cache::find_or_create_incomplete(const partition_start& ps, row cache_entry& row_cache::find_or_create_missing(const dht::decorated_key& key) { return do_find_or_create_entry(key, nullptr, [&] (auto i, const partitions_type::bound_hint& hint) { - mutation_partition mp(_schema); + mutation_partition mp(*_schema); bool cont = i->continuous(); partitions_type::iterator entry = _partitions.emplace_before(i, key.token().raw(), hint, _schema, key, std::move(mp)); @@ -1024,9 +1024,9 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) { if (cache_i != partitions_end() && hint.match) { cache_entry& entry = *cache_i; upgrade_entry(entry); - assert(entry._schema == _schema); + assert(entry.schema() == _schema); _tracker.on_partition_merge(); - mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner()); + mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner()); return entry.partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(), alloc, _tracker.region(), _tracker, _underlying_phase, acc); } else if (cache_i->continuous() @@ -1035,10 +1035,10 @@ future<> row_cache::update(external_updater eu, replica::memtable& m) { // Partition is absent in underlying. First, insert a neutral partition entry. partitions_type::iterator entry = _partitions.emplace_before(cache_i, mem_e.key().token().raw(), hint, cache_entry::evictable_tag(), _schema, dht::decorated_key(mem_e.key()), - partition_entry::make_evictable(*_schema, mutation_partition(_schema))); + partition_entry::make_evictable(*_schema, mutation_partition(*_schema))); entry->set_continuous(cache_i->continuous()); _tracker.insert(*entry); - mem_e.upgrade_schema(_schema, _tracker.memtable_cleaner()); + mem_e.upgrade_schema(_tracker.region(), _schema, _tracker.memtable_cleaner()); return entry->partition().apply_to_incomplete(*_schema, std::move(mem_e.partition()), _tracker.memtable_cleaner(), alloc, _tracker.region(), _tracker, _underlying_phase, acc); } else { @@ -1194,8 +1194,7 @@ row_cache::row_cache(schema_ptr s, snapshot_source src, cache_tracker& tracker, } cache_entry::cache_entry(cache_entry&& o) noexcept - : _schema(std::move(o._schema)) - , _key(std::move(o._key)) + : _key(std::move(o._key)) , _pe(std::move(o._pe)) , _flags(o._flags) { @@ -1296,9 +1295,9 @@ flat_mutation_reader_v2 cache_entry::read(row_cache& rc, std::unique_ptr unique_ctx) { - auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, unique_ctx->phase()); - auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, unique_ctx->native_slice(), _key.key()); + auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker, unique_ctx->phase()); + auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*schema(), unique_ctx->native_slice(), _key.key()); schema_ptr reader_schema = unique_ctx->schema(); - schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), _schema); + schema_ptr entry_schema = to_query_domain(unique_ctx->slice(), schema()); auto rc_schema = to_query_domain(unique_ctx->slice(), rc.schema()); auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp)); r.upgrade_schema(rc_schema); @@ -1322,13 +1321,10 @@ const schema_ptr& row_cache::schema() const { } void row_cache::upgrade_entry(cache_entry& e) { - if (e._schema != _schema && !e.partition().is_locked()) { + if (e.schema() != _schema && !e.partition().is_locked()) { auto& r = _tracker.region(); assert(!r.reclaiming_enabled()); - with_allocator(r.allocator(), [this, &e] { - e.partition().upgrade(e._schema, _schema, _tracker.cleaner(), &_tracker); - e._schema = _schema; - }); + e.partition().upgrade(r, _schema, _tracker.cleaner(), &_tracker); } } @@ -1371,6 +1367,6 @@ std::ostream& operator<<(std::ostream& out, const cache_entry& e) { return out << "{cache_entry: " << e.position() << ", cont=" << e.continuous() << ", dummy=" << e.is_dummy_entry() - << ", " << partition_entry::printer(*e.schema(), e.partition()) + << ", " << partition_entry::printer(e.partition()) << "}"; } diff --git a/row_cache.hh b/row_cache.hh index 2be9c37e11..7e293f7b3c 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -50,7 +50,6 @@ class lsa_manager; // // TODO: Make memtables use this format too. class cache_entry { - schema_ptr _schema; dht::decorated_key _key; partition_entry _pe; // True when we know that there is nothing between this entry and the previous one in cache @@ -86,9 +85,8 @@ public: } cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p) - : _schema(std::move(s)) - , _key(key) - , _pe(partition_entry::make_evictable(*_schema, mutation_partition(*_schema, p))) + : _key(key) + , _pe(partition_entry::make_evictable(*s, mutation_partition(*s, p))) { } cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p) @@ -99,8 +97,7 @@ public: // It is assumed that pe is fully continuous // pe must be evictable. cache_entry(evictable_tag, schema_ptr s, dht::decorated_key&& key, partition_entry&& pe) noexcept - : _schema(std::move(s)) - , _key(std::move(key)) + : _key(std::move(key)) , _pe(std::move(pe)) { } @@ -130,8 +127,7 @@ public: const partition_entry& partition() const noexcept { return _pe; } partition_entry& partition() { return _pe; } - const schema_ptr& schema() const noexcept { return _schema; } - schema_ptr& schema() noexcept { return _schema; } + const schema_ptr& schema() const noexcept { return _pe.get_schema(); } flat_mutation_reader_v2 read(row_cache&, cache::read_context&); flat_mutation_reader_v2 read(row_cache&, std::unique_ptr); flat_mutation_reader_v2 read(row_cache&, cache::read_context&, utils::phased_barrier::phase_type); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 8531901af0..117afdbb13 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -4509,7 +4509,7 @@ public: const mutation& m = z.get<1>().mut; for (const version& v : z.get<0>()) { auto diff = v.par - ? m.partition().difference(schema, (co_await v.par->mut().unfreeze_gently(schema)).partition()) + ? m.partition().difference(*schema, (co_await v.par->mut().unfreeze_gently(schema)).partition()) : mutation_partition(*schema, m.partition()); std::optional mdiff; if (!diff.empty()) { diff --git a/test/boost/cache_flat_mutation_reader_test.cc b/test/boost/cache_flat_mutation_reader_test.cc index 9f28ebe2b5..8ac7287d4f 100644 --- a/test/boost/cache_flat_mutation_reader_test.cc +++ b/test/boost/cache_flat_mutation_reader_test.cc @@ -190,7 +190,7 @@ public: static partition_snapshot_ptr snapshot_for_key(row_cache& rc, const dht::decorated_key& dk) { return rc._read_section(rc._tracker.region(), [&] { cache_entry& e = rc.lookup(dk); - return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), e.schema(), &rc._tracker); + return e.partition().read(rc._tracker.region(), rc._tracker.cleaner(), &rc._tracker); }); } }; diff --git a/test/boost/counter_test.cc b/test/boost/counter_test.cc index 5edbe43a6d..abf2e903b8 100644 --- a/test/boost/counter_test.cc +++ b/test/boost/counter_test.cc @@ -270,7 +270,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) { // Difference - m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition())); + m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m2.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); { @@ -287,7 +287,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) { verify_shard_order(ccv); } - m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition())); + m = mutation(s, m1.decorated_key(), m2.partition().difference(*s, m1.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(ac.is_live()); { @@ -304,11 +304,11 @@ SEASTAR_TEST_CASE(test_counter_mutations) { verify_shard_order(ccv); } - m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition())); + m = mutation(s, m1.decorated_key(), m1.partition().difference(*s, m3.partition())); BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0); BOOST_REQUIRE(m.partition().static_row().empty()); - m = mutation(s, m1.decorated_key(), m3.partition().difference(s, m1.partition())); + m = mutation(s, m1.decorated_key(), m3.partition().difference(*s, m1.partition())); ac = get_counter_cell(m); BOOST_REQUIRE(!ac.is_live()); diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 3d5d4c399d..f9b4fbd65e 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -1103,7 +1103,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) { m2 = mutation_partition_v2(s, second.partition()); }); auto check = defer([&] { - m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, is_evictable::no); + m.apply(s, std::move(m2)); assert_that(target.schema(), m).is_equal_to_compacted(expected.partition()); }); auto continuity_check = defer([&] { @@ -1120,7 +1120,7 @@ SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) { } }); apply_resume res; - if (m.apply_monotonically(s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) { + if (m.apply_monotonically(s, s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) { continuity_check.cancel(); seastar::memory::local_failure_injector().cancel(); } @@ -1195,7 +1195,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) { m12.apply(m1); m12.apply(m2); - auto m2_1 = m2.partition().difference(s, m1.partition()); + auto m2_1 = m2.partition().difference(*s, m1.partition()); BOOST_REQUIRE_EQUAL(m2_1.partition_tombstone(), tombstone()); BOOST_REQUIRE(!m2_1.static_row().size()); BOOST_REQUIRE(!m2_1.find_row(*s, ckey1)); @@ -1212,7 +1212,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) { m12_1.partition().apply(*s, m2_1, *s, app_stats); BOOST_REQUIRE_EQUAL(m12, m12_1); - auto m1_2 = m1.partition().difference(s, m2.partition()); + auto m1_2 = m1.partition().difference(*s, m2.partition()); BOOST_REQUIRE_EQUAL(m1_2.partition_tombstone(), m12.partition().partition_tombstone()); BOOST_REQUIRE(m1_2.find_row(*s, ckey1)); BOOST_REQUIRE(m1_2.find_row(*s, ckey2)); @@ -1230,10 +1230,10 @@ SEASTAR_TEST_CASE(test_mutation_diff) { m12_2.partition().apply(*s, m1_2, *s, app_stats); BOOST_REQUIRE_EQUAL(m12, m12_2); - auto m3_12 = m3.partition().difference(s, m12.partition()); + auto m3_12 = m3.partition().difference(*s, m12.partition()); BOOST_REQUIRE(m3_12.empty()); - auto m12_3 = m12.partition().difference(s, m3.partition()); + auto m12_3 = m12.partition().difference(*s, m3.partition()); BOOST_REQUIRE_EQUAL(m12_3.partition_tombstone(), m12.partition().partition_tombstone()); mutation m123(s, partition_key::from_single_value(*s, "key1")); @@ -1364,13 +1364,13 @@ SEASTAR_TEST_CASE(test_query_digest) { auto m3 = m2; { - auto diff = m1.partition().difference(s, m2.partition()); + auto diff = m1.partition().difference(*s, m2.partition()); m3.partition().apply(*m3.schema(), std::move(diff), app_stats); } auto m4 = m1; { - auto diff = m2.partition().difference(s, m1.partition()); + auto diff = m2.partition().difference(*s, m1.partition()); m4.partition().apply(*m4.schema(), std::move(diff), app_stats); } @@ -1881,7 +1881,7 @@ SEASTAR_TEST_CASE(test_collection_cell_diff) { mutation m12 = m1; m12.apply(m2); - auto diff = m12.partition().difference(s, m1.partition()); + auto diff = m12.partition().difference(*s, m1.partition()); BOOST_REQUIRE(!diff.empty()); BOOST_REQUIRE(m2.partition().equal(*s, diff)); }); @@ -1919,11 +1919,11 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) { auto m12 = m1; m12.apply(m2); auto m12_with_diff = m1; - m12_with_diff.partition().apply(*s, m2.partition().difference(s, m1.partition()), app_stats); + m12_with_diff.partition().apply(*s, m2.partition().difference(*s, m1.partition()), app_stats); check_partitions_match(m12.partition(), m12_with_diff.partition(), *s); - check_partitions_match(mutation_partition{s}, m1.partition().difference(s, m1.partition()), *s); - check_partitions_match(m1.partition(), m1.partition().difference(s, mutation_partition{s}), *s); - check_partitions_match(mutation_partition{s}, mutation_partition{s}.difference(s, m1.partition()), *s); + check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s); + check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s); + check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s); }); }); } @@ -2102,7 +2102,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) { auto to_apply = mutation_partition_v2(s, m2_v2); apply_resume res; - while (result_v2.apply_monotonically(s, std::move(to_apply), no_cache_tracker, app_stats, + while (result_v2.apply_monotonically(s, s, std::move(to_apply), no_cache_tracker, app_stats, [&] () noexcept { return preempt(); }, res, is_evictable::no) == stop_iteration::no) { seastar::thread::maybe_yield(); } @@ -2132,7 +2132,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) { static void clear(cache_tracker& tracker, const schema& s, mutation_partition_v2& p) { while (p.clear_gently(&tracker) == stop_iteration::no) {} - p = mutation_partition_v2(s.shared_from_this()); + p = mutation_partition_v2(s); tracker.insert(p); } @@ -2200,7 +2200,7 @@ SEASTAR_TEST_CASE(test_v2_merging_in_evictable_snapshot) { clear(tracker, s, result_v2); }); apply_resume res; - while (result_v2.apply_monotonically(s, std::move(m2_v2), &tracker, app_stats, is_preemptible::yes, res, + while (result_v2.apply_monotonically(s, s, std::move(m2_v2), &tracker, app_stats, default_preemption_check(), res, is_evictable::yes) == stop_iteration::no) { seastar::thread::maybe_yield(); } @@ -2285,7 +2285,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) { mutation_application_stats app_stats; apply_resume resume; - m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume, + m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes); BOOST_REQUIRE(m1_v2.is_fully_continuous()); @@ -2306,7 +2306,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) { // m2_v2: --------------- 5 ==(rt)== 7 [rt] --- // m1_v2: === 1 === 3 ========================= - m1_v2.apply_monotonically(s, std::move(m2_v2), nullptr, app_stats, is_preemptible::no, resume, is_evictable::yes); + m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes); BOOST_REQUIRE(m1_v2.is_fully_continuous()); assert_that(ss.schema(), m1_v2).is_equal_to_compacted(s, (m1 + m2).partition()); diff --git a/test/boost/mvcc_test.cc b/test/boost/mvcc_test.cc index a4cf61c547..e84a536d37 100644 --- a/test/boost/mvcc_test.cc +++ b/test/boost/mvcc_test.cc @@ -41,7 +41,7 @@ static thread_local mutation_application_stats app_stats_for_tests; // The cursor must be pointing at a row and valid. // The cursor will not be pointing at a row after this. static mutation_partition read_partition_from(const schema& schema, partition_snapshot_row_cursor& cur) { - mutation_partition p(schema.shared_from_this()); + mutation_partition p(schema); position_in_partition prev = position_in_partition::before_all_clustered_rows(); do { testlog.trace("cur: {}", cur); @@ -170,14 +170,14 @@ public: void upgrade(schema_ptr new_schema) { _container.allocate_in_region([&] { - _e.upgrade(_s, new_schema, _container.cleaner(), _container.tracker()); + _e.upgrade(_container.region(), new_schema, _container.cleaner(), _container.tracker()); _s = new_schema; }); } partition_snapshot_ptr read() { return _container.allocate_in_region([&] { - return _e.read(region(), _container.cleaner(), schema(), _container.tracker(), _container.phase()); + return _e.read(region(), _container.cleaner(), _container.tracker(), _container.phase()); }); } @@ -194,7 +194,7 @@ void mvcc_partition::apply_to_evictable(partition_entry&& src, schema_ptr src_sc mutation_cleaner src_cleaner(region(), no_cache_tracker, app_stats_for_tests); auto c = as(region(), [&] { if (_s != src_schema) { - src.upgrade(src_schema, _s, src_cleaner, no_cache_tracker); + src.upgrade(region(), _s, src_cleaner, no_cache_tracker); } return _e.apply_to_incomplete(*schema(), std::move(src), src_cleaner, as, region(), *_container.tracker(), _container.next_phase(), _container.accounter()); @@ -221,7 +221,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) { void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) { with_allocator(region().allocator(), [&] { if (_evictable) { - apply_to_evictable(partition_entry(mutation_partition_v2(*mp_s, mp)), mp_s); + apply_to_evictable(partition_entry(*mp_s, mutation_partition_v2(*mp_s, mp)), mp_s); } else { logalloc::allocating_section as; as(region(), [&] { @@ -249,7 +249,7 @@ mvcc_partition mvcc_container::make_not_evictable(const mutation_partition& mp) return with_allocator(region().allocator(), [&] { logalloc::allocating_section as; return as(region(), [&] { - return mvcc_partition(_schema, partition_entry(mutation_partition_v2(*_schema, mp)), *this, false); + return mvcc_partition(_schema, partition_entry(*_schema, mutation_partition_v2(*_schema, mp)), *this, false); }); }); } @@ -304,7 +304,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete) { assert_that(table.schema(), e.squashed()).is_equal_to((m2 + m3).partition()); // Check that snapshot data is not stolen when its entry is applied - auto e2 = ms.make_evictable(mutation_partition(table.schema())); + auto e2 = ms.make_evictable(mutation_partition(s)); e2 += std::move(e); assert_that(table.schema(), ms.squashed(snap1)).is_equal_to(m1.partition()); assert_that(table.schema(), e2.squashed()).is_equal_to((m2 + m3).partition()); @@ -381,7 +381,7 @@ SEASTAR_TEST_CASE(test_eviction_with_active_reader) { auto ck1 = table.make_ckey(1); auto ck2 = table.make_ckey(2); - auto e = ms.make_evictable(mutation_partition(table.schema())); + auto e = ms.make_evictable(mutation_partition(s)); mutation m1(table.schema(), pk); m1.partition().clustered_row(s, ck2); @@ -452,7 +452,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) { } auto expected = mutation_partition(*s, before); - expected.apply_weak(*s, std::move(expected_to_apply_slice), app_stats); + expected.apply(*s, std::move(expected_to_apply_slice), app_stats); e += to_apply; @@ -505,7 +505,7 @@ void evict_with_consistency_check(mvcc_container& ms, mvcc_partition& e, const m testlog.trace("evicting"); auto ret = ms.tracker()->evict_from_lru_shallow(); - testlog.trace("entry: {}", partition_entry::printer(s, e.entry())); + testlog.trace("entry: {}", partition_entry::printer(e.entry())); auto p = e.squashed(); auto cont = p.get_continuity(s); @@ -553,7 +553,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging) { auto snap2 = e.read(); e += m3; - testlog.trace("e: {}", partition_entry::printer(*e.schema(), e.entry())); + testlog.trace("e: {}", partition_entry::printer(e.entry())); auto expected = e.squashed(); auto snap = e.read(); @@ -592,14 +592,14 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab { mutation_application_stats app_stats; logalloc::reclaim_lock rl(r); - auto e = partition_entry(mutation_partition_v2(*s, m3.partition())); - auto snap1 = e.read(r, cleaner, s, no_cache_tracker); + auto e = partition_entry(*s, mutation_partition_v2(*s, m3.partition())); + auto snap1 = e.read(r, cleaner, no_cache_tracker); e.apply(r, cleaner, *s, m2.partition(), *s, app_stats); - auto snap2 = e.read(r, cleaner, s, no_cache_tracker); + auto snap2 = e.read(r, cleaner, no_cache_tracker); e.apply(r, cleaner, *s, m1.partition(), *s, app_stats); auto expected = e.squashed(*s, is_evictable::no); - auto snap = e.read(r, cleaner, s, no_cache_tracker); + auto snap = e.read(r, cleaner, no_cache_tracker); auto actual = read_using_cursor(*snap); BOOST_REQUIRE(expected.is_fully_continuous()); @@ -636,7 +636,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) { { logalloc::reclaim_lock rl(r); auto e = partition_entry::make_evictable(*s, m1.partition()); - auto snap1 = e.read(r, tracker.cleaner(), s, &tracker); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); e.add_version(*s, &tracker).partition() .clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no); e.add_version(*s, &tracker).partition() @@ -646,7 +646,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) { expected.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no); expected.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no); - auto snap = e.read(r, tracker.cleaner(), s, &tracker); + auto snap = e.read(r, tracker.cleaner(), &tracker); auto actual = read_using_cursor(*snap); auto actual2 = e.squashed(*s, is_evictable::yes); @@ -670,8 +670,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) { simple_schema table; auto&& s = *table.schema(); - auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); - auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + auto e = partition_entry::make_evictable(s, mutation_partition(s)); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); { auto&& p1 = snap1->version()->partition(); @@ -683,7 +683,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) { p1.ensure_last_dummy(s); } - auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1); partition_snapshot_row_cursor cur(s, *snap2); position_in_partition::equal_compare eq(s); @@ -841,8 +841,8 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) { simple_schema table; auto&& s = *table.schema(); - auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); - auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + auto e = partition_entry::make_evictable(s, mutation_partition(s)); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); int ck_0 = 10; int ck_1 = 9; @@ -862,7 +862,7 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) { p1.ensure_last_dummy(s); } - auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1); auto rev_s = s.make_reversed(); partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true); @@ -1032,9 +1032,9 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) { simple_schema table; auto&& s = *table.schema(); - auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); + auto e = partition_entry::make_evictable(s, mutation_partition(s)); tracker.insert(e); - auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); { auto&& p1 = snap1->version()->partition(); @@ -1044,7 +1044,7 @@ SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) { p1.clustered_rows_entry(s, table.make_ckey(4), is_dummy::no, is_continuous::no)); } - auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1); { auto&& p2 = snap2->version()->partition(); @@ -1173,7 +1173,7 @@ public: , _tracker(t) , _r(r) , _e(_tracker ? partition_entry::make_evictable(*_schema, mutation_partition::make_incomplete(*_schema)) - : partition_entry(mutation_partition_v2(_schema))) + : partition_entry(*_schema, mutation_partition_v2(*_schema))) { if (_tracker) { _tracker->insert(_e); @@ -1181,7 +1181,7 @@ public: } partition_entry_builder& new_version() { - _snapshots.emplace_back(_e.read(_r, _cleaner, _schema, _tracker, _snapshots.size())); + _snapshots.emplace_back(_e.read(_r, _cleaner, _tracker, _snapshots.size())); _last_key = {}; return *this; } @@ -1537,8 +1537,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) { simple_schema table; auto&& s = *table.schema(); - auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); - auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + auto e = partition_entry::make_evictable(s, mutation_partition(s)); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); { auto&& p1 = snap1->version()->partition(); @@ -1547,7 +1547,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) { p1.ensure_last_dummy(s); } - auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1); { auto&& p2 = snap2->version()->partition(); @@ -1593,8 +1593,8 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse simple_schema table; auto&& s = *table.schema(); - auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); - auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + auto e = partition_entry::make_evictable(s, mutation_partition(s)); + auto snap1 = e.read(r, tracker.cleaner(), &tracker); { auto&& p1 = snap1->version()->partition(); @@ -1604,7 +1604,7 @@ SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reverse p1.ensure_last_dummy(s); } - auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + auto snap2 = e.read(r, tracker.cleaner(), &tracker, 1); { auto&& p2 = snap2->version()->partition(); @@ -1654,7 +1654,7 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) { while (true) { logalloc::reclaim_lock rl(r); mutation_partition_v2 m2 = mutation_partition_v2(*second.schema(), second.partition()); - auto e = partition_entry(mutation_partition_v2(*target.schema(), target.partition())); + auto e = partition_entry(*target.schema(), mutation_partition_v2(*target.schema(), target.partition())); //auto snap1 = e.read(r, gen.schema()); alloc.fail_after(fail_offset++); @@ -1703,8 +1703,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { m3.partition().make_fully_continuous(); { - auto e = partition_entry(mutation_partition_v2(*s, m1.partition())); - auto snap1 = e.read(r, cleaner, s, nullptr); + auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition())); + auto snap1 = e.read(r, cleaner, nullptr); { mutation_application_stats app_stats; @@ -1712,7 +1712,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { e.apply(r, cleaner, *s, m2.partition(), *s, app_stats); } - auto snap2 = e.read(r, cleaner, s, nullptr); + auto snap2 = e.read(r, cleaner, nullptr); snap1 = {}; snap2 = {}; @@ -1724,8 +1724,8 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { } { - auto e = partition_entry(mutation_partition_v2(*s, m1.partition())); - auto snap1 = e.read(r, cleaner, s, nullptr); + auto e = partition_entry(*s, mutation_partition_v2(*s, m1.partition())); + auto snap1 = e.read(r, cleaner, nullptr); { mutation_application_stats app_stats; @@ -1733,7 +1733,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) { e.apply(r, cleaner, *s, m2.partition(), *s, app_stats); } - auto snap2 = e.read(r, cleaner, s, nullptr); + auto snap2 = e.read(r, cleaner, nullptr); snap2 = {}; snap1 = {}; @@ -2005,7 +2005,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) { } }); - testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry())); + testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry())); mutation m2(s, ss.make_pkey()); // This one covers the dummy row for before(3) and before(2), marking the range [1, 3] as continuous. @@ -2020,7 +2020,7 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) { e += m3; auto snp3 = e.read(); - testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(*s, e.entry())); + testlog.trace("entry @{}: {}", __LINE__, partition_entry::printer(e.entry())); auto expected = m0 + m1 + m2 + m3; @@ -2033,3 +2033,104 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_with_dummies) { evict_with_consistency_check(ms, e, expected.partition()); }); } + +SEASTAR_TEST_CASE(test_gentle_schema_upgrades) { + return seastar::async([] { + auto ts1 = api::new_timestamp(); + auto ts_drop = api::new_timestamp(); + auto ts2 = api::new_timestamp(); + + auto s1 = schema_builder("ks", "cf") + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("ck", utf8_type, column_kind::clustering_key) + .with_column("s1", utf8_type, column_kind::static_column) + .with_column("s2", utf8_type, column_kind::static_column) + .with_column("v1", utf8_type, column_kind::regular_column) + .with_column("v2", utf8_type, column_kind::regular_column) + .with_column("v3", utf8_type, column_kind::regular_column) + .with_column("v4", utf8_type, column_kind::regular_column) + .build(); + auto s2 = schema_builder(s1) + .remove_column("s1") + .remove_column("v3") + .without_column("v4", ts_drop).with_column("v4", utf8_type) + .with_column("v5", utf8_type) + .build(); + + auto m1 = std::invoke([s1, ts1] { + auto x = mutation(s1, partition_key::from_single_value(*s1, serialized(0))); + auto ck = clustering_key::from_single_value(*s1, serialized(0)); + x.set_static_cell("s1", "s1_value", ts1); + x.set_static_cell("s2", "s2_value", ts1); + x.set_clustered_cell(ck, "v1", "v1_value", ts1); + x.set_clustered_cell(ck, "v2", "v2_value", ts1); + x.set_clustered_cell(ck, "v3", "v3_value", ts1); + x.set_clustered_cell(ck, "v4", "v4_value", ts1); + x.partition().set_static_row_continuous(false); + x.partition().ensure_last_dummy(*s1); + return x; + }); + + auto m2 = std::invoke([s2, ts2] { + auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0))); + auto ck = clustering_key::from_single_value(*s2, serialized(0)); + x.set_clustered_cell(ck, "v2", "v2_value_new", ts2); + x.set_clustered_cell(ck, "v5", "v5_value_new", ts2); + x.partition().set_static_row_continuous(false); + x.partition().ensure_last_dummy(*s2); + return x; + }); + + auto expected = std::invoke([s2, ts1, ts2] { + auto x = mutation(s2, partition_key::from_single_value(*s2, serialized(0))); + auto ck = clustering_key::from_single_value(*s2, serialized(0)); + x.set_static_cell("s2", "s2_value", ts1); + x.set_clustered_cell(ck, "v1", "v1_value", ts1); + x.set_clustered_cell(ck, "v2", "v2_value_new", ts2); + x.set_clustered_cell(ck, "v5", "v5_value_new", ts2); + x.partition().set_static_row_continuous(false); + x.partition().ensure_last_dummy(*s2); + return x; + }); + + { + // Test that the version merge is lazy. + // (This is not important and might be changed in the future. + // We often run some operations synchronously and only put them + // in the background after they preempt for the first time.) + mvcc_container ms(s1); + auto e = ms.make_evictable(m1.partition()); + e.upgrade(s2); + BOOST_REQUIRE(e.entry().version()->next()); + // Test that the upgrade initiated the merge. + ms.cleaner().drain().get(); + BOOST_REQUIRE(!e.entry().version()->next()); + } + { + // Test that the on-the-fly merge gives the expected result. + mvcc_container ms(s1); + auto e = ms.make_evictable(m1.partition()); + auto rd1 = e.read(); + e.upgrade(s2); + auto rd2 = e.read(); + e += m2; + auto rd3 = e.read(); + + assert_that(s1, read_using_cursor(*rd1)).is_equal_to(*s1, m1.partition()); + + auto rd2_expected = mutation_partition(*s1, m1.partition()); + rd2_expected.upgrade(*s1, *s2); + assert_that(s2, read_using_cursor(*rd2)).is_equal_to(rd2_expected); + + assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition()); + + rd1 = {}; + rd2 = {}; + // Merge versions. + ms.cleaner().drain().get(); + BOOST_REQUIRE(!e.entry().version()->next()); + // Test that the background merge gives the expected result. + assert_that(s2, read_using_cursor(*rd3)).is_equal_to(*s2, expected.partition()); + } + }); +} diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 3e16505437..77f1c6ffb0 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -950,6 +950,7 @@ SEASTAR_TEST_CASE(test_eviction_after_schema_change) { rd.fill_buffer().get(); } + tracker.cleaner().drain().get0(); while (tracker.region().evict_some() == memory::reclaiming_result::reclaimed_something) ; // The partition should be evictable after schema change @@ -3508,6 +3509,16 @@ SEASTAR_TEST_CASE(test_concurrent_reads_and_eviction) { auto m2 = gen(); m2.partition().make_fully_continuous(); + bool upgrade_schema = tests::random::get_bool(); + if (upgrade_schema) { + schema_ptr new_schema = schema_builder(s) + .with_column(to_bytes("_phantom"), byte_type) + .remove_column("_phantom") + .build(); + m2.upgrade(new_schema); + cache.set_schema(new_schema); + } + auto mt = make_lw_shared(m2.schema()); mt->apply(m2); cache.update(row_cache::external_updater([&] () noexcept {