repair/row_level: opt in to compacting the stream

The stream is compacted using a centrally generated compaction time,
which is generated on the repair master and propagated to all repair
followers. For repair it is imperative that all participants use the
exact same compaction time; otherwise there can be artificial
differences between participants, generating unnecessary repair activity.
If a repair follower doesn't get a compaction time from the repair
master, it falls back to a locally generated one. This is no worse than
the previous behavior, where each node was in some undefined state of
compaction.
This commit is contained in:
Botond Dénes
2023-07-19 02:59:44 -04:00
parent 5452fd1ce4
commit fdaf908967
6 changed files with 72 additions and 31 deletions

View File

@@ -1303,14 +1303,14 @@ future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repai
}
// Wrapper for REPAIR_ROW_LEVEL_START
void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func) {
void messaging_service::register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time)>&& func) {
register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func));
}
// Removes the handler registered for the REPAIR_ROW_LEVEL_START verb.
// Returns the future produced by unregister_handler().
future<> messaging_service::unregister_repair_row_level_start() {
return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START);
}
future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason) {
return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason);
future<rpc::optional<repair_row_level_start_response>> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time) {
return send_message<rpc::optional<repair_row_level_start_response>>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time);
}
// Wrapper for REPAIR_ROW_LEVEL_STOP

View File

@@ -420,9 +420,9 @@ public:
future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff);
// Wrapper for REPAIR_ROW_LEVEL_START
void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason)>&& func);
void register_repair_row_level_start(std::function<future<repair_row_level_start_response> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time)>&& func);
future<> unregister_repair_row_level_start();
future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason);
future<rpc::optional<repair_row_level_start_response>> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time);
// Wrapper for REPAIR_ROW_LEVEL_STOP
void register_repair_row_level_stop(std::function<future<> (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range)>&& func);

View File

@@ -52,7 +52,8 @@ private:
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
unsigned remote_shard);
unsigned remote_shard,
gc_clock::time_point compaction_time);
public:
repair_reader(
@@ -64,7 +65,8 @@ public:
const dht::sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
read_strategy strategy);
read_strategy strategy,
gc_clock::time_point compaction_time);
future<mutation_fragment_opt>
read_mutation_fragment();

View File

