Merge 'Fix partition estimation with TWCS tables during streaming' from Raphael "Raph" Carvalho

TWCS tables require partition estimation adjustment as incoming streaming data can be segregated into the time windows.

Turns out we had two problems in this area that leads to suboptimal bloom filters.

1) With off-strategy enabled, data segregation is postponed, but partition estimation was adjusted as if segregation wasn't postponed. Solved by not adjusting estimation if segregation is postponed.
2) With off-strategy disabled, data segregation is not postponed, but streaming didn't feed any metadata into partition estimation procedure, meaning it had to assume the max windows input data can be segregated into (100). Solved by using schema's default TTL for a precise estimation of window count.

For the future, we want to dynamically size filters (see https://github.com/scylladb/scylladb/issues/2024), especially for TWCS that might have SSTables that are left uncompacted until they're fully expired, meaning that the system won't heal itself in a timely manner through compaction on a SSTable that had partition estimation really wrong.

Fixes https://github.com/scylladb/scylladb/issues/15704.

Closes scylladb/scylladb#15938

* github.com:scylladb/scylladb:
  streaming: Improve partition estimation with TWCS
  streaming: Don't adjust partition estimate if segregation is postponed
This commit is contained in:
Nadav Har'El
2023-11-14 20:41:36 +02:00
9 changed files with 55 additions and 20 deletions

View File

@@ -547,7 +547,7 @@ protected:
auto max_sstable_size = std::max<uint64_t>(_max_sstable_size, 1);
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(_compacting_data_file_size) / max_sstable_size)));
return std::min(uint64_t(ceil(double(_estimated_partitions) / estimated_sstables)),
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions));
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimated_partitions, _schema));
}
void setup_new_sstable(shared_sstable& sst) {
@@ -1636,7 +1636,7 @@ private:
uint64_t partitions_per_sstable(shard_id s) const {
uint64_t estimated_sstables = std::max(uint64_t(1), uint64_t(ceil(double(_estimation_per_shard[s].estimated_size) / _max_sstable_size)));
return std::min(uint64_t(ceil(double(_estimation_per_shard[s].estimated_partitions) / estimated_sstables)),
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions));
_table_s.get_compaction_strategy().adjust_partition_estimate(_ms_metadata, _estimation_per_shard[s].estimated_partitions, _schema));
}
public:
resharding_compaction(table_state& table_s, sstables::compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor)

View File

@@ -66,7 +66,7 @@ bool compaction_strategy_impl::worth_dropping_tombstones(const shared_sstable& s
return sst->estimate_droppable_tombstone_ratio(gc_before) >= _tombstone_threshold;
}
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
uint64_t compaction_strategy_impl::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const {
return partition_estimate;
}
@@ -704,8 +704,8 @@ compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode);
}
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate, std::move(schema));
}
reader_consumer_v2 compaction_strategy::make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const {

View File

@@ -104,7 +104,7 @@ public:
compaction_backlog_tracker make_backlog_tracker() const;
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const;
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr) const;
reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const;

View File

@@ -68,7 +68,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const = 0;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const;
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const;

View File

@@ -184,16 +184,27 @@ public:
};
};
uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const {
if (!ms_meta.min_timestamp || !ms_meta.max_timestamp) {
// Not enough information, we assume the worst
return partition_estimate / max_data_segregation_window_count;
}
const auto min_window = get_window_for(_options, *ms_meta.min_timestamp);
const auto max_window = get_window_for(_options, *ms_meta.max_timestamp);
const auto window_size = get_window_size(_options);
uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const {
// If not enough information, we assume the worst
auto estimated_window_count = max_data_segregation_window_count;
auto default_ttl = std::chrono::duration_cast<std::chrono::microseconds>(s->default_time_to_live());
bool min_and_max_ts_available = ms_meta.min_timestamp && ms_meta.max_timestamp;
auto estimate_window_count = [this] (timestamp_type min_window, timestamp_type max_window) {
const auto window_size = get_window_size(_options);
return (max_window + (window_size - 1) - min_window) / window_size;
};
auto estimated_window_count = (max_window + (window_size - 1) - min_window) / window_size;
if (!min_and_max_ts_available && default_ttl.count()) {
auto min_window = get_window_for(_options, timestamp_type(0));
auto max_window = get_window_for(_options, timestamp_type(default_ttl.count()));
estimated_window_count = estimate_window_count(min_window, max_window);
} else if (min_and_max_ts_available) {
auto min_window = get_window_for(_options, *ms_meta.min_timestamp);
auto max_window = get_window_for(_options, *ms_meta.max_timestamp);
estimated_window_count = estimate_window_count(min_window, max_window);
}
return partition_estimate / std::max(1UL, uint64_t(estimated_window_count));
}

View File

@@ -162,7 +162,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) const override;
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const override;
virtual reader_consumer_v2 make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) const override;

View File

@@ -995,7 +995,7 @@ table::try_flush_memtable_to_sstable(compaction_group& cg, lw_shared_ptr<memtabl
auto metadata = mutation_source_metadata{};
metadata.min_timestamp = old->get_min_timestamp();
metadata.max_timestamp = old->get_max_timestamp();
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count());
auto estimated_partitions = _compaction_strategy.adjust_partition_estimate(metadata, old->partition_count(), _schema);
if (!cg.async_gate().is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(cg.as_table_state());

View File

@@ -33,9 +33,10 @@ std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstrin
//FIXME: for better estimations this should be transmitted from remote
auto metadata = mutation_source_metadata{};
auto& cs = cf->get_compaction_strategy();
const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
// Data segregation is postponed to happen during off-strategy if latter is enabled, which
// means partition estimation shouldn't be adjusted.
const auto adjusted_estimated_partitions = (offstrategy) ? estimated_partitions : cs.adjust_partition_estimate(metadata, estimated_partitions, cf->schema());
auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable {
// postpone data segregation to off-strategy compaction if enabled
if (offstrategy) {
return end_consumer;
}

View File

@@ -49,6 +49,7 @@
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "compaction/table_state.hh"
#include "mutation/mutation_rebuilder.hh"
#include "mutation/mutation_source_metadata.hh"
#include <stdio.h>
#include <ftw.h>
@@ -3579,6 +3580,28 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
auto close_cf = deferred_stop(cf);
cf->start();
auto ceil_div = [] (int dividend, int divisor) { return (dividend + divisor - 1) / divisor; };
auto estimation_test = [ceil_div] (schema_ptr s, uint64_t window_count) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, s->compaction_strategy_options());
mutation_source_metadata ms_metadata{};
const int partitions = 100;
BOOST_REQUIRE_EQUAL(cs.adjust_partition_estimate(ms_metadata, partitions, s),
ceil_div(partitions, window_count));
};
{
static constexpr int window_count = 20;
builder.set_default_time_to_live(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::hours(window_count)));
auto s = builder.build();
estimation_test(s, window_count);
}
{
builder.set_default_time_to_live(0s);
auto s = builder.build();
estimation_test(s, time_window_compaction_strategy::max_data_segregation_window_count);
}
std::vector<shared_sstable> sstables_spanning_many_windows = {
make_sstable(0),
make_sstable(1),