diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 2b5d986f45..51ca45dfb6 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -78,14 +78,12 @@ void stream_manager::init_messaging_service_handler() { auto from = netw::messaging_service::get_source(cinfo); auto reason = reason_opt ? *reason_opt: stream_reason::unspecified; sslog.trace("Got stream_mutation_fragments from {} reason {}", from, int(reason)); - replica::table& cf = _db.local().find_column_family(cf_id); if (!_sys_dist_ks.local_is_initialized() || !_view_update_generator.local_is_initialized()) { return make_exception_future>(std::runtime_error(format("Node {} is not fully initialized for streaming, try again later", utils::fb_utilities::get_broadcast_address()))); } - - return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason] (schema_ptr s) mutable { - return _db.local().obtain_reader_permit(cf, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, schema_id, &cf, source, reason, s] (reader_permit permit) mutable { + return _mm.local().get_schema_for_write(schema_id, from, _ms.local()).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason] (schema_ptr s) mutable { + return _db.local().obtain_reader_permit(s, "stream-session", db::no_timeout).then([this, from, estimated_partitions, plan_id, cf_id, schema_id, source, reason, s] (reader_permit permit) mutable { auto sink = _ms.local().make_sink_for_stream_mutation_fragments(source); struct stream_mutation_fragments_cmd_status { bool got_cmd = false; @@ -126,11 +124,15 @@ void stream_manager::init_messaging_service_handler() { } }); }; + try { + // Make sure the table with cf_id is still present at this point. + // Close the sink in case the table is dropped. + auto op = _db.local().find_column_family(cf_id).stream_in_progress(); //FIXME: discarded future. (void)mutation_writer::distribute_reader_and_consume_on_shards(s, make_generating_reader_v1(s, permit, std::move(get_next_mutation_fragment)), make_streaming_consumer("streaming", _db, _sys_dist_ks, _view_update_generator, estimated_partitions, reason, is_offstrategy_supported(reason)), - cf.stream_in_progress() + std::move(op) ).then_wrapped([s, plan_id, from, sink, estimated_partitions] (future f) mutable { int32_t status = 0; uint64_t received_partitions = 0; @@ -152,6 +154,11 @@ void stream_manager::init_messaging_service_handler() { sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", plan_id, s->ks_name(), s->cf_name(), from.addr, ep); }); + } catch (...) { + return sink.close().then([sink, eptr = std::current_exception()] () -> future> { + return make_exception_future>(eptr); + }); + } return make_ready_future>(sink); }); }); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index ec0121c1c1..cd6d86cfe6 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -200,7 +200,7 @@ future<> stream_transfer_task::execute() { auto si = make_lw_shared(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, [&sm, plan_id, addr = id.addr] (size_t sz) { sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz); }); - return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { + return si->has_relevant_range_on_this_shard().then([&sm, si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) { if (!has_relevant_range_on_this_shard) { sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}", plan_id, cf_id, this_shard_id()); @@ -219,8 +219,20 @@ future<> stream_transfer_task::execute() { std::rethrow_exception(ep); }); }).then([this, id, plan_id, cf_id] { + _mutation_done_sent = true; sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr); - }).handle_exception([this, plan_id, id] (auto ep){ + }).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) { + // If the table is dropped during streaming, we can ignore the + // errors and make the stream successful. This allows user to + // drop tables during node operations like decommission or + // bootstrap. + if (!session->manager().db().column_family_exists(cf_id)) { + sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep); + if (!_mutation_done_sent) { + return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id); + } + return make_ready_future<>(); + } sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep); std::rethrow_exception(ep); }); diff --git a/streaming/stream_transfer_task.hh b/streaming/stream_transfer_task.hh index ccb5136794..6cb152190b 100644 --- a/streaming/stream_transfer_task.hh +++ b/streaming/stream_transfer_task.hh @@ -30,6 +30,7 @@ private: dht::token_range_vector _ranges; std::map _shard_ranges; long _total_size; + bool _mutation_done_sent = false; public: using UUID = utils::UUID; stream_transfer_task(stream_transfer_task&&) = default;