treewide: silence discarded future warnings for legit discards
This patch silences those future discard warnings where it is clear that discarding the future was actually the intent of the original author, *and* they did the necessary precautions (handling errors). The patch also adds some trivial error handling (logging the error) in some places, which were lacking this, but otherwise look ok. No functional changes.
This commit is contained in:
@@ -77,17 +77,23 @@ private:
|
||||
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
|
||||
|
||||
void on_drop_keyspace(const sstring& ks_name) override {
|
||||
_authorizer.revoke_all(
|
||||
// Do it in the background.
|
||||
(void)_authorizer.revoke_all(
|
||||
auth::make_data_resource(ks_name)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
}).handle_exception([] (std::exception_ptr e) {
|
||||
log.error("Unexpected exception while revoking all permissions on dropped keyspace: {}", e);
|
||||
});
|
||||
}
|
||||
|
||||
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {
|
||||
_authorizer.revoke_all(
|
||||
// Do it in the background.
|
||||
(void)_authorizer.revoke_all(
|
||||
auth::make_data_resource(
|
||||
ks_name, cf_name)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
}).handle_exception([] (std::exception_ptr e) {
|
||||
log.error("Unexpected exception while revoking all permissions on dropped table: {}", e);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
10
database.cc
10
database.cc
@@ -621,7 +621,8 @@ database::init_commitlog() {
|
||||
_commitlog->discard_completed_segments(id);
|
||||
return;
|
||||
}
|
||||
_column_families[id]->flush();
|
||||
// Initiate a background flush. Waited upon in `stop()`.
|
||||
(void)_column_families[id]->flush();
|
||||
}).release(); // we have longer life time than CL. Ignore reg anchor
|
||||
});
|
||||
}
|
||||
@@ -1377,7 +1378,7 @@ future<> dirty_memory_manager::flush_when_needed() {
|
||||
// Do not wait. The semaphore will protect us against a concurrent flush. But we
|
||||
// want to start a new one as soon as the permits are destroyed and the semaphore is
|
||||
// made ready again, not when we are done with the current one.
|
||||
this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit));
|
||||
(void)this->flush_one(*(candidate_memtable.get_memtable_list()), std::move(permit));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
@@ -1948,10 +1949,13 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr);
|
||||
}
|
||||
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override {
|
||||
reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable {
|
||||
// Move to the background.
|
||||
(void)reader_fut.then([this, zis = shared_from_this(), shard] (stopped_reader&& reader) mutable {
|
||||
return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable {
|
||||
ctx.semaphore->unregister_inactive_read(std::move(*handle));
|
||||
});
|
||||
}).handle_exception([shard] (std::exception_ptr e) {
|
||||
dblog.warn("Failed to destroy shard reader of streaming multishard reader on shard {}: {}", shard, e);
|
||||
});
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
|
||||
@@ -118,7 +118,8 @@ future<> db::batchlog_manager::start() {
|
||||
// round-robin scheduling.
|
||||
if (engine().cpu_id() == 0) {
|
||||
_timer.set_callback([this] {
|
||||
do_batch_log_replay().handle_exception([] (auto ep) {
|
||||
// Do it in the background.
|
||||
(void)do_batch_log_replay().handle_exception([] (auto ep) {
|
||||
blogger.error("Exception in batch replay: {}", ep);
|
||||
}).finally([this] {
|
||||
_timer.arm(lowres_clock::now() + std::chrono::milliseconds(replay_interval));
|
||||
|
||||
@@ -1447,7 +1447,7 @@ void db::commitlog::segment_manager::discard_unused_segments() {
|
||||
// segments on deletion queue could be non-empty, and we don't want
|
||||
// those accidentally left around for replay.
|
||||
if (!_shutdown) {
|
||||
with_gate(_gate, [this] {
|
||||
(void)with_gate(_gate, [this] {
|
||||
return do_pending_deletes();
|
||||
});
|
||||
}
|
||||
@@ -1598,7 +1598,7 @@ future<> db::commitlog::segment_manager::clear() {
|
||||
*/
|
||||
void db::commitlog::segment_manager::sync() {
|
||||
for (auto s : _segments) {
|
||||
s->sync(); // we do not care about waiting...
|
||||
(void)s->sync(); // we do not care about waiting...
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1606,7 +1606,7 @@ void db::commitlog::segment_manager::on_timer() {
|
||||
// Gate, because we are starting potentially blocking ops
|
||||
// without waiting for them, so segement_manager could be shut down
|
||||
// while they are running.
|
||||
seastar::with_gate(_gate, [this] {
|
||||
(void)seastar::with_gate(_gate, [this] {
|
||||
if (cfg.mode != sync_mode::BATCH) {
|
||||
sync();
|
||||
}
|
||||
|
||||
@@ -158,7 +158,8 @@ void manager::forbid_hints_for_eps_with_pending_hints() {
|
||||
|
||||
bool manager::end_point_hints_manager::store_hint(schema_ptr s, lw_shared_ptr<const frozen_mutation> fm, tracing::trace_state_ptr tr_state) noexcept {
|
||||
try {
|
||||
with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable {
|
||||
// Future is waited on indirectly in `stop()` (via `_store_gate`).
|
||||
(void)with_gate(_store_gate, [this, s = std::move(s), fm = std::move(fm), tr_state] () mutable {
|
||||
++_hints_in_progress;
|
||||
size_t mut_size = fm->representation().size();
|
||||
shard_stats().size_of_hints_in_progress += mut_size;
|
||||
@@ -534,7 +535,8 @@ void manager::drain_for(gms::inet_address endpoint) {
|
||||
|
||||
manager_logger.trace("on_leave_cluster: {} is removed/decommissioned", endpoint);
|
||||
|
||||
with_gate(_draining_eps_gate, [this, endpoint] {
|
||||
// Future is waited on indirectly in `stop()` (via `_draining_eps_gate`).
|
||||
(void)with_gate(_draining_eps_gate, [this, endpoint] {
|
||||
return futurize_apply([this, endpoint] () {
|
||||
if (utils::fb_utilities::is_me(endpoint)) {
|
||||
return parallel_for_each(_ep_managers, [] (auto& pair) {
|
||||
@@ -672,7 +674,8 @@ future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_muta
|
||||
|
||||
future<> manager::end_point_hints_manager::sender::send_one_hint(lw_shared_ptr<send_one_file_ctx> ctx_ptr, fragmented_temporary_buffer buf, db::replay_position rp, gc_clock::duration secs_since_file_mod, const sstring& fname) {
|
||||
return _resource_manager.get_send_units_for(buf.size_bytes()).then([this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] (auto units) mutable {
|
||||
with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable {
|
||||
// Future is waited on indirectly in `send_one_file()` (via `ctx_ptr->file_send_gate`).
|
||||
(void)with_gate(ctx_ptr->file_send_gate, [this, secs_since_file_mod, &fname, buf = std::move(buf), rp, ctx_ptr] () mutable {
|
||||
try {
|
||||
try {
|
||||
ctx_ptr->rps_set.emplace(rp);
|
||||
|
||||
@@ -55,7 +55,9 @@ private:
|
||||
template<typename Func>
|
||||
future<> with_sem(Func&& func) {
|
||||
return get_units(_sem, 1).then([func = std::forward<Func>(func)] (auto units) mutable {
|
||||
func().finally([units = std::move(units)] {});
|
||||
// Future is discarded purposefully, see method description.
|
||||
// FIXME: error handling.
|
||||
(void)func().finally([units = std::move(units)] {});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1150,7 +1150,8 @@ future<> view_builder::start() {
|
||||
calculate_shard_build_step(std::move(built), std::move(in_progress)).get();
|
||||
_mm.register_listener(this);
|
||||
_current_step = _base_to_build_step.begin();
|
||||
_build_step.trigger();
|
||||
// Waited on indirectly in stop().
|
||||
(void)_build_step.trigger();
|
||||
});
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -1427,7 +1428,8 @@ static future<> flush_base(lw_shared_ptr<column_family> base, abort_source& as)
|
||||
}
|
||||
|
||||
void view_builder::on_create_view(const sstring& ks_name, const sstring& view_name) {
|
||||
with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
// Do it in the background, serialized.
|
||||
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
||||
auto& step = get_or_create_build_step(view->view_info()->base_id());
|
||||
return when_all(step.base->await_pending_writes(), step.base->await_pending_streams()).discard_result().then([this, &step] {
|
||||
@@ -1442,14 +1444,16 @@ void view_builder::on_create_view(const sstring& ks_name, const sstring& view_na
|
||||
if (f.failed()) {
|
||||
vlogger.error("Error setting up view for building {}.{}: {}", view->ks_name(), view->cf_name(), f.get_exception());
|
||||
}
|
||||
_build_step.trigger();
|
||||
// Waited on indirectly in stop().
|
||||
(void)_build_step.trigger();
|
||||
});
|
||||
});
|
||||
}).handle_exception_type([] (no_such_column_family&) { });
|
||||
}
|
||||
|
||||
void view_builder::on_update_view(const sstring& ks_name, const sstring& view_name, bool) {
|
||||
with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
// Do it in the background, serialized.
|
||||
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
auto view = view_ptr(_db.find_schema(ks_name, view_name));
|
||||
auto step_it = _base_to_build_step.find(view->view_info()->base_id());
|
||||
if (step_it == _base_to_build_step.end()) {
|
||||
@@ -1466,7 +1470,8 @@ void view_builder::on_update_view(const sstring& ks_name, const sstring& view_na
|
||||
|
||||
void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name) {
|
||||
vlogger.info0("Stopping to build view {}.{}", ks_name, view_name);
|
||||
with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
// Do it in the background, serialized.
|
||||
(void)with_semaphore(_sem, 1, [ks_name, view_name, this] {
|
||||
// The view is absent from the database at this point, so find it by brute force.
|
||||
([&, this] {
|
||||
for (auto& [_, step] : _base_to_build_step) {
|
||||
|
||||
@@ -459,7 +459,7 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
|
||||
}
|
||||
}).then([&cf, sstables] {
|
||||
// schedule deletion of shared sstables after we're certain that new unshared ones were successfully forwarded to respective shards.
|
||||
sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) {
|
||||
(void)sstables::delete_atomically(std::move(sstables)).handle_exception([op = sstables::background_jobs().start()] (std::exception_ptr eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
} catch (...) {
|
||||
|
||||
@@ -339,7 +339,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
_ms_registered = true;
|
||||
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
|
||||
// In a new fiber.
|
||||
(void)smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.handle_syn_msg(from, std::move(syn_msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
@@ -349,7 +350,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
});
|
||||
ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
|
||||
auto from = netw::messaging_service::get_source(cinfo);
|
||||
smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
|
||||
// In a new fiber.
|
||||
(void)smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.handle_ack_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
@@ -358,7 +360,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
|
||||
smp::submit_to(0, [msg = std::move(msg)] () mutable {
|
||||
// In a new fiber.
|
||||
(void)smp::submit_to(0, [msg = std::move(msg)] () mutable {
|
||||
return gms::get_local_gossiper().handle_ack2_msg(std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
|
||||
@@ -371,7 +374,8 @@ void gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
});
|
||||
});
|
||||
ms().register_gossip_shutdown([] (inet_address from) {
|
||||
smp::submit_to(0, [from] {
|
||||
// In a new fiber.
|
||||
(void)smp::submit_to(0, [from] {
|
||||
return gms::get_local_gossiper().handle_shutdown_msg(from);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_SHUTDOWN: {}", ep);
|
||||
@@ -525,7 +529,7 @@ void gossiper::remove_endpoint(inet_address endpoint) {
|
||||
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
|
||||
// We can not run on_remove callbacks here becasue on_remove in
|
||||
// storage_service might take the gossiper::timer_callback_lock
|
||||
seastar::async([this, endpoint] {
|
||||
(void)seastar::async([this, endpoint] {
|
||||
_subscribers.for_each([endpoint] (auto& subscriber) {
|
||||
subscriber->on_remove(endpoint);
|
||||
});
|
||||
@@ -606,7 +610,8 @@ future<gossiper::endpoint_permit> gossiper::lock_endpoint(inet_address ep) {
|
||||
// - failure_detector
|
||||
// - on_remove callbacks, e.g, storage_service -> access token_metadata
|
||||
void gossiper::run() {
|
||||
timer_callback_lock().then([this, g = this->shared_from_this()] {
|
||||
// Run it in the background.
|
||||
(void)timer_callback_lock().then([this, g = this->shared_from_this()] {
|
||||
return seastar::async([this, g] {
|
||||
logger.trace("=== Gossip round START");
|
||||
|
||||
@@ -653,13 +658,15 @@ void gossiper::run() {
|
||||
}
|
||||
logger.debug("Talk to {} live nodes: {}", nr_live_nodes, live_nodes);
|
||||
for (auto& ep: live_nodes) {
|
||||
do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
|
||||
logger.trace("Failed to do_gossip_to_live_member: {}", ep);
|
||||
});
|
||||
}
|
||||
|
||||
/* Gossip to some unreachable member with some probability to check if he is back up */
|
||||
do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_unreachable_member(message).handle_exception([] (auto ep) {
|
||||
logger.trace("Faill to do_gossip_to_unreachable_member: {}", ep);
|
||||
});
|
||||
|
||||
@@ -682,7 +689,8 @@ void gossiper::run() {
|
||||
logger.trace("gossiped_to_seed={}, _live_endpoints.size={}, _seeds.size={}",
|
||||
_gossiped_to_seed, _live_endpoints.size(), _seeds.size());
|
||||
if (!_gossiped_to_seed || _live_endpoints.size() < _seeds.size()) {
|
||||
do_gossip_to_seed(message).handle_exception([] (auto ep) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_seed(message).handle_exception([] (auto ep) {
|
||||
logger.trace("Faill to do_gossip_to_seed: {}", ep);
|
||||
});
|
||||
}
|
||||
@@ -1294,7 +1302,8 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
local_state.mark_dead();
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.trace("Sending a EchoMessage to {}", id);
|
||||
ms().send_gossip_echo(id).then([this, addr] {
|
||||
// Do it in the background.
|
||||
(void)ms().send_gossip_echo(id).then([this, addr] {
|
||||
logger.trace("Got EchoMessage Reply");
|
||||
set_last_processed_message_at();
|
||||
return seastar::async([this, addr] {
|
||||
@@ -1656,7 +1665,8 @@ future<> gossiper::do_shadow_round() {
|
||||
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
|
||||
auto id = get_msg_addr(seed);
|
||||
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
|
||||
ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
// Do it in the background.
|
||||
(void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
|
||||
});
|
||||
}
|
||||
|
||||
5
init.cc
5
init.cc
@@ -165,8 +165,11 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
|
||||
throw bad_configuration_error();
|
||||
}
|
||||
gossiper.local().set_seeds(seeds);
|
||||
gossiper.invoke_on_all([cluster_name](gms::gossiper& g) {
|
||||
// Do it in the background.
|
||||
(void)gossiper.invoke_on_all([cluster_name](gms::gossiper& g) {
|
||||
g.set_cluster_name(cluster_name);
|
||||
}).handle_exception([] (std::exception_ptr e) {
|
||||
startlog.error("Unexpected exception while setting cluster name: {}", e);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -623,7 +623,8 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
assert(res.second);
|
||||
it = res.first;
|
||||
uint32_t src_cpu_id = engine().cpu_id();
|
||||
_rpc->make_client<rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t)>(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
|
||||
// No reply is received, nothing to wait for.
|
||||
(void)_rpc->make_client<rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t)>(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
|
||||
query::result_memory_limiter::maximum_result_size).handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) {
|
||||
mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t<messaging_verb>(verb), ep);
|
||||
});
|
||||
@@ -649,7 +650,7 @@ bool messaging_service::remove_rpc_client_one(clients_map& clients, msg_addr id,
|
||||
// This will make sure messaging_service::stop() blocks until
|
||||
// client->stop() is over.
|
||||
//
|
||||
client->stop().finally([id, client, ms = shared_from_this()] {
|
||||
(void)client->stop().finally([id, client, ms = shared_from_this()] {
|
||||
mlogger.debug("dropped connection to {}", id.addr);
|
||||
}).discard_result();
|
||||
found = true;
|
||||
|
||||
@@ -299,7 +299,8 @@ flat_mutation_reader read_context::create_reader(
|
||||
}
|
||||
|
||||
void read_context::destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept {
|
||||
with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable {
|
||||
// Future is waited on indirectly in `stop()` (via `_dismantling_gate`).
|
||||
(void)with_gate(_dismantling_gate, [this, shard, reader_fut = std::move(reader_fut)] () mutable {
|
||||
return reader_fut.then_wrapped([this, shard] (future<stopped_reader>&& reader_fut) {
|
||||
auto& rm = _readers[shard];
|
||||
|
||||
@@ -331,10 +332,12 @@ future<> read_context::stop() {
|
||||
auto pr = promise<>();
|
||||
auto fut = pr.get_future();
|
||||
auto gate_fut = _dismantling_gate.is_closed() ? make_ready_future<>() : _dismantling_gate.close();
|
||||
gate_fut.then([this] {
|
||||
// Forwarded to `fut`.
|
||||
(void)gate_fut.then([this] {
|
||||
for (shard_id shard = 0; shard != smp::count; ++shard) {
|
||||
if (_readers[shard].state == reader_state::saving) {
|
||||
_db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable {
|
||||
// Move to the background.
|
||||
(void)_db.invoke_on(shard, [schema = global_schema_ptr(_schema), rm = std::move(_readers[shard])] (database& db) mutable {
|
||||
// We cannot use semaphore() here, as this can be already destroyed.
|
||||
auto& table = db.find_column_family(schema);
|
||||
table.read_concurrency_semaphore().unregister_inactive_read(std::move(*rm.handle));
|
||||
|
||||
@@ -776,7 +776,8 @@ static future<> repair_cf_range(repair_info& ri,
|
||||
completion.enter();
|
||||
auto leave = defer([&completion] { completion.leave(); });
|
||||
|
||||
when_all(checksums.begin(), checksums.end()).then(
|
||||
// Do it in the background.
|
||||
(void)when_all(checksums.begin(), checksums.end()).then(
|
||||
[&ri, &cf, range, &neighbors, &success]
|
||||
(std::vector<future<partition_checksum>> checksums) {
|
||||
// If only some of the replicas of this range are alive,
|
||||
@@ -1412,7 +1413,8 @@ static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
|
||||
repair_results.push_back(std::move(f));
|
||||
}
|
||||
|
||||
when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector<future<>> results) mutable {
|
||||
// Do it in the background.
|
||||
(void)when_all(repair_results.begin(), repair_results.end()).then([id, fail = std::move(fail)] (std::vector<future<>> results) mutable {
|
||||
if (std::any_of(results.begin(), results.end(), [] (auto&& f) { return f.failed(); })) {
|
||||
rlogger.info("repair {} failed", id);
|
||||
} else {
|
||||
|
||||
@@ -1995,7 +1995,8 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_get_row_diff_with_rpc_stream(source);
|
||||
repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
// Start a new fiber.
|
||||
(void)repair_get_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
@@ -2008,7 +2009,8 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_put_row_diff_with_rpc_stream(source);
|
||||
repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
// Start a new fiber.
|
||||
(void)repair_put_row_diff_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process put_row_diff_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
@@ -2021,7 +2023,8 @@ future<> repair_init_messaging_service_handler(repair_service& rs, distributed<d
|
||||
return with_scheduling_group(service::get_local_storage_service().db().local().get_streaming_scheduling_group(),
|
||||
[&ms, src_cpu_id, from, repair_meta_id, source] () mutable {
|
||||
auto sink = ms.make_sink_for_repair_get_full_row_hashes_with_rpc_stream(source);
|
||||
repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
// Start a new fiber.
|
||||
(void)repair_get_full_row_hashes_with_rpc_stream_handler(from, src_cpu_id, repair_meta_id, sink, source).handle_exception(
|
||||
[from, repair_meta_id, sink, source] (std::exception_ptr ep) {
|
||||
rlogger.info("Failed to process get_full_row_hashes_with_rpc_stream_handler from={}, repair_meta_id={}: {}", from, repair_meta_id, ep);
|
||||
});
|
||||
|
||||
@@ -161,7 +161,8 @@ future<schema_ptr> schema_registry_entry::start_loading(async_schema_loader load
|
||||
auto sf = _schema_promise.get_shared_future();
|
||||
_state = state::LOADING;
|
||||
slogger.trace("Loading {}", _version);
|
||||
f.then_wrapped([self = shared_from_this(), this] (future<frozen_schema>&& f) {
|
||||
// Move to background.
|
||||
(void)f.then_wrapped([self = shared_from_this(), this] (future<frozen_schema>&& f) {
|
||||
_loader = {};
|
||||
if (_state != state::LOADING) {
|
||||
slogger.trace("Loading of {} aborted", _version);
|
||||
@@ -223,7 +224,8 @@ future<> schema_registry_entry::maybe_sync(std::function<future<>()> syncer) {
|
||||
});
|
||||
auto sf = _synced_promise.get_shared_future();
|
||||
_sync_state = schema_registry_entry::sync_state::SYNCING;
|
||||
f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
// Move to background.
|
||||
(void)f.then_wrapped([this, self = shared_from_this()] (auto&& f) {
|
||||
if (_sync_state != sync_state::SYNCING) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -96,7 +96,8 @@ void migration_manager::init_messaging_service()
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
auto src = netw::messaging_service::get_source(cinfo);
|
||||
do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
// Start a new fiber.
|
||||
(void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
|
||||
@@ -106,7 +106,8 @@ void cache_hitrate_calculator::recalculate_timer() {
|
||||
|
||||
void cache_hitrate_calculator::run_on(size_t master, lowres_clock::duration d) {
|
||||
if (!_stopped) {
|
||||
_me.invoke_on(master, [d] (cache_hitrate_calculator& local) {
|
||||
// Do it in the background.
|
||||
(void)_me.invoke_on(master, [d] (cache_hitrate_calculator& local) {
|
||||
local._timer.arm(d);
|
||||
}).handle_exception_type([] (seastar::no_sharded_instance_exception&) { /* ignore */ });
|
||||
}
|
||||
|
||||
@@ -405,7 +405,8 @@ public:
|
||||
++stats().throttled_base_writes;
|
||||
tracing::trace(trace, "Delaying user write due to view update backlog {}/{} by {}us",
|
||||
backlog.current, backlog.max, delay.count());
|
||||
sleep_abortable<seastar::steady_clock_type>(delay).finally([self = shared_from_this(), on_resume = std::forward<Func>(on_resume)] {
|
||||
// Waited on indirectly.
|
||||
(void)sleep_abortable<seastar::steady_clock_type>(delay).finally([self = shared_from_this(), on_resume = std::forward<Func>(on_resume)] {
|
||||
--self->stats().throttled_base_writes;
|
||||
on_resume(self.get());
|
||||
}).handle_exception_type([] (const seastar::sleep_aborted& ignored) { });
|
||||
@@ -1636,7 +1637,8 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
|
||||
}
|
||||
}
|
||||
|
||||
f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
|
||||
// Waited on indirectly.
|
||||
(void)f.handle_exception([response_id, forward_size, coordinator, handler_ptr, p = shared_from_this(), &stats] (std::exception_ptr eptr) {
|
||||
++stats.writes_errors.get_ep_stat(coordinator);
|
||||
p->got_failure_response(response_id, coordinator, forward_size + 1, std::nullopt);
|
||||
try {
|
||||
@@ -2476,9 +2478,11 @@ protected:
|
||||
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{});
|
||||
// Waited on indirectly.
|
||||
(void)make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout).finally([exec]{});
|
||||
|
||||
data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) {
|
||||
// Waited on indirectly.
|
||||
(void)data_resolver->done().then_wrapped([this, exec, data_resolver, cmd = std::move(cmd), cl, timeout] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
auto rr_opt = data_resolver->resolve(_schema, *cmd, original_row_limit(), original_per_partition_row_limit(), original_partition_limit()); // reconciliation happens here
|
||||
@@ -2495,7 +2499,8 @@ protected:
|
||||
// wait for write to complete before returning result to prevent multiple concurrent read requests to
|
||||
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
|
||||
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
|
||||
_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable {
|
||||
// Waited on indirectly.
|
||||
(void)_proxy->schedule_repair(data_resolver->get_diffs_for_repair(), _cl, _trace_state, _permit).then([this, result = std::move(result)] () mutable {
|
||||
_result_promise.set_value(std::move(result));
|
||||
on_read_resolved();
|
||||
}).handle_exception([this, exec] (std::exception_ptr eptr) {
|
||||
@@ -2560,11 +2565,13 @@ public:
|
||||
db::is_datacenter_local(_cl) ? db::count_local_endpoints(_targets): _targets.size(), timeout);
|
||||
auto exec = shared_from_this();
|
||||
|
||||
make_requests(digest_resolver, timeout).finally([exec]() {
|
||||
// Waited on indirectly.
|
||||
(void)make_requests(digest_resolver, timeout).finally([exec]() {
|
||||
// hold on to executor until all queries are complete
|
||||
});
|
||||
|
||||
digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) mutable {
|
||||
// Waited on indirectly.
|
||||
(void)digest_resolver->has_cl().then_wrapped([exec, digest_resolver, timeout] (future<foreign_ptr<lw_shared_ptr<query::result>>, bool> f) mutable {
|
||||
bool background_repair_check = false;
|
||||
try {
|
||||
exec->got_cl();
|
||||
@@ -2598,7 +2605,8 @@ public:
|
||||
exec->on_read_resolved();
|
||||
}
|
||||
|
||||
digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable {
|
||||
// Waited on indirectly.
|
||||
(void)digest_resolver->done().then([exec, digest_resolver, timeout, background_repair_check] () mutable {
|
||||
if (background_repair_check && !digest_resolver->digests_match()) {
|
||||
exec->_proxy->_stats.read_repair_repaired_background++;
|
||||
exec->_result_promise = promise<foreign_ptr<lw_shared_ptr<query::result>>>();
|
||||
@@ -2661,7 +2669,8 @@ public:
|
||||
return make_data_requests(resolver, _targets.end() - 1, _targets.end(), timeout, true);
|
||||
}
|
||||
};
|
||||
send_request(resolver->has_data()).finally([exec = shared_from_this()]{});
|
||||
// Waited on indirectly.
|
||||
(void)send_request(resolver->has_data()).finally([exec = shared_from_this()]{});
|
||||
}
|
||||
});
|
||||
auto& sr = _schema->speculative_retry();
|
||||
|
||||
@@ -127,7 +127,8 @@ public:
|
||||
~data_consume_context() {
|
||||
if (_ctx) {
|
||||
auto f = _ctx->close();
|
||||
f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {});
|
||||
// Can't wait on the future in the destructor.
|
||||
(void)f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,9 @@ public:
|
||||
|
||||
void seek(uint64_t pos) {
|
||||
if (_in) {
|
||||
seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable {
|
||||
// Future is waited on indirectly in `close()` (via `_close_gate`).
|
||||
// FIXME: error handling
|
||||
(void)seastar::with_gate(_close_gate, [in = std::move(_in)]() mutable {
|
||||
auto fut = in->close();
|
||||
return fut.then([in = std::move(in)] {});
|
||||
});
|
||||
|
||||
@@ -2847,13 +2847,15 @@ int sstable::compare_by_max_timestamp(const sstable& other) const {
|
||||
|
||||
sstable::~sstable() {
|
||||
if (_index_file) {
|
||||
_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) {
|
||||
// Registered as background job.
|
||||
(void)_index_file.close().handle_exception([save = _index_file, op = background_jobs().start()] (auto ep) {
|
||||
sstlog.warn("sstable close index_file failed: {}", ep);
|
||||
general_disk_error();
|
||||
});
|
||||
}
|
||||
if (_data_file) {
|
||||
_data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) {
|
||||
// Registered as background job.
|
||||
(void)_data_file.close().handle_exception([save = _data_file, op = background_jobs().start()] (auto ep) {
|
||||
sstlog.warn("sstable close data_file failed: {}", ep);
|
||||
general_disk_error();
|
||||
});
|
||||
@@ -2870,7 +2872,7 @@ sstable::~sstable() {
|
||||
// FIXME:
|
||||
// - Longer term fix is to hand off deletion of sstables to a manager that can
|
||||
// deal with sstable marked to be deleted after the corresponding object is destructed.
|
||||
unlink().handle_exception(
|
||||
(void)unlink().handle_exception(
|
||||
[op = background_jobs().start()] (std::exception_ptr eptr) {
|
||||
try {
|
||||
std::rethrow_exception(eptr);
|
||||
|
||||
@@ -113,7 +113,7 @@ void stream_manager::remove_stream(UUID plan_id) {
|
||||
_initiated_streams.erase(plan_id);
|
||||
_receiving_streams.erase(plan_id);
|
||||
// FIXME: Do not ignore the future
|
||||
remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) {
|
||||
(void)remove_progress_on_all_shards(plan_id).handle_exception([plan_id] (auto ep) {
|
||||
sslog.info("stream_manager: Fail to remove progress for plan_id={}: {}", plan_id, ep);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -138,7 +138,8 @@ future<stop_iteration> do_send_mutations(lw_shared_ptr<send_info> si, frozen_mut
|
||||
return get_local_stream_manager().mutation_send_limiter().wait().then([si, fragmented, fm = std::move(fm)] () mutable {
|
||||
sslog.debug("[Stream #{}] SEND STREAM_MUTATION to {}, cf_id={}", si->plan_id, si->id, si->cf_id);
|
||||
auto fm_size = fm.representation().size();
|
||||
netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] {
|
||||
// Do it in the background.
|
||||
(void)netw::get_local_messaging_service().send_stream_mutation(si->id, si->plan_id, std::move(fm), si->dst_cpu_id, fragmented, si->reason).then([si, fm_size] {
|
||||
sslog.debug("[Stream #{}] GOT STREAM_MUTATION Reply from {}", si->plan_id, si->id.addr);
|
||||
get_local_stream_manager().update_progress(si->plan_id, si->id.addr, progress_info::direction::OUT, fm_size);
|
||||
si->mutations_done.signal();
|
||||
|
||||
2
table.cc
2
table.cc
@@ -1196,7 +1196,7 @@ table::on_compaction_completion(const std::vector<sstables::shared_sstable>& new
|
||||
rebuild_statistics();
|
||||
|
||||
// This is done in the background, so we can consider this compaction completed.
|
||||
seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
(void)seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] {
|
||||
return with_semaphore(_sstable_deletion_sem, 1, [this, sstables_to_remove = std::move(sstables_to_remove)] {
|
||||
return sstables::delete_atomically(sstables_to_remove).then_wrapped([this, sstables_to_remove] (future<> f) {
|
||||
std::exception_ptr eptr;
|
||||
|
||||
@@ -129,7 +129,7 @@ with_cob(thrift_fn::function<void (const T& ret)>&& cob,
|
||||
thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
|
||||
Func&& func) {
|
||||
// then_wrapped() terminates the fiber by calling one of the cob objects
|
||||
futurize<noexcept_movable_t<T>>::apply([func = std::forward<Func>(func)] {
|
||||
(void)futurize<noexcept_movable_t<T>>::apply([func = std::forward<Func>(func)] {
|
||||
return noexcept_movable<T>::wrap(func());
|
||||
}).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (auto&& f) {
|
||||
try {
|
||||
@@ -147,7 +147,7 @@ with_cob(thrift_fn::function<void ()>&& cob,
|
||||
thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
|
||||
Func&& func) {
|
||||
// then_wrapped() terminates the fiber by calling one of the cob objects
|
||||
futurize<void>::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) {
|
||||
(void)futurize<void>::apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
cob();
|
||||
@@ -162,7 +162,7 @@ template <typename Func>
|
||||
void
|
||||
with_exn_cob(thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob, Func&& func) {
|
||||
// then_wrapped() terminates the fiber by calling one of the cob objects
|
||||
futurize<void>::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) {
|
||||
(void)futurize<void>::apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
|
||||
@@ -230,12 +230,14 @@ thrift_server::do_accepts(int which, bool keepalive) {
|
||||
if (_stop_gate.is_closed()) {
|
||||
return;
|
||||
}
|
||||
with_gate(_stop_gate, [&, this] {
|
||||
// Future is waited on indirectly in `stop()` (via `_stop_gate`).
|
||||
(void)with_gate(_stop_gate, [&, this] {
|
||||
return _listeners[which].accept().then([this, which, keepalive] (accept_result ar) {
|
||||
auto&& [fd, addr] = ar;
|
||||
fd.set_nodelay(true);
|
||||
fd.set_keepalive(keepalive);
|
||||
with_gate(_stop_gate, [&, this] {
|
||||
// Future is waited on indirectly in `stop()` (via `_stop_gate`).
|
||||
(void)with_gate(_stop_gate, [&, this] {
|
||||
return do_with(connection(*this, std::move(fd), addr), [this] (auto& conn) {
|
||||
return conn.process().then_wrapped([this, &conn] (future<> f) {
|
||||
conn.shutdown();
|
||||
@@ -262,7 +264,8 @@ void thrift_server::maybe_retry_accept(int which, bool keepalive, std::exception
|
||||
};
|
||||
auto retry_with_backoff = [&] {
|
||||
// FIXME: Consider using exponential backoff
|
||||
sleep(1ms).then([retry = std::move(retry)] { retry(); });
|
||||
// Done in the background.
|
||||
(void)sleep(1ms).then([retry = std::move(retry)] { retry(); });
|
||||
};
|
||||
try {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
|
||||
@@ -209,7 +209,8 @@ future<> trace_keyspace_helper::start() {
|
||||
}
|
||||
|
||||
void trace_keyspace_helper::write_one_session_records(lw_shared_ptr<one_session_records> records) {
|
||||
with_gate(_pending_writes, [this, records = std::move(records)] {
|
||||
// Future is waited on indirectly in `stop()` (via `_pending_writes`).
|
||||
(void)with_gate(_pending_writes, [this, records = std::move(records)] {
|
||||
auto num_records = records->size();
|
||||
return this->flush_one_session_mutations(std::move(records)).finally([this, num_records] { _local_tracing.write_complete(num_records); });
|
||||
}).handle_exception([this] (auto ep) {
|
||||
|
||||
@@ -243,7 +243,8 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
|
||||
auto conn = make_shared<connection>(*this, server_addr, std::move(fd), std::move(addr));
|
||||
++_connects;
|
||||
++_connections;
|
||||
conn->process().then_wrapped([this, conn] (future<> f) {
|
||||
// Move connection to the background, monitor for lifetime and errors.
|
||||
(void)conn->process().then_wrapped([this, conn] (future<> f) {
|
||||
--_connections;
|
||||
try {
|
||||
f.get();
|
||||
|
||||
@@ -580,7 +580,9 @@ private:
|
||||
}
|
||||
|
||||
// Reload all those which value needs to be reloaded.
|
||||
with_gate(_timer_reads_gate, [this] {
|
||||
// Future is waited on indirectly in `stop()` (via `_timer_reads_gate`).
|
||||
// FIXME: error handling
|
||||
(void)with_gate(_timer_reads_gate, [this] {
|
||||
auto to_reload = boost::copy_range<utils::chunked_vector<timestamped_val_ptr>>(_lru_list
|
||||
| boost::adaptors::filtered([this] (ts_value_lru_entry& lru_entry) {
|
||||
return lru_entry.timestamped_value().loaded() + _refresh < loading_cache_clock_type::now();
|
||||
|
||||
@@ -234,7 +234,8 @@ public:
|
||||
_set.insert(*e);
|
||||
// get_shared_future() may throw, so make sure to call it before invoking the loader(key)
|
||||
f = e->loaded().get_shared_future();
|
||||
futurize_apply([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
|
||||
// Future indirectly forwarded to `e`.
|
||||
(void)futurize_apply([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
|
||||
if (val_fut.failed()) {
|
||||
e->loaded().set_exception(val_fut.get_exception());
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user