Merge "repair: Rename names to be consistent with rpc verb
" from Asias Some of the function names are not updated after we change the rpc verb names. Rename them to make them consistent with the rpc verb names. * seastar-dev.git asias/row_level_repair_rename_consistent_with_rpc_verb/v1: repair: Rename request_sync_boundary to get_sync_boundary repair: Rename request_full_row_hashes to get_full_row_hashes repair: Rename request_combined_row_hash to get_combined_row_hash repair: Rename request_row_diff to get_row_diff repair: Rename send_row_diff to put_row_diff repair: Update function name in docs/row_level_repair.md
This commit is contained in:
@@ -96,15 +96,15 @@ Start:
|
||||
- repair_range_start()
|
||||
|
||||
Step A:
|
||||
- request_sync_boundary()
|
||||
- get_sync_boundary()
|
||||
|
||||
Step B:
|
||||
- request_combined_row_hashes()
|
||||
- request_full_row_hashes()
|
||||
- request_row_diff()
|
||||
- get_combined_row_hashes()
|
||||
- get_full_row_hashes()
|
||||
- get_row_diff()
|
||||
|
||||
Step C:
|
||||
- send_row_diff()
|
||||
- put_row_diff()
|
||||
|
||||
Finish:
|
||||
- repair_range_stop()
|
||||
|
||||
@@ -800,7 +800,7 @@ private:
|
||||
// Calculate the combined checksum of the rows
|
||||
// Calculate the total size of the rows in _row_buf
|
||||
future<get_sync_boundary_response>
|
||||
request_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
get_sync_boundary(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
if (skipped_sync_boundary) {
|
||||
_current_sync_boundary = skipped_sync_boundary;
|
||||
_row_buf.clear();
|
||||
@@ -822,7 +822,7 @@ private:
|
||||
if (!_row_buf.empty()) {
|
||||
sb_max = _row_buf.back().boundary();
|
||||
}
|
||||
rlogger.debug("request_sync_boundary: Got nr={} rows, sb_max={}, row_buf_size={}, repair_hash={}, skipped_sync_boundary={}",
|
||||
rlogger.debug("get_sync_boundary: Got nr={} rows, sb_max={}, row_buf_size={}, repair_hash={}, skipped_sync_boundary={}",
|
||||
new_rows.size(), sb_max, row_buf_size(), row_buf_csum(), skipped_sync_boundary);
|
||||
return get_sync_boundary_response{sb_max, row_buf_csum(), row_buf_size(), new_rows_size, new_rows_nr};
|
||||
});
|
||||
@@ -875,7 +875,7 @@ private:
|
||||
}
|
||||
|
||||
std::unordered_set<repair_hash>
|
||||
request_full_row_hashes() {
|
||||
get_full_row_hashes() {
|
||||
return working_row_hashes();
|
||||
}
|
||||
|
||||
@@ -984,9 +984,9 @@ public:
|
||||
// RPC API
|
||||
// Return the hashes of the rows in _working_row_buf
|
||||
future<std::unordered_set<repair_hash>>
|
||||
request_full_row_hashes(gms::inet_address remote_node) {
|
||||
get_full_row_hashes(gms::inet_address remote_node) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<std::unordered_set<repair_hash>>(request_full_row_hashes_handler());
|
||||
return make_ready_future<std::unordered_set<repair_hash>>(get_full_row_hashes_handler());
|
||||
}
|
||||
return netw::get_local_messaging_service().send_repair_get_full_row_hashes(msg_addr(remote_node),
|
||||
_repair_meta_id).then([this, remote_node] (std::unordered_set<repair_hash> hashes) {
|
||||
@@ -1000,16 +1000,16 @@ public:
|
||||
|
||||
// RPC handler
|
||||
std::unordered_set<repair_hash>
|
||||
request_full_row_hashes_handler() {
|
||||
return request_full_row_hashes();
|
||||
get_full_row_hashes_handler() {
|
||||
return get_full_row_hashes();
|
||||
}
|
||||
|
||||
// RPC API
|
||||
// Return the combined hashes of the current working row buf
|
||||
future<repair_hash>
|
||||
request_combined_row_hash(std::optional<repair_sync_boundary> common_sync_boundary, gms::inet_address remote_node) {
|
||||
get_combined_row_hash(std::optional<repair_sync_boundary> common_sync_boundary, gms::inet_address remote_node) {
|
||||
if (remote_node == _myip) {
|
||||
return request_combined_row_hash_handler(common_sync_boundary);
|
||||
return get_combined_row_hash_handler(common_sync_boundary);
|
||||
}
|
||||
return netw::get_local_messaging_service().send_repair_get_combined_row_hash(msg_addr(remote_node),
|
||||
_repair_meta_id, common_sync_boundary).then([this] (repair_hash combined_hash) {
|
||||
@@ -1022,10 +1022,10 @@ public:
|
||||
|
||||
// RPC handler
|
||||
future<repair_hash>
|
||||
request_combined_row_hash_handler(std::optional<repair_sync_boundary> common_sync_boundary) {
|
||||
get_combined_row_hash_handler(std::optional<repair_sync_boundary> common_sync_boundary) {
|
||||
// We can not call this function twice. The good thing is we do not use
|
||||
// retransmission at messaging_service level, so no message will be retransmited.
|
||||
rlogger.trace("Calling request_combined_row_hash_handler");
|
||||
rlogger.trace("Calling get_combined_row_hash_handler");
|
||||
return request_row_hashes(common_sync_boundary);
|
||||
}
|
||||
|
||||
@@ -1108,9 +1108,9 @@ public:
|
||||
// RPC API
|
||||
// Return the largest sync point contained in the _row_buf , current _row_buf checksum, and the _row_buf size
|
||||
future<get_sync_boundary_response>
|
||||
request_sync_boundary(gms::inet_address remote_node, std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
get_sync_boundary(gms::inet_address remote_node, std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
if (remote_node == _myip) {
|
||||
return request_sync_boundary_handler(skipped_sync_boundary);
|
||||
return get_sync_boundary_handler(skipped_sync_boundary);
|
||||
}
|
||||
stats().rpc_call_nr++;
|
||||
return netw::get_local_messaging_service().send_repair_get_sync_boundary(msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary);
|
||||
@@ -1118,13 +1118,13 @@ public:
|
||||
|
||||
// RPC handler
|
||||
future<get_sync_boundary_response>
|
||||
request_sync_boundary_handler(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
return request_sync_boundary(std::move(skipped_sync_boundary));
|
||||
get_sync_boundary_handler(std::optional<repair_sync_boundary> skipped_sync_boundary) {
|
||||
return get_sync_boundary(std::move(skipped_sync_boundary));
|
||||
}
|
||||
|
||||
// RPC API
|
||||
// Return rows in the _working_row_buf with hash within the given sef_diff
|
||||
future<> request_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
future<> get_row_diff(std::unordered_set<repair_hash> set_diff, needs_all_rows_t needs_all_rows, gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (needs_all_rows || !set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
@@ -1147,7 +1147,7 @@ public:
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> request_row_diff_and_update_peer_row_hash_sets(gms::inet_address remote_node, unsigned node_idx) {
|
||||
future<> get_row_diff_and_update_peer_row_hash_sets(gms::inet_address remote_node, unsigned node_idx) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -1163,14 +1163,14 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<repair_rows_on_wire> request_row_diff_handler(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows) {
|
||||
future<repair_rows_on_wire> get_row_diff_handler(const std::unordered_set<repair_hash>& set_diff, needs_all_rows_t needs_all_rows) {
|
||||
std::list<repair_row> row_diff = get_row_diff(set_diff, needs_all_rows);
|
||||
return to_repair_rows_on_wire(std::move(row_diff));
|
||||
}
|
||||
|
||||
// RPC API
|
||||
// Send rows in the _working_row_buf with hash within the given sef_diff
|
||||
future<> send_row_diff(const std::unordered_set<repair_hash>& set_diff, gms::inet_address remote_node) {
|
||||
future<> put_row_diff(const std::unordered_set<repair_hash>& set_diff, gms::inet_address remote_node) {
|
||||
if (!set_diff.empty()) {
|
||||
if (remote_node == _myip) {
|
||||
return make_ready_future<>();
|
||||
@@ -1191,7 +1191,7 @@ public:
|
||||
}
|
||||
|
||||
// RPC handler
|
||||
future<> send_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) {
|
||||
future<> put_row_diff_handler(repair_rows_on_wire rows, gms::inet_address from) {
|
||||
return apply_rows(std::move(rows), from, update_working_row_buf::no, update_peer_row_hash_sets::no);
|
||||
}
|
||||
};
|
||||
@@ -1205,7 +1205,7 @@ future<> repair_init_messaging_service_handler(distributed<db::system_distribute
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id] {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
std::unordered_set<repair_hash> hashes = rm->request_full_row_hashes_handler();
|
||||
std::unordered_set<repair_hash> hashes = rm->get_full_row_hashes_handler();
|
||||
_metrics.tx_hashes_nr += hashes.size();
|
||||
return make_ready_future<std::unordered_set<repair_hash>>(std::move(hashes));
|
||||
}) ;
|
||||
@@ -1218,7 +1218,7 @@ future<> repair_init_messaging_service_handler(distributed<db::system_distribute
|
||||
common_sync_boundary = std::move(common_sync_boundary)] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
_metrics.tx_hashes_nr++;
|
||||
return rm->request_combined_row_hash_handler(std::move(common_sync_boundary));
|
||||
return rm->get_combined_row_hash_handler(std::move(common_sync_boundary));
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_sync_boundary([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
@@ -1228,7 +1228,7 @@ future<> repair_init_messaging_service_handler(distributed<db::system_distribute
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id,
|
||||
skipped_sync_boundary = std::move(skipped_sync_boundary)] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->request_sync_boundary_handler(std::move(skipped_sync_boundary));
|
||||
return rm->get_sync_boundary_handler(std::move(skipped_sync_boundary));
|
||||
});
|
||||
});
|
||||
ms.register_repair_get_row_diff([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
@@ -1237,7 +1237,7 @@ future<> repair_init_messaging_service_handler(distributed<db::system_distribute
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, set_diff = std::move(set_diff), needs_all_rows] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->request_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows));
|
||||
return rm->get_row_diff_handler(set_diff, repair_meta::needs_all_rows_t(needs_all_rows));
|
||||
});
|
||||
});
|
||||
ms.register_repair_put_row_diff([] (const rpc::client_info& cinfo, uint32_t repair_meta_id,
|
||||
@@ -1246,7 +1246,7 @@ future<> repair_init_messaging_service_handler(distributed<db::system_distribute
|
||||
auto from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(src_cpu_id % smp::count, [from, repair_meta_id, row_diff = std::move(row_diff)] () mutable {
|
||||
auto rm = repair_meta::get_repair_meta(from, repair_meta_id);
|
||||
return rm->send_row_diff_handler(std::move(row_diff), from);
|
||||
return rm->put_row_diff_handler(std::move(row_diff), from);
|
||||
});
|
||||
});
|
||||
ms.register_repair_row_level_start([] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name,
|
||||
@@ -1304,9 +1304,9 @@ class row_level_repair {
|
||||
// Repair master and followers will propose a sync boundary. Each of them
|
||||
// read N bytes of rows from disk, the row with largest
|
||||
// `position_in_partition` value is the proposed sync boundary of that
|
||||
// node. The repair master uses `request_sync_boundary` rpc call to
|
||||
// node. The repair master uses `get_sync_boundary` rpc call to
|
||||
// get all the proposed sync boundary and stores in in
|
||||
// `_sync_boundaries`. The `request_sync_boundary` rpc call also
|
||||
// `_sync_boundaries`. The `get_sync_boundary` rpc call also
|
||||
// returns the combined hashes and the total size for the rows which are
|
||||
// in the `_row_buf`. `_row_buf` buffers the rows read from sstable. It
|
||||
// contains rows at most of `_max_row_buf_size` bytes.
|
||||
@@ -1320,7 +1320,7 @@ class row_level_repair {
|
||||
std::optional<repair_sync_boundary> _common_sync_boundary = {};
|
||||
|
||||
// `_skipped_sync_boundary` is used in case we find the range is synced
|
||||
// only with the `request_sync_boundary` rpc call. We use it to make
|
||||
// only with the `get_sync_boundary` rpc call. We use it to make
|
||||
// sure the remote peers update the `_current_sync_boundary` and
|
||||
// `_last_sync_boundary` correctly.
|
||||
std::optional<repair_sync_boundary> _skipped_sync_boundary = {};
|
||||
@@ -1374,10 +1374,10 @@ private:
|
||||
master.stats().round_nr, master.last_sync_boundary(), master.current_sync_boundary(), _skipped_sync_boundary);
|
||||
master.stats().round_nr++;
|
||||
parallel_for_each(_all_nodes, [&, this] (const gms::inet_address& node) {
|
||||
// By calling `request_sync_boundary`, the `_last_sync_boundary`
|
||||
// By calling `get_sync_boundary`, the `_last_sync_boundary`
|
||||
// is moved to the `_current_sync_boundary` or
|
||||
// `_skipped_sync_boundary` if it is not std::nullopt.
|
||||
return master.request_sync_boundary(node, _skipped_sync_boundary).then([&, this] (get_sync_boundary_response res) {
|
||||
return master.get_sync_boundary(node, _skipped_sync_boundary).then([&, this] (get_sync_boundary_response res) {
|
||||
master.stats().row_from_disk_bytes[node] += res.new_rows_size;
|
||||
master.stats().row_from_disk_nr[node] += res.new_rows_nr;
|
||||
if (res.boundary && res.row_buf_size > 0) {
|
||||
@@ -1389,7 +1389,7 @@ private:
|
||||
// this node when calculating common sync boundary
|
||||
_zero_rows = true;
|
||||
}
|
||||
rlogger.debug("Called master.request_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}",
|
||||
rlogger.debug("Called master.get_sync_boundary for node {} sb={}, combined_csum={}, row_size={}, zero_rows={}, skipped_sync_boundary={}",
|
||||
node, res.boundary, res.row_buf_combined_csum, res.row_buf_size, _zero_rows, _skipped_sync_boundary);
|
||||
});
|
||||
}).get();
|
||||
@@ -1426,7 +1426,7 @@ private:
|
||||
// `combined_hashes` contains the combined hashes for the
|
||||
// `_working_row_buf`. Like `_row_buf`, `_working_row_buf` contains
|
||||
// rows which are within the (_last_sync_boundary, _current_sync_boundary]
|
||||
// By calling `request_combined_row_hash(_common_sync_boundary)`,
|
||||
// By calling `get_combined_row_hash(_common_sync_boundary)`,
|
||||
// all the nodes move the `_current_sync_boundary` to `_common_sync_boundary`,
|
||||
// Rows within the (_last_sync_boundary, _current_sync_boundary] are
|
||||
// moved from the `_row_buf` to `_working_row_buf`.
|
||||
@@ -1439,13 +1439,13 @@ private:
|
||||
// - Move rows from `_row_buf` to `_working_row_buf`
|
||||
// But the full hashes (each and every hashes for the rows in
|
||||
// the `_working_row_buf`) are not returned until repair master
|
||||
// explicitly requests with request_full_row_hashes() below as
|
||||
// explicitly requests with get_full_row_hashes() below as
|
||||
// an optimization. Because if the combined_hashes from all
|
||||
// peers are identical, we think rows in the `_working_row_buff`
|
||||
// are identical, there is no need to transfer each and every
|
||||
// row hashes to the repair master.
|
||||
return master.request_combined_row_hash(_common_sync_boundary, _all_nodes[idx]).then([&, this, idx] (repair_hash h) {
|
||||
rlogger.debug("Calling master.request_combined_row_hash for node {}, got hash={}", _all_nodes[idx], h);
|
||||
return master.get_combined_row_hash(_common_sync_boundary, _all_nodes[idx]).then([&, this, idx] (repair_hash h) {
|
||||
rlogger.debug("Calling master.get_combined_row_hash for node {}, got hash={}", _all_nodes[idx], h);
|
||||
combined_hashes[idx]= std::move(h);
|
||||
});
|
||||
}).get();
|
||||
@@ -1487,13 +1487,13 @@ private:
|
||||
|
||||
// Fast path: if local has zero row and remote has rows, request them all.
|
||||
if (master.working_row_buf_combined_hash() == repair_hash() && combined_hashes[node_idx + 1] != repair_hash()) {
|
||||
master.request_row_diff_and_update_peer_row_hash_sets(node, node_idx).get();
|
||||
master.get_row_diff_and_update_peer_row_hash_sets(node, node_idx).get();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Ask the peer to send the full list hashes in the working row buf.
|
||||
master.peer_row_hash_sets(node_idx) = master.request_full_row_hashes(node).get0();
|
||||
rlogger.debug("Calling master.request_full_row_hashes for node {}, hash_sets={}",
|
||||
master.peer_row_hash_sets(node_idx) = master.get_full_row_hashes(node).get0();
|
||||
rlogger.debug("Calling master.get_full_row_hashes for node {}, hash_sets={}",
|
||||
node, master.peer_row_hash_sets(node_idx).size());
|
||||
|
||||
// With hashes of rows from peer node, we can figure out
|
||||
@@ -1505,13 +1505,13 @@ private:
|
||||
// between repair master and repair follower 2.
|
||||
std::unordered_set<repair_hash> set_diff = repair_meta::get_set_diff(master.peer_row_hash_sets(node_idx), master.working_row_hashes());
|
||||
// Request missing sets from peer node
|
||||
rlogger.debug("Calling master.request_row_diff to node {}, local={}, peer={}, set_diff={}",
|
||||
rlogger.debug("Calling master.get_row_diff to node {}, local={}, peer={}, set_diff={}",
|
||||
node, master.working_row_hashes().size(), master.peer_row_hash_sets(node_idx).size(), set_diff);
|
||||
// If we need to pull all rows from the peer. We can avoid
|
||||
// sending the row hashes on wire by setting needs_all_rows flag.
|
||||
auto needs_all_rows = repair_meta::needs_all_rows_t(set_diff.size() == master.peer_row_hash_sets(node_idx).size());
|
||||
master.request_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get();
|
||||
rlogger.debug("After request_row_diff node {}, hash_sets={}", master.myip(), master.working_row_hashes().size());
|
||||
master.get_row_diff(std::move(set_diff), needs_all_rows, node, node_idx).get();
|
||||
rlogger.debug("After get_row_diff node {}, hash_sets={}", master.myip(), master.working_row_hashes().size());
|
||||
}
|
||||
return op_status::next_step;
|
||||
}
|
||||
@@ -1524,8 +1524,8 @@ private:
|
||||
std::unordered_set<repair_hash> local_row_hash_sets = master.working_row_hashes();
|
||||
parallel_for_each(boost::irange(size_t(0), _all_live_peer_nodes.size()), [&, this] (size_t idx) {
|
||||
auto set_diff = repair_meta::get_set_diff(local_row_hash_sets, master.peer_row_hash_sets(idx));
|
||||
rlogger.trace("Calling master.send_row_diff to node {}, set_diff={}", _all_live_peer_nodes[idx], set_diff.size());
|
||||
return master.send_row_diff(set_diff, _all_live_peer_nodes[idx]);
|
||||
rlogger.trace("Calling master.put_row_diff to node {}, set_diff={}", _all_live_peer_nodes[idx], set_diff.size());
|
||||
return master.put_row_diff(set_diff, _all_live_peer_nodes[idx]);
|
||||
}).get();
|
||||
master.stats().round_nr_slow_path++;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user