system_keyspace: Add state field to system.sstables
The state is one of <empty>(normal)/staging/quarantine. Currently when sstable is moved to non-normal state the s3 backend state_change() call throws thus such sstables do not appear. Next patches are going to change that and the new field in the system.sstables is needed. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -1054,6 +1054,7 @@ schema_ptr system_keyspace::sstables_registry() {
|
||||
.with_column("location", utf8_type, column_kind::partition_key)
|
||||
.with_column("generation", timeuuid_type, column_kind::clustering_key)
|
||||
.with_column("status", utf8_type)
|
||||
.with_column("state", utf8_type)
|
||||
.with_column("version", utf8_type)
|
||||
.with_column("format", utf8_type)
|
||||
.set_comment("SSTables ownership table")
|
||||
@@ -2731,10 +2732,10 @@ mutation system_keyspace::make_cleanup_candidate_mutation(std::optional<cdc::gen
|
||||
return m;
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_create_entry(sstring location, sstring status, sstables::entry_descriptor desc) {
|
||||
static const auto req = format("INSERT INTO system.{} (location, generation, status, version, format) VALUES (?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
||||
future<> system_keyspace::sstables_registry_create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) {
|
||||
static const auto req = format("INSERT INTO system.{} (location, generation, status, state, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
||||
slogger.trace("Inserting {}.{} into {}", location, desc.generation, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, location, desc.generation, status, fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result();
|
||||
co_await execute_cql(req, location, desc.generation, status, sstables::state_to_dir(state), fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status) {
|
||||
@@ -2750,16 +2751,17 @@ future<> system_keyspace::sstables_registry_delete_entry(sstring location, sstab
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer) {
|
||||
static const auto req = format("SELECT status, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY);
|
||||
static const auto req = format("SELECT status, state, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY);
|
||||
|
||||
co_await _qp.query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
auto status = row.get_as<sstring>("status");
|
||||
auto state = sstables::state_from_dir(row.get_as<sstring>("state"));
|
||||
auto gen = sstables::generation_type(row.get_as<utils::UUID>("generation"));
|
||||
auto ver = sstables::version_from_string(row.get_as<sstring>("version"));
|
||||
auto fmt = sstables::format_from_string(row.get_as<sstring>("format"));
|
||||
sstables::entry_descriptor desc(gen, ver, fmt, sstables::component_type::TOC);
|
||||
co_await consumer(std::move(status), std::move(desc));
|
||||
co_await consumer(std::move(status), std::move(state), std::move(desc));
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
namespace sstables {
|
||||
struct entry_descriptor;
|
||||
class generation_type;
|
||||
enum class sstable_state;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
@@ -498,10 +499,10 @@ public:
|
||||
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
|
||||
static future<mutation> get_group0_history(distributed<replica::database>&);
|
||||
|
||||
future<> sstables_registry_create_entry(sstring location, sstring status, sstables::entry_descriptor desc);
|
||||
future<> sstables_registry_create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc);
|
||||
future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status);
|
||||
future<> sstables_registry_delete_entry(sstring location, sstables::generation_type gen);
|
||||
using sstable_registry_entry_consumer = noncopyable_function<future<>(sstring state, sstables::entry_descriptor desc)>;
|
||||
using sstable_registry_entry_consumer = noncopyable_function<future<>(sstring status, sstables::sstable_state state, sstables::entry_descriptor desc)>;
|
||||
future<> sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer);
|
||||
|
||||
future<std::optional<sstring>> load_group0_upgrade_state();
|
||||
|
||||
@@ -351,7 +351,7 @@ future<> sstable_directory::filesystem_components_lister::process(sstable_direct
|
||||
}
|
||||
|
||||
future<> sstable_directory::system_keyspace_components_lister::process(sstable_directory& directory, process_flags flags) {
|
||||
return _sys_ks.sstables_registry_list(_location, [this, flags, &directory] (sstring status, entry_descriptor desc) {
|
||||
return _sys_ks.sstables_registry_list(_location, [this, flags, &directory] (sstring status, sstable_state state, entry_descriptor desc) {
|
||||
if (status != "sealed") {
|
||||
dirlog.warn("Skip processing {} {} entry from {} (must have been picked up by garbage collector)", status, desc.generation, _location);
|
||||
return make_ready_future<>();
|
||||
@@ -383,7 +383,7 @@ future<> sstable_directory::system_keyspace_components_lister::commit() {
|
||||
|
||||
future<> sstable_directory::system_keyspace_components_lister::garbage_collect(storage& st) {
|
||||
return do_with(std::set<generation_type>(), [this, &st] (auto& gens_to_remove) {
|
||||
return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (sstring status, entry_descriptor desc) {
|
||||
return _sys_ks.sstables_registry_list(_location, [&st, &gens_to_remove] (sstring status, sstable_state state, entry_descriptor desc) {
|
||||
if (status == "sealed") {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -482,7 +482,7 @@ sstring s3_storage::make_s3_object_name(const sstable& sst, component_type type)
|
||||
|
||||
void s3_storage::open(sstable& sst) {
|
||||
entry_descriptor desc(sst._generation, sst._version, sst._format, component_type::TOC);
|
||||
sst.manager().system_keyspace().sstables_registry_create_entry(_location, status_creating, std::move(desc)).get();
|
||||
sst.manager().system_keyspace().sstables_registry_create_entry(_location, status_creating, sst._state, std::move(desc)).get();
|
||||
|
||||
memory_data_sink_buffers bufs;
|
||||
sst.write_toc(
|
||||
|
||||
Reference in New Issue
Block a user