db/view/view_building_coordinator: handle tablet operations
If the view building coordinator is running, adjust view_building_tasks in case of tablet operations. The mutations are generated in the same batch as tablet mutations. At the start of tablet migration/resize/RF change, started view building tasks are aborted (by setting ABORTED state) if needed. Then, new adjusted tasks are created in group0 batch which ends the tablet operation and aborted tasks are removed from the table. In case the tablet operation fails or is revoked, aborted view building tasks are rollback by creating new copies of them and aborted ones are deleted from the table. View building tasks are not aborted/changed during tablet repair, because in this case, even if vb task is started, a staging sstable will be generated.
This commit is contained in:
@@ -12,6 +12,7 @@
|
||||
#include <ranges>
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include "db/view/view_building_coordinator.hh"
|
||||
#include "db/view/view_build_status.hh"
|
||||
#include "locator/tablets.hh"
|
||||
@@ -25,6 +26,7 @@
|
||||
#include "service/topology_coordinator.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "db/view/view_building_task_mutation_builder.hh"
|
||||
#include "utils/assert.hh"
|
||||
#include "idl/view.dist.hh"
|
||||
|
||||
@@ -486,6 +488,258 @@ future<> view_building_coordinator::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
void view_building_coordinator::generate_tablet_migration_updates(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, const locator::tablet_map& tmap, locator::global_tablet_id gid, const locator::tablet_transition_info& trinfo) {
|
||||
vbc_logger.debug("Generating updates for tablet migration for table {}", gid.table);
|
||||
|
||||
if (!_vb_sm.building_state.tasks_state.contains(gid.table)) {
|
||||
vbc_logger.debug("No view building tasks for table {} - skipping tablet migration updates generation", gid.table);
|
||||
return;
|
||||
}
|
||||
|
||||
auto& tinfo = tmap.get_tablet_info(gid.tablet);
|
||||
auto leaving_replica = locator::get_leaving_replica(tinfo, trinfo);
|
||||
|
||||
if (!leaving_replica && !trinfo.pending_replica) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto last_token = tmap.get_last_token(gid.tablet);
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
|
||||
auto create_task_copy_on_pending_replica = [&] (const view_building_task& task) {
|
||||
auto new_id = builder.new_id();
|
||||
builder.set_type(new_id, task.type)
|
||||
.set_state(new_id, view_building_task::task_state::idle)
|
||||
.set_base_id(new_id, task.base_id)
|
||||
.set_last_token(new_id, task.last_token)
|
||||
.set_replica(new_id, *trinfo.pending_replica);
|
||||
if (task.view_id) {
|
||||
builder.set_view_id(new_id, *task.view_id);
|
||||
}
|
||||
};
|
||||
|
||||
if (leaving_replica && trinfo.pending_replica) {
|
||||
// tablet migration
|
||||
auto tasks_to_migrate = _vb_sm.building_state.collect_tasks_by_last_token(gid.table, *leaving_replica)[last_token];
|
||||
for (auto& task: tasks_to_migrate) {
|
||||
create_task_copy_on_pending_replica(task);
|
||||
builder.del_task(task.id);
|
||||
vbc_logger.debug("Task {} was migrated from {} to {}.", task.id, task.replica, *trinfo.pending_replica);
|
||||
}
|
||||
|
||||
} else if (leaving_replica) {
|
||||
// RF decrease
|
||||
auto tasks_to_abort = _vb_sm.building_state.collect_tasks_by_last_token(gid.table, *leaving_replica)[last_token];
|
||||
for (auto& task: tasks_to_abort) {
|
||||
builder.del_task(task.id);
|
||||
vbc_logger.debug("Aborting task {} for abandoning replica {}", task.id, task.replica);
|
||||
}
|
||||
|
||||
} else if (trinfo.pending_replica) {
|
||||
// RF increase
|
||||
// Filter out staging tasks and group by remaining by view_id.
|
||||
// If a view has any unfinished task for this tablet id, create a task for each new replica.
|
||||
// TODO:
|
||||
// This might be optimized out depending on how data on the new replicas is built.
|
||||
// If all tablet replicas are built for the view, we're sure new view's replicas will also get correct data.
|
||||
std::unordered_map<::table_id, std::vector<view_building_task>> tasks_per_view;
|
||||
auto tasks_for_tablet = _vb_sm.building_state.collect_tasks_by_last_token(gid.table)[last_token];
|
||||
for (auto& t: tasks_for_tablet | std::views::filter([] (const view_building_task& t) {
|
||||
return t.type == view_building_task::task_type::build_range;
|
||||
})) {
|
||||
tasks_per_view[*t.view_id].push_back(t);
|
||||
}
|
||||
|
||||
for (auto& [_, tasks_for_view]: tasks_per_view) {
|
||||
auto task = tasks_for_view.front();
|
||||
create_task_copy_on_pending_replica(task);
|
||||
vbc_logger.debug("Creating new task for pending replica {}", *trinfo.pending_replica);
|
||||
}
|
||||
}
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
void view_building_coordinator::generate_tablet_resize_updates(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap) {
|
||||
vbc_logger.debug("Generating updates for tablet resize for table {}", table_id);
|
||||
if (!_vb_sm.building_state.tasks_state.contains(table_id)) {
|
||||
vbc_logger.debug("No view building tasks for table {} - skipping tablet migration updates generation", table_id);
|
||||
return;
|
||||
}
|
||||
|
||||
if (old_tmap.tablet_count() == new_tmap.tablet_count()) {
|
||||
vbc_logger.debug("Tablet map size wasn't changed - skipping tablet migration updates generation");
|
||||
return;
|
||||
}
|
||||
bool is_split = old_tmap.tablet_count() < new_tmap.tablet_count();
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
|
||||
auto create_task_copy = [&] (const view_building_task& task, dht::token last_token) -> utils::UUID {
|
||||
auto new_id = builder.new_id();
|
||||
builder.set_type(new_id, task.type)
|
||||
.set_state(new_id, view_building_task::task_state::idle)
|
||||
.set_base_id(new_id, task.base_id)
|
||||
.set_last_token(new_id, last_token)
|
||||
.set_replica(new_id, task.replica);
|
||||
if (task.view_id) {
|
||||
builder.set_view_id(new_id, *task.view_id);
|
||||
}
|
||||
return new_id;
|
||||
};
|
||||
|
||||
// Task with tablet id `n` is split into 2 tasks with tablet ids `2n` and `2n+1`
|
||||
auto split_task = [&] (const view_building_task& task) {
|
||||
auto new_tid = locator::tablet_id{old_tmap.get_tablet_id(task.last_token).id * 2};
|
||||
|
||||
auto new_id = create_task_copy(task, new_tmap.get_last_token(new_tid));
|
||||
auto new_id2 = create_task_copy(task, new_tmap.get_last_token(locator::tablet_id{new_tid.id + 1}));
|
||||
builder.del_task(task.id);
|
||||
|
||||
vbc_logger.debug("Task {} was split into task {} and task {}", task.id, new_id, new_id2);
|
||||
};
|
||||
|
||||
// Task with tablet id `n` is updated to new task with tablet id `n/2` (integer division).
|
||||
// If task with tablet id `n/2` is already created (information is stored in `created_tablet_ids`), only old task is removed.
|
||||
auto merge_task = [&] (std::unordered_set<locator::tablet_id>& created_tablet_ids, const view_building_task& task) {
|
||||
builder.del_task(task.id);
|
||||
|
||||
auto new_tid = locator::tablet_id(old_tmap.get_tablet_id(task.last_token).id / 2);
|
||||
if (!created_tablet_ids.contains(new_tid)) {
|
||||
created_tablet_ids.insert(new_tid);
|
||||
auto new_id = create_task_copy(task, new_tmap.get_last_token(new_tid));
|
||||
vbc_logger.debug("Task {} was merged into task {} ", task.id, new_id);
|
||||
} else {
|
||||
vbc_logger.debug("Task {} was removed during tablet merge. Task ending at token {} was already created.", task.id, task.last_token);
|
||||
}
|
||||
};
|
||||
|
||||
auto resize_task_map = [&] (const task_map& task_map) {
|
||||
std::unordered_set<locator::tablet_id> new_tasks_tablet_ids;
|
||||
for (auto& [_, task]: task_map) {
|
||||
if (is_split) {
|
||||
split_task(task);
|
||||
} else {
|
||||
merge_task(new_tasks_tablet_ids, task);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
for (auto& [_, replica_tasks]: _vb_sm.building_state.tasks_state.at(table_id)) {
|
||||
// Resize build_range tasks
|
||||
for (auto& [_, view_tasks]: replica_tasks.view_tasks) {
|
||||
resize_task_map(view_tasks);
|
||||
}
|
||||
// Migrate process_staging tasks
|
||||
resize_task_map(replica_tasks.staging_tasks);
|
||||
}
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
void view_building_coordinator::abort_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id) {
|
||||
if (!_vb_sm.building_state.tasks_state.contains(table_id)) {
|
||||
return;
|
||||
}
|
||||
vbc_logger.debug("Generating abort mutations for tasks for table {}", table_id);
|
||||
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
auto abort_task_map = [&] (const task_map& task_map) {
|
||||
for (auto& [id, _]: task_map) {
|
||||
vbc_logger.debug("Aborting task {}", id);
|
||||
builder.set_state(id, view_building_task::task_state::aborted);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto& [_, replica_tasks]: _vb_sm.building_state.tasks_state.at(table_id)) {
|
||||
for (auto& [_, building_task_map]: replica_tasks.view_tasks) {
|
||||
abort_task_map(building_task_map);
|
||||
}
|
||||
abort_task_map(replica_tasks.staging_tasks);
|
||||
}
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
void view_building_coordinator::abort_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, locator::tablet_replica replica, dht::token last_token) {
|
||||
return abort_view_building_tasks(_vb_sm, out, guard.write_timestamp(), table_id, replica, last_token);
|
||||
}
|
||||
|
||||
void abort_view_building_tasks(const view_building_state_machine& vb_sm,
|
||||
utils::chunked_vector<canonical_mutation>& out, api::timestamp_type write_timestamp, table_id table_id, const locator::tablet_replica& replica, dht::token last_token) {
|
||||
if (!vb_sm.building_state.tasks_state.contains(table_id) || !vb_sm.building_state.tasks_state.at(table_id).contains(replica)) {
|
||||
return;
|
||||
}
|
||||
vbc_logger.debug("Generating abort mutations for tasks for table {} on replica {} and last token {}", table_id, replica, last_token);
|
||||
|
||||
view_building_task_mutation_builder builder(write_timestamp);
|
||||
auto abort_task_map = [&] (const task_map& task_map) {
|
||||
for (auto& [id, task]: task_map) {
|
||||
if (task.last_token == last_token) {
|
||||
vbc_logger.debug("Aborting task {}", id);
|
||||
builder.set_state(id, view_building_task::task_state::aborted);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
auto& replica_tasks = vb_sm.building_state.tasks_state.at(table_id).at(replica);
|
||||
for (auto& [_, building_task_map]: replica_tasks.view_tasks) {
|
||||
abort_task_map(building_task_map);
|
||||
}
|
||||
abort_task_map(replica_tasks.staging_tasks);
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
static void rollback_task_map(view_building_task_mutation_builder& builder, const task_map& task_map) {
|
||||
for (auto& [id, task]: task_map) {
|
||||
if (task.state == view_building_task::task_state::aborted) {
|
||||
auto new_id = builder.new_id();
|
||||
builder.set_type(new_id, task.type)
|
||||
.set_state(new_id, view_building_task::task_state::idle)
|
||||
.set_base_id(new_id, task.base_id)
|
||||
.set_last_token(new_id, task.last_token)
|
||||
.set_replica(new_id, task.replica);
|
||||
if (task.view_id) {
|
||||
builder.set_view_id(new_id, *task.view_id);
|
||||
}
|
||||
builder.del_task(task.id);
|
||||
vbc_logger.debug("Aborted task {} was recreated with new id {}", task.id, new_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void view_building_coordinator::rollback_aborted_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id) {
|
||||
if (!_vb_sm.building_state.tasks_state.contains(table_id)) {
|
||||
return;
|
||||
}
|
||||
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
auto& base_tasks = _vb_sm.building_state.tasks_state.at(table_id);
|
||||
for (auto& [_, replica_tasks]: base_tasks) {
|
||||
for (auto& [_, building_task_map]: replica_tasks.view_tasks) {
|
||||
rollback_task_map(builder, building_task_map);
|
||||
}
|
||||
rollback_task_map(builder, replica_tasks.staging_tasks);
|
||||
}
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
void view_building_coordinator::rollback_aborted_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, locator::tablet_replica replica, dht::token last_token) {
|
||||
if (!_vb_sm.building_state.tasks_state.contains(table_id) || !_vb_sm.building_state.tasks_state.at(table_id).contains(replica)) {
|
||||
return;
|
||||
}
|
||||
|
||||
view_building_task_mutation_builder builder(guard.write_timestamp());
|
||||
auto& replica_tasks = _vb_sm.building_state.tasks_state.at(table_id).at(replica);
|
||||
for (auto& [_, building_task_map]: replica_tasks.view_tasks) {
|
||||
rollback_task_map(builder, building_task_map);
|
||||
}
|
||||
rollback_task_map(builder, replica_tasks.staging_tasks);
|
||||
|
||||
out.emplace_back(builder.build());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -10,11 +10,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/abort_source.hh>
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "raft/raft.hh"
|
||||
#include "service/endpoint_lifecycle_subscriber.hh"
|
||||
#include "service/raft/raft_group0.hh"
|
||||
#include "service/raft/raft_group0_client.hh"
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "service/topology_state_machine.hh"
|
||||
#include "db/view/view_building_state.hh"
|
||||
|
||||
@@ -64,6 +67,14 @@ public:
|
||||
future<> run();
|
||||
future<> stop();
|
||||
|
||||
void generate_tablet_migration_updates(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, const locator::tablet_map& tmap, locator::global_tablet_id gid, const locator::tablet_transition_info& trinfo);
|
||||
void generate_tablet_resize_updates(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, const locator::tablet_map& old_tmap, const locator::tablet_map& new_tmap);
|
||||
|
||||
void abort_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id);
|
||||
void abort_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, locator::tablet_replica replica, dht::token last_token);
|
||||
void rollback_aborted_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id);
|
||||
void rollback_aborted_tasks(utils::chunked_vector<canonical_mutation>& out, const service::group0_guard& guard, table_id table_id, locator::tablet_replica replica, dht::token last_token);
|
||||
|
||||
virtual void on_up(const gms::inet_address& endpoint, locator::host_id host_id) override;
|
||||
|
||||
private:
|
||||
@@ -89,6 +100,9 @@ private:
|
||||
future<utils::chunked_vector<mutation>> update_state_after_work_is_done(const service::group0_guard& guard, const locator::tablet_replica& replica, remote_work_results results);
|
||||
};
|
||||
|
||||
void abort_view_building_tasks(const db::view::view_building_state_machine& vb_sm,
|
||||
utils::chunked_vector<canonical_mutation>& out, api::timestamp_type write_timestamp, table_id table_id, const locator::tablet_replica& replica, dht::token last_token);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
#include "db/view/view_building_task_mutation_builder.hh"
|
||||
#include "keys/keys.hh"
|
||||
|
||||
namespace db {
|
||||
|
||||
@@ -46,6 +47,11 @@ view_building_task_mutation_builder& view_building_task_mutation_builder::set_re
|
||||
return *this;
|
||||
}
|
||||
|
||||
view_building_task_mutation_builder& view_building_task_mutation_builder::del_task(utils::UUID id) {
|
||||
_m.partition().apply_delete(*_s, clustering_key_prefix::from_single_value(*_s, data_value(id).serialize_nonnull()), tombstone{_ts, gc_clock::now()});
|
||||
return *this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -37,6 +37,7 @@ public:
|
||||
view_building_task_mutation_builder& set_view_id(utils::UUID id, table_id view_id);
|
||||
view_building_task_mutation_builder& set_last_token(utils::UUID id, dht::token last_token);
|
||||
view_building_task_mutation_builder& set_replica(utils::UUID id, const locator::tablet_replica& replica);
|
||||
view_building_task_mutation_builder& del_task(utils::UUID id);
|
||||
|
||||
mutation build() {
|
||||
return std::move(_m);
|
||||
|
||||
@@ -163,6 +163,7 @@ static const std::unordered_set<table_id>& get_view_building_state_tables() {
|
||||
db::system_keyspace::view_building_tasks()->id(),
|
||||
db::schema_tables::v3::views()->id(),
|
||||
db::system_keyspace::view_build_status_v2()->id(),
|
||||
db::system_keyspace::tablets()->id(),
|
||||
};
|
||||
return ids;
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "storage_service.hh"
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include <seastar/core/shard_id.hh>
|
||||
#include "db/view/view_building_coordinator.hh"
|
||||
#include "utils/disk_space_monitor.hh"
|
||||
#include "compaction/task_manager_module.hh"
|
||||
#include "gc_clock.hh"
|
||||
@@ -6836,6 +6837,9 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator:
|
||||
: locator::tablet_transition_kind::migration)
|
||||
.set_migration_task_info(last_token, std::move(migration_task_info), _feature_service)
|
||||
.build());
|
||||
if (_feature_service.view_building_coordinator) {
|
||||
db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, src, last_token);
|
||||
}
|
||||
|
||||
sstring reason = format("Moving tablet {} from {} to {}", gid, src, dst);
|
||||
|
||||
@@ -6925,6 +6929,9 @@ future<> storage_service::del_tablet_replica(table_id table, dht::token token, l
|
||||
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
|
||||
.set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
|
||||
.build());
|
||||
if (_feature_service.view_building_coordinator) {
|
||||
db::view::abort_view_building_tasks(_view_building_state_machine, updates, write_timestamp, table, dst, last_token);
|
||||
}
|
||||
|
||||
sstring reason = format("Removing replica from tablet {}, node {}", gid, dst);
|
||||
|
||||
|
||||
@@ -948,6 +948,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
auto views = ks.metadata()->views();
|
||||
tables_with_mvs.insert(tables_with_mvs.end(), views.begin(), views.end());
|
||||
for (const auto& table_or_mv : tables_with_mvs) {
|
||||
locator::tablet_map old_tablets{unimportant_init_tablet_count};
|
||||
try {
|
||||
if (!tmptr->tablets().is_base_table(table_or_mv->id())) {
|
||||
// Apply the transition only on base tables.
|
||||
@@ -955,10 +956,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// the base table will coordinate the transition for the entire group.
|
||||
continue;
|
||||
}
|
||||
locator::tablet_map old_tablets = co_await tmptr->tablets().get_tablet_map(table_or_mv->id()).clone_gently();
|
||||
old_tablets = co_await tmptr->tablets().get_tablet_map(table_or_mv->id()).clone_gently();
|
||||
locator::replication_strategy_params params{repl_opts, old_tablets.tablet_count()};
|
||||
auto new_strategy = locator::abstract_replication_strategy::create_replication_strategy("NetworkTopologyStrategy", params);
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, std::move(old_tablets));
|
||||
new_tablet_map = co_await new_strategy->maybe_as_tablet_aware()->reallocate_tablets(table_or_mv, tmptr, co_await old_tablets.clone_gently());
|
||||
} catch (const std::exception& e) {
|
||||
error = e.what();
|
||||
rtlogger.error("Couldn't process global_topology_request::keyspace_rf_change, error: {},"
|
||||
@@ -977,6 +978,17 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_transition(last_token, locator::choose_rebuild_transition_kind(_feature_service))
|
||||
.build()
|
||||
));
|
||||
|
||||
// Calculate abandoning replica and abort view building tasks on them
|
||||
auto old_tablet_info = old_tablets.get_tablet_info(last_token);
|
||||
auto abandoning_replicas = locator::substract_sets(old_tablet_info.replicas, tablet_info.replicas);
|
||||
if (!abandoning_replicas.empty()) {
|
||||
if (abandoning_replicas.size() != 1) {
|
||||
on_internal_error(rtlogger, fmt::format("Keyspace RF abandons {} replicas for table {} and tablet id {}", abandoning_replicas.size(), table_or_mv->id(), tablet_id));
|
||||
}
|
||||
_vb_coordinator->abort_tasks(updates, guard, table_or_mv->id(), *abandoning_replicas.begin(), last_token);
|
||||
}
|
||||
|
||||
co_await coroutine::maybe_yield();
|
||||
});
|
||||
}
|
||||
@@ -1224,6 +1236,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_transition(last_token, mig.kind)
|
||||
.set_migration_task_info(last_token, std::move(migration_task_info), _feature_service)
|
||||
.build());
|
||||
_vb_coordinator->abort_tasks(out, guard, mig.tablet.table, mig.src, last_token);
|
||||
}
|
||||
|
||||
void generate_repair_update(utils::chunked_vector<canonical_mutation>& out, const group0_guard& guard, const locator::global_tablet_id& gid, db_clock::time_point sched_time) {
|
||||
@@ -1591,6 +1604,9 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.del_transition(last_token)
|
||||
.del_migration_task_info(last_token, _feature_service)
|
||||
.build());
|
||||
if (trinfo.pending_replica) {
|
||||
_vb_coordinator->rollback_aborted_tasks(updates, guard, gid.table, *trinfo.pending_replica, last_token);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case locator::tablet_transition_stage::end_migration: {
|
||||
@@ -1604,6 +1620,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
.set_replicas(last_token, trinfo.next)
|
||||
.del_migration_task_info(last_token, _feature_service)
|
||||
.build());
|
||||
_vb_coordinator->generate_tablet_migration_updates(updates, guard, tmap, gid, trinfo);
|
||||
}
|
||||
}
|
||||
break;
|
||||
@@ -1838,6 +1855,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
|
||||
// Clears the resize decision for a table.
|
||||
generate_resize_update(updates, guard, table_id, locator::resize_decision{});
|
||||
_vb_coordinator->generate_tablet_resize_updates(updates, guard, table_id, tm->tablets().get_tablet_map(table_id), new_tablet_map);
|
||||
|
||||
const auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(table_id);
|
||||
auto old_cnt = tmap.tablet_count();
|
||||
|
||||
Reference in New Issue
Block a user