everywhere: Update for deprecated apply functions

Now apply is only for tuples, for varargs use invoke.

This depends on the seastar changes adding invoke.

Signed-off-by: Rafael Ávila de Espíndola <espindola@scylladb.com>
Message-Id: <20200324163809.93648-1-espindola@scylladb.com>
This commit is contained in:
Rafael Ávila de Espíndola
2020-03-24 09:38:09 -07:00
committed by Avi Kivity
parent 088660680c
commit eca0ac5772
30 changed files with 51 additions and 51 deletions

View File

@@ -864,7 +864,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
}
return create_keyspace(keyspace_name).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders)] () mutable {
return futurize_apply([&] { return _mm.announce_new_column_family(schema, false); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders)] () mutable {
return futurize_invoke([&] { return _mm.announce_new_column_family(schema, false); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders)] () mutable {
return parallel_for_each(std::move(view_builders), [schema] (schema_builder builder) {
return service::get_local_migration_manager().announce_new_view(view_ptr(builder.build()));
}).then([this, table_info = std::move(table_info), schema] () mutable {

View File

@@ -69,7 +69,7 @@ class api_handler : public handler_base {
public:
api_handler(const std::function<future<executor::request_return_type>(std::unique_ptr<request> req)>& _handle) : _f_handle(
[this, _handle](std::unique_ptr<request> req, std::unique_ptr<reply> rep) {
return seastar::futurize_apply(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future<executor::request_return_type> resf) mutable {
if (resf.failed()) {
// Exceptions of type api_error are wrapped as JSON and
// returned to the client as expected. Other types of

View File

@@ -648,7 +648,7 @@ void set_storage_service(http_context& ctx, routes& r) {
ss::set_trace_probability.set(r, [](std::unique_ptr<request> req) {
auto probability = req->get_query_param("probability");
return futurize<json::json_return_type>::apply([probability] {
return futurize<json::json_return_type>::invoke([probability] {
double real_prob = std::stod(probability.c_str());
return tracing::tracing::tracing_instance().invoke_on_all([real_prob] (auto& local_tracing) {
local_tracing.set_trace_probability(real_prob);

View File

@@ -65,7 +65,7 @@ static future<> create_metadata_table_if_missing_impl(
std::string_view cql,
::service::migration_manager& mm) {
static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> func) {
return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
};
auto& db = qp.db();
auto parsed_statement = cql3::query_processor::parse_statement(cql);
@@ -92,7 +92,7 @@ future<> create_metadata_table_if_missing(
cql3::query_processor& qp,
std::string_view cql,
::service::migration_manager& mm) noexcept {
return futurize_apply(create_metadata_table_if_missing_impl, table_name, qp, cql, mm);
return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm);
}
future<> wait_for_schema_agreement(::service::migration_manager& mm, const database& db, seastar::abort_source& as) {

View File

@@ -230,7 +230,7 @@ future<authenticated_user> password_authenticator::authenticate(
// obsolete prepared statements pretty quickly.
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
return futurize_apply([this, username, password] {
return futurize_invoke([this, username, password] {
static const sstring query = format("SELECT {} FROM {} WHERE {} = ?",
SALTED_HASH,
meta::roles_table::qualified_name(),

View File

@@ -419,7 +419,7 @@ future<> create_role(
return make_ready_future<>();
}
return futurize_apply(
return futurize_invoke(
&validate_authentication_options_are_supported,
options,
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {
@@ -443,7 +443,7 @@ future<> alter_role(
return make_ready_future<>();
}
return futurize_apply(
return futurize_invoke(
&validate_authentication_options_are_supported,
options,
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {

View File

@@ -158,7 +158,7 @@ public:
}
virtual future<authenticated_user> get_authenticated_user() const {
return futurize_apply([this] {
return futurize_invoke([this] {
return _sasl->get_authenticated_user().handle_exception([](auto ep) {
try {
std::rethrow_exception(ep);

View File

@@ -268,7 +268,7 @@ public:
if (_selectors->requires_thread()) {
return async(std::move(func));
} else {
return futurize_apply(std::move(func));
return futurize_invoke(std::move(func));
}
}

View File

@@ -1470,7 +1470,7 @@ future<mutation> database::apply_counter_update(schema_ptr s, const frozen_mutat
update_write_metrics_for_timed_out_write();
return make_exception_future<mutation>(timed_out_error{});
}
return update_write_metrics(seastar::futurize_apply([&] {
return update_write_metrics(seastar::futurize_invoke([&] {
if (!s->is_synced()) {
throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}",
s->ks_name(), s->cf_name(), s->version()));

View File

@@ -1277,7 +1277,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) {
template <typename Func>
static auto close_on_failure(future<file> file_fut, Func func) {
return file_fut.then([func = std::move(func)](file f) {
return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable {
return futurize_invoke(func, f).handle_exception([f] (std::exception_ptr e) mutable {
return f.close().then_wrapped([f, e = std::move(e)] (future<> x) {
using futurator = futurize<std::result_of_t<Func(file)>>;
return futurator::make_exception_future(e);
@@ -1424,7 +1424,7 @@ future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager:
promise<> p;
_segment_allocating.emplace(p.get_future());
auto f = _segment_allocating->get_future(timeout);
futurize_apply([this] {
futurize_invoke([this] {
return with_gate(_gate, [this] {
return new_segment().discard_result().finally([this]() {
_segment_allocating = std::nullopt;

View File

@@ -313,7 +313,7 @@ bool manager::store_hint(ep_key_type ep, schema_ptr s, lw_shared_ptr<const froze
future<db::commitlog> manager::end_point_hints_manager::add_store() noexcept {
manager_logger.trace("Going to add a store to {}", _hints_dir.c_str());
return futurize_apply([this] {
return futurize_invoke([this] {
return io_check([name = _hints_dir.c_str()] { return recursive_touch_directory(name); }).then([this] () {
commitlog::config cfg;
@@ -349,7 +349,7 @@ future<db::commitlog> manager::end_point_hints_manager::add_store() noexcept {
future<> manager::end_point_hints_manager::flush_current_hints() noexcept {
// flush the currently created hints to disk
if (_hints_store_anchor) {
return futurize_apply([this] {
return futurize_invoke([this] {
return with_lock(file_update_mutex(), [this]() -> future<> {
return get_or_load().then([] (hints_store_ptr cptr) {
return cptr->shutdown();
@@ -397,7 +397,7 @@ future<timespec> manager::end_point_hints_manager::sender::get_last_file_modific
}
future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const std::vector<gms::inet_address>& natural_endpoints) noexcept {
return futurize_apply([this, m = std::move(m), &natural_endpoints] () mutable -> future<> {
return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> {
// The fact that we send with CL::ALL in both cases below ensures that new hints are not going
// to be generated as a result of hints sending.
if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) {
@@ -538,7 +538,7 @@ void manager::drain_for(gms::inet_address endpoint) {
// Future is waited on indirectly in `stop()` (via `_draining_eps_gate`).
(void)with_gate(_draining_eps_gate, [this, endpoint] {
return with_semaphore(drain_lock(), 1, [this, endpoint] {
return futurize_apply([this, endpoint] () {
return futurize_invoke([this, endpoint] () {
if (utils::fb_utilities::is_me(endpoint)) {
return parallel_for_each(_ep_managers, [] (auto& pair) {
return pair.second.stop(drain::yes).finally([&pair] {

View File

@@ -124,7 +124,7 @@ future<> system_distributed_keyspace::start() {
}
static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> func) {
return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
};
// We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments.

View File

@@ -720,7 +720,7 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db:
if (!mo) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return futurator::apply(c, std::move(*mo));
return futurator::invoke(c, std::move(*mo));
});
});
});

View File

@@ -274,7 +274,7 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
}
future<> gossiper::do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg) {
return futurize_apply([this, from, syn_msg = std::move(syn_msg)] () mutable {
return futurize_invoke([this, from, syn_msg = std::move(syn_msg)] () mutable {
auto g_digest_list = syn_msg.get_gossip_digests();
do_sort(g_digest_list);
utils::chunked_vector<gossip_digest> delta_gossip_digest_list;
@@ -379,7 +379,7 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
}
future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest) {
return futurize_apply([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable {
return futurize_invoke([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable {
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : ack_msg_digest) {

View File

@@ -149,7 +149,7 @@ void memtable::clear() noexcept {
}
future<> memtable::clear_gently() noexcept {
return futurize_apply([this] {
return futurize_invoke([this] {
auto t = std::make_unique<seastar::thread>([this] {
auto& alloc = allocator();

View File

@@ -124,7 +124,7 @@ future<> redis_server::do_accepts(int which, bool keepalive, socket_address serv
}
future<redis_server::result> redis_server::connection::process_request_one(redis::request&& request, redis::redis_options& opts, service_permit permit) {
return futurize_apply([this, request = std::move(request), &opts, permit] () mutable {
return futurize_invoke([this, request = std::move(request), &opts, permit] () mutable {
return _server._query_processor.local().process(std::move(request), seastar::ref(opts), permit).then([] (auto&& message) {
return make_ready_future<redis_server::result> (std::move(message));
});

View File

@@ -1309,7 +1309,7 @@ std::ostream& operator<<(std::ostream& out, row_cache& rc) {
}
future<> row_cache::do_update(row_cache::external_updater eu, row_cache::internal_updater iu) noexcept {
return futurize_apply([this] {
return futurize_invoke([this] {
return get_units(_update_sem, 1);
}).then([this, eu = std::move(eu), iu = std::move(iu)] (auto permit) mutable {
auto pos = dht::ring_position::min();
@@ -1319,7 +1319,7 @@ future<> row_cache::do_update(row_cache::external_updater eu, row_cache::interna
_prev_snapshot = std::exchange(_underlying, _snapshot_source());
++_underlying_phase;
}();
return futurize_apply([&iu] {
return futurize_invoke([&iu] {
return iu();
}).then_wrapped([this, permit = std::move(permit)] (auto f) {
_prev_snapshot_pos = {};

View File

@@ -68,8 +68,8 @@ future<prepare_response> paxos_state::prepare(tracing::trace_state_ptr tr_state,
if (ballot.timestamp() > state._promised_ballot.timestamp()) {
logger.debug("Promising ballot {}", ballot);
tracing::trace(tr_state, "Promising ballot {}", ballot);
auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout);
auto f2 = futurize_apply([&] {
auto f1 = futurize_invoke(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout);
auto f2 = futurize_invoke([&] {
return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
[tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
return get_local_storage_proxy().get_db().local().query(schema, cmd,

View File

@@ -919,7 +919,7 @@ future<paxos::prepare_summary> paxos_response_handler::prepare_ballot(utils::UUI
[this, ballot] (paxos::prepare_summary& summary, auto& request_tracker, shared_ptr<paxos_response_handler>& prh) mutable {
paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: sending ballot {} to {}", _id, ballot, _live_endpoints);
return parallel_for_each(_live_endpoints, [this, &summary, ballot, &request_tracker] (gms::inet_address peer) mutable {
return futurize_apply([&] {
return futurize_invoke([&] {
// To generate less network traffic, only the closest replica (first one in the list of participants)
// sends query result content while other replicas send digests needed to check consistency.
bool only_digest = peer != _live_endpoints[0];
@@ -1083,7 +1083,7 @@ future<bool> paxos_response_handler::accept_proposal(const paxos::proposal& prop
(auto& request_tracker, shared_ptr<paxos_response_handler>& prh) {
paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: sending commit {} to {}", _id, proposal, _live_endpoints);
return parallel_for_each(_live_endpoints, [this, &request_tracker, timeout_if_partially_accepted, &proposal] (gms::inet_address peer) mutable {
return futurize_apply([&] {
return futurize_invoke([&] {
if (fbu::is_me(peer)) {
tracing::trace(tr_state, "accept_proposal: accept {} locally", proposal);
return paxos::paxos_state::accept(tr_state, _schema, proposal.update.decorated_key(*_schema).token(), proposal, _timeout);
@@ -1930,7 +1930,7 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level
template<typename Range, typename CreateWriteHandler>
future<std::vector<storage_proxy::unique_response_handler>> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
// apply is used to convert exceptions to exceptional future
return futurize<std::vector<storage_proxy::unique_response_handler>>::apply([this] (Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
return futurize<std::vector<storage_proxy::unique_response_handler>>::invoke([this] (Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) {
std::vector<unique_response_handler> ids;
ids.reserve(std::distance(std::begin(mutations), std::end(mutations)));
for (auto& m : mutations) {
@@ -2565,9 +2565,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
}
if (coordinator == my_address) {
f = futurize_apply(lmutate);
f = futurize_invoke(lmutate);
} else {
f = futurize_apply(rmutate, coordinator, std::move(forward));
f = futurize_invoke(rmutate, coordinator, std::move(forward));
}
}
@@ -3398,8 +3398,8 @@ protected:
virtual future<> make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) {
resolver->add_wait_targets(_targets.size());
auto want_digest = _targets.size() > 1;
auto f_data = futurize_apply([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); });
auto f_digest = futurize_apply([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); });
auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); });
auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); });
return when_all_succeed(std::move(f_data), std::move(f_digest)).handle_exception([] (auto&&) { });
}
virtual void got_cl() {}
@@ -4647,7 +4647,7 @@ void storage_proxy::init_messaging_service() {
p->get_stats().forwarded_mutations += forward.size();
return when_all(
// mutate_locally() may throw, putting it into apply() converts exception to a future.
futurize_apply([timeout, &p, &m, reply_to, shard, src_addr = std::move(src_addr), schema_version,
futurize_invoke([timeout, &p, &m, reply_to, shard, src_addr = std::move(src_addr), schema_version,
apply_fn = std::move(apply_fn), trace_state_ptr] () mutable {
// FIXME: get_schema_for_write() doesn't timeout
return get_schema_for_write(schema_version, netw::messaging_service::msg_addr{reply_to, shard})

View File

@@ -51,7 +51,7 @@ static future<> cl_test(commitlog::config cfg, noncopyable_function<future<> (co
cfg.commit_log_location = tmp.path().string();
return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable {
return do_with(std::move(log), [f = std::move(f)](commitlog& log) {
return futurize_apply(f, log).finally([&log] {
return futurize_invoke(f, log).finally([&log] {
return log.shutdown().then([&log] {
return log.clear();
});

View File

@@ -1320,7 +1320,7 @@ SEASTAR_TEST_CASE(test_writetime_and_ttl) {
.with_rows({{
{long_type->decompose(int64_t(ts1))},
}});
auto msg2f = futurize_apply([&] { return e.execute_cql("SELECT writetime(c) FROM cf"); });
auto msg2f = futurize_invoke([&] { return e.execute_cql("SELECT writetime(c) FROM cf"); });
msg2f.wait();
assert_that_failed(msg2f);
});
@@ -1442,7 +1442,7 @@ auto validate_request_failure(
const sstring& request,
const sstring& expected_message,
const source_location& loc = source_location::current()) {
return futurize_apply([&] { return env.execute_cql(request); })
return futurize_invoke([&] { return env.execute_cql(request); })
.then_wrapped([expected_message, loc] (future<shared_ptr<cql_transport::messages::result_message>> f) {
BOOST_REQUIRE_EXCEPTION(f.get(),
exceptions::invalid_request_exception,

View File

@@ -194,7 +194,7 @@ SEASTAR_TEST_CASE(test_secondary_index_if_exists) {
e.execute_cql("drop index if exists cf_a_idx").get();
// Expect exceptions::invalid_request_exception: Index 'cf_a_idx'
// could not be found in any of the tables of keyspace 'ks'
assert_that_failed(seastar::futurize_apply([&e] { return e.execute_cql("drop index cf_a_idx"); }));
assert_that_failed(seastar::futurize_invoke([&e] { return e.execute_cql("drop index cf_a_idx"); }));
});
}

View File

@@ -52,7 +52,7 @@ static future<> cl_test(commitlog::config cfg, noncopyable_function<future<> (co
cfg.commit_log_location = tmp.path().string();
return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable {
return do_with(std::move(log), [f = std::move(f)](commitlog& log) {
return futurize_apply(f, log).finally([&log] {
return futurize_invoke(f, log).finally([&log] {
return log.shutdown().then([&log] {
return log.clear();
});

View File

@@ -129,7 +129,7 @@ with_cob(thrift_fn::function<void (const T& ret)>&& cob,
thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
Func&& func) {
// then_wrapped() terminates the fiber by calling one of the cob objects
(void)futurize<noexcept_movable_t<T>>::apply([func = std::forward<Func>(func)] {
(void)futurize<noexcept_movable_t<T>>::invoke([func = std::forward<Func>(func)] {
return noexcept_movable<T>::wrap(func());
}).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (auto&& f) {
try {
@@ -147,7 +147,7 @@ with_cob(thrift_fn::function<void ()>&& cob,
thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob,
Func&& func) {
// then_wrapped() terminates the fiber by calling one of the cob objects
(void)futurize_apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) {
(void)futurize_invoke(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) {
try {
f.get();
cob();
@@ -162,7 +162,7 @@ template <typename Func>
void
with_exn_cob(thrift_fn::function<void (::apache::thrift::TDelayedException* _throw)>&& exn_cob, Func&& func) {
// then_wrapped() terminates the fiber by calling one of the cob objects
(void)futurize_apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) {
(void)futurize_invoke(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) {
try {
f.get();
} catch (...) {

View File

@@ -245,7 +245,7 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) {
++_connects;
++_connections;
// Move the processing into the background.
(void)futurize_apply([this, conn] {
(void)futurize_invoke([this, conn] {
return advertise_new_connection(conn); // Notify any listeners about new connection.
}).then_wrapped([this, conn] (future<> f) {
try {
@@ -394,7 +394,7 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>>
auto linearization_buffer = std::make_unique<bytes_ostream>();
auto linearization_buffer_ptr = linearization_buffer.get();
return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
return futurize_invoke([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable {
// When using authentication, we need to ensure we are doing proper state transitions,
// i.e. we cannot simply accept any query/exec ops unless auth is complete
switch (client_state.get_auth_state()) {

View File

@@ -113,7 +113,7 @@ public:
using futurator = futurize<std::result_of_t<Func()>>;
return futurator::apply(std::forward<Func>(func)).then_wrapped([this, rp, post = std::forward<Post>(post)](typename futurator::type f) mutable {
return futurator::invoke(std::forward<Func>(func)).then_wrapped([this, rp, post = std::forward<Post>(post)](typename futurator::type f) mutable {
auto i = _map.find(rp);
assert(i != _map.end());

View File

@@ -95,7 +95,7 @@ private:
template<typename Func, typename T = std::result_of_t<Func()>>
joinpoint<T> make_joinpoint(Func && f) {
return joinpoint<T>([f = std::forward<Func>(f)] {
return futurize<T>::apply(f);
return futurize<T>::invoke(f);
});
}

View File

@@ -235,7 +235,7 @@ public:
// get_shared_future() may throw, so make sure to call it before invoking the loader(key)
f = e->loaded().get_shared_future();
// Future indirectly forwarded to `e`.
(void)futurize_apply([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
(void)futurize_invoke([&] { return loader(key); }).then_wrapped([e](future<value_type>&& val_fut) mutable {
if (val_fut.failed()) {
e->loaded().set_exception(val_fut.get_exception());
} else {

View File

@@ -197,7 +197,7 @@ class region_group {
Func func;
public:
void allocate() override {
futurator::apply(func).forward_to(std::move(pr));
futurator::invoke(func).forward_to(std::move(pr));
}
void fail(std::exception_ptr e) override {
pr.set_exception(e);
@@ -346,7 +346,7 @@ public:
});
if (!blocked_at) {
return futurator::apply(func);
return futurator::invoke(func);
}
auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));

View File

@@ -39,7 +39,7 @@ private:
private:
future<> do_trigger(seastar::shared_promise<> pr) {
_pending = {};
return futurize_apply(_func).then_wrapped([pr = std::move(pr)] (auto&& f) mutable {
return futurize_invoke(_func).then_wrapped([pr = std::move(pr)] (auto&& f) mutable {
if (f.failed()) {
pr.set_exception(f.get_exception());
} else {