scylla/replica/memtable.hh
Tomasz Grabiec 51e3b9321b Merge ' mvcc: make schema upgrades gentle' from Michał Chojnowski
After a schema change, memtable and cache have to be upgraded to the new schema. Currently, they are upgraded (on the first access after a schema change) atomically, i.e. all rows of the entry are upgraded with one non-preemptible call. This is one of the last vestiges of the times when partitions were treated atomically, and it is a well-known source of numerous large stalls.

This series makes schema upgrades gentle (preemptible). This is done by co-opting the existing MVCC machinery.
Before the series, all partition_versions in the partition_entry chain have the same schema, and an entry upgrade replaces the entire chain with a single squashed and upgraded version.
After the series, each partition_version has its own schema. A partition entry upgrade happens simply by adding an empty version with the new schema to the head of the chain. Row entries are upgraded to the current schema on-the-fly by the cursor during reads, and by the MVCC version merge ongoing in the background after the upgrade.
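The version-chain idea above can be illustrated with a small toy model. This is a sketch under stated assumptions, not ScyllaDB code: `toy_schema`, `toy_version` and `toy_entry` are hypothetical stand-ins for `schema`, `partition_version` and `partition_entry`; rows are plain column-to-value maps; versions are merged without timestamps (the newer version always wins); and "upgrading" a row only drops columns the target schema no longer has. `upgrade()` is O(1) because it only pushes an empty head version, while older rows are converted lazily in `read()` and by the preemptible `merge_step()`.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a schema: a version number and a column set.
struct toy_schema {
    int version;
    std::set<std::string> columns;
};
using toy_schema_ptr = std::shared_ptr<const toy_schema>;

// A row is a column -> value map.
using toy_row = std::map<std::string, std::string>;

// Converts a row written under an older schema to `to` by dropping columns
// that `to` no longer has (real upgrades also handle type changes etc.).
toy_row upgrade_row(const toy_row& r, const toy_schema& to) {
    toy_row out;
    for (const auto& [col, val] : r) {
        if (to.columns.count(col) != 0) {
            out.emplace(col, val);
        }
    }
    return out;
}

// Each version carries its own schema.
struct toy_version {
    toy_schema_ptr schema;
    std::map<int, toy_row> rows; // clustering key -> row
};

struct toy_entry {
    std::vector<toy_version> versions; // versions.front() is the newest (head)

    const toy_schema_ptr& schema() const { return versions.front().schema; }

    // Gentle upgrade: O(1); no existing row is touched.
    void upgrade(toy_schema_ptr s) {
        versions.insert(versions.begin(), toy_version{std::move(s), {}});
    }

    // Merges all versions oldest first, so newer values overwrite older ones;
    // rows from older versions are upgraded to the head schema on the fly.
    toy_row read(int key) const {
        toy_row result;
        for (auto it = versions.rbegin(); it != versions.rend(); ++it) {
            auto r = it->rows.find(key);
            if (r != it->rows.end()) {
                for (const auto& [col, val] : upgrade_row(r->second, *schema())) {
                    result[col] = val;
                }
            }
        }
        return result;
    }

    // One preemptible step of the background merge: moves a single row from
    // the oldest version into the next newer one, upgrading it to that
    // version's schema. Returns true once only one version remains.
    bool merge_step() {
        if (versions.size() < 2) {
            return true;
        }
        toy_version& oldest = versions.back();
        if (oldest.rows.empty()) {
            versions.pop_back();
            return versions.size() < 2;
        }
        toy_version& newer = versions[versions.size() - 2];
        auto node = oldest.rows.begin();
        toy_row& dst = newer.rows[node->first];
        for (const auto& [col, val] : upgrade_row(node->second, *newer.schema)) {
            dst.emplace(col, val); // a value already in the newer version wins
        }
        oldest.rows.erase(node);
        return false;
    }
};
```

A caller would run `merge_step()` from a background task and yield between steps, which is what bounds the length of any single stall. The real merge must also deal with LSA regions, eviction and concurrent snapshots, none of which are modeled here.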

The series:
1. Does some code cleanup in the mutation_partition area.
2. Adds a schema field to partition_version and removes it from its containers (partition_snapshot, cache_entry, memtable_entry).
3. Adds upgrading variants of constructors and apply() for `row` and its wrappers.
4. Prepares partition_snapshot_row_cursor, mutation_partition_v2::apply_monotonically and partition_snapshot::merge_partition_versions for dealing with heterogeneous version chains.
5. Modifies partition_entry::upgrade to perform upgrades by extending the version chain with a new schema instead of squashing it to a single upgraded version.

Fixes #2577

Closes #13761

* github.com:scylladb/scylladb:
  test: mvcc_test: add a test for gentle schema upgrades
  partition_version: make partition_entry::upgrade() gentle
  partition_version: handle multi-schema snapshots in merge_partition_versions
  mutation_partition_v2: handle schema upgrades in apply_monotonically()
  partition_version: remove the unused "from" argument in partition_entry::upgrade()
  row_cache_test: prepare test_eviction_after_schema_change for gentle schema upgrades
  partition_version: handle multi-schema entries in partition_entry::squashed
  partition_snapshot_row_cursor: handle multi-schema snapshots
  partition_version: prepare partition_snapshot::squashed() for multi-schema snapshots
  partition_version: prepare partition_snapshot::static_row() for multi-schema snapshots
  partition_version: add a logalloc::region argument to partition_entry::upgrade()
  memtable: propagate the region to memtable_entry::upgrade_schema()
  mutation_partition: add an upgrading variant of lazy_row::apply()
  mutation_partition: add an upgrading variant of rows_entry::rows_entry
  mutation_partition: switch an apply() call to apply_monotonically()
  mutation_partition: add an upgrading variant of rows_entry::apply_monotonically()
  mutation_fragment: add an upgrading variant of clustering_row::apply()
  mutation_partition: add an upgrading variant of row::row
  partition_version: remove _schema from partition_entry::operator<<
  partition_version: remove the schema argument from partition_entry::read()
  memtable: remove _schema from memtable_entry
  row_cache: remove _schema from cache_entry
  partition_version: remove the _schema field from partition_snapshot
  partition_version: add a _schema field to partition_version
  mutation_partition: change schema_ptr to schema& in mutation_partition::difference
  mutation_partition: change schema_ptr to schema& in mutation_partition constructor
  mutation_partition_v2: change schema_ptr to schema& in mutation_partition_v2 constructor
  mutation_partition: add upgrading variants of row::apply()
  partition_version: update the comment to apply_to_incomplete()
  mutation_partition_v2: clean up variants of apply()
  mutation_partition: remove apply_weak()
  mutation_partition_v2: remove a misleading comment in apply_monotonically()
  row_cache_test: add schema changes to test_concurrent_reads_and_eviction
  mutation_partition: fix mixed-schema apply()
2023-05-24 22:58:43 +02:00


/*
 * Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */
#pragma once
#include <map>
#include <memory>
#include <iosfwd>
#include "replica/database_fwd.hh"
#include "dht/i_partitioner.hh"
#include "schema/schema_fwd.hh"
#include "encoding_stats.hh"
#include "dirty_memory_manager.hh"
#include "db/commitlog/replay_position.hh"
#include "db/commitlog/rp_set.hh"
#include "utils/extremum_tracking.hh"
#include "mutation/mutation_cleaner.hh"
#include "sstables/types.hh"
#include "utils/double-decker.hh"
#include "readers/empty_v2.hh"
#include "readers/mutation_source.hh"
class frozen_mutation;
class row_cache;
namespace bi = boost::intrusive;
namespace replica {
class memtable_entry {
    dht::decorated_key _key;
    partition_entry _pe;
    struct {
        bool _head : 1;
        bool _tail : 1;
        bool _train : 1;
    } _flags{};
public:
    bool is_head() const noexcept { return _flags._head; }
    void set_head(bool v) noexcept { _flags._head = v; }
    bool is_tail() const noexcept { return _flags._tail; }
    void set_tail(bool v) noexcept { _flags._tail = v; }
    bool with_train() const noexcept { return _flags._train; }
    void set_train(bool v) noexcept { _flags._train = v; }
    friend class memtable;

    memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p)
        : _key(std::move(key))
        , _pe(*s, std::move(p))
    { }

    memtable_entry(memtable_entry&& o) noexcept;
    // Frees elements of the entry in batches.
    // Returns stop_iteration::yes iff there are no more elements to free.
    stop_iteration clear_gently() noexcept;
    const dht::decorated_key& key() const { return _key; }
    dht::decorated_key& key() { return _key; }
    const partition_entry& partition() const { return _pe; }
    partition_entry& partition() { return _pe; }
    const schema_ptr& schema() const { return _pe.get_schema(); }
    partition_snapshot_ptr snapshot(memtable& mtbl);

    // Makes the entry conform to the given schema.
    // Must be called under an allocating section of the region which owns the entry.
    void upgrade_schema(logalloc::region&, const schema_ptr&, mutation_cleaner&);

    size_t external_memory_usage_without_rows() const {
        return _key.key().external_memory_usage();
    }

    size_t object_memory_size(allocation_strategy& allocator);

    size_t size_in_allocator_without_rows(allocation_strategy& allocator) {
        return object_memory_size(allocator) + external_memory_usage_without_rows();
    }

    size_t size_in_allocator(allocation_strategy& allocator) {
        auto size = size_in_allocator_without_rows(allocator);
        for (auto&& v : _pe.versions()) {
            size += v.size_in_allocator(allocator);
        }
        return size;
    }

    friend dht::ring_position_view ring_position_view_to_compare(const memtable_entry& mt) { return mt._key; }
    friend std::ostream& operator<<(std::ostream&, const memtable_entry&);
};
}
namespace replica {
class dirty_memory_manager;
struct table_stats;
// Managed by lw_shared_ptr<>.
class memtable final : public enable_lw_shared_from_this<memtable>, private dirty_memory_manager_logalloc::size_tracked_region {
public:
    using partitions_type = double_decker<int64_t, memtable_entry,
                            dht::raw_token_less_comparator, dht::ring_position_comparator,
                            16, bplus::key_search::linear>;
private:
    dirty_memory_manager& _dirty_mgr;
    mutation_cleaner _cleaner;
    memtable_list *_memtable_list;
    schema_ptr _schema;
    logalloc::allocating_section _read_section;
    logalloc::allocating_section _allocating_section;
    partitions_type partitions;
    size_t nr_partitions = 0;
    db::replay_position _replay_position;
    db::rp_set _rp_set;
    // Mutation source to which reads fall back after mark_flushed(),
    // so that memtable contents can be moved away while there are
    // still active readers. This is needed for this mutation_source
    // to be monotonic (not lose writes). Monotonicity of each
    // mutation_source is necessary for the combined mutation source to be
    // monotonic. That combined source in this case is cache + memtable.
    mutation_source_opt _underlying;
    uint64_t _flushed_memory = 0;
    bool _merged_into_cache = false;
    replica::table_stats& _table_stats;

    class memtable_encoding_stats_collector : public encoding_stats_collector {
    private:
        min_max_tracker<api::timestamp_type> min_max_timestamp;

        void update_timestamp(api::timestamp_type ts) noexcept;

    public:
        memtable_encoding_stats_collector() noexcept;
        void update(atomic_cell_view cell) noexcept;
        void update(tombstone tomb) noexcept;
        void update(const ::schema& s, const row& r, column_kind kind);
        void update(const range_tombstone& rt) noexcept;
        void update(const row_marker& marker) noexcept;
        void update(const ::schema& s, const deletable_row& dr);
        void update(const ::schema& s, const mutation_partition& mp);

        api::timestamp_type get_min_timestamp() const noexcept {
            return min_max_timestamp.min();
        }

        api::timestamp_type get_max_timestamp() const noexcept {
            return min_max_timestamp.max();
        }
    } _stats_collector;

    void update(db::rp_handle&&);
    friend class ::row_cache;
    friend class memtable_entry;
    friend class flush_reader;
    friend class flush_memory_accounter;
    friend class partition_snapshot_read_accounter;
private:
    boost::iterator_range<partitions_type::const_iterator> slice(const dht::partition_range& r) const;
    partition_entry& find_or_create_partition(const dht::decorated_key& key);
    partition_entry& find_or_create_partition_slow(partition_key_view key);
    void upgrade_entry(memtable_entry&);
    void add_flushed_memory(uint64_t);
    void remove_flushed_memory(uint64_t);
    void clear() noexcept;
    uint64_t dirty_size() const;
public:
    explicit memtable(schema_ptr schema, dirty_memory_manager&, replica::table_stats& table_stats, memtable_list *memtable_list = nullptr,
            seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group());
    // Used by tests that want to control the flush process.
    explicit memtable(schema_ptr schema);
    ~memtable();
    // Clears this memtable gradually without consuming the whole CPU.
    // Never resolves with a failed future.
    future<> clear_gently() noexcept;
    schema_ptr schema() const noexcept { return _schema; }
    void set_schema(schema_ptr) noexcept;
    future<> apply(memtable&, reader_permit);
    // Applies a mutation to this memtable.
    // The mutation is upgraded to the current schema.
    void apply(const mutation& m, db::rp_handle&& = {});
    // The mutation is upgraded to the current schema.
    void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {});
    void evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept;

    static memtable& from_region(logalloc::region& r) noexcept {
        return static_cast<memtable&>(r);
    }

    const logalloc::region& region() const noexcept {
        return *this;
    }

    logalloc::region& region() noexcept {
        return *this;
    }

    encoding_stats get_encoding_stats() const noexcept {
        return _stats_collector.get();
    }

    api::timestamp_type get_min_timestamp() const noexcept {
        return _stats_collector.get_min_timestamp();
    }

    api::timestamp_type get_max_timestamp() const noexcept {
        return _stats_collector.get_max_timestamp();
    }

    mutation_cleaner& cleaner() noexcept {
        return _cleaner;
    }
public:
    memtable_list* get_memtable_list() noexcept {
        return _memtable_list;
    }

    size_t partition_count() const noexcept { return nr_partitions; }
    logalloc::occupancy_stats occupancy() const noexcept;

    // Creates a reader of data in this memtable for the given partition range.
    //
    // Live readers share ownership of the memtable instance, so the caller
    // doesn't need to ensure that the memtable remains live.
    //
    // The 'range' parameter must be live as long as the reader is being used.
    //
    // Mutations returned by the reader will all have the given schema.
    flat_mutation_reader_v2 make_flat_reader(schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            const io_priority_class& pc = default_priority_class(),
            tracing::trace_state_ptr trace_state_ptr = nullptr,
            streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
            mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) {
        if (auto reader_opt = make_flat_reader_opt(s, permit, range, slice, pc, std::move(trace_state_ptr), fwd, fwd_mr)) {
            return std::move(*reader_opt);
        }
        [[unlikely]] return make_empty_flat_reader_v2(std::move(s), std::move(permit));
    }

    // Same as make_flat_reader, but returns an empty optional instead of a no-op reader when there is nothing to
    // read. This is an optimization.
    flat_mutation_reader_v2_opt make_flat_reader_opt(schema_ptr,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            const io_priority_class& pc = default_priority_class(),
            tracing::trace_state_ptr trace_state_ptr = nullptr,
            streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
            mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);

    flat_mutation_reader_v2 make_flat_reader(schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range = query::full_partition_range) {
        auto& full_slice = s->full_slice();
        return make_flat_reader(s, std::move(permit), range, full_slice);
    }

    flat_mutation_reader_v2 make_flush_reader(schema_ptr, reader_permit permit, const io_priority_class& pc = default_priority_class());
    mutation_source as_data_source();
    bool empty() const noexcept { return partitions.empty(); }
    void mark_flushed(mutation_source) noexcept;
    bool is_flushed() const noexcept;
    void on_detach_from_region_group() noexcept;
    void revert_flushed_memory() noexcept;

    const db::replay_position& replay_position() const noexcept {
        return _replay_position;
    }

    /**
     * Returns the current rp_set and resets the
     * stored one to empty. Only used for flushing
     * purposes, to report the discarded replay
     * positions to the commitlog exactly once.
     */
    db::rp_set get_and_discard_rp_set() noexcept {
        return std::exchange(_rp_set, {});
    }

    friend class iterator_reader;

    dirty_memory_manager& get_dirty_memory_manager() noexcept {
        return _dirty_mgr;
    }

    friend std::ostream& operator<<(std::ostream&, memtable&);
};
}
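`memtable_entry::clear_gently()` frees the entry in batches and signals completion through `stop_iteration`. The sketch below illustrates that contract on a hypothetical `toy_gentle_entry` holding a `std::list` of strings. `stop_iteration` is redefined locally as a stand-in for Seastar's type, and the default batch size of 16 is an arbitrary assumption, not the value ScyllaDB uses.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <string>

// Local stand-in for seastar::stop_iteration.
enum class stop_iteration : bool { no = false, yes = true };

// Hypothetical entry owning many separately allocated elements.
class toy_gentle_entry {
    std::list<std::string> _rows;
public:
    explicit toy_gentle_entry(std::size_t n) : _rows(n, std::string(64, 'x')) {}
    std::size_t size() const noexcept { return _rows.size(); }

    // Frees at most `batch` elements (batch must be > 0) per call, so the
    // caller can yield between calls instead of stalling in one long loop.
    // Returns stop_iteration::yes iff there is nothing left to free.
    stop_iteration clear_gently(std::size_t batch = 16) noexcept {
        for (std::size_t i = 0; i < batch && !_rows.empty(); ++i) {
            _rows.pop_front();
        }
        return _rows.empty() ? stop_iteration::yes : stop_iteration::no;
    }
};

// Drives clear_gently() to completion and returns how many calls it took.
// In Seastar code such a loop would typically be written with
// seastar::repeat(), which can yield to the reactor between iterations;
// here it simply loops.
std::size_t clear_in_slices(toy_gentle_entry& e, std::size_t batch) {
    std::size_t slices = 0;
    stop_iteration done = stop_iteration::no;
    while (done == stop_iteration::no) {
        done = e.clear_gently(batch);
        ++slices;
    }
    return slices;
}
```

Returning a "done" flag instead of looping internally leaves the yielding policy to the caller, which is what lets `memtable::clear_gently()` avoid monopolizing the CPU.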
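The comment on `_underlying` explains why a flushed memtable keeps serving reads through a fallback `mutation_source`. What follows is a simplified, single-threaded sketch of that idea. `toy_flushable_memtable` is hypothetical, its fallback is a `std::function` doing point lookups, and `mark_flushed()` here simply clears the contents. The real memtable streams fragments through readers and moves its contents into the cache.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a mutation_source: point lookups only.
using toy_source = std::function<std::optional<std::string>(int)>;

class toy_flushable_memtable {
    std::map<int, std::string> _data;
    std::optional<toy_source> _underlying; // set by mark_flushed()
public:
    void apply(int key, std::string value) { _data[key] = std::move(value); }
    const std::map<int, std::string>& data() const noexcept { return _data; }

    // Installs the fallback before releasing the contents; in a preemptible
    // setting the opposite order could let a reader find the data in
    // neither place, i.e. the source would not be monotonic.
    void mark_flushed(toy_source s) {
        _underlying = std::move(s);
        _data.clear();
    }

    // Reads go to the fallback once the memtable has been flushed.
    std::optional<std::string> read(int key) const {
        if (_underlying) {
            return (*_underlying)(key);
        }
        auto it = _data.find(key);
        if (it == _data.end()) {
            return std::nullopt;
        }
        return it->second;
    }
};
```

A reader that keeps a reference to the memtable across the flush still observes every write it could see before, which is the monotonicity property the header comment relies on when combining cache and memtable.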
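`memtable_encoding_stats_collector` keeps a running minimum and maximum timestamp in a `min_max_tracker<api::timestamp_type>` (presumably fed through `update_timestamp()` as cells, tombstones and markers pass through the `update()` overloads), so `get_min_timestamp()` and `get_max_timestamp()` are O(1). Below is a minimal sketch of that bookkeeping. It assumes a hypothetical `toy_min_max_tracker` and an `int64_t` stand-in for `api::timestamp_type`. The real tracker lives in `utils/extremum_tracking.hh` and may differ in details such as its initial values.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

using timestamp_type = std::int64_t; // stand-in for api::timestamp_type

// Hypothetical tracker: starts at the numeric limits so the first update
// sets both bounds.
template <typename T>
class toy_min_max_tracker {
    T _min = std::numeric_limits<T>::max();
    T _max = std::numeric_limits<T>::min();
public:
    void update(T v) noexcept {
        if (v < _min) {
            _min = v;
        }
        if (v > _max) {
            _max = v;
        }
    }
    T min() const noexcept { return _min; }
    T max() const noexcept { return _max; }
};

// Hypothetical collector: every timestamp seen on the write path is folded
// into the tracker, so the bounds are available without scanning partitions.
class toy_stats_collector {
    toy_min_max_tracker<timestamp_type> _ts;
public:
    void update_cell(timestamp_type write_ts) noexcept { _ts.update(write_ts); }
    void update_tombstone(timestamp_type deletion_ts) noexcept { _ts.update(deletion_ts); }
    timestamp_type get_min_timestamp() const noexcept { return _ts.min(); }
    timestamp_type get_max_timestamp() const noexcept { return _ts.max(); }
};
```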
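`make_flat_reader()` is a thin wrapper over `make_flat_reader_opt()`. The optional variant lets a caller that combines many sources skip a memtable with nothing in range, instead of constructing and driving a no-op reader. The sketch below models only that control flow. `toy_reader`, `toy_memtable` and `collect_readers()` are hypothetical, and ranges are inclusive integer intervals rather than `dht::partition_range`.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reader: a snapshot of the (key, value) pairs it will emit.
struct toy_reader {
    std::vector<std::pair<int, std::string>> rows;
    bool empty() const noexcept { return rows.empty(); }
};

struct toy_memtable {
    std::map<int, std::string> partitions;

    // Returns std::nullopt when nothing falls into [lo, hi], so that a caller
    // combining many sources can skip this one entirely.
    std::optional<toy_reader> make_reader_opt(int lo, int hi) const {
        if (lo > hi) {
            return std::nullopt;
        }
        auto first = partitions.lower_bound(lo);
        auto last = partitions.upper_bound(hi);
        if (first == last) {
            return std::nullopt;
        }
        return toy_reader{std::vector<std::pair<int, std::string>>(first, last)};
    }

    // Always returns a reader, falling back to an empty (no-op) one.
    toy_reader make_reader(int lo, int hi) const {
        if (auto r = make_reader_opt(lo, hi)) {
            return std::move(*r);
        }
        return toy_reader{};
    }
};

// Hypothetical combiner input: only sources that have data get a reader.
std::vector<toy_reader> collect_readers(const std::vector<toy_memtable>& mts, int lo, int hi) {
    std::vector<toy_reader> out;
    for (const auto& mt : mts) {
        if (auto r = mt.make_reader_opt(lo, hi)) {
            out.push_back(std::move(*r));
        }
    }
    return out;
}
```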
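`get_and_discard_rp_set()` uses `std::exchange` to move the accumulated set out and leave an empty one in its place in a single expression. Here is a sketch of the same pattern, with a `std::set<std::uint64_t>` standing in (hypothetically) for `db::rp_set` and a hypothetical `toy_rp_memtable` class.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>

// Hypothetical stand-ins for db::replay_position and db::rp_set.
using toy_replay_position = std::uint64_t;
using toy_rp_set = std::set<toy_replay_position>;

class toy_rp_memtable {
    toy_rp_set _rp_set;
public:
    // In this toy, each applied write records the commitlog position it came from.
    void apply(toy_replay_position rp) { _rp_set.insert(rp); }

    // Same pattern as memtable::get_and_discard_rp_set(): move the
    // accumulated set out and leave an empty one behind.
    toy_rp_set get_and_discard_rp_set() {
        return std::exchange(_rp_set, {});
    }
};
```

Because the set is moved out, a second call returns an empty set. This is what makes the report to the commitlog one-shot.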