From 953af382813e4a71f2dc8a097f58e11c80509ee9 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 19 Apr 2022 08:35:44 +0800 Subject: [PATCH] streaming: Allow drop table during streaming Currently, if a table is dropped during streaming, the streaming would fail with no_such_column_family error. Since the table is dropped anyway, it makes more sense to ignore the streaming result of the dropped table, whether it is successful or failed. This allows users to drop tables during node operations, e.g., bootstrap or decommission a node. This is especially useful for the cloud users where it is hard to coordinate between a node operation by admin and user cql change. This patch also fixes a possible use-after-free issue by not passing the table reference object around. Fixes #10395 Closes #10396 --- streaming/stream_session.cc | 17 ++++++++++++----- streaming/stream_transfer_task.cc | 16 ++++++++++++++-- streaming/stream_transfer_task.hh | 1 + 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 2b5d986f45..51ca45dfb6 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -78,14 +78,12 @@ void stream_manager::init_messaging_service_handler() { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? 
*reason_opt: stream_reason::unspecified; sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason)); - replica::table& cf = _db.local().find_column_family(cf_id); if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", utils::fb_utilities::get_broadcast_address()))); } - - return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(cf, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason, s] (reader_permit permit) mutable { + return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason] (schema_ptr s) mutable { + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason, s] (reader_permit permit) mutable { auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; @@ -126,11 +124,15 @@ void stream_manager::init_messaging_service_handler() { } }); }; + try { + // Make sure the table with cf_id is still present at this point. + // Close the sink in case the table is dropped. + auto op = _db.local().find_column_family(cf_id).stream_in_progress(); //FIXME: discarded future. 
(void)mutation_writer::distribute_reader_and_consume_on_shards(s, make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)), - cf.stream_in_progress() + std::move(op) ).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future f) mutable { int32_t status = 0; uint64_t received_partitions = 0; @@ -152,6 +154,11 @@ void stream_manager::init_messaging_service_handler() { sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", plan_id, s->ks_name(), s->cf_name(), from.addr, ep); }); + } catch (...) { + return sink.close().then([sink, eptr = std::current_exception()] () -> future> { + return make_exception_future>(eptr); + }); + } return make_ready_future>(sink); }); }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index ec0121c1c1..cd6d86cfe6 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -200,7 +200,7 @@ future<> stream_transfer_task::execute() { auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) { sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz); }); - return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { + return si->has_relevant_range_on_this_shard().then([&sm, si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { if (!has_relevant_range_on_this_shard) { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}", plan_id, cf_id, this_shard_id()); @@ -219,8 +219,20 @@ future<> stream_transfer_task::execute() { std::rethrow_exception(ep); }); }).then([this, id, plan_id, cf_id] { + _mutation_done_sent = true; sslog.debug("[Stream #{}] 
GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr); - }).handle_exception([this, plan_id, id] (auto ep){ + }).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) { + // If the table is dropped during streaming, we can ignore the + // errors and make the stream successful. This allows user to + // drop tables during node operations like decommission or + // bootstrap. + if (!session->manager().db().column_family_exists(cf_id)) { + sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep); + if (!_mutation_done_sent) { + return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id); + } + return make_ready_future<>(); + } sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep); std::rethrow_exception(ep); }); diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index ccb5136794..6cb152190b 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -30,6 +30,7 @@ private: dht::token_range_vector _ranges; std::map _shard_ranges; long _total_size; + bool _mutation_done_sent = false; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default;