@@ -264,10 +264,11 @@ flat_mutation_reader_v2 repair_reader::make_reader(
replica::column_family& cf,
read_strategy strategy,
const dht::sharder& remote_sharder,
unsigned remote_shard) {
unsigned remote_shard,
gc_clock::time_point compaction_time) {
switch (strategy) {
case read_strategy::local: {
auto ms = mutation_source([&cf] (
auto ms = mutation_source([&cf, compaction_time] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
@@ -275,7 +276,7 @@ flat_mutation_reader_v2 repair_reader::make_reader(
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding fwd_mr) {
return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr, {});
return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr, compaction_time);
});
flat_mutation_reader_v2 rd(nullptr);
std::tie(rd, _reader_handle) = make_manually_paused_evictable_reader_v2(
@@ -299,14 +300,14 @@ flat_mutation_reader_v2 repair_reader::make_reader(
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
}
return std::optional<dht::partition_range>();
}, {});
}, compaction_time);
}
case read_strategy::multishard_filter: {
// We can't have two permits with count resource for 1 repair.
// So we release the one on _permit so the only one is the one the
// shard reader will obtain.
_permit.release_base_resources();
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, {}),
return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time),
[&remote_sharder, remote_shard](const dht::decorated_key& k) {
return remote_sharder.shard_of(k.token()) == remote_shard;
});
@@ -326,14 +327,15 @@ repair_reader::repair_reader(
const dht::sharder& remote_sharder,
unsigned remote_shard,
uint64_t seed,
read_strategy strategy)
read_strategy strategy,
gc_clock::time_point compaction_time)
: _schema(s)
, _permit(std::move(permit))
, _range(dht::to_partition_range(range))
, _sharder(remote_sharder, range, remote_shard)
, _seed(seed)
, _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard))
, _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time))
{ }
future<mutation_fragment_opt>
@@ -735,6 +737,7 @@ private:
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_future<>> _stopped;
repair_hasher _repair_hasher;
gc_clock::time_point _compaction_time;
public:
std::vector<repair_node_state>& all_nodes() {
return _all_node_states;
@@ -773,6 +776,7 @@ public:
}
public:
// master constructor
repair_meta(
repair_service& rs,
replica::column_family& cf,
@@ -787,8 +791,9 @@ public:
streaming::stream_reason reason,
shard_config master_node_shard_config,
inet_address_vector_replica_set all_live_peer_nodes,
size_t nr_peer_nodes = 1,
row_level_repair* row_level_repair_ptr = nullptr)
size_t nr_peer_nodes,
row_level_repair* row_level_repair_ptr,
gc_clock::time_point compaction_time)
: _rs(rs)
, _db(rs.get_db())
, _messaging(rs.get_messaging())
@@ -825,6 +830,7 @@ public:
})
, _row_level_repair_ptr(row_level_repair_ptr)
, _repair_hasher(_seed, _schema)
, _compaction_time(compaction_time)
{
if (master) {
add_to_repair_meta_for_masters(*this);
@@ -837,6 +843,27 @@ public:
}
}
// Follower constructor.
//
// Convenience overload that forwards every argument to the full
// constructor, passing 1 for nr_peer_nodes and nullptr for
// row_level_repair_ptr. The caller supplies compaction_time, which is
// forwarded unchanged.
repair_meta(
repair_service& rs,
replica::column_family& cf,
schema_ptr s,
reader_permit permit,
dht::token_range range,
row_level_diff_detect_algorithm algo,
size_t max_row_buf_size,
uint64_t seed,
repair_master master,
uint32_t repair_meta_id,
streaming::stream_reason reason,
shard_config master_node_shard_config,
inet_address_vector_replica_set all_live_peer_nodes,
gc_clock::time_point compaction_time)
// Delegate to the full constructor; the literals 1 and nullptr fill
// nr_peer_nodes and row_level_repair_ptr respectively.
: repair_meta(rs, cf, std::move(s), std::move(permit), std::move(range), algo, max_row_buf_size, seed, master, repair_meta_id, reason,
std::move(master_node_shard_config), std::move(all_live_peer_nodes), 1, nullptr, compaction_time)
{
}
public:
future<> clear_gently() noexcept {
co_await utils::clear_gently(_peer_row_hash_sets);
@@ -1098,7 +1125,8 @@ private:
_local_range_estimation->master_subranges_count,
read_strategy);
return read_strategy;
}));
}),
_compaction_time);
}
try {
while (cur_size < _max_row_buf_size) {
@@ -1483,7 +1511,7 @@ public:
// RPC API
future<>
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason) {
repair_row_level_start(gms::inet_address remote_node, sstring ks_name, sstring cf_name, dht::token_range range, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time) {
if (remote_node == _myip) {
return make_ready_future<>();
}
@@ -1497,7 +1525,7 @@ public:
return _messaging.send_repair_row_level_start(msg_addr(remote_node),
_repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed,
_master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb,
remote_partitioner_name, std::move(schema_version), reason).then([ks_name, cf_name] (rpc::optional<repair_row_level_start_response> resp) {
remote_partitioner_name, std::move(schema_version), reason, compaction_time).then([ks_name, cf_name] (rpc::optional<repair_row_level_start_response> resp) {
if (resp && resp->status == repair_row_level_start_status::no_such_column_family) {
return make_exception_future<>(replica::no_such_column_family(ks_name, cf_name));
} else {
@@ -1510,10 +1538,11 @@ public:
static future<repair_row_level_start_response>
repair_row_level_start_handler(repair_service& repair, gms::inet_address from, uint32_t src_cpu_id, uint32_t repair_meta_id, sstring ks_name, sstring cf_name,
dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size,
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason, abort_source& as) {
uint64_t seed, shard_config master_node_shard_config, table_schema_version schema_version, streaming::stream_reason reason,
gc_clock::time_point compaction_time, abort_source& as) {
rlogger.debug(">>> Started Row Level Repair (Follower): local={}, peers={}, repair_meta_id={}, keyspace={}, cf={}, schema_version={}, range={}, seed={}, max_row_buf_siz={}",
utils::fb_utilities::get_broadcast_address(), from, repair_meta_id, ks_name, cf_name, schema_version, range, seed, max_row_buf_size);
return repair.insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, as).then([] {
return repair.insert_repair_meta(from, src_cpu_id, repair_meta_id, std::move(range), algo, max_row_buf_size, seed, std::move(master_node_shard_config), std::move(schema_version), reason, compaction_time, as).then([] {
return repair_row_level_start_response{repair_row_level_start_status::ok};
}).handle_exception_type([] (replica::no_such_column_family&) {
return repair_row_level_start_response{repair_row_level_start_status::no_such_column_family};
@@ -2302,20 +2331,22 @@ future<> repair_service::init_ms_handlers() {
});
ms.register_repair_row_level_start([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed,
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional<streaming::stream_reason> reason) {
unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version,
rpc::optional<streaming::stream_reason> reason, rpc::optional<gc_clock::time_point> compaction_time) {
auto src_cpu_id = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return container().invoke_on(src_cpu_id % smp::count, [from, src_cpu_id, repair_meta_id, ks_name, cf_name,
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, this] (repair_service& local_repair) mutable {
range, algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, schema_version, reason, compaction_time, this] (repair_service& local_repair) mutable {
if (!local_repair._sys_dist_ks.local_is_initialized() || !local_repair._view_update_generator.local_is_initialized()) {
return make_exception_future<repair_row_level_start_response>(std::runtime_error(format("Node {} is not fully initialized for repair, try again later",
utils::fb_utilities::get_broadcast_address())));
}
streaming::stream_reason r = reason ? *reason : streaming::stream_reason::repair;
const gc_clock::time_point ct = compaction_time ? *compaction_time : gc_clock::now();
return repair_meta::repair_row_level_start_handler(local_repair, from, src_cpu_id, repair_meta_id, std::move(ks_name),
std::move(cf_name), std::move(range), algo, max_row_buf_size, seed,
shard_config{remote_shard, remote_shard_count, remote_ignore_msb},
schema_version, r, _repair_module->abort_source());
schema_version, r, ct, _repair_module->abort_source());
});
});
ms.register_repair_row_level_stop([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
@@ -2802,6 +2833,8 @@ public:
auto permit = _shard_task.db.local().obtain_reader_permit(_cf, "repair-meta", db::no_timeout, {}).get0();
auto compaction_time = gc_clock::now();
repair_meta master(_shard_task.rs,
_cf,
s,
@@ -2816,7 +2849,8 @@ public:
std::move(master_node_shard_config),
_all_live_peer_nodes,
_all_live_peer_nodes.size(),
this);
this,
compaction_time);
auto auto_stop_master = defer([&master] {
master.stop().handle_exception([] (std::exception_ptr ep) {
rlogger.warn("Failed auto-stopping Row Level Repair (Master): {}. Ignored.", ep);
@@ -2833,7 +2867,7 @@ public:
parallel_for_each(master.all_nodes(), [&, this] (repair_node_state& ns) {
const auto& node = ns.node;
ns.state = repair_state::row_level_start_started;
return master.repair_row_level_start(node, _shard_task.get_keyspace(), _cf_name, _range, schema_version, _shard_task.reason()).then([&] () {
return master.repair_row_level_start(node, _shard_task.get_keyspace(), _cf_name, _range, schema_version, _shard_task.reason(), compaction_time).then([&] () {
ns.state = repair_state::row_level_start_finished;
nodes_to_stop.push_back(node);
ns.state = repair_state::get_estimated_partitions_started;
@@ -3103,6 +3137,7 @@ repair_service::insert_repair_meta(
shard_config master_node_shard_config,
table_schema_version schema_version,
streaming::stream_reason reason,
gc_clock::time_point compaction_time,
abort_source& as) {
return get_migration_manager().get_schema_for_write(schema_version, {from, src_cpu_id}, get_messaging(), &as).then([this,
from,
@@ -3112,7 +3147,8 @@ repair_service::insert_repair_meta(
max_row_buf_size,
seed,
master_node_shard_config,
reason] (schema_ptr s) {
reason,
compaction_time] (schema_ptr s) {
auto& db = get_db();
auto& cf = db.local().find_column_family(s->id());
return db.local().obtain_reader_permit(cf, "repair-meta", db::no_timeout, {}).then([s = std::move(s),
@@ -3125,9 +3161,10 @@ repair_service::insert_repair_meta(
max_row_buf_size,
seed,
master_node_shard_config,
reason] (reader_permit permit) mutable {
reason,
compaction_time] (reader_permit permit) mutable {
node_repair_meta_id id{from, repair_meta_id};
auto rm = make_shared<repair_meta>(*this,
auto rm = seastar::make_shared<repair_meta>(*this,
cf,
s,
std::move(permit),
@@ -3139,7 +3176,8 @@ repair_service::insert_repair_meta(
repair_meta_id,
reason,
std::move(master_node_shard_config),
inet_address_vector_replica_set{from});
inet_address_vector_replica_set{from},
compaction_time);
rm->set_repair_state_for_local_node(repair_state::row_level_start_started);
bool insertion = repair_meta_map().emplace(id, rm).second;
if (!insertion) {

View File

@@ -214,6 +214,7 @@ public:
shard_config master_node_shard_config,
table_schema_version schema_version,
streaming::stream_reason reason,
gc_clock::time_point compaction_time,
abort_source& as);
future<>

View File

@@ -186,7 +186,7 @@ SEASTAR_TEST_CASE(test_reader_with_different_strategies) {
});
auto read_all = [&](repair_reader::read_strategy strategy) -> future<std::vector<mutation_fragment>> {
auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e),
random_range, remote_sharder, remote_shard, 0, strategy);
random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now());
std::vector<mutation_fragment> result;
while (auto mf = co_await reader.read_mutation_fragment()) {
result.push_back(std::move(*mf));