Merge 'Concurrent tablet migration and balancing' from Tomasz Grabiec

This change makes tablet load balancing more efficient by performing
migrations independently for different tablets, and making new load
balancing plans concurrently with active migrations.

The migration track is interrupted by pending topology change operations.

The coordinator executes the load balancer on edges of tablet state
machine transitions. This allows new migrations to be started as soon
as tablets finish streaming.

The load balancer is also continuously invoked as long as it produces
a non-empty plan. This is in order to saturate the cluster with
streaming. A single make_plan() call is still not saturating, due
to the way algorithm is implemented.

Overload of shards is limited by the fact that load balancer algorithm tracks
streaming concurrency on both source and target shards of active
migrations and takes concurrency limit into account when producing new
migrations.

Closes #14851

* github.com:scylladb/scylladb:
  tablets: load_balancer: Remove double logging
  tests: tablets: Check that load balancing is interrupted by topology change
  tests: tablets: Add test for load balancing with active migrations
  tablets: Balance tablets concurrently with active migrations
  storage_service, tablets: Extract generate_migration_updates()
  storage_service, tablets: Move get_leaving_replica() to tablets.cc
  locator: tablets: Move std::hash definition earlier
  storage_service: Advance tablets independently
  topology_coordinator: Fix missed notification on abort
  tablets: Add formatter for tablet_migration_info
This commit is contained in:
Avi Kivity
2023-07-31 16:44:33 +03:00
11 changed files with 698 additions and 287 deletions

View File

@@ -78,20 +78,20 @@ check if we need to rebalance. If so, it computes an incremental tablet migratio
plan, persists it by moving tablets into transitional states, and moves the state machine
into the tablet migration track. All this happens atomically form the perspective
of group0 state machine.
The tablet migration track also invokes the load balancer and starts new migrations
to keep the cluster saturated with streaming. The load balancer is invoked
on transition of tablet stages, and also continuously as long as it generates
new migrations.
If there is a pending topology change request, the load balancer
will not be invoked to allow for current migrations to drain, after which the
state machine will exit the tablet migration track and allow pending topology
operation to start.
The tablet migration track excludes with other topology changes, so node operations
will have to wait for the plan to finish before they can take over the state machine.
The tablet balancing track migrates a small bunch of tablets, decided by the
loaded balancer, and then moves back the state machine to the idle state.
This gives other topology changes a chance to start, and if there aren't any, the
load balancer will be called again to check the conditions. This way
we can avoid blocking topology changes for too long, but also drive the cluster
to eventually achieve balance in the absence of other requests.
The load balancer is always invoked with no pending tablet migrations. This
allows for simplicity in the implementation, but may lead to underutilization
of cluster resources if different tablets migrate with different speeds,
and thus limit the speed of load balancing.
will have to wait for tablet migration track to finish before they can take over
the state machine.
The reason why the load balancer is part of the main state machine and excludes with other topology
changes is that we want to share the infrastructure for fencing between vnode-based topology
@@ -101,10 +101,7 @@ don't interfere with each other. The simplest is to make them part of the same s
When the topology state machine is not in the tablet_migration track, it is guaranteed
that there are no tablet transitions in the system.
Currently, all tablets in a batch decided by the load balancer are migrated in parallel and
their state machines are advanced at the same time. This means that streaming has to complete
for all tablets in a batch before any of them can move to the next phase. This is suboptimal
and will be changed later to allow for independent transitions.
Tablets are migrated in parallel and independently.
# Tablet migration
@@ -115,7 +112,7 @@ these properties of a tablet:
- stage: determines which replicas should be used by requests on the coordinator side, and which
action should be taken by the state machine executor.
Currently, the tablet state machine is driven forward by the tablet balancing track of the
Currently, the tablet state machine is driven forward by the tablet migration track of the
topology state machine.
The "stage" serves two major purposes:

View File

