Merge "Time window compaction strategy support" from Raphael

"Time window strategy was introduced to address several limitations of
date tiered strategy. In addition, its options are much easier to reason
about, basically just window size and window unit.
TWCS will work to keep only one sstable in each window. So the only real
optimization needed is to align partition key to the window.
Size tiered strategy is used to reduce write amplification when compacting
the incoming window.

For more details: https://issues.apache.org/jira/browse/CASSANDRA-9666

Fixes #1432."

* 'twcs_v2' of github.com:raphaelsc/scylla:
  tests: add tests for time window compaction strategy
  compaction: wire up time window compaction strategy
  compaction/twcs: override default values with options in schema
  sstables: implement time window compaction strategy
  sstables: import TimeWindowCompactionStrategy.java
This commit is contained in:
Avi Kivity
2017-07-19 10:22:53 +03:00
4 changed files with 432 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ enum class compaction_strategy_type {
size_tiered,
leveled,
date_tiered,
time_window,
};
class compaction_strategy_impl;
@@ -82,6 +83,8 @@ public:
return "LeveledCompactionStrategy";
case compaction_strategy_type::date_tiered:
return "DateTieredCompactionStrategy";
case compaction_strategy_type::time_window:
return "TimeWindowCompactionStrategy";
default:
throw std::runtime_error("Invalid Compaction Strategy");
}
@@ -100,6 +103,8 @@ public:
return compaction_strategy_type::leveled;
} else if (short_name == "DateTieredCompactionStrategy") {
return compaction_strategy_type::date_tiered;
} else if (short_name == "TimeWindowCompactionStrategy") {
return compaction_strategy_type::time_window;
} else {
throw exceptions::configuration_exception(sprint("Unable to find compaction strategy class '%s'", name));
}

View File

@@ -55,6 +55,7 @@
#include "size_tiered_compaction_strategy.hh"
#include "date_tiered_compaction_strategy.hh"
#include "leveled_compaction_strategy.hh"
#include "time_window_compaction_strategy.hh"
logging::logger date_tiered_manifest::logger = logging::logger("DateTieredCompactionStrategy");
logging::logger leveled_manifest::logger("LeveledManifest");
@@ -456,6 +457,9 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy,
case compaction_strategy_type::date_tiered:
impl = make_shared<date_tiered_compaction_strategy>(date_tiered_compaction_strategy(options));
break;
case compaction_strategy_type::time_window:
impl = make_shared<time_window_compaction_strategy>(time_window_compaction_strategy(options));
break;
default:
throw std::runtime_error("strategy not supported");
}

View File

