Merge "Fix TWCS compaction aggressiveness due to data segregation" from Raphael

"
After data segregation feature, anything that cause out-of-order writes,
like read repair, can result in small updates to past time windows.
This causes compaction to be very aggressive because whenever a past time
window is updated like that, that time window is recompacted into a
single SSTable.
Users expect that once a window is closed, it will no longer be written
to, but that has changed since the introduction of the data segregation
future. We didn't anticipate the write amplification issues that the
feature would cause. To fix this problem, let's perform size-tiered
compaction on the windows that are no longer active and were updated
because data was segregated. The current behavior where the last active
window is merged into one file is kept. But thereafter, that same
window will only be compacted using STCS.

Fixes #6928.
"

* 'fix_twcs_agressiveness_after_data_segregation_v2' of github.com:raphaelsc/scylla:
  compaction/twcs: improve further debug messages
  compaction/twcs: Improve debug log which shows all windows
  test: Check that TWCS properly performs size-tiered compaction on past windows
  compaction/twcs: Make task estimation take into account the size-tiered behavior
  compaction/stcs: Export static function that estimates pending tasks
  compaction/stcs: Make get_buckets() static
  compact/twcs: Perform size-tiered compaction on past time windows
  compaction/twcs: Make strategy easier to extend by removing duplicated knowledge
  compaction/twcs: Make newest_bucket() non-static
  compaction/twcs: Move TWCS implementation into source file
This commit is contained in:
Avi Kivity
2020-08-19 11:34:41 +03:00
5 changed files with 319 additions and 155 deletions

View File

