forward_service: rename to mapreduce_service

forward_service is nondescriptive and misnamed, as it does more than
forward requests. It's a classic map/reduce algorithm (and in fact one
of its parameters is "reducer"), so name it accordingly.

The name "forward" leaked into the wire protocol for the messaging
service RPC isolation cookie, so it's kept there. It's also maintained
in the name of the logger (for "nodetool setlogginglevel") for
compatibility with tests.

Closes scylladb/scylladb#19444
This commit is contained in:
Avi Kivity
2024-06-23 17:51:20 +03:00
committed by Nadav Har'El
parent f798217293
commit 3fc4e23a36
19 changed files with 176 additions and 176 deletions

View File

@@ -971,7 +971,7 @@ scylla_core = (['message/messaging_service.cc',
'service/tablet_allocator.cc',
'service/storage_proxy.cc',
'query_ranges_to_vnodes.cc',
'service/forward_service.cc',
'service/mapreduce_service.cc',
'service/paxos/proposal.cc',
'service/paxos/prepare_response.cc',
'service/paxos/paxos_state.cc',
@@ -1294,7 +1294,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/hinted_handoff.idl.hh',
'idl/storage_proxy.idl.hh',
'idl/group0_state_machine.idl.hh',
'idl/forward_request.idl.hh',
'idl/mapreduce_request.idl.hh',
'idl/replica_exception.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
'idl/position_in_partition.idl.hh',

View File

@@ -481,7 +481,7 @@ functions::find(const function_name& name, const std::vector<data_type>& arg_typ
return {};
}
// This function is created only for forward_service use, thus it only checks for
// This function is created only for mapreduce_service use, thus it only checks for
// aggregate functions if no declared function was found.
//
// The reason for this function is, there is no serialization of `cql3::selection::selection`,

View File

@@ -17,7 +17,7 @@
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
#include "service/migration_manager.hh"
#include "service/forward_service.hh"
#include "service/mapreduce_service.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/storage_service.hh"
#include "cql3/CqlParser.hpp"
@@ -45,12 +45,12 @@ const sstring query_processor::CQL_VERSION = "3.3.1";
const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60);
struct query_processor::remote {
remote(service::migration_manager& mm, service::forward_service& fwd,
remote(service::migration_manager& mm, service::mapreduce_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), ss(ss), group0_client(group0_client) {}
: mm(mm), mapreducer(fwd), ss(ss), group0_client(group0_client) {}
service::migration_manager& mm;
service::forward_service& forwarder;
service::mapreduce_service& mapreducer;
service::storage_service& ss;
service::raft_group0_client& group0_client;
@@ -506,9 +506,9 @@ query_processor::~query_processor() {
}
}
void query_processor::start_remote(service::migration_manager& mm, service::forward_service& forwarder,
void query_processor::start_remote(service::migration_manager& mm, service::mapreduce_service& mapreducer,
service::storage_service& ss, service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, ss, group0_client);
_remote = std::make_unique<struct remote>(mm, mapreducer, ss, group0_client);
}
future<> query_processor::stop_remote() {
@@ -988,10 +988,10 @@ query_processor::execute_broadcast_table_query(const service::broadcast_tables::
co_return co_await service::broadcast_tables::execute(remote_.get().group0_client, query);
}
future<query::forward_result>
query_processor::forward(query::forward_request req, tracing::trace_state_ptr tr_state) {
future<query::mapreduce_result>
query_processor::mapreduce(query::mapreduce_request req, tracing::trace_state_ptr tr_state) {
auto [remote_, holder] = remote();
co_return co_await remote_.get().forwarder.dispatch(std::move(req), std::move(tr_state));
co_return co_await remote_.get().mapreducer.dispatch(std::move(req), std::move(tr_state));
}
future<::shared_ptr<messages::result_message>>

View File

@@ -36,7 +36,7 @@ namespace lang { class manager; }
namespace service {
class migration_manager;
class query_state;
class forward_service;
class mapreduce_service;
class raft_group0_client;
namespace broadcast_tables {
@@ -146,7 +146,7 @@ public:
~query_processor();
void start_remote(service::migration_manager&, service::forward_service&,
void start_remote(service::migration_manager&, service::mapreduce_service&,
service::storage_service& ss, service::raft_group0_client&);
future<> stop_remote();
@@ -431,9 +431,9 @@ public:
future<service::broadcast_tables::query_result>
execute_broadcast_table_query(const service::broadcast_tables::query&);
// Splits given `forward_request` and distributes execution of resulting subrequests across a cluster.
future<query::forward_result>
forward(query::forward_request, tracing::trace_state_ptr);
// Splits given `mapreduce_request` and distributes execution of resulting subrequests across a cluster.
future<query::mapreduce_result>
mapreduce(query::mapreduce_request, tracing::trace_state_ptr);
struct retry_statement_execution_error : public std::exception {};

View File

@@ -278,9 +278,9 @@ public:
);
}
virtual query::forward_request::reductions_info get_reductions() const override {
std::vector<query::forward_request::reduction_type> types;
std::vector<query::forward_request::aggregation_info> infos;
virtual query::mapreduce_request::reductions_info get_reductions() const override {
std::vector<query::mapreduce_request::reduction_type> types;
std::vector<query::mapreduce_request::aggregation_info> infos;
auto bad = [] {
throw std::runtime_error("Selection doesn't have a reduction");
};
@@ -295,7 +295,7 @@ public:
}
auto agg_func = dynamic_pointer_cast<functions::aggregate_function>(std::move(func));
auto type = (agg_func->name().name == "countRows") ? query::forward_request::reduction_type::count : query::forward_request::reduction_type::aggregate;
auto type = (agg_func->name().name == "countRows") ? query::mapreduce_request::reduction_type::count : query::mapreduce_request::reduction_type::aggregate;
std::vector<sstring> column_names;
for (auto& arg : fc->args) {
@@ -306,7 +306,7 @@ public:
column_names.push_back(col->col->name_as_text());
}
auto info = query::forward_request::aggregation_info {
auto info = query::mapreduce_request::aggregation_info {
.name = agg_func->name(),
.column_names = std::move(column_names),
};

View File

@@ -154,7 +154,7 @@ public:
virtual bool is_reducible() const {return false;}
virtual query::forward_request::reductions_info get_reductions() const {return {{}, {}};}
virtual query::mapreduce_request::reductions_info get_reductions() const {return {{}, {}};}
/**
* Returns true if the selection is trivial, i.e. there are no function

View File

@@ -1626,7 +1626,7 @@ parallelized_select_statement::do_execute(
auto timeout = lowres_system_clock::now() + timeout_duration;
auto reductions = _selection->get_reductions();
query::forward_request req = {
query::mapreduce_request req = {
.reduction_types = reductions.types,
.cmd = *command,
.pr = std::move(key_ranges),
@@ -1636,7 +1636,7 @@ parallelized_select_statement::do_execute(
};
// dispatch execution of this statement to other nodes
return qp.forward(req, state.get_trace_state()).then([this] (query::forward_result res) {
return qp.mapreduce(req, state.get_trace_state()).then([this] (query::mapreduce_result res) {
auto meta = _selection->get_result_metadata();
auto rs = std::make_unique<result_set>(std::move(meta));
rs->add_row(res.query_results);
@@ -2021,11 +2021,11 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
};
// Used to determine if an execution of this statement can be parallelized
// using `forward_service`.
auto can_be_forwarded = [&] {
// using `mapreduce_service`.
auto can_be_mapreduced = [&] {
return all_aggregates(prepared_selectors) // Note: before we levellized aggregation depth
&& ( // SUPPORTED PARALLELIZATION
// All potential intermediate coordinators must support forwarding
// All potential intermediate coordinators must support mapreduceing
(db.features().parallelized_aggregation && selection->is_count())
|| (db.features().uda_native_parallelized_aggregation && selection->is_reducible())
)
@@ -2082,7 +2082,7 @@ std::unique_ptr<prepared_statement> select_statement::prepare(data_dictionary::d
prepare_limit(db, ctx, _per_partition_limit),
stats,
std::move(prepared_attrs));
} else if (can_be_forwarded()) {
} else if (can_be_mapreduced()) {
stmt = parallelized_select_statement::prepare(
schema,
ctx.bound_variables_size(),

View File

@@ -15,7 +15,7 @@ class cql
class cdc
class view
class alternator
class forward_service
class mapreduce_service
class storage_service
class gossiper
class db_config
@@ -29,9 +29,9 @@ storage_proxy ..> view : update
cql ..> cdc : configure
cql ..> view : configure
alternator ..> cdc : configure
cql ..> forward_service : data path for autopar aggregations
forward_service ..> storage_proxy : read
forward_service ..> messaging_service : rpc
cql ..> mapreduce_service : data path for autopar aggregations
mapreduce_service ..> storage_proxy : read
mapreduce_service ..> messaging_service : rpc
cql ..> storage_proxy : data path
alternator ..> storage_proxy : data path
database --* db_commitlog : commit

View File

@@ -54,7 +54,7 @@ set(idl_headers
storage_proxy.idl.hh
storage_service.idl.hh
group0_state_machine.idl.hh
forward_request.idl.hh
mapreduce_request.idl.hh
replica_exception.idl.hh
per_partition_rate_limit_info.idl.hh
position_in_partition.idl.hh

View File

@@ -20,7 +20,7 @@ class function_name {
}
}
namespace query {
struct forward_request {
struct mapreduce_request {
struct aggregation_info {
db::functions::function_name name;
std::vector<sstring> column_names;
@@ -30,7 +30,7 @@ struct forward_request {
aggregate
};
std::vector<query::forward_request::reduction_type> reduction_types;
std::vector<query::mapreduce_request::reduction_type> reduction_types;
query::read_command cmd;
dht::partition_range_vector pr;
@@ -38,12 +38,12 @@ struct forward_request {
db::consistency_level cl;
lowres_system_clock::time_point timeout;
std::optional<std::vector<query::forward_request::aggregation_info>> aggregation_infos [[version 5.1]];
std::optional<std::vector<query::mapreduce_request::aggregation_info>> aggregation_infos [[version 5.1]];
};
struct forward_result {
struct mapreduce_result {
std::vector<bytes_opt> query_results;
};
verb forward_request(query::forward_request req [[ref]], std::optional<tracing::trace_info> trace_info [[ref]]) -> query::forward_result;
verb mapreduce_request(query::mapreduce_request req [[ref]], std::optional<tracing::trace_info> trace_info [[ref]]) -> query::mapreduce_result;
}

16
main.cc
View File

@@ -96,7 +96,7 @@
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/storage_proxy.hh"
#include "service/forward_service.hh"
#include "service/mapreduce_service.hh"
#include "alternator/controller.hh"
#include "alternator/ttl.hh"
#include "tools/entry_point.hh"
@@ -692,7 +692,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sharded<repair_service> repair;
sharded<sstables_loader> sst_loader;
sharded<streaming::stream_manager> stream_manager;
sharded<service::forward_service> forward_service;
sharded<service::mapreduce_service> mapreduce_service;
sharded<gms::gossiper> gossiper;
sharded<locator::snitch_ptr> snitch;
@@ -721,7 +721,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
tcp_syncookies_sanity();
tcp_timestamps_sanity();
return seastar::async([&app, cfg, ext, &cm, &sstm, &db, &qp, &bm, &proxy, &forward_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
return seastar::async([&app, cfg, ext, &cm, &sstm, &db, &qp, &bm, &proxy, &mapreduce_service, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service, &gossiper, &snitch,
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
&repair, &sst_loader, &ss, &lifecycle_notifier, &stream_manager, &task_manager] {
@@ -1487,10 +1487,10 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
api::unset_server_token_metadata(ctx).get();
});
supervisor::notify("starting forward service");
forward_service.start(std::ref(messaging), std::ref(proxy), std::ref(db), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_forward_service_handlers = defer_verbose_shutdown("forward service", [&forward_service] {
forward_service.stop().get();
supervisor::notify("starting mapreduce service");
mapreduce_service.start(std::ref(messaging), std::ref(proxy), std::ref(db), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source())).get();
auto stop_mapreduce_service_handlers = defer_verbose_shutdown("mapreduce service", [&mapreduce_service] {
mapreduce_service.stop().get();
});
supervisor::notify("starting migration manager");
@@ -1529,7 +1529,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
supervisor::notify("initializing query processor remote part");
// TODO: do this together with proxy.start_remote(...)
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(forward_service),
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(mapreduce_service),
std::ref(ss), std::ref(group0_client)).get();
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [&qp] {
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();

View File

@@ -114,8 +114,8 @@
#include "streaming/stream_manager.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "idl/partition_checksum.dist.impl.hh"
#include "idl/forward_request.dist.hh"
#include "idl/forward_request.dist.impl.hh"
#include "idl/mapreduce_request.dist.hh"
#include "idl/mapreduce_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
#include "idl/join_node.dist.impl.hh"
@@ -648,7 +648,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
return 3;
case messaging_verb::FORWARD_REQUEST:
case messaging_verb::MAPREDUCE_REQUEST:
return 4;
case messaging_verb::LAST:
return -1; // should never happen

View File

@@ -181,7 +181,7 @@ enum class messaging_verb : int32_t {
GROUP0_MODIFY_CONFIG = 58,
REPAIR_UPDATE_SYSTEM_TABLE = 59,
REPAIR_FLUSH_HINTS_BATCHLOG = 60,
FORWARD_REQUEST = 61,
MAPREDUCE_REQUEST = 61,
GET_GROUP0_UPGRADE_STATE = 62,
DIRECT_FD_PING = 63,
RAFT_TOPOLOGY_CMD = 64,
@@ -531,7 +531,7 @@ public:
scheduling_group scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const;
std::vector<messaging_service::scheduling_info_for_connection_index> initial_scheduling_info() const;
unsigned get_rpc_client_idx(messaging_verb verb) const;
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"};
static constexpr std::array<std::string_view, 3> _connection_types_prefix = {"statement:", "statement-ack:", "forward:"}; // "forward" is the old name for "mapreduce"
};
} // namespace netw

View File

@@ -475,7 +475,7 @@ public:
friend std::ostream& operator<<(std::ostream& out, const read_command& r);
};
struct forward_request {
struct mapreduce_request {
enum class reduction_type {
count,
aggregate
@@ -500,28 +500,28 @@ struct forward_request {
std::optional<std::vector<aggregation_info>> aggregation_infos;
};
std::ostream& operator<<(std::ostream& out, const forward_request& r);
std::ostream& operator<<(std::ostream& out, const forward_request::reduction_type& r);
std::ostream& operator<<(std::ostream& out, const forward_request::aggregation_info& a);
std::ostream& operator<<(std::ostream& out, const mapreduce_request& r);
std::ostream& operator<<(std::ostream& out, const mapreduce_request::reduction_type& r);
std::ostream& operator<<(std::ostream& out, const mapreduce_request::aggregation_info& a);
struct forward_result {
struct mapreduce_result {
// vector storing query result for each selected column
std::vector<bytes_opt> query_results;
struct printer {
const std::vector<::shared_ptr<db::functions::aggregate_function>> functions;
const query::forward_result& res;
const query::mapreduce_result& res;
};
};
std::ostream& operator<<(std::ostream& out, const query::forward_result::printer&);
std::ostream& operator<<(std::ostream& out, const query::mapreduce_result::printer&);
}
template <> struct fmt::formatter<query::specific_ranges> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::partition_slice> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::read_command> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::forward_request> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::forward_request::reduction_type> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::forward_request::aggregation_info> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::forward_result::printer> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::mapreduce_request> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::mapreduce_request::reduction_type> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::mapreduce_request::aggregation_info> : fmt::ostream_formatter {};
template <> struct fmt::formatter<query::mapreduce_result::printer> : fmt::ostream_formatter {};

View File

@@ -60,28 +60,28 @@ std::ostream& operator<<(std::ostream& out, const read_command& r) {
return out;
}
std::ostream& operator<<(std::ostream& out, const forward_request::reduction_type& r) {
std::ostream& operator<<(std::ostream& out, const mapreduce_request::reduction_type& r) {
out << "reduction_type{";
switch (r) {
case forward_request::reduction_type::count:
case mapreduce_request::reduction_type::count:
out << "count";
break;
case forward_request::reduction_type::aggregate:
case mapreduce_request::reduction_type::aggregate:
out << "aggregate";
break;
}
return out << "}";
}
std::ostream& operator<<(std::ostream& out, const forward_request::aggregation_info& a) {
std::ostream& operator<<(std::ostream& out, const mapreduce_request::aggregation_info& a) {
fmt::print(out, "aggregation_info{{, name={}, column_names=[{}]}}",
a.name, fmt::join(a.column_names, ","));;
return out;
}
std::ostream& operator<<(std::ostream& out, const forward_request& r) {
std::ostream& operator<<(std::ostream& out, const mapreduce_request& r) {
auto ms = std::chrono::time_point_cast<std::chrono::milliseconds>(r.timeout).time_since_epoch().count();
fmt::print(out, "forward_request{{reductions=[{}]",
fmt::print(out, "mapreduce_request{{reductions=[{}]",
fmt::join(r.reduction_types, ","));
if (r.aggregation_infos) {
fmt::print(out, ", aggregation_infos=[{}]",
@@ -405,9 +405,9 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
return make_foreign(make_lw_shared<query::result>(std::move(w), is_short_read, row_count, partition_count, std::move(last_position)));
}
std::ostream& operator<<(std::ostream& out, const query::forward_result::printer& p) {
std::ostream& operator<<(std::ostream& out, const query::mapreduce_result::printer& p) {
if (p.functions.size() != p.res.query_results.size()) {
return out << "[malformed forward_result (" << p.res.query_results.size()
return out << "[malformed mapreduce_result (" << p.res.query_results.size()
<< " results, " << p.functions.size() << " aggregates)]";
}

View File

@@ -3,7 +3,7 @@ target_sources(service
PRIVATE
broadcast_tables/experimental/lang.cc
client_state.cc
forward_service.cc
mapreduce_service.cc
migration_manager.cc
misc_services.cc
pager/paging_state.cc

View File

@@ -6,7 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "service/forward_service.hh"
#include "service/mapreduce_service.hh"
#include <boost/range/algorithm/remove_if.hpp>
#include <seastar/core/coroutine.hh>
@@ -18,7 +18,7 @@
#include "db/consistency_level.hh"
#include "dht/sharder.hh"
#include "gms/gossiper.hh"
#include "idl/forward_request.dist.hh"
#include "idl/mapreduce_request.dist.hh"
#include "locator/abstract_replication_strategy.hh"
#include "log.hh"
#include "message/messaging_service.hh"
@@ -50,18 +50,18 @@
namespace service {
static constexpr int DEFAULT_INTERNAL_PAGING_SIZE = 10000;
static logging::logger flogger("forward_service");
static logging::logger flogger("forward_service"); // not "mapreduce", for compatibility with dtest
static std::vector<::shared_ptr<db::functions::aggregate_function>> get_functions(const query::forward_request& request);
static std::vector<::shared_ptr<db::functions::aggregate_function>> get_functions(const query::mapreduce_request& request);
class forward_aggregates {
class mapreduce_aggregates {
private:
std::vector<::shared_ptr<db::functions::aggregate_function>> _funcs;
std::vector<db::functions::stateless_aggregate_function> _aggrs;
public:
forward_aggregates(const query::forward_request& request);
void merge(query::forward_result& result, query::forward_result&& other);
void finalize(query::forward_result& result);
mapreduce_aggregates(const query::mapreduce_request& request);
void merge(query::mapreduce_result& result, query::mapreduce_result&& other);
void finalize(query::mapreduce_result& result);
template<typename Func>
auto with_thread_if_needed(Func&& func) const {
@@ -79,7 +79,7 @@ public:
}
};
forward_aggregates::forward_aggregates(const query::forward_request& request) {
mapreduce_aggregates::mapreduce_aggregates(const query::mapreduce_request& request) {
_funcs = get_functions(request);
std::vector<db::functions::stateless_aggregate_function> aggrs;
@@ -89,7 +89,7 @@ forward_aggregates::forward_aggregates(const query::forward_request& request) {
_aggrs = std::move(aggrs);
}
void forward_aggregates::merge(query::forward_result &result, query::forward_result&& other) {
void mapreduce_aggregates::merge(query::mapreduce_result &result, query::mapreduce_result&& other) {
if (result.query_results.empty()) {
result.query_results = std::move(other.query_results);
return;
@@ -100,7 +100,7 @@ void forward_aggregates::merge(query::forward_result &result, query::forward_res
if (result.query_results.size() != other.query_results.size() || result.query_results.size() != _aggrs.size()) {
on_internal_error(
flogger,
format("forward_aggregates::merge(): operation cannot be completed due to invalid argument sizes. "
format("mapreduce_aggregates::merge(): operation cannot be completed due to invalid argument sizes. "
"this.aggrs.size(): {} "
"result.query_result.size(): {} "
"other.query_results.size(): {} ",
@@ -113,7 +113,7 @@ void forward_aggregates::merge(query::forward_result &result, query::forward_res
}
}
void forward_aggregates::finalize(query::forward_result &result) {
void mapreduce_aggregates::finalize(query::mapreduce_result &result) {
if (result.query_results.empty()) {
// An empty result means that we didn't send the aggregation request
// to any node. I.e., it was a query that matched no partition, such
@@ -129,7 +129,7 @@ void forward_aggregates::finalize(query::forward_result &result) {
if (result.query_results.size() != _aggrs.size()) {
on_internal_error(
flogger,
format("forward_aggregates::finalize(): operation cannot be completed due to invalid argument sizes. "
format("mapreduce_aggregates::finalize(): operation cannot be completed due to invalid argument sizes. "
"this.aggrs.size(): {} "
"result.query_result.size(): {} ",
_aggrs.size(), result.query_results.size())
@@ -143,7 +143,7 @@ void forward_aggregates::finalize(query::forward_result &result) {
}
}
static std::vector<::shared_ptr<db::functions::aggregate_function>> get_functions(const query::forward_request& request) {
static std::vector<::shared_ptr<db::functions::aggregate_function>> get_functions(const query::mapreduce_request& request) {
schema_ptr schema = local_schema_registry().get(request.cmd.schema_version);
std::vector<::shared_ptr<db::functions::aggregate_function>> aggrs;
@@ -161,7 +161,7 @@ static std::vector<::shared_ptr<db::functions::aggregate_function>> get_function
::shared_ptr<db::functions::aggregate_function> aggr;
if (!request.aggregation_infos) {
if (request.reduction_types[i] == query::forward_request::reduction_type::aggregate) {
if (request.reduction_types[i] == query::mapreduce_request::reduction_type::aggregate) {
throw std::runtime_error("No aggregation info for reduction type aggregation.");
}
@@ -250,70 +250,70 @@ public:
}
};
// `retrying_dispatcher` is a class that dispatches forward_requests to other
// `retrying_dispatcher` is a class that dispatches mapreduce_requests to other
// nodes. In case of a failure, local retries are available - request being
// retried is executed on the super-coordinator.
class retrying_dispatcher {
forward_service& _forwarder;
mapreduce_service& _mapreducer;
tracing::trace_state_ptr _tr_state;
std::optional<tracing::trace_info> _tr_info;
public:
retrying_dispatcher(forward_service& forwarder, tracing::trace_state_ptr tr_state)
: _forwarder(forwarder),
retrying_dispatcher(mapreduce_service& mapreducer, tracing::trace_state_ptr tr_state)
: _mapreducer(mapreducer),
_tr_state(tr_state),
_tr_info(tracing::make_trace_info(tr_state))
{}
future<query::forward_result> dispatch_to_node(netw::msg_addr id, query::forward_request req) {
auto my_address = _forwarder._messaging.broadcast_address();
future<query::mapreduce_result> dispatch_to_node(netw::msg_addr id, query::mapreduce_request req) {
auto my_address = _mapreducer._messaging.broadcast_address();
if (id.addr == my_address) {
return _forwarder.dispatch_to_shards(req, _tr_info);
return _mapreducer.dispatch_to_shards(req, _tr_info);
}
_forwarder._stats.requests_dispatched_to_other_nodes += 1;
_mapreducer._stats.requests_dispatched_to_other_nodes += 1;
// Check for a shutdown request before sending a forward_request to
// Check for a shutdown request before sending a mapreduce_request to
// another node. During the drain process, the messaging service is shut
// down early (but not earlier than the forward_service::shutdown
// down early (but not earlier than the mapreduce_service::shutdown
// invocation), so by performing this check, we can prevent hanging on
// the RPC call.
if (_forwarder._shutdown) {
return make_exception_future<query::forward_result>(std::runtime_error("forward_service is shutting down"));
if (_mapreducer._shutdown) {
return make_exception_future<query::mapreduce_result>(std::runtime_error("mapreduce_service is shutting down"));
}
// Try to send this forward_request to another node.
return do_with(id, req, [this] (netw::msg_addr& id, query::forward_request& req) -> future<query::forward_result> {
return ser::forward_request_rpc_verbs::send_forward_request(
&_forwarder._messaging, id, req, _tr_info
).handle_exception_type([this, &req, &id] (rpc::closed_error& e) -> future<query::forward_result> {
if (_forwarder._shutdown) {
// Try to send this mapreduce_request to another node.
return do_with(id, req, [this] (netw::msg_addr& id, query::mapreduce_request& req) -> future<query::mapreduce_result> {
return ser::mapreduce_request_rpc_verbs::send_mapreduce_request(
&_mapreducer._messaging, id, req, _tr_info
).handle_exception_type([this, &req, &id] (rpc::closed_error& e) -> future<query::mapreduce_result> {
if (_mapreducer._shutdown) {
// Do not retry if shutting down.
return make_exception_future<query::forward_result>(e);
return make_exception_future<query::mapreduce_result>(e);
}
// In case of forwarding failure, retry using super-coordinator as a coordinator
flogger.warn("retrying forward_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
tracing::trace(_tr_state, "retrying forward_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
// In case of mapreduce failure, retry using super-coordinator as a coordinator
flogger.warn("retrying mapreduce_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
tracing::trace(_tr_state, "retrying mapreduce_request={} on a super-coordinator after failing to send it to {} ({})", req, id, e.what());
return _forwarder.dispatch_to_shards(req, _tr_info);
return _mapreducer.dispatch_to_shards(req, _tr_info);
});
});
}
};
locator::token_metadata_ptr forward_service::get_token_metadata_ptr() const noexcept {
locator::token_metadata_ptr mapreduce_service::get_token_metadata_ptr() const noexcept {
return _shared_token_metadata.get();
}
future<> forward_service::stop() {
future<> mapreduce_service::stop() {
return uninit_messaging_service();
}
// Due to `cql3::selection::selection` not being serializable, it cannot be
// stored in `forward_request`. It has to mocked on the receiving node,
// stored in `mapreduce_request`. It has to mocked on the receiving node,
// based on requested reduction types.
static shared_ptr<cql3::selection::selection> mock_selection(
query::forward_request& request,
query::mapreduce_request& request,
schema_ptr schema,
replica::database& db
) {
@@ -323,8 +323,8 @@ static shared_ptr<cql3::selection::selection> mock_selection(
auto mock_singular_selection = [&] (
const ::shared_ptr<db::functions::aggregate_function>& aggr_function,
const query::forward_request::reduction_type& reduction,
const std::optional<query::forward_request::aggregation_info>& info
const query::mapreduce_request::reduction_type& reduction,
const std::optional<query::mapreduce_request::aggregation_info>& info
) {
auto name_as_expression = [] (const sstring& name) -> cql3::expr::expression {
constexpr bool keep_case = true;
@@ -333,7 +333,7 @@ static shared_ptr<cql3::selection::selection> mock_selection(
};
};
if (reduction == query::forward_request::reduction_type::count) {
if (reduction == query::mapreduce_request::reduction_type::count) {
auto count_expr = cql3::expr::function_call{
.func = cql3::functions::aggregate_fcts::make_count_rows_function(),
.args = {},
@@ -362,13 +362,13 @@ static shared_ptr<cql3::selection::selection> mock_selection(
return cql3::selection::selection::from_selectors(db.as_data_dictionary(), schema, schema->ks_name(), std::move(prepared_selectors));
}
future<query::forward_result> forward_service::dispatch_to_shards(
query::forward_request req,
future<query::mapreduce_result> mapreduce_service::dispatch_to_shards(
query::mapreduce_request req,
std::optional<tracing::trace_info> tr_info
) {
_stats.requests_dispatched_to_own_shards += 1;
std::optional<query::forward_result> result;
std::vector<future<query::forward_result>> futures;
std::optional<query::mapreduce_result> result;
std::vector<future<query::mapreduce_result>> futures;
for (const auto& s : smp::all_cpus()) {
futures.push_back(container().invoke_on(s, [req, tr_info] (auto& fs) {
@@ -377,7 +377,7 @@ future<query::forward_result> forward_service::dispatch_to_shards(
}
auto results = co_await when_all_succeed(futures.begin(), futures.end());
forward_aggregates aggrs(req);
mapreduce_aggregates aggrs(req);
co_return co_await aggrs.with_thread_if_needed([&aggrs, req, results = std::move(results), result = std::move(result)] () mutable {
for (auto&& r : results) {
if (result) {
@@ -389,7 +389,7 @@ future<query::forward_result> forward_service::dispatch_to_shards(
}
flogger.debug("on node execution result is {}", seastar::value_of([&req, &result] {
return query::forward_result::printer {
return query::mapreduce_result::printer {
.functions = get_functions(req),
.res = *result
};})
@@ -399,18 +399,18 @@ future<query::forward_result> forward_service::dispatch_to_shards(
});
}
static lowres_clock::time_point compute_timeout(const query::forward_request& req) {
static lowres_clock::time_point compute_timeout(const query::mapreduce_request& req) {
lowres_system_clock::duration time_left = req.timeout - lowres_system_clock::now();
lowres_clock::time_point timeout_point = lowres_clock::now() + time_left;
return timeout_point;
}
// This function executes forward_request on a shard.
// This function executes mapreduce_request on a shard.
// It retains partition ranges owned by this shard from requested partition
// ranges vector, so that only owned ones are queried.
future<query::forward_result> forward_service::execute_on_this_shard(
query::forward_request req,
future<query::mapreduce_result> mapreduce_service::execute_on_this_shard(
query::mapreduce_request req,
std::optional<tracing::trace_info> tr_info
) {
tracing::trace_state_ptr tr_state;
@@ -419,7 +419,7 @@ future<query::forward_result> forward_service::execute_on_this_shard(
tracing::begin(tr_state);
}
tracing::trace(tr_state, "Executing forward_request");
tracing::trace(tr_state, "Executing mapreduce_request");
_stats.requests_executed += 1;
schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
@@ -483,11 +483,11 @@ future<query::forward_result> forward_service::execute_on_this_shard(
// It is necessary to check for a shutdown request before each
// fetch_page operation. During the drain process, the messaging
// service is shut down early (but not earlier than the
// forward_service::shutdown invocation), so by performing this
// mapreduce_service::shutdown invocation), so by performing this
// check, we can prevent hanging on the RPC call (which can be made
// during fetching a page).
if (_shutdown) {
throw std::runtime_error("forward_service is shutting down");
throw std::runtime_error("mapreduce_service is shutting down");
}
co_await pager->fetch_page(rs_builder, DEFAULT_INTERNAL_PAGING_SIZE, now, timeout);
@@ -507,10 +507,10 @@ future<query::forward_result> forward_service::execute_on_this_shard(
flogger.error("aggregation result column count does not match requested column count");
throw std::runtime_error("aggregation result column count does not match requested column count");
}
query::forward_result res = { .query_results = boost::copy_range<std::vector<bytes_opt>>(rows[0] | boost::adaptors::transformed([] (const managed_bytes_opt& x) { return to_bytes_opt(x); })) };
query::mapreduce_result res = { .query_results = boost::copy_range<std::vector<bytes_opt>>(rows[0] | boost::adaptors::transformed([] (const managed_bytes_opt& x) { return to_bytes_opt(x); })) };
auto printer = seastar::value_of([&req, &res] {
return query::forward_result::printer {
return query::mapreduce_result::printer {
.functions = get_functions(req),
.res = res
};
@@ -522,20 +522,20 @@ future<query::forward_result> forward_service::execute_on_this_shard(
});
}
void forward_service::init_messaging_service() {
ser::forward_request_rpc_verbs::register_forward_request(
void mapreduce_service::init_messaging_service() {
ser::mapreduce_request_rpc_verbs::register_mapreduce_request(
&_messaging,
[this](query::forward_request req, std::optional<tracing::trace_info> tr_info) -> future<query::forward_result> {
[this](query::mapreduce_request req, std::optional<tracing::trace_info> tr_info) -> future<query::mapreduce_result> {
return dispatch_to_shards(req, tr_info);
}
);
}
future<> forward_service::uninit_messaging_service() {
return ser::forward_request_rpc_verbs::unregister(&_messaging);
future<> mapreduce_service::uninit_messaging_service() {
return ser::mapreduce_request_rpc_verbs::unregister(&_messaging);
}
future<query::forward_result> forward_service::dispatch(query::forward_request req, tracing::trace_state_ptr tr_state) {
future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state) {
schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
replica::table& cf = _db.local().find_column_family(schema);
auto erm = cf.get_effective_replication_map();
@@ -570,41 +570,41 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
co_await coroutine::maybe_yield();
}
tracing::trace(tr_state, "Dispatching forward_request to {} endpoints", vnodes_per_addr.size());
flogger.debug("dispatching forward_request to {} endpoints", vnodes_per_addr.size());
tracing::trace(tr_state, "Dispatching mapreduce_request to {} endpoints", vnodes_per_addr.size());
flogger.debug("dispatching mapreduce_request to {} endpoints", vnodes_per_addr.size());
retrying_dispatcher dispatcher(*this, tr_state);
query::forward_result result;
query::mapreduce_result result;
co_await coroutine::parallel_for_each(vnodes_per_addr.begin(), vnodes_per_addr.end(),
[&req, &result, &tr_state, &dispatcher] (
std::pair<const netw::messaging_service::msg_addr, dht::partition_range_vector>& vnodes_with_addr
) -> future<> {
netw::messaging_service::msg_addr addr = vnodes_with_addr.first;
query::forward_result& result_ = result;
query::mapreduce_result& result_ = result;
tracing::trace_state_ptr& tr_state_ = tr_state;
retrying_dispatcher& dispatcher_ = dispatcher;
query::forward_request req_with_modified_pr = req;
query::mapreduce_request req_with_modified_pr = req;
req_with_modified_pr.pr = std::move(vnodes_with_addr.second);
tracing::trace(tr_state_, "Sending forward_request to {}", addr);
flogger.debug("dispatching forward_request={} to address={}", req_with_modified_pr, addr);
tracing::trace(tr_state_, "Sending mapreduce_request to {}", addr);
flogger.debug("dispatching mapreduce_request={} to address={}", req_with_modified_pr, addr);
return dispatcher_.dispatch_to_node(addr, std::move(req_with_modified_pr)).then(
[&req, addr = std::move(addr), &result_, tr_state_ = std::move(tr_state_)] (
query::forward_result partial_result
query::mapreduce_result partial_result
) mutable {
auto partial_printer = seastar::value_of([&req, &partial_result] {
return query::forward_result::printer {
return query::mapreduce_result::printer {
.functions = get_functions(req),
.res = partial_result
};
});
tracing::trace(tr_state_, "Received forward_result={} from {}", partial_printer, addr);
flogger.debug("received forward_result={} from {}", partial_printer, addr);
tracing::trace(tr_state_, "Received mapreduce_result={} from {}", partial_printer, addr);
flogger.debug("received mapreduce_result={} from {}", partial_printer, addr);
return do_with(forward_aggregates(req), [&result_, partial_result = std::move(partial_result)] (forward_aggregates& aggrs) mutable {
return do_with(mapreduce_aggregates(req), [&result_, partial_result = std::move(partial_result)] (mapreduce_aggregates& aggrs) mutable {
return aggrs.with_thread_if_needed([&result_, &aggrs, partial_result = std::move(partial_result)] () mutable {
aggrs.merge(result_, std::move(partial_result));
});
@@ -612,12 +612,12 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
});
});
forward_aggregates aggrs(req);
mapreduce_aggregates aggrs(req);
const bool requires_thread = aggrs.requires_thread();
auto merge_result = [&result, &req, &tr_state, aggrs = std::move(aggrs)] () mutable {
auto printer = seastar::value_of([&req, &result] {
return query::forward_result::printer {
return query::mapreduce_result::printer {
.functions = get_functions(req),
.res = result
};
@@ -635,15 +635,15 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
}
}
void forward_service::register_metrics() {
void mapreduce_service::register_metrics() {
namespace sm = seastar::metrics;
_metrics.add_group("forward_service", {
_metrics.add_group("mapreduce_service", {
sm::make_total_operations("requests_dispatched_to_other_nodes", _stats.requests_dispatched_to_other_nodes,
sm::description("how many forward requests were dispatched to other nodes"), {}),
sm::description("how many mapreduce requests were dispatched to other nodes"), {}),
sm::make_total_operations("requests_dispatched_to_own_shards", _stats.requests_dispatched_to_own_shards,
sm::description("how many forward requests were dispatched to local shards"), {}),
sm::description("how many mapreduce requests were dispatched to local shards"), {}),
sm::make_total_operations("requests_executed", _stats.requests_executed,
sm::description("how many forward requests were executed"), {}),
sm::description("how many mapreduce requests were executed"), {}),
});
}

View File

@@ -26,11 +26,11 @@ namespace service {
class storage_proxy;
// `forward_service` is a sharded service responsible for distributing and
// `mapreduce_service` is a sharded service responsible for distributing and
// executing aggregation requests across a cluster.
//
// To use this service, one needs to express its aggregation query using
// `query::forward_request` struct, and call the `dispatch` method with the
// `query::mapreduce_request` struct, and call the `dispatch` method with the
// previously mentioned struct acting as an argument. What will happen after
// calling it, is as follows:
// 1. `dispatch` splits aggregation query into sub-queries. The caller of
@@ -57,7 +57,7 @@ class storage_proxy;
// recipient is the endpoint that holds all vnodes in the set.
//
// Query splitting example (3 node cluster with num_tokens set to 3):
// Original query: forward_request{
// Original query: mapreduce_request{
// reduction_types=[reduction_type{count}],
// cmd=read_command{contents omitted},
// pr={(-inf, +inf)},
@@ -81,7 +81,7 @@ class storage_proxy;
//
// Created sub-queries:
//
// forward_request{
// mapreduce_request{
// reduction_types=[reduction_type{count}],
// cmd=read_command{contents omitted},
// pr={
@@ -94,7 +94,7 @@ class storage_proxy;
// timeout(ms)=4865767688
// } for 127.0.0.1
//
// forward_request{
// mapreduce_request{
// reduction_types=[reduction_type{count}],
// cmd=read_command{contents omitted},
// pr={
@@ -106,7 +106,7 @@ class storage_proxy;
// timeout(ms)=4865767688
// } for 127.0.0.2
//
// forward_request{
// mapreduce_request{
// reduction_types=[reduction_type{count}],
// cmd=read_command{contents omitted},
// pr={
@@ -118,7 +118,7 @@ class storage_proxy;
// timeout(ms)=4865767688
// } for 127.0.0.3
//
class forward_service : public seastar::peering_sharded_service<forward_service> {
class mapreduce_service : public seastar::peering_sharded_service<mapreduce_service> {
netw::messaging_service& _messaging;
service::storage_proxy& _proxy;
distributed<replica::database>& _db;
@@ -135,7 +135,7 @@ class forward_service : public seastar::peering_sharded_service<forward_service>
bool _shutdown = false;
public:
forward_service(netw::messaging_service& ms, service::storage_proxy& p, distributed<replica::database> &db,
mapreduce_service(netw::messaging_service& ms, service::storage_proxy& p, distributed<replica::database> &db,
const locator::shared_token_metadata& stm, abort_source& as)
: _messaging(ms)
, _proxy(p)
@@ -149,15 +149,15 @@ public:
future<> stop();
// Splits given `forward_request` and distributes execution of resulting
// Splits given `mapreduce_request` and distributes execution of resulting
// subrequests across a cluster.
future<query::forward_result> dispatch(query::forward_request req, tracing::trace_state_ptr tr_state);
future<query::mapreduce_result> dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state);
private:
// Used to distribute given `forward_request` across shards.
future<query::forward_result> dispatch_to_shards(query::forward_request req, std::optional<tracing::trace_info> tr_info);
// Used to execute a `forward_request` on a shard.
future<query::forward_result> execute_on_this_shard(query::forward_request req, std::optional<tracing::trace_info> tr_info);
// Used to distribute given `mapreduce_request` across shards.
future<query::mapreduce_result> dispatch_to_shards(query::mapreduce_request req, std::optional<tracing::trace_info> tr_info);
// Used to execute a `mapreduce_request` on a shard.
future<query::mapreduce_result> execute_on_this_shard(query::mapreduce_request req, std::optional<tracing::trace_info> tr_info);
locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;

View File

@@ -35,7 +35,7 @@
#include "service/raft/raft_group_registry.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "service/forward_service.hh"
#include "service/mapreduce_service.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "auth/service.hh"
#include "auth/common.hh"
@@ -159,7 +159,7 @@ private:
sharded<cdc::generation_service> _cdc_generation_service;
sharded<repair_service> _repair;
sharded<streaming::stream_manager> _stream_manager;
sharded<service::forward_service> _forward_service;
sharded<service::mapreduce_service> _mapreduce_service;
sharded<direct_failure_detector::failure_detector> _fd;
sharded<service::raft_address_map> _raft_address_map;
sharded<service::direct_fd_pinger> _fd_pinger;
@@ -750,8 +750,8 @@ private:
return fs.enable(fs.supported_feature_set());
}).get();
_forward_service.start(std::ref(_ms), std::ref(_proxy), std::ref(_db), std::ref(_token_metadata), std::ref(abort_sources)).get();
auto stop_forward_service = defer([this] { _forward_service.stop().get(); });
_mapreduce_service.start(std::ref(_ms), std::ref(_proxy), std::ref(_db), std::ref(_token_metadata), std::ref(abort_sources)).get();
auto stop_mapreduce_service = defer([this] { _mapreduce_service.stop().get(); });
// gropu0 client exists only on shard 0
service::raft_group0_client group0_client(_group0_registry.local(), _sys_ks.local(), maintenance_mode_enabled::no);
@@ -802,7 +802,7 @@ private:
}).get();
_qp.invoke_on_all([this, &group0_client] (cql3::query_processor& qp) {
qp.start_remote(_mm.local(), _forward_service.local(), _ss.local(), group0_client);
qp.start_remote(_mm.local(), _mapreduce_service.local(), _ss.local(), group0_client);
}).get();
auto stop_qp_remote = defer([this] {
_qp.invoke_on_all(&cql3::query_processor::stop_remote).get();