sstables: Fix quadratic space complexity in partitioned_sstable_set
Interval map is very susceptible to quadratic space behavior when
it's flooded with many entries overlapping all (or most of)
intervals, since each such entry will have presence on all
intervals it overlaps with.
A trigger we observed was memtable flush storm, which creates many
small "L0" sstables that spans roughly the entire token range.
Since we cannot rely on insertion order, solution will be about
storing sstables with such wide ranges in a vector (unleveled).
There should be no consequence for single-key reads, since upper
layer applies an additional filtering based on token of key being
queried.
And for range scans, there can be an increase in memory usage,
but not significant because the sstables span an wide range and
would have been selected in the combined reader if the range of
scan overlaps with them.
Anyway, this is a protection against storm of memtable flushes
and shouldn't be the common scenario.
It works both with tablets and vnodes, by adjusting the token
range spanned by compaction group accordingly.
Fixes #23634.
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit c77f710a0c)
This commit is contained in:
@@ -1301,7 +1301,7 @@ public:
|
||||
}
|
||||
|
||||
virtual sstables::sstable_set make_sstable_set_for_input() const override {
|
||||
return sstables::make_partitioned_sstable_set(_schema, false);
|
||||
return sstables::make_partitioned_sstable_set(_schema, _table_s.token_range());
|
||||
}
|
||||
|
||||
// Unconditionally enable incremental compaction if the strategy specifies a max output size, e.g. LCS.
|
||||
|
||||
@@ -394,7 +394,7 @@ future<sstables::sstable_set> compaction_task_executor::sstable_set_for_tombston
|
||||
auto compound_set = t.sstable_set_for_tombstone_gc();
|
||||
// Compound set will be linearized into a single set, since compaction might add or remove sstables
|
||||
// to it for incremental compaction to work.
|
||||
auto new_set = sstables::make_partitioned_sstable_set(t.schema(), false);
|
||||
auto new_set = sstables::make_partitioned_sstable_set(t.schema(), t.token_range());
|
||||
co_await compound_set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
|
||||
auto inserted = new_set.insert(sst);
|
||||
if (!inserted) {
|
||||
|
||||
@@ -790,7 +790,7 @@ future<reshape_config> make_reshape_config(const sstables::storage& storage, res
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> incremental_compaction_strategy::make_sstable_set(const table_state& ts) const {
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema(), false);
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema(), ts.token_range());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -143,7 +143,11 @@ future<> view_update_generator::start() {
|
||||
// Exploit the fact that sstables in the staging directory
|
||||
// are usually non-overlapping and use a partitioned set for
|
||||
// the read.
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
// With tablets, it doesn't matter full range is fed into partitioned set since
|
||||
// there will be usually one sstable to be processed per tablet, and sstables of
|
||||
// different tablets are disjoint.
|
||||
auto token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
auto ssts = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, std::move(token_range)));
|
||||
for (auto& sst : sstables) {
|
||||
ssts->insert(sst);
|
||||
input_size += sst->data_size();
|
||||
|
||||
@@ -132,9 +132,7 @@ lw_shared_ptr<const sstables::sstable_set> table::make_compound_sstable_set() co
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> compaction_group::make_maintenance_sstable_set() const {
|
||||
// Level metadata is not used because (level 0) maintenance sstables are disjoint and must be stored for efficient retrieval in the partitioned set
|
||||
bool use_level_metadata = false;
|
||||
return make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_t.schema(), use_level_metadata));
|
||||
return make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(_t.schema(), token_range()));
|
||||
}
|
||||
|
||||
void table::refresh_compound_sstable_set() {
|
||||
@@ -686,7 +684,13 @@ public:
|
||||
: _t(t)
|
||||
{
|
||||
storage_group_map r;
|
||||
auto cg = make_lw_shared<compaction_group>(_t, size_t(0), dht::token_range::make_open_ended_both_sides());
|
||||
// this might not reflect real vnode range for this node, but with 256 tokens, the actual
|
||||
// first and last tokens are likely to be ~0.5% of the edges, so any measurement against
|
||||
// this accurate enough token range will be likely up to ~1% off.
|
||||
// TODO: we could fed actual vnode range here, but we might bump into a chicken and egg
|
||||
// problem if e.g. a system table is created before tokens were allocated.
|
||||
auto full_token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
auto cg = make_lw_shared<compaction_group>(_t, size_t(0), std::move(full_token_range));
|
||||
_single_cg = cg.get();
|
||||
auto sg = make_lw_shared<storage_group>(std::move(cg));
|
||||
_single_sg = sg.get();
|
||||
|
||||
@@ -268,7 +268,12 @@ partitioned_sstable_set::query(const dht::partition_range& range) const {
|
||||
}
|
||||
|
||||
bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const {
|
||||
return _use_level_metadata && sst->get_sstable_level() == 0;
|
||||
// When a sstable spans most of the entire token range, we'll store it in a
|
||||
// vector, to avoid triggering quadratic space complexity in the interval map,
|
||||
// since many of such sstables would have presence on almost all intervals.
|
||||
static constexpr float unleveled_threshold = 0.85f;
|
||||
auto sst_tr = dht::token_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
|
||||
return dht::overlap_ratio(_token_range, sst_tr) >= unleveled_threshold;
|
||||
}
|
||||
|
||||
dht::ring_position partitioned_sstable_set::to_ring_position(const dht::compatible_ring_position_or_view& crp) {
|
||||
@@ -297,10 +302,10 @@ dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring
|
||||
return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound));
|
||||
}
|
||||
|
||||
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_level_metadata)
|
||||
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, dht::token_range token_range)
|
||||
: _schema(std::move(schema))
|
||||
, _all(make_lw_shared<sstable_list>())
|
||||
, _use_level_metadata(use_level_metadata) {
|
||||
, _token_range(std::move(token_range)) {
|
||||
}
|
||||
|
||||
static std::unordered_map<run_id, shared_sstable_run> clone_runs(const std::unordered_map<run_id, shared_sstable_run>& runs) {
|
||||
@@ -310,18 +315,18 @@ static std::unordered_map<run_id, shared_sstable_run> clone_runs(const std::unor
|
||||
}
|
||||
|
||||
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
|
||||
const lw_shared_ptr<sstable_list>& all, const std::unordered_map<run_id, shared_sstable_run>& all_runs, bool use_level_metadata, uint64_t bytes_on_disk)
|
||||
const lw_shared_ptr<sstable_list>& all, const std::unordered_map<run_id, shared_sstable_run>& all_runs, dht::token_range token_range, uint64_t bytes_on_disk)
|
||||
: sstable_set_impl(bytes_on_disk)
|
||||
, _schema(schema)
|
||||
, _unleveled_sstables(unleveled_sstables)
|
||||
, _leveled_sstables(leveled_sstables)
|
||||
, _all(make_lw_shared<sstable_list>(*all))
|
||||
, _all_runs(clone_runs(all_runs))
|
||||
, _use_level_metadata(use_level_metadata) {
|
||||
, _token_range(std::move(token_range)) {
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> partitioned_sstable_set::clone() const {
|
||||
return std::make_unique<partitioned_sstable_set>(_schema, _unleveled_sstables, _leveled_sstables, _all, _all_runs, _use_level_metadata, _bytes_on_disk);
|
||||
return std::make_unique<partitioned_sstable_set>(_schema, _unleveled_sstables, _leveled_sstables, _all, _all_runs, _token_range, _bytes_on_disk);
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> partitioned_sstable_set::select(const dht::partition_range& range) const {
|
||||
@@ -752,20 +757,19 @@ sstable_set_impl::selector_and_schema_t partitioned_sstable_set::make_incrementa
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> compaction_strategy_impl::make_sstable_set(const table_state& ts) const {
|
||||
// with use_level_metadata enabled, L0 sstables will not go to interval map, which suits well STCS.
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema(), true);
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema(), ts.token_range());
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(const table_state& ts) const {
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema());
|
||||
return std::make_unique<partitioned_sstable_set>(ts.schema(), ts.token_range());
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> time_window_compaction_strategy::make_sstable_set(const table_state& ts) const {
|
||||
return std::make_unique<time_series_sstable_set>(ts.schema(), _options.enable_optimized_twcs_queries);
|
||||
}
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata));
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, dht::token_range token_range) {
|
||||
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, std::move(token_range)));
|
||||
}
|
||||
|
||||
sstable_set
|
||||
|
||||
@@ -256,7 +256,7 @@ public:
|
||||
friend class compound_sstable_set;
|
||||
};
|
||||
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
sstable_set make_partitioned_sstable_set(schema_ptr schema, dht::token_range token_range);
|
||||
|
||||
sstable_set make_compound_sstable_set(schema_ptr schema, std::vector<lw_shared_ptr<sstable_set>> sets);
|
||||
|
||||
|
||||
@@ -33,7 +33,8 @@ private:
|
||||
// Change counter on interval map for leveled sstables which is used by
|
||||
// incremental selector to determine whether or not to invalidate iterators.
|
||||
uint64_t _leveled_sstables_change_cnt = 0;
|
||||
bool _use_level_metadata = false;
|
||||
// Token range spanned by the compaction group owning this sstable set.
|
||||
dht::token_range _token_range;
|
||||
private:
|
||||
static interval_type make_interval(const schema& s, const dht::partition_range& range);
|
||||
interval_type make_interval(const dht::partition_range& range) const;
|
||||
@@ -49,7 +50,7 @@ public:
|
||||
static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i);
|
||||
|
||||
partitioned_sstable_set(const partitioned_sstable_set&) = delete;
|
||||
explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true);
|
||||
explicit partitioned_sstable_set(schema_ptr schema, dht::token_range token_range);
|
||||
// For cloning the partitioned_sstable_set (makes a deep copy, including *_all)
|
||||
explicit partitioned_sstable_set(
|
||||
schema_ptr schema,
|
||||
@@ -57,7 +58,7 @@ public:
|
||||
const interval_map_type& leveled_sstables,
|
||||
const lw_shared_ptr<sstable_list>& all,
|
||||
const std::unordered_map<run_id, shared_sstable_run>& all_runs,
|
||||
bool use_level_metadata,
|
||||
dht::token_range token_range,
|
||||
uint64_t bytes_on_disk);
|
||||
|
||||
virtual std::unique_ptr<sstable_set_impl> clone() const override;
|
||||
|
||||
@@ -412,7 +412,7 @@ future<> sstable_streamer::stream_sstable_mutations(streaming::plan_id ops_uuid,
|
||||
const auto cf_id = s->id();
|
||||
const auto reason = streaming::stream_reason::repair;
|
||||
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, std::move(token_range)));
|
||||
size_t estimated_partitions = 0;
|
||||
for (auto& sst : sstables) {
|
||||
estimated_partitions += sst->estimated_keys_for_range(token_range);
|
||||
|
||||
@@ -73,8 +73,8 @@ public:
|
||||
single_compaction_group(table_for_tests& t, sstables::sstables_manager& sst_man, std::function<shared_sstable()> sstable_factory)
|
||||
: _schema(t.schema())
|
||||
, _sst_man(sst_man)
|
||||
, _main_set(sstables::make_partitioned_sstable_set(_schema, false))
|
||||
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, false))
|
||||
, _main_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
||||
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
||||
, _compaction_strategy(sstables::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy))
|
||||
, _tombstone_gc_state(nullptr)
|
||||
|
||||
@@ -4991,8 +4991,9 @@ SEASTAR_TEST_CASE(compound_sstable_set_incremental_selector_test) {
|
||||
};
|
||||
|
||||
auto incremental_selection_test = [&] (strategy_param param) {
|
||||
auto set1 = make_lw_shared<sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
auto set2 = make_lw_shared<sstable_set>(sstables::make_partitioned_sstable_set(s, bool(param)));
|
||||
auto token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
auto set1 = make_lw_shared<sstable_set>(sstables::make_partitioned_sstable_set(s, token_range));
|
||||
auto set2 = make_lw_shared<sstable_set>(sstables::make_partitioned_sstable_set(s, token_range));
|
||||
new_sstable(set1, 1, 1, 1);
|
||||
new_sstable(set2, 0, 2, 1);
|
||||
new_sstable(set2, 3, 3, 1);
|
||||
|
||||
@@ -28,8 +28,10 @@ BOOST_AUTO_TEST_SUITE(sstable_set_test)
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static sstables::sstable_set make_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all = {}, bool use_level_metadata = true) {
|
||||
auto ret = sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata));
|
||||
static auto full_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
|
||||
static sstables::sstable_set make_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all = {}) {
|
||||
auto ret = sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, full_range));
|
||||
for (auto& sst : *all) {
|
||||
ret.insert(sst);
|
||||
}
|
||||
@@ -157,7 +159,7 @@ SEASTAR_TEST_CASE(test_partitioned_sstable_set_bytes_on_disk) {
|
||||
auto sst1 = make_sstable_easy(env, std::move(mr), cfg);
|
||||
auto size1 = sst1->bytes_on_disk();
|
||||
|
||||
auto ss1 = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), true));
|
||||
auto ss1 = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), full_range));
|
||||
ss1->insert(sst1);
|
||||
BOOST_REQUIRE_EQUAL(ss1->bytes_on_disk(), size1);
|
||||
|
||||
@@ -232,7 +234,7 @@ SEASTAR_TEST_CASE(test_sstable_set_fast_forward_by_cache_reader_simulation) {
|
||||
testlog.info("sstable [{}, {}]", sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
|
||||
ssts.push_back(std::move(sst));
|
||||
}
|
||||
auto set = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), false));
|
||||
auto set = make_lw_shared<sstable_set>(std::make_unique<partitioned_sstable_set>(ss.schema(), full_range));
|
||||
for (auto& sst : ssts) {
|
||||
set->insert(sst);
|
||||
}
|
||||
|
||||
@@ -155,7 +155,8 @@ private:
|
||||
const auto start = perf_sstable_test_env::now();
|
||||
|
||||
// mimic the behavior of sstable_streamer::stream_sstable_mutations()
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, false));
|
||||
auto full_token_range = dht::token_range::make(dht::first_token(), dht::last_token());
|
||||
auto sst_set = make_lw_shared<sstables::sstable_set>(sstables::make_partitioned_sstable_set(s, std::move(full_token_range)));
|
||||
// stream all previously loaded sstables
|
||||
for (auto& sst : _sst) {
|
||||
sst_set->insert(sst);
|
||||
|
||||
@@ -921,8 +921,8 @@ public:
|
||||
, _permit(std::move(permit))
|
||||
, _sst_man(sst_man)
|
||||
, _output_dir(std::move(output_dir))
|
||||
, _main_set(sstables::make_partitioned_sstable_set(_schema, false))
|
||||
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, false))
|
||||
, _main_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
||||
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
|
||||
, _compaction_strategy(sstables::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
|
||||
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy))
|
||||
, _tombstone_gc_state(nullptr)
|
||||
|
||||
Reference in New Issue
Block a user