db: Introduce mutation_partition_v2

Intended to be used in memtable/cache, as opposed to the old
mutation_partition which will be intended to be used as temporary
object.

The two will have different trade-offs regarding memory efficiency and
algorithms.

In this commit there is no change in logic, the class is mostly
copied. Some methods which are not needed on the v2 model were removed
from the interface.

Logic changes will be introduced in later commits.
This commit is contained in:
Tomasz Grabiec
2022-06-02 17:14:50 +02:00
parent 806f698272
commit 7e6056b3cc
7 changed files with 1358 additions and 17 deletions

View File

@@ -692,6 +692,7 @@ scylla_core = (['message/messaging_service.cc',
'utils/human_readable.cc',
'utils/histogram_metrics_helper.cc',
'mutation_partition.cc',
'mutation_partition_v2.cc',
'mutation_partition_view.cc',
'mutation_partition_serializer.cc',
'converting_mutation_partition_applier.cc',

View File

@@ -32,6 +32,7 @@
#include "clustering_key_filter.hh"
#include "mutation_partition_view.hh"
#include "tombstone_gc.hh"
#include "utils/unconst.hh"
logging::logger mplog("mutation_partition");
@@ -741,21 +742,6 @@ mutation_partition::range(const schema& schema, const query::clustering_range& r
return boost::make_iterator_range(lower_bound(schema, r), upper_bound(schema, r));
}
template <typename Container>
boost::iterator_range<typename Container::iterator>
unconst(Container& c, boost::iterator_range<typename Container::const_iterator> r) {
return boost::make_iterator_range(
c.erase(r.begin(), r.begin()),
c.erase(r.end(), r.end())
);
}
template <typename Container>
typename Container::iterator
unconst(Container& c, typename Container::const_iterator i) {
return c.erase(i, i);
}
boost::iterator_range<mutation_partition::rows_type::iterator>
mutation_partition::range(const schema& schema, const query::clustering_range& r) {
return unconst(_rows, static_cast<const mutation_partition*>(this)->range(schema, r));
@@ -979,8 +965,7 @@ static void get_compacted_row_slice(const schema& s,
}
}
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),
gc_clock::time_point now = gc_clock::time_point::min()) {
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb, gc_clock::time_point now) {
bool any_live = false;
cells.for_each_cell_until([&] (column_id id, const atomic_cell_or_collection& cell_or_collection) {
const column_definition& def = s.column_at(kind, id);

View File

@@ -1449,3 +1449,6 @@ inline
mutation_partition& mutation_partition::container_of(rows_type& rows) {
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition::_rows);
}
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),
gc_clock::time_point now = gc_clock::time_point::min());

1015
mutation_partition_v2.cc Normal file

File diff suppressed because it is too large Load Diff

310
mutation_partition_v2.hh Normal file
View File

