tablets: switch to token_metadata2

locator_topology_test, network_topology_strategy_test and
tablets_test are fully switched to the host_id-based token_metadata,
meaning they no longer populate the old token_metadata.

All the boost and topology tests pass with this change.
This commit is contained in:
Petr Gusev
2023-11-02 13:15:32 +04:00
parent f5038f6c72
commit d9283bd025
13 changed files with 225 additions and 240 deletions

View File

@@ -221,7 +221,7 @@ insert_token_range_to_sorted_container_while_unwrapping(
dht::token_range_vector
vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const inet_address&)> consider_range_for_endpoint) const {
dht::token_range_vector ret;
const auto& tm = *_tmptr;
const auto& tm = *_tmptr->get_new();
const auto& sorted_tokens = tm.sorted_tokens();
if (sorted_tokens.empty()) {
on_internal_error(rslogger, "Token metadata is empty");
@@ -305,7 +305,7 @@ vnode_effective_replication_map::get_primary_ranges(inet_address ep) const {
dht::token_range_vector
vnode_effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
const topology& topo = _tmptr->get_topology();
const topology& topo = _tmptr->get_new()->get_topology();
sstring local_dc = topo.get_datacenter(ep);
std::unordered_set<inet_address> local_dc_nodes = topo.get_datacenter_endpoints().at(local_dc);
// The callback function below is called for each endpoint

View File

@@ -56,7 +56,7 @@ class load_sketch {
}
};
std::unordered_map<host_id, node_load> _nodes;
token_metadata_ptr _tm;
token_metadata2_ptr _tm;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
@@ -65,7 +65,7 @@ private:
}
public:
load_sketch(token_metadata_ptr tm)
load_sketch(token_metadata2_ptr tm)
: _tm(std::move(tm)) {
}

View File

@@ -300,23 +300,23 @@ future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(sch
}
tablet_map tablets(tablet_count);
load_sketch load(tm);
load_sketch load(tm->get_new_strong());
co_await load.populate();
// FIXME: Don't use tokens to distribute nodes.
// The following reuses the existing token-based algorithm used by NetworkTopologyStrategy.
assert(!tm->sorted_tokens().empty());
auto token_range = tm->ring_range(dht::token::get_random_token());
assert(!tm->get_new()->sorted_tokens().empty());
auto token_range = tm->get_new()->ring_range(dht::token::get_random_token());
for (tablet_id tb : tablets.tablet_ids()) {
natural_endpoints_tracker<gms::inet_address> tracker(*tm, _dc_rep_factor);
natural_endpoints_tracker<locator::host_id> tracker(*tm->get_new(), _dc_rep_factor);
while (true) {
co_await coroutine::maybe_yield();
if (token_range.begin() == token_range.end()) {
token_range = tm->ring_range(dht::minimum_token());
token_range = tm->get_new()->ring_range(dht::minimum_token());
}
inet_address ep = *tm->get_endpoint(*token_range.begin());
locator::host_id ep = *tm->get_new()->get_endpoint(*token_range.begin());
token_range.drop_front();
if (tracker.add_endpoint_and_check_if_done(ep)) {
break;
@@ -325,8 +325,7 @@ future<tablet_map> network_topology_strategy::allocate_tablets_for_new_table(sch
tablet_replica_set replicas;
for (auto&& ep : tracker.replicas()) {
auto host = tm->get_host_id(ep);
replicas.emplace_back(tablet_replica{host, load.next_shard(host)});
replicas.emplace_back(tablet_replica{ep, load.next_shard(ep)});
}
tablets.set_tablet(tb, tablet_info{std::move(replicas)});

View File

@@ -52,7 +52,7 @@ public:
/// Returns tablet_map for the table of the tablet associated with this guard.
/// The result is valid until the next deferring point.
const locator::tablet_map& get_tablet_map() {
return get_token_metadata()->tablets().get_tablet_map(_tablet.table);
return get_token_metadata()->get_new()->tablets().get_tablet_map(_tablet.table);
}
};

View File

@@ -17,7 +17,7 @@ namespace locator {
/// Implements sharder object which reflects assignment of tablets of a given table to local shards.
/// Token ranges which don't have local tablets are reported to belong to shard 0.
class tablet_sharder : public dht::sharder {
const token_metadata& _tm;
const token_metadata2& _tm;
table_id _table;
mutable const tablet_map* _tmap = nullptr;
private:
@@ -29,7 +29,7 @@ private:
}
}
public:
tablet_sharder(const token_metadata& tm, table_id table)
tablet_sharder(const token_metadata2& tm, table_id table)
: _tm(tm)
, _table(table)
{ }

View File

@@ -115,7 +115,7 @@ const tablet_map& tablet_metadata::get_tablet_map(table_id id) const {
try {
return _tablets.at(id);
} catch (const std::out_of_range&) {
throw std::runtime_error(format("Tablet map not found for table {}", id));
throw_with_backtrace<std::runtime_error>(format("Tablet map not found for table {}", id));
}
}
@@ -338,12 +338,12 @@ private:
inet_address_vector_replica_set result;
result.reserve(replicas.size());
for (auto&& replica : replicas) {
result.emplace_back(_tmptr->get_endpoint_for_host_id(replica.host));
result.emplace_back(_tmptr->get_new()->get_endpoint_for_host_id(replica.host));
}
return result;
}
const tablet_map& get_tablet_map() const {
return _tmptr->tablets().get_tablet_map(_table);
return _tmptr->get_new()->tablets().get_tablet_map(_table);
}
public:
tablet_effective_replication_map(table_id table,
@@ -352,7 +352,7 @@ public:
size_t replication_factor)
: effective_replication_map(std::move(rs), std::move(tmptr), replication_factor)
, _table(table)
, _sharder(*_tmptr, table)
, _sharder(*_tmptr->get_new(), table)
{ }
virtual ~tablet_effective_replication_map() = default;
@@ -399,7 +399,7 @@ public:
case write_replica_set_selector::both:
tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}",
search_token, _table, tablet, info->pending_replica);
return {_tmptr->get_endpoint_for_host_id(info->pending_replica.host)};
return {_tmptr->get_new()->get_endpoint_for_host_id(info->pending_replica.host)};
case write_replica_set_selector::next:
return {};
}
@@ -466,7 +466,7 @@ public:
}
virtual bool has_pending_ranges(inet_address endpoint) const override {
const auto host_id = _tmptr->get_host_id_if_known(endpoint);
const auto host_id = _tmptr->get_new()->get_host_id_if_known(endpoint);
if (!host_id.has_value()) {
return false;
}
@@ -480,11 +480,11 @@ public:
virtual std::unique_ptr<token_range_splitter> make_splitter() const override {
class splitter : public token_range_splitter {
token_metadata_ptr _tmptr; // To keep the tablet map alive.
token_metadata2_ptr _tmptr; // To keep the tablet map alive.
const tablet_map& _tmap;
std::optional<tablet_id> _next;
public:
splitter(token_metadata_ptr tmptr, const tablet_map& tmap)
splitter(token_metadata2_ptr tmptr, const tablet_map& tmap)
: _tmptr(std::move(tmptr))
, _tmap(tmap)
{ }
@@ -502,7 +502,7 @@ public:
return t;
}
};
return std::make_unique<splitter>(_tmptr, get_tablet_map());
return std::make_unique<splitter>(_tmptr->get_new_strong(), get_tablet_map());
}
const dht::sharder& get_sharder(const schema& s) const override {
@@ -554,7 +554,7 @@ effective_replication_map_ptr tablet_aware_replication_strategy::do_make_replica
void tablet_metadata_guard::check() noexcept {
auto erm = _table->get_effective_replication_map();
auto& tmap = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_tablet.table);
auto& tmap = erm->get_token_metadata_ptr()->get_new()->tablets().get_tablet_map(_tablet.table);
auto* trinfo = tmap.get_tablet_transition_info(_tablet.tablet);
if (bool(_stage) != bool(trinfo) || (_stage && _stage != trinfo->stage)) {
_abort_source.request_abort();

View File

@@ -569,7 +569,7 @@ private:
const locator::tablet_map& tablet_map() const {
// FIXME: cheaper way to retrieve tablet_map than looking up every time in tablet_metadata's map.
auto& tm = erm()->get_token_metadata();
return tm.tablets().get_tablet_map(schema()->id());
return tm.get_new()->tablets().get_tablet_map(schema()->id());
}
public:
tablet_compaction_group_manager(replica::table& t) : _t(t) {}

View File

@@ -1608,7 +1608,7 @@ class topology_coordinator {
schema_ptr,
locator::global_tablet_id,
const locator::tablet_transition_info&)> func) {
auto tm = get_token_metadata_ptr();
auto tm = get_token_metadata_ptr()->get_new();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(table);
@@ -1622,7 +1622,7 @@ class topology_coordinator {
void generate_migration_update(std::vector<canonical_mutation>& out, const group0_guard& guard, const tablet_migration_info& mig) {
auto s = _db.find_schema(mig.tablet.table);
auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table);
auto& tmap = get_token_metadata_ptr()->get_new()->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
slogger.warn("Tablet already in transition, ignoring migration: {}", mig);
@@ -1781,7 +1781,7 @@ class topology_coordinator {
}
}
if (!preempt) {
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr());
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr()->get_new_strong());
if (!drain || plan.has_nodes_to_drain()) {
co_await generate_migration_updates(updates, guard, plan);
}
@@ -2562,7 +2562,7 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
slogger.debug("raft topology: Evaluating tablet balance");
auto tm = get_token_metadata_ptr();
auto plan = co_await _tablet_allocator.balance_tablets(tm);
auto plan = co_await _tablet_allocator.balance_tablets(tm->get_new_strong());
if (plan.empty()) {
slogger.debug("raft topology: Tablets are balanced");
co_return false;

View File

@@ -199,7 +199,7 @@ class load_balancer {
std::optional<locator::load_sketch> target_load_sketch;
future<load_sketch&> get_load_sketch(const token_metadata_ptr& tm) {
future<load_sketch&> get_load_sketch(const token_metadata2_ptr& tm) {
if (!target_load_sketch) {
target_load_sketch.emplace(tm);
co_await target_load_sketch->populate(id);
@@ -255,7 +255,7 @@ class load_balancer {
const size_t max_write_streaming_load = 2;
const size_t max_read_streaming_load = 4;
token_metadata_ptr _tm;
token_metadata2_ptr _tm;
load_balancer_stats_manager& _stats;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
@@ -290,7 +290,7 @@ private:
}
public:
load_balancer(token_metadata_ptr tm, load_balancer_stats_manager& stats)
load_balancer(token_metadata2_ptr tm, load_balancer_stats_manager& stats)
: _tm(std::move(tm))
, _stats(stats)
{ }
@@ -819,7 +819,7 @@ public:
_stopped = true;
}
future<migration_plan> balance_tablets(token_metadata_ptr tm) {
future<migration_plan> balance_tablets(token_metadata2_ptr tm) {
load_balancer lb(tm, _load_balancer_stats);
co_return co_await lb.make_plan();
}
@@ -869,7 +869,7 @@ future<> tablet_allocator::stop() {
return impl().stop();
}
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata_ptr tm) {
future<migration_plan> tablet_allocator::balance_tablets(locator::token_metadata2_ptr tm) {
return impl().balance_tablets(tm);
}

View File

@@ -90,7 +90,7 @@ public:
///
/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account.
///
future<migration_plan> balance_tablets(locator::token_metadata_ptr);
future<migration_plan> balance_tablets(locator::token_metadata2_ptr);
/// Should be called when the node is no longer a leader.
void on_leadership_lost();

View File

@@ -263,23 +263,24 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
topology::config{
.this_endpoint = ip1,
.this_host_id = host1
}
});
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, node1_shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, node2_shard_count);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, node3_shard_count);
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, node1_shard_count);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, node2_shard_count);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, node3_shard_count);
return make_ready_future<>();
}).get();
// Check that allocation is even when starting from empty state
{
auto tm = stm.get();
load_sketch load(tm);
load_sketch load(tm->get_new_strong());
load.populate().get();
std::vector<unsigned> node1_shards(node1_shard_count, 0);
@@ -341,13 +342,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) {
auto table = table_id(utils::make_random_uuid());
tab_meta.set_tablet_map(table, tmap);
tm.set_tablets(std::move(tab_meta));
tm.get_new()->set_tablets(std::move(tab_meta));
return make_ready_future<>();
}).get();
{
auto tm = stm.get();
load_sketch load(tm);
load_sketch load(tm->get_new_strong());
load.populate().get();
// host3 has max shard load of 3 and 3 shards, and 4 tablets allocated.

View File

@@ -72,7 +72,7 @@ static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms
void strategy_sanity_check(
replication_strategy_ptr ars_ptr,
const token_metadata& tm,
const token_metadata2_ptr& tm,
const std::map<sstring, sstring>& options) {
const network_topology_strategy* nts_ptr =
@@ -90,16 +90,16 @@ void strategy_sanity_check(
total_rf += rf;
}
BOOST_CHECK(ars_ptr->get_replication_factor(tm) == total_rf);
BOOST_CHECK(ars_ptr->get_replication_factor(token_metadata(tm)) == total_rf);
}
void endpoints_check(
replication_strategy_ptr ars_ptr,
const token_metadata& tm,
const token_metadata2_ptr& tm,
const inet_address_vector_replica_set& endpoints,
const locator::topology& topo) {
auto&& nodes_per_dc = tm.get_topology().get_datacenter_endpoints();
auto&& nodes_per_dc = tm->get_topology().get_datacenter_endpoints();
const network_topology_strategy* nts_ptr =
dynamic_cast<const network_topology_strategy*>(ars_ptr.get());
@@ -111,7 +111,7 @@ void endpoints_check(
// Check the total RF
BOOST_CHECK(endpoints.size() == total_rf);
BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(tm));
BOOST_CHECK(total_rf <= ars_ptr->get_replication_factor(token_metadata(tm)));
// Check the uniqueness
std::unordered_set<inet_address> ep_set(endpoints.begin(), endpoints.end());
@@ -156,19 +156,19 @@ auto d2t = [](double d) -> int64_t {
void full_ring_check(const std::vector<ring_point>& ring_points,
const std::map<sstring, sstring>& options,
replication_strategy_ptr ars_ptr,
locator::token_metadata_ptr tmptr) {
locator::token_metadata2_ptr tmptr) {
auto& tm = *tmptr;
const auto& topo = tm.get_topology();
strategy_sanity_check(ars_ptr, tm, options);
strategy_sanity_check(ars_ptr, tmptr, options);
auto erm = calculate_effective_replication_map(ars_ptr, tmptr).get0();
auto erm = calculate_effective_replication_map(ars_ptr, make_token_metadata_ptr(tmptr)).get0();
for (auto& rp : ring_points) {
double cur_point1 = rp.point - 0.5;
token t1(dht::token::kind::key, d2t(cur_point1 / ring_points.size()));
auto endpoints1 = erm->get_natural_endpoints(t1);
endpoints_check(ars_ptr, tm, endpoints1, topo);
endpoints_check(ars_ptr, tmptr, endpoints1, topo);
print_natural_endpoints(cur_point1, endpoints1);
@@ -181,7 +181,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
token t2(dht::token::kind::key, d2t(cur_point2 / ring_points.size()));
auto endpoints2 = erm->get_natural_endpoints(t2);
endpoints_check(ars_ptr, tm, endpoints2, topo);
endpoints_check(ars_ptr, tmptr, endpoints2, topo);
check_ranges_are_sorted(erm, rp.host);
BOOST_CHECK(endpoints1 == endpoints2);
}
@@ -190,7 +190,7 @@ void full_ring_check(const std::vector<ring_point>& ring_points,
void full_ring_check(const tablet_map& tmap,
const std::map<sstring, sstring>& options,
replication_strategy_ptr rs_ptr,
locator::token_metadata_ptr tmptr) {
locator::token_metadata2_ptr tmptr) {
auto& tm = *tmptr;
const auto& topo = tm.get_topology();
@@ -204,7 +204,7 @@ void full_ring_check(const tablet_map& tmap,
};
for (tablet_id tb : tmap.tablet_ids()) {
endpoints_check(rs_ptr, tm, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo);
endpoints_check(rs_ptr, tmptr, to_endpoint_set(tmap.get_tablet_info(tb).replicas), topo);
}
}
@@ -251,21 +251,13 @@ void simple_test() {
// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto update_tm = [&]<typename NodeId>(generic_token_metadata<NodeId>& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
if constexpr(std::is_same_v<NodeId, gms::inet_address>) {
co_await tm.update_normal_tokens(std::move(tokens), endpoint);
} else {
co_await tm.update_normal_tokens(std::move(tokens), id);
}
}
};
co_await update_tm(tm);
co_await update_tm(*tm.get_new());
auto& topo = tm.get_new()->get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
co_await tm.get_new()->update_normal_tokens(std::move(tokens), id);
}
}).get();
/////////////////////////////////////
@@ -279,7 +271,7 @@ void simple_test() {
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options323);
full_ring_check(ring_points, options323, ars_ptr, stm.get());
full_ring_check(ring_points, options323, ars_ptr, stm.get()->get_new_strong());
///////////////
// Create the replication strategy
@@ -292,7 +284,7 @@ void simple_test() {
ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", options320);
full_ring_check(ring_points, options320, ars_ptr, stm.get());
full_ring_check(ring_points, options320, ars_ptr, stm.get()->get_new_strong());
//
// Check cache invalidation: invalidate the cache and run a full ring
@@ -301,11 +293,10 @@ void simple_test() {
// corresponding check will fail.
//
stm.mutate_token_metadata([] (token_metadata& tm) {
tm.invalidate_cached_rings();
tm.get_new()->invalidate_cached_rings();
return make_ready_future<>();
}).get();
full_ring_check(ring_points, options320, ars_ptr, stm.get());
full_ring_check(ring_points, options320, ars_ptr, stm.get()->get_new_strong());
}
// Run in a seastar thread.
@@ -367,25 +358,17 @@ void heavy_origin_test() {
}
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto update_tm = [&]<typename NodeId>(generic_token_metadata<NodeId>& tm) -> future<> {
auto& topo = tm.get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
if constexpr (std::is_same_v<NodeId, gms::inet_address>) {
co_await tm.update_normal_tokens(tokens[endpoint], endpoint);
} else {
co_await tm.update_normal_tokens(tokens[endpoint], id);
}
}
};
co_await update_tm(tm);
co_await update_tm(*tm.get_new());
auto& topo = tm.get_new()->get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal);
co_await tm.get_new()->update_normal_tokens(tokens[endpoint], id);
}
}).get();
auto ars_ptr = abstract_replication_strategy::create_replication_strategy(
"NetworkTopologyStrategy", config_options);
full_ring_check(ring_points, config_options, ars_ptr, stm.get());
full_ring_check(ring_points, config_options, ars_ptr, stm.get()->get_new_strong());
}
@@ -431,13 +414,13 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
// Initialize the token_metadata
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
auto& topo = tm.get_topology();
auto& topo = tm.get_new()->get_topology();
for (const auto& [ring_point, endpoint, id] : ring_points) {
std::unordered_set<token> tokens;
tokens.insert({dht::token::kind::key, d2t(ring_point / ring_points.size())});
topo.add_node(id, endpoint, make_endpoint_dc_rack(endpoint), locator::node::state::normal, 1);
tm.update_host_id(id, endpoint);
co_await tm.update_normal_tokens(std::move(tokens), endpoint);
tm.get_new()->update_host_id(id, endpoint);
co_await tm.get_new()->update_normal_tokens(std::move(tokens), id);
}
}).get();
@@ -462,7 +445,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
.build();
auto tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options323, ars_ptr, stm.get());
full_ring_check(tmap, options323, ars_ptr, stm.get()->get_new_strong());
///////////////
// Create the replication strategy
@@ -479,7 +462,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
BOOST_REQUIRE(tab_awr_ptr);
tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options320, ars_ptr, stm.get());
full_ring_check(tmap, options320, ars_ptr, stm.get()->get_new_strong());
// Test the case of not enough nodes to meet RF in DC 102
std::map<sstring, sstring> options324 = {
@@ -495,7 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) {
BOOST_REQUIRE(tab_awr_ptr);
tmap = tab_awr_ptr->allocate_tablets_for_new_table(s, stm.get()).get0();
full_ring_check(tmap, options324, ars_ptr, stm.get());
full_ring_check(tmap, options324, ars_ptr, stm.get()->get_new_strong());
}
/**
@@ -508,7 +491,7 @@ static size_t get_replication_factor(const sstring& dc,
}
static bool has_sufficient_replicas(const sstring& dc,
const std::unordered_map<sstring, std::unordered_set<inet_address>>& dc_replicas,
const std::unordered_map<sstring, std::unordered_set<host_id>>& dc_replicas,
const std::unordered_map<sstring, std::unordered_set<inet_address>>& all_endpoints,
const std::unordered_map<sstring, size_t>& datacenters) noexcept {
auto dc_replicas_it = dc_replicas.find(dc);
@@ -526,7 +509,7 @@ static bool has_sufficient_replicas(const sstring& dc,
}
static bool has_sufficient_replicas(
const std::unordered_map<sstring, std::unordered_set<inet_address>>& dc_replicas,
const std::unordered_map<sstring, std::unordered_set<host_id>>& dc_replicas,
const std::unordered_map<sstring, std::unordered_set<inet_address>>& all_endpoints,
const std::unordered_map<sstring, size_t>& datacenters) noexcept {
@@ -540,18 +523,18 @@ static bool has_sufficient_replicas(
return true;
}
static locator::endpoint_set calculate_natural_endpoints(
const token& search_token, const token_metadata& tm,
static locator::host_id_set calculate_natural_endpoints(
const token& search_token, const token_metadata2& tm,
const locator::topology& topo,
const std::unordered_map<sstring, size_t>& datacenters) {
//
// We want to preserve insertion order so that the first added endpoint
// becomes primary.
//
locator::endpoint_set replicas;
locator::host_id_set replicas;
// replicas we have found in each DC
std::unordered_map<sstring, std::unordered_set<inet_address>> dc_replicas;
std::unordered_map<sstring, std::unordered_set<host_id>> dc_replicas;
// tracks the racks we have already placed replicas in
std::unordered_map<sstring, std::unordered_set<sstring>> seen_racks;
//
@@ -559,7 +542,7 @@ static locator::endpoint_set calculate_natural_endpoints(
// when we relax the rack uniqueness we can append this to the current
// result so we don't have to wind back the iterator
//
std::unordered_map<sstring, locator::endpoint_set>
std::unordered_map<sstring, locator::host_id_set>
skipped_dc_endpoints;
//
@@ -600,7 +583,7 @@ static locator::endpoint_set calculate_natural_endpoints(
break;
}
inet_address ep = *tm.get_endpoint(next);
host_id ep = *tm.get_endpoint(next);
sstring dc = topo.get_location(ep).dc;
auto& seen_racks_dc_set = seen_racks[dc];
@@ -639,7 +622,7 @@ static locator::endpoint_set calculate_natural_endpoints(
auto skipped_it = skipped_dc_endpoints_set.begin();
while (skipped_it != skipped_dc_endpoints_set.end() &&
!has_sufficient_replicas(dc, dc_replicas, all_endpoints, datacenters)) {
inet_address skipped = *skipped_it++;
host_id skipped = *skipped_it++;
dc_replicas_dc_set.insert(skipped);
replicas.push_back(skipped);
}
@@ -667,25 +650,25 @@ static void test_equivalence(const shared_token_metadata& stm, const locator::to
return std::make_pair(p.first, to_sstring(p.second));
})));
const token_metadata& tm = *stm.get();
const token_metadata2& tm = *stm.get()->get_new();
for (size_t i = 0; i < 1000; ++i) {
auto token = dht::token::get_random_token();
auto expected = calculate_natural_endpoints(token, tm, topo, datacenters);
auto actual = get<endpoint_set>(nts.calculate_natural_endpoints(token, tm, false).get0());
auto actual = get<host_id_set>(nts.calculate_natural_endpoints(token, token_metadata(stm.get()->get_new_strong()), true).get0());
// Because the old algorithm does not put the nodes in the correct order in the case where more replicas
// are required than there are racks in a dc, we accept different order as long as the primary
// replica is the same.
BOOST_REQUIRE_EQUAL(expected[0], actual[0]);
BOOST_REQUIRE_EQUAL(std::set<inet_address>(expected.begin(), expected.end()),
std::set<inet_address>(actual.begin(), actual.end()));
BOOST_REQUIRE_EQUAL(std::set<host_id>(expected.begin(), expected.end()),
std::set<host_id>(actual.begin(), actual.end()));
}
}
void generate_topology(topology& topo, const std::unordered_map<sstring, size_t> datacenters, const std::vector<inet_address>& nodes) {
void generate_topology(topology& topo, const std::unordered_map<sstring, size_t> datacenters, const std::vector<host_id>& nodes) {
auto& e1 = seastar::testing::local_random_engine;
std::unordered_map<sstring, size_t> racks_per_dc;
@@ -705,11 +688,12 @@ void generate_topology(topology& topo, const std::unordered_map<sstring, size_t>
out = std::fill_n(out, rf, std::cref(dc));
}
unsigned i = 0;
for (auto& node : nodes) {
const sstring& dc = dcs[udist(0, dcs.size() - 1)(e1)];
auto rc = racks_per_dc.at(dc);
auto r = udist(0, rc)(e1);
topo.add_node(host_id::create_random_id(), node, {dc, to_sstring(r)}, locator::node::state::normal);
topo.add_node(node, inet_address((127u << 24) | ++i), {dc, to_sstring(r)}, locator::node::state::normal);
}
}
@@ -730,10 +714,10 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
{ "rf5_2", 5 },
{ "rf5_3", 5 },
};
std::vector<inet_address> nodes;
std::vector<host_id> nodes;
nodes.reserve(NODES);
std::generate_n(std::back_inserter(nodes), NODES, [i = 0u]() mutable {
return inet_address((127u << 24) | ++i);
return host_id{utils::UUID(0, ++i)};
});
for (size_t run = 0; run < RUNS; ++run) {
@@ -744,7 +728,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
while (random_tokens.size() < nodes.size() * VNODES) {
random_tokens.insert(dht::token::get_random_token());
}
std::unordered_map<inet_address, std::unordered_set<token>> endpoint_tokens;
std::unordered_map<host_id, std::unordered_set<token>> endpoint_tokens;
auto next_token_it = random_tokens.begin();
for (auto& node : nodes) {
for (size_t i = 0; i < VNODES; ++i) {
@@ -752,14 +736,14 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) {
next_token_it++;
}
}
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
generate_topology(tm.get_topology(), datacenters, nodes);
generate_topology(tm.get_new()->get_topology(), datacenters, nodes);
for (auto&& i : endpoint_tokens) {
co_await tm.update_normal_tokens(std::move(i.second), i.first);
co_await tm.get_new()->update_normal_tokens(std::move(i.second), i.first);
}
}).get();
test_equivalence(stm, stm.get()->get_topology(), datacenters);
test_equivalence(stm, stm.get()->get_new()->get_topology(), datacenters);
}
}
@@ -837,27 +821,27 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) {
{ "rf2", 2 },
{ "rf3", 3 },
};
std::vector<inet_address> nodes;
std::vector<host_id> nodes;
nodes.reserve(NODES);
auto make_address = [] (unsigned i) {
return inet_address((127u << 24) | i);
return host_id{utils::UUID(0, i)};
};
std::generate_n(std::back_inserter(nodes), NODES, [&, i = 0u]() mutable {
return make_address(++i);
});
auto bogus_address = make_address(NODES + 1);
auto bogus_address = inet_address((127u << 24) | static_cast<int>(NODES + 1));
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg);
stm.mutate_token_metadata([&] (token_metadata& tm) {
auto& topo = tm.get_topology();
auto& topo = tm.get_new()->get_topology();
generate_topology(topo, datacenters, nodes);
const auto& address = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a1 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& a2 = nodes[tests::random::get_int<size_t>(0, NODES-1)];
const auto& address = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int<size_t>(0, NODES-1)]);
const auto& a1 = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int<size_t>(0, NODES-1)]);
const auto& a2 = tm.get_new()->get_endpoint_for_host_id(nodes[tests::random::get_int<size_t>(0, NODES-1)]);
topo.test_compare_endpoints(address, address, address);
topo.test_compare_endpoints(address, address, a1);
@@ -896,23 +880,23 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
// get_location() should work before any node is added
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack);
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.update_host_id(host2, ip2);
tm.update_host_id(host1, ip1); // this_node added last on purpose
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host1, ip1); // this_node added last on purpose
return make_ready_future<>();
}).get();
const node* n1 = stm.get()->get_topology().find_node(host1);
const node* n1 = stm.get()->get_new()->get_topology().find_node(host1);
BOOST_REQUIRE(n1);
BOOST_REQUIRE(bool(n1->is_this_node()));
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1);
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack);
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack);
const node* n2 = stm.get()->get_topology().find_node(host2);
const node* n2 = stm.get()->get_new()->get_topology().find_node(host2);
BOOST_REQUIRE(n2);
BOOST_REQUIRE(!bool(n2->is_this_node()));
BOOST_REQUIRE_EQUAL(n2->host_id(), host2);
@@ -922,45 +906,45 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
// Removing local node
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.remove_endpoint(ip1);
tm.update_host_id(host3, ip3);
tm.get_new()->remove_endpoint(host1);
tm.get_new()->update_host_id(host3, ip3);
return make_ready_future<>();
}).get();
n1 = stm.get()->get_topology().find_node(host1);
n1 = stm.get()->get_new()->get_topology().find_node(host1);
BOOST_REQUIRE(!n1);
n1 = stm.get()->get_topology().find_node(ip1);
n1 = stm.get()->get_new()->get_topology().find_node(ip1);
BOOST_REQUIRE(!n1);
// Removing node with no local node
stm.mutate_token_metadata([&] (token_metadata& tm) {
tm.remove_endpoint(ip2);
tm.get_new()->remove_endpoint(host2);
return make_ready_future<>();
}).get();
n2 = stm.get()->get_topology().find_node(host2);
n2 = stm.get()->get_new()->get_topology().find_node(host2);
BOOST_REQUIRE(!n2);
n2 = stm.get()->get_topology().find_node(ip2);
n2 = stm.get()->get_new()->get_topology().find_node(ip2);
BOOST_REQUIRE(!n2);
// Repopulate after clear_gently()
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
co_await tm.clear_gently();
tm.update_host_id(host2, ip2);
tm.update_host_id(host1, ip1); // this_node added last on purpose
co_await tm.get_new()->clear_gently();
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host1, ip1); // this_node added last on purpose
}).get();
n1 = stm.get()->get_topology().find_node(host1);
n1 = stm.get()->get_new()->get_topology().find_node(host1);
BOOST_REQUIRE(n1);
BOOST_REQUIRE(bool(n1->is_this_node()));
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1);
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack);
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack);
BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack);
n2 = stm.get()->get_topology().find_node(host2);
n2 = stm.get()->get_new()->get_topology().find_node(host2);
BOOST_REQUIRE(n2);
BOOST_REQUIRE(!bool(n2->is_this_node()));
BOOST_REQUIRE_EQUAL(n2->host_id(), host2);
@@ -970,15 +954,15 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) {
// get_location() should pick up endpoint_dc_rack from node info
stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> {
co_await tm.clear_gently();
tm.get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned);
co_await tm.get_new()->clear_gently();
tm.get_new()->get_topology().add_or_update_endpoint(ip1, host1, ip1_dc_rack_v2, node::state::being_decommissioned);
}).get();
n1 = stm.get()->get_topology().find_node(host1);
n1 = stm.get()->get_new()->get_topology().find_node(host1);
BOOST_REQUIRE(n1);
BOOST_REQUIRE(bool(n1->is_this_node()));
BOOST_REQUIRE_EQUAL(n1->host_id(), host1);
BOOST_REQUIRE_EQUAL(n1->endpoint(), ip1);
BOOST_REQUIRE(n1->dc_rack() == ip1_dc_rack_v2);
BOOST_REQUIRE(stm.get()->get_topology().get_location() == ip1_dc_rack_v2);
BOOST_REQUIRE(stm.get()->get_new()->get_topology().get_location() == ip1_dc_rack_v2);
}

View File

@@ -433,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) {
auto table1 = table_id(utils::UUID_gen::get_time_UUID());
token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
token_metadata2 tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } });
tokm.get_topology().add_or_update_endpoint(tokm.get_topology().my_address(), h1);
std::vector<tablet_id> tablet_ids;
@@ -591,7 +591,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
// Reflects the plan in a given token metadata as if the migrations were fully executed.
static
void apply_plan(token_metadata& tm, const migration_plan& plan) {
void apply_plan(token_metadata2& tm, const migration_plan& plan) {
for (auto&& mig : plan.migrations()) {
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
@@ -611,7 +611,7 @@ tablet_transition_info migration_to_transition_info(const tablet_migration_info&
// Reflects the plan in a given token metadata as if the migrations were started but not yet executed.
static
void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
void apply_plan_as_in_progress(token_metadata2& tm, const migration_plan& plan) {
for (auto&& mig : plan.migrations()) {
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
@@ -622,12 +622,12 @@ void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
static
void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) {
while (true) {
auto plan = talloc.balance_tablets(stm.get()).get0();
auto plan = talloc.balance_tablets(stm.get()->get_new_strong()).get0();
if (plan.empty()) {
break;
}
stm.mutate_token_metadata([&] (token_metadata& tm) {
apply_plan(tm, plan);
apply_plan(*tm.get_new(), plan);
return make_ready_future<>();
}).get();
}
@@ -636,12 +636,12 @@ void rebalance_tablets(tablet_allocator& talloc, shared_token_metadata& stm) {
static
void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_metadata& stm) {
while (true) {
auto plan = talloc.balance_tablets(stm.get()).get0();
auto plan = talloc.balance_tablets(stm.get()->get_new_strong()).get0();
if (plan.empty()) {
break;
}
stm.mutate_token_metadata([&] (token_metadata& tm) {
apply_plan_as_in_progress(tm, plan);
apply_plan_as_in_progress(*tm.get_new(), plan);
return make_ready_future<>();
}).get();
}
@@ -651,7 +651,7 @@ void rebalance_tablets_as_in_progress(tablet_allocator& talloc, shared_token_met
static
void execute_transitions(shared_token_metadata& stm) {
stm.mutate_token_metadata([&] (token_metadata& tm) {
for (auto&& [tablet, tmap_] : tm.tablets().all_tables()) {
for (auto&& [tablet, tmap_] : tm.get_new()->tablets().all_tables()) {
auto& tmap = tmap_;
for (auto&& [tablet, trinfo]: tmap.transitions()) {
auto ti = tmap.get_tablet_info(tablet);
@@ -690,12 +690,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tablet_map tmap(4);
auto tid = tmap.first_tablet();
@@ -728,13 +728,13 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
// Sanity check
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
BOOST_REQUIRE_EQUAL(load.get_load(host1), 4);
BOOST_REQUIRE_EQUAL(load.get_avg_shard_load(host1), 2);
@@ -747,7 +747,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->template get_new_strong());
load.populate().get();
for (auto h : {host1, host2, host3}) {
@@ -786,12 +786,12 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
stm.mutate_token_metadata([&](auto& tm) {
const unsigned shard_count = 2;
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
shard_count);
tablet_map tmap(4);
@@ -825,14 +825,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
@@ -840,14 +840,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
}
stm.mutate_token_metadata([&](auto& tm) {
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::left);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::left);
return make_ready_future<>();
}).get();
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
@@ -888,14 +888,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
stm.mutate_token_metadata([&](auto& tm) {
const unsigned shard_count = 1;
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_host_id(host4, ip4);
tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
tm.update_topology(ip2, racks[1], std::nullopt, shard_count);
tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_host_id(host4, ip4);
tm.get_new()->update_topology(host1, racks[0], std::nullopt, shard_count);
tm.get_new()->update_topology(host2, racks[1], std::nullopt, shard_count);
tm.get_new()->update_topology(host3, racks[0], std::nullopt, shard_count);
tm.get_new()->update_topology(host4, racks[1], node::state::being_decommissioned,
shard_count);
tablet_map tmap(4);
@@ -929,14 +929,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
BOOST_REQUIRE(load.get_avg_shard_load(host1) >= 2);
BOOST_REQUIRE(load.get_avg_shard_load(host2) >= 2);
@@ -947,10 +947,10 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
// Verify replicas are not collocated on racks
{
auto tm = stm.get();
auto& tmap = tm->tablets().get_tablet_map(table1);
auto& tmap = tm->get_new()->tablets().get_tablet_map(table1);
tmap.for_each_tablet([&](auto tid, auto& tinfo) {
auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host);
auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host);
auto rack1 = tm->get_new()->get_topology().get_rack(tinfo.replicas[0].host);
auto rack2 = tm->get_new()->get_topology().get_rack(tinfo.replicas[1].host);
BOOST_REQUIRE(rack1 != rack2);
}).get();
}
@@ -989,14 +989,14 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
stm.mutate_token_metadata([&](auto& tm) {
const unsigned shard_count = 1;
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_host_id(host4, ip4);
tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
tm.update_topology(ip2, racks[0], std::nullopt, shard_count);
tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_host_id(host4, ip4);
tm.get_new()->update_topology(host1, racks[0], std::nullopt, shard_count);
tm.get_new()->update_topology(host2, racks[0], std::nullopt, shard_count);
tm.get_new()->update_topology(host3, racks[0], std::nullopt, shard_count);
tm.get_new()->update_topology(host4, racks[1], node::state::being_decommissioned,
shard_count);
tablet_map tmap(4);
@@ -1030,7 +1030,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
@@ -1063,12 +1063,12 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
stm.mutate_token_metadata([&](auto& tm) {
const unsigned shard_count = 2;
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
shard_count);
tablet_map tmap(1);
@@ -1082,7 +1082,7 @@ SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
@@ -1118,12 +1118,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
@@ -1146,7 +1146,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
@@ -1154,7 +1154,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions)
execute_transitions(stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
for (auto h : {host1, host2, host3}) {
@@ -1187,12 +1187,12 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
@@ -1207,20 +1207,20 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
}
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets(e.get_tablet_allocator().local(), stm);
BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()).get0().empty());
BOOST_REQUIRE(e.get_tablet_allocator().local().balance_tablets(stm.get()->get_new_strong()).get0().empty());
utils::get_local_injector().enable("tablet_allocator_shuffle");
auto disable_injection = seastar::defer([&] {
utils::get_local_injector().disable("tablet_allocator_shuffle");
});
BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get()).get0().empty());
BOOST_REQUIRE(!e.get_tablet_allocator().local().balance_tablets(stm.get()->get_new_strong()).get0().empty());
}).get();
}
#endif
@@ -1250,14 +1250,14 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_host_id(host4, ip4);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip4, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_host_id(host1, ip1);
tm.get_new()->update_host_id(host2, ip2);
tm.get_new()->update_host_id(host3, ip3);
tm.get_new()->update_host_id(host4, ip4);
tm.get_new()->update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host3, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.get_new()->update_topology(host4, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
@@ -1270,14 +1270,14 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
}
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
for (auto h : {host1, host2, host3, host4}) {
@@ -1312,8 +1312,8 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancer_disabling) {
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(host1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tm.update_topology(host2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
tablet_map tmap(16);
for (auto tid : tmap.tablet_ids()) {
@@ -1399,6 +1399,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
locator::topology::config {
.this_endpoint = inet_address("192.168.0.1"),
.this_host_id = hosts[0],
.local_dc_rack = racks[1]
}
});
@@ -1411,9 +1412,9 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
for (auto h : hosts) {
auto ip = inet_address(format("192.168.0.{}", ++i));
auto shard_count = 2;
tm.update_host_id(h, ip);
tm.get_new()->update_host_id(h, ip);
auto rack = racks[i % racks.size()];
tm.update_topology(ip, rack, std::nullopt, shard_count);
tm.get_new()->update_topology(h, rack, std::nullopt, shard_count);
if (h != hosts[0]) {
// Leave the first host empty by making it invisible to allocation algorithm.
hosts_by_rack[rack.rack].push_back(h);
@@ -1444,7 +1445,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
}
tablet_replica_set replicas;
for (auto h : replica_hosts) {
auto shard_count = tm.get_topology().find_node(h)->get_shard_count();
auto shard_count = tm.get_new()->get_topology().find_node(h)->get_shard_count();
auto shard = tests::random::get_int<shard_id>(0, shard_count - 1);
replicas.push_back(tablet_replica {h, shard});
}
@@ -1453,17 +1454,17 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
total_tablet_count += tmap.tablet_count();
tmeta.set_tablet_map(table, std::move(tmap));
}
tm.set_tablets(std::move(tmeta));
tm.get_new()->set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.debug("tablet metadata: {}", stm.get()->get_new()->tablets());
testlog.info("Total tablet count: {}, hosts: {}", total_tablet_count, hosts.size());
rebalance_tablets(e.get_tablet_allocator().local(), stm);
{
load_sketch load(stm.get());
load_sketch load(stm.get()->get_new_strong());
load.populate().get();
min_max_tracker<unsigned> min_max_load;
@@ -1473,7 +1474,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_random_load) {
min_max_load.update(l);
}
testlog.debug("tablet metadata: {}", stm.get()->tablets());
testlog.debug("tablet metadata: {}", stm.get()->get_new()->tablets());
testlog.debug("Min load: {}, max load: {}", min_max_load.min(), min_max_load.max());
// FIXME: The algorithm cannot achieve balance in all cases yet, so we only check that it stops.