diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index f816f39d06..df746107c5 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -64,6 +64,8 @@ #include #include "client_data.hh" #include "service/topology_state_machine.hh" +#include "sstables/open_info.hh" +#include "sstables/generation_type.hh" using days = std::chrono::duration>; @@ -1016,6 +1018,23 @@ schema_ptr system_keyspace::broadcast_kv_store() { return schema; } +/* Returns the schema of the SSTABLES_REGISTRY table. The schema is built once per thread by the static thread_local initializer: partition key location, clustering key generation, regular columns uuid, status, version and format. */ schema_ptr system_keyspace::sstables_registry() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, SSTABLES_REGISTRY); + return schema_builder(NAME, SSTABLES_REGISTRY, id) + .with_column("location", utf8_type, column_kind::partition_key) + .with_column("generation", long_type, column_kind::clustering_key) + .with_column("uuid", uuid_type) + .with_column("status", utf8_type) + .with_column("version", utf8_type) + .with_column("format", utf8_type) + .set_comment("SSTables ownership table") + .with_version(generate_schema_version(id)) + .build(); + }(); + return schema; +} + schema_ptr system_keyspace::legacy::hints() { static thread_local auto schema = [] { schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS, @@ -2823,6 +2842,9 @@ std::vector system_keyspace::all_tables(const db::config& cfg) { r.insert(r.end(), {broadcast_kv_store()}); } } + /* Append the registry schema to r only when cfg reports the KEYSPACE_STORAGE_OPTIONS experimental feature as enabled. */ if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) { + r.insert(r.end(), {sstables_registry()}); + } // legacy schema r.insert(r.end(), { // TODO: once we migrate hints/batchlog and add convertor @@ -3529,6 +3551,54 @@ future system_keyspace::load_topology_state() { co_return ret; } +/* Inserts one registry row: location, desc.generation.value(), uuid, status and the string forms of desc.version and desc.format. The statement result is discarded. */ future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) { + static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY); + 
slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY); + co_await execute_cql(req, location, desc.generation.value(), uuid, status, fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result(); +} + +/* Selects the uuid column for the row addressed by (location, gen) and returns it. When the result set is empty or the row has no uuid value, the coroutine completes with std::runtime_error. NOTE(review): that failure is logged at trace level only. */ future system_keyspace::sstables_registry_lookup_entry(sstring location, sstables::generation_type gen) { + static const auto req = format("SELECT uuid FROM system.{} WHERE location = ? AND generation = ?", SSTABLES_REGISTRY); + slogger.trace("Looking up {}.{} in {}", location, gen.value(), SSTABLES_REGISTRY); + auto msg = co_await execute_cql(req, location, gen.value()); + if (msg->empty() || !msg->one().has("uuid")) { + slogger.trace("ERROR: Cannot find {}.{} in {}", location, gen.value(), SSTABLES_REGISTRY); + co_await coroutine::return_exception(std::runtime_error("No entry in sstables registry")); + } + + auto uuid = msg->one().get_as("uuid"); + slogger.trace("Found {}.{}:{} in {}", location, gen.value(), uuid, SSTABLES_REGISTRY); + co_return uuid; +} + +/* Runs an UPDATE that sets the status column of the row addressed by (location, gen). The statement result is discarded. */ future<> system_keyspace::sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status) { + static const auto req = format("UPDATE system.{} SET status = ? WHERE location = ? AND generation = ?", SSTABLES_REGISTRY); + slogger.trace("Updating {}.{} -> {} in {}", location, gen.value(), status, SSTABLES_REGISTRY); + co_await execute_cql(req, status, location, gen.value()).discard_result(); +} + +/* Runs a DELETE for the row addressed by (location, gen). The statement result is discarded. */ future<> system_keyspace::sstables_registry_delete_entry(sstring location, sstables::generation_type gen) { + static const auto req = format("DELETE FROM system.{} WHERE location = ? 
AND generation = ?", SSTABLES_REGISTRY); + slogger.trace("Removing {}.{} from {}", location, gen.value(), SSTABLES_REGISTRY); + co_await execute_cql(req, location, gen.value()).discard_result(); +} + +/* Selects uuid, status, generation, version and format for every row of the given location via query_internal at consistency ONE and awaits consumer once per row. The callback always returns stop_iteration::no, so it never asks to stop early. The literal 1000 is presumably the page size; TODO confirm against query_internal. */ future<> system_keyspace::sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer) { + static const auto req = format("SELECT uuid, status, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY); + slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY); + + co_await _qp.local().query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future { + auto uuid = row.get_as("uuid"); + auto status = row.get_as("status"); + auto gen = sstables::generation_from_value(row.get_as("generation")); + auto ver = sstables::version_from_string(row.get_as("version")); + /* NOTE(review): this local shares its name with the fmt namespace used elsewhere in this file. */ auto fmt = sstables::format_from_string(row.get_as("format")); + /* Only generation, version and format come from the row; the three leading string arguments are empty and the component is fixed to TOC. */ sstables::entry_descriptor desc("", "", "", gen, ver, fmt, sstables::component_type::TOC); + co_await consumer(std::move(uuid), std::move(status), std::move(desc)); + co_return stop_iteration::no; + }); +} + sstring system_keyspace_name() { return system_keyspace::NAME; } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 1154fe7a83..1772f56b58 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -25,6 +25,11 @@ #include "locator/host_id.hh" #include "mutation/canonical_mutation.hh" +/* Forward declarations of the sstables types named in the sstables_registry_* method signatures of this header. */ namespace sstables { + struct entry_descriptor; + class generation_type; +} + namespace service { class storage_proxy; @@ -145,6 +150,7 @@ public: static constexpr auto DISCOVERY = "discovery"; static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store"; static constexpr auto TOPOLOGY = "topology"; + /* Table name of the sstables registry; the .cc part of this patch formats it into system.{} statements. */ static constexpr auto SSTABLES_REGISTRY = "sstables"; struct v3 { static constexpr auto BATCHES = "batches"; @@ -226,6 +232,7 @@ public: static schema_ptr discovery(); static schema_ptr 
broadcast_kv_store(); static schema_ptr topology(); + static schema_ptr sstables_registry(); static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0); @@ -450,6 +457,13 @@ public: // Assumes that the history table exists, i.e. Raft experimental feature is enabled. static future get_group0_history(distributed&); + future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc); + future sstables_registry_lookup_entry(sstring location, sstables::generation_type gen); + future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status); + future<> sstables_registry_delete_entry(sstring location, sstables::generation_type gen); + using sstable_registry_entry_consumer = noncopyable_function(utils::UUID uuid, sstring state, sstables::entry_descriptor desc)>; + future<> sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer); + future> load_group0_upgrade_state(); future<> save_group0_upgrade_state(sstring); diff --git a/docs/dev/system_keyspace.md b/docs/dev/system_keyspace.md index 3034d97f74..150424baf0 100644 --- a/docs/dev/system_keyspace.md +++ b/docs/dev/system_keyspace.md @@ -356,4 +356,25 @@ CREATE TABLE system.clients ( Currently only CQL clients are tracked. The table used to be present on disk (in data directory) before and including version 4.5. +## system.sstables + +The "ownership" table for sstables that are kept on remote (non-local) storage. + +Schema: +~~~ +CREATE TABLE system.sstables ( + location text, + generation bigint, + format text, + status text, + uuid uuid, + version text, + PRIMARY KEY (location, generation) +) +~~~ + +When a user keyspace is created with S3 storage options, its sstables are put on +remote object storage and the information about them is kept in this table. The +"uuid" field points to the "folder" in which all of an sstable's files are stored. + ## TODO: the rest