|
|
|
|
@@ -189,7 +189,8 @@ void remove_item(Collection& c, T& item) {
|
|
|
|
|
static std::vector<gms::inet_address> get_neighbors(database& db,
|
|
|
|
|
const sstring& ksname, query::range<dht::token> range,
|
|
|
|
|
const std::vector<sstring>& data_centers,
|
|
|
|
|
const std::vector<sstring>& hosts) {
|
|
|
|
|
const std::vector<sstring>& hosts,
|
|
|
|
|
const std::unordered_set<gms::inet_address>& ignore_nodes) {
|
|
|
|
|
|
|
|
|
|
keyspace& ks = db.find_keyspace(ksname);
|
|
|
|
|
auto& rs = ks.get_replication_strategy();
|
|
|
|
|
@@ -271,6 +272,11 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
|
|
|
|
|
"part of the supplied list of hosts to use during the "
|
|
|
|
|
"repair (%s).", me, others, hosts));
|
|
|
|
|
}
|
|
|
|
|
} else if (!ignore_nodes.empty()) {
|
|
|
|
|
auto it = std::remove_if(ret.begin(), ret.end(), [&ignore_nodes] (const gms::inet_address& node) {
|
|
|
|
|
return ignore_nodes.contains(node);
|
|
|
|
|
});
|
|
|
|
|
ret.erase(it, ret.end());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
@@ -811,6 +817,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
|
|
|
|
repair_uniq_id id_,
|
|
|
|
|
const std::vector<sstring>& data_centers_,
|
|
|
|
|
const std::vector<sstring>& hosts_,
|
|
|
|
|
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
|
|
|
|
streaming::stream_reason reason_,
|
|
|
|
|
std::optional<utils::UUID> ops_uuid)
|
|
|
|
|
: db(db_)
|
|
|
|
|
@@ -824,6 +831,7 @@ repair_info::repair_info(seastar::sharded<database>& db_,
|
|
|
|
|
, shard(this_shard_id())
|
|
|
|
|
, data_centers(data_centers_)
|
|
|
|
|
, hosts(hosts_)
|
|
|
|
|
, ignore_nodes(ignore_nodes_)
|
|
|
|
|
, reason(reason_)
|
|
|
|
|
, nr_ranges_total(ranges.size())
|
|
|
|
|
, _row_level_repair(db.local().features().cluster_supports_row_level_repair())
|
|
|
|
|
@@ -930,7 +938,7 @@ void repair_info::check_in_abort() {
|
|
|
|
|
|
|
|
|
|
// Returns the repair neighbors to use for `range`.
//
// When the `neighbors` member is empty, the neighbors are computed on the fly
// by calling get_neighbors() with this repair's keyspace, data_centers and
// hosts, and the result is wrapped in a repair_neighbors. Otherwise the entry
// stored under `range` in `neighbors` is returned.
repair_neighbors repair_info::get_repair_neighbors(const dht::token_range& range) {
|
|
|
|
|
return neighbors.empty() ?
|
|
|
|
|
// NOTE(review): the next two call lines differ only in the trailing
// `ignore_nodes` argument; they look like the before/after pair of one diff
// line rather than two live branches -- confirm against the real file.
repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts)) :
|
|
|
|
|
repair_neighbors(get_neighbors(db.local(), keyspace, range, data_centers, hosts, ignore_nodes)) :
|
|
|
|
|
neighbors[range];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1311,6 +1319,10 @@ struct repair_options {
|
|
|
|
|
// range repaired, only the relevant subset of the hosts (holding a
|
|
|
|
|
// replica of this range) is used.
|
|
|
|
|
std::vector<sstring> hosts;
|
|
|
|
|
// ignore_nodes specifies the list of nodes to ignore in this repair,
|
|
|
|
|
// e.g., the user knows a node is down and wants to run repair without this
|
|
|
|
|
// specific node.
|
|
|
|
|
std::vector<sstring> ignore_nodes;
|
|
|
|
|
// data_centers is used to restrict the repair to the local data center.
|
|
|
|
|
// The node starting the repair must be in the data center; issuing a
|
|
|
|
|
// repair to a data center other than the named one returns an error.
|
|
|
|
|
@@ -1321,6 +1333,7 @@ struct repair_options {
|
|
|
|
|
ranges_opt(ranges, options, RANGES_KEY);
|
|
|
|
|
list_opt(column_families, options, COLUMNFAMILIES_KEY);
|
|
|
|
|
list_opt(hosts, options, HOSTS_KEY);
|
|
|
|
|
list_opt(ignore_nodes, options, IGNORE_NODES_KEY);
|
|
|
|
|
list_opt(data_centers, options, DATACENTERS_KEY);
|
|
|
|
|
// We currently do not support incremental repair. We could probably
|
|
|
|
|
// ignore this option as it is just an optimization, but for now,
|
|
|
|
|
@@ -1366,6 +1379,7 @@ struct repair_options {
|
|
|
|
|
static constexpr const char* COLUMNFAMILIES_KEY = "columnFamilies";
|
|
|
|
|
static constexpr const char* DATACENTERS_KEY = "dataCenters";
|
|
|
|
|
static constexpr const char* HOSTS_KEY = "hosts";
|
|
|
|
|
static constexpr const char* IGNORE_NODES_KEY = "ignore_nodes";
|
|
|
|
|
static constexpr const char* TRACE_KEY = "trace";
|
|
|
|
|
static constexpr const char* START_TOKEN = "startToken";
|
|
|
|
|
static constexpr const char* END_TOKEN = "endToken";
|
|
|
|
|
@@ -1600,6 +1614,20 @@ static int do_repair_start(seastar::sharded<database>& db, seastar::sharded<netw
|
|
|
|
|
throw std::runtime_error("Cannot combine data centers and hosts options.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!options.ignore_nodes.empty() && !options.hosts.empty()) {
|
|
|
|
|
throw std::runtime_error("Cannot combine ignore_nodes and hosts options.");
|
|
|
|
|
}
|
|
|
|
|
std::unordered_set<gms::inet_address> ignore_nodes;
|
|
|
|
|
for (const auto& n: options.ignore_nodes) {
|
|
|
|
|
try {
|
|
|
|
|
auto node = gms::inet_address(n);
|
|
|
|
|
ignore_nodes.insert(node);
|
|
|
|
|
} catch(...) {
|
|
|
|
|
throw std::runtime_error(format("Failed to parse node={} in ignore_nodes={} specified by user: {}",
|
|
|
|
|
n, options.ignore_nodes, std::current_exception()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!options.start_token.empty() || !options.end_token.empty()) {
|
|
|
|
|
// Intersect the list of local ranges with the given token range,
|
|
|
|
|
// dropping ranges with no intersection.
|
|
|
|
|
@@ -1636,17 +1664,17 @@ static int do_repair_start(seastar::sharded<database>& db, seastar::sharded<netw
|
|
|
|
|
|
|
|
|
|
// Do it in the background.
|
|
|
|
|
(void)repair_tracker().run(id, [&db, &ms, id, keyspace = std::move(keyspace),
|
|
|
|
|
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options)] () mutable {
|
|
|
|
|
cfs = std::move(cfs), ranges = std::move(ranges), options = std::move(options), ignore_nodes = std::move(ignore_nodes)] () mutable {
|
|
|
|
|
std::vector<future<>> repair_results;
|
|
|
|
|
repair_results.reserve(smp::count);
|
|
|
|
|
auto table_ids = get_table_ids(db.local(), keyspace, cfs);
|
|
|
|
|
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
|
|
|
|
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges,
|
|
|
|
|
data_centers = options.data_centers, hosts = options.hosts] (database& localdb) mutable {
|
|
|
|
|
data_centers = options.data_centers, hosts = options.hosts, ignore_nodes] (database& localdb) mutable {
|
|
|
|
|
_node_ops_metrics.repair_total_ranges_sum += ranges.size();
|
|
|
|
|
auto ri = make_lw_shared<repair_info>(db, ms,
|
|
|
|
|
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), streaming::stream_reason::repair, id.uuid);
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, id.uuid);
|
|
|
|
|
return repair_ranges(ri);
|
|
|
|
|
});
|
|
|
|
|
repair_results.push_back(std::move(f));
|
|
|
|
|
@@ -1737,9 +1765,10 @@ static future<> sync_data_using_repair(seastar::sharded<database>& db,
|
|
|
|
|
auto f = db.invoke_on(shard, [&db, &ms, keyspace, table_ids, id, ranges, neighbors, reason, ops_uuid] (database& localdb) mutable {
|
|
|
|
|
auto data_centers = std::vector<sstring>();
|
|
|
|
|
auto hosts = std::vector<sstring>();
|
|
|
|
|
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
|
|
|
|
auto ri = make_lw_shared<repair_info>(db, ms,
|
|
|
|
|
std::move(keyspace), std::move(ranges), std::move(table_ids),
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), reason, ops_uuid);
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, ops_uuid);
|
|
|
|
|
ri->neighbors = std::move(neighbors);
|
|
|
|
|
return repair_ranges(ri);
|
|
|
|
|
});
|
|
|
|
|
|