@@ -50,7 +50,7 @@ size_tiered_compaction_strategy_options::size_tiered_compaction_strategy_options
}
std::vector<std::pair<sstables::shared_sstable, uint64_t>>
size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) const {
size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) {
std::vector<std::pair<sstables::shared_sstable, uint64_t>> sstable_length_pairs;
sstable_length_pairs.reserve(sstables.size());
@@ -66,7 +66,7 @@ size_tiered_compaction_strategy::create_sstable_and_length_pairs(const std::vect
}
std::vector<std::vector<sstables::shared_sstable>>
size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_sstable>& sstables) const {
size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_sstable>& sstables, size_tiered_compaction_strategy_options options) {
// sstables sorted by size of its data file.
auto sorted_sstables = create_sstable_and_length_pairs(sstables);
@@ -87,8 +87,8 @@ size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_
for (auto it = buckets.begin(); it != buckets.end(); it++) {
size_t old_average_size = it->first;
if ((size > (old_average_size * _options.bucket_low) && size < (old_average_size * _options.bucket_high)) ||
(size < _options.min_sstable_size && old_average_size < _options.min_sstable_size)) {
if ((size > (old_average_size * options.bucket_low) && size < (old_average_size * options.bucket_high)) ||
(size < options.min_sstable_size && old_average_size < options.min_sstable_size)) {
auto bucket = std::move(it->second);
size_t total_size = bucket.size() * old_average_size;
size_t new_average_size = (total_size + size) / (bucket.size() + 1);
@@ -120,6 +120,11 @@ size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_
return bucket_list;
}
std::vector<std::vector<sstables::shared_sstable>>
size_tiered_compaction_strategy::get_buckets(const std::vector<sstables::shared_sstable>& sstables) const {
return get_buckets(sstables, _options);
}
std::vector<sstables::shared_sstable>
size_tiered_compaction_strategy::most_interesting_bucket(std::vector<std::vector<sstables::shared_sstable>> buckets,
unsigned min_threshold, unsigned max_threshold)
@@ -199,23 +204,28 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(column_family& cfs,
return sstables::compaction_descriptor();
}
int64_t size_tiered_compaction_strategy::estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options) {
int64_t n = 0;
for (auto& bucket : get_buckets(sstables, options)) {
if (bucket.size() >= size_t(min_threshold)) {
n += std::ceil(double(bucket.size()) / max_threshold);
}
}
return n;
}
int64_t size_tiered_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
int min_threshold = cf.min_compaction_threshold();
int max_threshold = cf.schema()->max_compaction_threshold();
std::vector<sstables::shared_sstable> sstables;
int64_t n = 0;
sstables.reserve(cf.sstables_count());
for (auto& entry : *cf.get_sstables()) {
sstables.push_back(entry);
}
for (auto& bucket : get_buckets(sstables)) {
if (bucket.size() >= size_t(min_threshold)) {
n += std::ceil(double(bucket.size()) / max_threshold);
}
}
return n;
return estimated_pending_compactions(sstables, min_threshold, max_threshold, _options);
}
std::vector<sstables::shared_sstable>

View File

@@ -97,9 +97,11 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl {
compaction_backlog_tracker _backlog_tracker;
// Return a list of pair of shared_sstable and its respective size.
std::vector<std::pair<sstables::shared_sstable, uint64_t>> create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables) const;
static std::vector<std::pair<sstables::shared_sstable, uint64_t>> create_sstable_and_length_pairs(const std::vector<sstables::shared_sstable>& sstables);
// Group files of similar size into buckets.
static std::vector<std::vector<sstables::shared_sstable>> get_buckets(const std::vector<sstables::shared_sstable>& sstables, size_tiered_compaction_strategy_options options);
std::vector<std::vector<sstables::shared_sstable>> get_buckets(const std::vector<sstables::shared_sstable>& sstables) const;
// Maybe return a bucket of sstables to compact
@@ -135,6 +137,8 @@ public:
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
static int64_t estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
int min_threshold, int max_threshold, size_tiered_compaction_strategy_options options);
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
virtual compaction_strategy_type type() const {

View File

@@ -172,4 +172,194 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
return compaction_descriptor();
}
compaction_descriptor
time_window_compaction_strategy::get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) {
auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
if (candidates.empty()) {
return compaction_descriptor();
}
// Find fully expired SSTables. Those will be included no matter what.
std::unordered_set<shared_sstable> expired;
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = get_fully_expired_sstables(cf, candidates, gc_before);
_last_expired_check = db_clock::now();
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
}
if (!expired.empty()) {
auto is_expired = [&] (const shared_sstable& s) { return expired.contains(s); };
candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
}
auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
if (!expired.empty()) {
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
}
return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
}
time_window_compaction_strategy::bucket_compaction_mode
time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timestamp_type bucket_key,
timestamp_type now, size_t min_threshold) const {
// STCS will also be performed on older window buckets, to avoid a bad write and
// space amplification when something like read repair cause small updates to
// those past windows.
if (bucket.size() >= 2 && !is_last_active_bucket(bucket_key, now) && _recent_active_windows.contains(bucket_key)) {
return bucket_compaction_mode::major;
} else if (bucket.size() >= size_t(min_threshold)) {
return bucket_compaction_mode::size_tiered;
}
return bucket_compaction_mode::none;
}
std::vector<shared_sstable>
time_window_compaction_strategy::get_next_non_expired_sstables(column_family& cf,
std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
if (!most_interesting.empty()) {
return most_interesting;
}
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
});
non_expiring_sstables.erase(e, non_expiring_sstables.end());
if (non_expiring_sstables.empty()) {
return {};
}
auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
});
return { *it };
}
std::vector<shared_sstable>
time_window_compaction_strategy::get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
auto p = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
_highest_window_seen = std::max(_highest_window_seen, p.second);
update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold());
return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
_options.sstable_window_size, _highest_window_seen, _stcs_options);
}
timestamp_type
time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
using namespace std::chrono;
auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
// mask out window size from timestamp to get lower bound of its window
auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
}
std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
time_window_compaction_strategy::get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
timestamp_type max_timestamp = 0;
// Create map to represent buckets
// For each sstable, add sstable to the time bucket
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (auto&& f : files) {
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
buckets[lower_bound].push_back(std::move(f));
max_timestamp = std::max(max_timestamp, lower_bound);
}
return std::make_pair(std::move(buckets), max_timestamp);
}
static std::ostream& operator<<(std::ostream& os, const std::map<timestamp_type, std::vector<shared_sstable>>& buckets) {
os << " buckets = {\n";
for (auto& bucket : buckets | boost::adaptors::reversed) {
os << format(" key={}, size={}\n", bucket.first, bucket.second.size());
}
os << " }\n";
return os;
}
std::vector<shared_sstable>
time_window_compaction_strategy::newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets,
int min_threshold, int max_threshold, std::chrono::seconds sstable_window_size, timestamp_type now,
size_tiered_compaction_strategy_options& stcs_options) {
clogger.debug("time_window_compaction_strategy::newest_bucket:\n now {}\n{}", now, buckets);
for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
auto key = key_bucket.first;
auto& bucket = key_bucket.second;
if (is_last_active_bucket(key, now)) {
_recent_active_windows.insert(key);
}
switch (compaction_mode(bucket, key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered: {
// If we're in the newest bucket, we'll use STCS to prioritize sstables.
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
if (!stcs_interesting_bucket.empty()) {
clogger.debug("bucket size {} >= 2, key {}, performing STCS on what's here", bucket.size(), key);
return stcs_interesting_bucket;
}
break;
}
case bucket_compaction_mode::major:
_recent_active_windows.erase(key);
clogger.debug("bucket size {} >= 2 and not in current bucket, key {}, compacting what's here", bucket.size(), key);
return trim_to_threshold(std::move(bucket), max_threshold);
default:
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
break;
}
}
return {};
}
std::vector<shared_sstable>
time_window_compaction_strategy::trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
auto n = std::min(bucket.size(), size_t(max_threshold));
// Trim the largest sstables off the end to meet the maxThreshold
boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
return i->ondisk_data_size() < j->ondisk_data_size();
});
bucket.resize(n);
return bucket;
}
void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks,
int min_threshold, int max_threshold) {
int64_t n = 0;
timestamp_type now = _highest_window_seen;
for (auto& task : tasks) {
const bucket_t& bucket = task.second;
timestamp_type bucket_key = task.first;
switch (compaction_mode(bucket, bucket_key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered:
n += size_tiered_compaction_strategy::estimated_pending_compactions(bucket, min_threshold, max_threshold, _stcs_options);
break;
case bucket_compaction_mode::major:
n++;
default:
break;
}
}
_estimated_remaining_tasks = n;
}
}

