Merge 'repair: Wire repair_time in system.tablets for tombstone gc' from Asias He

The repair_time in system.tablets is updated when a repair completes
successfully. We can now use it to update the repair time for tombstone
gc, i.e., when system.tablets.repair_time is propagated, call
gc_state.update_repair_time() on the node that is the owner of the
tablet.

Since b3b3e880d3 ("repair: Reduce hints and batchlog flush"), the
repair time that can be used for tombstone gc might be earlier than
the time the repair was started, so the actual repair time for tombstone
gc is returned by the repair rpc call from the repair master node.

Fixes #17507

New feature. No backport is needed.

Closes scylladb/scylladb#21896

* github.com:scylladb/scylladb:
  repair: Stop using rpc to update repair time for repairs scheduled by scheduler
  repair: Wire repair_time in system.tablets for tombstone gc
  test: Disable flush_cache_time for two tablet repair tests
  test: Introduce guarantee_repair_time_next_second helper
  repair: Return repair time for repair_service::repair_tablet
  service: Add tablet_operation.hh
This commit is contained in:
Tomasz Grabiec
2025-01-20 18:08:49 +01:00
18 changed files with 243 additions and 59 deletions

View File

@@ -6,6 +6,8 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "service/tablet_operation.hh"
namespace locator {
struct tablet_id final {
@@ -59,10 +61,14 @@ struct raft_snapshot_pull_params {
std::vector<table_id> tables;
};
struct tablet_operation_repair_result {
gc_clock::time_point repair_time;
};
verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb [[cancellable]] raft_pull_snapshot (raft::server_id dst_id, service::raft_snapshot_pull_params) -> service::raft_snapshot;
verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] table_load_stats (raft::server_id dst_id) -> locator::load_stats;
verb [[cancellable]] tablet_repair(raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] tablet_repair(raft::server_id dst_id, locator::global_tablet_id) -> service::tablet_operation_repair_result;
}

View File

@@ -48,6 +48,7 @@
#include "replica/exceptions.hh"
#include "serializer.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "service/tablet_operation.hh"
#include "service/topology_state_machine.hh"
#include "service/topology_guard.hh"
#include "service/raft/join_node.hh"

View File

