sstables: define run_identifier as a strong tagged_uuid type

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #11321
This commit is contained in:
Benny Halevy
2022-08-18 12:53:46 +03:00
committed by Avi Kivity
parent 35fbba3a5b
commit 7747b8fa33
16 changed files with 64 additions and 42 deletions

View File

@@ -451,7 +451,7 @@ protected:
bool _contains_multi_fragment_runs = false;
mutation_source_metadata _ms_metadata = {};
compaction_sstable_replacer_fn _replacer;
utils::UUID _run_identifier;
run_id _run_identifier;
::io_priority_class _io_priority;
// optional clone of sstable set to be used for expiration purposes, so it will be set if expiration is enabled.
std::optional<sstable_set> _sstable_set;
@@ -489,7 +489,7 @@ protected:
for (auto& sst : _sstables) {
_stats_collector.update(sst->get_encoding_stats_for_compaction());
}
std::unordered_set<utils::UUID> ssts_run_ids;
std::unordered_set<run_id> ssts_run_ids;
_contains_multi_fragment_runs = std::any_of(_sstables.begin(), _sstables.end(), [&ssts_run_ids] (shared_sstable& sst) {
return !ssts_run_ids.insert(sst->run_identifier()).second;
});
@@ -1509,7 +1509,7 @@ class resharding_compaction final : public compaction {
uint64_t estimated_partitions = 0;
};
std::vector<estimated_values> _estimation_per_shard;
std::vector<utils::UUID> _run_identifiers;
std::vector<run_id> _run_identifiers;
private:
// return estimated partitions per sstable for a given shard
uint64_t partitions_per_sstable(shard_id s) const {
@@ -1533,7 +1533,7 @@ public:
}
}
for (auto i : boost::irange(0u, smp::count)) {
_run_identifiers[i] = utils::make_random_uuid();
_run_identifiers[i] = run_id::create_random_id();
}
}
@@ -1816,7 +1816,7 @@ get_fully_expired_sstables(const table_state& table_s, const std::vector<sstable
}
unsigned compaction_descriptor::fan_in() const {
return boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
return boost::copy_range<std::unordered_set<run_id>>(sstables | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
}
uint64_t compaction_descriptor::sstables_size() const {

View File

@@ -14,7 +14,7 @@
#include <variant>
#include <seastar/core/smp.hh>
#include <seastar/core/file.hh>
#include "sstables/shared_sstable.hh"
#include "sstables/types_fwd.hh"
#include "sstables/sstable_set.hh"
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
@@ -154,7 +154,7 @@ struct compaction_descriptor {
// Threshold size for sstable(s) to be created.
uint64_t max_sstable_bytes;
// Run identifier of output sstables.
utils::UUID run_identifier;
sstables::run_id run_identifier;
// The options passed down to the compaction code.
// This also selects the kind of compaction to do.
compaction_type_options options = compaction_type_options::make_regular();
@@ -176,7 +176,7 @@ struct compaction_descriptor {
::io_priority_class io_priority,
int level = default_level,
uint64_t max_sstable_bytes = default_max_sstable_bytes,
utils::UUID run_identifier = utils::make_random_uuid(),
run_id run_identifier = run_id::create_random_id(),
compaction_type_options options = compaction_type_options::make_regular())
: sstables(std::move(sstables))
, level(level)
@@ -192,7 +192,7 @@ struct compaction_descriptor {
: sstables(std::move(sstables))
, level(default_level)
, max_sstable_bytes(default_max_sstable_bytes)
, run_identifier(utils::make_random_uuid())
, run_identifier(run_id::create_random_id())
, options(compaction_type_options::make_regular())
, io_priority(io_priority)
, has_only_fully_expired(has_only_fully_expired)

View File

@@ -206,7 +206,7 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compact
candidates.reserve(t.main_sstable_set().all()->size());
// prevents sstables that belong to a partial run being generated by ongoing compaction from being
// selected for compaction, which could potentially result in wrong behavior.
auto partial_run_identifiers = boost::copy_range<std::unordered_set<utils::UUID>>(_tasks
auto partial_run_identifiers = boost::copy_range<std::unordered_set<sstables::run_id>>(_tasks
| boost::adaptors::filtered(std::mem_fn(&task::generating_output_run))
| boost::adaptors::transformed(std::mem_fn(&task::output_run_id)));
auto& cs = t.get_compaction_strategy();
@@ -611,7 +611,7 @@ future<semaphore_units<named_semaphore_exception_factory>> compaction_manager::t
});
}
void compaction_manager::task::setup_new_compaction(utils::UUID output_run_id) {
void compaction_manager::task::setup_new_compaction(sstables::run_id output_run_id) {
_compaction_data = create_compaction_data();
_output_run_identifier = output_run_id;
switch_state(state::active);
@@ -619,7 +619,7 @@ void compaction_manager::task::setup_new_compaction(utils::UUID output_run_id) {
void compaction_manager::task::finish_compaction(state finish_state) noexcept {
switch_state(finish_state);
_output_run_identifier = utils::null_uuid();
_output_run_identifier = sstables::run_id::create_null_id();
if (finish_state != state::failed) {
_compaction_retry.reset();
}
@@ -1010,7 +1010,7 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::
auto num_runs_for_compaction = [&, this] {
auto& cs = t.get_compaction_strategy();
auto desc = cs.get_sstables_for_compaction(t, get_strategy_control(), get_candidates(t));
return boost::copy_range<std::unordered_set<utils::UUID>>(
return boost::copy_range<std::unordered_set<sstables::run_id>>(
desc.sstables
| boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
};

View File

@@ -110,7 +110,7 @@ public:
shared_future<compaction_stats_opt> _compaction_done = make_ready_future<compaction_stats_opt>();
exponential_backoff_retry _compaction_retry = exponential_backoff_retry(std::chrono::seconds(5), std::chrono::seconds(300));
sstables::compaction_type _type;
utils::UUID _output_run_identifier;
sstables::run_id _output_run_identifier;
gate::holder _gate_holder;
sstring _description;
@@ -134,7 +134,7 @@ public:
// Return true if the task isn't stopped
// and the compaction manager allows proceeding.
inline bool can_proceed(throw_if_stopping do_throw_if_stopping = throw_if_stopping::no) const;
void setup_new_compaction(utils::UUID output_run_id = utils::null_uuid());
void setup_new_compaction(sstables::run_id output_run_id = sstables::run_id::create_null_id());
void finish_compaction(state finish_state = state::done) noexcept;
// Compaction manager stops itself if it finds a storage I/O error which results in
@@ -179,7 +179,7 @@ public:
bool generating_output_run() const noexcept {
return compaction_running() && _output_run_identifier;
}
const utils::UUID& output_run_id() const noexcept {
const sstables::run_id& output_run_id() const noexcept {
return _output_run_identifier;
}

View File

@@ -590,7 +590,7 @@ private:
size_t promoted_index_block_size;
size_t promoted_index_auto_scale_threshold;
} _pi_write_m;
utils::UUID _run_identifier;
run_id _run_identifier;
bool _write_regular_as_static; // See #4139
scylla_metadata::large_data_stats _large_data_stats;

View File

@@ -129,8 +129,8 @@ sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) co
std::vector<sstable_run>
partitioned_sstable_set::select_sstable_runs(const std::vector<shared_sstable>& sstables) const {
auto has_run = [this] (const shared_sstable& sst) { return _all_runs.contains(sst->run_identifier()); };
auto run_ids = boost::copy_range<std::unordered_set<utils::UUID>>(sstables | boost::adaptors::filtered(has_run) | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) {
auto run_ids = boost::copy_range<std::unordered_set<sstables::run_id>>(sstables | boost::adaptors::filtered(has_run) | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier)));
return boost::copy_range<std::vector<sstable_run>>(run_ids | boost::adaptors::transformed([this] (sstables::run_id run_id) {
return _all_runs.at(run_id);
}));
}
@@ -259,7 +259,7 @@ partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_lev
}
partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, const std::vector<shared_sstable>& unleveled_sstables, const interval_map_type& leveled_sstables,
const lw_shared_ptr<sstable_list>& all, const std::unordered_map<utils::UUID, sstable_run>& all_runs, bool use_level_metadata)
const lw_shared_ptr<sstable_list>& all, const std::unordered_map<run_id, sstable_run>& all_runs, bool use_level_metadata)
: _schema(schema)
, _unleveled_sstables(unleveled_sstables)
, _leveled_sstables(leveled_sstables)

View File

@@ -13,6 +13,7 @@
#include "compatible_ring_position.hh"
#include "sstable_set.hh"
#include "readers/clustering_combined.hh"
#include "sstables/types_fwd.hh"
namespace sstables {
@@ -59,7 +60,7 @@ private:
std::vector<shared_sstable> _unleveled_sstables;
interval_map_type _leveled_sstables;
lw_shared_ptr<sstable_list> _all;
std::unordered_map<utils::UUID, sstable_run> _all_runs;
std::unordered_map<run_id, sstable_run> _all_runs;
// Change counter on interval map for leveled sstables which is used by
// incremental selector to determine whether or not to invalidate iterators.
uint64_t _leveled_sstables_change_cnt = 0;
@@ -86,7 +87,7 @@ public:
const std::vector<shared_sstable>& unleveled_sstables,
const interval_map_type& leveled_sstables,
const lw_shared_ptr<sstable_list>& all,
const std::unordered_map<utils::UUID, sstable_run>& all_runs,
const std::unordered_map<run_id, sstable_run>& all_runs,
bool use_level_metadata);
virtual std::unique_ptr<sstable_set_impl> clone() const override;

View File

@@ -1312,7 +1312,7 @@ future<> sstable::update_info_for_opened_data() {
}).then([this] {
this->set_position_range();
this->set_first_and_last_keys();
_run_identifier = _components->scylla_metadata->get_optional_run_identifier().value_or(utils::make_random_uuid());
_run_identifier = _components->scylla_metadata->get_optional_run_identifier().value_or(run_id::create_random_id());
// Get disk usage for this sstable (includes all components).
_bytes_on_disk = 0;

View File

@@ -108,7 +108,7 @@ struct sstable_writer_config {
std::optional<db::replay_position> replay_position;
std::optional<int> sstable_level;
write_monitor* monitor = &default_write_monitor();
utils::UUID run_identifier = utils::make_random_uuid();
run_id run_identifier = run_id::create_random_id();
size_t summary_byte_cost;
sstring origin;
@@ -503,7 +503,7 @@ private:
std::vector<unsigned> _shards;
std::optional<dht::decorated_key> _first;
std::optional<dht::decorated_key> _last;
utils::UUID _run_identifier;
run_id _run_identifier;
utils::observable<sstable&> _on_closed;
lw_shared_ptr<file_input_stream_history> _single_partition_history = make_lw_shared<file_input_stream_history>();
@@ -713,7 +713,7 @@ public:
return _components->scylla_metadata ? &*_components->scylla_metadata : nullptr;
}
utils::UUID run_identifier() const {
run_id run_identifier() const {
return _run_identifier;
}
@@ -812,7 +812,7 @@ public:
void set_sstable_level(uint32_t);
void generate_new_run_identifier() {
_run_identifier = utils::make_random_uuid();
_run_identifier = run_id::create_random_id();
}
double get_compression_ratio() const;

View File

@@ -26,6 +26,7 @@
#include "encoding_stats.hh"
#include "utils/UUID.hh"
#include "locator/host_id.hh"
#include "types_fwd.hh"
// While the sstable code works with char, bytes_view works with int8_t
// (signed char). Rather than change all the code, let's do a cast.
@@ -530,10 +531,12 @@ enum class scylla_metadata_type : uint32_t {
ScyllaVersion = 8,
};
// UUID is used for uniqueness across nodes, such that an imported sstable
// will not have its run identifier conflicted with the one of a local sstable.
struct run_identifier {
// UUID is used for uniqueness across nodes, such that an imported sstable
// will not have its run identifier conflicted with the one of a local sstable.
utils::UUID id;
run_id id;
template <typename Describer>
auto describe_type(sstable_version_types v, Describer f) { return f(id); }
@@ -598,7 +601,7 @@ struct scylla_metadata {
}
return *ext;
}
std::optional<utils::UUID> get_optional_run_identifier() const {
std::optional<run_id> get_optional_run_identifier() const {
auto* m = data.get<scylla_metadata_type::RunIdentifier, run_identifier>();
return m ? std::make_optional(m->id) : std::nullopt;
}

18
sstables/types_fwd.hh Normal file
View File

@@ -0,0 +1,18 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "shared_sstable.hh"
#include "utils/UUID.hh"
namespace sstables {
// Identifier of an sstable run, expressed as a dedicated tagged type:
// utils::tagged_uuid instantiated with its own tag (run_id_tag), which makes
// it a distinct type from any other tagged_uuid instantiation.
using run_id = utils::tagged_uuid<struct run_id_tag>;
} // namespace sstables

View File

@@ -3131,7 +3131,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
if (desc.sstables.empty()) {
return {};
}
std::unordered_set<utils::UUID> run_ids;
std::unordered_set<sstables::run_id> run_ids;
bool incremental_enabled = std::any_of(desc.sstables.begin(), desc.sstables.end(), [&run_ids] (shared_sstable& sst) {
return !run_ids.insert(sst->run_identifier()).second;
});
@@ -3311,7 +3311,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
cf->start();
cf->mark_ready_for_writes();
utils::UUID partial_sstable_run_identifier = utils::make_random_uuid();
sstables::run_id partial_sstable_run_identifier = sstables::run_id::create_random_id();
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
@@ -3607,7 +3607,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
// make ssts belong to same run for compaction to enable incremental approach.
// That needs to happen after fragments were inserted into sstable_set, as they'll be placed into different runs due to detected overlapping.
utils::UUID run_id = utils::make_random_uuid();
auto run_id = sstables::run_id::create_random_id();
sstables::test(non_expired_sst).set_run_identifier(run_id);
sstables::test(expired_sst).set_run_identifier(run_id);
@@ -3841,7 +3841,7 @@ SEASTAR_TEST_CASE(test_bug_6472) {
make_sstable_containing(sst_gen, muts),
make_sstable_containing(sst_gen, muts),
};
utils::UUID run_id = utils::make_random_uuid();
sstables::run_id run_id = sstables::run_id::create_random_id();
for (auto& sst : sstables_spanning_many_windows) {
sstables::test(sst).set_run_identifier(run_id);
}

View File

@@ -2700,7 +2700,7 @@ SEASTAR_TEST_CASE(sstable_run_identifier_correctness) {
auto tmp = tmpdir();
sstable_writer_config cfg = env.manager().configure_writer();
cfg.run_identifier = utils::make_random_uuid();
cfg.run_identifier = sstables::run_id::create_random_id();
auto sst = make_sstable_easy(env, tmp.path(), make_flat_mutation_reader_from_mutations_v2(s, env.make_reader_permit(), { std::move(mut) }), cfg);
BOOST_REQUIRE(sst->run_identifier() == cfg.run_identifier);

View File

@@ -207,11 +207,11 @@ compaction_manager_for_testing::wrapped_compaction_manager::~wrapped_compaction_
}
class compaction_manager::compaction_manager_test_task : public compaction_manager::task {
utils::UUID _run_id;
sstables::run_id _run_id;
noncopyable_function<future<> (sstables::compaction_data&)> _job;
public:
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, sstables::run_id run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
: compaction_manager::task(cm, &cf->as_table_state(), sstables::compaction_type::Compaction, "Test compaction")
, _run_id(run_id)
, _job(std::move(job))
@@ -226,7 +226,7 @@ protected:
}
};
future<> compaction_manager_test::run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job) {
future<> compaction_manager_test::run(sstables::run_id output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job) {
auto task = make_shared<compaction_manager::compaction_manager_test_task>(_cm, cf, output_run_id, std::move(job));
auto& cdata = register_compaction(task);
return task->run().discard_result().finally([this, &cdata] {

View File

@@ -179,7 +179,7 @@ public:
_sst->_data_file_write_time = wtime;
}
void set_run_identifier(utils::UUID identifier) {
void set_run_identifier(sstables::run_id identifier) {
_sst->_run_identifier = identifier;
}
@@ -209,7 +209,7 @@ public:
_sst->_components->summary.first_key.value = bytes(reinterpret_cast<const signed char*>(first_key.c_str()), first_key.size());
_sst->_components->summary.last_key.value = bytes(reinterpret_cast<const signed char*>(last_key.c_str()), last_key.size());
_sst->set_first_and_last_keys();
_sst->_run_identifier = utils::make_random_uuid();
_sst->_run_identifier = run_id::create_random_id();
}
void set_values(sstring first_key, sstring last_key, stats_metadata stats) {
@@ -220,7 +220,7 @@ public:
_sst->_components->summary.last_key.value = bytes(reinterpret_cast<const signed char*>(last_key.c_str()), last_key.size());
_sst->set_first_and_last_keys();
_sst->_components->statistics.contents[metadata_type::Compaction] = std::make_unique<compaction_metadata>();
_sst->_run_identifier = utils::make_random_uuid();
_sst->_run_identifier = run_id::create_random_id();
}
void rewrite_toc_without_scylla_component() {
@@ -370,7 +370,7 @@ class compaction_manager_test {
public:
explicit compaction_manager_test(compaction_manager& cm) noexcept : _cm(cm) {}
future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job);
future<> run(sstables::run_id output_run_id, replica::column_family* cf, noncopyable_function<future<> (sstables::compaction_data&)> job);
void propagate_replacement(replica::table* t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
_cm.propagate_replacement(t->as_table_state(), removed, added);

View File

@@ -1410,7 +1410,7 @@ public:
_writer.EndObject();
}
void operator()(const sstables::run_identifier& val) const {
_writer.AsString(val.id);
_writer.AsString(val.id.uuid());
}
void operator()(const sstables::scylla_metadata::large_data_stats& val) const {
_writer.StartObject();