system_keyspace: Change sstables registry partition key type
Today, the system.sstables schema uses string as partition key. Callers, in turn, use table's datadir value to reference entries in it. That's wrong, S3-backed sstables don't have any local paths to work with. The table's ID is better in this role. This patch only changes the field type to be table_id and fixes the callers to provide one. In particular, see init_table_storage() change -- instead of generating a datadir string, it sets table.id() as the options' location. Other fixed places are tests. Internally, this id value is propagated via s3_storage::owner() method, that's fixed as well. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -1120,7 +1120,7 @@ schema_ptr system_keyspace::sstables_registry() {
|
||||
static thread_local auto schema = [] {
|
||||
auto id = generate_legacy_id(NAME, SSTABLES_REGISTRY);
|
||||
return schema_builder(NAME, SSTABLES_REGISTRY, id)
|
||||
.with_column("location", utf8_type, column_kind::partition_key)
|
||||
.with_column("location", uuid_type, column_kind::partition_key) // will be renamed to "owner" soon
|
||||
.with_column("generation", timeuuid_type, column_kind::clustering_key)
|
||||
.with_column("status", utf8_type)
|
||||
.with_column("state", utf8_type)
|
||||
@@ -3276,32 +3276,37 @@ system_keyspace::read_cdc_generation_opt(utils::UUID id) {
|
||||
co_return cdc::topology_description{std::move(entries)};
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) {
|
||||
future<> system_keyspace::sstables_registry_create_entry(table_id owner, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) {
|
||||
utils::UUID location = owner.id;
|
||||
static const auto req = format("INSERT INTO system.{} (location, generation, status, state, version, format) VALUES (?, ?, ?, ?, ?, ?)", SSTABLES_REGISTRY);
|
||||
slogger.trace("Inserting {}.{} into {}", location, desc.generation, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, location, desc.generation, status, sstables::state_to_dir(state), fmt::to_string(desc.version), fmt::to_string(desc.format)).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status) {
|
||||
future<> system_keyspace::sstables_registry_update_entry_status(table_id owner, sstables::generation_type gen, sstring status) {
|
||||
utils::UUID location = owner.id;
|
||||
static const auto req = format("UPDATE system.{} SET status = ? WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Updating {}.{} -> status={} in {}", location, gen, status, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, status, location, gen).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_update_entry_state(sstring location, sstables::generation_type gen, sstables::sstable_state state) {
|
||||
future<> system_keyspace::sstables_registry_update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) {
|
||||
utils::UUID location = owner.id;
|
||||
static const auto req = format("UPDATE system.{} SET state = ? WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
auto new_state = sstables::state_to_dir(state);
|
||||
slogger.trace("Updating {}.{} -> state={} in {}", location, gen, new_state, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, new_state, location, gen).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_delete_entry(sstring location, sstables::generation_type gen) {
|
||||
future<> system_keyspace::sstables_registry_delete_entry(table_id owner, sstables::generation_type gen) {
|
||||
utils::UUID location = owner.id;
|
||||
static const auto req = format("DELETE FROM system.{} WHERE location = ? AND generation = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Removing {}.{} from {}", location, gen, SSTABLES_REGISTRY);
|
||||
co_await execute_cql(req, location, gen).discard_result();
|
||||
}
|
||||
|
||||
future<> system_keyspace::sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer) {
|
||||
future<> system_keyspace::sstables_registry_list(table_id owner, sstable_registry_entry_consumer consumer) {
|
||||
utils::UUID location = owner.id;
|
||||
static const auto req = format("SELECT status, state, generation, version, format FROM system.{} WHERE location = ?", SSTABLES_REGISTRY);
|
||||
slogger.trace("Listing {} entries from {}", location, SSTABLES_REGISTRY);
|
||||
|
||||
|
||||
@@ -627,12 +627,12 @@ public:
|
||||
future<mutation> make_view_builder_version_mutation(api::timestamp_type ts, view_builder_version_t version);
|
||||
future<view_builder_version_t> get_view_builder_version();
|
||||
|
||||
future<> sstables_registry_create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc);
|
||||
future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status);
|
||||
future<> sstables_registry_update_entry_state(sstring location, sstables::generation_type gen, sstables::sstable_state state);
|
||||
future<> sstables_registry_delete_entry(sstring location, sstables::generation_type gen);
|
||||
future<> sstables_registry_create_entry(table_id owner, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc);
|
||||
future<> sstables_registry_update_entry_status(table_id owner, sstables::generation_type gen, sstring status);
|
||||
future<> sstables_registry_update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state);
|
||||
future<> sstables_registry_delete_entry(table_id owner, sstables::generation_type gen);
|
||||
using sstable_registry_entry_consumer = sstables::sstables_registry::entry_consumer;
|
||||
future<> sstables_registry_list(sstring location, sstable_registry_entry_consumer consumer);
|
||||
future<> sstables_registry_list(table_id owner, sstable_registry_entry_consumer consumer);
|
||||
|
||||
future<std::optional<sstring>> load_group0_upgrade_state();
|
||||
future<> save_group0_upgrade_state(sstring);
|
||||
|
||||
@@ -15,23 +15,23 @@ class system_keyspace_sstables_registry : public sstables::sstables_registry {
|
||||
public:
|
||||
system_keyspace_sstables_registry(system_keyspace& keyspace) : _keyspace(keyspace.shared_from_this()) {}
|
||||
|
||||
virtual seastar::future<> create_entry(sstring location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) override {
|
||||
virtual seastar::future<> create_entry(table_id location, sstring status, sstables::sstable_state state, sstables::entry_descriptor desc) override {
|
||||
return _keyspace->sstables_registry_create_entry(location, status, state, desc);
|
||||
}
|
||||
|
||||
virtual seastar::future<> update_entry_status(sstring location, sstables::generation_type gen, sstring status) override {
|
||||
virtual seastar::future<> update_entry_status(table_id location, sstables::generation_type gen, sstring status) override {
|
||||
return _keyspace->sstables_registry_update_entry_status(location, gen, status);
|
||||
}
|
||||
|
||||
virtual seastar::future<> update_entry_state(sstring location, sstables::generation_type gen, sstables::sstable_state state) override {
|
||||
virtual seastar::future<> update_entry_state(table_id location, sstables::generation_type gen, sstables::sstable_state state) override {
|
||||
return _keyspace->sstables_registry_update_entry_state(location, gen, state);
|
||||
}
|
||||
|
||||
virtual seastar::future<> delete_entry(sstring location, sstables::generation_type gen) override {
|
||||
virtual seastar::future<> delete_entry(table_id location, sstables::generation_type gen) override {
|
||||
return _keyspace->sstables_registry_delete_entry(location, gen);
|
||||
}
|
||||
|
||||
virtual seastar::future<> sstables_registry_list(sstring location, entry_consumer consumer) override {
|
||||
virtual seastar::future<> sstables_registry_list(table_id location, entry_consumer consumer) override {
|
||||
return _keyspace->sstables_registry_list(location, std::move(consumer));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -55,7 +55,7 @@ sstable_directory::filesystem_components_lister::filesystem_components_lister(st
|
||||
{
|
||||
}
|
||||
|
||||
sstable_directory::sstables_registry_components_lister::sstables_registry_components_lister(sstables::sstables_registry& sstables_registry, sstring location)
|
||||
sstable_directory::sstables_registry_components_lister::sstables_registry_components_lister(sstables::sstables_registry& sstables_registry, table_id location)
|
||||
: _sstables_registry(sstables_registry)
|
||||
, _location(std::move(location))
|
||||
{
|
||||
@@ -89,7 +89,7 @@ sstable_directory::make_components_lister() {
|
||||
// collect and process them is by listing the bucket
|
||||
return std::make_unique<sstable_directory::filesystem_components_lister>(fs::path(std::get<sstring>(os.location)), _manager, os);
|
||||
}
|
||||
return std::make_unique<sstable_directory::sstables_registry_components_lister>(_manager.sstables_registry(), std::get<sstring>(os.location));
|
||||
return std::make_unique<sstable_directory::sstables_registry_components_lister>(_manager.sstables_registry(), std::get<table_id>(os.location));
|
||||
}
|
||||
}, _storage_opts->value);
|
||||
}
|
||||
|
||||
@@ -131,12 +131,12 @@ public:
|
||||
|
||||
class sstables_registry_components_lister final : public components_lister {
|
||||
sstables_registry& _sstables_registry;
|
||||
sstring _location;
|
||||
table_id _location;
|
||||
|
||||
future<> garbage_collect(storage&);
|
||||
|
||||
public:
|
||||
sstables_registry_components_lister(sstables::sstables_registry& sstables_registry, sstring location);
|
||||
sstables_registry_components_lister(sstables::sstables_registry& sstables_registry, table_id owner);
|
||||
|
||||
virtual future<> process(sstable_directory& directory, process_flags flags) override;
|
||||
virtual future<> commit() override;
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "seastarx.hh"
|
||||
|
||||
namespace sstables {
|
||||
@@ -18,12 +19,12 @@ namespace sstables {
|
||||
class sstables_registry {
|
||||
public:
|
||||
virtual ~sstables_registry();
|
||||
virtual future<> create_entry(sstring location, sstring status, sstable_state state, sstables::entry_descriptor desc) = 0;
|
||||
virtual future<> update_entry_status(sstring location, sstables::generation_type gen, sstring status) = 0;
|
||||
virtual future<> update_entry_state(sstring location, sstables::generation_type gen, sstables::sstable_state state) = 0;
|
||||
virtual future<> delete_entry(sstring location, sstables::generation_type gen) = 0;
|
||||
virtual future<> create_entry(table_id owner, sstring status, sstable_state state, sstables::entry_descriptor desc) = 0;
|
||||
virtual future<> update_entry_status(table_id owner, sstables::generation_type gen, sstring status) = 0;
|
||||
virtual future<> update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) = 0;
|
||||
virtual future<> delete_entry(table_id owner, sstables::generation_type gen) = 0;
|
||||
using entry_consumer = noncopyable_function<future<>(sstring status, sstables::sstable_state state, sstables::entry_descriptor desc)>;
|
||||
virtual future<> sstables_registry_list(sstring location, entry_consumer consumer) = 0;
|
||||
virtual future<> sstables_registry_list(table_id owner, entry_consumer consumer) = 0;
|
||||
};
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -514,8 +514,11 @@ class s3_storage : public sstables::storage {
|
||||
|
||||
sstring make_s3_object_name(const sstable& sst, component_type type) const;
|
||||
|
||||
sstring owner() const {
|
||||
return std::get<sstring>(_location);
|
||||
table_id owner() const {
|
||||
if (std::holds_alternative<sstring>(_location)) {
|
||||
on_internal_error(sstlog, format("Storage holds {} prefix, but registry owner is expected", std::get<sstring>(_location)));
|
||||
}
|
||||
return std::get<table_id>(_location);
|
||||
}
|
||||
|
||||
public:
|
||||
@@ -697,7 +700,7 @@ future<lw_shared_ptr<const data_dictionary::storage_options>> init_table_storage
|
||||
nopts.value = data_dictionary::storage_options::s3 {
|
||||
.bucket = so.bucket,
|
||||
.endpoint = so.endpoint,
|
||||
.location = format("{}/{}/{}", mgr.config().data_file_directories()[0], s.ks_name(), replica::format_table_directory_name(s.cf_name(), s.id())),
|
||||
.location = s.id(),
|
||||
};
|
||||
co_return make_lw_shared<const data_dictionary::storage_options>(std::move(nopts));
|
||||
}
|
||||
|
||||
@@ -272,13 +272,13 @@ class mock_sstables_registry : public sstables::sstables_registry {
|
||||
sstables::sstable_state state;
|
||||
sstables::entry_descriptor desc;
|
||||
};
|
||||
std::map<std::pair<sstring, generation_type>, entry> _entries;
|
||||
std::map<std::pair<table_id, generation_type>, entry> _entries;
|
||||
public:
|
||||
virtual future<> create_entry(sstring location, sstring status, sstable_state state, sstables::entry_descriptor desc) override {
|
||||
virtual future<> create_entry(table_id location, sstring status, sstable_state state, sstables::entry_descriptor desc) override {
|
||||
_entries.emplace(std::make_pair(location, desc.generation), entry { status, state, desc });
|
||||
co_return;
|
||||
};
|
||||
virtual future<> update_entry_status(sstring location, sstables::generation_type gen, sstring status) override {
|
||||
virtual future<> update_entry_status(table_id location, sstables::generation_type gen, sstring status) override {
|
||||
auto it = _entries.find(std::make_pair(location, gen));
|
||||
if (it != _entries.end()) {
|
||||
it->second.status = status;
|
||||
@@ -287,7 +287,7 @@ public:
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
virtual future<> update_entry_state(sstring location, sstables::generation_type gen, sstables::sstable_state state) override {
|
||||
virtual future<> update_entry_state(table_id location, sstables::generation_type gen, sstables::sstable_state state) override {
|
||||
auto it = _entries.find(std::make_pair(location, gen));
|
||||
if (it != _entries.end()) {
|
||||
it->second.state = state;
|
||||
@@ -296,7 +296,7 @@ public:
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
virtual future<> delete_entry(sstring location, sstables::generation_type gen) override {
|
||||
virtual future<> delete_entry(table_id location, sstables::generation_type gen) override {
|
||||
auto it = _entries.find(std::make_pair(location, gen));
|
||||
if (it != _entries.end()) {
|
||||
_entries.erase(it);
|
||||
@@ -305,7 +305,7 @@ public:
|
||||
}
|
||||
co_return;
|
||||
}
|
||||
virtual future<> sstables_registry_list(sstring location, entry_consumer consumer) override {
|
||||
virtual future<> sstables_registry_list(table_id location, entry_consumer consumer) override {
|
||||
for (auto& [loc_and_gen, e] : _entries) {
|
||||
if (loc_and_gen.first == location) {
|
||||
co_await consumer(e.status, e.state, e.desc);
|
||||
@@ -352,7 +352,7 @@ test_env::make_sstable(schema_ptr schema, sstring dir, sstables::generation_type
|
||||
auto storage = _impl->storage;
|
||||
std::visit(overloaded_functor {
|
||||
[&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; },
|
||||
[&dir] (data_dictionary::storage_options::s3& o) { o.location = dir; },
|
||||
[&schema] (data_dictionary::storage_options::s3& o) { o.location = schema->id(); },
|
||||
}, storage.value);
|
||||
return _impl->mgr.make_sstable(std::move(schema), storage, generation, sstables::sstable_state::normal, v, f, now, default_io_error_handler_gen(), buffer_size);
|
||||
}
|
||||
@@ -488,7 +488,7 @@ test_env::make_table_for_tests(schema_ptr s, sstring dir) {
|
||||
auto storage = _impl->storage;
|
||||
std::visit(overloaded_functor {
|
||||
[&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; },
|
||||
[&dir] (data_dictionary::storage_options::s3& o) { o.location = dir; },
|
||||
[&s] (data_dictionary::storage_options::s3& o) { o.location = s->id(); },
|
||||
}, storage.value);
|
||||
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), std::move(storage));
|
||||
}
|
||||
|
||||
@@ -72,8 +72,9 @@ async def test_basic(manager: ManagerClient, s3_server):
|
||||
|
||||
# Check that the ownership table is populated properly
|
||||
res = cql.execute("SELECT * FROM system.sstables;")
|
||||
tid = cql.execute(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{cf}'").one()
|
||||
for row in res:
|
||||
assert row.location.startswith(workdir), \
|
||||
assert row.location == tid.id, \
|
||||
f'Unexpected entry location in registry: {row.location}'
|
||||
assert row.status == 'sealed', f'Unexpected entry status in registry: {row.status}'
|
||||
|
||||
@@ -120,7 +121,7 @@ async def test_garbage_collect(manager: ManagerClient, s3_server):
|
||||
print(f'Found entries: {[ str(ent[1]) for ent in sstable_entries ]}')
|
||||
for loc, gen in sstable_entries:
|
||||
cql.execute("UPDATE system.sstables SET status = 'removing'"
|
||||
f" WHERE location = '{loc}' AND generation = {gen};")
|
||||
f" WHERE location = {loc} AND generation = {gen};")
|
||||
|
||||
print('Restart scylla')
|
||||
await manager.server_restart(server.server_id)
|
||||
@@ -162,7 +163,7 @@ async def test_populate_from_quarantine(manager: ManagerClient, s3_server):
|
||||
assert len(list(res)) > 0, 'No entries in registry'
|
||||
for row in res:
|
||||
cql.execute("UPDATE system.sstables SET state = 'quarantine'"
|
||||
f" WHERE location = '{row.location}' AND generation = {row.generation};")
|
||||
f" WHERE location = {row.location} AND generation = {row.generation};")
|
||||
|
||||
print('Restart scylla')
|
||||
await manager.server_restart(server.server_id)
|
||||
|
||||
Reference in New Issue
Block a user