From 1f57d1ea2836b4831d9acb6ba21b3bebeb95b94b Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 22 Nov 2023 17:28:29 +0100 Subject: [PATCH] storage_service, api: Add API to migrate a tablet Will be used in tests, or for hot fixes in production. --- api/api-doc/storage_service.json | 72 +++++++++++++++++++++++++++++ api/storage_service.cc | 23 +++++++++ locator/tablets.hh | 5 ++ service/storage_service.cc | 77 +++++++++++++++++++++++++++++++ service/storage_service.hh | 4 ++ service/topology_state_machine.cc | 4 ++ service/topology_state_machine.hh | 3 ++ test/pylib/rest_client.py | 11 +++++ 8 files changed, 199 insertions(+) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 3faad75458..b80b8911f3 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -2465,6 +2465,78 @@ } ] }, + { + "path":"/storage_service/tablets/move", + "operations":[ + { + "nickname":"move_tablet", + "method":"POST", + "summary":"Moves a tablet replica", + "type":"void", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"ks", + "description":"Keyspace name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"table", + "description":"Table name", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"token", + "description":"Token owned by the tablet to move", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + }, + { + "name":"src_host", + "description":"Source host id", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"dst_host", + "description":"Destination host id", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"query" + }, + { + "name":"src_shard", + "description":"Source shard number", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + }, + { + "name":"dst_shard", + "description":"Destination shard number", + "required":true, + "allowMultiple":false, + "type":"integer", + "paramType":"query" + } + ] + } + ] + }, { "path":"/storage_service/metrics/total_hints", "operations":[ diff --git a/api/storage_service.cc b/api/storage_service.cc index 9dd955eb72..60c6afe000 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -74,6 +74,11 @@ locator::host_id validate_host_id(const sstring& param) { return hoep.id; } +static +int64_t validate_int(const sstring& param) { + return std::atoll(param.c_str()); +} + // splits a request parameter assumed to hold a comma-separated list of table names // verify that the tables are found, otherwise a bad_param_exception exception is thrown // containing the description of the respective no_such_column_family error. @@ -1359,6 +1364,23 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) -> future { + auto src_host_id = validate_host_id(req->get_query_param("src_host")); + shard_id src_shard_id = validate_int(req->get_query_param("src_shard")); + auto dst_host_id = validate_host_id(req->get_query_param("dst_host")); + shard_id dst_shard_id = validate_int(req->get_query_param("dst_shard")); + auto token = dht::token::from_int64(validate_int(req->get_query_param("token"))); + auto ks = req->get_query_param("ks"); + auto table = req->get_query_param("table"); + auto table_id = ctx.db.local().find_column_family(ks, table).schema()->id(); + + co_await ss.local().move_tablet(table_id, token, + locator::tablet_replica{src_host_id, src_shard_id}, + locator::tablet_replica{dst_host_id, dst_shard_id}); + + co_return json_void(); + }); + sp::get_schema_versions.set(r, [&ss](std::unique_ptr req) { return ss.local().describe_schema_versions().then([] (auto result) { std::vector res; @@ -1456,6 +1478,7 @@ void unset_storage_service(http_context& ctx, routes& r) { ss::get_effective_ownership.unset(r); ss::sstable_info.unset(r); ss::reload_raft_topology_state.unset(r); + ss::move_tablet.unset(r); sp::get_schema_versions.unset(r); } diff --git a/locator/tablets.hh b/locator/tablets.hh index 13ba07fd29..8a3dce75da 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -125,6 +125,11 @@ bool contains(const tablet_replica_set& rs, host_id host) { return false; } +inline +bool contains(const tablet_replica_set& rs, const tablet_replica& r) { + return std::ranges::any_of(rs, [&] (auto&& r_) { return r_ == r; }); +} + /// Stores information about a single tablet. struct tablet_info { tablet_replica_set replicas; diff --git a/service/storage_service.cc b/service/storage_service.cc index 867d7c769c..af72cf103e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6644,6 +6644,83 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) { }); } +future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst) { + auto holder = _async_gate.hold(); + + if (this_shard_id() != 0) { + // group0 is only set on shard 0. + co_return co_await container().invoke_on(0, [&] (auto& ss) { + return ss.move_tablet(table, token, src, dst); + }); + } + + while (true) { + auto guard = co_await _group0->client().start_operation(&_abort_source); + + while (_topology_state_machine._topology.is_busy()) { + slogger.debug("move_tablet(): topology state machine is busy"); + release_guard(std::move(guard)); + co_await _topology_state_machine.event.wait(); + guard = co_await _group0->client().start_operation(&_abort_source); + } + + std::vector updates; + auto ks_name = _db.local().find_schema(table)->ks_name(); + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + auto tid = tmap.get_tablet_id(token); + auto& tinfo = tmap.get_tablet_info(tid); + auto last_token = tmap.get_last_token(tid); + auto gid = locator::global_tablet_id{table, tid}; + + // FIXME: Validate replication strategy constraints. + + if (!locator::contains(tinfo.replicas, src)) { + throw std::runtime_error(format("Tablet {} has no replica on {}", gid, src)); + } + auto* node = get_token_metadata().get_topology().find_node(dst.host); + if (!node) { + throw std::runtime_error(format("Unknown host: {}", dst.host)); + } + if (dst.shard >= node->get_shard_count()) { + throw std::runtime_error(format("Host {} does not have shard {}", dst.shard)); + } + if (src.host == dst.host) { + throw std::runtime_error("Migrating within the same node is not supported"); + } + + if (src == dst) { + co_return; + } + + updates.push_back(canonical_mutation(replica::tablet_mutation_builder(guard.write_timestamp(), ks_name, table) + .set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst)) + .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) + .build())); + updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp()) + .set_transition_state(topology::transition_state::tablet_migration) + .set_version(_topology_state_machine._topology.version + 1) + .build())); + + sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst); + slogger.info("raft topology: {}", reason); + slogger.trace("raft topology: do update {} reason {}", updates, reason); + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason); + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard)); + break; + } catch (group0_concurrent_modification&) { + slogger.debug("move_tablet(): concurrent modification, retrying"); + } + } + + // Wait for migration to finish. + co_await _topology_state_machine.event.wait([&] { + auto& tmap = get_token_metadata().tablets().get_tablet_map(table); + return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token)); + }); +} + future storage_service::join_node_request_handler(join_node_request_params params) { join_node_request_result result; slogger.info("raft topology: received request to join from host_id: {}", params.host_id); diff --git a/service/storage_service.hh b/service/storage_service.hh index 75adbb818e..be7051d34f 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -772,6 +772,10 @@ public: // Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine. // Public for `reload_raft_topology_state` REST API. future<> topology_transition(); + +public: + future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst); + private: // load topology state machine snapshot into memory // raft_group0_client::_read_apply_mutex must be held diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 4764ca1b75..06f598f592 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -39,6 +39,10 @@ bool topology::contains(raft::server_id id) { left_nodes.contains(id); } +bool topology::is_busy() const { + return tstate.has_value(); +} + std::set calculate_not_yet_enabled_features(const std::set& enabled_features, const auto& supported_features) { std::set to_enable; bool first = true; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 9bd29ef7d0..f85405c2ab 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -159,6 +159,9 @@ struct topology { // Are there any non-left nodes? bool is_empty() const; + // Returns false iff we can safely start a new topology change. + bool is_busy() const; + // Calculates a set of features that are supported by all normal nodes but not yet enabled. std::set calculate_not_yet_enabled_features() const; }; diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 79e4e4ed23..d2d982dd57 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -209,6 +209,17 @@ class ScyllaRESTAPIClient(): await self.client.post(f"/v2/error_injection/injection/{injection}", host=node_ip, params={"one_shot": str(one_shot)}, json={ key: str(value) for key, value in parameters.items() }) + async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None: + await self.client.post(f"/storage_service/tablets/move", host=node_ip, params={ + "ks": ks, + "table": table, + "src_host": str(src_host), + "src_shard": str(src_shard), + "dst_host": str(dst_host), + "dst_shard": str(dst_shard), + "token": str(token) + }) + async def disable_injection(self, node_ip: str, injection: str) -> None: await self.client.delete(f"/v2/error_injection/injection/{injection}", host=node_ip)