storage_service, tablets: Extract do_tablet_operation() from stream_tablet()

It will be shared with cleanup_tablet().

Minor changes:
  - ditch the redundant optional<> around shared_future<>
This commit is contained in:
Tomasz Grabiec
2023-09-13 22:54:47 +02:00
parent e2c1f904c8
commit 2c6785dc8f
2 changed files with 78 additions and 46 deletions

View File

@@ -6071,55 +6071,83 @@ inet_address storage_service::host2ip(locator::host_id host) {
return *ip;
}
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
// The coordinator does not execute global token metadata barrier before jumping to "streaming" stage, so we need
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
// metadata according to its intent and trigger the operation,
// without passing any transient information.
//
// If the operation succeeds, and the coordinator is still valid, it means
// that the operation intended by the coordinator was performed.
// If the coordinator is no longer valid, the operation may succeed but
// the actual operation performed may be different than intended, it may
// be the one intended by the new coordinator. This is not a problem
// because the old coordinator should do nothing with such result.
future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>()> op) {
// The coordinator may not execute a global token metadata barrier before triggering the operation, so we need
// a barrier here to see the token metadata which is at least as recent as that of the sender.
auto& raft_server = _group0->group0_server();
co_await raft_server.read_barrier(&_abort_source);
auto tm = _shared_token_metadata.get();
auto& tmap = tm->tablets().get_tablet_map(tablet.table);
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
// Check if the request is still valid.
// If there is a mismatch, it means this streaming was canceled and the coordinator moved on.
if (!trinfo) {
throw std::runtime_error(format("No transition info for tablet {}", tablet));
}
if (trinfo->stage != locator::tablet_transition_stage::streaming) {
throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
}
if (trinfo->pending_replica.host != tm->get_my_id()) {
throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
}
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
auto range = tmap.get_token_range(tablet.tablet);
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
if (leaving_replica.host == tm->get_my_id()) {
// The algorithm doesn't work with tablet migration within the same node because
// it assumes there is only one tablet replica, picked by the sharder, on local node.
throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}",
tablet, leaving_replica.shard, trinfo->pending_replica.shard));
}
auto leaving_replica_ip = host2ip(leaving_replica.host);
if (_tablet_streaming[tablet]) {
slogger.debug("Streaming retry joining with existing session for tablet {}", tablet);
co_await _tablet_streaming[tablet]->get_future();
// An operation is already registered for this tablet: wait for its outcome
// instead of invoking op a second time.
if (_tablet_ops.contains(tablet)) {
slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet);
co_await _tablet_ops[tablet].done.get_future();
co_return;
}
auto async_gate_holder = _async_gate.hold();
promise<> p;
_tablet_streaming[tablet] = seastar::shared_future<>(p.get_future());
auto erase_tablet_streaming = seastar::defer([&] {
_tablet_streaming.erase(tablet);
_tablet_ops.emplace(tablet, tablet_operation {
op_name, seastar::shared_future<>(p.get_future())
});
// Drop the registry entry once this invocation finishes, whether op succeeded or threw.
auto erase_registry_entry = seastar::defer([&] {
_tablet_ops.erase(tablet);
});
try {
co_await op();
// Resolve the shared future so that callers which joined this operation complete too.
p.set_value();
slogger.debug("{} for tablet migration of {} successful", op_name, tablet);
} catch (...) {
// Forward the failure to joined callers before rethrowing it to our own caller.
p.set_exception(std::current_exception());
slogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception());
throw;
}
}
// Streams data to the pending tablet replica of a given tablet on this node.
// The source tablet replica is determined from the current transition info of the tablet.
future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
return do_tablet_operation(tablet, "Streaming", [this, tablet] () -> future<> {
auto tm = _shared_token_metadata.get();
auto& tmap = tm->tablets().get_tablet_map(tablet.table);
auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet);
// Check if the request is still valid.
// If there is a mismatch, it means this streaming was canceled and the coordinator moved on.
if (!trinfo) {
throw std::runtime_error(format("No transition info for tablet {}", tablet));
}
if (trinfo->stage != locator::tablet_transition_stage::streaming) {
throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet));
}
if (trinfo->pending_replica.host != tm->get_my_id()) {
throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet));
}
auto& tinfo = tmap.get_tablet_info(tablet.tablet);
auto range = tmap.get_token_range(tablet.tablet);
locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo);
if (leaving_replica.host == tm->get_my_id()) {
// The algorithm doesn't work with tablet migration within the same node because
// it assumes there is only one tablet replica, picked by the sharder, on local node.
throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}",
tablet, leaving_replica.shard, trinfo->pending_replica.shard));
}
auto leaving_replica_ip = host2ip(leaving_replica.host);
auto& table = _db.local().find_column_family(tablet.table);
std::vector<sstring> tables = {table.schema()->cf_name()};
auto streamer = make_lw_shared<dht::range_streamer>(_db, _stream_manager, tm, _abort_source,
@@ -6132,14 +6160,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
ranges_per_endpoint[leaving_replica_ip].emplace_back(range);
streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint));
co_await streamer->stream_async();
p.set_value();
slogger.info("Streaming for tablet migration of {} successful", tablet);
} catch (...) {
p.set_exception(std::current_exception());
slogger.warn("Streaming for tablet migration of {} from {} failed: {}", tablet, leaving_replica, std::current_exception());
throw;
}
co_return;
});
}
void storage_service::init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks) {

View File

@@ -114,6 +114,13 @@ private:
using inet_address = gms::inet_address;
using versioned_value = gms::versioned_value;
// Bookkeeping for one tablet operation tracked in tablet_op_registry.
struct tablet_operation {
// Name of the operation; do_tablet_operation() stores the op_name it was called with here.
sstring name;
// Shared future for the operation's outcome; calls that find an existing entry
// for the same tablet wait on it.
shared_future<> done;
};
using tablet_op_registry = std::unordered_map<locator::global_tablet_id, tablet_operation>;
abort_source& _abort_source;
gms::feature_service& _feature_service;
distributed<replica::database>& _db;
@@ -147,6 +154,9 @@ private:
future<> node_ops_abort(node_ops_id ops_uuid);
void node_ops_signal_abort(std::optional<node_ops_id> ops_uuid);
future<> node_ops_abort_thread();
// Runs `op` as a replica-side operation for the given tablet; `op_name` labels the
// operation in log messages. If an operation is already registered for the tablet,
// the call waits for that one to finish instead of invoking `op`.
future<> do_tablet_operation(locator::global_tablet_id tablet,
sstring op_name,
std::function<future<>()> op);
future<> stream_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id);
public:
@@ -766,7 +776,7 @@ private:
std::optional<shared_future<>> _decomission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
std::unordered_map<locator::global_tablet_id, std::optional<shared_future<>>> _tablet_streaming;
tablet_op_registry _tablet_ops;
// During decommission, the node waits for the coordinator to tell it to shut down.
std::optional<promise<>> _shutdown_request_promise;
struct {