diff --git a/db/config.cc b/db/config.cc index 888391340a..e188028707 100644 --- a/db/config.cc +++ b/db/config.cc @@ -979,6 +979,7 @@ db::config::config(std::shared_ptr exts) " This can reduce the amount of data repair has to process.") , repair_partition_count_estimation_ratio(this, "repair_partition_count_estimation_ratio", liveness::LiveUpdate, value_status::Used, 0.1, "Specify the fraction of partitions written by repair out of the total partitions. The value is currently only used for bloom filter estimation. Value is between 0 and 1.") + , repair_hints_batchlog_flush_cache_time_in_ms(this, "repair_hints_batchlog_flush_cache_time_in_ms", liveness::LiveUpdate, value_status::Used, 60 * 1000, "The repair hints and batchlog flush request cache time. Setting 0 disables the flush cache. The cache reduces the number of hints and batchlog flushes during repair when tombstone_gc is set to repair mode. When the cache is on, a slightly smaller repair time will be used with the benefits of dropped hints and batchlog flushes.") , ring_delay_ms(this, "ring_delay_ms", value_status::Used, 30 * 1000, "Time a node waits to hear from other nodes before joining the ring in milliseconds. Same as -Dcassandra.ring_delay_ms in cassandra.") , shadow_round_ms(this, "shadow_round_ms", value_status::Used, 300 * 1000, "The maximum gossip shadow round time. Can be used to reduce the gossip feature check time during node boot up.") , fd_max_interval_ms(this, "fd_max_interval_ms", value_status::Used, 2 * 1000, "The maximum failure_detector interval time in milliseconds. Interval larger than the maximum will be ignored. Larger cluster may need to increase the default.") diff --git a/db/config.hh b/db/config.hh index 94dc882260..8d6e14451e 100644 --- a/db/config.hh +++ b/db/config.hh @@ -341,6 +341,7 @@ public: named_value enable_compacting_data_for_streaming_and_repair; named_value enable_tombstone_gc_for_streaming_and_repair; named_value repair_partition_count_estimation_ratio; + named_value repair_hints_batchlog_flush_cache_time_in_ms; named_value ring_delay_ms; named_value shadow_round_ms; named_value fd_max_interval_ms; diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index eba12b42b7..2d6f23be53 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -145,6 +145,7 @@ struct repair_flush_hints_batchlog_request { }; struct repair_flush_hints_batchlog_response { + gc_clock::time_point flush_time [[version 6.2]]; }; verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response; diff --git a/repair/repair.cc b/repair/repair.cc index 6965258f89..ee94b07fab 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -370,10 +370,11 @@ static future> get_hosts_participating_in_repair( co_return std::list(participating_hosts.begin(), participating_hosts.end()); } -static future flush_hints(repair_service& rs, repair_uniq_id id, replica::database& db, + +future> repair_service::flush_hints(repair_uniq_id id, sstring keyspace, std::vector cfs, - std::unordered_set ignore_nodes, - std::list participants) { + std::unordered_set ignore_nodes, std::list participants) { + auto& db = get_db().local(); auto uuid = id.uuid(); bool needs_flush_before_repair = false; if (db.features().tombstone_gc_options) { @@ -388,6 +389,7 @@ static future flush_hints(repair_service& rs, repair_uniq_id id, replica:: } } + gc_clock::time_point flush_time; bool hints_batchlog_flushed = false; if (needs_flush_before_repair) { auto waiting_nodes = db.get_token_metadata().get_topology().get_all_ips(); @@ -397,22 +399,35 @@ static future flush_hints(repair_service& rs, repair_uniq_id id, replica:: auto hints_timeout = std::chrono::seconds(300); auto batchlog_timeout = std::chrono::seconds(300); repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; - + auto start_time = gc_clock::now(); + std::vector times; try { - co_await parallel_for_each(waiting_nodes, [&rs, uuid, &req, &participants] (gms::inet_address node) -> future<> { + co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req, &participants] (gms::inet_address node) -> future<> { rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", uuid, node, participants); try { - auto& ms = rs.get_messaging(); + auto& ms = get_messaging(); auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); - (void)resp; // nothing to do with response yet + if (resp.flush_time == gc_clock::time_point()) { + // This means the node does not support sending flush_time back. Use the time when the flush is requested for flush_time. + rlogger.debug("repair[{}]: Got empty flush_time from node={}. Please upgrade the node={}.", uuid, node, node); + times.push_back(start_time); + } else { + times.push_back(resp.flush_time); + } } catch (...) { rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", uuid, node, participants, std::current_exception()); throw; } }); + if (!times.empty()) { + auto it = std::min_element(times.begin(), times.end()); + flush_time = *it; + } hints_batchlog_flushed = true; + auto duration = std::chrono::duration(gc_clock::now() - start_time); + rlogger.info("repair[{}]: Finished repair_flush_hints_batchlog flush_times={} flush_time={} flush_duration={}", uuid, times, flush_time, duration); } catch (...) { rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", uuid, participants); @@ -420,7 +435,7 @@ static future flush_hints(repair_service& rs, repair_uniq_id id, replica:: } else { rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); } - co_return hints_batchlog_flushed; + co_return std::make_tuple(hints_batchlog_flushed, flush_time); } @@ -616,7 +631,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu streaming::stream_reason reason_, bool hints_batchlog_flushed, bool small_table_optimization, - std::optional ranges_parallelism) + std::optional ranges_parallelism, + gc_clock::time_point flush_time) : repair_task_impl(module, id, 0, "shard", keyspace, "", "", parent_id_.uuid(), reason_) , rs(repair) , db(repair.get_db()) @@ -635,6 +651,7 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu , _hints_batchlog_flushed(std::move(hints_batchlog_flushed)) , _small_table_optimization(small_table_optimization) , _user_ranges_parallelism(ranges_parallelism ? std::optional(semaphore(*ranges_parallelism)) : std::nullopt) + , _flush_time(flush_time) { rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(), _user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited"); @@ -740,7 +757,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra } try { auto dropped = co_await with_table_drop_silenced(db.local(), mm, table.id, [&] (const table_id& uuid) { - return repair_cf_range_row_level(*this, table.name, table.id, range, neighbors, _small_table_optimization); + return repair_cf_range_row_level(*this, table.name, table.id, range, neighbors, _small_table_optimization, _flush_time); }); if (dropped) { dropped_tables.insert(table.name); @@ -1337,7 +1354,7 @@ future<> repair::user_requested_repair_task_impl::run() { } else { participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get(); } - bool hints_batchlog_flushed = flush_hints(rs, id, db, keyspace, cfs, ignore_nodes, participants).get(); + auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes, participants).get(); std::vector> repair_results; repair_results.reserve(smp::count); @@ -1387,12 +1404,12 @@ future<> repair::user_requested_repair_task_impl::run() { auto ranges_parallelism = _ranges_parallelism; bool small_table_optimization = _small_table_optimization; for (auto shard : std::views::iota(0u, smp::count)) { - auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism, small_table_optimization, + auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, flush_time, ranges_parallelism, small_table_optimization, data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> { local_repair.get_metrics().repair_total_ranges_sum += ranges.size(); auto task = co_await local_repair._repair_module->make_and_start_task(parent_data, tasks::task_id::create_random_id(), keyspace, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, small_table_optimization, ranges_parallelism); + id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time); co_await task->done(); }); repair_results.push_back(std::move(f)); @@ -1512,9 +1529,10 @@ future<> repair::data_sync_repair_task_impl::run() { bool hints_batchlog_flushed = false; bool small_table_optimization = false; auto ranges_parallelism = std::nullopt; + auto flush_time = gc_clock::time_point(); auto task_impl_ptr = seastar::make_shared(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids), - id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism); + id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time); task_impl_ptr->neighbors = std::move(neighbors); auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data); task->start(); @@ -2463,12 +2481,12 @@ future<> repair::tablet_repair_task_impl::run() { auto my_address = erm->get_topology().my_address(); auto participants = std::list(m.neighbors.all.begin(), m.neighbors.all.end()); participants.push_front(my_address); - bool hints_batchlog_flushed = co_await flush_hints(rs, id, rs._db.local(), m.keyspace_name, tables, ignore_nodes, participants); + auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes, participants); bool small_table_optimization = false; auto task_impl_ptr = seastar::make_shared(rs._repair_module, tasks::task_id::create_random_id(), m.keyspace_name, rs, erm, std::move(ranges), std::move(table_ids), id, std::move(data_centers), std::move(hosts), - std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism); + std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism, flush_time); task_impl_ptr->neighbors = std::move(neighbors); auto task = co_await rs._repair_module->make_task(std::move(task_impl_ptr), parent_data); task->start(); diff --git a/repair/repair.hh b/repair/repair.hh index cc8985e4a9..dc58a4a1aa 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -265,6 +265,7 @@ struct repair_flush_hints_batchlog_request { }; struct repair_flush_hints_batchlog_response { + gc_clock::time_point flush_time; }; struct tablet_repair_task_meta { diff --git a/repair/row_level.cc b/repair/row_level.cc index e76c42871f..801e419974 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2243,36 +2243,86 @@ future repair_service::repair_update_system } future repair_service::repair_flush_hints_batchlog_handler(gms::inet_address from, repair_flush_hints_batchlog_request req) { - rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={}, target_nodes={}, hints_timeout={}s, batchlog_timeout={}s", - req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count()); - std::vector target_nodes(req.target_nodes.begin(), req.target_nodes.end()); - db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes)); - lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout; - try { - bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized"); - if (!_bm.local_is_initialized() || bm_throw) { - throw std::runtime_error("Backlog manager isn't initialized"); - } - co_await coroutine::all( - [this, &from, &req, &sync_point, &deadline] () -> future<> { - rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); - co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline); - rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes); - co_return; - }, - [this, &from, &req] () -> future<> { - rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); - co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no); - rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); - } - ); - } catch (...) { - rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}, target_hosts={}, {}", - req.repair_uuid, from, req.target_nodes, std::current_exception()); - throw; + if (this_shard_id() != 0) { + co_return co_await container().invoke_on(0, [&] (auto& rs) { + return rs.repair_flush_hints_batchlog_handler(from, std::move(req)); + }); } - rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); - co_return repair_flush_hints_batchlog_response(); + rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} target_nodes={} hints_timeout={}s batchlog_timeout={}s", + req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count()); + auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1); + bool updated = false; + auto now = gc_clock::now(); + auto cache_time = std::chrono::milliseconds(get_db().local().get_config().repair_hints_batchlog_flush_cache_time_in_ms()); + auto cache_disabled = cache_time == std::chrono::milliseconds(0); + auto flush_time = now; + if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) { + // Empty targets meants all nodes + std::vector target_nodes; + db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes)); + lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout; + try { + bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized"); + if (!_bm.local_is_initialized() || bm_throw) { + throw std::runtime_error("Backlog manager isn't initialized"); + } + co_await coroutine::all( + [this, &from, &req, &sync_point, &deadline] () -> future<> { + rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline); + rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes); + co_return; + }, + [this, now, cache_disabled, &flush_time, &cache_time, &from, &req] () -> future<> { + rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + auto last_replay = _bm.local().get_last_replay(); + bool issue_flush = false; + if (cache_disabled) { + issue_flush = true; + flush_time = now; + } else { + if (now < last_replay) { + flush_time = now; + utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_now", fmt::to_string(flush_time)); + } else if (now - last_replay < cache_time) { + // Skip the replay request since last_replay is already + // updated since last _flush_hints_batchlog_time + // update. It is fine to use last_replay for the hint + // flush time because last_replay (batchlog replay + // time) is smaller than now (hint flush time). + flush_time = last_replay; + utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush_use_last_replay", fmt::to_string(flush_time)); + } else { + // Issue the replay so the last_replay will be updated + // to bigger than now after the call. + issue_flush = true; + flush_time = now; + } + } + if (issue_flush) { + co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no); + utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time)); + } + rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}, flushed={}", req.repair_uuid, from, req.target_nodes, issue_flush); + } + ); + } catch (...) { + rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={} target_hosts={}: {}", + req.repair_uuid, from, req.target_nodes, std::current_exception()); + throw; + } + co_await container().invoke_on_all([flush_time] (repair_service& rs) { + rs._flush_hints_batchlog_time = flush_time; + }); + updated = true; + } else { + utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time)); + } + auto duration = std::chrono::duration(gc_clock::now() - now); + rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} target_nodes={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}", + req.repair_uuid, from, req.target_nodes, updated, _flush_hints_batchlog_time, cache_time, duration); + repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time }; + co_return resp; } future<> repair_service::init_ms_handlers() { @@ -2567,7 +2617,8 @@ public: table_id table_id, dht::token_range range, std::vector all_live_peer_nodes, - bool small_table_optimization) + bool small_table_optimization, + gc_clock::time_point start_time) : _shard_task(shard_task) , _cf_name(std::move(cf_name)) , _table_id(std::move(table_id)) @@ -2575,7 +2626,7 @@ public: , _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes)) , _small_table_optimization(small_table_optimization) , _seed(get_random_seed()) - , _start_time(gc_clock::now()) + , _start_time(start_time) , _is_tablet(_shard_task.db.local().find_column_family(_table_id).uses_tablets()) { repair_neighbors r_neighbors = _shard_task.get_repair_neighbors(_range); @@ -3075,8 +3126,9 @@ public: future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task, sstring cf_name, table_id table_id, dht::token_range range, - const std::vector& all_peer_nodes, bool small_table_optimization) { - auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization); + const std::vector& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time) { + auto start_time = flush_time; + auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization, start_time); co_return co_await repair.run(); } diff --git a/repair/row_level.hh b/repair/row_level.hh index 3dbf3d66de..366bab624f 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -120,6 +120,12 @@ class repair_service : public seastar::peering_sharded_service { future<> init_ms_handlers(); future<> uninit_ms_handlers(); + seastar::semaphore _flush_hints_batchlog_sem{1}; + gc_clock::time_point _flush_hints_batchlog_time; + future> flush_hints(repair_uniq_id id, + sstring keyspace, std::vector cfs, + std::unordered_set ignore_nodes, std::list participants); + public: repair_service(sharded& tsm, distributed& gossiper, @@ -265,7 +271,7 @@ class repair_writer; future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task, sstring cf_name, table_id table_id, dht::token_range range, - const std::vector& all_peer_nodes, bool small_table_optimization); + const std::vector& all_peer_nodes, bool small_table_optimization, gc_clock::time_point flush_time); future> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr s, uint64_t seed, repair_master is_master, reader_permit permit, repair_hasher hasher); diff --git a/repair/task_manager_module.hh b/repair/task_manager_module.hh index 6834146ec2..b9d47a908c 100644 --- a/repair/task_manager_module.hh +++ b/repair/task_manager_module.hh @@ -158,6 +158,7 @@ private: std::optional _failed_because; std::optional _user_ranges_parallelism; uint64_t _ranges_complete = 0; + gc_clock::time_point _flush_time; public: shard_repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, @@ -173,7 +174,8 @@ public: streaming::stream_reason reason_, bool hints_batchlog_flushed, bool small_table_optimization, - std::optional ranges_parallelism); + std::optional ranges_parallelism, + gc_clock::time_point flush_time); void check_failed_ranges(); void check_in_abort_or_shutdown(); repair_neighbors get_repair_neighbors(const dht::token_range& range);