system_keyspace: Add ownership table
The schema is
CREATE TABLE system.sstables (
location text,
generation bigint,
format text,
status text,
uuid uuid,
version text,
PRIMARY KEY (location, generation)
)
A sample entry looks like:
location | generation | format | status | uuid | version
---------------------------------------------------------------------+------------+--------+--------+--------------------------------------+---------
/data/object_storage_ks/test_table-d096a1e0ad3811ed85b539b6b0998182 | 2 | big | sealed | d0a743b0-ad38-11ed-85b5-39b6b0998182 | me
The uuid field points to the "folder" on the storage where the sstable
components are. Like this:
s3
`- test_bucket
`- f7548f00-a64d-11ed-865a-0c1fbc116bb3
`- Data.db
- Index.db
- Filter.db
- ...
It's not very nice that the whole /var/lib/... path is in fact used as
location, it needs the PR #12707 to fix this place.
Also, the "status" part is not yet fully functional, it only supports
three options:
- creating -- the same as TemporaryTOC file exists on disk
- sealed -- default state
- deleting -- the analogy for the deletion log on disk
The latter needs support from the distributed_loader, which's not yet
there. In fact, distributes_loader also needs to be patched to actualy
select entries from this table on load. Also it needs the mentioned
PR #12707 to support staging and quarantine sstables.
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -64,6 +64,8 @@
|
||||
#include <boost/algorithm/cxx11/any_of.hpp>
|
||||
#include "client_data.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "sstables/open_info.hh"
|
||||
#include "sstables/generation_type.hh"
|
||||
|
||||
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
|
||||
|
||||
@@ -1016,6 +1018,23 @@ schema_ptr system_keyspace::broadcast_kv_store() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::sstables_registry() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, SSTABLES_REGISTRY);
|
||||
return schema_builder(NAME, SSTABLES_REGISTRY, id)
|
||||
.with_column("location", utf8_type, column_kind::partition_key)
|
||||
.with_column("generation", long_type, column_kind::clustering_key)
|
||||
.with_column("uuid", uuid_type)
|
||||
.with_column("status", utf8_type)
|
||||
.with_column("version", utf8_type)
|
||||
.with_column("format", utf8_type)
|
||||
.set_comment("SSTables ownership table")
|
||||
.with_version(generate_schema_version(id))
|
||||
.build();
|
||||
}();
|
||||
return schema;
|
||||
}
|
||||
|
||||
schema_ptr system_keyspace::legacy::hints() {
|
||||
static thread_local auto schema = [] {
|
||||
schema_builder builder(generate_legacy_id(NAME, HINTS), NAME, HINTS,
|
||||
@@ -2823,6 +2842,9 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
|
||||
r.insert(r.end(), {broadcast_kv_store()});
|
||||
}
|
||||
}
|
||||
if (cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
|
||||
r.insert(r.end(), {sstables_registry()});
|
||||
}
|
||||
// legacy schema
|
||||
r.insert(r.end(), {
|
||||
// TODO: once we migrate hints/batchlog and add convertor
|
||||
@@ -3529,6 +3551,54 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc) {
|
||||
static const auto req = format("INSERT INTO system.{} (location, generation, uuid, status, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
||||
slogger.trace("Inserting {}.{}:{} into {}", location, desc.generation.value(), uuid, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, location, desc.generation.value(), uuid, status, fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result();
|
||||
}
|
||||
|
||||
future<utils::UUID> system_keyspace::sstables_registry_lookup_entry(sstring location, sstables::generation_type gen) {
|
||||
static const auto req = format("SELECT uuid FROM system.{} WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Looking up {}.{} in {}", location, gen.value(), SSTABLES_REGISTRY);
|
||||
auto msg = co_await execute_cql(req, location, gen.value());
|
||||
if (msg->empty() || !msg->one().has("uuid")) {
|
||||
slogger.trace("ERROR: Cannot find {}.{} in {}", location, gen.value(), SSTABLES_REGISTRY);
|
||||
co_await coroutine::return_exception(std::runtime_error("No entry in sstables registry"));
|
||||
}
|
||||
|
||||
auto uuid = msg->one().get_as<utils::UUID>("uuid");
|
||||
slogger.trace("Found {}.{}:{} in {}", location, gen.value(), uuid, SSTABLES_REGISTRY);
|
||||
co_return uuid;
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status) {
|
||||
static const auto req = format("UPDATE system.{} SET status = ? WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Updating {}.{} -> {} in {}", location, gen.value(), status, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, status, location, gen.value()).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_delete_entry(sstring location, sstables::generation_type gen) {
|
||||
static const auto req = format("DELETE FROM system.{} WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Removing {}.{} from {}", location, gen.value(), SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, location, gen.value()).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer) {
|
||||
static const auto req = format("SELECT uuid, status, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY);
|
||||
|
||||
co_await _qp.local().query_internal(req, db::consistency_level::ONE, { location }, 1000, [ consumer = std::move(consumer) ] (const cql3::untyped_result_set::row& row) -> future<stop_iteration> {
|
||||
auto uuid = row.get_as<utils::UUID>("uuid");
|
||||
auto status = row.get_as<sstring>("status");
|
||||
auto gen = sstables::generation_from_value(row.get_as<int64_t>("generation"));
|
||||
auto ver = sstables::version_from_string(row.get_as<sstring>("version"));
|
||||
auto fmt = sstables::format_from_string(row.get_as<sstring>("format"));
|
||||
sstables::entry_descriptor desc("", "", "", gen, ver, fmt, sstables::component_type::TOC);
|
||||
co_await consumer(std::move(uuid), std::move(status), std::move(desc));
|
||||
co_return stop_iteration::no;
|
||||
});
|
||||
}
|
||||
|
||||
sstring system_keyspace_name() {
|
||||
return system_keyspace::NAME;
|
||||
}
|
||||
|
||||
@@ -25,6 +25,11 @@
|
||||
#include "locator/host_id.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
|
||||
namespace sstables {
|
||||
struct entry_descriptor;
|
||||
class generation_type;
|
||||
}
|
||||
|
||||
namespace service {
|
||||
|
||||
class storage_proxy;
|
||||
@@ -145,6 +150,7 @@ public:
|
||||
static constexpr auto DISCOVERY = "discovery";
|
||||
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
|
||||
static constexpr auto TOPOLOGY = "topology";
|
||||
static constexpr auto SSTABLES_REGISTRY = "sstables";
|
||||
|
||||
struct v3 {
|
||||
static constexpr auto BATCHES = "batches";
|
||||
@@ -226,6 +232,7 @@ public:
|
||||
static schema_ptr discovery();
|
||||
static schema_ptr broadcast_kv_store();
|
||||
static schema_ptr topology();
|
||||
static schema_ptr sstables_registry();
|
||||
|
||||
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
|
||||
|
||||
@@ -450,6 +457,13 @@ public:
|
||||
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
|
||||
static future<mutation> get_group0_history(distributed<service::storage_proxy>&);
|
||||
|
||||
future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc);
|
||||
future<utils::UUID> sstables_registry_lookup_entry(sstring location, sstables::generation_type gen);
|
||||
future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status);
|
||||
future<> sstables_registry_delete_entry(sstring location, sstables::generation_type gen);
|
||||
using sstable_registry_entry_consumer = noncopyable_function<future<>(utils::UUID uuid, sstring state, sstables::entry_descriptor desc)>;
|
||||
future<> sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer);
|
||||
|
||||
future<std::optional<sstring>> load_group0_upgrade_state();
|
||||
future<> save_group0_upgrade_state(sstring);
|
||||
|
||||
|
||||
@@ -356,4 +356,25 @@ CREATE TABLE system.clients (
|
||||
Currently only CQL clients are tracked. The table used to be present on disk (in data
|
||||
directory) before and including version 4.5.
|
||||
|
||||
## system.sstables
|
||||
|
||||
The "ownership" table for non-local sstables
|
||||
|
||||
Schema:
|
||||
~~~
|
||||
CREATE TABLE system.sstables (
|
||||
location text,
|
||||
generation bigint,
|
||||
format text,
|
||||
status text,
|
||||
uuid uuid,
|
||||
version text,
|
||||
PRIMARY KEY (location, generation)
|
||||
)
|
||||
~~~
|
||||
|
||||
When a user keyspace is created with S3 storage options, sstables are put on the
|
||||
remote object storage and the information about them is kept in this table. The
|
||||
"uuid" field is used to point to the "folder" in which all sstables files are.
|
||||
|
||||
## TODO: the rest
|
||||
|
||||
Reference in New Issue
Block a user