Merge 'Add support for decommission with tablets' from Tomasz Grabiec
Load balancer will recognize decommissioning nodes and will move tablet replicas away from such nodes with highest priority. Topology changes have now an extra step called "tablet draining" which calls the load balancer. The step will execute tablet migration track as long as there are nodes which require draining. It will not do regular load balancing. If load balancer is unable to find new tablet replicas, because RF cannot be met or availability is at risk due to insufficient node distribution in racks, it will throw an exception. Currently, topology change will retry in a loop. We should make this error cause topology change to be aborted. There is no infrastructure for aborts yet, so this is not implemented. Closes #15197 * github.com:scylladb/scylladb: tablets, raft topology: Add support for decommission with tablets tablet_allocator: Compute load sketch lazily tablet_allocator: Set node id correctly tablet_allocator: Make migration_plan a class tablets: Implement cleanup step storage_service, tablets: Prevent stale RPCs from running beyond their stage locator: Introduce tablet_metadata_guard locator, replica: Add a way to wait for table's effective_replication_map change storage_service, tablets: Extract do_tablet_operation() from stream_tablet() raft topology: Add break in the final case clause raft topology: Fix SIGSEGV when trace-level logging is enabled raft topology: Set node state in topology raft topology: Always set host id in topology
This commit is contained in:
@@ -76,7 +76,7 @@ such as `check_and_repair_cdc_streams`.
|
||||
If there is no work for the state machine, tablet load balancer is invoked to
|
||||
check if we need to rebalance. If so, it computes an incremental tablet migration
|
||||
plan, persists it by moving tablets into transitional states, and moves the state machine
|
||||
into the tablet migration track. All this happens atomically form the perspective
|
||||
into the tablet migration track. All this happens atomically from the perspective
|
||||
of group0 state machine.
|
||||
|
||||
The tablet migration track also invokes the load balancer and starts new migrations
|
||||
@@ -84,9 +84,9 @@ to keep the cluster saturated with streaming. The load balancer is invoked
|
||||
on transition of tablet stages, and also continuously as long as it generates
|
||||
new migrations.
|
||||
|
||||
If there is a pending topology change request, the load balancer
|
||||
will not be invoked to allow for current migrations to drain, after which the
|
||||
state machine will exit the tablet migration track and allow pending topology
|
||||
If there is a pending topology change request during tablet migration track,
|
||||
the load balancer will not be invoked to allow for current migrations to drain,
|
||||
after which the state machine will exit the tablet migration track and allow pending topology
|
||||
operation to start.
|
||||
|
||||
The tablet migration track excludes with other topology changes, so node operations
|
||||
@@ -103,6 +103,12 @@ that there are no tablet transitions in the system.
|
||||
|
||||
Tablets are migrated in parallel and independently.
|
||||
|
||||
There is a variant of tablet migration track called tablet draining track, which is invoked
|
||||
as a step of certain topology operations (e.g. decommission). Its goal is to readjust tablet replicas
|
||||
so that a given topology change can proceed. For example, when decommissioning a node, we
|
||||
need to migrate tablet replicas away from the node being decommissioned.
|
||||
Tablet draining happens before making changes to vnode-based replication.
|
||||
|
||||
# Tablet migration
|
||||
|
||||
Each tablet has its own migration state machine stored in group0 which is part of the tablet state. It involves
|
||||
|
||||
@@ -52,4 +52,5 @@ struct raft_topology_pull_params {};
|
||||
verb raft_topology_cmd (raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
|
||||
verb [[cancellable]] raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
|
||||
verb [[cancellable]] tablet_stream_data (locator::global_tablet_id);
|
||||
verb [[cancellable]] tablet_cleanup (locator::global_tablet_id);
|
||||
}
|
||||
|
||||
@@ -494,6 +494,7 @@ effective_replication_map::effective_replication_map(replication_strategy_ptr rs
|
||||
: _rs(std::move(rs))
|
||||
, _tmptr(std::move(tmptr))
|
||||
, _replication_factor(replication_factor)
|
||||
, _validity_abort_source(std::make_unique<abort_source>())
|
||||
{ }
|
||||
|
||||
vnode_effective_replication_map::factory_key vnode_effective_replication_map::make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr) {
|
||||
|
||||
@@ -173,8 +173,10 @@ protected:
|
||||
replication_strategy_ptr _rs;
|
||||
token_metadata_ptr _tmptr;
|
||||
size_t _replication_factor;
|
||||
std::unique_ptr<abort_source> _validity_abort_source;
|
||||
public:
|
||||
effective_replication_map(replication_strategy_ptr, token_metadata_ptr, size_t replication_factor) noexcept;
|
||||
effective_replication_map(effective_replication_map&&) noexcept = default;
|
||||
virtual ~effective_replication_map() = default;
|
||||
|
||||
const abstract_replication_strategy& get_replication_strategy() const noexcept { return *_rs; }
|
||||
@@ -183,6 +185,16 @@ public:
|
||||
const topology& get_topology() const noexcept { return _tmptr->get_topology(); }
|
||||
size_t get_replication_factor() const noexcept { return _replication_factor; }
|
||||
|
||||
void invalidate() const noexcept {
|
||||
_validity_abort_source->request_abort();
|
||||
}
|
||||
|
||||
/// Returns a reference to abort_source which is aborted when this effective replication map
|
||||
/// is no longer the latest table's effective replication map.
|
||||
abort_source& get_validity_abort_source() const {
|
||||
return *_validity_abort_source;
|
||||
}
|
||||
|
||||
/// Returns addresses of replicas for a given token.
|
||||
/// Does not include pending replicas except for a pending replica which
|
||||
/// has the same address as one of the old replicas. This can be the case during "nodetool replace"
|
||||
|
||||
59
locator/tablet_metadata_guard.hh
Normal file
59
locator/tablet_metadata_guard.hh
Normal file
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "replica/database.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
|
||||
namespace locator {
|
||||
|
||||
/// A holder for table's effective_replication_map which
|
||||
/// automatically switches to the latest one as long as it
|
||||
/// does not affect the tablet associated with this guard.
|
||||
///
|
||||
/// This is useful for tracking long-running operations in
|
||||
/// the context of a single tablet which we don't want to
|
||||
/// block topology barriers for other tablets, only barriers for
|
||||
/// this tablet.
|
||||
class tablet_metadata_guard {
|
||||
lw_shared_ptr<replica::table> _table;
|
||||
global_tablet_id _tablet;
|
||||
effective_replication_map_ptr _erm;
|
||||
std::optional<tablet_transition_stage> _stage;
|
||||
seastar::abort_source _abort_source;
|
||||
optimized_optional<seastar::abort_source::subscription> _callback;
|
||||
private:
|
||||
void subscribe();
|
||||
void check() noexcept;
|
||||
public:
|
||||
tablet_metadata_guard(replica::table& table, global_tablet_id tablet);
|
||||
|
||||
/// Returns an abort_source which is signaled when effective_replication_map changes
|
||||
/// in a way which is relevant for the tablet associated with this guard.
|
||||
/// When this happens, the guard stops refreshing the effective_replication_map,
|
||||
/// which will block topology coordinator barriers until the guard is destroyed.
|
||||
///
|
||||
/// The abort_source is valid as long as this instance is alive.
|
||||
seastar::abort_source& get_abort_source() {
|
||||
return _abort_source;
|
||||
}
|
||||
|
||||
locator::token_metadata_ptr get_token_metadata() {
|
||||
return _erm->get_token_metadata_ptr();
|
||||
}
|
||||
|
||||
/// Returns tablet_map for the table of the tablet associated with this guard.
|
||||
/// The result is valid until the next deferring point.
|
||||
const locator::tablet_map& get_tablet_map() {
|
||||
return get_token_metadata()->tablets().get_tablet_map(_tablet.table);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace locator
|
||||
@@ -8,6 +8,7 @@
|
||||
|
||||
#include "locator/tablet_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_metadata_guard.hh"
|
||||
#include "locator/tablet_sharder.hh"
|
||||
#include "locator/token_range_splitter.hh"
|
||||
#include "dht/i_partitioner.hh"
|
||||
@@ -44,6 +45,8 @@ write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage
|
||||
return write_replica_set_selector::next;
|
||||
case tablet_transition_stage::cleanup:
|
||||
return write_replica_set_selector::next;
|
||||
case tablet_transition_stage::end_migration:
|
||||
return write_replica_set_selector::next;
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
||||
}
|
||||
@@ -63,6 +66,8 @@ read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage)
|
||||
return read_replica_set_selector::next;
|
||||
case tablet_transition_stage::cleanup:
|
||||
return read_replica_set_selector::next;
|
||||
case tablet_transition_stage::end_migration:
|
||||
return read_replica_set_selector::next;
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
||||
}
|
||||
@@ -231,6 +236,7 @@ static const std::unordered_map<tablet_transition_stage, sstring> tablet_transit
|
||||
{tablet_transition_stage::streaming, "streaming"},
|
||||
{tablet_transition_stage::use_new, "use_new"},
|
||||
{tablet_transition_stage::cleanup, "cleanup"},
|
||||
{tablet_transition_stage::end_migration, "end_migration"},
|
||||
};
|
||||
|
||||
static const std::unordered_map<sstring, tablet_transition_stage> tablet_transition_stage_from_name = std::invoke([] {
|
||||
@@ -511,6 +517,35 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica
|
||||
return seastar::make_shared<tablet_effective_replication_map>(table, std::move(rs), std::move(tm), replication_factor);
|
||||
}
|
||||
|
||||
void tablet_metadata_guard::check() noexcept {
|
||||
auto erm = _table->get_effective_replication_map();
|
||||
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table);
|
||||
auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet);
|
||||
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage)) {
|
||||
_abort_source.request_abort();
|
||||
} else {
|
||||
_erm = std::move(erm);
|
||||
subscribe();
|
||||
}
|
||||
}
|
||||
|
||||
tablet_metadata_guard::tablet_metadata_guard(replica::table& table, global_tablet_id tablet)
|
||||
: _table(table.shared_from_this())
|
||||
, _tablet(tablet)
|
||||
, _erm(table.get_effective_replication_map())
|
||||
{
|
||||
subscribe();
|
||||
if (auto* trinfo = get_tablet_map().get_tablet_transition_info(tablet.tablet)) {
|
||||
_stage = trinfo->stage;
|
||||
}
|
||||
}
|
||||
|
||||
void tablet_metadata_guard::subscribe() {
|
||||
_callback = _erm->get_validity_abort_source().subscribe([this] () noexcept {
|
||||
check();
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
auto fmt::formatter<locator::global_tablet_id>::format(const locator::global_tablet_id& id, fmt::format_context& ctx) const
|
||||
|
||||
@@ -141,6 +141,7 @@ enum class tablet_transition_stage {
|
||||
write_both_read_new,
|
||||
use_new,
|
||||
cleanup,
|
||||
end_migration,
|
||||
};
|
||||
|
||||
sstring tablet_transition_stage_to_string(tablet_transition_stage);
|
||||
|
||||
@@ -597,6 +597,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::NODE_OPS_CMD:
|
||||
case messaging_verb::HINT_MUTATION:
|
||||
case messaging_verb::TABLET_STREAM_DATA:
|
||||
case messaging_verb::TABLET_CLEANUP:
|
||||
return 1;
|
||||
case messaging_verb::CLIENT_ID:
|
||||
case messaging_verb::MUTATION:
|
||||
|
||||
@@ -183,7 +183,8 @@ enum class messaging_verb : int32_t {
|
||||
RAFT_TOPOLOGY_CMD = 64,
|
||||
RAFT_PULL_TOPOLOGY_SNAPSHOT = 65,
|
||||
TABLET_STREAM_DATA = 66,
|
||||
LAST = 67,
|
||||
TABLET_CLEANUP = 67,
|
||||
LAST = 68,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
|
||||
@@ -62,6 +62,7 @@
|
||||
#include "sstables/generation_type.hh"
|
||||
#include "db/rate_limiter.hh"
|
||||
#include "db/operation_type.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "compaction/compaction_fwd.hh"
|
||||
|
||||
@@ -787,6 +788,7 @@ public:
|
||||
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
|
||||
void update_effective_replication_map(locator::effective_replication_map_ptr);
|
||||
[[gnu::always_inline]] bool uses_tablets() const;
|
||||
future<> cleanup_tablet(locator::tablet_id);
|
||||
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
|
||||
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
|
||||
shard_id shard_of(const mutation& m) const {
|
||||
|
||||
@@ -1675,6 +1675,9 @@ table::table(schema_ptr schema, config config, lw_shared_ptr<const storage_optio
|
||||
}
|
||||
|
||||
void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
|
||||
if (_erm) {
|
||||
_erm->invalidate();
|
||||
}
|
||||
_erm = std::move(erm);
|
||||
}
|
||||
|
||||
@@ -2980,4 +2983,9 @@ bool table::requires_cleanup(const sstables::sstable_set& set) const {
|
||||
}));
|
||||
}
|
||||
|
||||
future<> table::cleanup_tablet(locator::tablet_id) {
|
||||
co_await flush();
|
||||
// FIXME: Remove sstables
|
||||
}
|
||||
|
||||
} // namespace replica
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "db/consistency_level.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_metadata_guard.hh"
|
||||
#include "replica/tablet_mutation_builder.hh"
|
||||
#include <seastar/core/smp.hh>
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
@@ -303,6 +304,21 @@ future<> storage_service::wait_for_ring_to_settle() {
|
||||
slogger.info("Checking bootstrapping/leaving nodes: ok");
|
||||
}
|
||||
|
||||
static locator::node::state to_topology_node_state(node_state ns) {
|
||||
switch (ns) {
|
||||
case node_state::bootstrapping: return locator::node::state::bootstrapping;
|
||||
case node_state::decommissioning: return locator::node::state::being_decommissioned;
|
||||
case node_state::removing: return locator::node::state::being_removed;
|
||||
case node_state::normal: return locator::node::state::normal;
|
||||
case node_state::left_token_ring: return locator::node::state::left;
|
||||
case node_state::left: return locator::node::state::left;
|
||||
case node_state::replacing: return locator::node::state::replacing;
|
||||
case node_state::rebuilding: return locator::node::state::normal;
|
||||
case node_state::none: return locator::node::state::none;
|
||||
}
|
||||
on_internal_error(slogger, format("unhandled node state: {}", ns));
|
||||
}
|
||||
|
||||
future<> storage_service::topology_state_load() {
|
||||
#ifdef SEASTAR_DEBUG
|
||||
static bool running = false;
|
||||
@@ -364,8 +380,10 @@ future<> storage_service::topology_state_load() {
|
||||
|
||||
tmptr->set_version(_topology_state_machine._topology.version);
|
||||
|
||||
auto update_topology = [&] (inet_address ip, const replica_state& rs) {
|
||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, std::nullopt, rs.shard_count);
|
||||
auto update_topology = [&] (locator::host_id id, inet_address ip, const replica_state& rs) {
|
||||
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
|
||||
to_topology_node_state(rs.state), rs.shard_count);
|
||||
tmptr->update_host_id(id, ip);
|
||||
};
|
||||
|
||||
auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
|
||||
@@ -392,9 +410,8 @@ future<> storage_service::topology_state_load() {
|
||||
co_await _sys_ks.local().update_tokens(rs.ring.value().tokens);
|
||||
co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens) }});
|
||||
}
|
||||
update_topology(ip, rs);
|
||||
update_topology(host_id, ip, rs);
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
|
||||
tmptr->update_host_id(host_id, ip);
|
||||
};
|
||||
|
||||
for (const auto& [id, rs]: _topology_state_machine._topology.normal_nodes) {
|
||||
@@ -411,6 +428,8 @@ future<> storage_service::topology_state_load() {
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::commit_cdc_generation:
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::tablet_draining:
|
||||
[[fallthrough]];
|
||||
case topology::transition_state::write_both_read_old:
|
||||
return read_new_t::no;
|
||||
case topology::transition_state::write_both_read_new:
|
||||
@@ -423,7 +442,10 @@ future<> storage_service::topology_state_load() {
|
||||
auto ip = co_await id2ip(id);
|
||||
|
||||
slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring->tokens);
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
|
||||
seastar::value_of([&] () -> sstring {
|
||||
return rs.ring ? ::format("{}", rs.ring->tokens) : sstring("null");
|
||||
}));
|
||||
|
||||
switch (rs.state) {
|
||||
case node_state::bootstrapping:
|
||||
@@ -432,7 +454,7 @@ future<> storage_service::topology_state_load() {
|
||||
co_await _sys_ks.local().update_tokens(ip, {});
|
||||
co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid());
|
||||
}
|
||||
update_topology(ip, rs);
|
||||
update_topology(host_id, ip, rs);
|
||||
if (_topology_state_machine._topology.normal_nodes.empty()) {
|
||||
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
|
||||
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
|
||||
@@ -446,9 +468,8 @@ future<> storage_service::topology_state_load() {
|
||||
break;
|
||||
case node_state::decommissioning:
|
||||
case node_state::removing:
|
||||
update_topology(ip, rs);
|
||||
update_topology(host_id, ip, rs);
|
||||
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
|
||||
tmptr->update_host_id(host_id, ip);
|
||||
tmptr->add_leaving_endpoint(ip);
|
||||
co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
|
||||
break;
|
||||
@@ -461,7 +482,10 @@ future<> storage_service::topology_state_load() {
|
||||
on_fatal_internal_error(slogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
|
||||
}
|
||||
assert(existing_ip);
|
||||
update_topology(ip, rs);
|
||||
// FIXME: Topology cannot hold two IPs with different host ids yet so
|
||||
// when replacing we must advertise the replaced_id for the ip, otherwise
|
||||
// topology will complain about host id of a local node changing and fail.
|
||||
update_topology(ip == existing_ip ? locator::host_id(replaced_id.uuid()) : host_id, ip, rs);
|
||||
tmptr->add_replacing_endpoint(*existing_ip, ip);
|
||||
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
|
||||
}
|
||||
@@ -1379,6 +1403,7 @@ class topology_coordinator {
|
||||
// Next migration of the same tablet is guaranteed to use a different instance.
|
||||
struct tablet_migration_state {
|
||||
background_action_holder streaming;
|
||||
background_action_holder cleanup;
|
||||
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
|
||||
};
|
||||
|
||||
@@ -1446,13 +1471,17 @@ class topology_coordinator {
|
||||
}
|
||||
|
||||
future<> generate_migration_updates(std::vector<canonical_mutation>& out, const group0_guard& guard, const migration_plan& plan) {
|
||||
for (const tablet_migration_info& mig : plan) {
|
||||
for (const tablet_migration_info& mig : plan.migrations()) {
|
||||
co_await coroutine::maybe_yield();
|
||||
generate_migration_update(out, guard, mig);
|
||||
}
|
||||
}
|
||||
|
||||
future<> handle_tablet_migration(group0_guard guard) {
|
||||
// When "drain" is true, we migrate tablets only as long as there are nodes to drain
|
||||
// and then change the transition state to write_both_read_old. Also, while draining,
|
||||
// we ignore pending topology requests which normally interrupt load balancing.
|
||||
// When "drain" is false, we do regular load balancing.
|
||||
future<> handle_tablet_migration(group0_guard guard, bool drain) {
|
||||
// This step acts like a pump which advances state machines of individual tablets,
|
||||
// batching barriers and group0 updates.
|
||||
// If progress cannot be made, e.g. because all transitions are streaming, we block
|
||||
@@ -1488,11 +1517,15 @@ class topology_coordinator {
|
||||
.build());
|
||||
};
|
||||
|
||||
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
|
||||
if (advance_in_background(gid, tablet_state.barriers[stage], "barrier", [&] {
|
||||
auto do_barrier = [&] {
|
||||
return advance_in_background(gid, tablet_state.barriers[trinfo.stage], "barrier", [&] {
|
||||
needs_barrier = true;
|
||||
return barrier.get_shared_future();
|
||||
})) {
|
||||
});
|
||||
};
|
||||
|
||||
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
|
||||
if (do_barrier()) {
|
||||
transition_to(stage);
|
||||
}
|
||||
};
|
||||
@@ -1524,30 +1557,51 @@ class topology_coordinator {
|
||||
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
|
||||
break;
|
||||
case locator::tablet_transition_stage::cleanup:
|
||||
// FIXME: Actually perform the cleanup. Block on integration with compaction groups.
|
||||
_tablets.erase(gid);
|
||||
updates.emplace_back(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
|
||||
.del_transition(last_token)
|
||||
.set_replicas(last_token, trinfo.next)
|
||||
.build());
|
||||
if (advance_in_background(gid, tablet_state.cleanup, "cleanup", [&] {
|
||||
locator::tablet_replica dst = locator::get_leaving_replica(tmap.get_tablet_info(gid.tablet), trinfo);
|
||||
slogger.info("raft topology: Initiating tablet cleanup of {} on {}", gid, dst);
|
||||
return ser::storage_service_rpc_verbs::send_tablet_cleanup(&_messaging,
|
||||
netw::msg_addr(id2ip(dst.host)), _as, gid);
|
||||
})) {
|
||||
transition_to(locator::tablet_transition_stage::end_migration);
|
||||
}
|
||||
break;
|
||||
case locator::tablet_transition_stage::end_migration:
|
||||
// Need a separate stage and a barrier after cleanup RPC to cut off stale RPCs.
|
||||
// See do_tablet_operation() doc.
|
||||
if (do_barrier()) {
|
||||
_tablets.erase(gid);
|
||||
updates.emplace_back(
|
||||
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
|
||||
.del_transition(last_token)
|
||||
.set_replicas(last_token, trinfo.next)
|
||||
.build());
|
||||
}
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
// In order to keep the cluster saturated, ask the load balancer for more transitions.
|
||||
// Unless there is a pending topology change operation.
|
||||
auto ts = guard.write_timestamp();
|
||||
auto [preempt, new_guard] = should_preempt_balancing(std::move(guard));
|
||||
guard = std::move(new_guard);
|
||||
if (ts != guard.write_timestamp()) {
|
||||
// We rely on the fact that should_preempt_balancing() does not release the guard
|
||||
// so that tablet metadata reading and updates are atomic.
|
||||
on_internal_error(slogger, "should_preempt_balancing() retook the guard");
|
||||
bool preempt = false;
|
||||
if (!drain) {
|
||||
// When draining, this method is invoked with an active node transition, which
|
||||
// would normally cause preemption, which we don't want here.
|
||||
auto ts = guard.write_timestamp();
|
||||
auto [new_preempt, new_guard] = should_preempt_balancing(std::move(guard));
|
||||
preempt = new_preempt;
|
||||
guard = std::move(new_guard);
|
||||
if (ts != guard.write_timestamp()) {
|
||||
// We rely on the fact that should_preempt_balancing() does not release the guard
|
||||
// so that tablet metadata reading and updates are atomic.
|
||||
on_internal_error(slogger, "should_preempt_balancing() retook the guard");
|
||||
}
|
||||
}
|
||||
if (!preempt) {
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr());
|
||||
co_await generate_migration_updates(updates, guard, plan);
|
||||
if (!drain || plan.has_nodes_to_drain()) {
|
||||
co_await generate_migration_updates(updates, guard, plan);
|
||||
}
|
||||
}
|
||||
|
||||
// The updates have to be executed under the same guard which was used to read tablet metadata
|
||||
@@ -1589,11 +1643,19 @@ class topology_coordinator {
|
||||
co_return;
|
||||
}
|
||||
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.del_transition_state()
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
if (drain) {
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
} else {
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.del_transition_state()
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
}
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
|
||||
}
|
||||
|
||||
@@ -1731,6 +1793,9 @@ class topology_coordinator {
|
||||
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::tablet_draining:
|
||||
co_await handle_tablet_migration(std::move(guard), true);
|
||||
break;
|
||||
case topology::transition_state::write_both_read_old: {
|
||||
auto node = get_node_to_work_on(std::move(guard));
|
||||
|
||||
@@ -1883,7 +1948,8 @@ class topology_coordinator {
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::tablet_migration:
|
||||
co_await handle_tablet_migration(std::move(guard));
|
||||
co_await handle_tablet_migration(std::move(guard), false);
|
||||
break;
|
||||
}
|
||||
co_return true;
|
||||
};
|
||||
@@ -1930,7 +1996,7 @@ class topology_coordinator {
|
||||
// start decommission and put tokens of decommissioning nodes into write_both_read_old state
|
||||
// meaning that reads will go to the replica being decommissioned
|
||||
// but writes will go to new owner as well
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
builder.set_transition_state(topology::transition_state::tablet_draining)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.with_node(node.id)
|
||||
.set("node_state", node_state::decommissioning)
|
||||
@@ -1940,10 +2006,7 @@ class topology_coordinator {
|
||||
break;
|
||||
case topology_request::remove:
|
||||
assert(node.rs->ring);
|
||||
// start removing and put tokens of a node been removed into write_both_read_old state
|
||||
// meaning that reads will go to the replica being removed (it is dead though)
|
||||
// but writes will go to new owner as well
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
builder.set_transition_state(topology::transition_state::tablet_draining)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.with_node(node.id)
|
||||
.set("node_state", node_state::removing)
|
||||
@@ -1957,10 +2020,7 @@ class topology_coordinator {
|
||||
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
|
||||
assert(it != _topo_sm._topology.normal_nodes.end());
|
||||
assert(it->second.ring && it->second.state == node_state::normal);
|
||||
// start replacing and take ownership of the tokens of a node been replaced
|
||||
// and put them into write_both_read_old state meaning that reads will go
|
||||
// to the replica being removed (it is dead though) but writes will go to new owner as well
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
builder.set_transition_state(topology::transition_state::tablet_draining)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.with_node(node.id)
|
||||
.set("node_state", node_state::replacing)
|
||||
@@ -6060,58 +6120,129 @@ inet_address storage_service::host2ip(locator::host_id host) {
|
||||
return *ip;
|
||||
}
|
||||
|
||||
// Streams data to the pending tablet replica of a given tablet on this node.
|
||||
// The source tablet replica is determined from the current transition info of the tablet.
|
||||
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
|
||||
// The coordinator does not execute global token metadata barrier before jumping to "streaming" stage, so we need
|
||||
// Performs a replica-side operation for a given tablet.
|
||||
// What operation is performed is determined by "op" based on the
|
||||
// current state of tablet metadata. The coordinator is supposed to prepare tablet
|
||||
// metadata according to his intent and trigger the operation,
|
||||
// without passing any transient information.
|
||||
//
|
||||
// If the operation succeeds, and the coordinator is still valid, it means
|
||||
// that the operation intended by the coordinator was performed.
|
||||
// If the coordinator is no longer valid, the operation may succeed but
|
||||
// the actual operation performed may be different than intended, it may
|
||||
// be the one intended by the new coordinator. This is not a problem
|
||||
// because the old coordinator should do nothing with such result.
|
||||
//
|
||||
// The triggers may be retried. They may also be reordered with older triggers, from
|
||||
// the same or a different coordinator. There is a protocol which ensures that
|
||||
// stale triggers won't cause operations to run beyond the migration stage they were
|
||||
// intended for. For example, that streaming is not still running after the coordinator
|
||||
// moved past the "streaming" stage, and that it won't be started when the stage is not appropriate.
|
||||
// A non-stale trigger is the one which completed successfully and caused the valid coordinator
|
||||
// to advance tablet migration to the next stage. Other triggers are called stale.
|
||||
// We can divide stale triggers into categories:
|
||||
// (1) Those which start after the tablet was moved to the next stage
|
||||
// Those which start before the tablet was moved to the next stage,
|
||||
// (2) ...but after the non-stale trigger finished
|
||||
// (3) ...but before the non-stale trigger finished
|
||||
//
|
||||
// By "start" I mean the atomic block which inserts into _tablet_ops, and by "finish" I mean
|
||||
// removal from _tablet_ops.
|
||||
// So event ordering is local from the perspective of this replica, and is linear because
|
||||
// this happens on the same shard.
|
||||
//
|
||||
// What prevents (1) from running is the fact that triggers check the state of tablet
|
||||
// metadata, and will fail immediately if the stage is not appropriate. It can happen
|
||||
// that the trigger is so stale that it will match with an appropriate stage of the next
|
||||
// migration of the same tablet. This is not a problem because we fall into the same
|
||||
// category as a stale trigger which was started in the new migration, so cases (2) or (3) apply.
|
||||
//
|
||||
// What prevents (2) from running is the fact that after the coordinator moves on to
|
||||
// the next stage, it executes a token metadata barrier, which will wait for such triggers
|
||||
// to complete as they hold on to erm via tablet_metadata_barrier. They should be aborted
|
||||
// soon after the coordinator changes the stage by the means of tablet_metadata_barrier::get_abort_source().
|
||||
//
|
||||
// What prevents (3) from running is that they will join with the non-stale trigger, or non-stale
|
||||
// trigger will join with them, depending on which came first. In that case they finish at the same time.
|
||||
//
|
||||
// It's very important that the global token metadata barrier involves all nodes which
|
||||
// may receive stale triggers started in the previous stage, so that those nodes will
|
||||
// see tablet metadata which reflects group0 state. This will cut-off stale triggers
|
||||
// as soon as the coordinator moves to the next stage.
|
||||
future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
|
||||
sstring op_name,
|
||||
std::function<future<>(locator::tablet_metadata_guard&)> op) {
|
||||
// The coordinator may not execute global token metadata barrier before triggering the operation, so we need
|
||||
// a barrier here to see the token metadata which is at least as recent as that of the sender.
|
||||
auto& raft_server = _group0->group0_server();
|
||||
co_await raft_server.read_barrier(&_abort_source);
|
||||
|
||||
auto tm = _shared_token_metadata.get();
|
||||
auto& tmap = tm->tablets().get_tablet_map(tablet.table);
|
||||
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
|
||||
|
||||
// Check if the request is still valid.
|
||||
// If there is mismatch, it means this streaming was canceled and the coordinator moved on.
|
||||
if (!trinfo) {
|
||||
throw std::runtime_error(format("No transition info for tablet {}", tablet));
|
||||
}
|
||||
if (trinfo->stage != locator::tablet_transition_stage::streaming) {
|
||||
throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
|
||||
}
|
||||
if (trinfo->pending_replica.host != tm->get_my_id()) {
|
||||
throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
|
||||
}
|
||||
|
||||
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
|
||||
auto range = tmap.get_token_range(tablet.tablet);
|
||||
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
|
||||
if (leaving_replica.host == tm->get_my_id()) {
|
||||
// The algorithm doesn't work with tablet migration within the same node because
|
||||
// it assumes there is only one tablet replica, picked by the sharder, on local node.
|
||||
throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}",
|
||||
tablet, leaving_replica.shard, trinfo->pending_replica.shard));
|
||||
}
|
||||
auto leaving_replica_ip = host2ip(leaving_replica.host);
|
||||
|
||||
if (_tablet_streaming[tablet]) {
|
||||
slogger.debug("Streaming retry joining with existing session for tablet {}", tablet);
|
||||
co_await _tablet_streaming[tablet]->get_future();
|
||||
if (_tablet_ops.contains(tablet)) {
|
||||
slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
|
||||
co_await _tablet_ops[tablet].done.get_future();
|
||||
co_return;
|
||||
}
|
||||
|
||||
locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
|
||||
auto& as = guard.get_abort_source();
|
||||
auto sub = _abort_source.subscribe([&as] () noexcept {
|
||||
as.request_abort();
|
||||
});
|
||||
|
||||
auto async_gate_holder = _async_gate.hold();
|
||||
promise<> p;
|
||||
_tablet_streaming[tablet] = seastar::shared_future<>(p.get_future());
|
||||
auto erase_tablet_streaming = seastar::defer([&] {
|
||||
_tablet_streaming.erase(tablet);
|
||||
_tablet_ops.emplace(tablet, tablet_operation {
|
||||
op_name, seastar::shared_future<>(p.get_future())
|
||||
});
|
||||
auto erase_registry_entry = seastar::defer([&] {
|
||||
_tablet_ops.erase(tablet);
|
||||
});
|
||||
|
||||
try {
|
||||
co_await op(guard);
|
||||
p.set_value();
|
||||
slogger.debug("{} for tablet migration of {} successful", op_name, tablet);
|
||||
} catch (...) {
|
||||
p.set_exception(std::current_exception());
|
||||
slogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
// Streams data to the pending tablet replica of a given tablet on this node.
|
||||
// The source tablet replica is determined from the current transition info of the tablet.
|
||||
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
|
||||
return do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<> {
|
||||
auto tm = guard.get_token_metadata();
|
||||
auto& tmap = guard.get_tablet_map();
|
||||
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
|
||||
|
||||
// Check if the request is still valid.
|
||||
// If there is mismatch, it means this streaming was canceled and the coordinator moved on.
|
||||
if (!trinfo) {
|
||||
throw std::runtime_error(format("No transition info for tablet {}", tablet));
|
||||
}
|
||||
if (trinfo->stage != locator::tablet_transition_stage::streaming) {
|
||||
throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
|
||||
}
|
||||
if (trinfo->pending_replica.host != tm->get_my_id()) {
|
||||
throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
|
||||
}
|
||||
|
||||
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
|
||||
auto range = tmap.get_token_range(tablet.tablet);
|
||||
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
|
||||
if (leaving_replica.host == tm->get_my_id()) {
|
||||
// The algorithm doesn't work with tablet migration within the same node because
|
||||
// it assumes there is only one tablet replica, picked by the sharder, on local node.
|
||||
throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}",
|
||||
tablet, leaving_replica.shard, trinfo->pending_replica.shard));
|
||||
}
|
||||
auto leaving_replica_ip = host2ip(leaving_replica.host);
|
||||
|
||||
auto& table = _db.local().find_column_family(tablet.table);
|
||||
std::vector<sstring> tables = {table.schema()->cf_name()};
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm, _abort_source,
|
||||
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, std::move(tm), guard.get_abort_source(),
|
||||
get_broadcast_address(), _snitch.local()->get_location(),
|
||||
"Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables));
|
||||
streamer->add_source_filter(std::make_unique<dht::range_streamer::failure_detector_source_filter>(
|
||||
@@ -6121,14 +6252,44 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
|
||||
ranges_per_endpoint[leaving_replica_ip].emplace_back(range);
|
||||
streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint));
|
||||
co_await streamer->stream_async();
|
||||
co_return;
|
||||
});
|
||||
}
|
||||
|
||||
p.set_value();
|
||||
slogger.info("Streaming for tablet migration of {} successful", tablet);
|
||||
} catch (...) {
|
||||
p.set_exception(std::current_exception());
|
||||
slogger.warn("Streaming for tablet migration of {} from {} failed: {}", tablet, leaving_replica, std::current_exception());
|
||||
throw;
|
||||
}
|
||||
future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
|
||||
return do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) {
|
||||
shard_id shard;
|
||||
|
||||
{
|
||||
auto tm = guard.get_token_metadata();
|
||||
auto& tmap = guard.get_tablet_map();
|
||||
auto *trinfo = tmap.get_tablet_transition_info(tablet.tablet);
|
||||
|
||||
// Check if the request is still valid.
|
||||
// If there is mismatch, it means this cleanup was canceled and the coordinator moved on.
|
||||
if (!trinfo) {
|
||||
throw std::runtime_error(format("No transition info for tablet {}", tablet));
|
||||
}
|
||||
if (trinfo->stage != locator::tablet_transition_stage::cleanup) {
|
||||
throw std::runtime_error(format("Tablet {} stage is not at cleanup", tablet));
|
||||
}
|
||||
|
||||
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
|
||||
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
|
||||
if (leaving_replica.host != tm->get_my_id()) {
|
||||
throw std::runtime_error(format("Tablet {} has leaving replica different than this one", tablet));
|
||||
}
|
||||
auto shard_opt = tmap.get_shard(tablet.tablet, tm->get_my_id());
|
||||
if (!shard_opt) {
|
||||
on_internal_error(slogger, format("Tablet {} has no shard on this node", tablet));
|
||||
}
|
||||
shard = *shard_opt;
|
||||
}
|
||||
return _db.invoke_on(shard, [tablet] (replica::database& db) {
|
||||
auto& table = db.find_column_family(tablet.table);
|
||||
return table.cleanup_tablet(tablet.tablet);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::init_messaging_service(sharded<db::system_distributed_keyspace>& sys_dist_ks) {
|
||||
@@ -6197,6 +6358,9 @@ void storage_service::init_messaging_service(sharded<db::system_distributed_keys
|
||||
return ss.stream_tablet(tablet);
|
||||
});
|
||||
});
|
||||
ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [this] (locator::global_tablet_id tablet) {
|
||||
return cleanup_tablet(tablet);
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::uninit_messaging_service() {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "locator/abstract_replication_strategy.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_metadata_guard.hh"
|
||||
#include "inet_address_vectors.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
@@ -114,6 +115,13 @@ private:
|
||||
using inet_address = gms::inet_address;
|
||||
using versioned_value = gms::versioned_value;
|
||||
|
||||
struct tablet_operation {
|
||||
sstring name;
|
||||
shared_future<> done;
|
||||
};
|
||||
|
||||
using tablet_op_registry = std::unordered_map<locator::global_tablet_id, tablet_operation>;
|
||||
|
||||
abort_source& _abort_source;
|
||||
gms::feature_service& _feature_service;
|
||||
distributed<replica::database>& _db;
|
||||
@@ -147,7 +155,11 @@ private:
|
||||
future<> node_ops_abort(node_ops_id ops_uuid);
|
||||
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
|
||||
future<> node_ops_abort_thread();
|
||||
future<> do_tablet_operation(locator::global_tablet_id tablet,
|
||||
sstring op_name,
|
||||
std::function<future<>(locator::tablet_metadata_guard&)> op);
|
||||
future<> stream_tablet(locator::global_tablet_id);
|
||||
future<> cleanup_tablet(locator::global_tablet_id);
|
||||
inet_address host2ip(locator::host_id);
|
||||
public:
|
||||
storage_service(abort_source& as, distributed<replica::database>& db,
|
||||
@@ -769,7 +781,7 @@ private:
|
||||
std::optional<shared_future<>> _decomission_result;
|
||||
std::optional<shared_future<>> _rebuild_result;
|
||||
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
|
||||
std::unordered_map<locator::global_tablet_id, std::optional<shared_future<>>> _tablet_streaming;
|
||||
tablet_op_registry _tablet_ops;
|
||||
// During decommission, the node waits for the coordinator to tell it to shut down.
|
||||
std::optional<promise<>> _shutdown_request_promise;
|
||||
struct {
|
||||
|
||||
@@ -141,6 +141,14 @@ public:
|
||||
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
|
||||
/// will alternate between them across make_plan() calls.
|
||||
///
|
||||
/// The algorithm behaves differently when there are decommissioning nodes which have tablet replicas.
|
||||
/// In this case, we move those tablets away first. The balancing works in the opposite direction.
|
||||
/// Rather than picking a single least-loaded target and moving tablets into it from many sources,
|
||||
/// we have a single source and move tablets to multiple targets. This process necessarily disregards
|
||||
/// convergence checks, and the stop condition is that the source is drained. We still take target
|
||||
/// load into consideration and pick least-loaded targets first. When draining is not possible
|
||||
/// because there is no viable new replica for a tablet, load balancing will throw an exception.
|
||||
///
|
||||
/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated
|
||||
/// by load balancer as if they were already completed. This allows the algorithm to incrementally
|
||||
/// make decision which when executed with active migrations will produce the desired result.
|
||||
@@ -179,7 +187,7 @@ class load_balancer {
|
||||
};
|
||||
|
||||
struct node_load {
|
||||
host_id node;
|
||||
host_id id;
|
||||
uint64_t shard_count = 0;
|
||||
uint64_t tablet_count = 0;
|
||||
|
||||
@@ -189,6 +197,16 @@ class load_balancer {
|
||||
std::vector<shard_id> shards_by_load; // heap which tracks most-loaded shards using shards_by_load_cmp().
|
||||
std::vector<shard_load> shards; // Indexed by shard_id to which a given shard_load corresponds.
|
||||
|
||||
std::optional<locator::load_sketch> target_load_sketch;
|
||||
|
||||
future<load_sketch&> get_load_sketch(const token_metadata_ptr& tm) {
|
||||
if (!target_load_sketch) {
|
||||
target_load_sketch.emplace(tm);
|
||||
co_await target_load_sketch->populate(id);
|
||||
}
|
||||
co_return *target_load_sketch;
|
||||
}
|
||||
|
||||
// Call when tablet_count changes.
|
||||
void update() {
|
||||
avg_load = get_avg_load(tablet_count);
|
||||
@@ -265,6 +283,8 @@ private:
|
||||
return false;
|
||||
case tablet_transition_stage::cleanup:
|
||||
return false;
|
||||
case tablet_transition_stage::end_migration:
|
||||
return false;
|
||||
}
|
||||
on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast<int>(trinfo->stage)));
|
||||
}
|
||||
@@ -283,7 +303,7 @@ public:
|
||||
for (auto&& dc : topo.get_datacenters()) {
|
||||
auto dc_plan = co_await make_plan(dc);
|
||||
lblogger.info("Prepared {} migrations in DC {}", dc_plan.size(), dc);
|
||||
std::move(dc_plan.begin(), dc_plan.end(), std::back_inserter(plan));
|
||||
plan.merge(std::move(dc_plan));
|
||||
}
|
||||
|
||||
lblogger.info("Prepared {} migrations", plan.size());
|
||||
@@ -291,6 +311,8 @@ public:
|
||||
}
|
||||
|
||||
future<migration_plan> make_plan(dc_name dc) {
|
||||
migration_plan plan;
|
||||
|
||||
_stats.for_dc(dc).calls++;
|
||||
lblogger.info("Examining DC {}", dc);
|
||||
|
||||
@@ -305,17 +327,33 @@ public:
|
||||
// Select subset of nodes to balance.
|
||||
|
||||
std::unordered_map<host_id, node_load> nodes;
|
||||
std::unordered_set<host_id> nodes_to_drain;
|
||||
topo.for_each_node([&] (const locator::node* node_ptr) {
|
||||
if (node_ptr->get_state() == locator::node::state::normal && node_ptr->dc_rack().dc == dc) {
|
||||
if (node_ptr->dc_rack().dc != dc) {
|
||||
return;
|
||||
}
|
||||
if (node_ptr->get_state() == locator::node::state::normal
|
||||
|| node_ptr->get_state() == locator::node::state::being_decommissioned) {
|
||||
node_load& load = nodes[node_ptr->host_id()];
|
||||
load.id = node_ptr->host_id();
|
||||
load.shard_count = node_ptr->get_shard_count();
|
||||
load.shards.resize(load.shard_count);
|
||||
if (!load.shard_count) {
|
||||
throw std::runtime_error(format("Shard count of {} not found in topology", node_ptr->host_id()));
|
||||
}
|
||||
if (node_ptr->get_state() == locator::node::state::being_decommissioned) {
|
||||
lblogger.info("Will drain node {} from DC {}", node_ptr->host_id(), dc);
|
||||
nodes_to_drain.emplace(node_ptr->host_id());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (nodes.empty()) {
|
||||
lblogger.debug("No nodes to balance.");
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
|
||||
// Compute tablet load on nodes.
|
||||
|
||||
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
|
||||
@@ -338,6 +376,20 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
// Detect finished drain.
|
||||
|
||||
for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
|
||||
if (nodes[*i].tablet_count == 0) {
|
||||
lblogger.info("Node {} is already drained, ignoring", *i);
|
||||
nodes.erase(*i);
|
||||
i = nodes_to_drain.erase(i);
|
||||
} else {
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
plan.set_has_nodes_to_drain(!nodes_to_drain.empty());
|
||||
|
||||
// Compute load imbalance.
|
||||
|
||||
load_type max_load = 0;
|
||||
@@ -345,29 +397,34 @@ public:
|
||||
std::optional<host_id> min_load_node = std::nullopt;
|
||||
for (auto&& [host, load] : nodes) {
|
||||
load.update();
|
||||
if (!min_load_node || load.avg_load < min_load) {
|
||||
min_load = load.avg_load;
|
||||
min_load_node = host;
|
||||
}
|
||||
if (load.avg_load > max_load) {
|
||||
max_load = load.avg_load;
|
||||
}
|
||||
_stats.for_node(dc, host).load = load.avg_load;
|
||||
}
|
||||
|
||||
if (!shuffle && max_load == min_load) {
|
||||
// load is balanced.
|
||||
// TODO: Evaluate and fix intra-node balance.
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
co_return migration_plan();
|
||||
if (!nodes_to_drain.contains(host)) {
|
||||
if (!min_load_node || load.avg_load < min_load) {
|
||||
min_load = load.avg_load;
|
||||
min_load_node = host;
|
||||
}
|
||||
if (load.avg_load > max_load) {
|
||||
max_load = load.avg_load;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (auto&& [host, load] : nodes) {
|
||||
lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}",
|
||||
host, topo.find_node(host)->dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count);
|
||||
auto& node = topo.get_node(host);
|
||||
lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}, state={}",
|
||||
host, node.dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count, node.get_state());
|
||||
}
|
||||
|
||||
if (nodes_to_drain.empty()) {
|
||||
if (!shuffle && max_load == min_load) {
|
||||
// load is balanced.
|
||||
// TODO: Evaluate and fix intra-node balance.
|
||||
_stats.for_dc(dc).stop_balance++;
|
||||
co_return plan;
|
||||
}
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load);
|
||||
}
|
||||
lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load);
|
||||
auto target = *min_load_node;
|
||||
|
||||
// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
|
||||
// on the target node. This assumes that the target node is well-balanced and that tablet migrations
|
||||
@@ -379,8 +436,8 @@ public:
|
||||
// will suffer because more loaded shards will not participate, which will under-utilize the node.
|
||||
// FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes.
|
||||
|
||||
auto target_node = topo.find_node(target);
|
||||
auto batch_size = target_node->get_shard_count();
|
||||
auto target = *min_load_node;
|
||||
auto batch_size = nodes[target].shard_count;
|
||||
|
||||
// Compute per-shard load and candidate tablets.
|
||||
|
||||
@@ -422,12 +479,25 @@ public:
|
||||
|
||||
// Prepare candidate nodes and shards for heap-based balancing.
|
||||
|
||||
// Any given node is either in nodes_by_load or nodes_by_load_dst, but not both.
|
||||
// This means that either of the heap needs to be updated when the node's load changes, not both.
|
||||
|
||||
// heap which tracks most-loaded nodes in terms of avg_load.
|
||||
// It is used to find source tablet candidates.
|
||||
std::vector<host_id> nodes_by_load;
|
||||
nodes_by_load.reserve(nodes.size());
|
||||
|
||||
// heap which tracks least-loaded nodes in terms of avg_load.
|
||||
// Used to find candidates for target nodes.
|
||||
std::vector<host_id> nodes_by_load_dst;
|
||||
nodes_by_load_dst.reserve(nodes.size());
|
||||
|
||||
auto nodes_cmp = [&] (const host_id& a, const host_id& b) {
|
||||
return nodes[a].avg_load < nodes[b].avg_load;
|
||||
};
|
||||
auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
|
||||
return nodes_cmp(b, a);
|
||||
};
|
||||
|
||||
for (auto&& [host, node_load] : nodes) {
|
||||
if (lblogger.is_enabled(seastar::log_level::debug)) {
|
||||
@@ -439,23 +509,27 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
nodes_by_load.push_back(host);
|
||||
std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(), node_load.shards_by_load_cmp());
|
||||
if (host != target && (nodes_to_drain.empty() || nodes_to_drain.contains(host))) {
|
||||
nodes_by_load.push_back(host);
|
||||
std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(),
|
||||
node_load.shards_by_load_cmp());
|
||||
} else {
|
||||
nodes_by_load_dst.push_back(host);
|
||||
}
|
||||
}
|
||||
|
||||
std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
|
||||
std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
|
||||
locator::load_sketch target_load(_tm);
|
||||
co_await target_load.populate(target);
|
||||
migration_plan plan;
|
||||
const tablet_metadata& tmeta = _tm->tablets();
|
||||
load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
|
||||
auto& target_info = nodes[target];
|
||||
const size_t max_skipped_migrations = target_info.shards.size() * 2;
|
||||
const size_t max_skipped_migrations = nodes[target].shards.size() * 2;
|
||||
size_t skipped_migrations = 0;
|
||||
while (plan.size() < batch_size) {
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
// Pick a source tablet.
|
||||
|
||||
if (nodes_by_load.empty()) {
|
||||
lblogger.debug("No more candidate nodes");
|
||||
_stats.for_dc(dc).stop_no_candidates++;
|
||||
@@ -466,7 +540,110 @@ public:
|
||||
auto src_host = nodes_by_load.back();
|
||||
auto& src_node_info = nodes[src_host];
|
||||
|
||||
if (!shuffle) {
|
||||
if (src_node_info.shards_by_load.empty()) {
|
||||
lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.",
|
||||
src_host, src_node_info.tablet_count);
|
||||
max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
|
||||
nodes_by_load.pop_back();
|
||||
continue;
|
||||
}
|
||||
auto push_back_node_candidate = seastar::defer([&] {
|
||||
std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
|
||||
});
|
||||
|
||||
std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
|
||||
auto src_shard = src_node_info.shards_by_load.back();
|
||||
auto src = tablet_replica{src_host, src_shard};
|
||||
auto&& src_shard_info = src_node_info.shards[src_shard];
|
||||
if (src_shard_info.candidates.empty()) {
|
||||
lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
|
||||
src_node_info.shards_by_load.pop_back();
|
||||
continue;
|
||||
}
|
||||
auto push_back_shard_candidate = seastar::defer([&] {
|
||||
std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
|
||||
});
|
||||
|
||||
auto source_tablet = *src_shard_info.candidates.begin();
|
||||
src_shard_info.candidates.erase(source_tablet);
|
||||
auto& tmap = tmeta.get_tablet_map(source_tablet.table);
|
||||
|
||||
// Pick a target node.
|
||||
|
||||
if (nodes_by_load_dst.empty()) {
|
||||
lblogger.debug("No more target nodes");
|
||||
_stats.for_dc(dc).stop_no_candidates++;
|
||||
break;
|
||||
}
|
||||
|
||||
// The post-condition of this block is that nodes_by_load_dst.back() is a viable target node
|
||||
// for the source tablet.
|
||||
if (nodes_to_drain.empty()) {
|
||||
std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
} else {
|
||||
std::unordered_set<host_id> replicas;
|
||||
std::unordered_map<sstring, int> rack_load;
|
||||
int max_rack_load = 0;
|
||||
for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
|
||||
replicas.insert(r.host);
|
||||
if (nodes.contains(r.host)) {
|
||||
const locator::node& node = topo.get_node(r.host);
|
||||
rack_load[node.dc_rack().rack] += 1;
|
||||
max_rack_load = std::max(max_rack_load, rack_load[node.dc_rack().rack]);
|
||||
}
|
||||
}
|
||||
|
||||
auto end = nodes_by_load_dst.end();
|
||||
while (true) {
|
||||
if (nodes_by_load_dst.begin() == end) {
|
||||
throw std::runtime_error(format("Unable to find new replica for tablet {} on {} when draining {}",
|
||||
source_tablet, src, nodes_to_drain));
|
||||
}
|
||||
|
||||
pop_heap(nodes_by_load_dst.begin(), end, nodes_dst_cmp);
|
||||
--end;
|
||||
auto new_target = *end;
|
||||
|
||||
if (replicas.contains(new_target)) {
|
||||
lblogger.debug("next best target {} (avg_load={}) skipped because it is already a replica for {}",
|
||||
new_target, nodes[new_target].avg_load, source_tablet);
|
||||
continue;
|
||||
}
|
||||
|
||||
const locator::node& target_node = topo.get_node(new_target);
|
||||
const locator::node& source_node = topo.get_node(src_host);
|
||||
if (target_node.dc_rack().rack != source_node.dc_rack().rack
|
||||
&& (rack_load[target_node.dc_rack().rack] + 1 > max_rack_load)) {
|
||||
lblogger.debug("next best target {} (avg_load={}) skipped because it would overload rack {} "
|
||||
"with {} replicas of {}, current max is {}",
|
||||
new_target, nodes[new_target].avg_load, target_node.dc_rack().rack,
|
||||
rack_load[target_node.dc_rack().rack] + 1, source_tablet, max_rack_load);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Found a viable target, restore the heap
|
||||
std::swap(*end, nodes_by_load_dst.back());
|
||||
while (end != std::prev(nodes_by_load_dst.end())) {
|
||||
++end;
|
||||
push_heap(nodes_by_load_dst.begin(), end, nodes_dst_cmp);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
target = nodes_by_load_dst.back();
|
||||
auto& target_info = nodes[target];
|
||||
const locator::node& target_node = topo.get_node(target);
|
||||
auto push_back_target_node = seastar::defer([&] {
|
||||
std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
|
||||
});
|
||||
|
||||
lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load);
|
||||
|
||||
// Check convergence conditions.
|
||||
|
||||
// When draining nodes, disable convergence checks so that all tablets are migrated away.
|
||||
if (!shuffle && nodes_to_drain.empty()) {
|
||||
// Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
|
||||
// and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
|
||||
// that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
|
||||
@@ -502,48 +679,24 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
if (src_node_info.shards_by_load.empty()) {
|
||||
lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.",
|
||||
src_host, src_node_info.tablet_count);
|
||||
max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
|
||||
nodes_by_load.pop_back();
|
||||
continue;
|
||||
}
|
||||
auto push_back_node_candidate = seastar::defer([&] {
|
||||
std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
|
||||
});
|
||||
|
||||
std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
|
||||
auto src_shard = src_node_info.shards_by_load.back();
|
||||
auto src = tablet_replica{src_host, src_shard};
|
||||
auto&& src_shard_info = src_node_info.shards[src_shard];
|
||||
if (src_shard_info.candidates.empty()) {
|
||||
lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
|
||||
src_node_info.shards_by_load.pop_back();
|
||||
continue;
|
||||
}
|
||||
auto push_back_shard_candidate = seastar::defer([&] {
|
||||
std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
|
||||
});
|
||||
|
||||
auto source_tablet = *src_shard_info.candidates.begin();
|
||||
src_shard_info.candidates.erase(source_tablet);
|
||||
|
||||
// Check replication strategy constraints.
|
||||
|
||||
auto same_rack = target_node->dc_rack().rack == topo.get_node(src.host).dc_rack().rack;
|
||||
std::unordered_map<sstring, int> rack_load; // Will be built if !same_rack
|
||||
bool check_rack_load = false;
|
||||
bool has_replica_on_target = false;
|
||||
auto& tmap = tmeta.get_tablet_map(source_tablet.table);
|
||||
for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
|
||||
if (r.host == target) {
|
||||
has_replica_on_target = true;
|
||||
break;
|
||||
}
|
||||
if (!same_rack) {
|
||||
const locator::node& node = topo.get_node(r.host);
|
||||
if (node.dc_rack().dc == dc) {
|
||||
rack_load[node.dc_rack().rack] += 1;
|
||||
std::unordered_map<sstring, int> rack_load; // Will be built if check_rack_load
|
||||
|
||||
if (nodes_to_drain.empty()) {
|
||||
check_rack_load = target_node.dc_rack().rack != topo.get_node(src.host).dc_rack().rack;
|
||||
for (auto&& r: tmap.get_tablet_info(source_tablet.tablet).replicas) {
|
||||
if (r.host == target) {
|
||||
has_replica_on_target = true;
|
||||
break;
|
||||
}
|
||||
if (check_rack_load) {
|
||||
const locator::node& node = topo.get_node(r.host);
|
||||
if (node.dc_rack().dc == dc) {
|
||||
rack_load[node.dc_rack().rack] += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -555,19 +708,20 @@ public:
|
||||
}
|
||||
|
||||
// Make sure we don't increase level of duplication of racks in the replica list.
|
||||
if (!same_rack) {
|
||||
if (check_rack_load) {
|
||||
auto max_rack_load = std::max_element(rack_load.begin(), rack_load.end(),
|
||||
[] (auto& a, auto& b) { return a.second < b.second; })->second;
|
||||
auto new_rack_load = rack_load[target_node->dc_rack().rack] + 1;
|
||||
auto new_rack_load = rack_load[target_node.dc_rack().rack] + 1;
|
||||
if (new_rack_load > max_rack_load) {
|
||||
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
|
||||
source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load);
|
||||
source_tablet, target_node.dc_rack().rack, new_rack_load, max_rack_load);
|
||||
_stats.for_dc(dc).tablets_skipped_rack++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
auto dst = global_shard_id {target, target_load.next_shard(target)};
|
||||
auto& target_load_sketch = co_await target_info.get_load_sketch(_tm);
|
||||
auto dst = global_shard_id {target, target_load_sketch.next_shard(target)};
|
||||
auto mig = tablet_migration_info {source_tablet, src, dst};
|
||||
|
||||
if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load
|
||||
@@ -576,7 +730,7 @@ public:
|
||||
src_node_info.shards[src_shard].streaming_read_load += 1;
|
||||
lblogger.debug("Adding migration: {}", mig);
|
||||
_stats.for_dc(dc).migrations_produced++;
|
||||
plan.push_back(std::move(mig));
|
||||
plan.add(std::move(mig));
|
||||
} else {
|
||||
// Shards are overloaded with streaming. Do not include the migration in the plan, but
|
||||
// continue as if it was in the hope that we will find a migration which can be executed without
|
||||
|
||||
@@ -22,7 +22,33 @@ struct tablet_migration_info {
|
||||
locator::tablet_replica dst;
|
||||
};
|
||||
|
||||
using migration_plan = utils::chunked_vector<tablet_migration_info>;
|
||||
class migration_plan {
|
||||
public:
|
||||
using migrations_vector = utils::chunked_vector<tablet_migration_info>;
|
||||
private:
|
||||
migrations_vector _migrations;
|
||||
bool _has_nodes_to_drain = false;
|
||||
public:
|
||||
/// Returns true iff there are decommissioning nodes which own some tablet replicas.
|
||||
bool has_nodes_to_drain() const { return _has_nodes_to_drain; }
|
||||
|
||||
const migrations_vector& migrations() const { return _migrations; }
|
||||
bool empty() const { return _migrations.empty(); }
|
||||
size_t size() const { return _migrations.size(); }
|
||||
|
||||
void add(tablet_migration_info info) {
|
||||
_migrations.emplace_back(std::move(info));
|
||||
}
|
||||
|
||||
void merge(migration_plan&& other) {
|
||||
std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations));
|
||||
_has_nodes_to_drain |= other._has_nodes_to_drain;
|
||||
}
|
||||
|
||||
void set_has_nodes_to_drain(bool b) {
|
||||
_has_nodes_to_drain = b;
|
||||
}
|
||||
};
|
||||
|
||||
class tablet_allocator_impl;
|
||||
|
||||
|
||||
@@ -79,6 +79,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
|
||||
{topology::transition_state::write_both_read_old, "write both read old"},
|
||||
{topology::transition_state::write_both_read_new, "write both read new"},
|
||||
{topology::transition_state::tablet_migration, "tablet migration"},
|
||||
{topology::transition_state::tablet_draining, "tablet draining"},
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {
|
||||
|
||||
@@ -97,6 +97,7 @@ struct topology_features {
|
||||
struct topology {
|
||||
enum class transition_state: uint16_t {
|
||||
commit_cdc_generation,
|
||||
tablet_draining,
|
||||
write_both_read_old,
|
||||
write_both_read_new,
|
||||
tablet_migration,
|
||||
|
||||
@@ -592,7 +592,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
|
||||
// Reflects the plan in a given token metadata as if the migrations were fully executed.
|
||||
static
|
||||
void apply_plan(token_metadata& tm, const migration_plan& plan) {
|
||||
for (auto&& mig : plan) {
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
|
||||
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
|
||||
tinfo.replicas = replace_replica(tinfo.replicas, mig.src, mig.dst);
|
||||
@@ -612,7 +612,7 @@ tablet_transition_info migration_to_transition_info(const tablet_migration_info&
|
||||
// Reflects the plan in a given token metadata as if the migrations were started but not yet executed.
|
||||
static
|
||||
void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
|
||||
for (auto&& mig : plan) {
|
||||
for (auto&& mig : plan.migrations()) {
|
||||
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
|
||||
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
|
||||
tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(mig, tinfo));
|
||||
@@ -761,6 +761,335 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
|
||||
// Verifies that load balancer moves tablets out of the decommissioned node.
|
||||
// The scenario is such that replication factor of tablets can be satisfied after decommission.
|
||||
do_with_cql_env_thread([](auto& e) {
|
||||
inet_address ip1("192.168.0.1");
|
||||
inet_address ip2("192.168.0.2");
|
||||
inet_address ip3("192.168.0.3");
|
||||
|
||||
auto host1 = host_id(next_uuid());
|
||||
auto host2 = host_id(next_uuid());
|
||||
auto host3 = host_id(next_uuid());
|
||||
|
||||
auto table1 = table_id(next_uuid());
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
|
||||
locator::topology::config {
|
||||
.this_endpoint = ip1,
|
||||
.local_dc_rack = locator::endpoint_dc_rack::default_location
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](auto& tm) {
|
||||
const unsigned shard_count = 2;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
|
||||
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
|
||||
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
|
||||
shard_count);
|
||||
|
||||
tablet_map tmap(4);
|
||||
auto tid = tmap.first_tablet();
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host2, 1},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host2, 1},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host3, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host2, 1},
|
||||
tablet_replica {host3, 1},
|
||||
}
|
||||
});
|
||||
tablet_metadata tmeta;
|
||||
tmeta.set_tablet_map(table1, std::move(tmap));
|
||||
tm.set_tablets(std::move(tmeta));
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
rebalance_tablets(e.get_tablet_allocator().local(), stm);
|
||||
|
||||
{
|
||||
load_sketch load(stm.get());
|
||||
load.populate().get();
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host3) == 0);
|
||||
}
|
||||
|
||||
stm.mutate_token_metadata([&](auto& tm) {
|
||||
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::left);
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
rebalance_tablets(e.get_tablet_allocator().local(), stm);
|
||||
|
||||
{
|
||||
load_sketch load(stm.get());
|
||||
load.populate().get();
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host3) == 0);
|
||||
}
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
|
||||
// Verifies that load balancer moves tablets out of the decommissioned node.
|
||||
// The scenario is such that replication constraints of tablets can be satisfied after decommission.
|
||||
do_with_cql_env_thread([](auto& e) {
|
||||
inet_address ip1("192.168.0.1");
|
||||
inet_address ip2("192.168.0.2");
|
||||
inet_address ip3("192.168.0.3");
|
||||
inet_address ip4("192.168.0.4");
|
||||
|
||||
auto host1 = host_id(next_uuid());
|
||||
auto host2 = host_id(next_uuid());
|
||||
auto host3 = host_id(next_uuid());
|
||||
auto host4 = host_id(next_uuid());
|
||||
|
||||
std::vector<endpoint_dc_rack> racks = {
|
||||
endpoint_dc_rack{ "dc1", "rack-1" },
|
||||
endpoint_dc_rack{ "dc1", "rack-2" }
|
||||
};
|
||||
|
||||
auto table1 = table_id(next_uuid());
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
|
||||
locator::topology::config {
|
||||
.this_endpoint = ip1,
|
||||
.local_dc_rack = racks[0]
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](auto& tm) {
|
||||
const unsigned shard_count = 1;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
tm.update_host_id(host4, ip4);
|
||||
tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
|
||||
tm.update_topology(ip2, racks[1], std::nullopt, shard_count);
|
||||
tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
|
||||
tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
|
||||
shard_count);
|
||||
|
||||
tablet_map tmap(4);
|
||||
auto tid = tmap.first_tablet();
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host2, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host2, 0},
|
||||
tablet_replica {host3, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host3, 0},
|
||||
tablet_replica {host4, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host2, 0},
|
||||
}
|
||||
});
|
||||
tablet_metadata tmeta;
|
||||
tmeta.set_tablet_map(table1, std::move(tmap));
|
||||
tm.set_tablets(std::move(tmeta));
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
rebalance_tablets(e.get_tablet_allocator().local(), stm);
|
||||
|
||||
{
|
||||
load_sketch load(stm.get());
|
||||
load.populate().get();
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host1) >= 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host2) >= 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host3) >= 2);
|
||||
BOOST_REQUIRE(load.get_avg_shard_load(host4) == 0);
|
||||
}
|
||||
|
||||
// Verify replicas are not collocated on racks
|
||||
{
|
||||
auto tm = stm.get();
|
||||
auto& tmap = tm->tablets().get_tablet_map(table1);
|
||||
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
|
||||
auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host);
|
||||
auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host);
|
||||
BOOST_REQUIRE(rack1 != rack2);
|
||||
}).get();
|
||||
}
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
|
||||
// Verifies that load balancer moves tablets out of the decommissioned node.
|
||||
// The scenario is such that it is impossible to distribute replicas without violating rack uniqueness.
|
||||
do_with_cql_env_thread([](auto& e) {
|
||||
inet_address ip1("192.168.0.1");
|
||||
inet_address ip2("192.168.0.2");
|
||||
inet_address ip3("192.168.0.3");
|
||||
inet_address ip4("192.168.0.4");
|
||||
|
||||
auto host1 = host_id(next_uuid());
|
||||
auto host2 = host_id(next_uuid());
|
||||
auto host3 = host_id(next_uuid());
|
||||
auto host4 = host_id(next_uuid());
|
||||
|
||||
std::vector<endpoint_dc_rack> racks = {
|
||||
endpoint_dc_rack{ "dc1", "rack-1" },
|
||||
endpoint_dc_rack{ "dc1", "rack-2" }
|
||||
};
|
||||
|
||||
auto table1 = table_id(next_uuid());
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
|
||||
locator::topology::config {
|
||||
.this_endpoint = ip1,
|
||||
.local_dc_rack = racks[0]
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](auto& tm) {
|
||||
const unsigned shard_count = 1;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
tm.update_host_id(host4, ip4);
|
||||
tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
|
||||
tm.update_topology(ip2, racks[0], std::nullopt, shard_count);
|
||||
tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
|
||||
tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
|
||||
shard_count);
|
||||
|
||||
tablet_map tmap(4);
|
||||
auto tid = tmap.first_tablet();
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host4, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host2, 0},
|
||||
tablet_replica {host4, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host3, 0},
|
||||
tablet_replica {host4, 0},
|
||||
}
|
||||
});
|
||||
tid = *tmap.next_tablet(tid);
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host4, 0},
|
||||
}
|
||||
});
|
||||
tablet_metadata tmeta;
|
||||
tmeta.set_tablet_map(table1, std::move(tmap));
|
||||
tm.set_tablets(std::move(tmeta));
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error);
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
|
||||
// Verifies that load balancer moves tablets out of the decommissioned node.
|
||||
// The scenario is such that replication factor of tablets can be satisfied after decommission.
|
||||
do_with_cql_env_thread([](auto& e) {
|
||||
inet_address ip1("192.168.0.1");
|
||||
inet_address ip2("192.168.0.2");
|
||||
inet_address ip3("192.168.0.3");
|
||||
|
||||
auto host1 = host_id(next_uuid());
|
||||
auto host2 = host_id(next_uuid());
|
||||
auto host3 = host_id(next_uuid());
|
||||
|
||||
auto table1 = table_id(next_uuid());
|
||||
|
||||
semaphore sem(1);
|
||||
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
|
||||
locator::topology::config {
|
||||
.this_endpoint = ip1,
|
||||
.local_dc_rack = locator::endpoint_dc_rack::default_location
|
||||
}
|
||||
});
|
||||
|
||||
stm.mutate_token_metadata([&](auto& tm) {
|
||||
const unsigned shard_count = 2;
|
||||
|
||||
tm.update_host_id(host1, ip1);
|
||||
tm.update_host_id(host2, ip2);
|
||||
tm.update_host_id(host3, ip3);
|
||||
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
|
||||
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
|
||||
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
|
||||
shard_count);
|
||||
|
||||
tablet_map tmap(1);
|
||||
auto tid = tmap.first_tablet();
|
||||
tmap.set_tablet(tid, tablet_info {
|
||||
tablet_replica_set {
|
||||
tablet_replica {host1, 0},
|
||||
tablet_replica {host2, 0},
|
||||
tablet_replica {host3, 0},
|
||||
}
|
||||
});
|
||||
tablet_metadata tmeta;
|
||||
tmeta.set_tablet_map(table1, std::move(tmap));
|
||||
tm.set_tablets(std::move(tmeta));
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error);
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) {
|
||||
do_with_cql_env_thread([] (auto& e) {
|
||||
// Tests the scenario of bootstrapping a single node.
|
||||
|
||||
@@ -143,7 +143,7 @@ async def test_table_drop_with_auto_snapshot(manager: ManagerClient):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bootstrap(manager: ManagerClient):
|
||||
async def test_topology_changes(manager: ManagerClient):
|
||||
logger.info("Bootstrapping cluster")
|
||||
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]
|
||||
|
||||
@@ -178,4 +178,8 @@ async def test_bootstrap(manager: ManagerClient):
|
||||
time.sleep(5) # Give load balancer some time to do work
|
||||
await check()
|
||||
|
||||
await manager.decommission_node(servers[0].server_id)
|
||||
|
||||
await check()
|
||||
|
||||
await cql.run_async("DROP KEYSPACE test;")
|
||||
|
||||
Reference in New Issue
Block a user