Introduce incremental compaction strategy (ICS)
ICS is a compaction strategy that inherits size tiered properties -- therefore it's write optimized too -- but fixes its space overhead of 100% due to input files being only released on completion. That's achieved with the concept of sstable run (similar in concept to LCS levels) which breaks a large sstable into fixed-size chunks (1G by default), known as run fragments. ICS picks similar-sized runs for compaction, and fragments of those runs can be released incrementally as they're compacted, reducing the space overhead to about (number_of_input_runs * 1G). This allows user to increase storage density of nodes (from 50% to ~80%), reducing the cost of ownership. NOTE: test_system_schema_version_is_stable adjusted to account for batchlog using IncrementalCompactionStrategy contains: compaction/: added incremental_compaction_strategy.cc (.hh), incremental_backlog_tracker.cc (.hh) compaction/CMakeLists.txt: include ICS cc files configure.py: changes for ICS files, includes test db/legacy_schema_migrator.cc / db/schema_tables.cc: fallback to ICS when strategy is not supported db/system_keyspace: pick ICS for some system tables schema/schema.hh: ICS becomes default test/boost: Add incremental_compaction_test.cc test/boost/sstable_compaction_test.cc: ICS related changes test/cqlpy/test_compaction_strategy_validation.py: ICS related changes docs/architecture/compaction/compaction-strategies.rst: changes to ICS section docs/cql/compaction.rst: changes to ICS section docs/cql/ddl.rst: adds reference to ICS options docs/getting-started/system-requirements.rst: updates sentence mentioning ICS docs/kb/compaction.rst: changes to ICS section docs/kb/garbage-collection-ics.rst: add file docs/kb/index.rst: add reference to <garbage-collection-ics> docs/operating-scylla/procedures/tips/production-readiness.rst: add ICS section some relevant commits throughout the ICS history: commit 434b97699b39c570d0d849d372bf64f418e5c692 Merge: 105586f747 30250749b8 Author: Paweł Dziepak <pdziepak@scylladb.com> Date: Tue Mar 12 12:14:23 2019 +0000 Merge "Introduce Incremental Compaction Strategy (ICS)" from Raphael " Introduce new compaction strategy which is essentially like size tiered but will work with the existing incremental compaction. Thus incremental compaction strategy. It works like size tiered, but each element composing a tier is a sstable run, meaning that the compaction strategy will look for N similar-sized sstable runs to compact, not just individual sstables. Parameters: * "sstable_size_in_mb": defines the maximum sstable (fragment) size composing a sstable run, which impacts directly the disk space requirement which is improved with incremental compaction. The lower the value the lower the space requirement for compaction because fragments involved will be released more frequently. * all others available in size tiered compaction strategy HOWTO ===== To change an existing table to use it, do: ALTER TABLE mykeyspace.mytable WITH compaction = {'class' : 'IncrementalCompactionStrategy'}; Set fragment size: ALTER TABLE mykeyspace.mytable WITH compaction = {'class' : 'IncrementalCompactionStrategy', 'sstable_size_in_mb' : 1000 } " commit 94ef3cd29a196bedbbeb8707e20fe78a197f30a1 Merge: dca89ce7a5 e08ef3e1a3 Author: Avi Kivity <avi@scylladb.com> Date: Tue Sep 8 11:31:52 2020 +0300 Merge "Add feature to limit space amplification in Incremental Compaction" from Raphael " A new option, space_amplification_goal (SAG), is being added to ICS. This option will allow ICS user to set a goal on the space amplification (SA). It's not supposed to be an upper bound on the space amplification, but rather, a goal. This new option will be disabled by default as it doesn't benefit write-only (no overwrites) workloads and could hurt severely the write performance. The strategy is free to delay triggering this new behavior, in order to increase overall compaction efficiency. The graph below shows how this feature works in practice for different values of space_amplification_goal: https://user-images.githubusercontent.com/1409139/89347544-60b7b980-d681-11ea-87ab-e2fdc3ecb9f0.png When strategy finds space amplification crossed space_amplification_goal, it will work on reducing the SA by doing a cross-tier compaction on the two largest tiers. This feature works only on the two largest tiers, because taking into account others, could hurt the compaction efficiency which is based on the fact that the more similar-sized sstables are compacted together the higher the compaction efficiency will be. With SAG enabled, min_threshold only plays an important role on the smallest tiers, given that the second-largest tier could be compacted into the largest tier for a space_amplification_goal value < 2. By making the options space_amplification_goal and min_threshold independent, user will be able to tune write amplification and space amplification, based on the needs. The lower the space_amplification_goal the higher the write amplification, but by increasing the min threshold, the write amplification can be decreased to a desired amount. " commit 7d90911c5fb3fa891ad64a62147c3a6ca26d61b1 Author: Raphael S. Carvalho <raphaelsc@scylladb.com> Date: Sat Oct 16 13:41:46 2021 -0300 compaction: ICS: Add garbage collection Today, ICS lacks an approach to persist expired tombstones in a timely manner, which is a problem because accumulation of tombstones are known to affecting latency considerably. For an expired tombstone to be purged, it has to reach the top of the LSM tree and hope that older overlapping data wasn't introduced at the bottom. The condition are there and must be satisfied to avoid data resurrection. STCS, today, has an inefficient garbage collection approach because it only picks a single sstable, which satisfies the tombstone density threshold and file staleness. That's a problem because overlapping data either on same tier or smaller tiers will prevent tombstones from being purged. Also, nothing is done to push the tombstones to the top of the tree, for the conditions to be eventually satisfied. Due to incremental compaction, ICS can more easily have an effecient GC by doing cross-tier compaction of relevant tiers. The trigger will be file staleness and tombstone density, which threshold values can be configured by tombstone_compaction_interval and tombstone_threshold, respectively. If ICS finds a tier which meets both conditions, then that tier and the larger[1] *and* closest-in-size[2] tier will be compacted together. [1]: A larger tier is picked because we want tombstones to eventually reach the top of the tree. [2]: It also has to be the closest-in-size tier as the smaller the size difference the higher the efficiency of the compaction. We want to minimize write amplification as much as possible. The staleness condition is there to prevent the same file from being picked over and over again in a short interval. With this approach, ICS will be continuously working to purge garbage while not hurting overall efficiency on a steady state, as same-tier compactions are prioritized. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Message-Id: <20211016164146.38010-1-raphaelsc@scylladb.com> Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Closes scylladb/scylladb#22063
This commit is contained in:
committed by
Avi Kivity
parent
4c89e62470
commit
c973254362
@@ -4,6 +4,8 @@ target_sources(compaction
|
||||
compaction.cc
|
||||
compaction_manager.cc
|
||||
compaction_strategy.cc
|
||||
incremental_backlog_tracker.cc
|
||||
incremental_compaction_strategy.cc
|
||||
leveled_compaction_strategy.cc
|
||||
size_tiered_compaction_strategy.cc
|
||||
task_manager_module.cc
|
||||
|
||||
@@ -29,8 +29,11 @@
|
||||
#include "size_tiered_backlog_tracker.hh"
|
||||
#include "leveled_manifest.hh"
|
||||
#include "utils/to_string.hh"
|
||||
#include "incremental_compaction_strategy.hh"
|
||||
#include "sstables/sstable_set_impl.hh"
|
||||
|
||||
logging::logger leveled_manifest::logger("LeveledManifest");
|
||||
logging::logger compaction_strategy_logger("CompactionStrategy");
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
@@ -172,6 +175,9 @@ void compaction_strategy_impl::validate_options_for_strategy_type(const std::map
|
||||
case compaction_strategy_type::time_window:
|
||||
time_window_compaction_strategy::validate_options(options, unchecked_options);
|
||||
break;
|
||||
case compaction_strategy_type::incremental:
|
||||
incremental_compaction_strategy::validate_options(options, unchecked_options);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@@ -756,6 +762,9 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy,
|
||||
case compaction_strategy_type::time_window:
|
||||
impl = ::make_shared<time_window_compaction_strategy>(options);
|
||||
break;
|
||||
case compaction_strategy_type::incremental:
|
||||
impl = make_shared<incremental_compaction_strategy>(incremental_compaction_strategy(options));
|
||||
break;
|
||||
default:
|
||||
throw std::runtime_error("strategy not supported");
|
||||
}
|
||||
@@ -770,6 +779,10 @@ future<reshape_config> make_reshape_config(const sstables::storage& storage, res
|
||||
};
|
||||
}
|
||||
|
||||
std::unique_ptr<sstable_set_impl> incremental_compaction_strategy::make_sstable_set(schema_ptr schema) const {
|
||||
return std::make_unique<partitioned_sstable_set>(std::move(schema), false);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace compaction {
|
||||
@@ -778,6 +791,7 @@ compaction_strategy_state compaction_strategy_state::make(const compaction_strat
|
||||
switch (cs.type()) {
|
||||
case compaction_strategy_type::null:
|
||||
case compaction_strategy_type::size_tiered:
|
||||
case compaction_strategy_type::incremental:
|
||||
return compaction_strategy_state(default_empty_state{});
|
||||
case compaction_strategy_type::leveled:
|
||||
return compaction_strategy_state(leveled_compaction_strategy_state{});
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
struct mutation_source_metadata;
|
||||
class compaction_backlog_tracker;
|
||||
extern logging::logger compaction_strategy_logger;
|
||||
|
||||
using namespace compaction;
|
||||
|
||||
@@ -69,6 +70,8 @@ public:
|
||||
return "LeveledCompactionStrategy";
|
||||
case compaction_strategy_type::time_window:
|
||||
return "TimeWindowCompactionStrategy";
|
||||
case compaction_strategy_type::incremental:
|
||||
return "IncrementalCompactionStrategy";
|
||||
default:
|
||||
throw std::runtime_error("Invalid Compaction Strategy");
|
||||
}
|
||||
@@ -85,6 +88,8 @@ public:
|
||||
return compaction_strategy_type::leveled;
|
||||
} else if (short_name == "TimeWindowCompactionStrategy") {
|
||||
return compaction_strategy_type::time_window;
|
||||
} else if (short_name == "IncrementalCompactionStrategy") {
|
||||
return compaction_strategy_type::incremental;
|
||||
} else {
|
||||
throw exceptions::configuration_exception(format("Unable to find compaction strategy class '{}'", name));
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ enum class compaction_strategy_type {
|
||||
size_tiered,
|
||||
leveled,
|
||||
time_window,
|
||||
incremental,
|
||||
};
|
||||
|
||||
enum class reshape_mode { strict, relaxed };
|
||||
|
||||
128
compaction/incremental_backlog_tracker.cc
Normal file
128
compaction/incremental_backlog_tracker.cc
Normal file
@@ -0,0 +1,128 @@
|
||||
/*
|
||||
* Copyright (C) 2019-present ScyllaDB
|
||||
*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "incremental_backlog_tracker.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
incremental_backlog_tracker::inflight_component incremental_backlog_tracker::compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const {
|
||||
inflight_component in;
|
||||
for (auto& crp : ongoing_compactions) {
|
||||
if (!_sstable_runs_contributing_backlog.contains(crp.first->run_identifier())) {
|
||||
continue;
|
||||
}
|
||||
auto compacted = crp.second->compacted();
|
||||
in.total_bytes += compacted;
|
||||
in.contribution += compacted * log4((crp.first->data_size()));
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
incremental_backlog_tracker::backlog_calculation_result
|
||||
incremental_backlog_tracker::calculate_sstables_backlog_contribution(const std::unordered_map<sstables::run_id, sstable_run>& all, const incremental_compaction_strategy_options& options, unsigned threshold) {
|
||||
int64_t total_backlog_bytes = 0;
|
||||
float sstables_backlog_contribution = 0.0f;
|
||||
std::unordered_set<sstables::run_id> sstable_runs_contributing_backlog = {};
|
||||
|
||||
if (!all.empty()) {
|
||||
auto freeze = [] (const sstable_run& run) { return make_lw_shared<const sstable_run>(run); };
|
||||
for (auto& bucket : incremental_compaction_strategy::get_buckets(boost::copy_range<std::vector<frozen_sstable_run>>(all | boost::adaptors::map_values | boost::adaptors::transformed(freeze)), options)) {
|
||||
if (!incremental_compaction_strategy::is_bucket_interesting(bucket, threshold)) {
|
||||
continue;
|
||||
}
|
||||
for (const frozen_sstable_run& run_ptr : bucket) {
|
||||
auto& run = *run_ptr;
|
||||
auto data_size = run.data_size();
|
||||
if (data_size > 0) {
|
||||
total_backlog_bytes += data_size;
|
||||
sstables_backlog_contribution += data_size * log4(data_size);
|
||||
sstable_runs_contributing_backlog.insert((*run.all().begin())->run_identifier());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return backlog_calculation_result{
|
||||
.total_backlog_bytes = total_backlog_bytes,
|
||||
.sstables_backlog_contribution = sstables_backlog_contribution,
|
||||
.sstable_runs_contributing_backlog = std::move(sstable_runs_contributing_backlog),
|
||||
};
|
||||
}
|
||||
|
||||
incremental_backlog_tracker::incremental_backlog_tracker(incremental_compaction_strategy_options options) : _options(std::move(options)) {}
|
||||
|
||||
double incremental_backlog_tracker::backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const {
|
||||
inflight_component compacted = compacted_backlog(oc);
|
||||
|
||||
// Bail out if effective backlog is zero
|
||||
if (_total_backlog_bytes <= compacted.total_bytes) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Formula for each SSTable is (Si - Ci) * log(T / Si)
|
||||
// Which can be rewritten as: ((Si - Ci) * log(T)) - ((Si - Ci) * log(Si))
|
||||
//
|
||||
// For the meaning of each variable, please refer to the doc in size_tiered_backlog_tracker.hh
|
||||
|
||||
// Sum of (Si - Ci) for all SSTables contributing backlog
|
||||
auto effective_backlog_bytes = _total_backlog_bytes - compacted.total_bytes;
|
||||
|
||||
// Sum of (Si - Ci) * log (Si) for all SSTables contributing backlog
|
||||
auto sstables_contribution = _sstables_backlog_contribution - compacted.contribution;
|
||||
// This is subtracting ((Si - Ci) * log (Si)) from ((Si - Ci) * log(T)), yielding the final backlog
|
||||
auto b = (effective_backlog_bytes * log4(_total_bytes)) - sstables_contribution;
|
||||
return b > 0 ? b : 0;
|
||||
}
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
void incremental_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) {
|
||||
auto all = _all;
|
||||
auto total_bytes = _total_bytes;
|
||||
auto threshold = _threshold;
|
||||
auto backlog_calculation_result = incremental_backlog_tracker::backlog_calculation_result{};
|
||||
for (auto&& sst : new_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
// note: we don't expect failed insertions since each sstable will be inserted once
|
||||
(void)all[sst->run_identifier()].insert(sst);
|
||||
total_bytes += sst->data_size();
|
||||
// Deduce threshold from the last SSTable added to the set
|
||||
threshold = sst->get_schema()->min_compaction_threshold();
|
||||
}
|
||||
}
|
||||
|
||||
bool exhausted_input_run = false;
|
||||
for (auto&& sst : old_ssts) {
|
||||
if (sst->data_size() > 0) {
|
||||
auto run_identifier = sst->run_identifier();
|
||||
all[run_identifier].erase(sst);
|
||||
if (all[run_identifier].all().empty()) {
|
||||
all.erase(run_identifier);
|
||||
exhausted_input_run = true;
|
||||
}
|
||||
total_bytes -= sst->data_size();
|
||||
}
|
||||
}
|
||||
// Backlog contribution will only be refreshed when an input SSTable run was exhausted by
|
||||
// compaction, so to avoid doing it for each exhausted fragment, which would be both
|
||||
// overkill and expensive.
|
||||
if (exhausted_input_run) {
|
||||
backlog_calculation_result = calculate_sstables_backlog_contribution(all, _options, threshold);
|
||||
}
|
||||
|
||||
// commit calculations
|
||||
std::invoke([&] () noexcept {
|
||||
_all = std::move(all);
|
||||
_total_bytes = total_bytes;
|
||||
_threshold = threshold;
|
||||
|
||||
if (exhausted_input_run) {
|
||||
_total_backlog_bytes = backlog_calculation_result.total_backlog_bytes;
|
||||
_sstables_backlog_contribution = backlog_calculation_result.sstables_backlog_contribution;
|
||||
_sstable_runs_contributing_backlog = std::move(backlog_calculation_result.sstable_runs_contributing_backlog);
|
||||
}
|
||||
});
|
||||
}
|
||||
61
compaction/incremental_backlog_tracker.hh
Normal file
61
compaction/incremental_backlog_tracker.hh
Normal file
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright (C) 2019-present ScyllaDB
|
||||
*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cmath>
|
||||
|
||||
#include "compaction_backlog_manager.hh"
|
||||
#include "incremental_compaction_strategy.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
// The only difference to size tiered backlog tracker is that it will calculate
|
||||
// backlog contribution using total bytes of each sstable run instead of total
|
||||
// bytes of an individual sstable object.
|
||||
class incremental_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
incremental_compaction_strategy_options _options;
|
||||
int64_t _total_bytes = 0;
|
||||
int64_t _total_backlog_bytes = 0;
|
||||
unsigned _threshold = 0;
|
||||
double _sstables_backlog_contribution = 0.0f;
|
||||
std::unordered_set<sstables::run_id> _sstable_runs_contributing_backlog;
|
||||
std::unordered_map<sstables::run_id, sstable_run> _all;
|
||||
|
||||
struct inflight_component {
|
||||
int64_t total_bytes = 0;
|
||||
double contribution = 0;
|
||||
};
|
||||
|
||||
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const;
|
||||
|
||||
struct backlog_calculation_result {
|
||||
int64_t total_backlog_bytes;
|
||||
float sstables_backlog_contribution;
|
||||
std::unordered_set<sstables::run_id> sstable_runs_contributing_backlog;
|
||||
};
|
||||
|
||||
public:
|
||||
static double log4(double x) {
|
||||
static const double inv_log_4 = 1.0f / std::log(4);
|
||||
return log(x) * inv_log_4;
|
||||
}
|
||||
|
||||
static backlog_calculation_result calculate_sstables_backlog_contribution(const std::unordered_map<sstables::run_id, sstable_run>& all,
|
||||
const incremental_compaction_strategy_options& options, unsigned threshold);
|
||||
|
||||
incremental_backlog_tracker(incremental_compaction_strategy_options options);
|
||||
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override;
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
virtual void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) override;
|
||||
|
||||
int64_t total_bytes() const {
|
||||
return _total_bytes;
|
||||
}
|
||||
};
|
||||
534
compaction/incremental_compaction_strategy.cc
Normal file
534
compaction/incremental_compaction_strategy.cc
Normal file
@@ -0,0 +1,534 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
#include "cql3/statements/property_definitions.hh"
|
||||
#include "compaction.hh"
|
||||
#include "compaction_manager.hh"
|
||||
#include "incremental_compaction_strategy.hh"
|
||||
#include "incremental_backlog_tracker.hh"
|
||||
#include <boost/range/numeric.hpp>
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <ranges>
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern logging::logger clogger;
|
||||
|
||||
static long validate_min_sstable_size(const std::map<sstring, sstring>& options) {
|
||||
auto tmp_value = compaction_strategy_impl::get_value(options,
|
||||
incremental_compaction_strategy_options::MIN_SSTABLE_SIZE_KEY);
|
||||
auto min_sstable_size = cql3::statements::property_definitions::to_long(incremental_compaction_strategy_options::MIN_SSTABLE_SIZE_KEY,
|
||||
tmp_value, incremental_compaction_strategy_options::DEFAULT_MIN_SSTABLE_SIZE);
|
||||
if (min_sstable_size < 0) {
|
||||
throw exceptions::configuration_exception(fmt::format("{} value ({}) must be non negative",
|
||||
incremental_compaction_strategy_options::MIN_SSTABLE_SIZE_KEY, min_sstable_size));
|
||||
}
|
||||
return min_sstable_size;
|
||||
}
|
||||
|
||||
static long validate_min_sstable_size(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
auto min_sstable_size = validate_min_sstable_size(options);
|
||||
unchecked_options.erase(incremental_compaction_strategy_options::MIN_SSTABLE_SIZE_KEY);
|
||||
return min_sstable_size;
|
||||
}
|
||||
|
||||
static double validate_bucket_low(const std::map<sstring, sstring>& options) {
|
||||
auto tmp_value = compaction_strategy_impl::get_value(options,
|
||||
incremental_compaction_strategy_options::BUCKET_LOW_KEY);
|
||||
auto bucket_low = cql3::statements::property_definitions::to_double(incremental_compaction_strategy_options::BUCKET_LOW_KEY,
|
||||
tmp_value, incremental_compaction_strategy_options::DEFAULT_BUCKET_LOW);
|
||||
if (bucket_low <= 0.0 || bucket_low >= 1.0) {
|
||||
throw exceptions::configuration_exception(fmt::format("{} value ({}) must be between 0.0 and 1.0",
|
||||
incremental_compaction_strategy_options::BUCKET_LOW_KEY, bucket_low));
|
||||
}
|
||||
return bucket_low;
|
||||
}
|
||||
|
||||
static double validate_bucket_low(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
auto bucket_low = validate_bucket_low(options);
|
||||
unchecked_options.erase(incremental_compaction_strategy_options::BUCKET_LOW_KEY);
|
||||
return bucket_low;
|
||||
}
|
||||
|
||||
static double validate_bucket_high(const std::map<sstring, sstring>& options) {
|
||||
auto tmp_value = compaction_strategy_impl::get_value(options,
|
||||
incremental_compaction_strategy_options::BUCKET_HIGH_KEY);
|
||||
auto bucket_high = cql3::statements::property_definitions::to_double(incremental_compaction_strategy_options::BUCKET_HIGH_KEY,
|
||||
tmp_value, incremental_compaction_strategy_options::DEFAULT_BUCKET_HIGH);
|
||||
if (bucket_high <= 1.0) {
|
||||
throw exceptions::configuration_exception(fmt::format("{} value ({}) must be greater than 1.0",
|
||||
incremental_compaction_strategy_options::BUCKET_HIGH_KEY, bucket_high));
|
||||
}
|
||||
return bucket_high;
|
||||
}
|
||||
|
||||
static double validate_bucket_high(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
auto bucket_high = validate_bucket_high(options);
|
||||
unchecked_options.erase(incremental_compaction_strategy_options::BUCKET_HIGH_KEY);
|
||||
return bucket_high;
|
||||
}
|
||||
|
||||
static int validate_fragment_size(const std::map<sstring, sstring>& options) {
|
||||
auto tmp_value = compaction_strategy_impl::get_value(options,
|
||||
incremental_compaction_strategy::FRAGMENT_SIZE_OPTION);
|
||||
auto fragment_size_in_mb = cql3::statements::property_definitions::to_int(incremental_compaction_strategy::FRAGMENT_SIZE_OPTION,
|
||||
tmp_value, incremental_compaction_strategy::DEFAULT_MAX_FRAGMENT_SIZE_IN_MB);
|
||||
if (fragment_size_in_mb < 100) {
|
||||
clogger.warn("SStable size of {}MB is configured. The value may lead to high memory overhead due to sstables proliferation.", fragment_size_in_mb);
|
||||
}
|
||||
return fragment_size_in_mb;
|
||||
}
|
||||
|
||||
static int validate_fragment_size(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
auto fragment_size_in_mb = validate_fragment_size(options);
|
||||
unchecked_options.erase(incremental_compaction_strategy::FRAGMENT_SIZE_OPTION);
|
||||
return fragment_size_in_mb;
|
||||
}
|
||||
|
||||
static std::optional<double> validate_space_amplification_goal(const std::map<sstring, sstring>& options) {
|
||||
auto tmp_value = compaction_strategy_impl::get_value(options,
|
||||
incremental_compaction_strategy::SPACE_AMPLIFICATION_GOAL_OPTION);
|
||||
if (tmp_value) {
|
||||
auto space_amplification_goal = cql3::statements::property_definitions::to_double(incremental_compaction_strategy::SPACE_AMPLIFICATION_GOAL_OPTION,
|
||||
tmp_value, 0.0);
|
||||
if (space_amplification_goal <= 1.0 || space_amplification_goal > 2.0) {
|
||||
throw exceptions::configuration_exception(fmt::format("{} value ({}) must be greater than 1.0 and less than or equal to 2.0",
|
||||
incremental_compaction_strategy::SPACE_AMPLIFICATION_GOAL_OPTION, space_amplification_goal));
|
||||
}
|
||||
return space_amplification_goal;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
static std::optional<double> validate_space_amplification_goal(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
auto space_amplification_goal = validate_space_amplification_goal(options);
|
||||
unchecked_options.erase(incremental_compaction_strategy::SPACE_AMPLIFICATION_GOAL_OPTION);
|
||||
return space_amplification_goal;
|
||||
}
|
||||
|
||||
incremental_compaction_strategy_options::incremental_compaction_strategy_options(const std::map<sstring, sstring>& options) {
|
||||
min_sstable_size = validate_min_sstable_size(options);
|
||||
bucket_low = validate_bucket_low(options);
|
||||
bucket_high = validate_bucket_high(options);
|
||||
}
|
||||
|
||||
// options is a map of compaction strategy options and their values.
|
||||
// unchecked_options is an analogical map from which already checked options are deleted.
|
||||
// This helps making sure that only allowed options are being set.
|
||||
void incremental_compaction_strategy_options::validate(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
validate_min_sstable_size(options, unchecked_options);
|
||||
auto bucket_low = validate_bucket_low(options, unchecked_options);
|
||||
auto bucket_high = validate_bucket_high(options, unchecked_options);
|
||||
if (bucket_high <= bucket_low) {
|
||||
throw exceptions::configuration_exception(fmt::format("{} value ({}) is less than or equal to the {} "
|
||||
"value ({})", BUCKET_HIGH_KEY, bucket_high, BUCKET_LOW_KEY, bucket_low));
|
||||
}
|
||||
validate_fragment_size(options, unchecked_options);
|
||||
validate_space_amplification_goal(options, unchecked_options);
|
||||
compaction_strategy_impl::validate_min_max_threshold(options, unchecked_options);
|
||||
}
|
||||
|
||||
uint64_t incremental_compaction_strategy::avg_size(std::vector<sstables::frozen_sstable_run>& runs) const {
|
||||
uint64_t n = 0;
|
||||
|
||||
if (runs.empty()) {
|
||||
return 0;
|
||||
}
|
||||
for (auto& r : runs) {
|
||||
n += r->data_size();
|
||||
}
|
||||
return n / runs.size();
|
||||
}
|
||||
|
||||
bool incremental_compaction_strategy::is_bucket_interesting(const std::vector<sstables::frozen_sstable_run>& bucket, size_t min_threshold) {
|
||||
return bucket.size() >= min_threshold;
|
||||
}
|
||||
|
||||
bool incremental_compaction_strategy::is_any_bucket_interesting(const std::vector<std::vector<sstables::frozen_sstable_run>>& buckets, size_t min_threshold) const {
|
||||
return boost::algorithm::any_of(buckets, [&] (const std::vector<sstables::frozen_sstable_run>& bucket) {
|
||||
return this->is_bucket_interesting(bucket, min_threshold);
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<sstable_run_and_length>
|
||||
incremental_compaction_strategy::create_run_and_length_pairs(const std::vector<sstables::frozen_sstable_run>& runs) {
|
||||
|
||||
std::vector<sstable_run_and_length> run_length_pairs;
|
||||
run_length_pairs.reserve(runs.size());
|
||||
|
||||
for(auto& r_ptr : runs) {
|
||||
auto& r = *r_ptr;
|
||||
assert(r.data_size() != 0);
|
||||
run_length_pairs.emplace_back(r_ptr, r.data_size());
|
||||
}
|
||||
|
||||
return run_length_pairs;
|
||||
}
|
||||
|
||||
std::vector<std::vector<sstables::frozen_sstable_run>>
|
||||
incremental_compaction_strategy::get_buckets(const std::vector<sstables::frozen_sstable_run>& runs, const incremental_compaction_strategy_options& options) {
|
||||
auto sorted_runs = create_run_and_length_pairs(runs);
|
||||
|
||||
std::sort(sorted_runs.begin(), sorted_runs.end(), [] (sstable_run_and_length& i, sstable_run_and_length& j) {
|
||||
return i.second < j.second;
|
||||
});
|
||||
|
||||
using bucket_type = std::vector<sstables::frozen_sstable_run>;
|
||||
std::vector<bucket_type> bucket_list;
|
||||
std::vector<double> bucket_average_size_list;
|
||||
|
||||
for (auto& pair : sorted_runs) {
|
||||
size_t size = pair.second;
|
||||
|
||||
// look for a bucket containing similar-sized runs:
|
||||
// group in the same bucket if it's w/in (bucket_low, bucket_high) of the average for this bucket,
|
||||
// or this file and the bucket are all considered "small" (less than `minSSTableSize`)
|
||||
if (!bucket_list.empty()) {
|
||||
auto& bucket_average_size = bucket_average_size_list.back();
|
||||
|
||||
if ((size > (bucket_average_size * options.bucket_low) && size < (bucket_average_size * options.bucket_high)) ||
|
||||
(size < options.min_sstable_size && bucket_average_size < options.min_sstable_size)) {
|
||||
auto& bucket = bucket_list.back();
|
||||
auto total_size = bucket.size() * bucket_average_size;
|
||||
auto new_average_size = (total_size + size) / (bucket.size() + 1);
|
||||
auto smallest_run_in_bucket = bucket[0]->data_size();
|
||||
|
||||
// SSTables are added in increasing size order so the bucket's
|
||||
// average might drift upwards.
|
||||
// Don't let it drift too high, to a point where the smallest
|
||||
// SSTable might fall out of range.
|
||||
if (size < options.min_sstable_size || smallest_run_in_bucket > new_average_size * options.bucket_low) {
|
||||
bucket.push_back(pair.first);
|
||||
bucket_average_size = new_average_size;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// no similar bucket found; put it in a new one
|
||||
bucket_type new_bucket = {pair.first};
|
||||
bucket_list.push_back(std::move(new_bucket));
|
||||
bucket_average_size_list.push_back(size);
|
||||
}
|
||||
|
||||
return bucket_list;
|
||||
}
|
||||
|
||||
std::vector<sstables::frozen_sstable_run>
|
||||
incremental_compaction_strategy::most_interesting_bucket(std::vector<std::vector<sstables::frozen_sstable_run>> buckets,
|
||||
size_t min_threshold, size_t max_threshold)
|
||||
{
|
||||
std::vector<sstable_run_bucket_and_length> interesting_buckets;
|
||||
interesting_buckets.reserve(buckets.size());
|
||||
|
||||
for (auto& bucket : buckets) {
|
||||
bucket.resize(std::min(bucket.size(), max_threshold));
|
||||
if (is_bucket_interesting(bucket, min_threshold)) {
|
||||
auto avg = avg_size(bucket);
|
||||
interesting_buckets.push_back({ std::move(bucket), avg });
|
||||
}
|
||||
}
|
||||
|
||||
if (interesting_buckets.empty()) {
|
||||
return std::vector<sstables::frozen_sstable_run>();
|
||||
}
|
||||
// Pick the bucket with more elements, as efficiency of same-tier compactions increases with number of files.
|
||||
auto& max = *std::max_element(interesting_buckets.begin(), interesting_buckets.end(),
|
||||
[] (sstable_run_bucket_and_length& i, sstable_run_bucket_and_length& j) {
|
||||
return i.first.size() < j.first.size();
|
||||
});
|
||||
return std::move(max.first);
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
incremental_compaction_strategy::find_garbage_collection_job(const compaction::table_state& t, std::vector<size_bucket_t>& buckets) {
|
||||
auto worth_dropping_tombstones = [this, &t, now = db_clock::now()] (const sstable_run& run, gc_clock::time_point compaction_time) {
|
||||
if (run.all().empty()) {
|
||||
return false;
|
||||
}
|
||||
// for the purpose of checking if a run is stale, picking any fragment *composing the same run*
|
||||
// will be enough as the difference in write time is acceptable.
|
||||
auto run_write_time = (*run.all().begin())->data_file_write_time();
|
||||
// FIXME: hack to avoid infinite loop, get rid of it once the root cause is fixed.
|
||||
// Refs #3571.
|
||||
auto min_gc_compaction_interval = std::min(db_clock::duration(std::chrono::seconds(3600)), _tombstone_compaction_interval);
|
||||
if ((now - min_gc_compaction_interval) < run_write_time) {
|
||||
return false;
|
||||
}
|
||||
if (_unchecked_tombstone_compaction) {
|
||||
return true;
|
||||
}
|
||||
auto run_max_timestamp = std::ranges::max(run.all() | std::views::transform([] (const shared_sstable& sstable) {
|
||||
return sstable->get_stats_metadata().max_timestamp;
|
||||
}));
|
||||
bool satisfy_staleness = (now - _tombstone_compaction_interval) > run_write_time;
|
||||
// Staleness condition becomes mandatory if memtable's data is possibly shadowed by tombstones.
|
||||
if (run_max_timestamp >= t.min_memtable_timestamp() && !satisfy_staleness) {
|
||||
return false;
|
||||
}
|
||||
// If interval is not satisfied, we still consider tombstone GC if the gain outweighs the increased frequency.
|
||||
// By increasing threshold to a minimum of 0.5, we're only adding a maximum of 1 to write amp as we'll be halving
|
||||
// the SSTable, containing garbage, on every GC round.
|
||||
float actual_threshold = satisfy_staleness ? _tombstone_threshold : std::clamp(_tombstone_threshold * 2, 0.5f, 1.0f);
|
||||
|
||||
return run.estimate_droppable_tombstone_ratio(compaction_time, t.get_tombstone_gc_state(), t.schema()) >= actual_threshold;
|
||||
};
|
||||
auto compaction_time = gc_clock::now();
|
||||
auto can_garbage_collect = [&] (const size_bucket_t& bucket) {
|
||||
return boost::algorithm::any_of(bucket, [&] (const frozen_sstable_run& r) {
|
||||
return worth_dropping_tombstones(*r, compaction_time);
|
||||
});
|
||||
};
|
||||
|
||||
// To make sure that expired tombstones are persisted in a timely manner, ICS will cross-tier compact
|
||||
// two closest-in-size buckets such that tombstones will eventually reach the top of the LSM tree,
|
||||
// making it possible to purge them.
|
||||
|
||||
// Start from the largest tier as it's more likely to satisfy conditions for tombstones to be purged.
|
||||
auto it = buckets.rbegin();
|
||||
for (; it != buckets.rend(); it++) {
|
||||
if (can_garbage_collect(*it)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (it == buckets.rend()) {
|
||||
clogger.debug("ICS: nothing to garbage collect in {} buckets for {}.{}", buckets.size(), t.schema()->ks_name(), t.schema()->cf_name());
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
size_bucket_t& first_bucket = *it;
|
||||
std::vector<sstables::frozen_sstable_run> input = std::move(first_bucket);
|
||||
|
||||
if (buckets.size() >= 2) {
|
||||
// If the largest tier needs GC, then compact it with the second largest.
|
||||
// Any smaller tier needing GC will be compacted with the larger and closest-in-size one.
|
||||
// It's done this way to reduce write amplification and satisfy conditions for purging tombstones.
|
||||
it = it == buckets.rbegin() ? std::next(it) : std::prev(it);
|
||||
|
||||
size_bucket_t& second_bucket = *it;
|
||||
|
||||
input.reserve(input.size() + second_bucket.size());
|
||||
std::move(second_bucket.begin(), second_bucket.end(), std::back_inserter(input));
|
||||
}
|
||||
clogger.debug("ICS: starting garbage collection on {} runs for {}.{}", input.size(), t.schema()->ks_name(), t.schema()->cf_name());
|
||||
|
||||
return compaction_descriptor(runs_to_sstables(std::move(input)), 0, _fragment_size);
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
incremental_compaction_strategy::get_sstables_for_compaction(table_state& t, strategy_control& control) {
|
||||
auto candidates = control.candidates_as_runs(t);
|
||||
|
||||
// make local copies so they can't be changed out from under us mid-method
|
||||
size_t min_threshold = t.min_compaction_threshold();
|
||||
size_t max_threshold = t.schema()->max_compaction_threshold();
|
||||
|
||||
auto buckets = get_buckets(candidates);
|
||||
|
||||
if (is_any_bucket_interesting(buckets, min_threshold)) {
|
||||
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
}
|
||||
// If we are not enforcing min_threshold explicitly, try any pair of sstable runs in the same tier.
|
||||
if (!t.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
|
||||
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
}
|
||||
|
||||
// The cross-tier behavior is only triggered once we're done with all the pending same-tier compaction to
|
||||
// increase overall efficiency.
|
||||
if (control.has_ongoing_compaction(t)) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
auto desc = find_garbage_collection_job(t, buckets);
|
||||
if (!desc.sstables.empty()) {
|
||||
return desc;
|
||||
}
|
||||
|
||||
if (_space_amplification_goal) {
|
||||
if (buckets.size() < 2) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
// Let S0 be the size of largest tier
|
||||
// Let S1 be the size of second-largest tier,
|
||||
// SA will be (S0 + S1) / S0
|
||||
|
||||
// Don't try SAG if there's an ongoing compaction, because if largest tier is being compacted,
|
||||
// SA would be calculated incorrectly, which may result in an unneeded cross-tier compaction.
|
||||
|
||||
auto find_two_largest_tiers = [this] (std::vector<size_bucket_t>&& buckets) -> std::tuple<size_bucket_t, size_bucket_t> {
|
||||
std::partial_sort(buckets.begin(), buckets.begin()+2, buckets.end(), [this] (size_bucket_t& i, size_bucket_t& j) {
|
||||
return avg_size(i) > avg_size(j); // descending order
|
||||
});
|
||||
return { std::move(buckets[0]), std::move(buckets[1]) };
|
||||
};
|
||||
|
||||
auto total_size = [] (const size_bucket_t& bucket) -> uint64_t {
|
||||
return boost::accumulate(bucket | boost::adaptors::transformed(std::mem_fn(&sstable_run::data_size)), uint64_t(0));
|
||||
};
|
||||
|
||||
auto [s0, s1] = find_two_largest_tiers(std::move(buckets));
|
||||
uint64_t s0_size = total_size(s0), s1_size = total_size(s1);
|
||||
double space_amplification = double(s0_size + s1_size) / s0_size;
|
||||
|
||||
if (space_amplification > _space_amplification_goal) {
|
||||
clogger.debug("ICS: doing cross-tier compaction of two largest tiers, to reduce SA {} to below SAG {}",
|
||||
space_amplification, *_space_amplification_goal);
|
||||
// Aims at reducing space amplification, to below SAG, by compacting together the two largest tiers
|
||||
std::vector<sstables::frozen_sstable_run> cross_tier_input = std::move(s0);
|
||||
cross_tier_input.reserve(cross_tier_input.size() + s1.size());
|
||||
std::move(s1.begin(), s1.end(), std::back_inserter(cross_tier_input));
|
||||
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(cross_tier_input)),
|
||||
0, _fragment_size);
|
||||
}
|
||||
}
|
||||
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
incremental_compaction_strategy::get_major_compaction_job(table_state& t, std::vector<sstables::shared_sstable> candidates) {
|
||||
if (candidates.empty()) {
|
||||
return compaction_descriptor();
|
||||
}
|
||||
return make_major_compaction_job(std::move(candidates), 0, _fragment_size);
|
||||
}
|
||||
|
||||
int64_t incremental_compaction_strategy::estimated_pending_compactions(table_state& t) const {
|
||||
size_t min_threshold = t.schema()->min_compaction_threshold();
|
||||
size_t max_threshold = t.schema()->max_compaction_threshold();
|
||||
int64_t n = 0;
|
||||
|
||||
for (auto& bucket : get_buckets(t.main_sstable_set().all_sstable_runs())) {
|
||||
if (bucket.size() >= min_threshold) {
|
||||
n += (bucket.size() + max_threshold - 1) / max_threshold;
|
||||
}
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
std::vector<shared_sstable>
|
||||
incremental_compaction_strategy::runs_to_sstables(std::vector<frozen_sstable_run> runs) {
|
||||
return boost::accumulate(runs, std::vector<shared_sstable>(), [&] (std::vector<shared_sstable>&& v, const frozen_sstable_run& run_ptr) {
|
||||
auto& run = *run_ptr;
|
||||
v.insert(v.end(), run.all().begin(), run.all().end());
|
||||
return std::move(v);
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<frozen_sstable_run>
|
||||
incremental_compaction_strategy::sstables_to_runs(std::vector<shared_sstable> sstables) {
|
||||
std::unordered_map<sstables::run_id, sstable_run> runs;
|
||||
for (auto&& sst : sstables) {
|
||||
// okay to ignore duplicates
|
||||
(void)runs[sst->run_identifier()].insert(std::move(sst));
|
||||
}
|
||||
auto freeze = [] (const sstable_run& run) { return make_lw_shared<const sstable_run>(run); };
|
||||
return boost::copy_range<std::vector<frozen_sstable_run>>(runs | boost::adaptors::map_values | boost::adaptors::transformed(freeze));
|
||||
}
|
||||
|
||||
void incremental_compaction_strategy::sort_run_bucket_by_first_key(size_bucket_t& bucket, size_t max_elements, const schema_ptr& schema) {
|
||||
std::partial_sort(bucket.begin(), bucket.begin() + max_elements, bucket.end(), [&schema](const frozen_sstable_run& a, const frozen_sstable_run& b) {
|
||||
auto sst_first_key_less = [&schema] (const shared_sstable& sst_a, const shared_sstable& sst_b) {
|
||||
return sst_a->get_first_decorated_key().tri_compare(*schema, sst_b->get_first_decorated_key()) <= 0;
|
||||
};
|
||||
auto& a_first = *boost::min_element(a->all(), sst_first_key_less);
|
||||
auto& b_first = *boost::min_element(b->all(), sst_first_key_less);
|
||||
return a_first->get_first_decorated_key().tri_compare(*schema, b_first->get_first_decorated_key()) <= 0;
|
||||
});
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
incremental_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
|
||||
auto mode = cfg.mode;
|
||||
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
|
||||
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
|
||||
|
||||
if (mode == reshape_mode::relaxed) {
|
||||
offstrategy_threshold = max_sstables;
|
||||
}
|
||||
|
||||
auto run_count = boost::copy_range<std::unordered_set<run_id>>(input | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))).size();
|
||||
if (run_count >= offstrategy_threshold && mode == reshape_mode::strict) {
|
||||
std::sort(input.begin(), input.end(), [&schema] (const shared_sstable& a, const shared_sstable& b) {
|
||||
return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
|
||||
});
|
||||
// All sstables can be reshaped at once if the amount of overlapping will not cause memory usage to be high,
|
||||
// which is possible because partitioned set is able to incrementally open sstables during compaction
|
||||
if (sstable_set_overlapping_count(schema, input) <= max_sstables) {
|
||||
compaction_descriptor desc(std::move(input), 0/* level */, _fragment_size);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
|
||||
for (auto& bucket : get_buckets(sstables_to_runs(std::move(input)))) {
|
||||
if (bucket.size() >= offstrategy_threshold) {
|
||||
// preserve token contiguity by prioritizing runs with the lowest first keys.
|
||||
if (bucket.size() > max_sstables) {
|
||||
sort_run_bucket_by_first_key(bucket, max_sstables, schema);
|
||||
bucket.resize(max_sstables);
|
||||
}
|
||||
compaction_descriptor desc(runs_to_sstables(std::move(bucket)), 0/* level */, _fragment_size);
|
||||
desc.options = compaction_type_options::make_reshape();
|
||||
return desc;
|
||||
}
|
||||
}
|
||||
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
std::vector<compaction_descriptor>
|
||||
incremental_compaction_strategy::get_cleanup_compaction_jobs(table_state& t, std::vector<shared_sstable> candidates) const {
|
||||
std::vector<compaction_descriptor> ret;
|
||||
const auto& schema = t.schema();
|
||||
unsigned max_threshold = schema->max_compaction_threshold();
|
||||
|
||||
for (auto& bucket : get_buckets(sstables_to_runs(std::move(candidates)))) {
|
||||
if (bucket.size() > max_threshold) {
|
||||
// preserve token contiguity
|
||||
sort_run_bucket_by_first_key(bucket, bucket.size(), schema);
|
||||
}
|
||||
auto it = bucket.begin();
|
||||
while (it != bucket.end()) {
|
||||
unsigned remaining = std::distance(it, bucket.end());
|
||||
unsigned needed = std::min(remaining, max_threshold);
|
||||
std::vector<frozen_sstable_run> runs;
|
||||
std::move(it, it + needed, std::back_inserter(runs));
|
||||
ret.push_back(compaction_descriptor(runs_to_sstables(std::move(runs)), 0/* level */, _fragment_size));
|
||||
std::advance(it, needed);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::unique_ptr<compaction_backlog_tracker::impl>
|
||||
incremental_compaction_strategy::make_backlog_tracker() const {
|
||||
return std::make_unique<incremental_backlog_tracker>(_options);
|
||||
}
|
||||
|
||||
incremental_compaction_strategy::incremental_compaction_strategy(const std::map<sstring, sstring>& options)
|
||||
: compaction_strategy_impl(options)
|
||||
, _options(options)
|
||||
{
|
||||
auto fragment_size_in_mb = validate_fragment_size(options);
|
||||
_fragment_size = fragment_size_in_mb*1024*1024;
|
||||
_space_amplification_goal = validate_space_amplification_goal(options);
|
||||
}
|
||||
|
||||
// options is a map of compaction strategy options and their values.
|
||||
// unchecked_options is an analogical map from which already checked options are deleted.
|
||||
// This helps making sure that only allowed options are being set.
|
||||
void incremental_compaction_strategy::validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
|
||||
incremental_compaction_strategy_options::validate(options, unchecked_options);
|
||||
}
|
||||
|
||||
}
|
||||
112
compaction/incremental_compaction_strategy.hh
Normal file
112
compaction/incremental_compaction_strategy.hh
Normal file
@@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "compaction_strategy_impl.hh"
|
||||
#include "compaction.hh"
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <boost/range/algorithm.hpp>
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include "size_tiered_compaction_strategy.hh"
|
||||
|
||||
class incremental_backlog_tracker;
|
||||
|
||||
namespace sstables {
|
||||
|
||||
class incremental_compaction_strategy_options {
|
||||
public:
|
||||
static constexpr uint64_t DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
|
||||
static constexpr double DEFAULT_BUCKET_LOW = 0.5;
|
||||
static constexpr double DEFAULT_BUCKET_HIGH = 1.5;
|
||||
|
||||
static constexpr auto MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
|
||||
static constexpr auto BUCKET_LOW_KEY = "bucket_low";
|
||||
static constexpr auto BUCKET_HIGH_KEY = "bucket_high";
|
||||
private:
|
||||
uint64_t min_sstable_size = DEFAULT_MIN_SSTABLE_SIZE;
|
||||
double bucket_low = DEFAULT_BUCKET_LOW;
|
||||
double bucket_high = DEFAULT_BUCKET_HIGH;
|
||||
public:
|
||||
incremental_compaction_strategy_options(const std::map<sstring, sstring>& options);
|
||||
|
||||
incremental_compaction_strategy_options() {
|
||||
min_sstable_size = DEFAULT_MIN_SSTABLE_SIZE;
|
||||
bucket_low = DEFAULT_BUCKET_LOW;
|
||||
bucket_high = DEFAULT_BUCKET_HIGH;
|
||||
}
|
||||
|
||||
static void validate(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
friend class incremental_compaction_strategy;
|
||||
};
|
||||
|
||||
using sstable_run_and_length = std::pair<sstables::frozen_sstable_run, uint64_t>;
|
||||
using sstable_run_bucket_and_length = std::pair<std::vector<sstables::frozen_sstable_run>, uint64_t>;
|
||||
|
||||
class incremental_compaction_strategy : public compaction_strategy_impl {
|
||||
incremental_compaction_strategy_options _options;
|
||||
|
||||
using size_bucket_t = std::vector<sstables::frozen_sstable_run>;
|
||||
public:
|
||||
static constexpr int32_t DEFAULT_MAX_FRAGMENT_SIZE_IN_MB = 1000;
|
||||
static constexpr auto FRAGMENT_SIZE_OPTION = "sstable_size_in_mb";
|
||||
static constexpr auto SPACE_AMPLIFICATION_GOAL_OPTION = "space_amplification_goal";
|
||||
private:
|
||||
size_t _fragment_size = DEFAULT_MAX_FRAGMENT_SIZE_IN_MB*1024*1024;
|
||||
std::optional<double> _space_amplification_goal;
|
||||
static std::vector<sstable_run_and_length> create_run_and_length_pairs(const std::vector<sstables::frozen_sstable_run>& runs);
|
||||
|
||||
static std::vector<std::vector<sstables::frozen_sstable_run>> get_buckets(const std::vector<sstables::frozen_sstable_run>& runs, const incremental_compaction_strategy_options& options);
|
||||
|
||||
std::vector<std::vector<sstables::frozen_sstable_run>> get_buckets(const std::vector<sstables::frozen_sstable_run>& runs) const {
|
||||
return get_buckets(runs, _options);
|
||||
}
|
||||
|
||||
std::vector<sstables::frozen_sstable_run>
|
||||
most_interesting_bucket(std::vector<std::vector<sstables::frozen_sstable_run>> buckets, size_t min_threshold, size_t max_threshold);
|
||||
|
||||
uint64_t avg_size(std::vector<sstables::frozen_sstable_run>& runs) const;
|
||||
|
||||
static bool is_bucket_interesting(const std::vector<sstables::frozen_sstable_run>& bucket, size_t min_threshold);
|
||||
|
||||
bool is_any_bucket_interesting(const std::vector<std::vector<sstables::frozen_sstable_run>>& buckets, size_t min_threshold) const;
|
||||
|
||||
compaction_descriptor find_garbage_collection_job(const table_state& t, std::vector<size_bucket_t>& buckets);
|
||||
|
||||
static std::vector<shared_sstable> runs_to_sstables(std::vector<frozen_sstable_run> runs);
|
||||
static std::vector<frozen_sstable_run> sstables_to_runs(std::vector<shared_sstable> sstables);
|
||||
static void sort_run_bucket_by_first_key(size_bucket_t& bucket, size_t max_elements, const schema_ptr& schema);
|
||||
public:
|
||||
incremental_compaction_strategy() = default;
|
||||
|
||||
incremental_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(table_state& t, strategy_control& control) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(table_state& t, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
virtual compaction_descriptor get_major_compaction_job(table_state& t, std::vector<sstables::shared_sstable> candidates) override;
|
||||
|
||||
virtual int64_t estimated_pending_compactions(table_state& t) const override;
|
||||
|
||||
virtual compaction_strategy_type type() const override {
|
||||
return compaction_strategy_type::incremental;
|
||||
}
|
||||
|
||||
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
|
||||
|
||||
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const override;
|
||||
|
||||
virtual std::unique_ptr<sstable_set_impl> make_sstable_set(schema_ptr schema) const override;
|
||||
|
||||
friend class ::incremental_backlog_tracker;
|
||||
};
|
||||
|
||||
}
|
||||
@@ -491,6 +491,7 @@ scylla_tests = set([
|
||||
'test/boost/hashers_test',
|
||||
'test/boost/hint_test',
|
||||
'test/boost/idl_test',
|
||||
'test/boost/incremental_compaction_test',
|
||||
'test/boost/index_reader_test',
|
||||
'test/boost/input_stream_test',
|
||||
'test/boost/intrusive_array_test',
|
||||
@@ -833,6 +834,8 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'compaction/task_manager_module.cc',
|
||||
'compaction/time_window_compaction_strategy.cc',
|
||||
'compaction/compaction_manager.cc',
|
||||
'compaction/incremental_compaction_strategy.cc',
|
||||
'compaction/incremental_backlog_tracker.cc',
|
||||
'sstables/integrity_checked_file_impl.cc',
|
||||
'sstables/prepended_input_stream.cc',
|
||||
'sstables/m_format_read_helpers.cc',
|
||||
|
||||
@@ -385,9 +385,9 @@ public:
|
||||
try {
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy::type(strategy));
|
||||
} catch (const exceptions::configuration_exception& e) {
|
||||
// If compaction strategy class isn't supported, fallback to size tiered.
|
||||
mlogger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
// If compaction strategy class isn't supported, fallback to incremental.
|
||||
mlogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
}
|
||||
}
|
||||
if (td.has("compaction_strategy_options")) {
|
||||
|
||||
@@ -2056,9 +2056,9 @@ static void prepare_builder_from_table_row(const schema_ctxt& ctxt, schema_build
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy::type(i->second));
|
||||
map.erase(i);
|
||||
} catch (const exceptions::configuration_exception& e) {
|
||||
// If compaction strategy class isn't supported, fallback to size tiered.
|
||||
slogger.warn("Falling back to size-tiered compaction strategy after the problem: {}", e.what());
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
// If compaction strategy class isn't supported, fallback to incremental.
|
||||
slogger.warn("Falling back to incremental compaction strategy after the problem: {}", e.what());
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
}
|
||||
}
|
||||
if (map.contains("max_threshold")) {
|
||||
|
||||
@@ -808,7 +808,7 @@ schema_ptr system_keyspace::v3::batches() {
|
||||
// FIXME: the original Java code also had:
|
||||
//.copy(new LocalPartitioner(TimeUUIDType.instance))
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
|
||||
builder.with_hash_version();
|
||||
return builder.build(schema_builder::compact_storage::no);
|
||||
@@ -1292,7 +1292,7 @@ schema_ptr system_keyspace::legacy::hints() {
|
||||
"*DEPRECATED* hints awaiting delivery"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
builder.set_compaction_strategy_options({{"enabled", "false"}});
|
||||
builder.with(schema_builder::compact_storage::yes);
|
||||
builder.with_hash_version();
|
||||
@@ -1318,7 +1318,7 @@ schema_ptr system_keyspace::legacy::batchlog() {
|
||||
"*DEPRECATED* batchlog entries"
|
||||
);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
builder.set_compaction_strategy_options({{"min_threshold", "2"}});
|
||||
builder.with(schema_builder::compact_storage::no);
|
||||
builder.with_hash_version();
|
||||
|
||||
@@ -70,7 +70,36 @@ Set the parameters for :ref:`Leveled Compaction <leveled-compaction-strategy-lcs
|
||||
Incremental Compaction Strategy (ICS)
|
||||
=====================================
|
||||
|
||||
ICS is only available in ScyllaDB Enterprise. See the `ScyllaDB Enetrpise documentation <https://enterprise.docs.scylladb.com/stable/architecture/compaction/compaction-strategies.html>`_ for details.
|
||||
.. versionadded:: 2019.1.4 Scylla Enterprise
|
||||
|
||||
ICS principles of operation are similar to those of STCS, merely replacing the increasingly larger SSTables in each tier, by increasingly longer SSTable runs, modeled after LCS runs, but using larger fragment size of 1 GB, by default.
|
||||
|
||||
Compaction is triggered when there are two or more runs of roughly the same size. These runs are incrementally compacted with each other, producing a new SSTable run, while incrementally releasing space as soon as each SSTable in the input run is processed and compacted. This method eliminates the high temporary space amplification problem of STCS by limiting the overhead to twice the (constant) fragment size, per shard.
|
||||
|
||||
Incremental Compaction Strategy benefits
|
||||
----------------------------------------
|
||||
* Greatly reduces the temporary space amplification which is typical of STCS, resulting in more disk space being available for storing user data.
|
||||
* The space requirement for a major compaction with ICS is almost non-existent given that the operation can release fragments at roughly same rate it produces new ones.
|
||||
|
||||
If you look at the following screenshot the green line shows how disk usage behaves under ICS when major compaction is issued.
|
||||
|
||||
.. image:: /architecture/compaction/screenshot.png
|
||||
|
||||
Incremental Compaction Strategy disadvantages
|
||||
----------------------------------------------
|
||||
|
||||
* Since ICS principles of operation are the same as STCS, its disadvantages are similar to STCS's, except for the temporary space amplification issue.
|
||||
|
||||
Namely:
|
||||
|
||||
* Continuously modifying existing rows results in each row being split across several SSTables, making reads slow, which doesn’t happen in Leveled compaction.
|
||||
* Obsolete data (overwritten or deleted columns) may accumulate across tiers, wasting space, for a long time, until it is finally merged. This can be mitigated by running major compaction from time to time.
|
||||
|
||||
**To implement this strategy**
|
||||
|
||||
Set the parameters for :ref:`Incremental Compaction <incremental-compaction-strategy-ics>`.
|
||||
|
||||
For more information, see the :ref:`Compaction KB Article <incremental-compaction-strategy-ics>`.
|
||||
|
||||
.. _TWCS1:
|
||||
|
||||
|
||||
@@ -170,7 +170,104 @@ LCS options
|
||||
Incremental Compaction Strategy (ICS)
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
ICS is only available in ScyllaDB Enterprise. See the `ScyllaDB Enetrpise documentation <https://enterprise.docs.scylladb.com/stable/cql/compaction.html>`_ for details.
|
||||
.. versionadded:: 2019.1.4 Scylla Enterprise
|
||||
|
||||
When using ICS, SSTable runs are put in different buckets depending on their size.
|
||||
When an SSTable run is bucketed, the average size of the runs in the bucket is compared to the new run, as well as the ``bucket_high`` and ``bucket_low`` levels.
|
||||
|
||||
|
||||
The database compares each SSTable-run size to the average of all SSTable-run sizes on all buckets in the node.
|
||||
It calculates ``bucket_low * avg_bucket_size`` and ``bucket_high * avg_bucket_size`` and then compares the result with the ``average SSTable-run size``.
|
||||
The conditions set for ``bucket_high`` and ``bucket_low`` dictate if successive runs will be added to the same bucket.
|
||||
When compaction begins it merges SSTable runs whose size in KB are within ``[average-size * bucket_low]`` and ``[average-size * bucket_high]``.
|
||||
|
||||
|
||||
Once there are multiple runs in a bucket, minor compaction begins.
|
||||
The minimum number of SSTable runs that triggers minor compaction is either 2 or ``min_threshold``, if the ``compaction_enforce_min_threshold``
|
||||
configuration option is set in the scylla.yaml configuration file.
|
||||
|
||||
.. _ics-options:
|
||||
|
||||
ICS options
|
||||
~~~~~~~~~~~~
|
||||
|
||||
The following options only apply to IncrementalCompactionStrategy:
|
||||
|
||||
.. code-block:: cql
|
||||
|
||||
compaction = {
|
||||
'class' : 'IncrementalCompactionStrategy',
|
||||
'bucket_high' : factor,
|
||||
'bucket_low' : factor,
|
||||
'min_sstable_size' : int,
|
||||
'min_threshold' : num_sstables,
|
||||
'max_threshold' : num_sstables,
|
||||
'sstable_size_in_mb' : int,
|
||||
'space_amplification_goal' : double}
|
||||
|
||||
=====
|
||||
|
||||
``bucket_high`` (default: 1.5)
|
||||
A new SSTable is added to the bucket if the SSTable size is **less than**
|
||||
bucket_high * the average size of that bucket (and if the bucket_low condition also holds).
|
||||
For example, if **'bucket_high = 1.5'** and the **SSTable size = 14MB**, does the SSTable belong to a bucket with an average size of 10MB?
|
||||
Yes, because the **SSTable size = 14**, which is **less** than **'bucket_high' * average bucket size = 15**.
|
||||
So, the SSTable will be added to the bucket, and the bucket’s average size will be recalculated.
|
||||
|
||||
=====
|
||||
|
||||
``bucket_low`` (default: 0.5)
|
||||
A new SSTable is added to the bucket if the SSTable size is **more than**
|
||||
bucket_low * the average size of that bucket (and if the bucket_high condition also holds).
|
||||
For example, if **'bucket_high = 0.5'** and the **SSTable size is 10MB**, does the SSTable belong to a bucket with an average size of 15MB?
|
||||
Yes, because the **SSTable size = 10**, which is **more** than **'bucket_low' * average bucket size = 7.5**.
|
||||
So, the SSTable will be added to the bucket, and the bucket’s average size will be recalculated.
|
||||
|
||||
=====
|
||||
|
||||
``min_sstable_size`` (default: 50)
|
||||
All SSTables smaller than this number of megabytes are put into the same bucket.
|
||||
|
||||
Unlike Apache Cassandra, scylla uses **uncompressed** size when bucketing similar-sized tiers together.
|
||||
Since compaction works on uncompressed data, SSTables containing similar amounts of data should be compacted together, even when they have different compression ratios.
|
||||
|
||||
=====
|
||||
|
||||
``min_threshold`` (default: 4)
|
||||
Minimum number of SSTable runs that need to belong to the same size bucket before compaction is triggered on that bucket.
|
||||
|
||||
.. note:: Enforcement of ``min_threshold`` is controlled by the ``compaction_enforce_min_threshold`` configuration option in the scylla.yaml configuration settings.
|
||||
By default, ``compaction_enforce_min_threshold=false``, meaning the Incremental Compaction Strategy will compact any bucket containing 2 or more SSTable runs.
|
||||
Otherwise, if ``compaction_enforce_min_threshold=true``, the value of ``min_threshold`` is considered and only those buckets that contain at
|
||||
least ``min_threshold`` SSTable runs will be compacted.
|
||||
|
||||
=====
|
||||
|
||||
``max_threshold`` (default: 32)
|
||||
Maximum number of SSTables that will be compacted together in one compaction step.
|
||||
|
||||
=====
|
||||
|
||||
``sstable_size_in_mb`` (default: 1000)
|
||||
This is the target size in megabytes, that will be used as the goal for an SSTable size (fragment size) following a compression.
|
||||
|
||||
.. _SAG:
|
||||
|
||||
=====
|
||||
|
||||
``space_amplification_goal`` (default: null)
|
||||
|
||||
:label-tip:`ScyllaDB Enterprise`
|
||||
|
||||
.. versionadded:: 2020.1.6
|
||||
|
||||
This is a threshold of the ratio of the sum of the sizes of the two largest tiers to the size of the largest tier,
|
||||
above which ICS will automatically compact the second largest and largest tiers together to eliminate stale data that may have been overwritten, expired, or deleted.
|
||||
The space_amplification_goal is given as a double-precision floating point number that must be greater than 1.0.
|
||||
|
||||
For example, if **'space_amplification_goal = 1.25'** and the largest tier holds **1000GB**,
|
||||
when the second-largest tier accumulates SSTables with the total size of 250GB or more,
|
||||
the ``space_amplification_goal`` threshold is crossed and all the SSTables in the largest and second-largest tiers will be compacted together.
|
||||
|
||||
=====
|
||||
|
||||
|
||||
@@ -756,7 +756,7 @@ Custom strategy can be provided by specifying the full class name as a :ref:`str
|
||||
<constants>`.
|
||||
|
||||
All default strategies support a number of common options, as well as options specific to
|
||||
the strategy chosen (see the section corresponding to your strategy for details: :ref:`STCS <stcs-options>`, :ref:`LCS <lcs-options>`, and :ref:`TWCS <twcs-options>`).
|
||||
the strategy chosen (see the section corresponding to your strategy for details: :ref:`STCS <stcs-options>`, :ref:`LCS <lcs-options>`, :ref:`ICS <ics-options>`, and :ref:`TWCS <twcs-options>`).
|
||||
|
||||
.. _cql-compression-options:
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ Time-window Compaction Strategy (TWCS) 50% 70%
|
||||
Incremental Compaction Strategy (ICS) 70% 80%
|
||||
====================================== =========== ============
|
||||
|
||||
Use the default ICS unless you'll have a clear understanding that another strategy is better for your use case. More on :doc:`choosing a Compaction Strategy </architecture/compaction/compaction-strategies>`.
|
||||
Use the default ICS unless you'll have a clear understanding that another strategy is better for your use case. More on :doc:`choosing a Compaction Strategy </architecture/compaction/compaction-strategies>`.
|
||||
In order to maintain a high level of service availability, keep 50% to 20% free disk space at all times!
|
||||
|
||||
.. _system-requirements-network:
|
||||
|
||||
@@ -117,7 +117,50 @@ Likewise, when :term:`bootstrapping<Bootstrap>` a new node, SSTables are streame
|
||||
|
||||
Incremental Compaction Strategy (ICS) :label-tip:`ScyllaDB Enterprise`
|
||||
------------------------------------------------------------------------
|
||||
ICS is only available in ScyllaDB Enterprise. See the `ScyllaDB Enetrpise documentation <https://enterprise.docs.scylladb.com/stable/kb/compaction.html>`_ for details.
|
||||
|
||||
.. versionadded:: 2019.1.4
|
||||
|
||||
One of the issues with Size-tiered compaction is that it needs temporary space because SSTables are not removed until they are fully compacted. ICS takes a different approach and splits each large SSTable into a run of sorted, fixed-size (by default 1 GB) SSTables (a.k.a. fragments) in the same way that LCS does, except it treats the entire run and not the individual SSTables as the sizing file for STCS. As the run-fragments are small, the SSTables compact quickly, allowing individual SSTables to be removed as soon as they are compacted. This approach uses low amounts of memory and temporary disk space.
|
||||
|
||||
ICS uses the same philosophy as STCS, where the SSTables are sorted in buckets according to their size. However, unlike STCS, ICS compaction uses SSTable runs as input, and produces a new run as output. It doesn't matter if a run is composed of only one fragment that could have come from STCS migration. From an incremental compaction perspective, everything is a run.
|
||||
|
||||
The strategy works as follows:
|
||||
|
||||
#. ICS looks for candidates for compaction that are similar in size. These candidates are called ``Input Runs``.
|
||||
|
||||
* The input runs may contain one or more SSTables each.
|
||||
|
||||
#. ICS compacts two or more similar-sized input runs into a single ``Output run`` (* See note_ ).
|
||||
#. Incremental Compaction progressively works on two or more fragments at a time, one from each input run.
|
||||
|
||||
* It reads mutations from all input fragments and merges them together into a single output fragment.
|
||||
* As long as the resulting fragment is smaller than the ``sstable_size_in_mb``, no further action is needed.
|
||||
* If the fragment is larger than the ``sstable_size_in_mb``:
|
||||
|
||||
1. Stop when the size threshold is reached, and seal the output fragment.
|
||||
2. Create a new run fragment and continue compacting the remaining input fragments, until the size threshold is reached.
|
||||
3. When an input fragment is exhausted, take it out of the list of SSTables to compact, and delete it from disk.
|
||||
4. Repeat until there are no input fragments left.
|
||||
|
||||
#. Take all of the output fragments and feed them back into compaction as an SSTable run.
|
||||
#. Stop when all fragments from input runs were exhausted and released.
|
||||
|
||||
.. _note:
|
||||
.. note:: To prevent data resurrection in case scylla crashes in the middle of compaction, ICS may possibly write an auxiliary run containing purgeable tombstones in addition to the output run containing live data.
|
||||
These tombstones are kept on disk while there are SSTables containing data that the tombstones may shadow. Once compaction is done, deleting all shadowed data from all SSTables, the purgeable tombstones are purged and the SSTables holding them are removed from storage.
|
||||
|
||||
.. image:: ics-incremental-compaction.png
|
||||
|
||||
Incremental compaction as a solution for temporary space overhead in STCS
|
||||
.........................................................................
|
||||
|
||||
We fixed the temporary space overhead on STCS by applying the incremental compaction approach to it, which resulted in the creation of Incremental Compaction Strategy (ICS). The compacted SSTables, that become increasingly larger over time with STCS, are replaced with sorted runs of SSTable fragments, together called “SSTable runs” – which is a concept borrowed from Leveled Compaction Strategy (LCS).
|
||||
|
||||
Each fragment is a roughly fixed size (aligned to partition boundaries) SSTable and it holds a unique range of keys, a portion of the whole SSTable run. Note that as the SSTable-runs in ICS hold exactly the same data as the corresponding SSTables created by STCS, they become increasingly longer over time (holding more fragments), in the same way that SSTables grow in size with STCS, yet the ICS SSTable fragments’ size remains the same.
|
||||
|
||||
For example, when compacting two SSTables (or SSTable runs) holding 7GB each: instead of writing up to 14GB into a single SSTable file, we’ll break the output SSTable into a run of 14 x 1GB fragments (fragment size is 1GB by default).
|
||||
|
||||
.. image:: compaction-incremental.png
|
||||
|
||||
.. _time-window-compactionstrategy-twcs:
|
||||
|
||||
|
||||
@@ -41,6 +41,7 @@ Knowledge Base
|
||||
* :doc:`Increase Cache to Avoid Non-paged Queries </kb/increase-permission-cache>` - How to increase the ``permissions_cache_max_entries`` setting.
|
||||
* :doc:`How to Safely Increase the Replication Factor </kb/rf-increase>`
|
||||
* :doc:`Facts about TTL, Compaction, and gc_grace_seconds <ttl-facts>`
|
||||
* :doc:`Efficient Tombstone Garbage Collection in ICS <garbage-collection-ics>`
|
||||
|
||||
**Note**: The KB article for social readers has been *removed*. Instead, please look at lessons on `ScyllaDB University <https://university.scylladb.com/>`_ or the `Care Pet example <https://care-pet.docs.scylladb.com/master/>`_
|
||||
|
||||
|
||||
@@ -22,6 +22,17 @@ Choose a Compaction Strategy
|
||||
|
||||
Each workload may require a specific strategy. Refer to :doc:`Choose a Compaction Strategy </architecture/compaction/compaction-strategies>` for details.
|
||||
|
||||
Incremental Compaction Strategy (ICS)
|
||||
.....................................
|
||||
|
||||
We highly recommend using ICS (the default setting) for any table that you have.
|
||||
You will have much less Space Amplification with ICS as it only requires 25% additional storage, as opposed to STCS which requires 50% more.
|
||||
|
||||
.. note:: ICS is the default compaction strategy setting for Scylla Enterprise versions 2020.1 and higher.
|
||||
|
||||
* Refer to :ref:`Incremental Compaction Strategy <ICS1>` for an overview of the benefits.
|
||||
* Refer to :ref:`Incremental Compaction Strategy Overview <incremental-compaction-strategy-ics>` for a description of how it works.
|
||||
|
||||
Resiliency
|
||||
----------
|
||||
|
||||
|
||||
@@ -551,7 +551,7 @@ private:
|
||||
int32_t _memtable_flush_period = 0;
|
||||
::speculative_retry _speculative_retry = ::speculative_retry(speculative_retry::type::PERCENTILE, 0.99);
|
||||
// This is the compaction strategy that will be used by default on tables which don't have one explicitly specified.
|
||||
sstables::compaction_strategy_type _compaction_strategy = sstables::compaction_strategy_type::size_tiered;
|
||||
sstables::compaction_strategy_type _compaction_strategy = sstables::compaction_strategy_type::incremental;
|
||||
std::map<sstring, sstring> _compaction_strategy_options;
|
||||
bool _compaction_enabled = true;
|
||||
::caching_options _caching_options;
|
||||
|
||||
@@ -114,6 +114,8 @@ add_scylla_test(hint_test
|
||||
add_scylla_test(idl_test
|
||||
KIND BOOST
|
||||
LIBRARIES idl)
|
||||
add_scylla_test(incremental_compaction_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(index_reader_test
|
||||
KIND SEASTAR)
|
||||
add_scylla_test(input_stream_test
|
||||
|
||||
525
test/boost/incremental_compaction_test.cc
Normal file
525
test/boost/incremental_compaction_test.cc
Normal file
@@ -0,0 +1,525 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future-util.hh>
|
||||
#include <seastar/core/do_with.hh>
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "sstables/sstables.hh"
|
||||
#include "compaction/incremental_compaction_strategy.hh"
|
||||
#include "schema/schema.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "compaction/compaction_manager.hh"
|
||||
#include "sstable_test.hh"
|
||||
#include "sstables/metadata_collector.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "cell_locking.hh"
|
||||
#include "test/lib/mutation_reader_assertions.hh"
|
||||
#include "test/lib/key_utils.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "test/lib/sstable_run_based_compaction_strategy_for_tests.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "dht/murmur3_partitioner.hh"
|
||||
#include "db/large_data_handler.hh"
|
||||
#include "db/config.hh"
|
||||
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static mutation_reader sstable_reader(reader_permit permit, shared_sstable sst, schema_ptr s) {
|
||||
return sst->as_mutation_source().make_reader_v2(s, std::move(permit), query::full_partition_range, s->full_slice());
|
||||
|
||||
}
|
||||
|
||||
class strategy_control_for_test : public strategy_control {
|
||||
bool _has_ongoing_compaction;
|
||||
public:
|
||||
explicit strategy_control_for_test(bool has_ongoing_compaction) noexcept : _has_ongoing_compaction(has_ongoing_compaction) {}
|
||||
|
||||
bool has_ongoing_compaction(table_state& table_s) const noexcept override {
|
||||
return _has_ongoing_compaction;
|
||||
}
|
||||
virtual std::vector<sstables::shared_sstable> candidates(table_state& t) const override {
|
||||
return boost::copy_range<std::vector<sstables::shared_sstable>>(*t.main_sstable_set().all());
|
||||
}
|
||||
virtual std::vector<sstables::frozen_sstable_run> candidates_as_runs(table_state& t) const override {
|
||||
return t.main_sstable_set().all_sstable_runs();
|
||||
}
|
||||
};
|
||||
|
||||
static std::unique_ptr<strategy_control> make_strategy_control_for_test(bool has_ongoing_compaction) {
|
||||
return std::make_unique<strategy_control_for_test>(has_ongoing_compaction);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(incremental_compaction_test) {
|
||||
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
|
||||
auto builder = schema_builder("tests", "incremental_compaction_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type)
|
||||
.with_partitioner("org.apache.cassandra.dht.Murmur3Partitioner")
|
||||
.with_sharder(smp::count, 0);
|
||||
auto s = builder.build();
|
||||
|
||||
auto tmp = make_lw_shared<tmpdir>();
|
||||
auto sst_gen = [&env, s, tmp] () mutable {
|
||||
auto sst = env.make_sstable(s, tmp->path().string(), env.new_generation(), sstable_version_types::md, big);
|
||||
return sst;
|
||||
};
|
||||
|
||||
table_for_tests cf = env.make_table_for_tests(s, tmp->path().string());
|
||||
auto close_cf = deferred_stop(cf);
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
|
||||
auto compact = [&, s] (std::vector<shared_sstable> all, auto replacer) -> std::vector<shared_sstable> {
|
||||
auto desc = sstables::compaction_descriptor(std::move(all), 1, 0);
|
||||
desc.enable_garbage_collection(cf->get_sstable_set());
|
||||
return compact_sstables(env, std::move(desc), cf, sst_gen, replacer).get().new_sstables;
|
||||
};
|
||||
auto make_insert = [&] (auto p) {
|
||||
auto key = p.key();
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1 /* ts */);
|
||||
BOOST_REQUIRE(m.decorated_key().token() == p.token());
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tokens = tests::generate_partition_keys(16, s, local_shard_only::yes, tests::key_size{8, 8});
|
||||
std::unordered_set<shared_sstable> sstables;
|
||||
std::vector<utils::observer<sstable&>> observers;
|
||||
sstables::sstable_run_based_compaction_strategy_for_tests cs;
|
||||
|
||||
auto do_replace = [&] (const std::vector<shared_sstable>& old_sstables, const std::vector<shared_sstable>& new_sstables) {
|
||||
for (auto& old_sst : old_sstables) {
|
||||
BOOST_REQUIRE(sstables.count(old_sst));
|
||||
sstables.erase(old_sst);
|
||||
}
|
||||
for (auto& new_sst : new_sstables) {
|
||||
BOOST_REQUIRE(!sstables.count(new_sst));
|
||||
sstables.insert(new_sst);
|
||||
}
|
||||
column_family_test(cf).rebuild_sstable_list(cf.as_table_state(), new_sstables, old_sstables).get();
|
||||
env.test_compaction_manager().propagate_replacement(cf.as_table_state(), old_sstables, new_sstables);
|
||||
};
|
||||
|
||||
auto do_incremental_replace = [&] (auto old_sstables, auto new_sstables, auto& expected_sst, auto& closed_sstables_tracker) {
|
||||
// that's because each sstable will contain only 1 mutation.
|
||||
BOOST_REQUIRE(old_sstables.size() == 1);
|
||||
BOOST_REQUIRE(new_sstables.size() == 1);
|
||||
auto old_sstable = old_sstables.front();
|
||||
// check that sstable replacement follows token order
|
||||
BOOST_REQUIRE(*expected_sst == old_sstable->generation());
|
||||
expected_sst++;
|
||||
// check that previously released sstables were already closed
|
||||
BOOST_REQUIRE(*closed_sstables_tracker == old_sstable->generation());
|
||||
|
||||
do_replace(old_sstables, new_sstables);
|
||||
|
||||
observers.push_back(old_sstable->add_on_closed_handler([&] (sstable& sst) {
|
||||
BOOST_TEST_MESSAGE(fmt::format("Closing sstable of generation {}", sst.generation()));
|
||||
closed_sstables_tracker++;
|
||||
}));
|
||||
|
||||
BOOST_TEST_MESSAGE(fmt::format("Removing sstable of generation {}, refcnt: {}", old_sstables.front()->generation(), old_sstables.front().use_count()));
|
||||
};
|
||||
|
||||
auto do_compaction = [&] (size_t expected_input, size_t expected_output) -> std::vector<shared_sstable> {
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
auto desc = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
|
||||
// nothing to compact, move on.
|
||||
if (desc.sstables.empty()) {
|
||||
return {};
|
||||
}
|
||||
std::unordered_set<sstables::run_id> run_ids;
|
||||
bool incremental_enabled = std::any_of(desc.sstables.begin(), desc.sstables.end(), [&run_ids] (shared_sstable& sst) {
|
||||
return !run_ids.insert(sst->run_identifier()).second;
|
||||
});
|
||||
|
||||
BOOST_REQUIRE(desc.sstables.size() == expected_input);
|
||||
auto sstable_run = boost::copy_range<std::set<sstables::generation_type>>(desc.sstables
|
||||
| boost::adaptors::transformed([] (auto& sst) { return sst->generation(); }));
|
||||
auto expected_sst = sstable_run.begin();
|
||||
auto closed_sstables_tracker = sstable_run.begin();
|
||||
auto replacer = [&] (compaction_completion_desc ccd) {
|
||||
BOOST_REQUIRE(expected_sst != sstable_run.end());
|
||||
if (incremental_enabled) {
|
||||
do_incremental_replace(std::move(ccd.old_sstables), std::move(ccd.new_sstables), expected_sst, closed_sstables_tracker);
|
||||
} else {
|
||||
do_replace(std::move(ccd.old_sstables), std::move(ccd.new_sstables));
|
||||
expected_sst = sstable_run.end();
|
||||
}
|
||||
};
|
||||
|
||||
auto result = compact(std::move(desc.sstables), replacer);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(expected_output, result.size());
|
||||
BOOST_REQUIRE(expected_sst == sstable_run.end());
|
||||
return result;
|
||||
};
|
||||
|
||||
// Generate 4 sstable runs composed of 4 fragments each after 4 compactions.
|
||||
// All fragments non-overlapping.
|
||||
for (auto i = 0U; i < tokens.size(); i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, { make_insert(tokens[i]) });
|
||||
sst->set_sstable_level(1);
|
||||
BOOST_REQUIRE(sst->get_sstable_level() == 1);
|
||||
column_family_test(cf).add_sstable(sst).get();
|
||||
sstables.insert(std::move(sst));
|
||||
do_compaction(4, 4);
|
||||
}
|
||||
BOOST_REQUIRE(sstables.size() == 16);
|
||||
|
||||
// Generate 1 sstable run from 4 sstables runs of similar size
|
||||
auto result = do_compaction(16, 16);
|
||||
BOOST_REQUIRE(result.size() == 16);
|
||||
for (auto i = 0U; i < tokens.size(); i++) {
|
||||
assert_that(sstable_reader(env.semaphore().make_tracking_only_permit(s, "test reader", db::no_timeout, tracing::trace_state_ptr()), result[i], s))
|
||||
.produces(make_insert(tokens[i]))
|
||||
.produces_end_of_stream();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(incremental_compaction_sag_test) {
|
||||
auto builder = schema_builder("tests", "incremental_compaction_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
auto s = builder.build();
|
||||
|
||||
struct sag_test {
|
||||
test_env& _env;
|
||||
mutable table_for_tests _cf;
|
||||
incremental_compaction_strategy _ics;
|
||||
const unsigned min_threshold = 4;
|
||||
const size_t data_set_size = 1'000'000'000;
|
||||
|
||||
static incremental_compaction_strategy make_ics(double space_amplification_goal) {
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace(sstring("space_amplification_goal"), sstring(std::to_string(space_amplification_goal)));
|
||||
return incremental_compaction_strategy(options);
|
||||
}
|
||||
static replica::column_family::config make_table_config(test_env& env) {
|
||||
auto config = env.make_table_config();
|
||||
config.compaction_enforce_min_threshold = true;
|
||||
return config;
|
||||
}
|
||||
|
||||
sag_test(test_env& env, schema_ptr s, double space_amplification_goal)
|
||||
: _env(env)
|
||||
, _cf(env.make_table_for_tests(s))
|
||||
, _ics(make_ics(space_amplification_goal))
|
||||
{
|
||||
}
|
||||
|
||||
double space_amplification() const {
|
||||
auto sstables = _cf->get_sstables();
|
||||
auto total = boost::accumulate(*sstables | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0));
|
||||
return double(total) / data_set_size;
|
||||
}
|
||||
|
||||
shared_sstable make_sstable_with_size(size_t sstable_data_size) {
|
||||
auto sst = _env.make_sstable(_cf->schema(), "/nowhere/in/particular", _env.new_generation(), sstable_version_types::md, big);
|
||||
auto keys = tests::generate_partition_keys(2, _cf->schema(), local_shard_only::yes);
|
||||
sstables::test(sst).set_values(keys[0].key(), keys[1].key(), stats_metadata{}, sstable_data_size);
|
||||
return sst;
|
||||
}
|
||||
|
||||
void populate(double target_space_amplification) {
|
||||
auto add_sstable = [this] (unsigned sst_data_size) {
|
||||
auto sst = make_sstable_with_size(sst_data_size);
|
||||
column_family_test(_cf).add_sstable(sst).get();
|
||||
};
|
||||
|
||||
add_sstable(data_set_size);
|
||||
while (space_amplification() < target_space_amplification) {
|
||||
add_sstable(data_set_size / min_threshold);
|
||||
}
|
||||
}
|
||||
|
||||
void run() {
|
||||
auto& table_s = _cf.as_table_state();
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
for (;;) {
|
||||
auto desc = _ics.get_sstables_for_compaction(table_s, *control);
|
||||
// no more jobs, bailing out...
|
||||
if (desc.sstables.empty()) {
|
||||
break;
|
||||
}
|
||||
auto total = boost::accumulate(desc.sstables | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0));
|
||||
std::vector<shared_sstable> new_ssts = { make_sstable_with_size(std::min(total, data_set_size)) };
|
||||
column_family_test(_cf).rebuild_sstable_list(table_s, new_ssts, desc.sstables).get();
|
||||
}
|
||||
}
|
||||
|
||||
future<> stop() {
|
||||
return _cf.stop();
|
||||
}
|
||||
};
|
||||
|
||||
using SAG = double;
|
||||
using TABLE_INITIAL_SA = double;
|
||||
|
||||
auto with_sag_test = [&] (SAG sag, TABLE_INITIAL_SA initial_sa) {
|
||||
test_env::do_with_async([&] (test_env& env) {
|
||||
sag_test test(env, s, sag);
|
||||
test.populate(initial_sa);
|
||||
BOOST_REQUIRE(test.space_amplification() >= initial_sa);
|
||||
test.run();
|
||||
BOOST_REQUIRE(test.space_amplification() <= sag);
|
||||
test.stop().get();
|
||||
}).get();
|
||||
};
|
||||
|
||||
with_sag_test(SAG(1.25), TABLE_INITIAL_SA(1.5));
|
||||
with_sag_test(SAG(2), TABLE_INITIAL_SA(1.5));
|
||||
with_sag_test(SAG(1.5), TABLE_INITIAL_SA(1.75));
|
||||
with_sag_test(SAG(1.01), TABLE_INITIAL_SA(1.5));
|
||||
with_sag_test(SAG(1.5), TABLE_INITIAL_SA(1));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto tmp = tmpdir();
|
||||
auto s = schema_builder("ks", "cf")
|
||||
.with_column("p1", utf8_type, column_kind::partition_key)
|
||||
.with_column("c1", utf8_type, column_kind::clustering_key)
|
||||
.with_column("r1", utf8_type)
|
||||
.build();
|
||||
|
||||
static constexpr float expired = 0.33;
|
||||
// we want number of expired keys to be ~ 1.5*sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE so as to
|
||||
// test ability of histogram to return a good estimation after merging keys.
|
||||
static int total_keys = std::ceil(sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE/expired)*1.5;
|
||||
|
||||
auto make_insert = [&] (bytes k, uint32_t ttl, uint32_t expiration_time) {
|
||||
auto key = partition_key::from_exploded(*s, {k});
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {to_bytes("c1")});
|
||||
auto live_cell = atomic_cell::make_live(*utf8_type, 0, bytes("a"), gc_clock::time_point(gc_clock::duration(expiration_time)), gc_clock::duration(ttl));
|
||||
m.set_clustered_cell(c_key, *s->get_column_definition("r1"), std::move(live_cell));
|
||||
return m;
|
||||
};
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(total_keys);
|
||||
|
||||
auto expired_keys = total_keys*expired;
|
||||
auto now = gc_clock::now();
|
||||
for (auto i = 0; i < expired_keys; i++) {
|
||||
// generate expiration time at different time points or only a few entries would be created in histogram
|
||||
auto expiration_time = (now - gc_clock::duration(DEFAULT_GC_GRACE_SECONDS*2+i)).time_since_epoch().count();
|
||||
mutations.push_back(make_insert(to_bytes("expired_key" + to_sstring(i)), 1, expiration_time));
|
||||
}
|
||||
auto remaining = total_keys-expired_keys;
|
||||
auto expiration_time = (now + gc_clock::duration(3600)).time_since_epoch().count();
|
||||
for (auto i = 0; i < remaining; i++) {
|
||||
mutations.push_back(make_insert(to_bytes("key" + to_sstring(i)), 3600, expiration_time));
|
||||
}
|
||||
|
||||
table_for_tests cf = env.make_table_for_tests(s);
|
||||
auto close_cf = deferred_stop(cf);
|
||||
|
||||
auto creator = [&] {
|
||||
auto sst = env.make_sstable(s, tmp.path().string(), env.new_generation(), sstables::get_highest_sstable_version(), big);
|
||||
return sst;
|
||||
};
|
||||
auto sst = make_sstable_containing(creator, std::move(mutations));
|
||||
column_family_test(cf).add_sstable(sst).get();
|
||||
|
||||
const auto& stats = sst->get_stats_metadata();
|
||||
BOOST_REQUIRE(stats.estimated_tombstone_drop_time.bin.size() == sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE);
|
||||
// Asserts that two keys are equal to within a positive delta
|
||||
sstable_run run;
|
||||
// FIXME: can we ignore return value of insert()?
|
||||
(void)run.insert(sst);
|
||||
BOOST_REQUIRE(std::fabs(run.estimate_droppable_tombstone_ratio(now, cf.as_table_state().get_tombstone_gc_state(), cf.schema()) - expired) <= 0.1);
|
||||
|
||||
auto cd = sstables::compaction_descriptor({ sst });
|
||||
cd.enable_garbage_collection(cf->get_sstable_set());
|
||||
auto info = compact_sstables(env, std::move(cd), cf, creator).get();
|
||||
auto uncompacted_size = sst->data_size();
|
||||
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
||||
BOOST_REQUIRE(info.new_sstables.front()->estimate_droppable_tombstone_ratio(now, cf.as_table_state().get_tombstone_gc_state(), cf.schema()) == 0.0f);
|
||||
BOOST_REQUIRE_CLOSE(info.new_sstables.front()->data_size(), uncompacted_size*(1-expired), 5);
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
|
||||
// sstable satisfying conditions will be included
|
||||
{
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_threshold", "0.3f");
|
||||
// that's needed because sstable with droppable data should be old enough.
|
||||
options.emplace("tombstone_compaction_interval", "1");
|
||||
sleep(2s).get();
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
||||
}
|
||||
|
||||
// sstable with droppable ratio of 0.3 won't be included due to threshold
|
||||
{
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_threshold", "0.5f");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which was recently created won't be included due to min interval
|
||||
{
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_compaction_interval", "3600");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now());
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
|
||||
// because the droppable ratio check has been disabled with unchecked_tombstone_compaction set to true
|
||||
{
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_compaction_interval", "3600");
|
||||
options.emplace("tombstone_threshold", "0.5f");
|
||||
options.emplace("unchecked_tombstone_compaction", "true");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(ics_reshape_test) {
|
||||
static constexpr unsigned disjoint_sstable_count = 256;
|
||||
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "ics_reshape_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", ::timestamp_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::incremental);
|
||||
constexpr unsigned target_sstable_size_in_mb = 1000;
|
||||
std::map <sstring, sstring> opts = {
|
||||
{"sstable_size_in_mb", to_sstring(target_sstable_size_in_mb)},
|
||||
};
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, opts);
|
||||
builder.set_compaction_strategy_options(std::move(opts));
|
||||
auto s = builder.build();
|
||||
|
||||
auto tokens = tests::generate_partition_keys(disjoint_sstable_count, s, local_shard_only::yes);
|
||||
|
||||
auto make_row = [&](unsigned token_idx) {
|
||||
auto key = tokens[token_idx].key();
|
||||
|
||||
mutation m(s, key);
|
||||
auto value = 1;
|
||||
auto next_ts = 1;
|
||||
auto c_key = clustering_key::from_exploded(*s, {::timestamp_type->decompose(next_ts)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_ts);
|
||||
return m;
|
||||
};
|
||||
|
||||
auto tmp = tmpdir();
|
||||
|
||||
auto sst_gen = [&env, s, &tmp]() {
|
||||
return env.make_sstable(s, tmp.path().string(), env.new_generation(), sstables::sstable::version_types::md, big);
|
||||
};
|
||||
|
||||
{
|
||||
unsigned sstable_count = s->max_compaction_threshold() * 2;
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(sstable_count);
|
||||
for (unsigned i = 0; i < sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(0)});
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
auto ret = cs.get_reshaping_job(sstables, s, reshape_config{.mode = reshape_mode::strict});
|
||||
BOOST_REQUIRE(ret.sstables.size() == unsigned(s->max_compaction_threshold()));
|
||||
BOOST_REQUIRE(ret.max_sstable_bytes == target_sstable_size_in_mb*1024*1024);
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 disjoint ssts and expect that stcs reshape allows them all to be compacted at once
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(i)});
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_config{.mode = reshape_mode::strict}).sstables.size() == disjoint_sstable_count);
|
||||
}
|
||||
|
||||
{
|
||||
// create a single run of 256 sstables and expect that reshape will say there's nothing to do.
|
||||
|
||||
run_id sstable_run_id = run_id::create_random_id();
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(i)});
|
||||
sstables::test(sst).set_run_identifier(sstable_run_id);
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_config{.mode = reshape_mode::strict}).sstables.size() == 0);
|
||||
}
|
||||
|
||||
{
|
||||
// create set of 256 overlapping ssts and expect that stcs reshape allows only 32 to be compacted at once
|
||||
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
sstables.reserve(disjoint_sstable_count);
|
||||
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
||||
auto sst = make_sstable_containing(sst_gen, {make_row(0)});
|
||||
sstables.push_back(std::move(sst));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_config{.mode = reshape_mode::strict}).sstables.size() == uint64_t(s->max_compaction_threshold()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(gc_tombstone_with_grace_seconds_test) {
|
||||
return test_env::do_with_async([](test_env &env) {
|
||||
auto gc_grace_seconds = 5;
|
||||
auto schema = schema_builder("tests", "gc_tombstone_with_grace_seconds_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", byte_type)
|
||||
.set_gc_grace_seconds(gc_grace_seconds).build();
|
||||
auto sst_factory = env.make_sst_factory(schema);
|
||||
|
||||
auto now = gc_clock::now();
|
||||
// set the expiration time to (now - gc_grace_seconds), so that the tombstone is GC'able when compaction is run
|
||||
auto expiration_time = (now - gc_clock::duration(gc_grace_seconds)).time_since_epoch().count();
|
||||
mutation mut(schema, tests::generate_partition_key(schema, local_shard_only::yes));
|
||||
auto live_cell = atomic_cell::make_live(*byte_type, 0, to_bytes("a"), gc_clock::time_point(gc_clock::duration(expiration_time)), gc_clock::duration(1));
|
||||
mut.set_clustered_cell(clustering_key::make_empty(), *schema->get_column_definition("value"), std::move(live_cell));
|
||||
auto sst = make_sstable_containing(env.make_sst_factory(schema), {mut});
|
||||
|
||||
table_for_tests cf = env.make_table_for_tests(schema);
|
||||
auto close_cf = deferred_stop(cf);
|
||||
column_family_test(cf).add_sstable(sst).get();
|
||||
|
||||
std::map<sstring, sstring> options;
|
||||
// reduce tombstone_compaction_interval to make droppable data old enough for GC.
|
||||
options.emplace("tombstone_compaction_interval", "1");
|
||||
forward_jump_clocks(std::chrono::seconds{1});
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_table_state(), *control);
|
||||
BOOST_REQUIRE_EQUAL(descriptor.sstables.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(descriptor.sstables.front(), sst);
|
||||
});
|
||||
}
|
||||
@@ -1158,7 +1158,7 @@ SEASTAR_TEST_CASE(test_system_schema_version_is_stable) {
|
||||
|
||||
// If you changed the schema of system.batchlog then this is expected to fail.
|
||||
// Just replace expected version with the new version.
|
||||
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("9621f170-f101-3459-a8d3-f342c83ad86e")));
|
||||
BOOST_REQUIRE_EQUAL(s->version(), table_schema_version(utils::UUID("776f1766-8688-3d52-908b-a5228900dc00")));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,8 @@
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "compaction/time_window_compaction_strategy.hh"
|
||||
#include "compaction/leveled_compaction_strategy.hh"
|
||||
#include "compaction/incremental_backlog_tracker.hh"
|
||||
#include "compaction/size_tiered_backlog_tracker.hh"
|
||||
#include "test/lib/mutation_assertions.hh"
|
||||
#include "counters.hh"
|
||||
#include "test/lib/simple_schema.hh"
|
||||
@@ -5082,6 +5084,50 @@ SEASTAR_TEST_CASE(twcs_single_key_reader_through_compound_set_test) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(basic_ics_controller_correctness_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
static constexpr uint64_t default_fragment_size = 1UL*1024UL*1024UL*1024UL;
|
||||
|
||||
auto s = simple_schema().schema();
|
||||
|
||||
auto backlog = [&] (compaction_backlog_tracker backlog_tracker, uint64_t max_fragment_size) {
|
||||
table_for_tests cf = env.make_table_for_tests();
|
||||
auto stop_cf = defer([&] { cf.stop().get(); });
|
||||
|
||||
uint64_t current_sstable_size = default_fragment_size;
|
||||
uint64_t data_set_size = 0;
|
||||
static constexpr uint64_t target_data_set_size = 1000UL*1024UL*1024UL*1024UL;
|
||||
|
||||
while (data_set_size < target_data_set_size) {
|
||||
auto run_identifier = sstables::run_id::create_random_id();
|
||||
|
||||
auto expected_fragments = std::max(1UL, current_sstable_size / max_fragment_size);
|
||||
uint64_t fragment_size = std::max(default_fragment_size, current_sstable_size / expected_fragments);
|
||||
auto tokens = tests::generate_partition_keys(expected_fragments, s, local_shard_only::yes);
|
||||
|
||||
for (auto i = 0UL; i < expected_fragments; i++) {
|
||||
auto sst = sstable_for_overlapping_test(env, cf->schema(), tokens[i].key(), tokens[i].key());
|
||||
sstables::test(sst).set_data_file_size(fragment_size);
|
||||
sstables::test(sst).set_run_identifier(run_identifier);
|
||||
backlog_tracker.replace_sstables({}, {std::move(sst)});
|
||||
}
|
||||
data_set_size += current_sstable_size;
|
||||
current_sstable_size *= 2;
|
||||
}
|
||||
|
||||
return backlog_tracker.backlog();
|
||||
};
|
||||
|
||||
sstables::incremental_compaction_strategy_options ics_options;
|
||||
auto ics_backlog = backlog(compaction_backlog_tracker(std::make_unique<incremental_backlog_tracker>(ics_options)), default_fragment_size);
|
||||
sstables::size_tiered_compaction_strategy_options stcs_options;
|
||||
auto stcs_backlog = backlog(compaction_backlog_tracker(std::make_unique<size_tiered_backlog_tracker>(stcs_options)), std::numeric_limits<size_t>::max());
|
||||
|
||||
// don't expect ics and stcs to yield different backlogs for the same workload.
|
||||
BOOST_CHECK_CLOSE(ics_backlog, stcs_backlog, 0.0001);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_major_does_not_miss_data_in_memtable) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "test_major_does_not_miss_data_in_memtable")
|
||||
@@ -5253,6 +5299,10 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test_leveled) {
|
||||
return run_controller_test(sstables::compaction_strategy_type::leveled);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(simple_backlog_controller_test_incremental) {
|
||||
return run_controller_test(sstables::compaction_strategy_type::incremental);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
constexpr size_t all_files = 64;
|
||||
@@ -5339,6 +5389,9 @@ SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
|
||||
// LCS: Check that 1 jobs is returned for all non-overlapping files in level 1, as incremental compaction can be employed
|
||||
// to limit memory usage and space requirement.
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::leveled, 64, empty_opts, 0ms, 1);
|
||||
|
||||
// ICS: Check that 2 jobs are returned for a size tier containing 2x more files (single-fragment runs) than max threshold.
|
||||
run_cleanup_strategy_test(sstables::compaction_strategy_type::incremental, 32);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -49,6 +49,14 @@ def test_time_window_compaction_strategy_options(cql, table1):
|
||||
def test_leveled_compaction_strategy_options(cql, table1):
|
||||
assert_throws(cql, table1, r"sstable_size_in_mb value \(-5\) must be positive|sstable_size_in_mb must be larger than 0, but was -5", "ALTER TABLE %s WITH compaction = { 'class' : 'LeveledCompactionStrategy', 'sstable_size_in_mb' : -5 }")
|
||||
|
||||
def test_incremental_compaction_strategy_options(cql, table1):
|
||||
assert_throws(cql, table1, r"min_sstable_size value \(-1\) must be non negative", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'min_sstable_size' : -1 }")
|
||||
assert_throws(cql, table1, r"bucket_low value \(0\) must be between 0.0 and 1.0", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'bucket_low' : 0.0 }")
|
||||
assert_throws(cql, table1, r"bucket_low value \(1.3\) must be between 0.0 and 1.0", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'bucket_low' : 1.3 }")
|
||||
assert_throws(cql, table1, r"bucket_high value \(0.7\) must be greater than 1.0", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'bucket_high' : 0.7 }")
|
||||
assert_throws(cql, table1, r"space_amplification_goal value \(2.2\) must be greater than 1.0 and less than or equal to 2.0", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'space_amplification_goal' : 2.2 }")
|
||||
assert_throws(cql, table1, r"min_threshold value \(1\) must be bigger or equal to 2", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'min_threshold' : 1 }")
|
||||
|
||||
def test_not_allowed_options(cql, table1):
|
||||
def scylla_error(**kwargs):
|
||||
template = "Invalid compaction strategy options {{{}}} for chosen strategy type"
|
||||
@@ -63,3 +71,4 @@ def test_not_allowed_options(cql, table1):
|
||||
assert_throws(cql, table1, rf"{scylla_error(abc=-54.54)}|Properties specified \[abc\] are not understood by SizeTieredCompactionStrategy", "ALTER TABLE %s WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'abc' : -54.54 }")
|
||||
assert_throws(cql, table1, rf"{scylla_error(dog=3)}||Properties specified \[dog\] are not understood by TimeWindowCompactionStrategy", "ALTER TABLE %s WITH compaction = { 'class' : 'TimeWindowCompactionStrategy', 'dog' : 3 }")
|
||||
assert_throws(cql, table1, rf"{scylla_error(compaction_window_size=4)}|Properties specified \[compaction_window_size\] are not understood by LeveledCompactionStrategy", "ALTER TABLE %s WITH compaction = { 'class' : 'LeveledCompactionStrategy', 'compaction_window_size' : 4 }")
|
||||
assert_throws(cql, table1, rf"{scylla_error(cold_reads_to_omit=0.5)}|Properties specified \[cold_reads_to_omit\] are not understood by IncrementalCompactionStrategy", "ALTER TABLE %s WITH compaction = { 'class' : 'IncrementalCompactionStrategy', 'cold_reads_to_omit' : 0.5 }")
|
||||
|
||||
Reference in New Issue
Block a user