From e338679266dc2a9199086095d3c35231799fe380 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 25 Jul 2023 00:00:20 +0200 Subject: [PATCH 01/10] tablets: Add formatter for tablet_migration_info --- service/tablet_allocator.cc | 5 +++++ service/tablet_allocator.hh | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 3b7359b574..275399c2ef 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -502,3 +502,8 @@ tablet_allocator_impl& tablet_allocator::impl() { } } + +auto fmt::formatter::format(const service::tablet_migration_info& mig, fmt::format_context& ctx) const + -> decltype(ctx.out()) { + return fmt::format_to(ctx.out(), "{{tablet: {}, src: {}, dst: {}}}", mig.tablet, mig.src, mig.dst); +} diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index a6546de921..90d0a08a53 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -61,3 +61,8 @@ public: }; } + +template <> +struct fmt::formatter : fmt::formatter { + auto format(const service::tablet_migration_info&, fmt::format_context& ctx) const -> decltype(ctx.out()); +}; From 2811b1df0a8f4b2e6868ccac28fd90147c2165da Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sun, 16 Jul 2023 23:26:02 +0200 Subject: [PATCH 02/10] topology_coordinator: Fix missed notification on abort If _as is aborted while the coordinator is in the middle of handling, and decides to go to sleep, it may go to sleep without noticing that it was aborted. Fix by checking before blocking on the condition variable. In general, every condition which can cause signal() should be checked before when(). This patch doesn't fix all the cases. For example, signal() can be called when there arrives a new topology request. This can happen after the coordinator checked because it releases the guard before calling when(). --- service/storage_service.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 83066bc5ab..d2a86544f0 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1744,6 +1744,11 @@ class topology_coordinator { // Returns true if the state machine was transitioned into tablet migration path. future maybe_start_tablet_migration(group0_guard); + + future<> await_event() { + _as.check(); + co_await _topo_sm.event.when(); + } public: topology_coordinator( sharded& sys_dist_ks, @@ -1817,7 +1822,7 @@ future<> topology_coordinator::run() { if (!had_work) { // Nothing to work on. Wait for topology change event. slogger.trace("raft topology: topology coordinator fiber has nothing to do. Sleeping."); - co_await _topo_sm.event.when(); + co_await await_event(); slogger.trace("raft topology: topology coordinator fiber got an event"); } } catch (raft::request_aborted&) { From 889f2ceb1ebeced7539de8ec73249772134dc636 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sun, 16 Jul 2023 23:23:57 +0200 Subject: [PATCH 03/10] storage_service: Advance tablets independently This change makes the topology state machine advance each tablet independently which allows them to finish migrations at different speeds, not at the speed of the slowest tablet. It will also open the possibility of starting new transitions concurrently with already active ones. This is implemented by having a single transition state "tablet migration", and handling it by scanning all the transitions and advancing tablet state machines. Updates and barriers are batched for all tablets in each cycle. One complication is the tracking of streaming sessions. The operations are no longer nested in the scope of a single handle method, and cannot be waited on explicitly, as that would inhibit progress of the coordinator, which starts later migrations. They live as independent fibers, which associated with tablets in a transient data structure which lives within the coordinator instance. This data structure is consulted for a given tablet in each cycle of the handle_tablet_migration() pump to check if streaming has finished and we can move the tablet to the next stage. If the pump has no work, only then it waits for any streaming to finish by blocking on the _topo_sm.event. --- service/storage_service.cc | 285 +++++++++++++++++------------- service/topology_state_machine.cc | 7 +- service/topology_state_machine.hh | 9 +- 3 files changed, 161 insertions(+), 140 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index d2a86544f0..7257cd1bf9 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -388,17 +388,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s return read_new_t::no; } switch (*state) { - case topology::transition_state::tablet_allow_write_both_read_old: - [[fallthrough]]; - case topology::transition_state::tablet_write_both_read_new: - [[fallthrough]]; - case topology::transition_state::tablet_write_both_read_old: - [[fallthrough]]; - case topology::transition_state::tablet_streaming: - [[fallthrough]]; - case topology::transition_state::tablet_use_new: - [[fallthrough]]; - case topology::transition_state::tablet_cleanup: + case topology::transition_state::tablet_migration: [[fallthrough]]; case topology::transition_state::commit_cdc_generation: [[fallthrough]]; @@ -1181,116 +1171,170 @@ class topology_coordinator { co_return std::move(guard); } - future<> set_tablet_transition_stage(std::vector& out, group0_guard& guard, - locator::tablet_transition_stage stage) { + // Represents a two-state state machine which changes monotonically + // from "not executed" to "executed successfully". This state + // machine is transient, lives only on this coordinator. + // The transition is achieved by execution of an idempotent async + // operation which is tracked by a future. Even though the async + // action is idempotent, it is costly, so we want to avoid + // re-executing it if it was already started by this coordinator, + // that's why we track it. + using background_action_holder = std::optional>; + + // Transient state of tablet migration which lives on this coordinator. + // It is guaranteed to die when migration is finished. + // Next migration of the same tablet is guaranteed to use a different instance. + struct tablet_migration_state { + background_action_holder streaming; + }; + + std::unordered_map _tablets; + + // Set to true when any action started on behalf of a background_action_holder + // for any tablet finishes, or fails and needs to be restarted. + bool _tablets_ready = false; + + seastar::gate _async_gate; + + // This function drives background_action_holder towards "executed successfully" + // by starting the action if it is not already running or if the previous instance + // of the action failed. If the action is already running, it does nothing. + // Returns true iff background_action_holder reached the "executed successfully" state. + bool advance_in_background(locator::global_tablet_id gid, background_action_holder& holder, const char* name, + std::function()> action) { + if (!holder || holder->failed()) { + holder = futurize_invoke(action) + .finally([this, g = _async_gate.hold(), gid, name] () noexcept { + slogger.trace("raft topology: {} for tablet {} resolved.", name, gid); + _tablets_ready = true; + _topo_sm.event.broadcast(); + }); + return false; + } + + if (!holder->available()) { + slogger.trace("raft topology: Tablet {} still doing {}", gid, name); + return false; + } + + return true; + } + + future<> for_each_tablet_transition(std::function func) { auto tm = get_token_metadata_ptr(); for (auto&& [table, tmap] : tm->tablets().all_tables()) { co_await coroutine::maybe_yield(); auto s = _db.find_schema(table); - for (auto&& [tablet, trinfo] : tmap.transitions()) { + for (auto&& [tablet, trinfo]: tmap.transitions()) { co_await coroutine::maybe_yield(); - auto last_token = tmap.get_last_token(tablet); - out.emplace_back( + auto gid = locator::global_tablet_id {table, tablet}; + func(tmap, s, gid, trinfo); + } + } + } + + future<> handle_tablet_migration(group0_guard guard) { + // This step acts like a pump which advances state machines of individual tablets, + // batching barriers and group0 updates. + // If progress cannot be made, e.g. because all transitions are streaming, we block + // and wait for notification. + + slogger.trace("raft topology: handle_tablet_migration()"); + std::vector updates; + bool needs_barrier = false; + bool has_transitions = false; + + _tablets_ready = false; + co_await for_each_tablet_transition([&] (const locator::tablet_map& tmap, + schema_ptr s, + locator::global_tablet_id gid, + const locator::tablet_transition_info& trinfo) { + has_transitions = true; + auto last_token = tmap.get_last_token(gid.tablet); + auto& tablet_state = _tablets[gid]; + table_id table = s->id(); + + auto transition_to = [&] (locator::tablet_transition_stage stage) { + slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage); + updates.emplace_back( replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) .set_stage(last_token, stage) .build()); + }; + + auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) { + needs_barrier = true; + transition_to(stage); + }; + + switch (trinfo.stage) { + case locator::tablet_transition_stage::allow_write_both_read_old: + transition_to_with_barrier(locator::tablet_transition_stage::write_both_read_old); + break; + case locator::tablet_transition_stage::write_both_read_old: + transition_to_with_barrier(locator::tablet_transition_stage::streaming); + break; + // The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't + // get admitted before global_tablet_token_metadata_barrier() is finished for earlier + // stage in case of coordinator failover. + case locator::tablet_transition_stage::streaming: + if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] { + slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); + auto dst = trinfo.pending_replica.host; + return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, + netw::msg_addr(id2ip(dst)), _as, gid); + })) { + transition_to(locator::tablet_transition_stage::write_both_read_new); + } + break; + case locator::tablet_transition_stage::write_both_read_new: + transition_to_with_barrier(locator::tablet_transition_stage::use_new); + break; + case locator::tablet_transition_stage::use_new: + transition_to_with_barrier(locator::tablet_transition_stage::cleanup); + break; + case locator::tablet_transition_stage::cleanup: + // FIXME: Actually perform the cleanup. Block on integration with compaction groups. + _tablets.erase(gid); + updates.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) + .del_transition(last_token) + .set_replicas(last_token, trinfo.next) + .build()); + break; } - } - } + }); - future<> del_tablet_transitions(std::vector& out, group0_guard& guard) { - auto tm = get_token_metadata_ptr(); - for (auto&& [table, tmap] : tm->tablets().all_tables()) { - co_await coroutine::maybe_yield(); - auto s = _db.find_schema(table); - for (auto&& [tablet, trinfo] : tmap.transitions()) { - co_await coroutine::maybe_yield(); - auto last_token = tmap.get_last_token(tablet); - out.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table) - .del_transition(last_token) - .set_replicas(last_token, trinfo.next) - .build()); - } - } - } - - future<> transition_tablets_to_stage(group0_guard guard, locator::tablet_transition_stage stage, topology::transition_state topo_state) { - std::vector updates; - co_await set_tablet_transition_stage(updates, guard, stage); - updates.emplace_back( - topology_mutation_builder(guard.write_timestamp()) - .set_transition_state(topo_state) - .set_version(_topo_sm._topology.version + 1) - .build()); - co_await update_topology_state(std::move(guard), std::move(updates), format("Moved tablet migration to stage: {}", stage)); - } - - future<> handle_tablet_allow_write_both_read_old(group0_guard guard) { - guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); - co_await transition_tablets_to_stage(std::move(guard), - locator::tablet_transition_stage::write_both_read_old, - topology::transition_state::tablet_write_both_read_old); - } - - future<> handle_tablet_write_both_read_old(group0_guard guard) { - guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); - co_await transition_tablets_to_stage(std::move(guard), - locator::tablet_transition_stage::streaming, - topology::transition_state::tablet_streaming); - } - - // The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't - // get admitted before global_tablet_token_metadata_barrier() is finished in - // handle_tablet_write_both_read_old(). - future<> handle_tablet_streaming(group0_guard guard) { - std::vector> calls; - auto tm = get_token_metadata_ptr(); - for (auto&& [table, tmap] : tm->tablets().all_tables()) { - co_await coroutine::maybe_yield(); - for (auto&& [tablet, trinfo] : tmap.transitions()) { - co_await coroutine::maybe_yield(); - auto gid = locator::global_tablet_id{table, tablet}; - auto dst = trinfo.pending_replica.host; - slogger.info("Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica); - calls.emplace_back(futurize_invoke([&] { - return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging, - netw::msg_addr(id2ip(dst)), _as, gid); - }).handle_exception([gid, dst](auto ep) { - slogger.error("Failed to stream tablet {} to {}: {}", gid, dst, ep); - std::rethrow_exception(ep); - })); - } + if (needs_barrier) { + guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); } - release_guard(std::move(guard)); - co_await seastar::when_all_succeed(calls.begin(), calls.end()); + // It's ok to execute planned updates after retaking the guard because as long + // as topology is in tablet_migration state only this coordinator has a right + // to advance the state machine of tablets. - guard = co_await start_operation(); - co_await transition_tablets_to_stage(std::move(guard), - locator::tablet_transition_stage::write_both_read_new, - topology::transition_state::tablet_write_both_read_new); - } + if (!updates.empty()) { + updates.emplace_back( + topology_mutation_builder(guard.write_timestamp()) + .set_version(_topo_sm._topology.version + 1) + .build()); + co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration")); + co_return; + } - future<> handle_tablet_write_both_read_new(group0_guard guard) { - guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); - co_await transition_tablets_to_stage(std::move(guard), - locator::tablet_transition_stage::use_new, - topology::transition_state::tablet_use_new); - } + if (has_transitions) { + // Streaming may have finished after we checked. To avoid missed notification, we need + // to check atomically with event.wait() + if (!_tablets_ready) { + slogger.trace("raft topology: Going to sleep with active tablet transitions"); + co_await await_event(); + } + co_return; + } - future<> handle_tablet_use_new(group0_guard guard) { - guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); - co_await transition_tablets_to_stage(std::move(guard), - locator::tablet_transition_stage::cleanup, - topology::transition_state::tablet_cleanup); - } - - future<> handle_tablet_cleanup(group0_guard guard) { - // FIXME: Actually perform cleanup - - std::vector updates; - co_await del_tablet_transitions(updates, guard); updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) .del_transition_state() @@ -1531,24 +1575,8 @@ class topology_coordinator { // Reads are fenced. We can now remove topology::transition_state and move node state to normal } break; - case topology::transition_state::tablet_allow_write_both_read_old: - co_await handle_tablet_allow_write_both_read_old(std::move(guard)); - break; - case topology::transition_state::tablet_write_both_read_old: - co_await handle_tablet_write_both_read_old(std::move(guard)); - break; - case topology::transition_state::tablet_streaming: - co_await handle_tablet_streaming(std::move(guard)); - break; - case topology::transition_state::tablet_write_both_read_new: - co_await handle_tablet_write_both_read_new(std::move(guard)); - break; - case topology::transition_state::tablet_use_new: - co_await handle_tablet_use_new(std::move(guard)); - break; - case topology::transition_state::tablet_cleanup: - co_await handle_tablet_cleanup(std::move(guard)); - break; + case topology::transition_state::tablet_migration: + co_await handle_tablet_migration(std::move(guard)); } co_return true; }; @@ -1797,7 +1825,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) - .set_transition_state(topology::transition_state::tablet_allow_write_both_read_old) + .set_transition_state(topology::transition_state::tablet_migration) .set_version(_topo_sm._topology.version + 1) .build()); @@ -1838,10 +1866,15 @@ future<> topology_coordinator::run() { sleep = true; } if (sleep) { - co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); + try { + co_await seastar::sleep_abortable(std::chrono::seconds(1), _as); + } catch (...) { + slogger.debug("raft topology: sleep failed: {}", std::current_exception()); + } } co_await coroutine::maybe_yield(); } + co_await _async_gate.close(); } future<> storage_service::raft_state_monitor_fiber(raft::server& raft, cdc::generation_service& cdc_gen_svc, sharded& sys_dist_ks) { diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index edd48e5410..a0ebc78b80 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -45,12 +45,7 @@ static std::unordered_map transition_state_ {topology::transition_state::publish_cdc_generation, "publish cdc generation"}, {topology::transition_state::write_both_read_old, "write both read old"}, {topology::transition_state::write_both_read_new, "write both read new"}, - {topology::transition_state::tablet_allow_write_both_read_old, "tablet allow write both read old"}, - {topology::transition_state::tablet_write_both_read_old, "tablet write both read old"}, - {topology::transition_state::tablet_write_both_read_new, "tablet write both read new"}, - {topology::transition_state::tablet_streaming, "tablet streaming"}, - {topology::transition_state::tablet_use_new, "tablet use new"}, - {topology::transition_state::tablet_cleanup, "tablet cleanup"}, + {topology::transition_state::tablet_migration, "tablet migration"}, }; std::ostream& operator<<(std::ostream& os, topology::transition_state s) { diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index a04bf14750..85e8637d69 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -73,14 +73,7 @@ struct topology { publish_cdc_generation, write_both_read_old, write_both_read_new, - - // Tablet migration steps - tablet_allow_write_both_read_old, - tablet_write_both_read_old, - tablet_write_both_read_new, - tablet_streaming, - tablet_use_new, - tablet_cleanup, + tablet_migration, }; std::optional tstate; From 18a59ab5ff51c4990b568dcd496367ac2b4b1f8e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 24 Jul 2023 23:42:28 +0200 Subject: [PATCH 04/10] locator: tablets: Move std::hash definition earlier Will be needed in order to define a struct which has unordered_set as a field. --- locator/tablets.hh | 70 ++++++++++++++++++++++------------------------ 1 file changed, 33 insertions(+), 37 deletions(-) diff --git a/locator/tablets.hh b/locator/tablets.hh index b40df63b59..9032645e00 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -45,21 +45,6 @@ struct tablet_id { bool operator<=>(const tablet_id&) const = default; }; -} - -namespace std { - -template<> -struct hash { - size_t operator()(const locator::tablet_id& id) const { - return std::hash()(id.value()); - } -}; - -} - -namespace locator { - /// Identifies tablet (not be confused with tablet replica) in the scope of the whole cluster. struct global_tablet_id { table_id table; @@ -80,6 +65,39 @@ std::ostream& operator<<(std::ostream&, const tablet_replica&); using tablet_replica_set = utils::small_vector; +} + +namespace std { + +template<> +struct hash { + size_t operator()(const locator::tablet_id& id) const { + return std::hash()(id.value()); + } +}; + +template<> +struct hash { + size_t operator()(const locator::tablet_replica& r) const { + return utils::hash_combine( + std::hash()(r.host), + std::hash()(r.shard)); + } +}; + +template<> +struct hash { + size_t operator()(const locator::global_tablet_id& id) const { + return utils::hash_combine( + std::hash()(id.table), + std::hash()(id.tablet)); + } +}; + +} + +namespace locator { + /// Creates a new replica set with old_replica replaced by new_replica. /// If there is no old_replica, the set is returned unchanged. inline @@ -309,28 +327,6 @@ public: } -namespace std { - -template<> -struct hash { - size_t operator()(const locator::tablet_replica& r) const { - return utils::hash_combine( - std::hash()(r.host), - std::hash()(r.shard)); - } -}; - -template<> -struct hash { - size_t operator()(const locator::global_tablet_id& id) const { - return utils::hash_combine( - std::hash()(id.table), - std::hash()(id.tablet)); - } -}; - -} - template <> struct fmt::formatter : fmt::formatter { auto format(const locator::tablet_transition_stage&, fmt::format_context& ctx) const -> decltype(ctx.out()); From fbc6076e6a9f68b4133dfc63e6bd74b7d4fe731c Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 24 Jul 2023 23:46:23 +0200 Subject: [PATCH 05/10] storage_service, tablets: Move get_leaving_replica() to tablets.cc For better encapsulation of tablet-specific code. --- locator/tablets.cc | 14 ++++++++++++++ locator/tablets.hh | 3 +++ service/storage_service.cc | 17 +---------------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/locator/tablets.cc b/locator/tablets.cc index 8889f74447..b05e332d3e 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -75,6 +75,20 @@ tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, ta , reads(get_selector_for_reads(stage)) { } +tablet_replica get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) { + std::unordered_set leaving(tinfo.replicas.begin(), tinfo.replicas.end()); + for (auto&& r : trinfo.next) { + leaving.erase(r); + } + if (leaving.empty()) { + throw std::runtime_error(format("No leaving replicas")); + } + if (leaving.size() > 1) { + throw std::runtime_error(format("More than one leaving replica")); + } + return *leaving.begin(); +} + const tablet_map& tablet_metadata::get_tablet_map(table_id id) const { try { return _tablets.at(id); diff --git a/locator/tablets.hh b/locator/tablets.hh index 9032645e00..4948a7be9e 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -168,6 +168,9 @@ struct tablet_transition_info { bool operator==(const tablet_transition_info&) const = default; }; +// Returns the leaving replica for a given transition. +tablet_replica get_leaving_replica(const tablet_info&, const tablet_transition_info&); + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). diff --git a/service/storage_service.cc b/service/storage_service.cc index 7257cd1bf9..f17e7c4d42 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5611,21 +5611,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver }); } -static -locator::tablet_replica get_leaving_replica(const locator::tablet_info& tinfo, const locator::tablet_transition_info& trinfo) { - std::unordered_set leaving(tinfo.replicas.begin(), tinfo.replicas.end()); - for (auto&& r : trinfo.next) { - leaving.erase(r); - } - if (leaving.empty()) { - throw std::runtime_error(format("No leaving replicas")); - } - if (leaving.size() > 1) { - throw std::runtime_error(format("More than one leaving replica")); - } - return *leaving.begin(); -} - inet_address storage_service::host2ip(locator::host_id host) { auto ip = _group0->address_map().find(raft::server_id(host.uuid())); if (!ip) { @@ -5660,7 +5645,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { auto& tinfo = tmap.get_tablet_info(tablet.tablet); auto range = tmap.get_token_range(tablet.tablet); - locator::tablet_replica leaving_replica = get_leaving_replica(tinfo, *trinfo); + locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo); if (leaving_replica.host == tm->get_my_id()) { // The algorithm doesn't work with tablet migration within the same node because // it assumes there is only one tablet replica, picked by the sharder, on local node. From c9ea215ce122634a794b6f7f282f4bf58dfdc451 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 24 Jul 2023 23:52:59 +0200 Subject: [PATCH 06/10] storage_service, tablets: Extract generate_migration_updates() --- service/storage_service.cc | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f17e7c4d42..dda8f212f1 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1236,6 +1236,28 @@ class topology_coordinator { } } + void generate_migration_update(std::vector& out, const group0_guard& guard, const tablet_migration_info& mig) { + auto s = _db.find_schema(mig.tablet.table); + auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table); + auto last_token = tmap.get_last_token(mig.tablet.tablet); + if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { + slogger.warn("Tablet already in transition, ignoring migration: {}", mig); + return; + } + out.emplace_back( + replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table) + .set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst)) + .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) + .build()); + } + + future<> generate_migration_updates(std::vector& out, const group0_guard& guard, const migration_plan& plan) { + for (const tablet_migration_info& mig : plan) { + co_await coroutine::maybe_yield(); + generate_migration_update(out, guard, mig); + } + } + future<> handle_tablet_migration(group0_guard guard) { // This step acts like a pump which advances state machines of individual tablets, // batching barriers and group0 updates. @@ -1807,21 +1829,7 @@ future topology_coordinator::maybe_start_tablet_migration(group0_guard gua std::vector updates; - for (const tablet_migration_info& mig : plan) { - co_await coroutine::maybe_yield(); - auto s = _db.find_schema(mig.tablet.table); - auto& tmap = tm->tablets().get_tablet_map(mig.tablet.table); - auto last_token = tmap.get_last_token(mig.tablet.tablet); - if (tmap.get_tablet_transition_info(mig.tablet.tablet)) { - slogger.warn("Tablet {} is already in transition, ignoring migration", mig.tablet); - continue; - } - updates.emplace_back( - replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table) - .set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst)) - .set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old) - .build()); - } + co_await generate_migration_updates(updates, guard, plan); updates.emplace_back( topology_mutation_builder(guard.write_timestamp()) From fe181b3bac79422fc606d52bc60380efb7c77be5 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 24 Jul 2023 23:55:27 +0200 Subject: [PATCH 07/10] tablets: Balance tablets concurrently with active migrations After this change, the load balancer can make progress with active migrations. If the algorithm is called with active tablet migrations in tablet metadata, those are treated by load balancer as if they were already completed. This allows the algorithm to incrementally make decision which when executed with active migrations will produce the desired result. Overload of shards is limited by the fact that the algorithm tracks streaming concurrency on both source and target shards of active migrations and takes concurrency limit into account when producing new migrations. The coordinator executes the load balancer on edges of tablet state machine stransitions. This allows new migrations to be started as soon as tablets finish streaming. The load balancer is also continuously invoked as long as it produces a non-empty plan. This is in order to saturate the cluster with streaming. A single make_plan() call is still not saturating, due to the way algorithm is implemented. --- docs/dev/topology-over-raft.md | 33 +++---- locator/load_sketch.hh | 17 +++- locator/tablets.cc | 14 +++ locator/tablets.hh | 8 ++ service/storage_service.cc | 25 +++++ service/tablet_allocator.cc | 161 +++++++++++++++++++++++++++------ service/tablet_allocator.hh | 7 ++ 7 files changed, 214 insertions(+), 51 deletions(-) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 1a17f90a3a..39199fd936 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -78,20 +78,20 @@ check if we need to rebalance. If so, it computes an incremental tablet migratio plan, persists it by moving tablets into transitional states, and moves the state machine into the tablet migration track. All this happens atomically form the perspective of group0 state machine. + +The tablet migration track also invokes the load balancer and starts new migrations +to keep the cluster saturated with streaming. The load balancer is invoked +on transition of tablet stages, and also continuously as long as it generates +new migrations. + +If there is a pending topology change request, the load balancer +will not be invoked to allow for current migrations to drain, after which the +state machine will exit the tablet migration track and allow pending topology +operation to start. + The tablet migration track excludes with other topology changes, so node operations -will have to wait for the plan to finish before they can take over the state machine. - -The tablet balancing track migrates a small bunch of tablets, decided by the -loaded balancer, and then moves back the state machine to the idle state. -This gives other topology changes a chance to start, and if there aren't any, the -load balancer will be called again to check the conditions. This way -we can avoid blocking topology changes for too long, but also drive the cluster -to eventually achieve balance in the absence of other requests. - -The load balancer is always invoked with no pending tablet migrations. This -allows for simplicity in the implementation, but may lead to underutilization -of cluster resources if different tablets migrate with different speeds, -and thus limit the speed of load balancing. +will have to wait for tablet migration track to finish before they can take over +the state machine. The reason why the load balancer is part of the main state machine and excludes with other topology changes is that we want to share the infrastructure for fencing between vnode-based topology @@ -101,10 +101,7 @@ don't interfere with each other. The simplest is to make them part of the same s When the topology state machine is not in the tablet_migration track, it is guaranteed that there are no tablet transitions in the system. -Currently, all tablets in a batch decided by the load balancer are migrated in parallel and -their state machines are advanced at the same time. This means that streaming has to complete -for all tablets in a batch before any of them can move to the next phase. This is suboptimal -and will be changed later to allow for independent transitions. +Tablets are migrated in parallel and independently. # Tablet migration @@ -115,7 +112,7 @@ these properties of a tablet: - stage: determines which replicas should be used by requests on the coordinator side, and which action should be taken by the state machine executor. -Currently, the tablet state machine is driven forward by the tablet balancing track of the +Currently, the tablet state machine is driven forward by the tablet migration track of the topology state machine. The "stage" serves two major purposes: diff --git a/locator/load_sketch.hh b/locator/load_sketch.hh index 850247233a..df4ac4bbfe 100644 --- a/locator/load_sketch.hh +++ b/locator/load_sketch.hh @@ -57,6 +57,13 @@ class load_sketch { }; std::unordered_map _nodes; token_metadata_ptr _tm; +private: + tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { + // We reflect migrations in the load as if they already happened, + // optimistically assuming that they will succeed. + return trinfo ? trinfo->next : ti.replicas; + } + public: load_sketch(token_metadata_ptr tm) : _tm(std::move(tm)) { @@ -65,10 +72,10 @@ public: future<> populate(std::optional host = std::nullopt) { const topology& topo = _tm->get_topology(); co_await utils::clear_gently(_nodes); - for (auto&& [table, tmap] : _tm->tablets().all_tables()) { - for (const tablet_info& ti : tmap.tablets()) { - co_await coroutine::maybe_yield(); - for (auto&& replica : ti.replicas) { + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = tmap_; + co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) { + for (auto&& replica : get_replicas_for_tablet_load(ti, tmap.get_tablet_transition_info(tid))) { if (host && *host != replica.host) { continue; } @@ -80,7 +87,7 @@ public: n._shards[replica.shard].load += 1; } } - } + }); } for (auto&& n : _nodes) { std::make_heap(n.second._shards.begin(), n.second._shards.end(), shard_load_cmp()); diff --git a/locator/tablets.cc b/locator/tablets.cc index b05e332d3e..565b7e8b86 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -75,6 +75,20 @@ tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, ta , reads(get_selector_for_reads(stage)) { } +tablet_migration_streaming_info get_migration_streaming_info(const tablet_info& tinfo, const tablet_transition_info& trinfo) { + tablet_migration_streaming_info result = { + .read_from = std::unordered_set(tinfo.replicas.begin(), tinfo.replicas.end()), + .written_to = std::unordered_set(trinfo.next.begin(), trinfo.next.end()) + }; + for (auto&& r : trinfo.next) { + result.read_from.erase(r); + } + for (auto&& r : tinfo.replicas) { + result.written_to.erase(r); + } + return result; +} + tablet_replica get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) { std::unordered_set leaving(tinfo.replicas.begin(), tinfo.replicas.end()); for (auto&& r : trinfo.next) { diff --git a/locator/tablets.hh b/locator/tablets.hh index 4948a7be9e..e143a74771 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -171,6 +171,14 @@ struct tablet_transition_info { // Returns the leaving replica for a given transition. tablet_replica get_leaving_replica(const tablet_info&, const tablet_transition_info&); +/// Describes streaming required for a given tablet transition. +struct tablet_migration_streaming_info { + std::unordered_set read_from; + std::unordered_set written_to; +}; + +tablet_migration_streaming_info get_migration_streaming_info(const tablet_info&, const tablet_transition_info&); + /// Stores information about tablets of a single table. /// /// The map contains a constant number of tablets, tablet_count(). diff --git a/service/storage_service.cc b/service/storage_service.cc index dda8f212f1..c75a412337 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1334,6 +1334,15 @@ class topology_coordinator { guard = co_await global_tablet_token_metadata_barrier(std::move(guard)); } + // In order to keep the cluster saturated, ask the load balancer for more transitions. + // Unless there is a pending topology change operation. + auto [preempt, new_guard] = should_preempt_balancing(std::move(guard)); + guard = std::move(new_guard); + if (!preempt) { + auto plan = co_await balance_tablets(get_token_metadata_ptr()); + co_await generate_migration_updates(updates, guard, plan); + } + // It's ok to execute planned updates after retaking the guard because as long // as topology is in tablet_migration state only this coordinator has a right // to advance the state machine of tablets. @@ -1365,10 +1374,26 @@ class topology_coordinator { co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration"); } + std::pair should_preempt_balancing(group0_guard guard) { + auto node_or_guard = get_node_to_work_on_opt(std::move(guard)); + if (auto* node = std::get_if(&node_or_guard)) { + return std::make_pair(true, std::move(node->guard)); + } + + guard = std::get(std::move(node_or_guard)); + if (_topo_sm._topology.global_request) { + return std::make_pair(true, std::move(guard)); + } + + return std::make_pair(false, std::move(guard)); + } + // Returns `true` iff there was work to do. future handle_topology_transition(group0_guard guard) { auto tstate = _topo_sm._topology.tstate; if (!tstate) { + // When adding a new source of work, make sure to update should_preempt_balancing() as well. + auto node_or_guard = get_node_to_work_on_opt(std::move(guard)); if (auto* node = std::get_if(&node_or_guard)) { co_await handle_node_transition(std::move(*node)); diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 275399c2ef..764e7db7ed 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -65,6 +65,13 @@ seastar::logger lblogger("load_balancer"); /// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer /// will alternate between them across make_plan() calls. /// +/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated +/// by load balancer as if they were already completed. This allows the algorithm to incrementally +/// make decision which when executed with active migrations will produce the desired result. +/// Overload of shards which still contain migrated-away tablets is limited by the fact +/// that the algorithm tracks streaming concurrency on both source and target shards of active +/// migrations and takes concurrency limit into account when producing new migrations. +/// /// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current /// implementation is not efficient if the scheduler would like to call make_plan() multiple times /// to parallelize execution. This will be addressed in the future by keeping the data structures @@ -79,7 +86,13 @@ class load_balancer { using load_type = double; struct shard_load { - size_t tablet_count; + size_t tablet_count = 0; + + // Number of tablets which are streamed from this shard. + size_t streaming_read_load = 0; + + // Number of tablets which are streamed to this shard. + size_t streaming_write_load = 0; // Tablets which still have a replica on this shard which are candidates for migrating away from this shard. std::unordered_set candidates; @@ -120,7 +133,65 @@ class load_balancer { } }; + // Per-shard limits for active tablet streaming sessions. + // + // There is no hard reason for these values being what they are other than + // the guidelines below. + // + // We want to limit concurrency of active streaming for several reasons. + // One is that we want to prevent over-utilization of memory required to carry out streaming, + // as that may lead to OOM or excessive cache eviction. + // + // There is no network scheduler yet, so we want to avoid over-utilization of network bandwidth. + // Limiting per-shard concurrency is a lame way to achieve that, but it's better than nothing. + // + // Scheduling groups should limit impact of streaming on other kinds of processes on the same node, + // so this aspect is not the reason for limiting concurrency. + // + // We don't want too much parallelism because it means that we have plenty of migrations + // which progress slowly. It's better to have fewer which complete faster because + // less user requests suffer from double-quorum overhead, and under-loaded nodes can take + // the load sooner. At the same time, we want to have enough concurrency to fully utilize resources. + // + // Streaming speed is supposed to be I/O bound and writes are more expensive in terms of IO than reads, + // so we allow more read concurrency. + // + // We allow at least two sessions per shard so that there is less chance for idling until load balancer + // makes the next decision after streaming is finished. + const size_t max_write_streaming_load = 2; + const size_t max_read_streaming_load = 4; + token_metadata_ptr _tm; +private: + tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const { + // We reflect migrations in the load as if they already happened, + // optimistically assuming that they will succeed. + return trinfo ? trinfo->next : ti.replicas; + } + + // Whether to count the tablet as putting streaming load on the system. + // Tablets which are streaming or are yet-to-stream are counted. + bool is_streaming(const tablet_transition_info* trinfo) { + if (!trinfo) { + return false; + } + switch (trinfo->stage) { + case tablet_transition_stage::allow_write_both_read_old: + return true; + case tablet_transition_stage::write_both_read_old: + return true; + case tablet_transition_stage::streaming: + return true; + case tablet_transition_stage::write_both_read_new: + return false; + case tablet_transition_stage::use_new: + return false; + case tablet_transition_stage::cleanup: + return false; + } + on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast(trinfo->stage))); + } + public: load_balancer(token_metadata_ptr tm) : _tm(std::move(tm)) { @@ -162,9 +233,14 @@ public: // Compute tablet load on nodes. - for (auto&& [table, tmap] : _tm->tablets().all_tables()) { + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = tmap_; co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { - for (auto&& replica : ti.replicas) { + auto trinfo = tmap.get_tablet_transition_info(tid); + + // We reflect migrations in the load as if they already happened, + // optimistically assuming that they will succeed. + for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { if (nodes.contains(replica.host)) { nodes[replica.host].tablet_count += 1; // This invariant is assumed later. @@ -209,45 +285,50 @@ public: // We want to saturate the target node so we migrate several tablets in parallel, one for each shard // on the target node. This assumes that the target node is well-balanced and that tablet migrations // complete at the same time. Both assumptions are not generally true in practice, which we currently ignore. - // If target node is not balanced across shards, we will overload some shards. - // If tablets are not balanced in size, throughput will suffer because some shards will be idle sooner than others. + // But they will be true typically, because we fill shards starting from least-loaded shards, + // so we naturally strive towards balance between shards. // - // FIXME: To handle the above, we should (1) rebalance the target node - // before migrating tablets from other nodes. If shards are balanced on the target node, the balancer - // will naturally distribute tablets to different shards. Also, (2) we should change this algorithm - // to be a generator for migrations and have a scheduler in the execution layer which pulls migrations - // from this algorithm, batches them and decides how many to execute. - // - // The scheduler decides in which order to execute the plan based on current activity in the system. - // We cannot just ask the planner for the next migration and stop when we hit overload on some shard, - // because that can lead to underutilization of the cluster. Just because the next migration is blocked - // by the target shard being busy doesn't mean we could not proceed with migrations for other shards - // which would be produced by the planner subsequently. + // If target node is not balanced across shards, we will overload some shards. Streaming concurrency + // will suffer because more loaded shards will not participate, which will under-utilize the node. + // FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes. auto target_node = topo.find_node(target); auto batch_size = target_node->get_shard_count(); // Compute per-shard load and candidate tablets. - for (auto&& [table, tmap] : _tm->tablets().all_tables()) { - if (!tmap.transitions().empty()) { - // FIXME: The algorithm doesn't support balancing with active transitions yet. They must finish first. - lblogger.warn("Pending transitions active."); - co_return migration_plan(); - } - + for (auto&& [table, tmap_] : _tm->tablets().all_tables()) { + auto& tmap = tmap_; co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) { - for (auto&& replica : ti.replicas) { + auto trinfo = tmap.get_tablet_transition_info(tid); + + if (is_streaming(trinfo)) { + auto streaming_info = get_migration_streaming_info(ti, *trinfo); + for (auto&& replica : streaming_info.read_from) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_read_load += 1; + } + } + for (auto&& replica : streaming_info.written_to) { + if (nodes.contains(replica.host)) { + nodes[replica.host].shards[replica.shard].streaming_write_load += 1; + } + } + } + + for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) { if (!nodes.contains(replica.host)) { continue; } auto& node_load_info = nodes[replica.host]; - auto&& shard_load_info = node_load_info.shards[replica.shard]; + shard_load& shard_load_info = node_load_info.shards[replica.shard]; if (shard_load_info.tablet_count == 0) { node_load_info.shards_by_load.push_back(replica.shard); } shard_load_info.tablet_count += 1; - shard_load_info.candidates.emplace(global_tablet_id{table, tid}); + if (!trinfo) { // migrating tablets are not candidates + shard_load_info.candidates.emplace(global_tablet_id {table, tid}); + } } }); } @@ -283,6 +364,8 @@ public: const tablet_metadata& tmeta = _tm->tablets(); load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates. auto& target_info = nodes[target]; + const size_t max_skipped_migrations = target_info.shards.size() * 2; + size_t skipped_migrations = 0; while (plan.size() < batch_size && !nodes_by_load.empty()) { co_await coroutine::maybe_yield(); @@ -385,8 +468,30 @@ public: } auto dst = global_shard_id {target, target_load.next_shard(target)}; - lblogger.debug("Select {} to move from {} to {}", source_tablet, src, dst); - plan.push_back(tablet_migration_info {source_tablet, src, dst}); + auto mig = tablet_migration_info {source_tablet, src, dst}; + + if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load + && src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) { + target_info.shards[dst.shard].streaming_write_load += 1; + src_node_info.shards[src_shard].streaming_read_load += 1; + lblogger.debug("Adding migration: {}", mig); + plan.push_back(std::move(mig)); + } else { + // Shards are overloaded with streaming. Do not include the migration in the plan, but + // continue as if it was in the hope that we will find a migration which can be executed without + // violating the load. Next make_plan() invocation will notice that the migration was not executed. + // We should not just stop here because that can lead to underutilization of the cluster. + // Just because the next migration is blocked doesn't mean we could not proceed with migrations + // for other shards which are produced by the planner subsequently. + lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig, + src_node_info.shards[src_shard].streaming_read_load, + target_info.shards[dst.shard].streaming_write_load); + skipped_migrations++; + if (skipped_migrations >= max_skipped_migrations) { + lblogger.debug("Too many migrations skipped, aborting balancing"); + break; + } + } target_info.tablet_count += 1; target_info.update(); diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh index 90d0a08a53..70341e760e 100644 --- a/service/tablet_allocator.hh +++ b/service/tablet_allocator.hh @@ -41,6 +41,13 @@ using migration_plan = utils::chunked_vector; /// co_await execute(plan); /// } /// +/// It is ok to invoke the algorithm with already active tablet migrations. The algorithm will take them into account +/// when balancing the load as if they already succeeded. This means that applying a series of migration plans +/// produced by this function will give the same result regardless of whether applying they are fully executed or +/// only initiated by creating corresponding transitions in tablet metadata. +/// +/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account. +/// future balance_tablets(locator::token_metadata_ptr); class tablet_allocator_impl; From 8fdbc42e7122af39f622a197703f4ae7f99debf8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 24 Jul 2023 23:59:10 +0200 Subject: [PATCH 08/10] tests: tablets: Add test for load balancing with active migrations --- locator/tablets.cc | 4 ++ locator/tablets.hh | 2 + test/boost/tablets_test.cc | 124 +++++++++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+) diff --git a/locator/tablets.cc b/locator/tablets.cc index 565b7e8b86..7e238573a6 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -190,6 +190,10 @@ future<> tablet_map::for_each_tablet(seastar::noncopyable_function tablet_map::get_shard(tablet_id tid, host_id host) const { auto&& info = get_tablet_info(tid); diff --git a/locator/tablets.hh b/locator/tablets.hh index e143a74771..cea0aa71c1 100644 --- a/locator/tablets.hh +++ b/locator/tablets.hh @@ -293,6 +293,7 @@ public: public: void set_tablet(tablet_id, tablet_info); void set_tablet_transition_info(tablet_id, tablet_transition_info); + void clear_transitions(); // Destroys gently. // The tablet map is not usable after this call and should be destroyed. @@ -326,6 +327,7 @@ private: public: const tablet_map& get_tablet_map(table_id id) const; const table_to_tablet_map& all_tables() const { return _tablets; } + table_to_tablet_map& all_tables() { return _tablets; } size_t external_memory_usage() const; public: void set_tablet_map(table_id, tablet_map); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 7d6110bbc5..4c27d60544 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -588,6 +588,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) { } } +// Reflects the plan in a given token metadata as if the migrations were fully executed. static void apply_plan(token_metadata& tm, const migration_plan& plan) { for (auto&& mig : plan) { @@ -598,6 +599,25 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) { } } +static +tablet_transition_info migration_to_transition_info(const tablet_migration_info& mig, const tablet_info& ti) { + return tablet_transition_info { + tablet_transition_stage::allow_write_both_read_old, + replace_replica(ti.replicas, mig.src, mig.dst), + mig.dst + }; +} + +// Reflects the plan in a given token metadata as if the migrations were started but not yet executed. +static +void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) { + for (auto&& mig : plan) { + tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table); + auto tinfo = tmap.get_tablet_info(mig.tablet.tablet); + tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(mig, tinfo)); + } +} + static void rebalance_tablets(shared_token_metadata& stm) { while (true) { @@ -612,6 +632,37 @@ void rebalance_tablets(shared_token_metadata& stm) { } } +static +void rebalance_tablets_as_in_progress(shared_token_metadata& stm) { + while (true) { + auto plan = balance_tablets(stm.get()).get0(); + if (plan.empty()) { + break; + } + stm.mutate_token_metadata([&] (token_metadata& tm) { + apply_plan_as_in_progress(tm, plan); + return make_ready_future<>(); + }).get(); + } +} + +// Completes any in progress tablet migrations. +static +void execute_transitions(shared_token_metadata& stm) { + stm.mutate_token_metadata([&] (token_metadata& tm) { + for (auto&& [tablet, tmap_] : tm.tablets().all_tables()) { + auto& tmap = tmap_; + for (auto&& [tablet, trinfo]: tmap.transitions()) { + auto ti = tmap.get_tablet_info(tablet); + ti.replicas = trinfo.next; + tmap.set_tablet(tablet, ti); + } + tmap.clear_transitions(); + } + return make_ready_future<>(); + }).get(); +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { // Tests the scenario of bootstrapping a single node // Verifies that load balancer sees it and moves tablets to that node. @@ -707,6 +758,79 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) { } } +SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) { + // Tests the scenario of bootstrapping a single node. + // Verifies that the load balancer balances tablets on that node + // even though there is already an active migration. + // The test verifies that the load balancer creates a plan + // which when executed will achieve perfect balance, + // which is a proof that it doesn't stop due to active migrations. + + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + inet_address ip3("192.168.0.3"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + auto host3 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (auto& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_host_id(host3, ip3); + tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); + + tablet_map tmap(4); + std::optional tid = tmap.first_tablet(); + for (int i = 0; i < 4; ++i) { + tmap.set_tablet(*tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + tablet_replica {host2, 0}, + } + }); + tid = tmap.next_tablet(*tid); + } + tmap.set_tablet_transition_info(tmap.first_tablet(), tablet_transition_info { + tablet_transition_stage::allow_write_both_read_old, + tablet_replica_set { + tablet_replica {host3, 0}, + tablet_replica {host2, 0}, + }, + tablet_replica {host3, 0} + }); + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + rebalance_tablets_as_in_progress(stm); + execute_transitions(stm); + + { + load_sketch load(stm.get()); + load.populate().get(); + + for (auto h : {host1, host2, host3}) { + testlog.debug("Checking host {}", h); + BOOST_REQUIRE(load.get_avg_shard_load(h) == 2); + } + } +} + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { inet_address ip1("192.168.0.1"); inet_address ip2("192.168.0.2"); From 96d06b58df6a0b5e3b05ccd97b7dccf42d3a67ec Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 27 Jul 2023 02:39:50 +0200 Subject: [PATCH 09/10] tests: tablets: Check that load balancing is interrupted by topology change We add a special mode of load balancing, enabled through error injection, which causes it to continuously generate plans. This should keep the topology coordinator continuously in the tablet migration track. We enable this mode in test_tablets.py:test_bootstrap before bootstrapping nodes to see that bootstrap request interrupts tablet migration track. If this would not be the case, the test will hang. --- service/tablet_allocator.cc | 66 +++++++++++-------- test/boost/tablets_test.cc | 59 +++++++++++++++++ .../test_tablets.py | 8 +++ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 764e7db7ed..8afb951a5b 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -12,6 +12,7 @@ #include "replica/database.hh" #include "service/migration_manager.hh" #include "service/tablet_allocator.hh" +#include "utils/error_injection.hh" #include "utils/stall_free.hh" #include "locator/load_sketch.hh" #include "utils/div_ceil.hh" @@ -215,6 +216,12 @@ public: future make_plan(sstring dc) { lblogger.info("Examining DC {}", dc); + // Causes load balancer to move some tablet even though load is balanced. + auto shuffle = utils::get_local_injector().enter("tablet_allocator_shuffle"); + if (shuffle) { + lblogger.warn("Running without convergence checks"); + } + const locator::topology& topo = _tm->get_topology(); // Select subset of nodes to balance. @@ -269,7 +276,7 @@ public: } } - if (max_load == min_load) { + if (!shuffle && max_load == min_load) { // load is balanced. // TODO: Evaluate and fix intra-node balance. co_return migration_plan(); @@ -373,35 +380,38 @@ public: auto src_host = nodes_by_load.back(); auto& src_node_info = nodes[src_host]; - // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) - // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than - // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set. - // This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and - // because we prevent load inversion between candidate and target in the next check. - // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates - // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load, - // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info - // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0. - if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) { - lblogger.debug("Balance achieved."); - break; - } + if (!shuffle) { + // Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load) + // and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than + // that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set. + // This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and + // because we prevent load inversion between candidate and target in the next check. + // So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates + // is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load, + // it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info + // to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0. + if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) { + lblogger.debug("Balance achieved."); + break; + } - // If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target. - // max_off_candidate_load may be higher than the load of current candidate. - if (src_node_info.avg_load <= target_info.avg_load) { - lblogger.debug("No more candidate nodes."); - lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}", - src_host, src_node_info.avg_load, target_info.avg_load); - break; - } + // If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target. + // max_off_candidate_load may be higher than the load of current candidate. + if (src_node_info.avg_load <= target_info.avg_load) { + lblogger.debug("No more candidate nodes."); + lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}", + src_host, src_node_info.avg_load, target_info.avg_load); + break; + } - // Prevent load inversion which can lead to oscillations. - if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) < - target_info.get_avg_load(target_info.tablet_count + 1)) { - lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with avg_load={}, target's avg_load={}", - src_host, src_node_info.avg_load, target_info.avg_load); - break; + // Prevent load inversion which can lead to oscillations. + if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) < + target_info.get_avg_load(target_info.tablet_count + 1)) { + lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with " + "avg_load={}, target's avg_load={}", + src_host, src_node_info.avg_load, target_info.avg_load); + break; + } } if (src_node_info.shards_by_load.empty()) { diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 4c27d60544..da4a4d921a 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -25,6 +25,7 @@ #include "locator/tablet_replication_strategy.hh" #include "utils/fb_utilities.hh" #include "utils/UUID_gen.hh" +#include "utils/error_injection.hh" using namespace locator; using namespace replica; @@ -831,6 +832,64 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) } } +#ifdef SCYLLA_ENABLE_ERROR_INJECTION +SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) { + inet_address ip1("192.168.0.1"); + inet_address ip2("192.168.0.2"); + inet_address ip3("192.168.0.3"); + + auto host1 = host_id(next_uuid()); + auto host2 = host_id(next_uuid()); + auto host3 = host_id(next_uuid()); + + auto table1 = table_id(next_uuid()); + + semaphore sem(1); + shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{ + locator::topology::config{ + .this_endpoint = ip1, + .local_dc_rack = locator::endpoint_dc_rack::default_location + } + }); + + stm.mutate_token_metadata([&] (auto& tm) { + tm.update_host_id(host1, ip1); + tm.update_host_id(host2, ip2); + tm.update_host_id(host3, ip3); + tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1); + tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2); + + tablet_map tmap(4); + std::optional tid = tmap.first_tablet(); + for (int i = 0; i < 4; ++i) { + tmap.set_tablet(*tid, tablet_info { + tablet_replica_set { + tablet_replica {host1, 0}, + tablet_replica {host2, 0}, + } + }); + tid = tmap.next_tablet(*tid); + } + tablet_metadata tmeta; + tmeta.set_tablet_map(table1, std::move(tmap)); + tm.set_tablets(std::move(tmeta)); + return make_ready_future<>(); + }).get(); + + rebalance_tablets(stm); + + BOOST_REQUIRE(balance_tablets(stm.get()).get0().empty()); + + utils::get_local_injector().enable("tablet_allocator_shuffle"); + auto disable_injection = seastar::defer([&] { + utils::get_local_injector().disable("tablet_allocator_shuffle"); + }); + + BOOST_REQUIRE(!balance_tablets(stm.get()).get0().empty()); +} +#endif + SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) { inet_address ip1("192.168.0.1"); inet_address ip2("192.168.0.2"); diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py index 0e5b4b144c..91171e61fe 100644 --- a/test/topology_experimental_raft/test_tablets.py +++ b/test/topology_experimental_raft/test_tablets.py @@ -6,6 +6,7 @@ from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot +from test.pylib.rest_client import inject_error from test.pylib.util import wait_for_cql_and_get_hosts import pytest @@ -22,6 +23,11 @@ async def inject_error_one_shot_on(manager, error_name, servers): await asyncio.gather(*errs) +async def inject_error_on(manager, error_name, servers): + errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers] + await asyncio.gather(*errs) + + @pytest.mark.asyncio async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient): """Test that you can create a table and insert and query data""" @@ -133,6 +139,8 @@ async def test_bootstrap(manager: ManagerClient): for r in rows: assert r.c == r.pk + await inject_error_on(manager, "tablet_allocator_shuffle", servers) + logger.info("Adding new server") await manager.server_add() From 3f221b1f0547be40cfd211daa3fdbad447f350a9 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 27 Jul 2023 03:12:50 +0200 Subject: [PATCH 10/10] tablets: load_balancer: Remove double logging --- service/tablet_allocator.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc index 8afb951a5b..acdef7e1fc 100644 --- a/service/tablet_allocator.cc +++ b/service/tablet_allocator.cc @@ -398,7 +398,6 @@ public: // If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target. // max_off_candidate_load may be higher than the load of current candidate. if (src_node_info.avg_load <= target_info.avg_load) { - lblogger.debug("No more candidate nodes."); lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}", src_host, src_node_info.avg_load, target_info.avg_load); break;