diff --git a/service/storage_service.cc b/service/storage_service.cc index 1a6ed00e57..7cef958c9e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6071,55 +6071,83 @@ inet_address storage_service::host2ip(locator::host_id host) { return *ip; } -// Streams data to the pending tablet replica of a given tablet on this node. -// The source tablet replica is determined from the current transition info of the tablet. -future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { - // The coordinator does not execute global token metadata barrier before jumping to "streaming" stage, so we need +// Performs a replica-side operation for a given tablet. +// What operation is performed is determined by "op" based on the +// current state of tablet metadata. The coordinator is supposed to prepare tablet +// metadata according to his intent and trigger the operation, +// without passing any transient information. +// +// If the operation succeeds, and the coordinator is still valid, it means +// that the operation intended by the coordinator was performed. +// If the coordinator is no longer valid, the operation may succeed but +// the actual operation performed may be different than intended, it may +// be the one intended by the new coordinator. This is not a problem +// because the old coordinator should do nothing with such result. +future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, + sstring op_name, + std::function()> op) { + // The coordinator may not execute global token metadata barrier before triggering the operation, so we need // a barrier here to see the token metadata which is at least as recent as that of the sender. auto& raft_server = _group0->group0_server(); co_await raft_server.read_barrier(&_abort_source); - auto tm = _shared_token_metadata.get(); - auto& tmap = tm->tablets().get_tablet_map(tablet.table); - auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet); - - // Check if the request is still valid. - // If there is mismatch, it means this streaming was canceled and the coordinator moved on. - if (!trinfo) { - throw std::runtime_error(format("No transition info for tablet {}", tablet)); - } - if (trinfo->stage != locator::tablet_transition_stage::streaming) { - throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet)); - } - if (trinfo->pending_replica.host != tm->get_my_id()) { - throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet)); - } - - auto& tinfo = tmap.get_tablet_info(tablet.tablet); - auto range = tmap.get_token_range(tablet.tablet); - locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo); - if (leaving_replica.host == tm->get_my_id()) { - // The algorithm doesn't work with tablet migration within the same node because - // it assumes there is only one tablet replica, picked by the sharder, on local node. - throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}", - tablet, leaving_replica.shard, trinfo->pending_replica.shard)); - } - auto leaving_replica_ip = host2ip(leaving_replica.host); - - if (_tablet_streaming[tablet]) { - slogger.debug("Streaming retry joining with existing session for tablet {}", tablet); - co_await _tablet_streaming[tablet]->get_future(); + if (_tablet_ops.contains(tablet)) { + slogger.debug("{} retry joining with existing session for tablet {}", op_name, tablet); + co_await _tablet_ops[tablet].done.get_future(); co_return; } auto async_gate_holder = _async_gate.hold(); promise<> p; - _tablet_streaming[tablet] = seastar::shared_future<>(p.get_future()); - auto erase_tablet_streaming = seastar::defer([&] { - _tablet_streaming.erase(tablet); + _tablet_ops.emplace(tablet, tablet_operation { + op_name, seastar::shared_future<>(p.get_future()) + }); + auto erase_registry_entry = seastar::defer([&] { + _tablet_ops.erase(tablet); }); try { + co_await op(); + p.set_value(); + slogger.debug("{} for tablet migration of {} successful", op_name, tablet); + } catch (...) { + p.set_exception(std::current_exception()); + slogger.warn("{} for tablet migration of {} failed: {}", op_name, tablet, std::current_exception()); + throw; + } +} + +// Streams data to the pending tablet replica of a given tablet on this node. +// The source tablet replica is determined from the current transition info of the tablet. +future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { + return do_tablet_operation(tablet, "Streaming", [this, tablet] () -> future<> { + auto tm = _shared_token_metadata.get(); + auto& tmap = tm->tablets().get_tablet_map(tablet.table); + auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet); + + // Check if the request is still valid. + // If there is mismatch, it means this streaming was canceled and the coordinator moved on. + if (!trinfo) { + throw std::runtime_error(format("No transition info for tablet {}", tablet)); + } + if (trinfo->stage != locator::tablet_transition_stage::streaming) { + throw std::runtime_error(format("Tablet {} stage is not at streaming", tablet)); + } + if (trinfo->pending_replica.host != tm->get_my_id()) { + throw std::runtime_error(format("Tablet {} has pending replica different than this one", tablet)); + } + + auto& tinfo = tmap.get_tablet_info(tablet.tablet); + auto range = tmap.get_token_range(tablet.tablet); + locator::tablet_replica leaving_replica = locator::get_leaving_replica(tinfo, *trinfo); + if (leaving_replica.host == tm->get_my_id()) { + // The algorithm doesn't work with tablet migration within the same node because + // it assumes there is only one tablet replica, picked by the sharder, on local node. + throw std::runtime_error(format("Cannot stream within the same node, tablet: {}, shard {} -> {}", + tablet, leaving_replica.shard, trinfo->pending_replica.shard)); + } + auto leaving_replica_ip = host2ip(leaving_replica.host); + auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; auto streamer = make_lw_shared(_db, _stream_manager, tm, _abort_source, @@ -6132,14 +6160,8 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { ranges_per_endpoint[leaving_replica_ip].emplace_back(range); streamer->add_rx_ranges(table.schema()->ks_name(), std::move(ranges_per_endpoint)); co_await streamer->stream_async(); - - p.set_value(); - slogger.info("Streaming for tablet migration of {} successful", tablet); - } catch (...) { - p.set_exception(std::current_exception()); - slogger.warn("Streaming for tablet migration of {} from {} failed: {}", tablet, leaving_replica, std::current_exception()); - throw; - } + co_return; + }); } void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 9bc321840a..4296fcb00b 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -114,6 +114,13 @@ private: using inet_address = gms::inet_address; using versioned_value = gms::versioned_value; + struct tablet_operation { + sstring name; + shared_future<> done; + }; + + using tablet_op_registry = std::unordered_map; + abort_source& _abort_source; gms::feature_service& _feature_service; distributed& _db; @@ -147,6 +154,9 @@ private: future<> node_ops_abort(node_ops_id ops_uuid); void node_ops_signal_abort(std::optional ops_uuid); future<> node_ops_abort_thread(); + future<> do_tablet_operation(locator::global_tablet_id tablet, + sstring op_name, + std::function()> op); future<> stream_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); public: @@ -766,7 +776,7 @@ private: std::optional> _decomission_result; std::optional> _rebuild_result; std::unordered_map>> _remove_result; - std::unordered_map>> _tablet_streaming; + tablet_op_registry _tablet_ops; // During decommission, the node waits for the coordinator to tell it to shut down. std::optional> _shutdown_request_promise; struct {