sstables: give a more descriptive name to compaction_options

the name compaction_options is confusing as it overlaps in meaning
with compaction_descriptor. hard to reason what are the exact
difference between them, without digging into the implementation.

compaction_options is intended to only carry options specific to
a give compaction type, like a mode for scrub, so let's rename
it to compaction_type_options to make it clearer for the
readers.

[avi: adjust for scrub changes]
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20210908003934.152054-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2021-09-07 21:39:34 -03:00
committed by Avi Kivity
parent 389ef9316f
commit acba3bd3c4
14 changed files with 97 additions and 97 deletions

View File

@@ -1247,24 +1247,24 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
});
ss::scrub.set(r, wrap_ks_cf(ctx, [&snap_ctl] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
auto scrub_mode = sstables::compaction_options::scrub::mode::abort;
auto scrub_mode = sstables::compaction_type_options::scrub::mode::abort;
const sstring scrub_mode_str = req_param<sstring>(*req, "scrub_mode", "");
if (scrub_mode_str == "") {
const auto skip_corrupted = req_param<bool>(*req, "skip_corrupted", false);
if (skip_corrupted) {
scrub_mode = sstables::compaction_options::scrub::mode::skip;
scrub_mode = sstables::compaction_type_options::scrub::mode::skip;
}
} else {
if (scrub_mode_str == "ABORT") {
scrub_mode = sstables::compaction_options::scrub::mode::abort;
scrub_mode = sstables::compaction_type_options::scrub::mode::abort;
} else if (scrub_mode_str == "SKIP") {
scrub_mode = sstables::compaction_options::scrub::mode::skip;
scrub_mode = sstables::compaction_type_options::scrub::mode::skip;
} else if (scrub_mode_str == "SEGREGATE") {
scrub_mode = sstables::compaction_options::scrub::mode::segregate;
scrub_mode = sstables::compaction_type_options::scrub::mode::segregate;
} else if (scrub_mode_str == "VALIDATE") {
scrub_mode = sstables::compaction_options::scrub::mode::validate;
scrub_mode = sstables::compaction_type_options::scrub::mode::validate;
} else {
throw std::invalid_argument(fmt::format("Unknown argument for 'scrub_mode' parameter: {}", scrub_mode_str));
}

View File

@@ -129,22 +129,22 @@ std::ostream& operator<<(std::ostream& os, compaction_type type) {
return os;
}
std::string_view to_string(compaction_options::scrub::mode scrub_mode) {
std::string_view to_string(compaction_type_options::scrub::mode scrub_mode) {
switch (scrub_mode) {
case compaction_options::scrub::mode::abort:
case compaction_type_options::scrub::mode::abort:
return "abort";
case compaction_options::scrub::mode::skip:
case compaction_type_options::scrub::mode::skip:
return "skip";
case compaction_options::scrub::mode::segregate:
case compaction_type_options::scrub::mode::segregate:
return "segregate";
case compaction_options::scrub::mode::validate:
case compaction_type_options::scrub::mode::validate:
return "validate";
}
on_internal_error_noexcept(clogger, format("Invalid scrub mode {}", int(scrub_mode)));
return "(invalid)";
}
std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode) {
std::ostream& operator<<(std::ostream& os, compaction_type_options::scrub::mode scrub_mode) {
return os << to_string(scrub_mode);
}
@@ -1171,9 +1171,9 @@ private:
}
public:
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_options::cleanup opts)
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_type_options::cleanup opts)
: cleanup_compaction(opts.db, cf, std::move(descriptor)) {}
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_options::upgrade opts)
cleanup_compaction(column_family& cf, compaction_descriptor descriptor, compaction_type_options::upgrade opts)
: cleanup_compaction(opts.db, cf, std::move(descriptor)) {}
flat_mutation_reader make_sstable_reader() const override {
@@ -1273,14 +1273,14 @@ private:
class reader : public flat_mutation_reader::impl {
using skip = bool_class<class skip_tag>;
private:
compaction_options::scrub::mode _scrub_mode;
compaction_type_options::scrub::mode _scrub_mode;
flat_mutation_reader _reader;
mutation_fragment_stream_validator _validator;
bool _skip_to_next_partition = false;
private:
void maybe_abort_scrub() {
if (_scrub_mode == compaction_options::scrub::mode::abort) {
if (_scrub_mode == compaction_type_options::scrub::mode::abort) {
throw compaction_stop_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data", false);
}
}
@@ -1311,7 +1311,7 @@ private:
skip on_invalid_partition(const dht::decorated_key& new_key) {
maybe_abort_scrub();
if (_scrub_mode == compaction_options::scrub::mode::segregate) {
if (_scrub_mode == compaction_type_options::scrub::mode::segregate) {
report_invalid_partition(compaction_type::Scrub, _validator, new_key, "Detected");
_validator.reset(new_key);
// Let the segregating interposer consumer handle this.
@@ -1330,7 +1330,7 @@ private:
// If the unexpected fragment is a partition end, we just drop it.
// The only case a partition end is invalid is when it comes after
// another partition end, and we can just drop it in that case.
if (!mf.is_end_of_partition() && _scrub_mode == compaction_options::scrub::mode::segregate) {
if (!mf.is_end_of_partition() && _scrub_mode == compaction_type_options::scrub::mode::segregate) {
report_invalid_mutation_fragment(compaction_type::Scrub, _validator, mf,
"Injecting partition start/end to segregate out-of-order fragment");
push_mutation_fragment(*_schema, _permit, partition_end{});
@@ -1397,7 +1397,7 @@ private:
}
public:
reader(flat_mutation_reader underlying, compaction_options::scrub::mode scrub_mode)
reader(flat_mutation_reader underlying, compaction_type_options::scrub::mode scrub_mode)
: impl(underlying.schema(), underlying.permit())
, _scrub_mode(scrub_mode)
, _reader(std::move(underlying))
@@ -1446,12 +1446,12 @@ private:
};
private:
compaction_options::scrub _options;
compaction_type_options::scrub _options;
std::string _scrub_start_description;
std::string _scrub_finish_description;
public:
scrub_compaction(column_family& cf, compaction_descriptor descriptor, compaction_options::scrub options)
scrub_compaction(column_family& cf, compaction_descriptor descriptor, compaction_type_options::scrub options)
: regular_compaction(cf, std::move(descriptor))
, _options(options)
, _scrub_start_description(fmt::format("Scrubbing in {} mode", _options.operation_mode))
@@ -1478,13 +1478,13 @@ public:
}
bool use_interposer_consumer() const override {
return _options.operation_mode == compaction_options::scrub::mode::segregate;
return _options.operation_mode == compaction_type_options::scrub::mode::segregate;
}
friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode);
friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_type_options::scrub::mode scrub_mode);
};
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode) {
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_type_options::scrub::mode scrub_mode) {
return make_flat_mutation_reader<scrub_compaction::reader>(std::move(rd), scrub_mode);
}
@@ -1606,7 +1606,7 @@ future<compaction_info> compaction::run(std::unique_ptr<compaction> c, GCConsume
});
}
compaction_type compaction_options::type() const {
compaction_type compaction_type_options::type() const {
// Maps options_variant indexes to the corresponding compaction_type member.
static const compaction_type index_to_type[] = {
compaction_type::Compaction,
@@ -1616,7 +1616,7 @@ compaction_type compaction_options::type() const {
compaction_type::Reshard,
compaction_type::Reshape,
};
static_assert(std::variant_size_v<compaction_options::options_variant> == std::size(index_to_type));
static_assert(std::variant_size_v<compaction_type_options::options_variant> == std::size(index_to_type));
return index_to_type[_options.index()];
}
@@ -1625,22 +1625,22 @@ static std::unique_ptr<compaction> make_compaction(column_family& cf, sstables::
column_family& cf;
sstables::compaction_descriptor&& descriptor;
std::unique_ptr<compaction> operator()(compaction_options::reshape) {
std::unique_ptr<compaction> operator()(compaction_type_options::reshape) {
return std::make_unique<reshape_compaction>(cf, std::move(descriptor));
}
std::unique_ptr<compaction> operator()(compaction_options::reshard) {
std::unique_ptr<compaction> operator()(compaction_type_options::reshard) {
return std::make_unique<resharding_compaction>(cf, std::move(descriptor));
}
std::unique_ptr<compaction> operator()(compaction_options::regular) {
std::unique_ptr<compaction> operator()(compaction_type_options::regular) {
return std::make_unique<regular_compaction>(cf, std::move(descriptor));
}
std::unique_ptr<compaction> operator()(compaction_options::cleanup options) {
std::unique_ptr<compaction> operator()(compaction_type_options::cleanup options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
std::unique_ptr<compaction> operator()(compaction_options::upgrade options) {
std::unique_ptr<compaction> operator()(compaction_type_options::upgrade options) {
return std::make_unique<cleanup_compaction>(cf, std::move(descriptor), std::move(options));
}
std::unique_ptr<compaction> operator()(compaction_options::scrub scrub_options) {
std::unique_ptr<compaction> operator()(compaction_type_options::scrub scrub_options) {
return std::make_unique<scrub_compaction>(cf, std::move(descriptor), scrub_options);
}
} visitor_factory{cf, std::move(descriptor)};
@@ -1742,7 +1742,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
compaction_name(descriptor.options.type()), cf.schema()->ks_name(), cf.schema()->cf_name())));
}
if (descriptor.options.type() == compaction_type::Scrub
&& std::get<compaction_options::scrub>(descriptor.options.options()).operation_mode == compaction_options::scrub::mode::validate) {
&& std::get<compaction_type_options::scrub>(descriptor.options.options()).operation_mode == compaction_type_options::scrub::mode::validate) {
// Bypass the usual compaction machinery for dry-mode scrub
return scrub_sstables_validate_mode(std::move(descriptor), cf);
}

View File

@@ -111,7 +111,7 @@ namespace sstables {
get_fully_expired_sstables(column_family& cf, const std::vector<sstables::shared_sstable>& compacting, gc_clock::time_point gc_before);
// For tests, can drop after we virtualize sstables.
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode);
flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_type_options::scrub::mode scrub_mode);
// For tests, can drop after we virtualize sstables.
future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader rd, const compaction_info& info);

View File

@@ -62,7 +62,7 @@ using compaction_sstable_creator_fn = std::function<shared_sstable(shard_id shar
// Replaces old sstable(s) by new one(s) which contain all non-expired data.
using compaction_sstable_replacer_fn = std::function<void(compaction_completion_desc)>;
class compaction_options {
class compaction_type_options {
public:
struct regular {
};
@@ -92,32 +92,32 @@ private:
options_variant _options;
private:
explicit compaction_options(options_variant options) : _options(std::move(options)) {
explicit compaction_type_options(options_variant options) : _options(std::move(options)) {
}
public:
static compaction_options make_reshape() {
return compaction_options(reshape{});
static compaction_type_options make_reshape() {
return compaction_type_options(reshape{});
}
static compaction_options make_reshard() {
return compaction_options(reshard{});
static compaction_type_options make_reshard() {
return compaction_type_options(reshard{});
}
static compaction_options make_regular() {
return compaction_options(regular{});
static compaction_type_options make_regular() {
return compaction_type_options(regular{});
}
static compaction_options make_cleanup(database& db) {
return compaction_options(cleanup{db});
static compaction_type_options make_cleanup(database& db) {
return compaction_type_options(cleanup{db});
}
static compaction_options make_upgrade(database& db) {
return compaction_options(upgrade{db});
static compaction_type_options make_upgrade(database& db) {
return compaction_type_options(upgrade{db});
}
static compaction_options make_scrub(scrub::mode mode) {
return compaction_options(scrub{mode});
static compaction_type_options make_scrub(scrub::mode mode) {
return compaction_type_options(scrub{mode});
}
template <typename... Visitor>
@@ -130,8 +130,8 @@ public:
compaction_type type() const;
};
std::string_view to_string(compaction_options::scrub::mode);
std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode);
std::string_view to_string(compaction_type_options::scrub::mode);
std::ostream& operator<<(std::ostream& os, compaction_type_options::scrub::mode scrub_mode);
struct compaction_descriptor {
// List of sstables to be compacted.
@@ -149,7 +149,7 @@ struct compaction_descriptor {
std::function<void(const std::vector<shared_sstable>& exhausted_sstables)> release_exhausted;
// The options passed down to the compaction code.
// This also selects the kind of compaction to do.
compaction_options options = compaction_options::make_regular();
compaction_type_options options = compaction_type_options::make_regular();
compaction_sstable_creator_fn creator;
compaction_sstable_replacer_fn replacer;
@@ -167,7 +167,7 @@ struct compaction_descriptor {
int level = default_level,
uint64_t max_sstable_bytes = default_max_sstable_bytes,
utils::UUID run_identifier = utils::make_random_uuid(),
compaction_options options = compaction_options::make_regular())
compaction_type_options options = compaction_type_options::make_regular())
: sstables(std::move(sstables))
, all_sstables_snapshot(std::move(all_sstables_snapshot))
, level(level)

View File

@@ -686,7 +686,7 @@ inline bool compaction_manager::check_for_cleanup(column_family* cf) {
return false;
}
future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
auto task = make_lw_shared<compaction_manager::task>();
task->compacting_cf = cf;
task->type = options.type();
@@ -797,7 +797,7 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(column_family*
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_options::make_scrub(sstables::compaction_options::scrub::mode::validate));
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
return compact_sstables(std::move(desc), cf);
});
} catch (sstables::compaction_stop_exception&) {
@@ -858,7 +858,7 @@ future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
});
return sstables;
}).then([this, cf, &db] (std::vector<sstables::shared_sstable> sstables) {
return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(db),
return rewrite_sstables(cf, sstables::compaction_type_options::make_cleanup(db),
[sstables = std::move(sstables)] (const table&) { return sstables; });
});
}
@@ -890,7 +890,7 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
return rewrite_sstables(cf, sstables::compaction_options::make_upgrade(db), [&](auto&) mutable {
return rewrite_sstables(cf, sstables::compaction_type_options::make_upgrade(db), [&](auto&) mutable {
return std::exchange(tables, {});
});
});
@@ -898,15 +898,15 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family
}
// Submit a column family to be scrubbed and wait for its termination.
future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode) {
if (scrub_mode == sstables::compaction_options::scrub::mode::validate) {
future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_type_options::scrub::mode scrub_mode) {
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(cf);
}
// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are scrubbed,
// we need to barrier out any previously running compaction.
return cf->run_with_compaction_disabled([this, cf, scrub_mode] {
return rewrite_sstables(cf, sstables::compaction_options::make_scrub(scrub_mode), [this] (const table& cf) {
return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) {
return get_candidates(cf);
}, can_purge_tombstones::no);
});

View File

@@ -174,7 +174,7 @@ private:
class can_purge_tombstones_tag;
using can_purge_tombstones = bool_class<can_purge_tombstones_tag>;
future<> rewrite_sstables(column_family* cf, sstables::compaction_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<> rewrite_sstables(column_family* cf, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<> stop_ongoing_compactions(sstring reason);
optimized_optional<abort_source::subscription> _early_abort_subscription;
@@ -222,7 +222,7 @@ public:
future<> perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version);
// Submit a column family to be scrubbed and wait for its termination.
future<> perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode);
future<> perform_sstable_scrub(column_family* cf, sstables::compaction_type_options::scrub::mode scrub_mode);
// Submit a column family for major compaction.
future<> submit_major_compaction(column_family* cf);

View File

@@ -159,7 +159,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
// This is really unexpected, so we'll just compact it all to fix it
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, leveled_manifest::MAX_LEVELS - 1, max_sstable_size_in_bytes);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
level_info[sst_level].push_back(sst);
@@ -199,7 +199,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
leveled_manifest::logger.info("Reshaping {} disjoint sstables in level 0 into level {}", level_info[0].size(), ideal_level);
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, ideal_level, max_sstable_size_in_bytes);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
@@ -219,7 +219,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
leveled_manifest::logger.warn("Turns out that level {} is not disjoint, found {} overlapping SSTables, so compacting everything on behalf of {}.{}", level, overlapping_sstables, schema->ks_name(), schema->cf_name());
// Unfortunately no good limit to limit input size to max_sstables for LCS major
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop, max_filled_level, max_sstable_size_in_bytes);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
}

