storage_service, api: Add API to migrate a tablet

Will be used in tests, or for hot fixes in production.
This commit is contained in:
Tomasz Grabiec
2023-11-22 17:28:29 +01:00
parent 31c995332c
commit 1f57d1ea28
8 changed files with 199 additions and 0 deletions

View File

@@ -2465,6 +2465,78 @@
}
]
},
{
"path":"/storage_service/tablets/move",
"operations":[
{
"nickname":"move_tablet",
"method":"POST",
"summary":"Moves a tablet replica",
"type":"void",
"produces":[
"application/json"
],
"parameters":[
{
"name":"ks",
"description":"Keyspace name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"table",
"description":"Table name",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"token",
"description":"Token owned by the tablet to move",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
},
{
"name":"src_host",
"description":"Source host id",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"dst_host",
"description":"Destination host id",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
},
{
"name":"src_shard",
"description":"Source shard number",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
},
{
"name":"dst_shard",
"description":"Destination shard number",
"required":true,
"allowMultiple":false,
"type":"integer",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/metrics/total_hints",
"operations":[

View File

@@ -74,6 +74,11 @@ locator::host_id validate_host_id(const sstring& param) {
return hoep.id;
}
static
int64_t validate_int(const sstring& param) {
return std::atoll(param.c_str());
}
// splits a request parameter assumed to hold a comma-separated list of table names
// verify that the tables are found, otherwise a bad_param_exception exception is thrown
// containing the description of the respective no_such_column_family error.
@@ -1359,6 +1364,23 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
co_return json_void();
});
ss::move_tablet.set(r, [&ctx, &ss] (std::unique_ptr<http::request> req) -> future<json_return_type> {
auto src_host_id = validate_host_id(req->get_query_param("src_host"));
shard_id src_shard_id = validate_int(req->get_query_param("src_shard"));
auto dst_host_id = validate_host_id(req->get_query_param("dst_host"));
shard_id dst_shard_id = validate_int(req->get_query_param("dst_shard"));
auto token = dht::token::from_int64(validate_int(req->get_query_param("token")));
auto ks = req->get_query_param("ks");
auto table = req->get_query_param("table");
auto table_id = ctx.db.local().find_column_family(ks, table).schema()->id();
co_await ss.local().move_tablet(table_id, token,
locator::tablet_replica{src_host_id, src_shard_id},
locator::tablet_replica{dst_host_id, dst_shard_id});
co_return json_void();
});
sp::get_schema_versions.set(r, [&ss](std::unique_ptr<http::request> req) {
return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
@@ -1456,6 +1478,7 @@ void unset_storage_service(http_context& ctx, routes& r) {
ss::get_effective_ownership.unset(r);
ss::sstable_info.unset(r);
ss::reload_raft_topology_state.unset(r);
ss::move_tablet.unset(r);
sp::get_schema_versions.unset(r);
}

View File

@@ -125,6 +125,11 @@ bool contains(const tablet_replica_set& rs, host_id host) {
return false;
}
inline
bool contains(const tablet_replica_set& rs, const tablet_replica& r) {
return std::ranges::any_of(rs, [&] (auto&& r_) { return r_ == r; });
}
/// Stores information about a single tablet.
struct tablet_info {
tablet_replica_set replicas;

View File

@@ -6644,6 +6644,83 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
});
}
future<> storage_service::move_tablet(table_id table, dht::token token, locator::tablet_replica src, locator::tablet_replica dst) {
auto holder = _async_gate.hold();
if (this_shard_id() != 0) {
// group0 is only set on shard 0.
co_return co_await container().invoke_on(0, [&] (auto& ss) {
return ss.move_tablet(table, token, src, dst);
});
}
while (true) {
auto guard = co_await _group0->client().start_operation(&_abort_source);
while (_topology_state_machine._topology.is_busy()) {
slogger.debug("move_tablet(): topology state machine is busy");
release_guard(std::move(guard));
co_await _topology_state_machine.event.wait();
guard = co_await _group0->client().start_operation(&_abort_source);
}
std::vector<canonical_mutation> updates;
auto ks_name = _db.local().find_schema(table)->ks_name();
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
auto tid = tmap.get_tablet_id(token);
auto& tinfo = tmap.get_tablet_info(tid);
auto last_token = tmap.get_last_token(tid);
auto gid = locator::global_tablet_id{table, tid};
// FIXME: Validate replication strategy constraints.
if (!locator::contains(tinfo.replicas, src)) {
throw std::runtime_error(format("Tablet {} has no replica on {}", gid, src));
}
auto* node = get_token_metadata().get_topology().find_node(dst.host);
if (!node) {
throw std::runtime_error(format("Unknown host: {}", dst.host));
}
if (dst.shard >= node->get_shard_count()) {
throw std::runtime_error(format("Host {} does not have shard {}", dst.shard));
}
if (src.host == dst.host) {
throw std::runtime_error("Migrating within the same node is not supported");
}
if (src == dst) {
co_return;
}
updates.push_back(canonical_mutation(replica::tablet_mutation_builder(guard.write_timestamp(), ks_name, table)
.set_new_replicas(last_token, locator::replace_replica(tinfo.replicas, src, dst))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.build()));
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topology_state_machine._topology.version + 1)
.build()));
sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst);
slogger.info("raft topology: {}", reason);
slogger.trace("raft topology: do update {} reason {}", updates, reason);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard));
break;
} catch (group0_concurrent_modification&) {
slogger.debug("move_tablet(): concurrent modification, retrying");
}
}
// Wait for migration to finish.
co_await _topology_state_machine.event.wait([&] {
auto& tmap = get_token_metadata().tablets().get_tablet_map(table);
return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token));
});
}
future<join_node_request_result> storage_service::join_node_request_handler(join_node_request_params params) {
join_node_request_result result;
slogger.info("raft topology: received request to join from host_id: {}", params.host_id);

View File

@@ -772,6 +772,10 @@ public:
// Precondition: the topology mutations were already written to disk; the function only transitions the in-memory state machine.
// Public for `reload_raft_topology_state` REST API.
future<> topology_transition();
public:
future<> move_tablet(table_id, dht::token, locator::tablet_replica src, locator::tablet_replica dst);
private:
// load topology state machine snapshot into memory
// raft_group0_client::_read_apply_mutex must be held

View File

@@ -39,6 +39,10 @@ bool topology::contains(raft::server_id id) {
left_nodes.contains(id);
}
bool topology::is_busy() const {
return tstate.has_value();
}
std::set<sstring> calculate_not_yet_enabled_features(const std::set<sstring>& enabled_features, const auto& supported_features) {
std::set<sstring> to_enable;
bool first = true;

View File

@@ -159,6 +159,9 @@ struct topology {
// Are there any non-left nodes?
bool is_empty() const;
// Returns false iff we can safely start a new topology change.
bool is_busy() const;
// Calculates a set of features that are supported by all normal nodes but not yet enabled.
std::set<sstring> calculate_not_yet_enabled_features() const;
};

View File

@@ -209,6 +209,17 @@ class ScyllaRESTAPIClient():
await self.client.post(f"/v2/error_injection/injection/{injection}",
host=node_ip, params={"one_shot": str(one_shot)}, json={ key: str(value) for key, value in parameters.items() })
async def move_tablet(self, node_ip: str, ks: str, table: str, src_host: HostID, src_shard: int, dst_host: HostID, dst_shard: int, token: int) -> None:
await self.client.post(f"/storage_service/tablets/move", host=node_ip, params={
"ks": ks,
"table": table,
"src_host": str(src_host),
"src_shard": str(src_shard),
"dst_host": str(dst_host),
"dst_shard": str(dst_shard),
"token": str(token)
})
async def disable_injection(self, node_ip: str, injection: str) -> None:
await self.client.delete(f"/v2/error_injection/injection/{injection}", host=node_ip)