repair: Introduce small table optimization
*) Problem:
We have seen in the field it takes longer than expected to repair system tables
like system_auth which has a tiny amount of data but is replicated to all nodes
in the cluster. The cluster has multiple DCs. Each DC has multiple nodes. The
main reason for the slowness is that even if the amount of data is small,
repair has to walk though all the token ranges, that is num_tokens *
number_of_nodes_in_the_cluster. The overhead of the repair protocol for each
token range dominates due to the small amount of data per token range. Another
reason is the high network latency between DCs makes the RPC calls used to
repair consume more time.
*) Solution:
To solve this problem, a small table optimization for repair is introduced in
this patch. A new repair option is added to turn on this optimization.
- No token range to repair is needed by the user. It will repair all token
ranges automatically.
- Users only need to send the repair rest api to one of the nodes in the
cluster. It can be any of the nodes in the cluster.
- It does not require the RF to be configured to replicate to all nodes in the
cluster. This means it can work with any tables as long as the amount of data
is low, e.g., less than 100MiB per node.
*) Performance:
1)
3 DCs, each DC has 2 nodes, 6 nodes in the cluster. RF = {dc1: 2, dc2: 2, dc3: 2}
Before:
```
repair - repair[744cd573-2621-45e4-9b27-00634963d0bd]: stats:
repair_reason=repair, keyspace=system_auth, tables={roles, role_attributes,
role_members}, ranges_nr=1537, round_nr=4612,
round_nr_fast_path_already_synced=4611,
round_nr_fast_path_same_combined_hashes=0, round_nr_slow_path=1,
rpc_call_nr=115289, tx_hashes_nr=0, rx_hashes_nr=5, duration=1.5648403 seconds,
tx_row_nr=2, rx_row_nr=0, tx_row_bytes=356, rx_row_bytes=0,
row_from_disk_bytes={{127.0.14.1, 178}, {127.0.14.2, 178}, {127.0.14.3, 0},
{127.0.14.4, 0}, {127.0.14.5, 178}, {127.0.14.6, 178}},
row_from_disk_nr={{127.0.14.1, 1}, {127.0.14.2, 1}, {127.0.14.3, 0},
{127.0.14.4, 0}, {127.0.14.5, 1}, {127.0.14.6, 1}},
row_from_disk_bytes_per_sec={{127.0.14.1, 0.00010848}, {127.0.14.2,
0.00010848}, {127.0.14.3, 0}, {127.0.14.4, 0}, {127.0.14.5, 0.00010848},
{127.0.14.6, 0.00010848}} MiB/s, row_from_disk_rows_per_sec={{127.0.14.1,
0.639043}, {127.0.14.2, 0.639043}, {127.0.14.3, 0}, {127.0.14.4, 0},
{127.0.14.5, 0.639043}, {127.0.14.6, 0.639043}} Rows/s,
tx_row_nr_peer={{127.0.14.3, 1}, {127.0.14.4, 1}}, rx_row_nr_peer={}
```
After:
```
repair - repair[d6e544ba-cb68-4465-ab91-6980bcbb46a9]: stats:
repair_reason=repair, keyspace=system_auth, tables={roles, role_attributes,
role_members}, ranges_nr=1, round_nr=4, round_nr_fast_path_already_synced=4,
round_nr_fast_path_same_combined_hashes=0, round_nr_slow_path=0,
rpc_call_nr=80, tx_hashes_nr=0, rx_hashes_nr=0, duration=0.001459798 seconds,
tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0,
row_from_disk_bytes={{127.0.14.1, 178}, {127.0.14.2, 178}, {127.0.14.3, 178},
{127.0.14.4, 178}, {127.0.14.5, 178}, {127.0.14.6, 178}},
row_from_disk_nr={{127.0.14.1, 1}, {127.0.14.2, 1}, {127.0.14.3, 1},
{127.0.14.4, 1}, {127.0.14.5, 1}, {127.0.14.6, 1}},
row_from_disk_bytes_per_sec={{127.0.14.1, 0.116286}, {127.0.14.2, 0.116286},
{127.0.14.3, 0.116286}, {127.0.14.4, 0.116286}, {127.0.14.5, 0.116286},
{127.0.14.6, 0.116286}} MiB/s, row_from_disk_rows_per_sec={{127.0.14.1,
685.026}, {127.0.14.2, 685.026}, {127.0.14.3, 685.026}, {127.0.14.4, 685.026},
{127.0.14.5, 685.026}, {127.0.14.6, 685.026}} Rows/s, tx_row_nr_peer={},
rx_row_nr_peer={}
```
The time to finish repair difference = 1.5648403 seconds / 0.001459798 seconds = 1072X
2)
3 DCs, each DC has 2 nodes, 6 nodes in the cluster. RF = {dc1: 2, dc2: 2, dc3: 2}
Same test as above except 5ms delay is added to simulate multiple dc
network latency:
The time to repair is reduced from 333s to 0.2s.
333.26758 s / 0.22625381s = 1472.98
3)
3 DCs, each DC has 3 nodes, 9 nodes in the cluster. RF = {dc1: 3, dc2: 3, dc3: 3}
, 10 ms network latency
Before:
```
repair - repair[86124a4a-fd26-42ea-a078-437ca9e372df]: stats:
repair_reason=repair, keyspace=system_auth, tables={role_attributes,
role_members, roles}, ranges_nr=2305, round_nr=6916,
round_nr_fast_path_already_synced=6915,
round_nr_fast_path_same_combined_hashes=0, round_nr_slow_path=1,
rpc_call_nr=276630, tx_hashes_nr=0, rx_hashes_nr=8, duration=986.34015
seconds, tx_row_nr=7, rx_row_nr=0, tx_row_bytes=1246, rx_row_bytes=0,
row_from_disk_bytes={{127.0.57.1, 178}, {127.0.57.2, 178}, {127.0.57.3,
0}, {127.0.57.4, 0}, {127.0.57.5, 0}, {127.0.57.6, 0}, {127.0.57.7, 0},
{127.0.57.8, 0}, {127.0.57.9, 0}}, row_from_disk_nr={{127.0.57.1, 1},
{127.0.57.2, 1}, {127.0.57.3, 0}, {127.0.57.4, 0}, {127.0.57.5, 0},
{127.0.57.6, 0}, {127.0.57.7, 0}, {127.0.57.8, 0}, {127.0.57.9, 0}},
row_from_disk_bytes_per_sec={{127.0.57.1, 1.72105e-07}, {127.0.57.2,
1.72105e-07}, {127.0.57.3, 0}, {127.0.57.4, 0}, {127.0.57.5, 0},
{127.0.57.6, 0}, {127.0.57.7, 0}, {127.0.57.8, 0}, {127.0.57.9, 0}}
MiB/s, row_from_disk_rows_per_sec={{127.0.57.1, 0.00101385},
{127.0.57.2, 0.00101385}, {127.0.57.3, 0}, {127.0.57.4, 0},
{127.0.57.5, 0}, {127.0.57.6, 0}, {127.0.57.7, 0}, {127.0.57.8, 0},
{127.0.57.9, 0}} Rows/s, tx_row_nr_peer={{127.0.57.3, 1},
{127.0.57.4, 1}, {127.0.57.5, 1}, {127.0.57.6, 1}, {127.0.57.7, 1},
{127.0.57.8, 1}, {127.0.57.9, 1}}, rx_row_nr_peer={}
```
After:
```
repair - repair[07ebd571-63cb-4ef6-9465-6e5f1e98f04f]: stats:
repair_reason=repair, keyspace=system_auth, tables={role_attributes,
role_members, roles}, ranges_nr=1, round_nr=4,
round_nr_fast_path_already_synced=4,
round_nr_fast_path_same_combined_hashes=0, round_nr_slow_path=0,
rpc_call_nr=128, tx_hashes_nr=0, rx_hashes_nr=0, duration=1.6052915
seconds, tx_row_nr=0, rx_row_nr=0, tx_row_bytes=0, rx_row_bytes=0,
row_from_disk_bytes={{127.0.57.1, 178}, {127.0.57.2, 178}, {127.0.57.3,
178}, {127.0.57.4, 178}, {127.0.57.5, 178}, {127.0.57.6, 178},
{127.0.57.7, 178}, {127.0.57.8, 178}, {127.0.57.9, 178}},
row_from_disk_nr={{127.0.57.1, 1}, {127.0.57.2, 1}, {127.0.57.3, 1},
{127.0.57.4, 1}, {127.0.57.5, 1}, {127.0.57.6, 1}, {127.0.57.7, 1},
{127.0.57.8, 1}, {127.0.57.9, 1}},
row_from_disk_bytes_per_sec={{127.0.57.1, 0.00037793}, {127.0.57.2,
0.00037793}, {127.0.57.3, 0.00037793}, {127.0.57.4, 0.00037793},
{127.0.57.5, 0.00037793}, {127.0.57.6, 0.00037793}, {127.0.57.7,
0.00037793}, {127.0.57.8, 0.00037793}, {127.0.57.9, 0.00037793}}
MiB/s, row_from_disk_rows_per_sec={{127.0.57.1, 2.22634},
{127.0.57.2, 2.22634}, {127.0.57.3, 2.22634}, {127.0.57.4,
2.22634}, {127.0.57.5, 2.22634}, {127.0.57.6, 2.22634},
{127.0.57.7, 2.22634}, {127.0.57.8, 2.22634}, {127.0.57.9,
2.22634}} Rows/s, tx_row_nr_peer={}, rx_row_nr_peer={}
```
The time to repair is reduced from 986s (16 minutes) to 1.6s
*) Summary
So, a more than 1000X difference is observed for this common usage of
system table repair procedure.
Fixes #16011
Refs #15159
This commit is contained in:
@@ -1122,6 +1122,14 @@
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"small_table_optimization",
|
||||
"description":"If the value is the string 'true' with any capitalization, perform small table optimization. When this option is enabled, user can send the repair request to any of the nodes in the cluster. There is no need to send repair requests to multiple nodes. All token ranges for the table will be repaired automatically.",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
},
|
||||
|
||||
@@ -317,7 +317,7 @@ void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair) {
|
||||
ss::repair_async.set(r, [&ctx, &repair](std::unique_ptr<http::request> req) {
|
||||
static std::vector<sstring> options = {"primaryRange", "parallelism", "incremental",
|
||||
"jobThreads", "ranges", "columnFamilies", "dataCenters", "hosts", "ignore_nodes", "trace",
|
||||
"startToken", "endToken", "ranges_parallelism"};
|
||||
"startToken", "endToken", "ranges_parallelism", "small_table_optimization"};
|
||||
std::unordered_map<sstring, sstring> options_map;
|
||||
for (auto o : options) {
|
||||
auto s = req->get_query_param(o);
|
||||
|
||||
@@ -213,9 +213,13 @@ static std::vector<gms::inet_address> get_neighbors(
|
||||
const sstring& ksname, query::range<dht::token> range,
|
||||
const std::vector<sstring>& data_centers,
|
||||
const std::vector<sstring>& hosts,
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes) {
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes, bool small_table_optimization = false) {
|
||||
dht::token tok = range.end() ? range.end()->value() : dht::maximum_token();
|
||||
auto ret = erm.get_natural_endpoints(tok);
|
||||
if (small_table_optimization) {
|
||||
auto normal_nodes = erm.get_token_metadata().get_all_endpoints();
|
||||
ret = inet_address_vector_replica_set(normal_nodes.begin(), normal_nodes.end());
|
||||
}
|
||||
remove_item(ret, utils::fb_utilities::get_broadcast_address());
|
||||
|
||||
if (!data_centers.empty()) {
|
||||
@@ -568,6 +572,7 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed,
|
||||
bool small_table_optimization,
|
||||
std::optional<int> ranges_parallelism)
|
||||
: repair_task_impl(module, id, 0, "shard", keyspace, "", "", parent_id_.uuid(), reason_)
|
||||
, rs(repair)
|
||||
@@ -586,6 +591,7 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
||||
, ignore_nodes(ignore_nodes_)
|
||||
, total_rf(erm->get_replication_factor())
|
||||
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
|
||||
, _small_table_optimization(small_table_optimization)
|
||||
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
|
||||
{
|
||||
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
|
||||
@@ -628,7 +634,7 @@ void repair::shard_repair_task_impl::check_in_abort_or_shutdown() {
|
||||
|
||||
repair_neighbors repair::shard_repair_task_impl::get_repair_neighbors(const dht::token_range& range) {
|
||||
return neighbors.empty() ?
|
||||
repair_neighbors(get_neighbors(*erm, _status.keyspace, range, data_centers, hosts, ignore_nodes)) :
|
||||
repair_neighbors(get_neighbors(*erm, _status.keyspace, range, data_centers, hosts, ignore_nodes, _small_table_optimization)) :
|
||||
neighbors[range];
|
||||
}
|
||||
|
||||
@@ -696,7 +702,7 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await repair_cf_range_row_level(*this, cf, table.id, range, neighbors);
|
||||
co_await repair_cf_range_row_level(*this, cf, table.id, range, neighbors, _small_table_optimization);
|
||||
} catch (replica::no_such_column_family&) {
|
||||
dropped_tables.insert(cf);
|
||||
} catch (...) {
|
||||
@@ -802,6 +808,8 @@ struct repair_options {
|
||||
|
||||
int ranges_parallelism = -1;
|
||||
|
||||
bool small_table_optimization = false;
|
||||
|
||||
repair_options(std::unordered_map<sstring, sstring> options) {
|
||||
bool_opt(primary_range, options, PRIMARY_RANGE_KEY);
|
||||
ranges_opt(ranges, options, RANGES_KEY);
|
||||
@@ -839,6 +847,8 @@ struct repair_options {
|
||||
|
||||
int_opt(ranges_parallelism, options, RANGES_PARALLELISM_KEY);
|
||||
|
||||
bool_opt(small_table_optimization, options, SMALL_TABLE_OPTIMIZATION_KEY);
|
||||
|
||||
// The parsing code above removed from the map options we have parsed.
|
||||
// If anything is left there in the end, it's an unsupported option.
|
||||
if (!options.empty()) {
|
||||
@@ -860,6 +870,7 @@ struct repair_options {
|
||||
static constexpr const char* START_TOKEN = "startToken";
|
||||
static constexpr const char* END_TOKEN = "endToken";
|
||||
static constexpr const char* RANGES_PARALLELISM_KEY = "ranges_parallelism";
|
||||
static constexpr const char* SMALL_TABLE_OPTIMIZATION_KEY = "small_table_optimization";
|
||||
|
||||
// Settings of "parallelism" option. Numbers must match Cassandra's
|
||||
// RepairParallelism enum, which is used by the caller.
|
||||
@@ -1174,8 +1185,15 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
|
||||
co_return id.id;
|
||||
}
|
||||
|
||||
auto small_table_optimization = options.small_table_optimization;
|
||||
if (small_table_optimization) {
|
||||
auto range = dht::token_range(dht::token_range::bound(dht::minimum_token(), false), dht::token_range::bound(dht::maximum_token(), false));
|
||||
ranges = {range};
|
||||
rlogger.info("repair[{}]: Using small table optimization for keyspace={} tables={} range={}", id.uuid(), keyspace, cfs, range);
|
||||
}
|
||||
|
||||
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), ranges_parallelism);
|
||||
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), small_table_optimization, ranges_parallelism);
|
||||
co_return id.id;
|
||||
}
|
||||
|
||||
@@ -1204,7 +1222,13 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
}
|
||||
|
||||
bool hints_batchlog_flushed = false;
|
||||
auto participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
|
||||
std::list<gms::inet_address> participants;
|
||||
if (_small_table_optimization) {
|
||||
auto normal_nodes = germs->get().get_token_metadata().get_all_endpoints();
|
||||
participants = std::list<gms::inet_address>(normal_nodes.begin(), normal_nodes.end());
|
||||
} else {
|
||||
participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get();
|
||||
}
|
||||
if (needs_flush_before_repair) {
|
||||
auto waiting_nodes = db.get_token_metadata().get_all_endpoints();
|
||||
std::erase_if(waiting_nodes, [&] (const auto& addr) {
|
||||
@@ -1283,13 +1307,14 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
}
|
||||
|
||||
auto ranges_parallelism = _ranges_parallelism;
|
||||
bool small_table_optimization = _small_table_optimization;
|
||||
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism,
|
||||
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism, small_table_optimization,
|
||||
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
|
||||
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
|
||||
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, small_table_optimization, ranges_parallelism);
|
||||
auto release_task_resources = defer([&] () noexcept {
|
||||
task->release_resources();
|
||||
});
|
||||
@@ -1408,10 +1433,11 @@ future<> repair::data_sync_repair_task_impl::run() {
|
||||
auto hosts = std::vector<sstring>();
|
||||
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
||||
bool hints_batchlog_flushed = false;
|
||||
bool small_table_optimization = false;
|
||||
auto ranges_parallelism = std::nullopt;
|
||||
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
|
||||
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
|
||||
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, small_table_optimization, ranges_parallelism);
|
||||
task_impl_ptr->neighbors = std::move(neighbors);
|
||||
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
|
||||
auto release_task_resources = defer([&] () noexcept {
|
||||
|
||||
@@ -662,18 +662,30 @@ future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, sche
|
||||
co_return std::move(row_list);
|
||||
}
|
||||
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer) {
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, locator::effective_replication_map_ptr erm, bool small_table_optimization) {
|
||||
auto cmp = position_in_partition::tri_compare(*s);
|
||||
lw_shared_ptr<mutation_fragment> last_mf;
|
||||
lw_shared_ptr<const decorated_key_with_hash> last_dk;
|
||||
bool do_small_table_optimization = erm && small_table_optimization;
|
||||
auto* strat = do_small_table_optimization ? &erm->get_replication_strategy() : nullptr;
|
||||
auto* tm = do_small_table_optimization ? &erm->get_token_metadata() : nullptr;
|
||||
auto myip = do_small_table_optimization ? utils::fb_utilities::get_broadcast_address() : gms::inet_address();
|
||||
for (auto& r : rows) {
|
||||
thread::maybe_yield();
|
||||
if (!r.dirty_on_master()) {
|
||||
continue;
|
||||
}
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
if (do_small_table_optimization) {
|
||||
// Check if the token is owned by the node
|
||||
auto eps = strat->calculate_natural_endpoints(dk.token(), *tm).get0();
|
||||
if (!eps.contains(myip)) {
|
||||
rlogger.trace("master: ignore row, token={}", dk.token());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
writer->create_writer();
|
||||
auto mf = r.get_mutation_fragment_ptr();
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
if (last_mf && last_dk &&
|
||||
cmp(last_mf->position(), mf->position()) == 0 &&
|
||||
dk.tri_compare(*s, last_dk->dk) == 0 &&
|
||||
@@ -1368,13 +1380,13 @@ private:
|
||||
}
|
||||
public:
|
||||
// Must run inside a seastar thread
|
||||
void flush_rows_in_working_row_buf() {
|
||||
void flush_rows_in_working_row_buf(locator::effective_replication_map_ptr erm, bool small_table_optimization) {
|
||||
if (_dirty_on_master) {
|
||||
_dirty_on_master = is_dirty_on_master::no;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
flush_rows(_schema, _working_row_buf, _repair_writer);
|
||||
flush_rows(_schema, _working_row_buf, _repair_writer, erm, small_table_optimization);
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -1870,7 +1882,8 @@ public:
|
||||
future<> put_row_diff_with_rpc_stream(
|
||||
repair_hash_set set_diff,
|
||||
needs_all_rows_t needs_all_rows,
|
||||
gms::inet_address remote_node, unsigned node_idx) {
|
||||
gms::inet_address remote_node, unsigned node_idx,
|
||||
const locator::effective_replication_map& erm, bool small_table_optimization) {
|
||||
if (set_diff.empty()) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1884,6 +1897,22 @@ public:
|
||||
rlogger.warn("Hash conflict detected, keyspace={}, table={}, range={}, row_diff.size={}, set_diff.size={}. It is recommended to compact the table and rerun repair for the range.",
|
||||
_schema->ks_name(), _schema->cf_name(), _range, row_diff.size(), sz);
|
||||
}
|
||||
if (small_table_optimization) {
|
||||
auto& strat = erm.get_replication_strategy();
|
||||
auto& tm = erm.get_token_metadata();
|
||||
std::list<repair_row> tmp;
|
||||
for (auto& row : row_diff) {
|
||||
repair_row r = std::move(row);
|
||||
const auto& dk = r.get_dk_with_hash()->dk;
|
||||
auto eps = co_await strat.calculate_natural_endpoints(dk.token(), tm);
|
||||
if (eps.contains(remote_node)) {
|
||||
tmp.push_back(std::move(r));
|
||||
} else {
|
||||
rlogger.trace("master: put : ignore row, token={}", dk.token());
|
||||
}
|
||||
}
|
||||
row_diff = std::move(tmp);
|
||||
}
|
||||
|
||||
size_t row_bytes = co_await get_repair_rows_size(row_diff);
|
||||
|
||||
@@ -2462,6 +2491,7 @@ class row_level_repair {
|
||||
table_id _table_id;
|
||||
dht::token_range _range;
|
||||
inet_address_vector_replica_set _all_live_peer_nodes;
|
||||
bool _small_table_optimization;
|
||||
replica::column_family& _cf;
|
||||
|
||||
// Repair master and followers will propose a sync boundary. Each of them
|
||||
@@ -2510,12 +2540,14 @@ public:
|
||||
sstring cf_name,
|
||||
table_id table_id,
|
||||
dht::token_range range,
|
||||
std::vector<gms::inet_address> all_live_peer_nodes)
|
||||
std::vector<gms::inet_address> all_live_peer_nodes,
|
||||
bool small_table_optimization)
|
||||
: _shard_task(shard_task)
|
||||
, _cf_name(std::move(cf_name))
|
||||
, _table_id(std::move(table_id))
|
||||
, _range(std::move(range))
|
||||
, _all_live_peer_nodes(sort_peer_nodes(all_live_peer_nodes))
|
||||
, _small_table_optimization(small_table_optimization)
|
||||
, _cf(_shard_task.db.local().find_column_family(_table_id))
|
||||
, _seed(get_random_seed())
|
||||
, _start_time(gc_clock::now()) {
|
||||
@@ -2742,7 +2774,7 @@ private:
|
||||
throw;
|
||||
}
|
||||
}
|
||||
master.flush_rows_in_working_row_buf();
|
||||
master.flush_rows_in_working_row_buf(get_erm(), _small_table_optimization);
|
||||
return op_status::next_step;
|
||||
}
|
||||
|
||||
@@ -2765,7 +2797,7 @@ private:
|
||||
auto& node = master.all_nodes()[idx].node;
|
||||
if (master.use_rpc_stream()) {
|
||||
ns.state = repair_state::put_row_diff_with_rpc_stream_started;
|
||||
return master.put_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], idx).then([&ns] {
|
||||
return master.put_row_diff_with_rpc_stream(std::move(set_diff), needs_all_rows, _all_live_peer_nodes[idx], idx, *get_erm(), _small_table_optimization).then([&ns] {
|
||||
ns.state = repair_state::put_row_diff_with_rpc_stream_finished;
|
||||
}).handle_exception([this, &node] (std::exception_ptr ep) {
|
||||
auto s = _cf.schema();
|
||||
@@ -2789,6 +2821,11 @@ private:
|
||||
master.stats().round_nr_slow_path++;
|
||||
}
|
||||
|
||||
private:
|
||||
locator::effective_replication_map_ptr get_erm() {
|
||||
return _shard_task.erm;
|
||||
}
|
||||
|
||||
private:
|
||||
// Update system.repair_history table
|
||||
future<> update_system_repair_table() {
|
||||
@@ -2964,8 +3001,8 @@ public:
|
||||
|
||||
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
sstring cf_name, table_id table_id, dht::token_range range,
|
||||
const std::vector<gms::inet_address>& all_peer_nodes) {
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes);
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization) {
|
||||
auto repair = row_level_repair(shard_task, std::move(cf_name), std::move(table_id), std::move(range), all_peer_nodes, small_table_optimization);
|
||||
co_return co_await repair.run();
|
||||
}
|
||||
|
||||
|
||||
@@ -244,8 +244,8 @@ class repair_writer;
|
||||
|
||||
future<> repair_cf_range_row_level(repair::shard_repair_task_impl& shard_task,
|
||||
sstring cf_name, table_id table_id, dht::token_range range,
|
||||
const std::vector<gms::inet_address>& all_peer_nodes);
|
||||
const std::vector<gms::inet_address>& all_peer_nodes, bool small_table_optimization);
|
||||
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows,
|
||||
schema_ptr s, uint64_t seed, repair_master is_master,
|
||||
reader_permit permit, repair_hasher hasher);
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer);
|
||||
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, locator::effective_replication_map_ptr erm = {}, bool small_table_optimization = false);
|
||||
|
||||
@@ -46,9 +46,10 @@ private:
|
||||
std::vector<sstring> _hosts;
|
||||
std::vector<sstring> _data_centers;
|
||||
std::unordered_set<gms::inet_address> _ignore_nodes;
|
||||
bool _small_table_optimization;
|
||||
std::optional<int> _ranges_parallelism;
|
||||
public:
|
||||
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes, std::optional<int> ranges_parallelism) noexcept
|
||||
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes, bool small_table_optimization, std::optional<int> ranges_parallelism) noexcept
|
||||
: repair_task_impl(module, id.uuid(), id.id, "keyspace", std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
|
||||
, _germs(germs)
|
||||
, _cfs(std::move(cfs))
|
||||
@@ -56,6 +57,7 @@ public:
|
||||
, _hosts(std::move(hosts))
|
||||
, _data_centers(std::move(data_centers))
|
||||
, _ignore_nodes(std::move(ignore_nodes))
|
||||
, _small_table_optimization(small_table_optimization)
|
||||
, _ranges_parallelism(ranges_parallelism)
|
||||
{}
|
||||
|
||||
@@ -123,6 +125,7 @@ public:
|
||||
std::unordered_set<sstring> dropped_tables;
|
||||
bool _hints_batchlog_flushed = false;
|
||||
std::unordered_set<gms::inet_address> nodes_down;
|
||||
bool _small_table_optimization = false;
|
||||
private:
|
||||
bool _aborted = false;
|
||||
std::optional<sstring> _failed_because;
|
||||
@@ -142,6 +145,7 @@ public:
|
||||
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
||||
streaming::stream_reason reason_,
|
||||
bool hints_batchlog_flushed,
|
||||
bool small_table_optimization,
|
||||
std::optional<int> ranges_parallelism);
|
||||
void check_failed_ranges();
|
||||
void check_in_abort_or_shutdown();
|
||||
|
||||
Reference in New Issue
Block a user