View File

@@ -256,7 +256,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
// which is possible because partitioned set is able to incrementally open sstables during compaction
if (sstable_set_overlapping_count(schema, input) <= max_sstables) {
compaction_descriptor desc(std::move(input), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
}
@@ -272,7 +272,7 @@ size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
bucket.resize(max_sstables);
}
compaction_descriptor desc(std::move(bucket), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
}

View File

@@ -160,7 +160,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
// Everything that spans multiple windows will need reshaping
multi_window.resize(std::min(multi_window.size(), max_sstables));
compaction_descriptor desc(std::move(multi_window), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
@@ -184,7 +184,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
ssts.resize(std::min(ssts.size(), max_sstables));
}
compaction_descriptor desc(std::move(ssts), std::optional<sstables::sstable_set>(), iop);
desc.options = compaction_options::make_reshape();
desc.options = compaction_type_options::make_reshape();
return desc;
}
}

View File

@@ -123,10 +123,10 @@ void cf_prop_defs::validate(const database& db, const schema::extensions_map& sc
std::throw_with_nested(exceptions::configuration_exception("Invalid table id"));
}
auto compaction_options = get_compaction_options();
if (!compaction_options.empty()) {
auto strategy = compaction_options.find(COMPACTION_STRATEGY_CLASS_KEY);
if (strategy == compaction_options.end()) {
auto compaction_type_options = get_compaction_type_options();
if (!compaction_type_options.empty()) {
auto strategy = compaction_type_options.find(COMPACTION_STRATEGY_CLASS_KEY);
if (strategy == compaction_type_options.end()) {
throw exceptions::configuration_exception(sstring("Missing sub-option '") + COMPACTION_STRATEGY_CLASS_KEY + "' for the '" + KW_COMPACTION + "' option.");
}
_compaction_strategy_class = sstables::compaction_strategy::type(strategy->second);
@@ -171,10 +171,10 @@ void cf_prop_defs::validate(const database& db, const schema::extensions_map& sc
speculative_retry::from_sstring(get_string(KW_SPECULATIVE_RETRY, speculative_retry(speculative_retry::type::NONE, 0).to_sstring()));
}
std::map<sstring, sstring> cf_prop_defs::get_compaction_options() const {
auto compaction_options = get_map(KW_COMPACTION);
if (compaction_options ) {
return compaction_options.value();
std::map<sstring, sstring> cf_prop_defs::get_compaction_type_options() const {
auto compaction_type_options = get_map(KW_COMPACTION);
if (compaction_type_options ) {
return compaction_type_options.value();
}
return std::map<sstring, sstring>{};
}
@@ -258,16 +258,16 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
std::optional<sstring> tmp_value = {};
if (has_property(KW_COMPACTION)) {
if (get_compaction_options().contains(KW_MINCOMPACTIONTHRESHOLD)) {
tmp_value = get_compaction_options().at(KW_MINCOMPACTIONTHRESHOLD);
if (get_compaction_type_options().contains(KW_MINCOMPACTIONTHRESHOLD)) {
tmp_value = get_compaction_type_options().at(KW_MINCOMPACTIONTHRESHOLD);
}
}
int min_compaction_threshold = to_int(KW_MINCOMPACTIONTHRESHOLD, tmp_value, builder.get_min_compaction_threshold());
tmp_value = {};
if (has_property(KW_COMPACTION)) {
if (get_compaction_options().contains(KW_MAXCOMPACTIONTHRESHOLD)) {
tmp_value = get_compaction_options().at(KW_MAXCOMPACTIONTHRESHOLD);
if (get_compaction_type_options().contains(KW_MAXCOMPACTIONTHRESHOLD)) {
tmp_value = get_compaction_type_options().at(KW_MAXCOMPACTIONTHRESHOLD);
}
}
int max_compaction_threshold = to_int(KW_MAXCOMPACTIONTHRESHOLD, tmp_value, builder.get_max_compaction_threshold());
@@ -278,8 +278,8 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
builder.set_max_compaction_threshold(max_compaction_threshold);
if (has_property(KW_COMPACTION)) {
if (get_compaction_options().contains(COMPACTION_ENABLED_KEY)) {
auto enabled = boost::algorithm::iequals(get_compaction_options().at(COMPACTION_ENABLED_KEY), "true");
if (get_compaction_type_options().contains(COMPACTION_ENABLED_KEY)) {
auto enabled = boost::algorithm::iequals(get_compaction_type_options().at(COMPACTION_ENABLED_KEY), "true");
builder.set_compaction_enabled(enabled);
}
}
@@ -306,7 +306,7 @@ void cf_prop_defs::apply_to_builder(schema_builder& builder, schema::extensions_
if (_compaction_strategy_class) {
builder.set_compaction_strategy(*_compaction_strategy_class);
builder.set_compaction_strategy_options(get_compaction_options());
builder.set_compaction_strategy_options(get_compaction_type_options());
}
builder.set_bloom_filter_fp_chance(get_double(KW_BF_FP_CHANCE, builder.get_bloom_filter_fp_chance()));
@@ -349,9 +349,9 @@ std::optional<sstables::compaction_strategy_type> cf_prop_defs::get_compaction_s
if (_compaction_strategy_class) {
return _compaction_strategy_class;
}
auto compaction_options = get_compaction_options();
auto strategy = compaction_options.find(COMPACTION_STRATEGY_CLASS_KEY);
if (strategy != compaction_options.end()) {
auto compaction_type_options = get_compaction_type_options();
auto strategy = compaction_type_options.find(COMPACTION_STRATEGY_CLASS_KEY);
if (strategy != compaction_type_options.end()) {
return sstables::compaction_strategy::type(strategy->second);
}
return std::nullopt;

View File

@@ -98,7 +98,7 @@ public:
schema::extensions_map make_schema_extensions(const db::extensions& exts) const;
void validate(const database& db, const schema::extensions_map& schema_extensions) const;
std::map<sstring, sstring> get_compaction_options() const;
std::map<sstring, sstring> get_compaction_type_options() const;
std::optional<std::map<sstring, sstring>> get_compression_options() const;
const cdc::options* get_cdc_options(const schema::extensions_map&) const;
std::optional<caching_options> get_caching_options() const;

View File

@@ -410,7 +410,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return cm.run_custom_job(&table, compaction_type::Reshard, [this, iop, &cm, &table, creator, &sstlist] () {
sstables::compaction_descriptor desc(sstlist, {}, iop);
desc.options = sstables::compaction_options::make_reshard();
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = std::move(creator);
return sstables::compact_sstables(std::move(desc), table).then([this, &sstlist] (sstables::compaction_info result) {

View File

@@ -2046,7 +2046,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
cf->start();
auto descriptor = sstables::compaction_descriptor({std::move(sst)}, cf->get_sstable_set(), default_priority_class(), compaction_descriptor::default_level,
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_options::make_cleanup(db));
compaction_descriptor::default_max_sstable_bytes, run_identifier, compaction_type_options::make_cleanup(db));
auto ret = compact_sstables(std::move(descriptor), *cf, sst_gen).get0();
BOOST_REQUIRE(ret.total_keys_written == total_partitions);
@@ -2222,7 +2222,7 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
testlog.info("Validate");
// No way to really test validation besides observing the log messages.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::validate).get();
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_type_options::scrub::mode::validate).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
@@ -2418,7 +2418,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
testlog.info("Scrub in abort mode");
// We expect the scrub with mode=srub::mode::abort to stop on the first invalid fragment.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get();
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_type_options::scrub::mode::abort).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
verify_fragments(sst, corrupt_fragments);
@@ -2426,7 +2426,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
testlog.info("Scrub in skip mode");
// We expect the scrub with mode=srub::mode::skip to get rid of all invalid data.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::skip).get();
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_type_options::scrub::mode::skip).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() != sst);
@@ -2515,7 +2515,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
testlog.info("Scrub in abort mode");
// We expect the scrub with mode=srub::mode::abort to stop on the first invalid fragment.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get();
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_type_options::scrub::mode::abort).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
verify_fragments(sst, corrupt_fragments);
@@ -2523,7 +2523,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
testlog.info("Scrub in segregate mode");
// We expect the scrub with mode=srub::mode::segregate to fix all out-of-order data.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::segregate).get();
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_type_options::scrub::mode::segregate).get();
testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size());
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1);
@@ -2630,7 +2630,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
std::list<std::deque<mutation_fragment>> segregated_fragment_streams;
mutation_writer::segregate_by_partition(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)),
sstables::compaction_options::scrub::mode::segregate), [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) {
sstables::compaction_type_options::scrub::mode::segregate), [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) {
return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable {
auto close = deferred_close(rd);
auto& fragments = segregated_fragment_streams.emplace_back();
@@ -2769,7 +2769,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS
auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)),
compaction_options::scrub::mode::skip));
compaction_type_options::scrub::mode::skip));
for (const auto& mf : scrubbed_fragments) {
testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf));
r.produces(*schema, mf);

View File

@@ -88,7 +88,7 @@ void run_sstable_resharding_test() {
uint64_t bloom_filter_size_before = file_size(filter_fname).get0();
auto descriptor = sstables::compaction_descriptor({sst}, std::nullopt, default_priority_class(), 0, std::numeric_limits<uint64_t>::max());
descriptor.options = sstables::compaction_options::make_reshard();
descriptor.options = sstables::compaction_type_options::make_reshard();
descriptor.creator = [&env, &cf, &tmp, version] (shard_id shard) mutable {
// we need generation calculated by instance of cf at requested shard,
// or resource usage wouldn't be fairly distributed among shards.