@@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2017 ScyllaDB
*
* Modified by ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "compaction_strategy_impl.hh"
#include "compaction.hh"
#include "timestamp.hh"
#include "exceptions/exceptions.hh"
#include <boost/range/algorithm/partial_sort.hpp>
#include <boost/range/adaptors.hpp>
namespace sstables {
extern logging::logger clogger;
using namespace std::chrono_literals;
class time_window_compaction_strategy_options {
private:
static constexpr std::chrono::seconds DEFAULT_COMPACTION_WINDOW_UNIT(int window_size) { return window_size * 86400s; }
static constexpr int DEFAULT_COMPACTION_WINDOW_SIZE = 1;
static constexpr std::chrono::seconds DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS() { return 600s; }
static constexpr auto TIMESTAMP_RESOLUTION_KEY = "timestamp_resolution";
static constexpr auto COMPACTION_WINDOW_UNIT_KEY = "compaction_window_unit";
static constexpr auto COMPACTION_WINDOW_SIZE_KEY = "compaction_window_size";
static constexpr auto EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY = "expired_sstable_check_frequency_seconds";
const std::unordered_map<sstring, std::chrono::seconds> valid_window_units = { { "MINUTES", 60s }, { "HOURS", 3600s }, { "DAYS", 86400s } };
// TODO: add support to timestamp resolution other than microseconds, but it's not that important
// because new clients only use this one.
const std::unordered_set<sstring> valid_timestamp_resolutions = { "MICROSECONDS" };
std::chrono::seconds sstable_window_size = DEFAULT_COMPACTION_WINDOW_UNIT(DEFAULT_COMPACTION_WINDOW_SIZE);
db_clock::duration expired_sstable_check_frequency = DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
public:
time_window_compaction_strategy_options(const std::map<sstring, sstring>& options) {
std::chrono::seconds window_unit;
auto it = options.find(COMPACTION_WINDOW_UNIT_KEY);
if (it != options.end()) {
auto valid_window_units_it = valid_window_units.find(it->second);
if (valid_window_units_it == valid_window_units.end()) {
throw exceptions::syntax_exception(sstring("Invalid window unit ") + it->second + " for " + COMPACTION_WINDOW_UNIT_KEY);
}
window_unit = valid_window_units_it->second;
}
it = options.find(COMPACTION_WINDOW_SIZE_KEY);
if (it != options.end()) {
try {
sstable_window_size = std::stoi(it->second) * window_unit;
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sstring("Invalid integer value ") + it->second + " for " + COMPACTION_WINDOW_SIZE_KEY);
}
}
it = options.find(EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
if (it != options.end()) {
try {
expired_sstable_check_frequency = std::chrono::seconds(std::stol(it->second));
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sstring("Invalid long value ") + it->second + "for " + EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
}
}
it = options.find(TIMESTAMP_RESOLUTION_KEY);
if (it != options.end() && !valid_timestamp_resolutions.count(it->second)) {
throw exceptions::syntax_exception(sstring("Invalid timestamp resolution ") + it->second + "for " + TIMESTAMP_RESOLUTION_KEY);
}
}
friend class time_window_compaction_strategy;
};
using timestamp_type = api::timestamp_type;
class time_window_compaction_strategy : public compaction_strategy_impl {
time_window_compaction_strategy_options _options;
int64_t _estimated_remaining_tasks = 0;
db_clock::time_point _last_expired_check;
timestamp_type _highest_window_seen;
public:
time_window_compaction_strategy(const std::map<sstring, sstring>& options)
: compaction_strategy_impl(options), _options(options)
{
if (!options.count(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.count(TOMBSTONE_THRESHOLD_OPTION)) {
_disable_tombstone_compaction = true;
clogger.debug("Disabling tombstone compactions for TWCS");
} else {
clogger.debug("Enabling tombstone compactions for TWCS");
}
_use_clustering_key_filter = true;
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
if (candidates.empty()) {
return compaction_descriptor();
}
// Find fully expired SSTables. Those will be included no matter what.
std::vector<shared_sstable> expired;
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
expired = get_fully_expired_sstables(cf, candidates, gc_before.time_since_epoch().count());
_last_expired_check = db_clock::now();
} else {
clogger.debug("TWCS skipping check for fully expired SSTables");
}
if (!expired.empty()) {
auto expired_as_set = boost::copy_range<std::unordered_set<shared_sstable>>(expired);
auto is_expired = [&] (const shared_sstable& s) { return expired_as_set.find(s) != expired_as_set.end(); };
candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
}
auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
if (!expired.empty()) {
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
}
return compaction_candidates;
}
private:
std::vector<shared_sstable>
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
if (!most_interesting.empty()) {
return most_interesting;
}
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, gc_before);
});
non_expiring_sstables.erase(e, non_expiring_sstables.end());
if (non_expiring_sstables.empty()) {
return {};
}
auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
});
return { *it };
}
std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
auto p = get_buckets(std::move(candidate_sstables), _options.sstable_window_size);
// Update the highest window seen, if necessary
_highest_window_seen = std::max(_highest_window_seen, p.second);
update_estimated_compaction_by_tasks(p.first, cf.schema()->min_compaction_threshold());
return newest_bucket(std::move(p.first), cf.schema()->min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
_options.sstable_window_size, _highest_window_seen);
}
public:
// Find the lowest timestamp for window of given size
static timestamp_type
get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
using namespace std::chrono;
auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
// mask out window size from timestamp to get lower bound of its window
auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
}
// Group files with similar max timestamp into buckets.
// @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader),
// and the right is the highest timestamp seen
static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
get_buckets(std::vector<shared_sstable> files, std::chrono::seconds sstable_window_size) {
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
timestamp_type max_timestamp = 0;
// Create map to represent buckets
// For each sstable, add sstable to the time bucket
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (auto&& f : files) {
timestamp_type ts = f->get_stats_metadata().max_timestamp;
timestamp_type lower_bound = get_window_lower_bound(sstable_window_size, ts);
buckets[lower_bound].push_back(std::move(f));
max_timestamp = std::max(max_timestamp, lower_bound);
}
return std::make_pair(std::move(buckets), max_timestamp);
}
static std::vector<shared_sstable>
newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
std::chrono::seconds sstable_window_size, timestamp_type now) {
// If the current bucket has at least minThreshold SSTables, choose that one.
// For any other bucket, at least 2 SSTables is enough.
// In any case, limit to maxThreshold SSTables.
for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
auto key = key_bucket.first;
auto& bucket = key_bucket.second;
clogger.trace("Key {}, now {}", key, now);
if (bucket.size() >= size_t(min_threshold) && key >= now) {
// If we're in the newest bucket, we'll use STCS to prioritize sstables
auto stcs_interesting_bucket = size_tiered_most_interesting_bucket(bucket);
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
if (!stcs_interesting_bucket.empty()) {
return stcs_interesting_bucket;
}
} else if (bucket.size() >= 2 && key < now) {
clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
return trim_to_threshold(std::move(bucket), max_threshold);
} else {
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
}
}
return {};
}
static std::vector<shared_sstable>
trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
auto n = std::min(bucket.size(), size_t(max_threshold));
// Trim the largest sstables off the end to meet the maxThreshold
boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
return i->ondisk_data_size() < j->ondisk_data_size();
});
bucket.resize(n);
return bucket;
}
private:
void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
int64_t n = 0;
timestamp_type now = _highest_window_seen;
for (auto task : tasks) {
auto key = task.first;
// For current window, make sure it's compactable
auto count = task.second.size();
if (key >= now && count >= size_t(min_threshold)) {
n++;
} else if (key < now && count >= 2) {
n++;
}
}
_estimated_remaining_tasks = n;
}
public:
virtual int64_t estimated_pending_compactions(column_family& cf) const override {
return _estimated_remaining_tasks;
}
virtual compaction_strategy_type type() const {
return compaction_strategy_type::time_window;
}
};
}