@@ -619,7 +619,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
bool hints_batchlog_flushed,
bool small_table_optimization,
std::optional<int> ranges_parallelism,
gc_clock::time_point flush_time)
gc_clock::time_point flush_time,
bool sched_by_scheduler)
: repair_task_impl(module, id, 0, "shard", keyspace, "", "", parent_id_.uuid(), reason_)
, rs(repair)
, db(repair.get_db())
@@ -639,6 +640,7 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
, _small_table_optimization(small_table_optimization)
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
, _flush_time(flush_time)
, sched_by_scheduler(sched_by_scheduler)
{
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
_user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
@@ -2400,7 +2402,7 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
}
// It is called by the tablet_repair rpc verb handler to repair the given tablet
future<> repair_service::repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid) {
future<gc_clock::time_point> repair_service::repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid) {
auto id = _repair_module->new_repair_uniq_id();
rlogger.debug("repair[{}]: Starting tablet repair global_tablet_id={}", id.uuid(), gid);
auto& db = get_db().local();
@@ -2409,7 +2411,7 @@ future<> repair_service::repair_tablet(gms::gossip_address_map& addr_map, locato
auto t = db.get_tables_metadata().get_table_if_exists(table_id);
if (!t) {
co_return;
co_return gc_clock::now();
}
auto& tmap = guard.get_tablet_map();
auto s = t->schema();
@@ -2438,16 +2440,21 @@ future<> repair_service::repair_tablet(gms::gossip_address_map& addr_map, locato
auto ranges_parallelism = std::nullopt;
auto start = std::chrono::steady_clock::now();
task_metas.push_back(tablet_repair_task_meta{keyspace_name, table_name, table_id, master_shard_id, range, repair_neighbors(nodes, shards), replicas});
auto task = co_await _repair_module->make_and_start_task<repair::tablet_repair_task_impl>({}, id, keyspace_name, table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism);
auto task_impl_ptr = seastar::make_shared<repair::tablet_repair_task_impl>(_repair_module, id, keyspace_name, table_names, streaming::stream_reason::repair, std::move(task_metas), ranges_parallelism);
task_impl_ptr->sched_by_scheduler = true;
auto task = co_await _repair_module->make_task(task_impl_ptr, {});
task->start();
co_await task->done();
auto flush_time = task_impl_ptr->get_flush_time();
auto delay = utils::get_local_injector().inject_parameter<uint32_t>("tablet_repair_add_delay_in_ms");
if (delay) {
rlogger.debug("Execute tablet_repair_add_delay_in_ms={}", *delay);
co_await seastar::sleep(std::chrono::milliseconds(*delay));
}
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now()- start);
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={}",
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid);
rlogger.info("repair[{}]: Finished tablet repair for table={}.{} range={} duration={} replicas={} global_tablet_id={} flush_time={}",
id.uuid(), keyspace_name, table_name, range, duration, replicas, gid, flush_time);
co_return flush_time;
}
tasks::is_user_task repair::tablet_repair_task_impl::is_user_task() const noexcept {
@@ -2523,7 +2530,8 @@ future<> repair::tablet_repair_task_impl::run() {
});
auto parent_shard = this_shard_id();
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, ranges_parallelism = _ranges_parallelism, parent_shard] (repair_service& rs) -> future<> {
std::vector<gc_clock::time_point> flush_times(smp::count);
rs.container().invoke_on_all([&idx, &flush_times, id, metas = _metas, parent_data, reason = _reason, tables = _tables, sched_by_scheduler = sched_by_scheduler, ranges_parallelism = _ranges_parallelism, parent_shard] (repair_service& rs) -> future<> {
std::exception_ptr error;
for (auto& m : metas) {
if (m.master_shard_id != this_shard_id()) {
@@ -2557,9 +2565,9 @@ future<> repair::tablet_repair_task_impl::run() {
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(rs._repair_module, tasks::task_id::create_random_id(),
m.keyspace_name, rs, erm, std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts),
std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time);
std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time, sched_by_scheduler);
task_impl_ptr->neighbors = std::move(neighbors);
auto task = co_await rs._repair_module->make_task(std::move(task_impl_ptr), parent_data);
auto task = co_await rs._repair_module->make_task(task_impl_ptr, parent_data);
task->start();
auto res = co_await coroutine::as_future(task->done());
if (res.failed()) {
@@ -2577,11 +2585,19 @@ future<> repair::tablet_repair_task_impl::run() {
error = std::move(ep);
}
}
auto current = flush_times[this_shard_id()];
auto time = task_impl_ptr->get_flush_time();
flush_times[this_shard_id()] = current == gc_clock::time_point() ? time : std::min(current, time);
}
if (error) {
co_await coroutine::return_exception_ptr(std::move(error));
}
}).get();
for (auto& time : flush_times) {
if (time != gc_clock::time_point()) {
_flush_time = _flush_time == gc_clock::time_point() ? time : std::min(_flush_time, time);
}
}
auto duration = std::chrono::duration<float>(std::chrono::steady_clock::now() - start_time);
rlogger.info("repair[{}]: Finished user-requested repair for tablet keyspace={} tables={} repair_id={} tablets_repaired={} duration={}",
id.uuid(), _keyspace, _tables, id.id, _metas.size(), duration);

View File

@@ -2992,6 +2992,15 @@ private:
if (!_shard_task.hints_batchlog_flushed()) {
co_return;
}
// The tablet repair time for tombstone gc will be updated when
// system.tablets.repair_time is updated.
if (_is_tablet && _shard_task.sched_by_scheduler) {
rlogger.debug("repair[{}]: Skipped to update system.repair_history for tablet repair scheduled by scheduler total_rf={} repaired_replicas={} local={} peers={}",
_shard_task.global_repair_id.uuid(), _shard_task.total_rf, repaired_replicas, my_address, _all_live_peer_nodes);
co_return;
}
repair_service& rs = _shard_task.rs;
std::optional<gc_clock::time_point> repair_time_opt = co_await rs.update_history(_shard_task.global_repair_id.uuid(), _table_id, _range, _start_time, _is_tablet);
if (!repair_time_opt) {

View File

@@ -177,7 +177,7 @@ private:
public:
future<> repair_tablets(repair_uniq_id id, sstring keyspace_name, std::vector<sstring> table_names, bool primary_replica_only = true, dht::token_range_vector ranges_specified = {}, std::vector<sstring> dcs = {}, std::unordered_set<locator::host_id> hosts = {}, std::unordered_set<locator::host_id> ignore_nodes = {}, std::optional<int> ranges_parallelism = std::nullopt);
future<> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid);
future<gc_clock::time_point> repair_tablet(gms::gossip_address_map& addr_map, locator::tablet_metadata_guard& guard, locator::global_tablet_id gid);
private:
future<repair_update_system_table_response> repair_update_system_table_handler(

View File

@@ -113,6 +113,9 @@ private:
optimized_optional<abort_source::subscription> _abort_subscription;
std::optional<int> _ranges_parallelism;
size_t _metas_size = 0;
gc_clock::time_point _flush_time;
public:
bool sched_by_scheduler = false;
public:
tablet_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, sstring keyspace, std::vector<sstring> tables, streaming::stream_reason reason, std::vector<tablet_repair_task_meta> metas, std::optional<int> ranges_parallelism)
: repair_task_impl(module, id.uuid(), id.id, "keyspace", keyspace, "", "", tasks::task_id::create_null_id(), reason)
@@ -127,6 +130,8 @@ public:
return tasks::is_abortable(!_abort_subscription);
}
gc_clock::time_point get_flush_time() const { return _flush_time; }
tasks::is_user_task is_user_task() const noexcept override;
virtual void release_resources() noexcept override;
private:
@@ -170,6 +175,8 @@ private:
std::optional<semaphore> _user_ranges_parallelism;
uint64_t _ranges_complete = 0;
gc_clock::time_point _flush_time;
public:
bool sched_by_scheduler = false;
public:
shard_repair_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
@@ -186,10 +193,12 @@ public:
bool hints_batchlog_flushed,
bool small_table_optimization,
std::optional<int> ranges_parallelism,
gc_clock::time_point flush_time);
gc_clock::time_point flush_time,
bool sched_by_scheduler = false);
void check_failed_ranges();
void check_in_abort_or_shutdown();
repair_neighbors get_repair_neighbors(const dht::token_range& range);
gc_clock::time_point get_flush_time() const { return _flush_time; }
void update_statistics(const repair_stats& stats) {
_stats.add(stats);
}