@@ -57,6 +57,13 @@ class load_sketch {
};
std::unordered_map<host_id, node_load> _nodes;
token_metadata_ptr _tm;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
return trinfo ? trinfo->next : ti.replicas;
}
public:
load_sketch(token_metadata_ptr tm)
: _tm(std::move(tm)) {
@@ -65,10 +72,10 @@ public:
future<> populate(std::optional<host_id> host = std::nullopt) {
const topology& topo = _tm->get_topology();
co_await utils::clear_gently(_nodes);
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
for (const tablet_info& ti : tmap.tablets()) {
co_await coroutine::maybe_yield();
for (auto&& replica : ti.replicas) {
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : get_replicas_for_tablet_load(ti, tmap.get_tablet_transition_info(tid))) {
if (host && *host != replica.host) {
continue;
}
@@ -80,7 +87,7 @@ public:
n._shards[replica.shard].load += 1;
}
}
}
});
}
for (auto&& n : _nodes) {
std::make_heap(n.second._shards.begin(), n.second._shards.end(), shard_load_cmp());

View File

@@ -75,6 +75,34 @@ tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, ta
, reads(get_selector_for_reads(stage))
{ }
tablet_migration_streaming_info get_migration_streaming_info(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
tablet_migration_streaming_info result = {
.read_from = std::unordered_set<tablet_replica>(tinfo.replicas.begin(), tinfo.replicas.end()),
.written_to = std::unordered_set<tablet_replica>(trinfo.next.begin(), trinfo.next.end())
};
for (auto&& r : trinfo.next) {
result.read_from.erase(r);
}
for (auto&& r : tinfo.replicas) {
result.written_to.erase(r);
}
return result;
}
tablet_replica get_leaving_replica(const tablet_info& tinfo, const tablet_transition_info& trinfo) {
std::unordered_set<tablet_replica> leaving(tinfo.replicas.begin(), tinfo.replicas.end());
for (auto&& r : trinfo.next) {
leaving.erase(r);
}
if (leaving.empty()) {
throw std::runtime_error(format("No leaving replicas"));
}
if (leaving.size() > 1) {
throw std::runtime_error(format("More than one leaving replica"));
}
return *leaving.begin();
}
const tablet_map& tablet_metadata::get_tablet_map(table_id id) const {
try {
return _tablets.at(id);
@@ -162,6 +190,10 @@ future<> tablet_map::for_each_tablet(seastar::noncopyable_function<void(tablet_i
}
}
void tablet_map::clear_transitions() {
_transitions.clear();
}
std::optional<shard_id> tablet_map::get_shard(tablet_id tid, host_id host) const {
auto&& info = get_tablet_info(tid);

View File

@@ -45,21 +45,6 @@ struct tablet_id {
bool operator<=>(const tablet_id&) const = default;
};
}
namespace std {
template<>
struct hash<locator::tablet_id> {
size_t operator()(const locator::tablet_id& id) const {
return std::hash<size_t>()(id.value());
}
};
}
namespace locator {
/// Identifies tablet (not be confused with tablet replica) in the scope of the whole cluster.
struct global_tablet_id {
table_id table;
@@ -80,6 +65,39 @@ std::ostream& operator<<(std::ostream&, const tablet_replica&);
using tablet_replica_set = utils::small_vector<tablet_replica, 3>;
}
namespace std {
template<>
struct hash<locator::tablet_id> {
size_t operator()(const locator::tablet_id& id) const {
return std::hash<size_t>()(id.value());
}
};
template<>
struct hash<locator::tablet_replica> {
size_t operator()(const locator::tablet_replica& r) const {
return utils::hash_combine(
std::hash<locator::host_id>()(r.host),
std::hash<shard_id>()(r.shard));
}
};
template<>
struct hash<locator::global_tablet_id> {
size_t operator()(const locator::global_tablet_id& id) const {
return utils::hash_combine(
std::hash<table_id>()(id.table),
std::hash<locator::tablet_id>()(id.tablet));
}
};
}
namespace locator {
/// Creates a new replica set with old_replica replaced by new_replica.
/// If there is no old_replica, the set is returned unchanged.
inline
@@ -150,6 +168,17 @@ struct tablet_transition_info {
bool operator==(const tablet_transition_info&) const = default;
};
// Returns the leaving replica for a given transition.
tablet_replica get_leaving_replica(const tablet_info&, const tablet_transition_info&);
/// Describes streaming required for a given tablet transition.
struct tablet_migration_streaming_info {
std::unordered_set<tablet_replica> read_from;
std::unordered_set<tablet_replica> written_to;
};
tablet_migration_streaming_info get_migration_streaming_info(const tablet_info&, const tablet_transition_info&);
/// Stores information about tablets of a single table.
///
/// The map contains a constant number of tablets, tablet_count().
@@ -264,6 +293,7 @@ public:
public:
void set_tablet(tablet_id, tablet_info);
void set_tablet_transition_info(tablet_id, tablet_transition_info);
void clear_transitions();
// Destroys gently.
// The tablet map is not usable after this call and should be destroyed.
@@ -297,6 +327,7 @@ private:
public:
const tablet_map& get_tablet_map(table_id id) const;
const table_to_tablet_map& all_tables() const { return _tablets; }
table_to_tablet_map& all_tables() { return _tablets; }
size_t external_memory_usage() const;
public:
void set_tablet_map(table_id, tablet_map);
@@ -309,28 +340,6 @@ public:
}
namespace std {
template<>
struct hash<locator::tablet_replica> {
size_t operator()(const locator::tablet_replica& r) const {
return utils::hash_combine(
std::hash<locator::host_id>()(r.host),
std::hash<shard_id>()(r.shard));
}
};
template<>
struct hash<locator::global_tablet_id> {
size_t operator()(const locator::global_tablet_id& id) const {
return utils::hash_combine(
std::hash<table_id>()(id.table),
std::hash<locator::tablet_id>()(id.tablet));
}
};
}
template <>
struct fmt::formatter<locator::tablet_transition_stage> : fmt::formatter<std::string_view> {
auto format(const locator::tablet_transition_stage&, fmt::format_context& ctx) const -> decltype(ctx.out());

View File

@@ -388,17 +388,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
return read_new_t::no;
}
switch (*state) {
case topology::transition_state::tablet_allow_write_both_read_old:
[[fallthrough]];
case topology::transition_state::tablet_write_both_read_new:
[[fallthrough]];
case topology::transition_state::tablet_write_both_read_old:
[[fallthrough]];
case topology::transition_state::tablet_streaming:
[[fallthrough]];
case topology::transition_state::tablet_use_new:
[[fallthrough]];
case topology::transition_state::tablet_cleanup:
case topology::transition_state::tablet_migration:
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
@@ -1181,116 +1171,201 @@ class topology_coordinator {
co_return std::move(guard);
}
future<> set_tablet_transition_stage(std::vector<canonical_mutation>& out, group0_guard& guard,
locator::tablet_transition_stage stage) {
// Represents a two-state state machine which changes monotonically
// from "not executed" to "executed successfully". This state
// machine is transient, lives only on this coordinator.
// The transition is achieved by execution of an idempotent async
// operation which is tracked by a future. Even though the async
// action is idempotent, it is costly, so we want to avoid
// re-executing it if it was already started by this coordinator,
// that's why we track it.
using background_action_holder = std::optional<future<>>;
// Transient state of tablet migration which lives on this coordinator.
// It is guaranteed to die when migration is finished.
// Next migration of the same tablet is guaranteed to use a different instance.
struct tablet_migration_state {
background_action_holder streaming;
};
std::unordered_map<locator::global_tablet_id, tablet_migration_state> _tablets;
// Set to true when any action started on behalf of a background_action_holder
// for any tablet finishes, or fails and needs to be restarted.
bool _tablets_ready = false;
seastar::gate _async_gate;
// This function drives background_action_holder towards "executed successfully"
// by starting the action if it is not already running or if the previous instance
// of the action failed. If the action is already running, it does nothing.
// Returns true iff background_action_holder reached the "executed successfully" state.
bool advance_in_background(locator::global_tablet_id gid, background_action_holder& holder, const char* name,
std::function<future<>()> action) {
if (!holder || holder->failed()) {
holder = futurize_invoke(action)
.finally([this, g = _async_gate.hold(), gid, name] () noexcept {
slogger.trace("raft topology: {} for tablet {} resolved.", name, gid);
_tablets_ready = true;
_topo_sm.event.broadcast();
});
return false;
}
if (!holder->available()) {
slogger.trace("raft topology: Tablet {} still doing {}", gid, name);
return false;
}
return true;
}
future<> for_each_tablet_transition(std::function<void(const locator::tablet_map&,
schema_ptr,
locator::global_tablet_id,
const locator::tablet_transition_info&)> func) {
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(table);
for (auto&& [tablet, trinfo] : tmap.transitions()) {
for (auto&& [tablet, trinfo]: tmap.transitions()) {
co_await coroutine::maybe_yield();
auto last_token = tmap.get_last_token(tablet);
out.emplace_back(
auto gid = locator::global_tablet_id {table, tablet};
func(tmap, s, gid, trinfo);
}
}
}
void generate_migration_update(std::vector<canonical_mutation>& out, const group0_guard& guard, const tablet_migration_info& mig) {
auto s = _db.find_schema(mig.tablet.table);
auto& tmap = get_token_metadata_ptr()->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
slogger.warn("Tablet already in transition, ignoring migration: {}", mig);
return;
}
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table)
.set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.build());
}
future<> generate_migration_updates(std::vector<canonical_mutation>& out, const group0_guard& guard, const migration_plan& plan) {
for (const tablet_migration_info& mig : plan) {
co_await coroutine::maybe_yield();
generate_migration_update(out, guard, mig);
}
}
future<> handle_tablet_migration(group0_guard guard) {
// This step acts like a pump which advances state machines of individual tablets,
// batching barriers and group0 updates.
// If progress cannot be made, e.g. because all transitions are streaming, we block
// and wait for notification.
slogger.trace("raft topology: handle_tablet_migration()");
std::vector<canonical_mutation> updates;
bool needs_barrier = false;
bool has_transitions = false;
_tablets_ready = false;
co_await for_each_tablet_transition([&] (const locator::tablet_map& tmap,
schema_ptr s,
locator::global_tablet_id gid,
const locator::tablet_transition_info& trinfo) {
has_transitions = true;
auto last_token = tmap.get_last_token(gid.tablet);
auto& tablet_state = _tablets[gid];
table_id table = s->id();
auto transition_to = [&] (locator::tablet_transition_stage stage) {
slogger.trace("raft topology: Will set tablet {} stage to {}", gid, stage);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.set_stage(last_token, stage)
.build());
};
auto transition_to_with_barrier = [&] (locator::tablet_transition_stage stage) {
needs_barrier = true;
transition_to(stage);
};
switch (trinfo.stage) {
case locator::tablet_transition_stage::allow_write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::write_both_read_old);
break;
case locator::tablet_transition_stage::write_both_read_old:
transition_to_with_barrier(locator::tablet_transition_stage::streaming);
break;
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished for earlier
// stage in case of coordinator failover.
case locator::tablet_transition_stage::streaming:
if (advance_in_background(gid, tablet_state.streaming, "streaming", [&] {
slogger.info("raft topology: Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica);
auto dst = trinfo.pending_replica.host;
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, gid);
})) {
transition_to(locator::tablet_transition_stage::write_both_read_new);
}
break;
case locator::tablet_transition_stage::write_both_read_new:
transition_to_with_barrier(locator::tablet_transition_stage::use_new);
break;
case locator::tablet_transition_stage::use_new:
transition_to_with_barrier(locator::tablet_transition_stage::cleanup);
break;
case locator::tablet_transition_stage::cleanup:
// FIXME: Actually perform the cleanup. Block on integration with compaction groups.
_tablets.erase(gid);
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
break;
}
}
}
});
future<> del_tablet_transitions(std::vector<canonical_mutation>& out, group0_guard& guard) {
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(table);
for (auto&& [tablet, trinfo] : tmap.transitions()) {
co_await coroutine::maybe_yield();
auto last_token = tmap.get_last_token(tablet);
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), table)
.del_transition(last_token)
.set_replicas(last_token, trinfo.next)
.build());
}
}
}
future<> transition_tablets_to_stage(group0_guard guard, locator::tablet_transition_stage stage, topology::transition_state topo_state) {
std::vector<canonical_mutation> updates;
co_await set_tablet_transition_stage(updates, guard, stage);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topo_state)
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Moved tablet migration to stage: {}", stage));
}
future<> handle_tablet_allow_write_both_read_old(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::write_both_read_old,
topology::transition_state::tablet_write_both_read_old);
}
future<> handle_tablet_write_both_read_old(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::streaming,
topology::transition_state::tablet_streaming);
}
// The state "streaming" is needed to ensure that stale stream_tablet() RPC doesn't
// get admitted before global_tablet_token_metadata_barrier() is finished in
// handle_tablet_write_both_read_old().
future<> handle_tablet_streaming(group0_guard guard) {
std::vector<future<>> calls;
auto tm = get_token_metadata_ptr();
for (auto&& [table, tmap] : tm->tablets().all_tables()) {
co_await coroutine::maybe_yield();
for (auto&& [tablet, trinfo] : tmap.transitions()) {
co_await coroutine::maybe_yield();
auto gid = locator::global_tablet_id{table, tablet};
auto dst = trinfo.pending_replica.host;
slogger.info("Initiating tablet streaming of {} to {}", gid, trinfo.pending_replica);
calls.emplace_back(futurize_invoke([&] {
return ser::storage_service_rpc_verbs::send_tablet_stream_data(&_messaging,
netw::msg_addr(id2ip(dst)), _as, gid);
}).handle_exception([gid, dst](auto ep) {
slogger.error("Failed to stream tablet {} to {}: {}", gid, dst, ep);
std::rethrow_exception(ep);
}));
}
if (needs_barrier) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
}
release_guard(std::move(guard));
co_await seastar::when_all_succeed(calls.begin(), calls.end());
// In order to keep the cluster saturated, ask the load balancer for more transitions.
// Unless there is a pending topology change operation.
auto [preempt, new_guard] = should_preempt_balancing(std::move(guard));
guard = std::move(new_guard);
if (!preempt) {
auto plan = co_await balance_tablets(get_token_metadata_ptr());
co_await generate_migration_updates(updates, guard, plan);
}
guard = co_await start_operation();
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::write_both_read_new,
topology::transition_state::tablet_write_both_read_new);
}
// It's ok to execute planned updates after retaking the guard because as long
// as topology is in tablet_migration state only this coordinator has a right
// to advance the state machine of tablets.
future<> handle_tablet_write_both_read_new(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::use_new,
topology::transition_state::tablet_use_new);
}
if (!updates.empty()) {
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_version(_topo_sm._topology.version + 1)
.build());
co_await update_topology_state(std::move(guard), std::move(updates), format("Tablet migration"));
co_return;
}
future<> handle_tablet_use_new(group0_guard guard) {
guard = co_await global_tablet_token_metadata_barrier(std::move(guard));
co_await transition_tablets_to_stage(std::move(guard),
locator::tablet_transition_stage::cleanup,
topology::transition_state::tablet_cleanup);
}
if (has_transitions) {
// Streaming may have finished after we checked. To avoid missed notification, we need
// to check atomically with event.wait()
if (!_tablets_ready) {
slogger.trace("raft topology: Going to sleep with active tablet transitions");
co_await await_event();
}
co_return;
}
future<> handle_tablet_cleanup(group0_guard guard) {
// FIXME: Actually perform cleanup
std::vector<canonical_mutation> updates;
co_await del_tablet_transitions(updates, guard);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.del_transition_state()
@@ -1299,10 +1374,26 @@ class topology_coordinator {
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}
std::pair<bool, group0_guard> should_preempt_balancing(group0_guard guard) {
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
return std::make_pair(true, std::move(node->guard));
}
guard = std::get<group0_guard>(std::move(node_or_guard));
if (_topo_sm._topology.global_request) {
return std::make_pair(true, std::move(guard));
}
return std::make_pair(false, std::move(guard));
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
if (!tstate) {
// When adding a new source of work, make sure to update should_preempt_balancing() as well.
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
co_await handle_node_transition(std::move(*node));
@@ -1531,24 +1622,8 @@ class topology_coordinator {
// Reads are fenced. We can now remove topology::transition_state and move node state to normal
}
break;
case topology::transition_state::tablet_allow_write_both_read_old:
co_await handle_tablet_allow_write_both_read_old(std::move(guard));
break;
case topology::transition_state::tablet_write_both_read_old:
co_await handle_tablet_write_both_read_old(std::move(guard));
break;
case topology::transition_state::tablet_streaming:
co_await handle_tablet_streaming(std::move(guard));
break;
case topology::transition_state::tablet_write_both_read_new:
co_await handle_tablet_write_both_read_new(std::move(guard));
break;
case topology::transition_state::tablet_use_new:
co_await handle_tablet_use_new(std::move(guard));
break;
case topology::transition_state::tablet_cleanup:
co_await handle_tablet_cleanup(std::move(guard));
break;
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard));
}
co_return true;
};
@@ -1744,6 +1819,11 @@ class topology_coordinator {
// Returns true if the state machine was transitioned into tablet migration path.
future<bool> maybe_start_tablet_migration(group0_guard);
future<> await_event() {
_as.check();
co_await _topo_sm.event.when();
}
public:
topology_coordinator(
sharded<db::system_distributed_keyspace>& sys_dist_ks,
@@ -1774,25 +1854,11 @@ future<bool> topology_coordinator::maybe_start_tablet_migration(group0_guard gua
std::vector<canonical_mutation> updates;
for (const tablet_migration_info& mig : plan) {
co_await coroutine::maybe_yield();
auto s = _db.find_schema(mig.tablet.table);
auto& tmap = tm->tablets().get_tablet_map(mig.tablet.table);
auto last_token = tmap.get_last_token(mig.tablet.tablet);
if (tmap.get_tablet_transition_info(mig.tablet.tablet)) {
slogger.warn("Tablet {} is already in transition, ignoring migration", mig.tablet);
continue;
}
updates.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), s->ks_name(), mig.tablet.table)
.set_new_replicas(last_token, replace_replica(tmap.get_tablet_info(mig.tablet.tablet).replicas, mig.src, mig.dst))
.set_stage(last_token, locator::tablet_transition_stage::allow_write_both_read_old)
.build());
}
co_await generate_migration_updates(updates, guard, plan);
updates.emplace_back(
topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_allow_write_both_read_old)
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.build());
@@ -1817,7 +1883,7 @@ future<> topology_coordinator::run() {
if (!had_work) {
// Nothing to work on. Wait for topology change event.
slogger.trace("raft topology: topology coordinator fiber has nothing to do. Sleeping.");
co_await _topo_sm.event.when();
co_await await_event();
slogger.trace("raft topology: topology coordinator fiber got an event");
}
} catch (raft::request_aborted&) {
@@ -1833,10 +1899,15 @@ future<> topology_coordinator::run() {
sleep = true;
}
if (sleep) {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
try {
co_await seastar::sleep_abortable(std::chrono::seconds(1), _as);
} catch (...) {
slogger.debug("raft topology: sleep failed: {}", std::current_exception());
}
}
co_await coroutine::maybe_yield();
}
co_await _async_gate.close();
}
future<> storage_service::raft_state_monitor_fiber(raft::server& raft, cdc::generation_service& cdc_gen_svc, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
@@ -5585,21 +5656,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}
static
locator::tablet_replica get_leaving_replica(const locator::tablet_info& tinfo, const locator::tablet_transition_info& trinfo) {
std::unordered_set<locator::tablet_replica> leaving(tinfo.replicas.begin(), tinfo.replicas.end());
for (auto&& r : trinfo.next) {
leaving.erase(r);
}
if (leaving.empty()) {
throw std::runtime_error(format("No leaving replicas"));
}
if (leaving.size() > 1) {
throw std::runtime_error(format("More than one leaving replica"));
}
return *leaving.begin();
}
inet_address storage_service::host2ip(locator::host_id host) {
auto ip = _group0->address_map().find(raft::server_id(host.uuid()));
if (!ip) {
@@ -5634,7 +5690,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
auto range = tmap.get_token_range(tablet.tablet);
locator::tablet_replica leaving_replica = get_leaving_replica(tinfo, *trinfo);
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
if (leaving_replica.host == tm->get_my_id()) {
// The algorithm doesn't work with tablet migration within the same node because
// it assumes there is only one tablet replica, picked by the sharder, on local node.

View File

@@ -12,6 +12,7 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/tablet_allocator.hh"
#include "utils/error_injection.hh"
#include "utils/stall_free.hh"
#include "locator/load_sketch.hh"
#include "utils/div_ceil.hh"
@@ -65,6 +66,13 @@ seastar::logger lblogger("load_balancer");
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
/// will alternate between them across make_plan() calls.
///
/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated
/// by load balancer as if they were already completed. This allows the algorithm to incrementally
/// make decision which when executed with active migrations will produce the desired result.
/// Overload of shards which still contain migrated-away tablets is limited by the fact
/// that the algorithm tracks streaming concurrency on both source and target shards of active
/// migrations and takes concurrency limit into account when producing new migrations.
///
/// The cost of make_plan() is relatively heavy in terms of preparing data structures, so the current
/// implementation is not efficient if the scheduler would like to call make_plan() multiple times
/// to parallelize execution. This will be addressed in the future by keeping the data structures
@@ -79,7 +87,13 @@ class load_balancer {
using load_type = double;
struct shard_load {
size_t tablet_count;
size_t tablet_count = 0;
// Number of tablets which are streamed from this shard.
size_t streaming_read_load = 0;
// Number of tablets which are streamed to this shard.
size_t streaming_write_load = 0;
// Tablets which still have a replica on this shard which are candidates for migrating away from this shard.
std::unordered_set<global_tablet_id> candidates;
@@ -120,7 +134,65 @@ class load_balancer {
}
};
// Per-shard limits for active tablet streaming sessions.
//
// There is no hard reason for these values being what they are other than
// the guidelines below.
//
// We want to limit concurrency of active streaming for several reasons.
// One is that we want to prevent over-utilization of memory required to carry out streaming,
// as that may lead to OOM or excessive cache eviction.
//
// There is no network scheduler yet, so we want to avoid over-utilization of network bandwidth.
// Limiting per-shard concurrency is a lame way to achieve that, but it's better than nothing.
//
// Scheduling groups should limit impact of streaming on other kinds of processes on the same node,
// so this aspect is not the reason for limiting concurrency.
//
// We don't want too much parallelism because it means that we have plenty of migrations
// which progress slowly. It's better to have fewer which complete faster because
// less user requests suffer from double-quorum overhead, and under-loaded nodes can take
// the load sooner. At the same time, we want to have enough concurrency to fully utilize resources.
//
// Streaming speed is supposed to be I/O bound and writes are more expensive in terms of IO than reads,
// so we allow more read concurrency.
//
// We allow at least two sessions per shard so that there is less chance for idling until load balancer
// makes the next decision after streaming is finished.
const size_t max_write_streaming_load = 2;
const size_t max_read_streaming_load = 4;
token_metadata_ptr _tm;
private:
tablet_replica_set get_replicas_for_tablet_load(const tablet_info& ti, const tablet_transition_info* trinfo) const {
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
return trinfo ? trinfo->next : ti.replicas;
}
// Whether to count the tablet as putting streaming load on the system.
// Tablets which are streaming or are yet-to-stream are counted.
bool is_streaming(const tablet_transition_info* trinfo) {
if (!trinfo) {
return false;
}
switch (trinfo->stage) {
case tablet_transition_stage::allow_write_both_read_old:
return true;
case tablet_transition_stage::write_both_read_old:
return true;
case tablet_transition_stage::streaming:
return true;
case tablet_transition_stage::write_both_read_new:
return false;
case tablet_transition_stage::use_new:
return false;
case tablet_transition_stage::cleanup:
return false;
}
on_internal_error(lblogger, format("Invalid transition stage: {}", static_cast<int>(trinfo->stage)));
}
public:
load_balancer(token_metadata_ptr tm)
: _tm(std::move(tm)) {
@@ -144,6 +216,12 @@ public:
future<migration_plan> make_plan(sstring dc) {
lblogger.info("Examining DC {}", dc);
// Causes load balancer to move some tablet even though load is balanced.
auto shuffle = utils::get_local_injector().enter("tablet_allocator_shuffle");
if (shuffle) {
lblogger.warn("Running without convergence checks");
}
const locator::topology& topo = _tm->get_topology();
// Select subset of nodes to balance.
@@ -162,9 +240,14 @@ public:
// Compute tablet load on nodes.
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : ti.replicas) {
auto trinfo = tmap.get_tablet_transition_info(tid);
// We reflect migrations in the load as if they already happened,
// optimistically assuming that they will succeed.
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
if (nodes.contains(replica.host)) {
nodes[replica.host].tablet_count += 1;
// This invariant is assumed later.
@@ -193,7 +276,7 @@ public:
}
}
if (max_load == min_load) {
if (!shuffle && max_load == min_load) {
// load is balanced.
// TODO: Evaluate and fix intra-node balance.
co_return migration_plan();
@@ -209,45 +292,50 @@ public:
// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
// on the target node. This assumes that the target node is well-balanced and that tablet migrations
// complete at the same time. Both assumptions are not generally true in practice, which we currently ignore.
// If target node is not balanced across shards, we will overload some shards.
// If tablets are not balanced in size, throughput will suffer because some shards will be idle sooner than others.
// But they will be true typically, because we fill shards starting from least-loaded shards,
// so we naturally strive towards balance between shards.
//
// FIXME: To handle the above, we should (1) rebalance the target node
// before migrating tablets from other nodes. If shards are balanced on the target node, the balancer
// will naturally distribute tablets to different shards. Also, (2) we should change this algorithm
// to be a generator for migrations and have a scheduler in the execution layer which pulls migrations
// from this algorithm, batches them and decides how many to execute.
//
// The scheduler decides in which order to execute the plan based on current activity in the system.
// We cannot just ask the planner for the next migration and stop when we hit overload on some shard,
// because that can lead to underutilization of the cluster. Just because the next migration is blocked
// by the target shard being busy doesn't mean we could not proceed with migrations for other shards
// which would be produced by the planner subsequently.
// If target node is not balanced across shards, we will overload some shards. Streaming concurrency
// will suffer because more loaded shards will not participate, which will under-utilize the node.
// FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes.
auto target_node = topo.find_node(target);
auto batch_size = target_node->get_shard_count();
// Compute per-shard load and candidate tablets.
for (auto&& [table, tmap] : _tm->tablets().all_tables()) {
if (!tmap.transitions().empty()) {
// FIXME: The algorithm doesn't support balancing with active transitions yet. They must finish first.
lblogger.warn("Pending transitions active.");
co_return migration_plan();
}
for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
auto& tmap = tmap_;
co_await tmap.for_each_tablet([&, table = table] (tablet_id tid, const tablet_info& ti) {
for (auto&& replica : ti.replicas) {
auto trinfo = tmap.get_tablet_transition_info(tid);
if (is_streaming(trinfo)) {
auto streaming_info = get_migration_streaming_info(ti, *trinfo);
for (auto&& replica : streaming_info.read_from) {
if (nodes.contains(replica.host)) {
nodes[replica.host].shards[replica.shard].streaming_read_load += 1;
}
}
for (auto&& replica : streaming_info.written_to) {
if (nodes.contains(replica.host)) {
nodes[replica.host].shards[replica.shard].streaming_write_load += 1;
}
}
}
for (auto&& replica : get_replicas_for_tablet_load(ti, trinfo)) {
if (!nodes.contains(replica.host)) {
continue;
}
auto& node_load_info = nodes[replica.host];
auto&& shard_load_info = node_load_info.shards[replica.shard];
shard_load& shard_load_info = node_load_info.shards[replica.shard];
if (shard_load_info.tablet_count == 0) {
node_load_info.shards_by_load.push_back(replica.shard);
}
shard_load_info.tablet_count += 1;
shard_load_info.candidates.emplace(global_tablet_id{table, tid});
if (!trinfo) { // migrating tablets are not candidates
shard_load_info.candidates.emplace(global_tablet_id {table, tid});
}
}
});
}
@@ -283,6 +371,8 @@ public:
const tablet_metadata& tmeta = _tm->tablets();
load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
auto& target_info = nodes[target];
const size_t max_skipped_migrations = target_info.shards.size() * 2;
size_t skipped_migrations = 0;
while (plan.size() < batch_size && !nodes_by_load.empty()) {
co_await coroutine::maybe_yield();
@@ -290,35 +380,37 @@ public:
auto src_host = nodes_by_load.back();
auto& src_node_info = nodes[src_host];
// Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
// and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
// that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
// This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and
// because we prevent load inversion between candidate and target in the next check.
// So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates
// is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load,
// it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info
// to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0.
if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) {
lblogger.debug("Balance achieved.");
break;
}
if (!shuffle) {
// Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
// and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
// that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
// This is ensured by the fact that we remove candidates in the order of avg_load from the heap, and
// because we prevent load inversion between candidate and target in the next check.
// So the max avg_load of candidates is that of the current src_node_info, and max avg_load of off-candidates
// is tracked in max_off_candidate_load. If max_off_candidate_load is equal to target's avg_load,
// it means that all nodes have equal avg_load. We take the maximum with the current candidate in src_node_info
// to handle the case of off-candidates being empty. In that case, max_off_candidate_load is 0.
if (std::max(max_off_candidate_load, src_node_info.avg_load) == target_info.avg_load) {
lblogger.debug("Balance achieved.");
break;
}
// If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target.
// max_off_candidate_load may be higher than the load of current candidate.
if (src_node_info.avg_load <= target_info.avg_load) {
lblogger.debug("No more candidate nodes.");
lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
break;
}
// If balance is not achieved, still consider migrating from candidate nodes which have higher load than the target.
// max_off_candidate_load may be higher than the load of current candidate.
if (src_node_info.avg_load <= target_info.avg_load) {
lblogger.debug("No more candidate nodes. Next candidate is {} with avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
break;
}
// Prevent load inversion which can lead to oscillations.
if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) <
target_info.get_avg_load(target_info.tablet_count + 1)) {
lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
break;
// Prevent load inversion which can lead to oscillations.
if (src_node_info.get_avg_load(nodes[src_host].tablet_count - 1) <
target_info.get_avg_load(target_info.tablet_count + 1)) {
lblogger.debug("No more candidate nodes, load would be inverted. Next candidate is {} with "
"avg_load={}, target's avg_load={}",
src_host, src_node_info.avg_load, target_info.avg_load);
break;
}
}
if (src_node_info.shards_by_load.empty()) {
@@ -385,8 +477,30 @@ public:
}
auto dst = global_shard_id {target, target_load.next_shard(target)};
lblogger.debug("Select {} to move from {} to {}", source_tablet, src, dst);
plan.push_back(tablet_migration_info {source_tablet, src, dst});
auto mig = tablet_migration_info {source_tablet, src, dst};
if (target_info.shards[dst.shard].streaming_write_load < max_write_streaming_load
&& src_node_info.shards[src_shard].streaming_read_load < max_read_streaming_load) {
target_info.shards[dst.shard].streaming_write_load += 1;
src_node_info.shards[src_shard].streaming_read_load += 1;
lblogger.debug("Adding migration: {}", mig);
plan.push_back(std::move(mig));
} else {
// Shards are overloaded with streaming. Do not include the migration in the plan, but
// continue as if it was in the hope that we will find a migration which can be executed without
// violating the load. Next make_plan() invocation will notice that the migration was not executed.
// We should not just stop here because that can lead to underutilization of the cluster.
// Just because the next migration is blocked doesn't mean we could not proceed with migrations
// for other shards which are produced by the planner subsequently.
lblogger.debug("Migration {} skipped because of load limit: src_load={}, dst_load={}", mig,
src_node_info.shards[src_shard].streaming_read_load,
target_info.shards[dst.shard].streaming_write_load);
skipped_migrations++;
if (skipped_migrations >= max_skipped_migrations) {
lblogger.debug("Too many migrations skipped, aborting balancing");
break;
}
}
target_info.tablet_count += 1;
target_info.update();
@@ -502,3 +616,8 @@ tablet_allocator_impl& tablet_allocator::impl() {
}
}
auto fmt::formatter<service::tablet_migration_info>::format(const service::tablet_migration_info& mig, fmt::format_context& ctx) const
-> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{{tablet: {}, src: {}, dst: {}}}", mig.tablet, mig.src, mig.dst);
}

View File

@@ -41,6 +41,13 @@ using migration_plan = utils::chunked_vector<tablet_migration_info>;
/// co_await execute(plan);
/// }
///
/// It is ok to invoke the algorithm with already active tablet migrations. The algorithm will take them into account
/// when balancing the load as if they already succeeded. This means that applying a series of migration plans
/// produced by this function will give the same result regardless of whether applying they are fully executed or
/// only initiated by creating corresponding transitions in tablet metadata.
///
/// The algorithm takes care of limiting the streaming load on the system, also by taking active migrations into account.
///
future<migration_plan> balance_tablets(locator::token_metadata_ptr);
class tablet_allocator_impl;
@@ -61,3 +68,8 @@ public:
};
}
template <>
struct fmt::formatter<service::tablet_migration_info> : fmt::formatter<std::string_view> {
auto format(const service::tablet_migration_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
};

View File

@@ -45,12 +45,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::publish_cdc_generation, "publish cdc generation"},
{topology::transition_state::write_both_read_old, "write both read old"},
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_allow_write_both_read_old, "tablet allow write both read old"},
{topology::transition_state::tablet_write_both_read_old, "tablet write both read old"},
{topology::transition_state::tablet_write_both_read_new, "tablet write both read new"},
{topology::transition_state::tablet_streaming, "tablet streaming"},
{topology::transition_state::tablet_use_new, "tablet use new"},
{topology::transition_state::tablet_cleanup, "tablet cleanup"},
{topology::transition_state::tablet_migration, "tablet migration"},
};
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {

View File

@@ -73,14 +73,7 @@ struct topology {
publish_cdc_generation,
write_both_read_old,
write_both_read_new,
// Tablet migration steps
tablet_allow_write_both_read_old,
tablet_write_both_read_old,
tablet_write_both_read_new,
tablet_streaming,
tablet_use_new,
tablet_cleanup,
tablet_migration,
};
std::optional<transition_state> tstate;

View File

@@ -25,6 +25,7 @@
#include "locator/tablet_replication_strategy.hh"
#include "utils/fb_utilities.hh"
#include "utils/UUID_gen.hh"
#include "utils/error_injection.hh"
using namespace locator;
using namespace replica;
@@ -588,6 +589,7 @@ SEASTAR_THREAD_TEST_CASE(test_token_ownership_splitting) {
}
}
// Reflects the plan in a given token metadata as if the migrations were fully executed.
static
void apply_plan(token_metadata& tm, const migration_plan& plan) {
for (auto&& mig : plan) {
@@ -598,6 +600,25 @@ void apply_plan(token_metadata& tm, const migration_plan& plan) {
}
}
static
tablet_transition_info migration_to_transition_info(const tablet_migration_info& mig, const tablet_info& ti) {
return tablet_transition_info {
tablet_transition_stage::allow_write_both_read_old,
replace_replica(ti.replicas, mig.src, mig.dst),
mig.dst
};
}
// Reflects the plan in a given token metadata as if the migrations were started but not yet executed.
static
void apply_plan_as_in_progress(token_metadata& tm, const migration_plan& plan) {
for (auto&& mig : plan) {
tablet_map& tmap = tm.tablets().get_tablet_map(mig.tablet.table);
auto tinfo = tmap.get_tablet_info(mig.tablet.tablet);
tmap.set_tablet_transition_info(mig.tablet.tablet, migration_to_transition_info(mig, tinfo));
}
}
static
void rebalance_tablets(shared_token_metadata& stm) {
while (true) {
@@ -612,6 +633,37 @@ void rebalance_tablets(shared_token_metadata& stm) {
}
}
static
void rebalance_tablets_as_in_progress(shared_token_metadata& stm) {
while (true) {
auto plan = balance_tablets(stm.get()).get0();
if (plan.empty()) {
break;
}
stm.mutate_token_metadata([&] (token_metadata& tm) {
apply_plan_as_in_progress(tm, plan);
return make_ready_future<>();
}).get();
}
}
// Completes any in progress tablet migrations.
static
void execute_transitions(shared_token_metadata& stm) {
stm.mutate_token_metadata([&] (token_metadata& tm) {
for (auto&& [tablet, tmap_] : tm.tablets().all_tables()) {
auto& tmap = tmap_;
for (auto&& [tablet, trinfo]: tmap.transitions()) {
auto ti = tmap.get_tablet_info(tablet);
ti.replicas = trinfo.next;
tmap.set_tablet(tablet, ti);
}
tmap.clear_transitions();
}
return make_ready_future<>();
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
// Tests the scenario of bootstrapping a single node
// Verifies that load balancer sees it and moves tablets to that node.
@@ -707,6 +759,137 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
}
}
SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) {
// Tests the scenario of bootstrapping a single node.
// Verifies that the load balancer balances tablets on that node
// even though there is already an active migration.
// The test verifies that the load balancer creates a plan
// which when executed will achieve perfect balance,
// which is a proof that it doesn't stop due to active migrations.
inet_address ip1("192.168.0.1");
inet_address ip2("192.168.0.2");
inet_address ip3("192.168.0.3");
auto host1 = host_id(next_uuid());
auto host2 = host_id(next_uuid());
auto host3 = host_id(next_uuid());
auto table1 = table_id(next_uuid());
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
locator::topology::config{
.this_endpoint = ip1,
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
for (int i = 0; i < 4; ++i) {
tmap.set_tablet(*tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = tmap.next_tablet(*tid);
}
tmap.set_tablet_transition_info(tmap.first_tablet(), tablet_transition_info {
tablet_transition_stage::allow_write_both_read_old,
tablet_replica_set {
tablet_replica {host3, 0},
tablet_replica {host2, 0},
},
tablet_replica {host3, 0}
});
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets_as_in_progress(stm);
execute_transitions(stm);
{
load_sketch load(stm.get());
load.populate().get();
for (auto h : {host1, host2, host3}) {
testlog.debug("Checking host {}", h);
BOOST_REQUIRE(load.get_avg_shard_load(h) == 2);
}
}
}
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
SEASTAR_THREAD_TEST_CASE(test_load_balancer_shuffle_mode) {
inet_address ip1("192.168.0.1");
inet_address ip2("192.168.0.2");
inet_address ip3("192.168.0.3");
auto host1 = host_id(next_uuid());
auto host2 = host_id(next_uuid());
auto host3 = host_id(next_uuid());
auto table1 = table_id(next_uuid());
semaphore sem(1);
shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, locator::token_metadata::config{
locator::topology::config{
.this_endpoint = ip1,
.local_dc_rack = locator::endpoint_dc_rack::default_location
}
});
stm.mutate_token_metadata([&] (auto& tm) {
tm.update_host_id(host1, ip1);
tm.update_host_id(host2, ip2);
tm.update_host_id(host3, ip3);
tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, 1);
tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, std::nullopt, 2);
tablet_map tmap(4);
std::optional<tablet_id> tid = tmap.first_tablet();
for (int i = 0; i < 4; ++i) {
tmap.set_tablet(*tid, tablet_info {
tablet_replica_set {
tablet_replica {host1, 0},
tablet_replica {host2, 0},
}
});
tid = tmap.next_tablet(*tid);
}
tablet_metadata tmeta;
tmeta.set_tablet_map(table1, std::move(tmap));
tm.set_tablets(std::move(tmeta));
return make_ready_future<>();
}).get();
rebalance_tablets(stm);
BOOST_REQUIRE(balance_tablets(stm.get()).get0().empty());
utils::get_local_injector().enable("tablet_allocator_shuffle");
auto disable_injection = seastar::defer([&] {
utils::get_local_injector().disable("tablet_allocator_shuffle");
});
BOOST_REQUIRE(!balance_tablets(stm.get()).get0().empty());
}
#endif
SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_two_empty_nodes) {
inet_address ip1("192.168.0.1");
inet_address ip2("192.168.0.2");

View File

@@ -6,6 +6,7 @@
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
import pytest
@@ -22,6 +23,11 @@ async def inject_error_one_shot_on(manager, error_name, servers):
await asyncio.gather(*errs)
async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
"""Test that you can create a table and insert and query data"""
@@ -133,6 +139,8 @@ async def test_bootstrap(manager: ManagerClient):
for r in rows:
assert r.c == r.pk
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
logger.info("Adding new server")
await manager.server_add()