diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 76242488cc..d09ca944f5 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -178,8 +178,13 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { int32_t status = 0; uint64_t received_partitions = 0; if (f.failed()) { - sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (receive and distribute phase) for ks={}, cf={}, peer={}: {}", - plan_id, s->ks_name(), s->cf_name(), from.addr, f.get_exception()); + auto ex = f.get_exception(); + auto level = seastar::log_level::error; + if (try_catch(ex)) { + level = seastar::log_level::debug; + } + sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (receive and distribute phase) for ks={}, cf={}, peer={}: {}", + plan_id, s->ks_name(), s->cf_name(), from.addr, ex); status = -1; } else { received_partitions = f.get0(); @@ -192,7 +197,11 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return sink.close(); }); }).handle_exception([s, plan_id, from, sink] (std::exception_ptr ep) { - sslog.error("[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", + auto level = seastar::log_level::error; + if (try_catch(ep)) { + level = seastar::log_level::debug; + } + sslog.log(level, "[Stream #{}] Failed to handle STREAM_MUTATION_FRAGMENTS (respond phase) for ks={}, cf={}, peer={}: {}", plan_id, s->ks_name(), s->cf_name(), from.addr, ep); }); } catch (...) {