View File

@@ -24,6 +24,7 @@
#include "sstables/sstable_set.hh"
#include "dht/token.hh"
#include "mutation/async_utils.hh"
#include "compaction/compaction_manager.hh"
namespace replica {
@@ -493,7 +494,7 @@ void update_tablet_metadata_change_hint(locator::tablet_metadata_change_hint& hi
namespace {
tablet_id process_one_row(table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
tablet_id process_one_row(replica::database* db, table_id table, tablet_map& map, tablet_id tid, const cql3::untyped_result_set_row& row) {
tablet_replica_set tablet_replicas;
if (row.has("replicas")) {
tablet_replicas = deserialize_replica_set(row.get_view("replicas"));
@@ -505,8 +506,10 @@ tablet_id process_one_row(table_id table, tablet_map& map, tablet_id tid, const
}
db_clock::time_point repair_time;
bool update_repair_time = false;
if (row.has("repair_time")) {
repair_time = row.get_as<db_clock::time_point>("repair_time");
update_repair_time = true;
}
locator::tablet_task_info repair_task_info;
@@ -542,6 +545,21 @@ tablet_id process_one_row(table_id table, tablet_map& map, tablet_id tid, const
map.set_tablet(tid, tablet_info{std::move(tablet_replicas), repair_time, repair_task_info, migration_task_info});
if (update_repair_time && db) {
auto myid = db->get_token_metadata().get_my_id();
auto range = map.get_token_range(tid);
auto& info = map.get_tablet_info(tid);
for (auto r : info.replicas) {
if (r.host == myid) {
auto& gc_state = db->get_compaction_manager().get_tombstone_gc_state();
gc_state.insert_pending_repair_time_update(table, range, to_gc_clock(repair_time), r.shard);
tablet_logger.debug("Insert pending repair time for tombstone gc: table={} tablet={} range={} repair_time={}",
table, tid, range, repair_time);
break;
}
}
}
auto persisted_last_token = dht::token::from_int64(row.get_as<int64_t>("last_token"));
auto current_last_token = map.get_last_token(tid);
if (current_last_token != persisted_last_token) {
@@ -562,7 +580,7 @@ struct tablet_metadata_builder {
};
std::optional<active_tablet_map> current;
void process_row(const cql3::untyped_result_set_row& row) {
void process_row(const cql3::untyped_result_set_row& row, replica::database* db) {
auto table = table_id(row.get_as<utils::UUID>("table_id"));
if (!current || current->table != table) {
@@ -591,7 +609,7 @@ struct tablet_metadata_builder {
}
}
current->tid = process_one_row(current->table, current->map, current->tid, row);
current->tid = process_one_row(db, current->table, current->map, current->tid, row);
}
void on_end_of_stream() {
@@ -610,7 +628,7 @@ future<tablet_metadata> read_tablet_metadata(cql3::query_processor& qp) {
try {
co_await qp.query_internal("select * from system.tablets",
[&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
builder.process_row(row);
builder.process_row(row, qp.db().real_database_ptr());
return make_ready_future<stop_iteration>(stop_iteration::no);
});
} catch (...) {
@@ -669,7 +687,7 @@ do_update_tablet_metadata_partition(cql3::query_processor& qp, tablet_metadata&
{data_value(hint.table_id.uuid())},
1000,
[&] (const cql3::untyped_result_set_row& row) -> future<stop_iteration> {
builder.process_row(row);
builder.process_row(row, qp.db().real_database_ptr());
return make_ready_future<stop_iteration>(stop_iteration::no);
});
if (builder.current) {
@@ -680,7 +698,7 @@ do_update_tablet_metadata_partition(cql3::query_processor& qp, tablet_metadata&
}
static future<>
do_update_tablet_metadata_rows(cql3::query_processor& qp, tablet_map& tmap, const tablet_metadata_change_hint::table_hint& hint) {
do_update_tablet_metadata_rows(replica::database& db, cql3::query_processor& qp, tablet_map& tmap, const tablet_metadata_change_hint::table_hint& hint) {
for (const auto token : hint.tokens) {
auto res = co_await qp.execute_internal(
"select * from system.tablets where table_id = ? and last_token = ?",
@@ -692,19 +710,19 @@ do_update_tablet_metadata_rows(cql3::query_processor& qp, tablet_map& tmap, cons
throw std::runtime_error("Failed to update tablet metadata: updated row is empty");
} else {
tmap.clear_tablet_transition_info(tid);
process_one_row(hint.table_id, tmap, tid, res->one());
process_one_row(&db, hint.table_id, tmap, tid, res->one());
}
}
}
future<> update_tablet_metadata(cql3::query_processor& qp, tablet_metadata& tm, const locator::tablet_metadata_change_hint& hint) {
future<> update_tablet_metadata(replica::database& db, cql3::query_processor& qp, tablet_metadata& tm, const locator::tablet_metadata_change_hint& hint) {
try {
for (const auto& [_, table_hint] : hint.tables) {
if (table_hint.tokens.empty()) {
co_await do_update_tablet_metadata_partition(qp, tm, table_hint);
} else {
co_await tm.mutate_tablet_map_async(table_hint.table_id, [&] (tablet_map& tmap) -> future<> {
co_await do_update_tablet_metadata_rows(qp, tmap, table_hint);
co_await do_update_tablet_metadata_rows(db, qp, tmap, table_hint);
});
}
}

View File

@@ -93,7 +93,7 @@ future<std::unordered_set<locator::host_id>> read_required_hosts(cql3::query_pro
///
/// The hint is used to determine what has changed and only reload the changed
/// parts from disk, updating the passed-in metadata in-place accordingly.
future<> update_tablet_metadata(cql3::query_processor&, locator::tablet_metadata&, const locator::tablet_metadata_change_hint&);
future<> update_tablet_metadata(replica::database& db, cql3::query_processor&, locator::tablet_metadata&, const locator::tablet_metadata_change_hint&);
/// Reads tablet metadata from system.tablets in the form of mutations.
future<std::vector<canonical_mutation>> read_tablet_mutations(seastar::sharded<database>&);

View File

@@ -761,7 +761,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
// We want to update the tablet metadata incrementally, so copy it
// from the current token metadata and update only the changed parts.
tablets = co_await get_token_metadata().tablets().copy();
co_await replica::update_tablet_metadata(_qp, *tablets, *hint.tablets_hint);
co_await replica::update_tablet_metadata(_db.local(), _qp, *tablets, *hint.tablets_hint);
} else {
tablets = co_await replica::read_tablet_metadata(_qp);
}
@@ -3230,6 +3230,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
for (auto id : open_sessions) {
session_mgr.create_session(id);
}
auto& gc_state = db.get_compaction_manager().get_tombstone_gc_state();
co_await gc_state.flush_pending_repair_time_update(db);
});
} catch (...) {
// applying the changes on all shards should never fail
@@ -5396,7 +5399,7 @@ void storage_service::on_update_tablet_metadata(const locator::tablet_metadata_c
future<> storage_service::load_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
return mutate_token_metadata([this, &hint] (mutable_token_metadata_ptr tmptr) -> future<> {
if (hint) {
co_await replica::update_tablet_metadata(_qp, tmptr->tablets(), hint);
co_await replica::update_tablet_metadata(_db.local(), _qp, tmptr->tablets(), hint);
} else {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
}
@@ -5920,9 +5923,9 @@ inet_address storage_service::host2ip(locator::host_id host) const {
// may receive stale triggers started in the previous stage, so that those nodes will
// see tablet metadata which reflects group0 state. This will cut-off stale triggers
// as soon as the coordinator moves to the next stage.
future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
future<tablet_operation_result> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>(locator::tablet_metadata_guard&)> op) {
std::function<future<tablet_operation_result>(locator::tablet_metadata_guard&)> op) {
// The coordinator may not execute global token metadata barrier before triggering the operation, so we need
// a barrier here to see the token metadata which is at least as recent as that of the sender.
auto& raft_server = _group0->group0_server();
@@ -5930,8 +5933,8 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
if (_tablet_ops.contains(tablet)) {
rtlogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
co_await _tablet_ops[tablet].done.get_future();
co_return;
auto result = co_await _tablet_ops[tablet].done.get_future();
co_return result;
}
locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet);
@@ -5941,18 +5944,19 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
});
auto async_gate_holder = _async_gate.hold();
promise<> p;
promise<tablet_operation_result> p;
_tablet_ops.emplace(tablet, tablet_operation {
op_name, seastar::shared_future<>(p.get_future())
op_name, seastar::shared_future<tablet_operation_result>(p.get_future())
});
auto erase_registry_entry = seastar::defer([&] {
_tablet_ops.erase(tablet);
});
try {
co_await op(guard);
p.set_value();
auto result = co_await op(guard);
p.set_value(result);
rtlogger.debug("{} for tablet migration of {} successful", op_name, tablet);
co_return result;
} catch (...) {
p.set_exception(std::current_exception());
rtlogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception());
@@ -5960,8 +5964,8 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
}
}
future<> storage_service::repair_tablet(locator::global_tablet_id tablet) {
return do_tablet_operation(tablet, "Repair", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<> {
future<service::tablet_operation_repair_result> storage_service::repair_tablet(locator::global_tablet_id tablet) {
auto result = co_await do_tablet_operation(tablet, "Repair", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
slogger.debug("Executing repair for tablet={}", tablet);
auto& tmap = guard.get_tablet_map();
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
@@ -5983,11 +5987,17 @@ future<> storage_service::repair_tablet(locator::global_tablet_id tablet) {
utils::get_local_injector().inject("repair_tablet_fail_on_rpc_call",
[] { throw std::runtime_error("repair_tablet failed due to error injection"); });
co_await do_with_repair_service(_repair, [&] (repair_service& local_repair) {
return local_repair.repair_tablet(_address_map, guard, tablet);
service::tablet_operation_repair_result result;
co_await do_with_repair_service(_repair, [&] (repair_service& local_repair) -> future<> {
auto time = co_await local_repair.repair_tablet(_address_map, guard, tablet);
result = service::tablet_operation_repair_result{time};
});
co_return;
co_return result;
});
if (std::holds_alternative<service::tablet_operation_repair_result>(result)) {
co_return std::get<service::tablet_operation_repair_result>(result);
}
on_internal_error(slogger, "Got wrong tablet_operation_repair_result");
}
future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id tablet, locator::tablet_replica leaving, locator::tablet_replica pending) {
@@ -6035,7 +6045,7 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
return do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<> {
co_await do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
auto tm = guard.get_token_metadata();
auto& tmap = guard.get_tablet_map();
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
@@ -6191,7 +6201,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
return table.maybe_split_compaction_group_of(tablet.tablet);
});
co_return;
co_return tablet_operation_result();
});
}
@@ -6201,7 +6211,7 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
_exit(1);
});
return do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) {
co_await do_tablet_operation(tablet, "Cleanup", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<tablet_operation_result> {
shard_id shard;
{
@@ -6237,10 +6247,11 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
throw std::runtime_error(fmt::format("Tablet {} stage is not at cleanup/cleanup_target", tablet));
}
}
return _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks] (replica::database& db) {
co_await _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks] (replica::database& db) {
auto& table = db.find_column_family(tablet.table);
return table.cleanup_tablet(db, sys_ks.local(), tablet.tablet);
});
co_return tablet_operation_result();
});
}
@@ -7185,8 +7196,9 @@ void storage_service::init_messaging_service() {
});
});
ser::storage_service_rpc_verbs::register_tablet_repair(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
return ss.repair_tablet(tablet);
return handle_raft_rpc(dst_id, [tablet] (auto& ss) -> future<service::tablet_operation_repair_result> {
auto res = co_await ss.repair_tablet(tablet);
co_return res;
});
});
ser::storage_service_rpc_verbs::register_tablet_cleanup(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {

View File

@@ -47,6 +47,7 @@
#include "raft/server.hh"
#include "service/topology_state_machine.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "utils/user_provided_param.hh"
#include "utils/sequenced_set.hh"
@@ -151,7 +152,7 @@ private:
struct tablet_operation {
sstring name;
shared_future<> done;
shared_future<service::tablet_operation_result> done;
};
using tablet_op_registry = std::unordered_map<locator::global_tablet_id, tablet_operation>;
@@ -198,10 +199,10 @@ private:
future<> node_ops_abort(node_ops_id ops_uuid);
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
future<> node_ops_abort_thread();
future<> do_tablet_operation(locator::global_tablet_id tablet,
future<service::tablet_operation_result> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>(locator::tablet_metadata_guard&)> op);
future<> repair_tablet(locator::global_tablet_id);
std::function<future<service::tablet_operation_result>(locator::tablet_metadata_guard&)> op);
future<service::tablet_operation_repair_result> repair_tablet(locator::global_tablet_id);
future<> stream_tablet(locator::global_tablet_id);
// Clones storage of leaving tablet into pending one. Done in the context of intra-node migration,
// when both of which sit on the same node. So all the movement is local.

View File

@@ -0,0 +1,21 @@
// Copyright (C) 2024-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#pragma once
#include <variant>
#include "gc_clock.hh"
namespace service {
// Result alternative for tablet operations that have nothing to report back.
struct tablet_operation_empty_result {
};
// Result alternative for a tablet repair operation.
struct tablet_operation_repair_result {
// Time point returned by the repair of the tablet.
// NOTE(review): presumably the time that is safe to use for tombstone gc,
// which may differ from the time the repair was started -- confirm against
// repair_service::repair_tablet().
gc_clock::time_point repair_time;
};
// Common result type for tablet operations: holds either the empty result
// or the repair result.
using tablet_operation_result = std::variant<tablet_operation_empty_result, tablet_operation_repair_result>;
}

View File

@@ -46,6 +46,7 @@
#include "service/raft/raft_group0.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/tablet_allocator.hh"
#include "service/tablet_operation.hh"
#include "service/topology_state_machine.hh"
#include "topology_mutation.hh"
#include "utils/assert.hh"
@@ -1159,6 +1160,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
background_action_holder cleanup;
background_action_holder repair;
std::unordered_map<locator::tablet_transition_stage, background_action_holder> barriers;
// Record the repair_time returned by the repair_tablet rpc call
db_clock::time_point repair_time;
};
std::unordered_map<locator::global_tablet_id, tablet_migration_state> _tablets;
@@ -1571,10 +1574,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
auto dst = primary.host;
auto tablet = gid;
rtlogger.info("Initiating tablet repair host={} tablet={}", dst, gid);
co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
auto res = co_await ser::storage_service_rpc_verbs::send_tablet_repair(&_messaging,
dst, _as, raft::server_id(dst.uuid()), gid);
auto duration = std::chrono::duration<float>(db_clock::now() - sched_time);
rtlogger.info("Finished tablet repair host={} tablet={} duration={}", dst, tablet, duration);
auto& tablet_state = _tablets[tablet];
tablet_state.repair_time = db_clock::from_time_t(gc_clock::to_time_t(res.repair_time));
rtlogger.info("Finished tablet repair host={} tablet={} duration={} repair_time={}",
dst, tablet, duration, res.repair_time);
})) {
auto& tinfo = tmap.get_tablet_info(gid.tablet);
bool valid = tinfo.repair_task_info.is_valid();
@@ -1584,7 +1590,10 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.del_repair_task_info(last_token)
.del_session(last_token);
if (valid) {
auto time = tinfo.repair_task_info.sched_time;
auto sched_time = tinfo.repair_task_info.sched_time;
auto time = tablet_state.repair_time;
rtlogger.debug("Set tablet repair time sched_time={} return_time={} set_time={}",
sched_time, tablet_state.repair_time, time);
update.set_repair_time(last_token, time);
}
updates.emplace_back(update.build());

View File

@@ -78,7 +78,7 @@ void verify_tablet_metadata_update(cql_test_env& env, tablet_metadata& tm, std::
update_tablet_metadata_change_hint(hint, mut);
}
update_tablet_metadata(env.local_qp(), tm, hint).get();
update_tablet_metadata(db, env.local_qp(), tm, hint).get();
auto tm_reload = read_tablet_metadata(env.local_qp()).get();
BOOST_REQUIRE_EQUAL(tm, tm_reload);

View File

@@ -182,7 +182,7 @@ static future<> test_basic_operations(app_template& app) {
const auto full_reload_duration = std::chrono::duration<double, std::milli>(end_full_reload - start_full_reload);
const auto start_partial_reload = clk::now();
update_tablet_metadata(e.local_qp(), tm, hint).get();
update_tablet_metadata(e.local_db(), e.local_qp(), tm, hint).get();
const auto end_partial_reload = clk::now();
const auto partial_reload_duration = std::chrono::duration<double, std::milli>(end_partial_reload - start_partial_reload);

View File

@@ -28,11 +28,13 @@ async def load_tablet_repair_time(cql, hosts, table_id):
return repair_time_map
async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256):
async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False):
if fast_stats_refresh:
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
else:
config = {}
if disable_flush_cache_time:
config.update({'repair_hints_batchlog_flush_cache_time_in_ms': 0})
servers = [await manager.server_add(config=config), await manager.server_add(config=config), await manager.server_add(config=config)]
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', "

View File

@@ -7,7 +7,7 @@
from test.pylib.manager_client import ManagerClient
from test.topology.conftest import skip_mode
from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.rest_client import inject_error_one_shot, read_barrier
import pytest
import asyncio
@@ -30,9 +30,14 @@ async def inject_error_off(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def guarantee_repair_time_next_second():
    """Sleep for one second so a repair started afterwards gets a different repair time."""
    # The repair time granularity is seconds. This ensures the repair time is
    # different than the previous one.
    await asyncio.sleep(1)
@pytest.mark.asyncio
async def test_tablet_manual_repair(manager: ManagerClient):
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False)
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
token = -1
start = time.time()
@@ -41,9 +46,7 @@ async def test_tablet_manual_repair(manager: ManagerClient):
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
logging.info(f'map1={map1} duration={duration}')
# The repair time granularity is seconds. This makes sure the second repair time
# is different than the previous one.
await asyncio.sleep(1)
await guarantee_repair_time_next_second()
start = time.time()
await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token)
@@ -57,12 +60,46 @@ async def test_tablet_manual_repair(manager: ManagerClient):
assert t2 > t1
@pytest.mark.asyncio
async def test_tombstone_gc_insert_flush(manager: ManagerClient):
    """Check that pending tombstone-gc repair time updates are flushed.

    Repairs all tokens of test.test, then polls the log of every server until
    the number of "Insert pending repair time" lines for the table equals the
    number of "Flush pending repair time" lines and is non-zero. Fails if that
    does not happen on every server within the timeout.
    """
    servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
    token = "all"
    logs = []
    for s in servers:
        # Raise the log levels so the messages polled for below are recorded
        # (the flush message is emitted at debug level).
        await manager.api.set_logger_level(s.ip_addr, "database", "debug")
        await manager.api.set_logger_level(s.ip_addr, "tablets", "debug")
        logs.append(await manager.server_open_log(s.server_id))

    await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token)

