everywhere: Prepare for seastar api v4 (when_all_succeed return value)
The seastar api v4 changes the return type of when_all_succeed. This patch adds discard_result when that is best solution to handle the change. This doesn't do the actual update to v4 since there are still a few issues left to fix in seastar. A patch doing just the update will follow. Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com> Message-Id: <20200617233150.918110-1-espindola@scylladb.com>
This commit is contained in:
committed by
Avi Kivity
parent
bc854342e7
commit
f6e407ecd2
@@ -178,7 +178,7 @@ future<> service::start(::service::migration_manager& mm) {
|
||||
return create_keyspace_if_missing(mm);
|
||||
}).then([this] {
|
||||
return _role_manager->start().then([this] {
|
||||
return when_all_succeed(_authorizer->start(), _authenticator->start());
|
||||
return when_all_succeed(_authorizer->start(), _authenticator->start()).discard_result();
|
||||
});
|
||||
}).then([this] {
|
||||
_permissions_cache = std::make_unique<permissions_cache>(_permissions_cache_config, *this, log);
|
||||
@@ -199,7 +199,7 @@ future<> service::stop() {
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).then([this] {
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop());
|
||||
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop()).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -458,7 +458,9 @@ future<> drop_role(const service& ser, std::string_view name) {
|
||||
|
||||
return when_all_succeed(
|
||||
a.revoke_all(name),
|
||||
a.revoke_all(r)).handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
a.revoke_all(r))
|
||||
.discard_result()
|
||||
.handle_exception_type([](const unsupported_authorization_operation&) {
|
||||
// Nothing.
|
||||
});
|
||||
}).then([&ser, name] {
|
||||
|
||||
@@ -161,7 +161,7 @@ future<> standard_role_manager::create_metadata_tables_if_missing() const {
|
||||
meta::role_members_table::name,
|
||||
_qp,
|
||||
create_role_members_query,
|
||||
_migration_manager));
|
||||
_migration_manager)).discard_result();
|
||||
}
|
||||
|
||||
future<> standard_role_manager::create_default_role_if_missing() const {
|
||||
@@ -416,7 +416,7 @@ standard_role_manager::modify_membership(
|
||||
return make_ready_future<>();
|
||||
};
|
||||
|
||||
return when_all_succeed(modify_roles(), modify_role_members());
|
||||
return when_all_succeed(modify_roles(), modify_role_members()).discard_result();
|
||||
}
|
||||
|
||||
future<>
|
||||
|
||||
@@ -1495,7 +1495,7 @@ future<> view_builder::add_new_view(view_ptr view, build_step& step) {
|
||||
auto f = this_shard_id() == 0 ? _sys_dist_ks.start_view_build(view->ks_name(), view->cf_name()) : make_ready_future<>();
|
||||
return when_all_succeed(
|
||||
std::move(f),
|
||||
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token()));
|
||||
system_keyspace::register_view_for_building(view->ks_name(), view->cf_name(), step.current_token())).discard_result();
|
||||
}
|
||||
|
||||
static future<> flush_base(lw_shared_ptr<column_family> base, abort_source& as) {
|
||||
@@ -1581,7 +1581,9 @@ void view_builder::on_drop_view(const sstring& ks_name, const sstring& view_name
|
||||
return when_all_succeed(
|
||||
system_keyspace::remove_view_build_progress(ks_name, view_name),
|
||||
system_keyspace::remove_built_view(ks_name, view_name),
|
||||
_sys_dist_ks.remove_view(ks_name, view_name)).handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
_sys_dist_ks.remove_view(ks_name, view_name))
|
||||
.discard_result()
|
||||
.handle_exception([ks_name, view_name] (std::exception_ptr ep) {
|
||||
vlogger.warn("Failed to cleanup view {}.{}: {}", ks_name, view_name, ep);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -34,7 +34,7 @@ future<> feed_writer(flat_mutation_reader&& rd, Writer&& wr) {
|
||||
return do_until([&rd] { return rd.is_buffer_empty() && rd.is_end_of_stream(); }, [&rd, &wr] {
|
||||
auto f1 = rd.pop_mutation_fragment().consume(wr);
|
||||
auto f2 = rd.is_buffer_empty() ? rd.fill_buffer(db::no_timeout) : make_ready_future<>();
|
||||
return when_all_succeed(std::move(f1), std::move(f2));
|
||||
return when_all_succeed(std::move(f1), std::move(f2)).discard_result();
|
||||
});
|
||||
}).finally([&wr] {
|
||||
return wr.consume_end_of_stream();
|
||||
|
||||
@@ -406,7 +406,7 @@ future<> timestamp_based_splitting_mutation_writer::write_marker_and_tombstone(c
|
||||
if (tomb_bucket_id) {
|
||||
write_tomb_fut = write_to_bucket(*tomb_bucket_id, clustering_row(cr.key(), cr.tomb(), {}, {}));
|
||||
}
|
||||
return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut));
|
||||
return when_all_succeed(std::move(write_marker_fut), std::move(write_tomb_fut)).discard_result();
|
||||
}
|
||||
|
||||
future<> timestamp_based_splitting_mutation_writer::consume(partition_start&& ps) {
|
||||
|
||||
@@ -188,9 +188,7 @@ future<> create_keyspace_if_not_exists_impl(db::config& config, int default_repl
|
||||
table_gen(ks_name, redis::SETs, sets_schema(ks_name)),
|
||||
table_gen(ks_name, redis::HASHes, hashes_schema(ks_name)),
|
||||
table_gen(ks_name, redis::ZSETs, zsets_schema(ks_name))
|
||||
).then([] {
|
||||
return make_ready_future<>();
|
||||
});
|
||||
).discard_result();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -585,7 +585,7 @@ public:
|
||||
|
||||
future<> wait_for_writer_done() {
|
||||
return parallel_for_each(boost::irange(unsigned(0), unsigned(_nr_peer_nodes)), [this] (unsigned node_idx) {
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx));
|
||||
return when_all_succeed(write_end_of_stream(node_idx), do_wait_for_writer_done(node_idx)).discard_result();
|
||||
}).handle_exception([this] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
|
||||
_schema->ks_name(), _schema->cf_name(), ep);
|
||||
@@ -735,7 +735,7 @@ public:
|
||||
auto f1 = _sink_source_for_get_full_row_hashes.close();
|
||||
auto f2 = _sink_source_for_get_row_diff.close();
|
||||
auto f3 = _sink_source_for_put_row_diff.close();
|
||||
return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3));
|
||||
return when_all_succeed(std::move(gate_future), std::move(writer_future), std::move(f1), std::move(f2), std::move(f3)).discard_result();
|
||||
}
|
||||
|
||||
static std::unordered_map<node_repair_meta_id, lw_shared_ptr<repair_meta>>& repair_meta_map() {
|
||||
@@ -1398,7 +1398,7 @@ public:
|
||||
(rpc::sink<repair_stream_cmd>& sink, rpc::source<repair_hash_with_cmd>& source) mutable {
|
||||
auto source_op = get_full_row_hashes_source_op(current_hashes, remote_node, node_idx, source);
|
||||
auto sink_op = get_full_row_hashes_sink_op(sink);
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op));
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result();
|
||||
}).then([this, current_hashes] () mutable {
|
||||
stats().rx_hashes_nr += current_hashes->size();
|
||||
_metrics.rx_hashes_nr += current_hashes->size();
|
||||
@@ -1792,7 +1792,7 @@ public:
|
||||
(rpc::sink<repair_row_on_wire_with_cmd>& sink, rpc::source<repair_stream_cmd>& source) mutable {
|
||||
auto source_op = put_row_diff_source_op(remote_node, node_idx, source);
|
||||
auto sink_op = put_row_diff_sink_op(std::move(rows), sink, remote_node);
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op));
|
||||
return when_all_succeed(std::move(source_op), std::move(sink_op)).discard_result();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -167,7 +167,7 @@ future<> migration_manager::uninit_messaging_service()
|
||||
ms.unregister_migration_request(),
|
||||
ms.unregister_definitions_update(),
|
||||
ms.unregister_schema_check()
|
||||
);
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
void migration_notifier::register_listener(migration_listener* listener)
|
||||
|
||||
@@ -1248,7 +1248,7 @@ future<> paxos_response_handler::learn_decision(lw_shared_ptr<paxos::proposal> d
|
||||
std::array<std::tuple<lw_shared_ptr<paxos::proposal>, schema_ptr, shared_ptr<paxos_response_handler>, dht::token>, 1> m{std::make_tuple(std::move(decision), _schema, shared_from_this(), _key.token())};
|
||||
future<> f_lwt = _proxy->mutate_internal(std::move(m), _cl_for_learn, false, tr_state, _permit, _timeout);
|
||||
|
||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt));
|
||||
return when_all_succeed(std::move(f_cdc), std::move(f_lwt)).discard_result();
|
||||
}
|
||||
|
||||
void paxos_response_handler::prune(utils::UUID ballot) {
|
||||
@@ -2278,7 +2278,7 @@ future<> storage_proxy::do_mutate(std::vector<mutation> mutations, db::consisten
|
||||
return seastar::when_all_succeed(
|
||||
mutate_counters(boost::make_iterator_range(mutations.begin(), mid), cl, tr_state, permit, timeout),
|
||||
mutate_internal(boost::make_iterator_range(mid, mutations.end()), cl, false, tr_state, permit, timeout, std::move(cdc_tracker))
|
||||
);
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
future<> storage_proxy::replicate_counter_from_leader(mutation m, db::consistency_level cl, tracing::trace_state_ptr tr_state,
|
||||
@@ -3506,7 +3506,7 @@ protected:
|
||||
auto want_digest = _targets.size() > 1;
|
||||
auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); });
|
||||
auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); });
|
||||
return when_all_succeed(std::move(f_data), std::move(f_digest)).handle_exception([] (auto&&) { });
|
||||
return when_all_succeed(std::move(f_data), std::move(f_digest)).discard_result().handle_exception([] (auto&&) { });
|
||||
}
|
||||
virtual void got_cl() {}
|
||||
uint32_t original_row_limit() const {
|
||||
@@ -5123,7 +5123,7 @@ future<> storage_proxy::uninit_messaging_service() {
|
||||
ms.unregister_paxos_accept(),
|
||||
ms.unregister_paxos_learn(),
|
||||
ms.unregister_paxos_prune()
|
||||
);
|
||||
).discard_result();
|
||||
}
|
||||
|
||||
future<rpc::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>>
|
||||
|
||||
@@ -686,7 +686,7 @@ public:
|
||||
future<> advance_to(const dht::partition_range& range) {
|
||||
return seastar::when_all_succeed(
|
||||
advance_lower_to_start(range),
|
||||
advance_upper_to_end(range));
|
||||
advance_upper_to_end(range)).discard_result();
|
||||
}
|
||||
|
||||
// Get current index entry
|
||||
|
||||
@@ -315,7 +315,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
|
||||
desc.creator = std::move(creator);
|
||||
|
||||
return sstables::compact_sstables(std::move(desc), table).then([this, &sstlist] (sstables::compaction_info result) {
|
||||
return when_all_succeed(collect_output_sstables_from_resharding(std::move(result.new_sstables)), remove_input_sstables_from_resharding(sstlist));
|
||||
return when_all_succeed(collect_output_sstables_from_resharding(std::move(result.new_sstables)), remove_input_sstables_from_resharding(sstlist)).discard_result();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2710,7 +2710,7 @@ future<> sstable::move_to_new_dir(sstring new_dir, int64_t new_generation, bool
|
||||
if (!do_sync_dirs) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return when_all_succeed(sync_directory(old_dir), sync_directory(new_dir));
|
||||
return when_all_succeed(sync_directory(old_dir), sync_directory(new_dir)).discard_result();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -161,7 +161,7 @@ static future<> test_propagation(bool propagate,
|
||||
return seastar::when_all_succeed(std::move(f1), std::move(f2)).finally([sem, queue, want_except_in_run, want_except_in_wait, xr, xw] {
|
||||
BOOST_CHECK_EQUAL(want_except_in_run, *xr);
|
||||
BOOST_CHECK_EQUAL(want_except_in_wait, *xw);
|
||||
}).finally([queue] {
|
||||
}).discard_result().finally([queue] {
|
||||
return queue->close().finally([queue] { });
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user