View File

@@ -42,6 +42,7 @@
#include "partition_slice_builder.hh"
#include "sstables/compaction_strategy_impl.hh"
#include "sstables/date_tiered_compaction_strategy.hh"
#include "sstables/time_window_compaction_strategy.hh"
#include "mutation_assertions.hh"
#include "mutation_reader_assertions.hh"
#include "counters.hh"
@@ -69,6 +70,8 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
}
}
static shared_sstable make_sstable_containing(std::function<shared_sstable()> sst_factory, std::vector<mutation> muts);
SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
@@ -3105,6 +3108,124 @@ SEASTAR_TEST_CASE(date_tiered_strategy_test_2) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(time_window_strategy_time_window_tests) {
using namespace std::chrono;
api::timestamp_type tstamp1 = duration_cast<microseconds>(milliseconds(1451001601000L)).count(); // 2015-12-25 @ 00:00:01, in milliseconds
api::timestamp_type tstamp2 = duration_cast<microseconds>(milliseconds(1451088001000L)).count(); // 2015-12-26 @ 00:00:01, in milliseconds
api::timestamp_type low_hour = duration_cast<microseconds>(milliseconds(1451001600000L)).count(); // 2015-12-25 @ 00:00:00, in milliseconds
// A 1 hour window should round down to the beginning of the hour
BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp1) == low_hour);
// A 1 minute window should round down to the beginning of the hour
BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(minutes(1)), tstamp1) == low_hour);
// A 1 day window should round down to the beginning of the hour
BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24)), tstamp1) == low_hour);
// The 2 day window of 2015-12-25 + 2015-12-26 should round down to the beginning of 2015-12-25
BOOST_REQUIRE(time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(24*2)), tstamp2) == low_hour);
return make_ready_future<>();
}
SEASTAR_TEST_CASE(time_window_strategy_correctness_test) {
using namespace std::chrono;
return seastar::async([] {
auto s = schema_builder("tests", "time_window_strategy")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
auto tmp = make_lw_shared<tmpdir>();
auto sst_gen = [s, tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
return make_lw_shared<sstable>(s, tmp->path, (*gen)++, la, big);
};
auto make_insert = [&] (partition_key key, api::timestamp_type t) {
mutation m(key, s);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), t);
return m;
};
api::timestamp_type tstamp = api::timestamp_clock::now().time_since_epoch().count();
api::timestamp_type tstamp2 = tstamp - duration_cast<microseconds>(seconds(2L * 3600L)).count();
std::vector<shared_sstable> sstables;
// create 5 sstables
for (api::timestamp_type t = 0; t < 3; t++) {
auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
auto mut = make_insert(std::move(key), t);
sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
}
// Decrement the timestamp to simulate a timestamp in the past hour
for (api::timestamp_type t = 3; t < 5; t++) {
// And add progressively more cells into each sstable
auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(t))});
auto mut = make_insert(std::move(key), t);
sstables.push_back(make_sstable_containing(sst_gen, {std::move(mut)}));
}
std::map<api::timestamp_type, std::vector<shared_sstable>> buckets;
// We'll put 3 sstables into the newest bucket
for (api::timestamp_type i = 0; i < 3; i++) {
auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp);
buckets[bound].push_back(sstables[i]);
}
auto now = api::timestamp_clock::now().time_since_epoch().count();
auto new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
// incoming bucket should not be accepted when it has below the min threshold SSTables
BOOST_REQUIRE(new_bucket.empty());
now = api::timestamp_clock::now().time_since_epoch().count();
new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 2, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
// incoming bucket should be accepted when it is larger than the min threshold SSTables
// FIXME: enable check below once twcs passes min threshold to size tiered.
// BOOST_REQUIRE(!new_bucket.empty());
// And 2 into the second bucket (1 hour back)
for (api::timestamp_type i = 3; i < 5; i++) {
auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), tstamp2);
buckets[bound].push_back(sstables[i]);
}
// "an sstable with a single value should have equal min/max timestamps"
for (auto& sst : sstables) {
BOOST_REQUIRE(sst->get_stats_metadata().min_timestamp == sst->get_stats_metadata().max_timestamp);
}
// Test trim
auto num_sstables = 40;
for (int r = 5; r < num_sstables; r++) {
auto key = partition_key::from_exploded(*s, {to_bytes("key" + to_sstring(r))});
std::vector<mutation> mutations;
for (int i = 0 ; i < r ; i++) {
mutations.push_back(make_insert(key, tstamp + r));
}
sstables.push_back(make_sstable_containing(sst_gen, std::move(mutations)));
}
// Reset the buckets, overfill it now
for (int i = 0 ; i < 40; i++) {
auto bound = time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)),
sstables[i]->get_stats_metadata().max_timestamp);
buckets[bound].push_back(sstables[i]);
}
now = api::timestamp_clock::now().time_since_epoch().count();
new_bucket = time_window_compaction_strategy::newest_bucket(buckets, 4, 32, duration_cast<seconds>(hours(1)),
time_window_compaction_strategy::get_window_lower_bound(duration_cast<seconds>(hours(1)), now));
// new bucket should be trimmed to max threshold of 32
BOOST_REQUIRE(new_bucket.size() == size_t(32));
});
}
SEASTAR_TEST_CASE(test_promoted_index_read) {
// create table promoted_index_read (
// pk int,