    timeout = 600
    poll_interval = 0.1
    deadline = time.time() + timeout
    while True:
        done = True
        for s in servers:
            await read_barrier(manager.api, s.ip_addr)
        # Every log is inspected (no early exit) so each round reports the
        # state of all servers.
        for log in logs:
            inserts = await log.grep(rf'.*Insert pending repair time for tombstone gc: table={table_id}.*')
            flushes = await log.grep(rf'.*Flush pending repair time for tombstone gc: table={table_id}.*')
            logging.info(f'{inserts=} {flushes=}')
            logging.info(f'{len(inserts)=} {len(flushes)=}')
            ok = len(inserts) == len(flushes) and len(inserts) > 0
            if not ok:
                done = False
        if done:
            break
        assert time.time() < deadline, \
            f'pending repair time updates for table {table_id} were not all flushed within {timeout}s'
        # Pause between rounds instead of retrying back-to-back.
        await asyncio.sleep(poll_interval)
@pytest.mark.asyncio
async def test_tablet_manual_repair_all_tokens(manager: ManagerClient):
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False)
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager, fast_stats_refresh=False, disable_flush_cache_time=True)
token = "all"
now = datetime.datetime.utcnow()
map1 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
await guarantee_repair_time_next_second()
await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token)
map2 = await load_tablet_repair_time(cql, hosts[0:1], table_id)
logging.info(f'{map1=} {map2=}')

