From 5cf035878d955255dc8a4d40436353a5c2351b86 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 13 Sep 2023 22:58:47 +0200 Subject: [PATCH] storage_service, tablets: Prevent stale RPCs from running beyond their stage Example scenario: 1. coordinator A sends RPC #1 to trigger streaming 2. coordinator fails over to B 3. coordinator B performs streaming successfully 4. RPC #1 arrives and starts streaming 5. coordinator B commits the transition to the post-streaming stage 6. coordinator B executes global token metadata barrier We end up with streaming running despite the fact that the current coordinator moved on. Currently, this won't happen, because streaming holds on to erm. But we want to change that (see #14995), so that it does not block barriers for migrations of other tablets. The same problem applies to tablet cleanup. The fix is to use tablet_metadata_guard around such long running operations, which will keep hold to erm so that in the above scenario coordinator B will wait for it in step 6. The guard ensures that erm doesn't block other migrations because it switches to the latest erm if it's compatible. If it's not, it signals abort_source for the guard so that such stale operation aborts soon and the barrier in step 6 doesn't wait for long. --- service/storage_service.cc | 56 ++++++++++++++++++++++++++++++++++---- service/storage_service.hh | 3 +- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 7cef958c9e..db0e932c1d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -22,6 +22,7 @@ #include "db/consistency_level.hh" #include "service/tablet_allocator.hh" #include "locator/tablets.hh" +#include "locator/tablet_metadata_guard.hh" #include "replica/tablet_mutation_builder.hh" #include #include "mutation/canonical_mutation.hh" @@ -6083,9 +6084,46 @@ inet_address storage_service::host2ip(locator::host_id host) { // the actual operation performed may be different than intended, it may // be the one intended by the new coordinator. This is not a problem // because the old coordinator should do nothing with such result. +// +// The triggers may be retried. They may also be reordered with older triggers, from +// the same or a different coordinator. There is a protocol which ensures that +// stale triggers won't cause operations to run beyond the migration stage they were +// intended for. For example, that streaming is not still running after the coordinator +// moved past the "streaming" stage, and that it won't be started when the stage is not appropriate. +// A non-stale trigger is the one which completed successfully and caused the valid coordinator +// to advance tablet migration to the next stage. Other triggers are called stale. +// We can divide stale triggers into categories: +// (1) Those which start after the tablet was moved to the next stage +// Those which start before the tablet was moved to the next stage, +// (2) ...but after the non-stale trigger finished +// (3) ...but before the non-stale trigger finished +// +// By "start" I mean the atomic block which inserts into _tablet_ops, and by "finish" I mean +// removal from _tablet_ops. +// So event ordering is local from the perspective of this replica, and is linear because +// this happens on the same shard. +// +// What prevents (1) from running is the fact that triggers check the state of tablet +// metadata, and will fail immediately if the stage is not appropriate. It can happen +// that the trigger is so stale that it will match with an appropriate stage of the next +// migration of the same tablet. This is not a problem because we fall into the same +// category as a stale trigger which was started in the new migration, so cases (2) or (3) apply. +// +// What prevents (2) from running is the fact that after the coordinator moves on to +// the next stage, it executes a token metadata barrier, which will wait for such triggers +// to complete as they hold on to erm via tablet_metadata_barrier. They should be aborted +// soon after the coordinator changes the stage by the means of tablet_metadata_barrier::get_abort_source(). +// +// What prevents (3) from running is that they will join with the non-stale trigger, or non-stale +// trigger will join with them, depending on which came first. In that case they finish at the same time. +// +// It's very important that the global token metadata barrier involves all nodes which +// may receive stale triggers started in the previous stage, so that those nodes will +// see tablet metadata which reflects group0 state. This will cut-off stale triggers +// as soon as the coordinator moves to the next stage. future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, sstring op_name, - std::function()> op) { + std::function(locator::tablet_metadata_guard&)> op) { // The coordinator may not execute global token metadata barrier before triggering the operation, so we need // a barrier here to see the token metadata which is at least as recent as that of the sender. auto& raft_server = _group0->group0_server(); @@ -6097,6 +6135,12 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, co_return; } + locator::tablet_metadata_guard guard(_db.local().find_column_family(tablet.table), tablet); + auto& as = guard.get_abort_source(); + auto sub = _abort_source.subscribe([&as] () noexcept { + as.request_abort(); + }); + auto async_gate_holder = _async_gate.hold(); promise<> p; _tablet_ops.emplace(tablet, tablet_operation { @@ -6107,7 +6151,7 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, }); try { - co_await op(); + co_await op(guard); p.set_value(); slogger.debug("{} for tablet migration of {} successful", op_name, tablet); } catch (...) { @@ -6120,9 +6164,9 @@ future<> storage_service::do_tablet_operation(locator::global_tablet_id tablet, // Streams data to the pending tablet replica of a given tablet on this node. // The source tablet replica is determined from the current transition info of the tablet. future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { - return do_tablet_operation(tablet, "Streaming", [this, tablet] () -> future<> { - auto tm = _shared_token_metadata.get(); - auto& tmap = tm->tablets().get_tablet_map(tablet.table); + return do_tablet_operation(tablet, "Streaming", [this, tablet] (locator::tablet_metadata_guard& guard) -> future<> { + auto tm = guard.get_token_metadata(); + auto& tmap = guard.get_tablet_map(); auto* trinfo = tmap.get_tablet_transition_info(tablet.tablet); // Check if the request is still valid. @@ -6150,7 +6194,7 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; - auto streamer = make_lw_shared(_db, _stream_manager, tm, _abort_source, + auto streamer = make_lw_shared(_db, _stream_manager, std::move(tm), guard.get_abort_source(), get_broadcast_address(), _snitch.local()->get_location(), "Tablet migration", streaming::stream_reason::tablet_migration, std::move(tables)); streamer->add_source_filter(std::make_unique( diff --git a/service/storage_service.hh b/service/storage_service.hh index 4296fcb00b..4df3e909d2 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -16,6 +16,7 @@ #include "service/endpoint_lifecycle_subscriber.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/tablets.hh" +#include "locator/tablet_metadata_guard.hh" #include "inet_address_vectors.hh" #include #include @@ -156,7 +157,7 @@ private: future<> node_ops_abort_thread(); future<> do_tablet_operation(locator::global_tablet_id tablet, sstring op_name, - std::function()> op); + std::function(locator::tablet_metadata_guard&)> op); future<> stream_tablet(locator::global_tablet_id); inet_address host2ip(locator::host_id); public: