mapreduce: add tablet-aware dispatching algorithm
The primary goal of this change is to reduce the time during which the
Effective Replication Map (ERM) is retained by the mapreduce service.
This ensures that long aggregate queries do not block topology
operations. As ScyllaDB transitions towards tablets, which simplify
work dispatching, the new algorithm is designed specifically for
tablets.
The algorithm divides work so that each `tablet_replica` (a <host,
shard> pair) processes two tablets at a time. After processing of each
`tablet_replica`, the ERM is released and re-acquired.
The new algorithm can be summarized as follows:
1. Prepare a set of exclusive `partition_ranges`, where each range
represents one tablet. This set is called `ranges_left`, because it
contains ranges that still need processing.
2. Loop until `ranges_left` is empty:
I. Create `tablet_replica` -> `ranges` mapping for the current ERM
and `ranges_left`. Store this mapping and the number
representing current ERM version as `ranges_per_replica`.
II. In parallel, for each tablet_replica, iterate through
ranges_per_tablet_replica. Select independently up to two ranges
that are still existing in ranges_left. Remove each range
selected for processing from ranges_left. Before each iteration,
verify that ERM version has not changed. If it has,
return to Step I.
Steps I and II are exclusive to simplify maintaining `ranges_left` and
`ranges_per_replica`:
- Step I iterates through `ranges_left` and creates
`ranges_per_replica`
- Step II iterates through `ranges_per_replica` and remove processed
ranges from `ranges_left`
To maintain the exclusivity, the algorithm uses `parallel_for_each` in
Step II, requiring all ongoing `tablet_replica` processing to finish
before returning to Step I.
Currently, each node can handle any partition range, even if the
mapreduce supercoordinator does not retain the ERM and the range is
absent locally. This is because `execute_on_this_shard` creates a new
pager to coordinate the partition range read, including obtaining its
own ERM. However, absent ranges are handled by shard 0, so proper
routing is necessary to avoid overloading shard 0. Thus, in Step II,
the ERM is retained during each `tablet_replica` processing.
The tablet split scenario is not well-handled in this implementation.
After a split, the entire pre-split range is sent to a node hosting
the `tablet_replica` containing the range's `end_token`. The node
will typically not have other tablets in the range, and as
aforementioned, absent ranges are handled by shard 0. As a result,
in such scenario, shard 0 handles a significant portion of the range.
This issue is addressed later in this patch series by introducing
`shard_id` in `mapreduce_request`.
Ref. scylladb#21831
This commit is contained in:
@@ -618,12 +618,164 @@ future<> mapreduce_service::dispatch_to_vnodes(schema_ptr schema, replica::colum
|
||||
});
|
||||
}
|
||||
|
||||
class mapreduce_tablet_algorithm {
|
||||
private:
|
||||
class ranges_per_tablet_replica_t;
|
||||
public:
|
||||
mapreduce_tablet_algorithm(mapreduce_service& mapreducer, schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state)
|
||||
: _mapreducer(mapreducer),
|
||||
_schema(schema),
|
||||
_cf(cf),
|
||||
_req(req),
|
||||
_result(result),
|
||||
_tr_state(tr_state),
|
||||
_dispatcher(_mapreducer, tr_state),
|
||||
_limit_per_replica(2)
|
||||
{}
|
||||
|
||||
future<> initialize_ranges_left() {
|
||||
auto erm = _cf.get_effective_replication_map();
|
||||
auto generator = query_ranges_to_vnodes_generator(erm->make_splitter(), _schema, _req.pr);
|
||||
while (std::optional<dht::partition_range> range = get_next_partition_range(generator)) {
|
||||
_ranges_left.insert(std::move(*range));
|
||||
// can potentially stall e.g. with a large tablet count.
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
tracing::trace(_tr_state, "Dispatching {} ranges", _ranges_left.size());
|
||||
flogger.debug("Dispatching {} ranges", _ranges_left.size());
|
||||
}
|
||||
|
||||
future<> prepare_ranges_per_replica() {
|
||||
auto erm = _cf.get_effective_replication_map();
|
||||
const auto& topo = erm->get_topology();
|
||||
auto& tablets = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_schema->id());
|
||||
|
||||
std::map<locator::tablet_replica, dht::partition_range_vector> ranges_per_tablet_replica_map;
|
||||
for (auto& range : _ranges_left) {
|
||||
auto tablet_id = tablets.get_tablet_id(end_token(range));
|
||||
const auto& tablet_info = tablets.get_tablet_info(tablet_id);
|
||||
|
||||
size_t skipped_replicas = 0;
|
||||
for (auto& replica : tablet_info.replicas) {
|
||||
bool is_alive = _mapreducer._proxy.is_alive(*erm, replica.host);
|
||||
bool has_correct_locality = !db::is_datacenter_local(_req.cl) || topo.get_datacenter(replica.host) == topo.get_datacenter();
|
||||
if (is_alive && has_correct_locality) {
|
||||
ranges_per_tablet_replica_map[replica].push_back(range);
|
||||
} else {
|
||||
++skipped_replicas;
|
||||
if (skipped_replicas == tablet_info.replicas.size()) {
|
||||
throw std::runtime_error("No live endpoint available");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// can potentially stall e.g. with a large tablet count.
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
|
||||
_ranges_per_replica = ranges_per_tablet_replica_t(erm->get_token_metadata_ptr()->get_version(), std::move(ranges_per_tablet_replica_map));
|
||||
}
|
||||
|
||||
std::vector<locator::tablet_replica> get_processing_slots() const {
|
||||
std::vector<locator::tablet_replica> slots;
|
||||
for (const auto& [replica, _] : _ranges_per_replica.get_map()) {
|
||||
for (size_t i = 0; i < _limit_per_replica; ++i) {
|
||||
slots.push_back(replica);
|
||||
}
|
||||
}
|
||||
return slots;
|
||||
}
|
||||
|
||||
future<> dispatch_work_and_wait_to_finish() {
|
||||
while (_ranges_left.size() > 0) {
|
||||
co_await prepare_ranges_per_replica();
|
||||
|
||||
co_await coroutine::parallel_for_each(get_processing_slots(),
|
||||
[&] (locator::tablet_replica replica) -> future<> {
|
||||
auto& ranges = _ranges_per_replica.get_map().find(replica)->second;
|
||||
for (const auto& range : ranges) {
|
||||
auto erm = _cf.get_effective_replication_map();
|
||||
if (!_ranges_per_replica.is_up_to_date(erm->get_token_metadata_ptr())) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto it = _ranges_left.find(range);
|
||||
if (it != _ranges_left.end()) {
|
||||
_ranges_left.erase(it);
|
||||
query::mapreduce_request req_with_modified_pr = _req;
|
||||
req_with_modified_pr.pr = dht::partition_range_vector{range};
|
||||
co_await _mapreducer.dispatch_range_and_reduce(erm, _dispatcher, _req, std::move(req_with_modified_pr), replica.host, _result, _tr_state);
|
||||
}
|
||||
|
||||
// can potentially stall e.g. with a large tablet count.
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
// The motivation for ranges_per_tablet_replica_t is to store
|
||||
// a `tablet_replica -> range` mapping that is guaranteed to be
|
||||
// consistent with the given topology version
|
||||
class ranges_per_tablet_replica_t {
|
||||
public:
|
||||
ranges_per_tablet_replica_t() = default;
|
||||
ranges_per_tablet_replica_t(topology::version_t topology_version, std::map<locator::tablet_replica, dht::partition_range_vector>&& map)
|
||||
: _topology_version(topology_version)
|
||||
, _map(std::move(map))
|
||||
{}
|
||||
|
||||
ranges_per_tablet_replica_t& operator=(ranges_per_tablet_replica_t&& other) noexcept = default;
|
||||
|
||||
bool is_up_to_date(locator::token_metadata_ptr token_metadata_ptr) const {
|
||||
return _topology_version == token_metadata_ptr->get_version();
|
||||
}
|
||||
const std::map<locator::tablet_replica, dht::partition_range_vector>& get_map() const {
|
||||
return _map;
|
||||
}
|
||||
|
||||
private:
|
||||
topology::version_t _topology_version;
|
||||
std::map<locator::tablet_replica, dht::partition_range_vector> _map;
|
||||
};
|
||||
|
||||
mapreduce_service& _mapreducer;
|
||||
schema_ptr _schema;
|
||||
replica::column_family& _cf;
|
||||
query::mapreduce_request& _req;
|
||||
query::mapreduce_result& _result;
|
||||
tracing::trace_state_ptr _tr_state;
|
||||
retrying_dispatcher _dispatcher;
|
||||
size_t _limit_per_replica;
|
||||
|
||||
struct partition_range_cmp {
|
||||
bool operator() (const dht::partition_range& a, const dht::partition_range& b) const {
|
||||
return end_token(a) < end_token(b);
|
||||
};
|
||||
};
|
||||
|
||||
std::set<dht::partition_range, partition_range_cmp> _ranges_left;
|
||||
ranges_per_tablet_replica_t _ranges_per_replica;
|
||||
};
|
||||
|
||||
future<> mapreduce_service::dispatch_to_tablets(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state) {
|
||||
mapreduce_tablet_algorithm algorithm(*this, schema, cf, req, result, tr_state);
|
||||
co_await algorithm.initialize_ranges_left();
|
||||
co_await algorithm.dispatch_work_and_wait_to_finish();
|
||||
}
|
||||
|
||||
future<query::mapreduce_result> mapreduce_service::dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state) {
|
||||
schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
|
||||
replica::table& cf = _db.local().find_column_family(schema);
|
||||
|
||||
query::mapreduce_result result;
|
||||
co_await dispatch_to_vnodes(schema, cf, req, result, tr_state);
|
||||
if (cf.uses_tablets()) {
|
||||
co_await dispatch_to_tablets(schema, cf, req, result, tr_state);
|
||||
} else {
|
||||
co_await dispatch_to_vnodes(schema, cf, req, result, tr_state);
|
||||
}
|
||||
|
||||
mapreduce_aggregates aggrs(req);
|
||||
const bool requires_thread = aggrs.requires_thread();
|
||||
|
||||
@@ -47,7 +47,10 @@ class retrying_dispatcher;
|
||||
// 5. `dispatch` merges results from all coordinators and returns merged
|
||||
// result.
|
||||
//
|
||||
// Splitting query into sub-queries in is implemented as:
|
||||
// Splitting query into sub-queries is implemented separately for vnodes
|
||||
// and for tablets.
|
||||
//
|
||||
// The splitting algorithm for vnodes works as follows:
|
||||
// a. Partition ranges of the original query are split into a sequence of
|
||||
// vnodes.
|
||||
// b. Each vnode in the sequence is added to a set associated with some
|
||||
@@ -58,7 +61,24 @@ class retrying_dispatcher;
|
||||
// by the vnode set. This replacement will create a sub-query whose
|
||||
// recipient is the endpoint that holds all vnodes in the set.
|
||||
//
|
||||
// Query splitting example (3 node cluster with num_tokens set to 3):
|
||||
// The splitting algorithm for tablets is more dynamic, and unlike
|
||||
// the algorithm for vnodes, it doesn't block topology changes (i.e. ERM) for
|
||||
// the whole duration of the query execution:
|
||||
// a. Prepare a set of exclusive `partition_ranges`, where each range
|
||||
// represents one tablet. This set is called `ranges_left`, because it
|
||||
// contains ranges that still need processing.
|
||||
// b. Loop until `ranges_left` is empty:
|
||||
// I. Create `tablet_replica` -> `ranges` mapping for the current ERM
|
||||
// and `ranges_left`. Store this mapping and the number
|
||||
// representing current ERM version as `ranges_per_replica`.
|
||||
// II. In parallel, for each tablet_replica, iterate through
|
||||
// ranges_per_tablet_replica. Select independently up to two ranges
|
||||
// that are still existing in ranges_left. Remove each range
|
||||
// selected for processing from ranges_left. Before each iteration,
|
||||
// verify that ERM version has not changed. If it has,
|
||||
// return to Step I.
|
||||
//
|
||||
// Query splitting (vnodes) example (3 node cluster with num_tokens set to 3):
|
||||
// Original query: mapreduce_request{
|
||||
// reduction_types=[reduction_type{count}],
|
||||
// cmd=read_command{contents omitted},
|
||||
@@ -160,6 +180,7 @@ public:
|
||||
private:
|
||||
future<> dispatch_range_and_reduce(const locator::effective_replication_map_ptr& erm, retrying_dispatcher& dispatcher, query::mapreduce_request const& req, query::mapreduce_request&& req_with_modified_pr, locator::host_id addr, query::mapreduce_result& result_, tracing::trace_state_ptr tr_state);
|
||||
future<> dispatch_to_vnodes(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state);
|
||||
future<> dispatch_to_tablets(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state);
|
||||
|
||||
// Used to distribute given `mapreduce_request` across shards.
|
||||
future<query::mapreduce_result> dispatch_to_shards(query::mapreduce_request req, std::optional<tracing::trace_info> tr_info);
|
||||
@@ -171,6 +192,7 @@ private:
|
||||
future<> uninit_messaging_service();
|
||||
|
||||
friend class retrying_dispatcher;
|
||||
friend class mapreduce_tablet_algorithm;
|
||||
};
|
||||
|
||||
} // namespace service
|
||||
|
||||
Reference in New Issue
Block a user