View File

@@ -18,6 +18,8 @@
#include "replica/database.hh"
#include "data_dictionary/data_dictionary.hh"
#include "gms/feature_service.hh"
#include "compaction/compaction_manager.hh"
#include <seastar/coroutine/maybe_yield.hh>
extern logging::logger dblog;
@@ -217,6 +219,30 @@ void tombstone_gc_state::update_repair_time(table_id id, const dht::token_range&
*m += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
}
// Queues a repair time update for `range` of table `id`, tagged with the shard
// it is meant for. Queued entries are applied by flush_pending_repair_time_update().
void tombstone_gc_state::insert_pending_repair_time_update(table_id id,
        const dht::token_range& range, gc_clock::time_point repair_time, shard_id shard) {
    // operator[] creates the per-table queue on first use.
    auto& queued_for_table = _pending_updates[id];
    queued_for_table.push_back(range_repair_time{.range = range, .time = repair_time, .shard = shard});
}
// Applies every queued repair time update and empties the queue.
//
// The queue is detached from this object first, so entries queued while the
// flush is in progress are left for the next flush. Every shard of `db` then
// walks the detached entries and applies those tagged with its own shard id to
// its local tombstone_gc_state through update_repair_time().
future<> tombstone_gc_state::flush_pending_repair_time_update(replica::database& db) {
    auto pending_updates = std::exchange(_pending_updates, {});
    if (pending_updates.empty()) {
        // Nothing was queued: skip the round trip to all shards.
        co_return;
    }
    // pending_updates is a local of this coroutine and outlives the co_await
    // below, so capturing it by reference is valid; the shards only read it.
    co_await db.container().invoke_on_all([&pending_updates] (replica::database& localdb) -> future<> {
        auto& gc_state = localdb.get_compaction_manager().get_tombstone_gc_state();
        for (const auto& [table, updates] : pending_updates) {
            for (const auto& update : updates) {
                co_await coroutine::maybe_yield();
                if (update.shard != this_shard_id()) {
                    // Entry is tagged for another shard.
                    continue;
                }
                gc_state.update_repair_time(table, update.range, update.time);
                dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}",
                        table, update.range, update.time);
            }
        }
    });
}
void tombstone_gc_state::update_group0_refresh_time(gc_clock::time_point refresh_time) {
auto m = get_or_create_group0_gc_time();
if (!m) {

View File

@@ -13,6 +13,11 @@
#include "dht/token.hh"
#include "schema/schema_fwd.hh"
#include "interval.hh"
#include "utils/chunked_vector.hh"
namespace replica {
class database;
}
namespace dht {
@@ -49,6 +54,12 @@ class tombstone_gc_options;
using gc_time_min_source = std::function<gc_clock::time_point(const table_id&)>;
// One queued repair time update: a token range, the repair time to record for
// it, and the shard the update is meant for.
struct range_repair_time {
    // Token range the repair time applies to.
    dht::token_range range;
    // Repair time to record for `range`.
    gc_clock::time_point time;
    // Shard whose tombstone_gc_state receives the update when the queue is flushed.
    shard_id shard;
};
class tombstone_gc_state {
gc_time_min_source _gc_min_source;
per_table_history_maps* _reconcile_history_maps;
@@ -62,6 +73,9 @@ class tombstone_gc_state {
[[nodiscard]] gc_clock::time_point get_gc_before_for_group0(schema_ptr s) const;
private:
std::unordered_map<table_id, utils::chunked_vector<range_repair_time>> _pending_updates;
public:
tombstone_gc_state() = delete;
explicit tombstone_gc_state(per_table_history_maps* maps) noexcept : _reconcile_history_maps(maps) {}
@@ -94,6 +108,9 @@ public:
// returns a tombstone_gc_state copy with the commitlog check disabled (i.e.) without _gc_min_source.
[[nodiscard]] tombstone_gc_state with_commitlog_check_disabled() const { return tombstone_gc_state(_reconcile_history_maps); }
void insert_pending_repair_time_update(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, shard_id shard);
future<> flush_pending_repair_time_update(replica::database& db);
};
std::map<sstring, sstring> get_default_tombstonesonte_gc_mode(data_dictionary::database db, sstring ks_name);