tablets: effective_replication_map: Take transition stage into account when computing replicas
This commit is contained in:
@@ -28,6 +28,53 @@ namespace locator {
|
||||
|
||||
seastar::logger tablet_logger("tablets");
|
||||
|
||||
|
||||
static
|
||||
write_replica_set_selector get_selector_for_writes(tablet_transition_stage stage) {
|
||||
switch (stage) {
|
||||
case tablet_transition_stage::allow_write_both_read_old:
|
||||
return write_replica_set_selector::previous;
|
||||
case tablet_transition_stage::write_both_read_old:
|
||||
return write_replica_set_selector::both;
|
||||
case tablet_transition_stage::streaming:
|
||||
return write_replica_set_selector::both;
|
||||
case tablet_transition_stage::write_both_read_new:
|
||||
return write_replica_set_selector::both;
|
||||
case tablet_transition_stage::use_new:
|
||||
return write_replica_set_selector::next;
|
||||
case tablet_transition_stage::cleanup:
|
||||
return write_replica_set_selector::next;
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
||||
}
|
||||
|
||||
static
|
||||
read_replica_set_selector get_selector_for_reads(tablet_transition_stage stage) {
|
||||
switch (stage) {
|
||||
case tablet_transition_stage::allow_write_both_read_old:
|
||||
return read_replica_set_selector::previous;
|
||||
case tablet_transition_stage::write_both_read_old:
|
||||
return read_replica_set_selector::previous;
|
||||
case tablet_transition_stage::streaming:
|
||||
return read_replica_set_selector::previous;
|
||||
case tablet_transition_stage::write_both_read_new:
|
||||
return read_replica_set_selector::next;
|
||||
case tablet_transition_stage::use_new:
|
||||
return read_replica_set_selector::next;
|
||||
case tablet_transition_stage::cleanup:
|
||||
return read_replica_set_selector::next;
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid tablet transition stage: {}", static_cast<int>(stage)));
|
||||
}
|
||||
|
||||
tablet_transition_info::tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica)
|
||||
: stage(stage)
|
||||
, next(std::move(next))
|
||||
, pending_replica(std::move(pending_replica))
|
||||
, writes(get_selector_for_writes(stage))
|
||||
, reads(get_selector_for_reads(stage))
|
||||
{ }
|
||||
|
||||
const tablet_map& tablet_metadata::get_tablet_map(table_id id) const {
|
||||
try {
|
||||
return _tablets.at(id);
|
||||
@@ -267,7 +314,22 @@ public:
|
||||
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token) const override {
|
||||
auto&& tablets = get_tablet_map();
|
||||
auto tablet = tablets.get_tablet_id(search_token);
|
||||
auto&& replicas = tablets.get_tablet_info(tablet).replicas;
|
||||
auto* info = tablets.get_tablet_transition_info(tablet);
|
||||
auto&& replicas = std::invoke([&] () -> const tablet_replica_set& {
|
||||
if (!info) {
|
||||
return tablets.get_tablet_info(tablet).replicas;
|
||||
}
|
||||
switch (info->writes) {
|
||||
case write_replica_set_selector::previous:
|
||||
[[fallthrough]];
|
||||
case write_replica_set_selector::both:
|
||||
return tablets.get_tablet_info(tablet).replicas;
|
||||
case write_replica_set_selector::next: {
|
||||
return info->next;
|
||||
}
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
|
||||
});
|
||||
tablet_logger.trace("get_natural_endpoints({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
return to_replica_set(replicas);
|
||||
}
|
||||
@@ -285,13 +347,40 @@ public:
|
||||
if (!info) {
|
||||
return {};
|
||||
}
|
||||
tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}",
|
||||
search_token, _table, tablet, info->pending_replica);
|
||||
return {get_endpoint_for_host_id(info->pending_replica.host)};
|
||||
switch (info->writes) {
|
||||
case write_replica_set_selector::previous:
|
||||
return {};
|
||||
case write_replica_set_selector::both:
|
||||
tablet_logger.trace("get_pending_endpoints({}): table={}, tablet={}, replica={}",
|
||||
search_token, _table, tablet, info->pending_replica);
|
||||
return {get_endpoint_for_host_id(info->pending_replica.host)};
|
||||
case write_replica_set_selector::next:
|
||||
return {};
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->writes)));
|
||||
}
|
||||
|
||||
virtual inet_address_vector_replica_set get_endpoints_for_reading(const token& search_token) const override {
|
||||
return get_natural_endpoints_without_node_being_replaced(search_token);
|
||||
auto&& tablets = get_tablet_map();
|
||||
auto tablet = tablets.get_tablet_id(search_token);
|
||||
auto&& info = tablets.get_tablet_transition_info(tablet);
|
||||
auto&& replicas = std::invoke([&] () -> const tablet_replica_set& {
|
||||
if (!info) {
|
||||
return tablets.get_tablet_info(tablet).replicas;
|
||||
}
|
||||
switch (info->reads) {
|
||||
case read_replica_set_selector::previous:
|
||||
return tablets.get_tablet_info(tablet).replicas;
|
||||
case read_replica_set_selector::next: {
|
||||
return info->next;
|
||||
}
|
||||
}
|
||||
on_internal_error(tablet_logger, format("Invalid replica selector", static_cast<int>(info->reads)));
|
||||
});
|
||||
tablet_logger.trace("get_endpoints_for_reading({}): table={}, tablet={}, replicas={}", search_token, _table, tablet, replicas);
|
||||
auto result = to_replica_set(replicas);
|
||||
maybe_remove_node_being_replaced(*_tmptr, *_rs, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
virtual bool has_pending_ranges(inet_address endpoint) const override {
|
||||
|
||||
@@ -80,12 +80,24 @@ enum class tablet_transition_stage {
|
||||
sstring tablet_transition_stage_to_string(tablet_transition_stage);
|
||||
tablet_transition_stage tablet_transition_stage_from_string(const sstring&);
|
||||
|
||||
enum class write_replica_set_selector {
|
||||
previous, both, next
|
||||
};
|
||||
|
||||
enum class read_replica_set_selector {
|
||||
previous, next
|
||||
};
|
||||
|
||||
/// Used for storing tablet state transition during topology changes.
|
||||
/// Describes transition of a single tablet.
|
||||
struct tablet_transition_info {
|
||||
tablet_transition_stage stage;
|
||||
tablet_replica_set next;
|
||||
tablet_replica pending_replica; // Optimization (next - tablet_info::replicas)
|
||||
write_replica_set_selector writes;
|
||||
read_replica_set_selector reads;
|
||||
|
||||
tablet_transition_info(tablet_transition_stage stage, tablet_replica_set next, tablet_replica pending_replica);
|
||||
|
||||
bool operator==(const tablet_transition_info&) const = default;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user