View File

@@ -102,6 +102,8 @@ class time_window_compaction_strategy : public compaction_strategy_impl {
int64_t _estimated_remaining_tasks = 0;
db_clock::time_point _last_expired_check;
timestamp_type _highest_window_seen;
// Keep track of all recent active windows that still need to be compacted into a single SSTable
std::unordered_set<timestamp_type> _recent_active_windows;
size_tiered_compaction_strategy_options _stcs_options;
compaction_backlog_tracker _backlog_tracker;
public:
@@ -110,37 +112,11 @@ public:
// Better co-locate some windows into the same sstables than OOM.
static constexpr uint64_t max_data_segregation_window_count = 100;
using bucket_t = std::vector<shared_sstable>;
enum class bucket_compaction_mode { none, size_tiered, major };
public:
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
if (candidates.empty()) {
return compaction_descriptor();
}
// Find fully expired SSTables. Those will be included no matter what.
std::unordered_set<shared_sstable> expired;
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = get_fully_expired_sstables(cf, candidates, gc_before);
_last_expired_check = db_clock::now();
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
}
if (!expired.empty()) {
auto is_expired = [&] (const shared_sstable& s) { return expired.contains(s); };
candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
}
auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
if (!expired.empty()) {
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
}
return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override;
private:
static timestamp_type
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
@@ -154,114 +130,36 @@ private:
};
}
// Returns true if bucket is the last, most active one.
bool is_last_active_bucket(timestamp_type bucket_key, timestamp_type now) const {
return bucket_key >= now;
}
// Returns which compaction type should be performed on a given window bucket.
bucket_compaction_mode
compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const;
std::vector<shared_sstable>
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);
if (!most_interesting.empty()) {
return most_interesting;
}
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
});
non_expiring_sstables.erase(e, non_expiring_sstables.end());
if (non_expiring_sstables.empty()) {
return {};
}
auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
});
return { *it };
}
std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
auto p = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
_highest_window_seen = std::max(_highest_window_seen, p.second);
update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
_options.sstable_window_size, _highest_window_seen, _stcs_options);
}
std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables);
public:
// Find the lowest timestamp for window of given size
static timestamp_type
get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
using namespace std::chrono;
auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
// mask out window size from timestamp to get lower bound of its window
auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
}
get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp);
// Group files with similar max timestamp into buckets.
// @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader),
// and the right is the highest timestamp seen
static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options);
timestamp_type max_timestamp = 0;
// Create map to represent buckets
// For each sstable, add sstable to the time bucket
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (auto&& f : files) {
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
buckets[lower_bound].push_back(std::move(f));
max_timestamp = std::max(max_timestamp, lower_bound);
}
return std::make_pair(std::move(buckets), max_timestamp);
}
static std::vector<shared_sstable>
std::vector<shared_sstable>
newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options) {
// If the current bucket has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
// In any case, limit to maxThreshold SSTables.
for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
auto key = key_bucket.first;
auto& bucket = key_bucket.second;
clogger.trace("Key {}, now {}", key, now);
if (bucket.size() >= size_t(min_threshold) && key >= now) {
// If we're in the newest bucket, we'll use STCS to prioritize sstables
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
if (!stcs_interesting_bucket.empty()) {
return stcs_interesting_bucket;
}
} else if (bucket.size() >= 2 && key < now) {
clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
return trim_to_threshold(std::move(bucket), max_threshold);
} else {
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
}
}
return {};
}
std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options);
static std::vector<shared_sstable>
trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
auto n = std::min(bucket.size(), size_t(max_threshold));
// Trim the largest sstables off the end to meet the maxThreshold
boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
return i->ondisk_data_size() < j->ondisk_data_size();
});
bucket.resize(n);
return bucket;
}
trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold);
static int64_t
get_window_for(const time_window_compaction_strategy_options& options, api::timestamp_type ts) {
@@ -273,23 +171,8 @@ public:
return timestamp_type(std::chrono::duration_cast<std::chrono::microseconds>(options.get_sstable_window_size()).count());
}
private:
void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
int64_t n = 0;
timestamp_type now = _highest_window_seen;
for (auto task : tasks) {
auto key = task.first;
// For current window, make sure it's compactable
auto count = task.second.size();
if (key >= now && count >= size_t(min_threshold)) {
n++;
} else if (key < now && count >= 2) {
n++;
}
}
_estimated_remaining_tasks = n;
}
void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks,
int min_threshold, int max_threshold);
friend class time_window_backlog_tracker;
public:

View File

@@ -3165,6 +3165,8 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
}
std::map<sstring, sstring> options;
time_window_compaction_strategy twcs(options);
std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;
// We'll put 3 sstables into the newest bucket
@@ -3174,13 +3176,13 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
}
sstables::size_tiered_compaction_strategy_options stcs_options;
auto now = api::timestamp_clock::now().time_since_epoch().count();
auto new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
auto new_bucket = twcs.newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// incoming bucket should not be accepted when it has below the min threshold SSTables
BOOST_REQUIRE(new_bucket.empty());
now = api::timestamp_clock::now().time_since_epoch().count();
new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
new_bucket = twcs.newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// incoming bucket should be accepted when it is larger than the min threshold SSTables
BOOST_REQUIRE(!new_bucket.empty());
@@ -3215,13 +3217,88 @@ SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
}
now = api::timestamp_clock::now().time_since_epoch().count();
new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
new_bucket = twcs.newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now), stcs_options);
// new bucket should be trimmed to max threshold of 32
BOOST_REQUIRE(new_bucket.size() == size_t(32));
});
}
// Check that TWCS will only perform size-tiered on the current window and also
// the past windows that were already previously compacted into a single SSTable.
SEASTAR_TEST_CASE(time_window_strategy_size_tiered_behavior_correctness) {
using namespace std::chrono;
return test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
auto s = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
auto tmp = tmpdir();
auto sst_gen = [&env, s, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
return env.make_sstable(s, tmp.path().string(), (*gen)++, la, big);
};
auto make_insert = [&] (partition_key key, api::timestamp_type t) {
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), t);
return m;
};
std::map<sstring, sstring> options;
sstables::size_tiered_compaction_strategy_options stcs_options;
time_window_compaction_strategy twcs(options);
std::map<api::timestamp_type, std::vector<shared_sstable>> buckets; // windows
int min_threshold = 4;
int max_threshold = 32;
auto window_size = duration_cast<seconds>(hours(1));
auto add_new_sstable_to_bucket = [&] (api::timestamp_type ts, api::timestamp_type window_ts) {
auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(ts))});
auto mut = make_insert(std::move(key), ts);
auto sst = make_sstable_containing(sst_gen, {std::move(mut)});
auto bound = time_window_compaction_strategy::get_window_lower_bound(window_size, window_ts);
buckets[bound].push_back(std::move(sst));
};
api::timestamp_type current_window_ts = api::timestamp_clock::now().time_since_epoch().count();
api::timestamp_type past_window_ts = current_window_ts - duration_cast<microseconds>(seconds(2L * 3600L)).count();
// create 1 sstable into past time window and let the strategy know about it
add_new_sstable_to_bucket(0, past_window_ts);
auto now = time_window_compaction_strategy::get_window_lower_bound(window_size, past_window_ts);
// past window cannot be compacted because it has a single SSTable
BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 0);
// create min_threshold-1 sstables into current time window
for (api::timestamp_type t = 0; t < min_threshold - 1; t++) {
add_new_sstable_to_bucket(t, current_window_ts);
}
// add 1 sstable into past window.
add_new_sstable_to_bucket(1, past_window_ts);
now = time_window_compaction_strategy::get_window_lower_bound(window_size, current_window_ts);
// past window can now be compacted into a single SSTable because it was the previous current (active) window.
// current window cannot be compacted because it has less than min_threshold SSTables
BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 2);
// now past window cannot be compacted again, because it was already compacted into a single SSTable, now it switches to STCS mode.
BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == 0);
// make past window contain more than min_threshold similar-sized SSTables, allowing it to be compacted again.
for (api::timestamp_type t = 2; t < min_threshold; t++) {
add_new_sstable_to_bucket(t, past_window_ts);
}
// now past window can be compacted again because it switched to STCS mode and has more than min_threshold SSTables.
BOOST_REQUIRE(twcs.newest_bucket(buckets, min_threshold, max_threshold, window_size, now, stcs_options).size() == size_t(min_threshold));
});
}
SEASTAR_TEST_CASE(test_promoted_index_read) {
// create table promoted_index_read (
// pk int,