@@ -0,0 +1,310 @@
/*
* Copyright (C) 2014-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <iosfwd>
#include <map>
#include <boost/intrusive/set.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <seastar/core/bitset-iter.hh>
#include <seastar/util/optimized_optional.hh>
#include "mutation_partition.hh"
// Represents a set of writes made to a single partition.
//
// Like mutation_partition, but intended to be used in cache/memtable
// so the tradeoffs are different. This representation must be memory-efficient
// and must support incremental eviction of its contents. It is used in MVCC so
// algorithms for merging must respect MVCC invariants. See docs/dev/mvcc.md.
//
// The object is schema-dependent. Each instance is governed by some
// specific schema version. Accessors require a reference to the schema object
// of that version.
//
// There is an operation of addition defined on mutation_partition objects
// (also called "apply"), which gives as a result an object representing the
// sum of writes contained in the addends. For instances governed by the same
// schema, addition is commutative and associative.
//
// In addition to representing writes, the object supports specifying a set of
// partition elements called "continuity". This set can be used to represent
// lack of information about certain parts of the partition. It can be
// specified which ranges of clustering keys belong to that set. We say that a
// key range is continuous if all keys in that range belong to the continuity
// set, and discontinuous otherwise. By default everything is continuous.
// The static row may be also continuous or not.
// Partition tombstone is always continuous.
//
// Continuity is ignored by instance equality. It's also transient, not
// preserved by serialization.
//
// Continuity is represented internally using flags on row entries. The key
// range between two consecutive entries (both ends exclusive) is continuous
// if and only if rows_entry::continuous() is true for the later entry. The
// range starting after the last entry is assumed to be continuous. The range
// corresponding to the key of the entry is continuous if and only if
// rows_entry::dummy() is false.
//
// Adding two fully-continuous instances gives a fully-continuous instance.
// Continuity doesn't affect how the write part is added.
//
// Addition of continuity is not commutative in general, but is associative.
// The default continuity merging rules are those required by MVCC to
// preserve its invariants. For details, refer to "Continuity merging rules" section
// in the doc in partition_version.hh.
class mutation_partition_v2 final {
public:
using rows_type = rows_entry::container_type;
friend class size_calculator;
private:
tombstone _tombstone;
lazy_row _static_row;
bool _static_row_continuous = true;
rows_type _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
range_tombstone_list _row_tombstones;
#ifdef SEASTAR_DEBUG
table_schema_version _schema_version;
#endif
friend class converting_mutation_partition_applier;
public:
struct copy_comparators_only {};
struct incomplete_tag {};
// Constructs an empty instance which is fully discontinuous except for the partition tombstone.
mutation_partition_v2(incomplete_tag, const schema& s, tombstone);
static mutation_partition_v2 make_incomplete(const schema& s, tombstone t = {}) {
return mutation_partition_v2(incomplete_tag(), s, t);
}
mutation_partition_v2(schema_ptr s)
: _rows()
, _row_tombstones(*s)
#ifdef SEASTAR_DEBUG
, _schema_version(s->version())
#endif
{ }
mutation_partition_v2(mutation_partition_v2& other, copy_comparators_only)
: _rows()
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
#ifdef SEASTAR_DEBUG
, _schema_version(other._schema_version)
#endif
{ }
mutation_partition_v2(mutation_partition_v2&&) = default;
mutation_partition_v2(const schema& s, mutation_partition&&);
mutation_partition_v2(const schema& s, const mutation_partition_v2&);
mutation_partition_v2(const schema& s, const mutation_partition&);
mutation_partition_v2(const mutation_partition_v2&, const schema&, query::clustering_key_filter_ranges);
mutation_partition_v2(mutation_partition_v2&&, const schema&, query::clustering_key_filter_ranges);
~mutation_partition_v2();
static mutation_partition_v2& container_of(rows_type&);
mutation_partition_v2& operator=(mutation_partition_v2&& x) noexcept;
bool equal(const schema&, const mutation_partition_v2&) const;
bool equal(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema) const;
bool equal_continuity(const schema&, const mutation_partition_v2&) const;
// Consistent with equal()
template<typename Hasher>
void feed_hash(Hasher& h, const schema& s) const {
hashing_partition_visitor<Hasher> v(h, s);
accept(s, v);
}
class printer {
const schema& _schema;
const mutation_partition_v2& _mutation_partition;
public:
printer(const schema& s, const mutation_partition_v2& mp) : _schema(s), _mutation_partition(mp) { }
printer(const printer&) = delete;
printer(printer&&) = delete;
friend std::ostream& operator<<(std::ostream& os, const printer& p);
};
friend std::ostream& operator<<(std::ostream& os, const printer& p);
public:
// Makes sure there is a dummy entry after all clustered rows. Doesn't affect continuity.
// Doesn't invalidate iterators.
void ensure_last_dummy(const schema&);
bool static_row_continuous() const { return _static_row_continuous; }
void set_static_row_continuous(bool value) { _static_row_continuous = value; }
bool is_fully_continuous() const;
void make_fully_continuous();
// Sets or clears continuity of clustering ranges between existing rows.
void set_continuity(const schema&, const position_range& pr, is_continuous);
// Returns clustering row ranges which have continuity matching the is_continuous argument.
clustering_interval_set get_continuity(const schema&, is_continuous = is_continuous::yes) const;
// Returns true iff all keys from given range are marked as continuous, or range is empty.
bool fully_continuous(const schema&, const position_range&);
// Returns true iff all keys from given range are marked as not continuous and range is not empty.
bool fully_discontinuous(const schema&, const position_range&);
// Returns true iff all keys from given range have continuity membership as specified by is_continuous.
bool check_continuity(const schema&, const position_range&, is_continuous) const;
// Frees elements of the partition in batches.
// Returns stop_iteration::yes iff there are no more elements to free.
// Continuity is unspecified after this.
stop_iteration clear_gently(cache_tracker*) noexcept;
// Applies mutation_fragment.
// The fragment must be goverened by the same schema as this object.
void apply(tombstone t) { _tombstone.apply(t); }
void apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t);
void apply_delete(const schema& schema, range_tombstone rt);
void apply_delete(const schema& schema, clustering_key_prefix&& prefix, tombstone t);
void apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t);
// Equivalent to applying a mutation with an empty row, created with given timestamp
void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at);
void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at,
gc_clock::duration ttl, gc_clock::time_point expiry);
// prefix must not be full
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
void apply_row_tombstone(const schema& schema, range_tombstone rt);
//
// Applies p to current object.
//
// Commutative when this_schema == p_schema. If schemas differ, data in p which
// is not representable in this_schema is dropped, thus apply() loses commutativity.
//
// Weak exception guarantees.
void apply(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Use in case this instance and p share the same schema.
// Same guarantees as apply(const schema&, mutation_partition_v2&&, const schema&);
void apply(const schema& s, mutation_partition_v2&& p, mutation_application_stats& app_stats);
// Same guarantees and constraints as for apply(const schema&, const mutation_partition_v2&, const schema&).
void apply(const schema& this_schema, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
// Applies p to this instance.
//
// Monotonic exception guarantees. In case of exception the sum of p and this remains the same as before the exception.
// This instance and p are governed by the same schema.
//
// Must be provided with a pointer to the cache_tracker, which owns both this and p.
//
// Returns stop_iteration::no if the operation was preempted before finished, and stop_iteration::yes otherwise.
// On preemption the sum of this and p stays the same (represents the same set of writes), and the state of this
// object contains at least all the writes it contained before the call (monotonicity). It may contain partial writes.
// Also, some progress is always guaranteed (liveness).
//
// The operation can be driven to completion like this:
//
// apply_resume res;
// while (apply_monotonically(..., is_preemtable::yes, &res) == stop_iteration::no) { }
//
// If is_preemptible::no is passed as argument then stop_iteration::no is never returned.
//
// If is_preemptible::yes is passed, apply_resume must also be passed,
// same instance each time until stop_iteration::yes is returned.
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker*,
mutation_application_stats& app_stats, is_preemptible, apply_resume&);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats, is_preemptible, apply_resume&);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, cache_tracker* tracker,
mutation_application_stats& app_stats);
stop_iteration apply_monotonically(const schema& s, mutation_partition_v2&& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Weak exception guarantees.
// Assumes this and p are not owned by a cache_tracker.
void apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition&&,
mutation_application_stats& app_stats);
void apply_weak(const schema& s, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
// Converts partition to the new schema. When succeeds the partition should only be accessed
// using the new schema.
//
// Strong exception guarantees.
void upgrade(const schema& old_schema, const schema& new_schema);
private:
void insert_row(const schema& s, const clustering_key& key, deletable_row&& row);
void insert_row(const schema& s, const clustering_key& key, const deletable_row& row);
public:
// Returns true if the mutation_partition_v2 represents no writes.
bool empty() const;
public:
deletable_row& clustered_row(const schema& s, const clustering_key& key);
deletable_row& clustered_row(const schema& s, clustering_key&& key);
deletable_row& clustered_row(const schema& s, clustering_key_view key);
deletable_row& clustered_row(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
rows_entry& clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
// Throws if the row already exists or if the row was not inserted to the
// last position (one or more greater row already exists).
// Weak exception guarantees.
deletable_row& append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
public:
tombstone partition_tombstone() const { return _tombstone; }
lazy_row& static_row() { return _static_row; }
const lazy_row& static_row() const { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const noexcept { return _rows; }
utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
rows_type& mutable_clustered_rows() noexcept { return _rows; }
const range_tombstone_list& row_tombstones() const noexcept { return _row_tombstones; }
utils::immutable_collection<range_tombstone_list> row_tombstones() noexcept { return _row_tombstones; }
range_tombstone_list& mutable_row_tombstones() noexcept { return _row_tombstones; }
const row* find_row(const schema& s, const clustering_key& key) const;
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
row_tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
// Can be called only for non-dummy entries
row_tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::clustering_range& r) const;
rows_type::const_iterator lower_bound(const schema& schema, const query::clustering_range& r) const;
rows_type::const_iterator upper_bound(const schema& schema, const query::clustering_range& r) const;
rows_type::iterator lower_bound(const schema& schema, const query::clustering_range& r);
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
boost::iterator_range<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
// Returns an iterator range of rows_entry, with only non-dummy entries.
auto non_dummy_rows() const {
return boost::make_iterator_range(_rows.begin(), _rows.end())
| boost::adaptors::filtered([] (const rows_entry& e) { return bool(!e.dummy()); });
}
void accept(const schema&, mutation_partition_visitor&) const;
// Returns the number of live CQL rows in this partition.
//
// Note: If no regular rows are live, but there's something live in the
// static row, the static row counts as one row. If there is at least one
// regular row live, static row doesn't count.
//
uint64_t live_row_count(const schema&,
gc_clock::time_point query_time = gc_clock::time_point::min()) const;
bool is_static_row_live(const schema&,
gc_clock::time_point query_time = gc_clock::time_point::min()) const;
uint64_t row_count() const;
size_t external_memory_usage(const schema&) const;
private:
template<typename Func>
void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
friend class counter_write_query_result_builder;
void check_schema(const schema& s) const {
#ifdef SEASTAR_DEBUG
assert(s.version() == _schema_version);
#endif
}
};
inline
mutation_partition_v2& mutation_partition_v2::container_of(rows_type& rows) {
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition_v2::_rows);
}

View File

@@ -9,6 +9,7 @@
#pragma once
#include "mutation_partition.hh"
#include "mutation_partition_v2.hh"
#include "utils/anchorless_list.hh"
#include "utils/logalloc.hh"
#include "utils/coroutine.hh"

26
utils/unconst.hh Normal file
View File

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2014-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <boost/range/iterator_range.hpp>
template <typename Container>
boost::iterator_range<typename Container::iterator>
unconst(Container& c, boost::iterator_range<typename Container::const_iterator> r) {
return boost::make_iterator_range(
c.erase(r.begin(), r.begin()),
c.erase(r.end(), r.end())
);
}
template <typename Container>
typename Container::iterator
unconst(Container& c, typename Container::const_iterator i) {
return c.erase(i, i);
}