raft topology: rename update_replica_state -> update_topology_state
The new name is more generic and appropriate for topology transitions which don't affect any specific replica but the entire cluster as a whole (which we'll introduce later). Also take `guard` directly instead of `node_to_work_on` in this more generic function. Since we want `node_to_work_on` to die when we steal its guard, introduce `take_guard` which takes ownership of the object and returns the guard.
This commit is contained in:
@@ -714,13 +714,17 @@ class topology_coordinator {
|
||||
co_return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)};
|
||||
}
|
||||
|
||||
future<> update_replica_state(
|
||||
node_to_work_on&& node, std::vector<canonical_mutation>&& updates, const sstring& reason) {
|
||||
group0_guard take_guard(node_to_work_on&& node) {
|
||||
return std::move(node.guard);
|
||||
}
|
||||
|
||||
future<> update_topology_state(
|
||||
group0_guard guard, std::vector<canonical_mutation>&& updates, const sstring& reason) {
|
||||
try {
|
||||
slogger.trace("raft topology: do update {} reason {}", updates, reason);
|
||||
topology_change change{std::move(updates)};
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), node.guard, reason);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(node.guard));
|
||||
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
|
||||
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard));
|
||||
} catch (group0_concurrent_modification&) {
|
||||
slogger.info("raft topology: race while changing state: {}. Retrying", reason);
|
||||
throw;
|
||||
@@ -868,7 +872,7 @@ class topology_coordinator {
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
.set_current_cdc_generation_id(cdc_gen_id);
|
||||
auto str = fmt::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::write_both_read_old: {
|
||||
@@ -915,7 +919,7 @@ class topology_coordinator {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set_transition_state(topology::transition_state::write_both_read_new);
|
||||
auto str = fmt::format("{}: streaming completed for node {}", node.rs->state, node.id);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case topology::transition_state::write_both_read_new: {
|
||||
@@ -933,7 +937,8 @@ class topology_coordinator {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.del_transition_state()
|
||||
.set("node_state", node_state::normal);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: read fence completed");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
|
||||
"bootstrap: read fence completed");
|
||||
}
|
||||
break;
|
||||
case node_state::decommissioning:
|
||||
@@ -943,7 +948,7 @@ class topology_coordinator {
|
||||
.set("node_state", node_state::left)
|
||||
.del_transition_state();
|
||||
auto str = fmt::format("{}: read fence completed", node.rs->state);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case node_state::replacing: {
|
||||
@@ -956,7 +961,7 @@ class topology_coordinator {
|
||||
topology_mutation_builder builder2(node.guard.write_timestamp(), parse_replaced_node(node));
|
||||
builder2.del("tokens")
|
||||
.set("node_state", node_state::left);
|
||||
co_await update_replica_state(std::move(node), {builder1.build(), builder2.build()},
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder1.build(), builder2.build()},
|
||||
"replace: read fence completed");
|
||||
}
|
||||
break;
|
||||
@@ -1057,7 +1062,7 @@ class topology_coordinator {
|
||||
updates.push_back(builder.build());
|
||||
auto reason = format(
|
||||
"bootstrap: assign tokens and insert CDC generation data (UUID: {})", gen_uuid);
|
||||
co_await update_replica_state(std::move(node), {std::move(updates)}, reason);
|
||||
co_await update_topology_state(take_guard(std::move(node)), {std::move(updates)}, reason);
|
||||
break;
|
||||
}
|
||||
case topology_request::leave:
|
||||
@@ -1068,7 +1073,8 @@ class topology_coordinator {
|
||||
builder.set("node_state", node_state::decommissioning)
|
||||
.del("topology_request")
|
||||
.set_transition_state(topology::transition_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start decommission");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
|
||||
"start decommission");
|
||||
break;
|
||||
case topology_request::remove:
|
||||
assert(node.rs->ring);
|
||||
@@ -1078,7 +1084,8 @@ class topology_coordinator {
|
||||
builder.set("node_state", node_state::removing)
|
||||
.del("topology_request")
|
||||
.set_transition_state(topology::transition_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start removenode");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
|
||||
"start removenode");
|
||||
break;
|
||||
case topology_request::replace: {
|
||||
assert(!node.rs->ring);
|
||||
@@ -1093,14 +1100,15 @@ class topology_coordinator {
|
||||
.del("topology_request")
|
||||
.set("tokens", it->second.ring->tokens)
|
||||
.set_transition_state(topology::transition_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start replace");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "start replace");
|
||||
break;
|
||||
}
|
||||
case topology_request::rebuild: {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set("node_state", node_state::rebuilding)
|
||||
.del("topology_request");
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start rebuilding");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
|
||||
"start rebuilding");
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1112,7 +1120,7 @@ class topology_coordinator {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set("node_state", node_state::normal)
|
||||
.del("rebuild_option");
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "rebuilding completed");
|
||||
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "rebuilding completed");
|
||||
}
|
||||
break;
|
||||
case node_state::bootstrapping:
|
||||
|
||||
Reference in New Issue
Block a user