compaction_strategy: add method to reshape SSTables

Some SSTable sets are considered to be off-strategy: they are in a shape
that is at best not optimal and at worst adversarial to the current
compaction strategy.

This patch introduces the compaction strategy-specific method
get_reshaping_job(). Given an SSTable set, it returns one compaction
that can be done to bring the table closer to being in-strategy. The
caller can then call this repeatedly until the table is fully
in-strategy.

As an example of how this is supposed to work, consider TWCS: some
SSTables will belong to a single window -> in which case they are
already in-strategy and don't need to be compacted, and others span
multiple windows in which case they are considered off-strategy and
have to be compacted.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2020-05-28 11:27:31 -04:00
parent 0467bd0a94
commit 3c254dd49d
10 changed files with 166 additions and 0 deletions

View File

@@ -23,6 +23,7 @@
#include <seastar/core/future.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/core/file.hh>
#include "schema_fwd.hh"
#include "sstables/shared_sstable.hh"
@@ -135,6 +136,20 @@ public:
// Returns whether or not interposer consumer is used by a given strategy.
bool use_interposer_consumer() const;
// Informs the caller (usually the compaction manager) about what would it take for this set of
// SSTables closer to becoming in-strategy. If this returns an empty compaction descriptor, this
// means that the sstable set is already in-strategy.
//
// The caller can specify one of two modes: strict or relaxed. In relaxed mode the tolerance for
// what is considered offstrategy is higher. It can be used, for instance, for when the system
// is restarting and previous compactions were likely in-flight. In strict mode, we are less
// tolerant to invariant breakages.
//
// The caller should also pass a maximum number of SSTables which is the maximum amount of
// SSTables that can be added into a single job.
compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode);
};
// Creates a compaction_strategy object from one of the strategies available.

View File

@@ -32,4 +32,5 @@ enum class compaction_strategy_type {
time_window,
};
enum class reshape_mode { strict, relaxed };
}

View File

@@ -481,6 +481,11 @@ reader_consumer compaction_strategy_impl::make_interposer_consumer(const mutatio
return end_consumer;
}
compaction_descriptor
compaction_strategy_impl::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
return compaction_descriptor();
}
} // namespace sstables
size_tiered_backlog_tracker::inflight_component
@@ -1011,6 +1016,11 @@ compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() {
return _compaction_strategy_impl->get_backlog_tracker();
}
sstables::compaction_descriptor
compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, iop, mode);
}
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) {
return _compaction_strategy_impl->adjust_partition_estimate(ms_meta, partition_estimate);
}

View File

@@ -103,5 +103,7 @@ public:
virtual bool use_interposer_consumer() const {
return false;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode);
};
}

View File

@@ -171,4 +171,69 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family
return manifest.get_estimated_tasks();
}
compaction_descriptor
leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
std::array<std::vector<shared_sstable>, leveled_manifest::MAX_LEVELS> level_info;
auto is_disjoint = [this, schema] (const std::vector<shared_sstable>& sstables, unsigned tolerance) {
unsigned disjoint_sstables = 0;
auto prev_last = dht::ring_position::min();
for (auto& sst : sstables) {
if (dht::ring_position(sst->get_first_decorated_key()).less_compare(*schema, prev_last)) {
disjoint_sstables++;
}
prev_last = dht::ring_position(sst->get_last_decorated_key());
}
return disjoint_sstables > tolerance;
};
for (auto& sst : input) {
auto sst_level = sst->get_sstable_level();
if (sst_level > leveled_manifest::MAX_LEVELS) {
leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS);
// This is really unexpected, so we'll just compact it all to fix it
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, leveled_manifest::MAX_LEVELS - 1, _max_sstable_size_in_mb * 1024 * 1024);
desc.options = compaction_options::make_reshape();
return desc;
}
level_info[sst_level].push_back(sst);
}
for (auto& level : level_info) {
std::sort(level.begin(), level.end(), [this, schema] (shared_sstable a, shared_sstable b) {
return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
});
}
unsigned max_filled_level = 0;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
unsigned tolerance = mode == reshape_mode::strict ? 0 : leveled_manifest::leveled_fan_out * 2;
if (level_info[0].size() > offstrategy_threshold) {
level_info[0].resize(std::min(level_info[0].size(), max_sstables));
compaction_descriptor desc(std::move(level_info[0]), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
for (unsigned level = 1; level < leveled_manifest::MAX_LEVELS; ++level) {
if (level_info[level].empty()) {
continue;
}
max_filled_level = std::max(max_filled_level, level);
if (!is_disjoint(level_info[level], tolerance)) {
// Unfortunately no good limit to limit input size to max_sstables for LCS major
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, max_filled_level, _max_sstable_size_in_mb * 1024 * 1024);
desc.options = compaction_options::make_reshape();
return desc;
}
}
return compaction_descriptor();
}
}

View File

@@ -63,6 +63,8 @@ public:
virtual compaction_backlog_tracker& get_backlog_tracker() override {
return _backlog_tracker;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}

View File

@@ -208,4 +208,25 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector<sstab
return most_interesting;
}
compaction_descriptor
size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode)
{
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
if (mode == reshape_mode::relaxed) {
offstrategy_threshold = max_sstables;
}
for (auto& bucket : get_buckets(input)) {
if (bucket.size() >= offstrategy_threshold) {
bucket.resize(std::min(max_sstables, bucket.size()));
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
}
}
return compaction_descriptor();
}
}

View File

@@ -168,6 +168,9 @@ public:
virtual compaction_backlog_tracker& get_backlog_tracker() override {
return _backlog_tracker;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}

View File

@@ -80,4 +80,49 @@ reader_consumer time_window_compaction_strategy::make_interposer_consumer(const
};
}
compaction_descriptor
time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
std::vector<shared_sstable> single_window;
std::vector<shared_sstable> multi_window;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
if (mode == reshape_mode::relaxed) {
offstrategy_threshold = max_sstables;
}
for (auto& sst : input) {
auto min = sst->get_stats_metadata().min_timestamp;
auto max = sst->get_stats_metadata().max_timestamp;
if (get_window_for(_options, min) != get_window_for(_options, max)) {
multi_window.push_back(sst);
} else {
single_window.push_back(sst);
}
}
if (!multi_window.empty()) {
// Everything that spans multiple windows will need reshaping
multi_window.resize(std::min(multi_window.size(), max_sstables));
compaction_descriptor desc(std::move(multi_window), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
// For things that don't span multiple windows, we compact windows that are individually too big
auto all_buckets = get_buckets(single_window, _options);
for (auto& pair : all_buckets.first) {
auto ssts = std::move(pair.second);
if (ssts.size() > offstrategy_threshold) {
ssts.resize(std::min(multi_window.size(), max_sstables));
compaction_descriptor desc(std::move(ssts), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
return desc;
}
}
return compaction_descriptor();
}
}

View File

@@ -351,6 +351,8 @@ public:
virtual bool use_interposer_consumer() const override {
return true;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override;
};
}