Compare commits
5 Commits
copilot/im
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e956fda27 | ||
|
|
d65c5926d5 | ||
|
|
1390736e81 | ||
|
|
624331018d | ||
|
|
3b4ff94437 |
@@ -1879,9 +1879,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
|
||||
bool has_nodes_to_drain = false;
|
||||
bool has_pending_finalizations = false;
|
||||
if (!preempt) {
|
||||
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr(), &_topo_sm._topology, &_sys_ks, {}, get_dead_nodes());
|
||||
has_nodes_to_drain = plan.has_nodes_to_drain();
|
||||
has_pending_finalizations = !plan.resize_plan().finalize_resize.empty();
|
||||
if (!drain || plan.has_nodes_to_drain()) {
|
||||
co_await generate_migration_updates(updates, guard, plan);
|
||||
}
|
||||
@@ -1938,12 +1940,24 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await sleep(3s); // Throttle retries
|
||||
co_return;
|
||||
}
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
.set_session(session_id(guard.new_group0_state_id()))
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
// If there are pending merge finalizations, transition to tablet_resize_finalization
|
||||
// to handle them before continuing with decommission
|
||||
if (has_pending_finalizations) {
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(
|
||||
_feature_service.tablet_merge ? topology::transition_state::tablet_resize_finalization
|
||||
: topology::transition_state::tablet_split_finalization)
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
} else {
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.set_transition_state(topology::transition_state::write_both_read_old)
|
||||
.set_session(session_id(guard.new_group0_state_id()))
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
}
|
||||
} else {
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
@@ -2055,11 +2069,24 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
}
|
||||
|
||||
updates.emplace_back(
|
||||
topology_mutation_builder(guard.write_timestamp())
|
||||
.del_transition_state()
|
||||
.set_version(_topo_sm._topology.version + 1)
|
||||
.build());
|
||||
// Check if there are nodes being decommissioned or removed that still need further processing
|
||||
bool has_draining_nodes = false;
|
||||
for (const auto& [id, rs] : _topo_sm._topology.transition_nodes) {
|
||||
if (rs.state == node_state::decommissioning || rs.state == node_state::removing) {
|
||||
has_draining_nodes = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
topology_mutation_builder tbuilder(guard.write_timestamp());
|
||||
if (has_draining_nodes) {
|
||||
tbuilder.set_transition_state(topology::transition_state::tablet_draining);
|
||||
} else {
|
||||
tbuilder.del_transition_state();
|
||||
}
|
||||
tbuilder.set_version(_topo_sm._topology.version + 1);
|
||||
updates.emplace_back(tbuilder.build());
|
||||
|
||||
co_await update_topology_state(std::move(guard), std::move(updates), format("Finished tablet resize finalization"));
|
||||
|
||||
if (auto old_load_stats = _tablet_allocator.get_load_stats()) {
|
||||
|
||||
Reference in New Issue
Block a user