From 0bee872fb15fa9cfd1517ca493173a75206b7ca5 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 25 Apr 2023 15:24:35 +0200 Subject: [PATCH] raft topology: rename `update_replica_state` -> `update_topology_state` The new name is more generic and appropriate for topology transitions which don't affect any specific replica but the entire cluster as a whole (which we'll introduce later). Also take `guard` directly instead of `node_to_work_on` in this more generic function. Since we want `node_to_work_on` to die when we steal its guard, introduce `take_guard` which takes ownership of the object and returns the guard. --- service/storage_service.cc | 38 +++++++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index a53e9fa6d9..a1f6e62a52 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -714,13 +714,17 @@ class topology_coordinator { co_return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)}; } - future<> update_replica_state( - node_to_work_on&& node, std::vector&& updates, const sstring& reason) { + group0_guard take_guard(node_to_work_on&& node) { + return std::move(node.guard); + } + + future<> update_topology_state( + group0_guard guard, std::vector&& updates, const sstring& reason) { try { slogger.trace("raft topology: do update {} reason {}", updates, reason); topology_change change{std::move(updates)}; - group0_command g0_cmd = _group0.client().prepare_command(std::move(change), node.guard, reason); - co_await _group0.client().add_entry(std::move(g0_cmd), std::move(node.guard)); + group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); + co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard)); } catch (group0_concurrent_modification&) { slogger.info("raft topology: race while changing state: {}. Retrying", reason); throw; @@ -868,7 +872,7 @@ class topology_coordinator { builder.set_transition_state(topology::transition_state::write_both_read_old) .set_current_cdc_generation_id(cdc_gen_id); auto str = fmt::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id); - co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); } break; case topology::transition_state::write_both_read_old: { @@ -915,7 +919,7 @@ class topology_coordinator { topology_mutation_builder builder(node.guard.write_timestamp(), node.id); builder.set_transition_state(topology::transition_state::write_both_read_new); auto str = fmt::format("{}: streaming completed for node {}", node.rs->state, node.id); - co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); } break; case topology::transition_state::write_both_read_new: { @@ -933,7 +937,8 @@ class topology_coordinator { topology_mutation_builder builder(node.guard.write_timestamp(), node.id); builder.del_transition_state() .set("node_state", node_state::normal); - co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: read fence completed"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, + "bootstrap: read fence completed"); } break; case node_state::decommissioning: @@ -943,7 +948,7 @@ class topology_coordinator { .set("node_state", node_state::left) .del_transition_state(); auto str = fmt::format("{}: read fence completed", node.rs->state); - co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); } break; case node_state::replacing: { @@ -956,7 +961,7 @@ class topology_coordinator { topology_mutation_builder builder2(node.guard.write_timestamp(), parse_replaced_node(node)); builder2.del("tokens") .set("node_state", node_state::left); - co_await update_replica_state(std::move(node), {builder1.build(), builder2.build()}, + co_await update_topology_state(take_guard(std::move(node)), {builder1.build(), builder2.build()}, "replace: read fence completed"); } break; @@ -1057,7 +1062,7 @@ class topology_coordinator { updates.push_back(builder.build()); auto reason = format( "bootstrap: assign tokens and insert CDC generation data (UUID: {})", gen_uuid); - co_await update_replica_state(std::move(node), {std::move(updates)}, reason); + co_await update_topology_state(take_guard(std::move(node)), {std::move(updates)}, reason); break; } case topology_request::leave: @@ -1068,7 +1073,8 @@ class topology_coordinator { builder.set("node_state", node_state::decommissioning) .del("topology_request") .set_transition_state(topology::transition_state::write_both_read_old); - co_await update_replica_state(std::move(node), {builder.build()}, "start decommission"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, + "start decommission"); break; case topology_request::remove: assert(node.rs->ring); @@ -1078,7 +1084,8 @@ class topology_coordinator { builder.set("node_state", node_state::removing) .del("topology_request") .set_transition_state(topology::transition_state::write_both_read_old); - co_await update_replica_state(std::move(node), {builder.build()}, "start removenode"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, + "start removenode"); break; case topology_request::replace: { assert(!node.rs->ring); @@ -1093,14 +1100,15 @@ class topology_coordinator { .del("topology_request") .set("tokens", it->second.ring->tokens) .set_transition_state(topology::transition_state::write_both_read_old); - co_await update_replica_state(std::move(node), {builder.build()}, "start replace"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "start replace"); break; } case topology_request::rebuild: { topology_mutation_builder builder(node.guard.write_timestamp(), node.id); builder.set("node_state", node_state::rebuilding) .del("topology_request"); - co_await update_replica_state(std::move(node), {builder.build()}, "start rebuilding"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, + "start rebuilding"); break; } } @@ -1112,7 +1120,7 @@ class topology_coordinator { topology_mutation_builder builder(node.guard.write_timestamp(), node.id); builder.set("node_state", node_state::normal) .del("rebuild_option"); - co_await update_replica_state(std::move(node), {builder.build()}, "rebuilding completed"); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "rebuilding completed"); } break; case node_state::bootstrapping: