diff --git a/alternator/executor.cc b/alternator/executor.cc index e5abf2768f..5a979b43d3 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -864,7 +864,7 @@ future executor::create_table(client_state& clien } return create_keyspace(keyspace_name).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders)] () mutable { - return futurize_apply([&] { return _mm.announce_new_column_family(schema, false); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders)] () mutable { + return futurize_invoke([&] { return _mm.announce_new_column_family(schema, false); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders)] () mutable { return parallel_for_each(std::move(view_builders), [schema] (schema_builder builder) { return service::get_local_migration_manager().announce_new_view(view_ptr(builder.build())); }).then([this, table_info = std::move(table_info), schema] () mutable { diff --git a/alternator/server.cc b/alternator/server.cc index 30b3d190fd..4ccd70809f 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -69,7 +69,7 @@ class api_handler : public handler_base { public: api_handler(const std::function(std::unique_ptr req)>& _handle) : _f_handle( [this, _handle](std::unique_ptr req, std::unique_ptr rep) { - return seastar::futurize_apply(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future resf) mutable { + return seastar::futurize_invoke(_handle, std::move(req)).then_wrapped([this, rep = std::move(rep)](future resf) mutable { if (resf.failed()) { // Exceptions of type api_error are wrapped as JSON and // returned to the client as expected. Other types of diff --git a/api/storage_service.cc b/api/storage_service.cc index 3dc58d2918..c404fdd001 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -648,7 +648,7 @@ void set_storage_service(http_context& ctx, routes& r) { ss::set_trace_probability.set(r, [](std::unique_ptr req) { auto probability = req->get_query_param("probability"); - return futurize::apply([probability] { + return futurize::invoke([probability] { double real_prob = std::stod(probability.c_str()); return tracing::tracing::tracing_instance().invoke_on_all([real_prob] (auto& local_tracing) { local_tracing.set_trace_probability(real_prob); diff --git a/auth/common.cc b/auth/common.cc index 06c692f633..839b1648f4 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -65,7 +65,7 @@ static future<> create_metadata_table_if_missing_impl( std::string_view cql, ::service::migration_manager& mm) { static auto ignore_existing = [] (seastar::noncopyable_function()> func) { - return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); + return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); }; auto& db = qp.db(); auto parsed_statement = cql3::query_processor::parse_statement(cql); @@ -92,7 +92,7 @@ future<> create_metadata_table_if_missing( cql3::query_processor& qp, std::string_view cql, ::service::migration_manager& mm) noexcept { - return futurize_apply(create_metadata_table_if_missing_impl, table_name, qp, cql, mm); + return futurize_invoke(create_metadata_table_if_missing_impl, table_name, qp, cql, mm); } future<> wait_for_schema_agreement(::service::migration_manager& mm, const database& db, seastar::abort_source& as) { diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index acfa29c592..d19333c2a5 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -230,7 +230,7 @@ future password_authenticator::authenticate( // obsolete prepared statements pretty quickly. // Rely on query processing caching statements instead, and lets assume // that a map lookup string->statement is not gonna kill us much. - return futurize_apply([this, username, password] { + return futurize_invoke([this, username, password] { static const sstring query = format("SELECT {} FROM {} WHERE {} = ?", SALTED_HASH, meta::roles_table::qualified_name(), diff --git a/auth/service.cc b/auth/service.cc index 026e25d0e9..0e832077d5 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -419,7 +419,7 @@ future<> create_role( return make_ready_future<>(); } - return futurize_apply( + return futurize_invoke( &validate_authentication_options_are_supported, options, ser.underlying_authenticator().supported_options()).then([&ser, name, &options] { @@ -443,7 +443,7 @@ future<> alter_role( return make_ready_future<>(); } - return futurize_apply( + return futurize_invoke( &validate_authentication_options_are_supported, options, ser.underlying_authenticator().supported_options()).then([&ser, name, &options] { diff --git a/auth/transitional.cc b/auth/transitional.cc index 58a0a461e6..f5d98a3e60 100644 --- a/auth/transitional.cc +++ b/auth/transitional.cc @@ -158,7 +158,7 @@ public: } virtual future get_authenticated_user() const { - return futurize_apply([this] { + return futurize_invoke([this] { return _sasl->get_authenticated_user().handle_exception([](auto ep) { try { std::rethrow_exception(ep); diff --git a/cql3/selection/selection.hh b/cql3/selection/selection.hh index 6d12edcbdd..4cd2a40cd2 100644 --- a/cql3/selection/selection.hh +++ b/cql3/selection/selection.hh @@ -268,7 +268,7 @@ public: if (_selectors->requires_thread()) { return async(std::move(func)); } else { - return futurize_apply(std::move(func)); + return futurize_invoke(std::move(func)); } } diff --git a/database.cc b/database.cc index 34a6dd4aaf..3d489ec26a 100644 --- a/database.cc +++ b/database.cc @@ -1470,7 +1470,7 @@ future database::apply_counter_update(schema_ptr s, const frozen_mutat update_write_metrics_for_timed_out_write(); return make_exception_future(timed_out_error{}); } - return update_write_metrics(seastar::futurize_apply([&] { + return update_write_metrics(seastar::futurize_invoke([&] { if (!s->is_synced()) { throw std::runtime_error(format("attempted to mutate using not synced schema of {}.{}, version={}", s->ks_name(), s->cf_name(), s->version())); diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 08e8ce7a5a..31a8ef38b3 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -1277,7 +1277,7 @@ void db::commitlog::segment_manager::flush_segments(bool force) { template static auto close_on_failure(future file_fut, Func func) { return file_fut.then([func = std::move(func)](file f) { - return futurize_apply(func, f).handle_exception([f] (std::exception_ptr e) mutable { + return futurize_invoke(func, f).handle_exception([f] (std::exception_ptr e) mutable { return f.close().then_wrapped([f, e = std::move(e)] (future<> x) { using futurator = futurize>; return futurator::make_exception_future(e); @@ -1424,7 +1424,7 @@ future db::commitlog::segment_manager: promise<> p; _segment_allocating.emplace(p.get_future()); auto f = _segment_allocating->get_future(timeout); - futurize_apply([this] { + futurize_invoke([this] { return with_gate(_gate, [this] { return new_segment().discard_result().finally([this]() { _segment_allocating = std::nullopt; diff --git a/db/hints/manager.cc b/db/hints/manager.cc index dfe5b07892..e77f8807b6 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -313,7 +313,7 @@ bool manager::store_hint(ep_key_type ep, schema_ptr s, lw_shared_ptr manager::end_point_hints_manager::add_store() noexcept { manager_logger.trace("Going to add a store to {}", _hints_dir.c_str()); - return futurize_apply([this] { + return futurize_invoke([this] { return io_check([name = _hints_dir.c_str()] { return recursive_touch_directory(name); }).then([this] () { commitlog::config cfg; @@ -349,7 +349,7 @@ future manager::end_point_hints_manager::add_store() noexcept { future<> manager::end_point_hints_manager::flush_current_hints() noexcept { // flush the currently created hints to disk if (_hints_store_anchor) { - return futurize_apply([this] { + return futurize_invoke([this] { return with_lock(file_update_mutex(), [this]() -> future<> { return get_or_load().then([] (hints_store_ptr cptr) { return cptr->shutdown(); @@ -397,7 +397,7 @@ future manager::end_point_hints_manager::sender::get_last_file_modific } future<> manager::end_point_hints_manager::sender::do_send_one_mutation(frozen_mutation_and_schema m, const std::vector& natural_endpoints) noexcept { - return futurize_apply([this, m = std::move(m), &natural_endpoints] () mutable -> future<> { + return futurize_invoke([this, m = std::move(m), &natural_endpoints] () mutable -> future<> { // The fact that we send with CL::ALL in both cases below ensures that new hints are not going // to be generated as a result of hints sending. if (boost::range::find(natural_endpoints, end_point_key()) != natural_endpoints.end()) { @@ -538,7 +538,7 @@ void manager::drain_for(gms::inet_address endpoint) { // Future is waited on indirectly in `stop()` (via `_draining_eps_gate`). (void)with_gate(_draining_eps_gate, [this, endpoint] { return with_semaphore(drain_lock(), 1, [this, endpoint] { - return futurize_apply([this, endpoint] () { + return futurize_invoke([this, endpoint] () { if (utils::fb_utilities::is_me(endpoint)) { return parallel_for_each(_ep_managers, [] (auto& pair) { return pair.second.stop(drain::yes).finally([&pair] { diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index df34d539b9..d09d1e4784 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -124,7 +124,7 @@ future<> system_distributed_keyspace::start() { } static auto ignore_existing = [] (seastar::noncopyable_function()> func) { - return futurize_apply(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); + return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); }; // We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments. diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index bac6ea0bd4..c01f0640bc 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -720,7 +720,7 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db: if (!mo) { return make_ready_future(stop_iteration::yes); } - return futurator::apply(c, std::move(*mo)); + return futurator::invoke(c, std::move(*mo)); }); }); }); diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 3efc9e8e4c..b987903299 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -274,7 +274,7 @@ future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) { } future<> gossiper::do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg) { - return futurize_apply([this, from, syn_msg = std::move(syn_msg)] () mutable { + return futurize_invoke([this, from, syn_msg = std::move(syn_msg)] () mutable { auto g_digest_list = syn_msg.get_gossip_digests(); do_sort(g_digest_list); utils::chunked_vector delta_gossip_digest_list; @@ -379,7 +379,7 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) { } future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest) { - return futurize_apply([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable { + return futurize_invoke([this, from, ack_msg_digest = std::move(ack_msg_digest)] () mutable { /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ std::map delta_ep_state_map; for (auto g_digest : ack_msg_digest) { diff --git a/memtable.cc b/memtable.cc index 0fd65dbea4..c862fbd0e9 100644 --- a/memtable.cc +++ b/memtable.cc @@ -149,7 +149,7 @@ void memtable::clear() noexcept { } future<> memtable::clear_gently() noexcept { - return futurize_apply([this] { + return futurize_invoke([this] { auto t = std::make_unique([this] { auto& alloc = allocator(); diff --git a/redis/server.cc b/redis/server.cc index 0a4ab48ba4..0e984af36b 100644 --- a/redis/server.cc +++ b/redis/server.cc @@ -124,7 +124,7 @@ future<> redis_server::do_accepts(int which, bool keepalive, socket_address serv } future redis_server::connection::process_request_one(redis::request&& request, redis::redis_options& opts, service_permit permit) { - return futurize_apply([this, request = std::move(request), &opts, permit] () mutable { + return futurize_invoke([this, request = std::move(request), &opts, permit] () mutable { return _server._query_processor.local().process(std::move(request), seastar::ref(opts), permit).then([] (auto&& message) { return make_ready_future (std::move(message)); }); diff --git a/row_cache.cc b/row_cache.cc index a8ce4d43bb..1e3919f020 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -1309,7 +1309,7 @@ std::ostream& operator<<(std::ostream& out, row_cache& rc) { } future<> row_cache::do_update(row_cache::external_updater eu, row_cache::internal_updater iu) noexcept { - return futurize_apply([this] { + return futurize_invoke([this] { return get_units(_update_sem, 1); }).then([this, eu = std::move(eu), iu = std::move(iu)] (auto permit) mutable { auto pos = dht::ring_position::min(); @@ -1319,7 +1319,7 @@ future<> row_cache::do_update(row_cache::external_updater eu, row_cache::interna _prev_snapshot = std::exchange(_underlying, _snapshot_source()); ++_underlying_phase; }(); - return futurize_apply([&iu] { + return futurize_invoke([&iu] { return iu(); }).then_wrapped([this, permit = std::move(permit)] (auto f) { _prev_snapshot_pos = {}; diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index a0b6e53d6e..1bd203b4fd 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -68,8 +68,8 @@ future paxos_state::prepare(tracing::trace_state_ptr tr_state, if (ballot.timestamp() > state._promised_ballot.timestamp()) { logger.debug("Promising ballot {}", ballot); tracing::trace(tr_state, "Promising ballot {}", ballot); - auto f1 = futurize_apply(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout); - auto f2 = futurize_apply([&] { + auto f1 = futurize_invoke(db::system_keyspace::save_paxos_promise, *schema, std::ref(key), ballot, timeout); + auto f2 = futurize_invoke([&] { return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), [tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) { return get_local_storage_proxy().get_db().local().query(schema, cmd, diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 40acc8af05..4b29edf393 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -919,7 +919,7 @@ future paxos_response_handler::prepare_ballot(utils::UUI [this, ballot] (paxos::prepare_summary& summary, auto& request_tracker, shared_ptr& prh) mutable { paxos::paxos_state::logger.trace("CAS[{}] prepare_ballot: sending ballot {} to {}", _id, ballot, _live_endpoints); return parallel_for_each(_live_endpoints, [this, &summary, ballot, &request_tracker] (gms::inet_address peer) mutable { - return futurize_apply([&] { + return futurize_invoke([&] { // To generate less network traffic, only the closest replica (first one in the list of participants) // sends query result content while other replicas send digests needed to check consistency. bool only_digest = peer != _live_endpoints[0]; @@ -1083,7 +1083,7 @@ future paxos_response_handler::accept_proposal(const paxos::proposal& prop (auto& request_tracker, shared_ptr& prh) { paxos::paxos_state::logger.trace("CAS[{}] accept_proposal: sending commit {} to {}", _id, proposal, _live_endpoints); return parallel_for_each(_live_endpoints, [this, &request_tracker, timeout_if_partially_accepted, &proposal] (gms::inet_address peer) mutable { - return futurize_apply([&] { + return futurize_invoke([&] { if (fbu::is_me(peer)) { tracing::trace(tr_state, "accept_proposal: accept {} locally", proposal); return paxos::paxos_state::accept(tr_state, _schema, proposal.update.decorated_key(*_schema).token(), proposal, _timeout); @@ -1930,7 +1930,7 @@ storage_proxy::hint_to_dead_endpoints(response_id_type id, db::consistency_level template future> storage_proxy::mutate_prepare(Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) { // apply is used to convert exceptions to exceptional future - return futurize>::apply([this] (Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) { + return futurize>::invoke([this] (Range&& mutations, db::consistency_level cl, db::write_type type, service_permit permit, CreateWriteHandler create_handler) { std::vector ids; ids.reserve(std::distance(std::begin(mutations), std::end(mutations))); for (auto& m : mutations) { @@ -2565,9 +2565,9 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo } if (coordinator == my_address) { - f = futurize_apply(lmutate); + f = futurize_invoke(lmutate); } else { - f = futurize_apply(rmutate, coordinator, std::move(forward)); + f = futurize_invoke(rmutate, coordinator, std::move(forward)); } } @@ -3398,8 +3398,8 @@ protected: virtual future<> make_requests(digest_resolver_ptr resolver, clock_type::time_point timeout) { resolver->add_wait_targets(_targets.size()); auto want_digest = _targets.size() > 1; - auto f_data = futurize_apply([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); }); - auto f_digest = futurize_apply([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); }); + auto f_data = futurize_invoke([&] { return make_data_requests(resolver, _targets.begin(), _targets.begin() + 1, timeout, want_digest); }); + auto f_digest = futurize_invoke([&] { return make_digest_requests(resolver, _targets.begin() + 1, _targets.end(), timeout); }); return when_all_succeed(std::move(f_data), std::move(f_digest)).handle_exception([] (auto&&) { }); } virtual void got_cl() {} @@ -4647,7 +4647,7 @@ void storage_proxy::init_messaging_service() { p->get_stats().forwarded_mutations += forward.size(); return when_all( // mutate_locally() may throw, putting it into apply() converts exception to a future. - futurize_apply([timeout, &p, &m, reply_to, shard, src_addr = std::move(src_addr), schema_version, + futurize_invoke([timeout, &p, &m, reply_to, shard, src_addr = std::move(src_addr), schema_version, apply_fn = std::move(apply_fn), trace_state_ptr] () mutable { // FIXME: get_schema_for_write() doesn't timeout return get_schema_for_write(schema_version, netw::messaging_service::msg_addr{reply_to, shard}) diff --git a/test/boost/commitlog_test.cc b/test/boost/commitlog_test.cc index 25b9c63d06..696a02b27a 100644 --- a/test/boost/commitlog_test.cc +++ b/test/boost/commitlog_test.cc @@ -51,7 +51,7 @@ static future<> cl_test(commitlog::config cfg, noncopyable_function (co cfg.commit_log_location = tmp.path().string(); return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable { return do_with(std::move(log), [f = std::move(f)](commitlog& log) { - return futurize_apply(f, log).finally([&log] { + return futurize_invoke(f, log).finally([&log] { return log.shutdown().then([&log] { return log.clear(); }); diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index a4f7e9163b..08087e024a 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -1320,7 +1320,7 @@ SEASTAR_TEST_CASE(test_writetime_and_ttl) { .with_rows({{ {long_type->decompose(int64_t(ts1))}, }}); - auto msg2f = futurize_apply([&] { return e.execute_cql("SELECT writetime(c) FROM cf"); }); + auto msg2f = futurize_invoke([&] { return e.execute_cql("SELECT writetime(c) FROM cf"); }); msg2f.wait(); assert_that_failed(msg2f); }); @@ -1442,7 +1442,7 @@ auto validate_request_failure( const sstring& request, const sstring& expected_message, const source_location& loc = source_location::current()) { - return futurize_apply([&] { return env.execute_cql(request); }) + return futurize_invoke([&] { return env.execute_cql(request); }) .then_wrapped([expected_message, loc] (future> f) { BOOST_REQUIRE_EXCEPTION(f.get(), exceptions::invalid_request_exception, diff --git a/test/boost/secondary_index_test.cc b/test/boost/secondary_index_test.cc index 03c20f3910..0e480cb4c9 100644 --- a/test/boost/secondary_index_test.cc +++ b/test/boost/secondary_index_test.cc @@ -194,7 +194,7 @@ SEASTAR_TEST_CASE(test_secondary_index_if_exists) { e.execute_cql("drop index if exists cf_a_idx").get(); // Expect exceptions::invalid_request_exception: Index 'cf_a_idx' // could not be found in any of the tables of keyspace 'ks' - assert_that_failed(seastar::futurize_apply([&e] { return e.execute_cql("drop index cf_a_idx"); })); + assert_that_failed(seastar::futurize_invoke([&e] { return e.execute_cql("drop index cf_a_idx"); })); }); } diff --git a/test/manual/hint_test.cc b/test/manual/hint_test.cc index 876200984e..59b86f8275 100644 --- a/test/manual/hint_test.cc +++ b/test/manual/hint_test.cc @@ -52,7 +52,7 @@ static future<> cl_test(commitlog::config cfg, noncopyable_function (co cfg.commit_log_location = tmp.path().string(); return commitlog::create_commitlog(cfg).then([f = std::move(f)](commitlog log) mutable { return do_with(std::move(log), [f = std::move(f)](commitlog& log) { - return futurize_apply(f, log).finally([&log] { + return futurize_invoke(f, log).finally([&log] { return log.shutdown().then([&log] { return log.clear(); }); diff --git a/thrift/handler.cc b/thrift/handler.cc index 9182122e33..1c86dce08c 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -129,7 +129,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - (void)futurize>::apply([func = std::forward(func)] { + (void)futurize>::invoke([func = std::forward(func)] { return noexcept_movable::wrap(func()); }).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (auto&& f) { try { @@ -147,7 +147,7 @@ with_cob(thrift_fn::function&& cob, thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - (void)futurize_apply(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize_invoke(func).then_wrapped([cob = std::move(cob), exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); cob(); @@ -162,7 +162,7 @@ template void with_exn_cob(thrift_fn::function&& exn_cob, Func&& func) { // then_wrapped() terminates the fiber by calling one of the cob objects - (void)futurize_apply(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { + (void)futurize_invoke(func).then_wrapped([exn_cob = std::move(exn_cob)] (future<> f) { try { f.get(); } catch (...) { diff --git a/transport/server.cc b/transport/server.cc index 1a5d163791..391b1a02ed 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -245,7 +245,7 @@ cql_server::do_accepts(int which, bool keepalive, socket_address server_addr) { ++_connects; ++_connections; // Move the processing into the background. - (void)futurize_apply([this, conn] { + (void)futurize_invoke([this, conn] { return advertise_new_connection(conn); // Notify any listeners about new connection. }).then_wrapped([this, conn] (future<> f) { try { @@ -394,7 +394,7 @@ future>> auto linearization_buffer = std::make_unique(); auto linearization_buffer_ptr = linearization_buffer.get(); - return futurize_apply([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable { + return futurize_invoke([this, cqlop, stream, &fbuf, &client_state, linearization_buffer_ptr, permit = std::move(permit), trace_state] () mutable { // When using authentication, we need to ensure we are doing proper state transitions, // i.e. we cannot simply accept any query/exec ops unless auth is complete switch (client_state.get_auth_state()) { diff --git a/utils/flush_queue.hh b/utils/flush_queue.hh index 4c484cbf98..9e538fb6d1 100644 --- a/utils/flush_queue.hh +++ b/utils/flush_queue.hh @@ -113,7 +113,7 @@ public: using futurator = futurize>; - return futurator::apply(std::forward(func)).then_wrapped([this, rp, post = std::forward(post)](typename futurator::type f) mutable { + return futurator::invoke(std::forward(func)).then_wrapped([this, rp, post = std::forward(post)](typename futurator::type f) mutable { auto i = _map.find(rp); assert(i != _map.end()); diff --git a/utils/joinpoint.hh b/utils/joinpoint.hh index 1cd39ff988..296722d6c2 100644 --- a/utils/joinpoint.hh +++ b/utils/joinpoint.hh @@ -95,7 +95,7 @@ private: template> joinpoint make_joinpoint(Func && f) { return joinpoint([f = std::forward(f)] { - return futurize::apply(f); + return futurize::invoke(f); }); } diff --git a/utils/loading_shared_values.hh b/utils/loading_shared_values.hh index 8b97192e61..2f71f15ce0 100644 --- a/utils/loading_shared_values.hh +++ b/utils/loading_shared_values.hh @@ -235,7 +235,7 @@ public: // get_shared_future() may throw, so make sure to call it before invoking the loader(key) f = e->loaded().get_shared_future(); // Future indirectly forwarded to `e`. - (void)futurize_apply([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { + (void)futurize_invoke([&] { return loader(key); }).then_wrapped([e](future&& val_fut) mutable { if (val_fut.failed()) { e->loaded().set_exception(val_fut.get_exception()); } else { diff --git a/utils/logalloc.hh b/utils/logalloc.hh index 390a495348..18ac457844 100644 --- a/utils/logalloc.hh +++ b/utils/logalloc.hh @@ -197,7 +197,7 @@ class region_group { Func func; public: void allocate() override { - futurator::apply(func).forward_to(std::move(pr)); + futurator::invoke(func).forward_to(std::move(pr)); } void fail(std::exception_ptr e) override { pr.set_exception(e); @@ -346,7 +346,7 @@ public: }); if (!blocked_at) { - return futurator::apply(func); + return futurator::invoke(func); } auto fn = std::make_unique>(std::forward(func)); diff --git a/utils/serialized_action.hh b/utils/serialized_action.hh index 99f18d6cab..d6d4dc9474 100644 --- a/utils/serialized_action.hh +++ b/utils/serialized_action.hh @@ -39,7 +39,7 @@ private: private: future<> do_trigger(seastar::shared_promise<> pr) { _pending = {}; - return futurize_apply(_func).then_wrapped([pr = std::move(pr)] (auto&& f) mutable { + return futurize_invoke(_func).then_wrapped([pr = std::move(pr)] (auto&& f) mutable { if (f.failed()) { pr.set_exception(f.get_exception()); } else {