diff --git a/service/mapreduce_service.cc b/service/mapreduce_service.cc
index e9692362f2..971c2a5781 100644
--- a/service/mapreduce_service.cc
+++ b/service/mapreduce_service.cc
@@ -618,12 +618,192 @@ future<> mapreduce_service::dispatch_to_vnodes(schema_ptr schema, replica::colum
     });
 }
 
+// Executes a mapreduce query on a table that uses tablets.
+//
+// The request's partition ranges are split once into `_ranges_left`. Pending
+// ranges are then repeatedly mapped to tablet replicas using the current
+// effective replication map (ERM) and dispatched until none is left; the
+// mapping is rebuilt whenever the token metadata version changes.
+class mapreduce_tablet_algorithm {
+private:
+    class ranges_per_tablet_replica_t;
+public:
+    mapreduce_tablet_algorithm(mapreduce_service& mapreducer, schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state)
+        : _mapreducer(mapreducer),
+        _schema(schema),
+        _cf(cf),
+        _req(req),
+        _result(result),
+        _tr_state(tr_state),
+        _dispatcher(_mapreducer, tr_state),
+        _limit_per_replica(2)
+    {}
+
+    // Splits the request's partition ranges with the ERM's splitter and
+    // stores the resulting ranges in `_ranges_left`.
+    future<> initialize_ranges_left() {
+        auto erm = _cf.get_effective_replication_map();
+        auto generator = query_ranges_to_vnodes_generator(erm->make_splitter(), _schema, _req.pr);
+        while (std::optional range = get_next_partition_range(generator)) {
+            _ranges_left.insert(std::move(*range));
+            // can potentially stall e.g. with a large tablet count.
+            co_await coroutine::maybe_yield();
+        }
+
+        tracing::trace(_tr_state, "Dispatching {} ranges", _ranges_left.size());
+        flogger.debug("Dispatching {} ranges", _ranges_left.size());
+    }
+
+    // Rebuilds `_ranges_per_replica` for the current ERM. Each pending range
+    // is listed under every replica of its tablet that is alive and, for
+    // datacenter-local consistency levels, located in the datacenter reported
+    // by `topo.get_datacenter()`.
+    //
+    // Throws std::runtime_error if a range has no eligible replica.
+    future<> prepare_ranges_per_replica() {
+        auto erm = _cf.get_effective_replication_map();
+        const auto& topo = erm->get_topology();
+        auto& tablets = erm->get_token_metadata_ptr()->tablets().get_tablet_map(_schema->id());
+
+        std::map<locator::tablet_replica, dht::partition_range_vector> ranges_per_tablet_replica_map;
+        for (const auto& range : _ranges_left) {
+            auto tablet_id = tablets.get_tablet_id(end_token(range));
+            const auto& tablet_info = tablets.get_tablet_info(tablet_id);
+
+            bool has_eligible_replica = false;
+            for (const auto& replica : tablet_info.replicas) {
+                bool is_alive = _mapreducer._proxy.is_alive(*erm, replica.host);
+                bool has_correct_locality = !db::is_datacenter_local(_req.cl) || topo.get_datacenter(replica.host) == topo.get_datacenter();
+                if (is_alive && has_correct_locality) {
+                    ranges_per_tablet_replica_map[replica].push_back(range);
+                    has_eligible_replica = true;
+                }
+            }
+            // Checked after the loop so that a tablet with an empty replica
+            // list is rejected as well, instead of never being dispatched.
+            if (!has_eligible_replica) {
+                throw std::runtime_error("No live endpoint available");
+            }
+
+            // can potentially stall e.g. with a large tablet count.
+            co_await coroutine::maybe_yield();
+        }
+
+        _ranges_per_replica = ranges_per_tablet_replica_t(erm->get_token_metadata_ptr()->get_version(), std::move(ranges_per_tablet_replica_map));
+    }
+
+    // Returns `_limit_per_replica` entries per replica of the current mapping;
+    // each entry is run as one parallel worker by the dispatch loop.
+    std::vector<locator::tablet_replica> get_processing_slots() const {
+        std::vector<locator::tablet_replica> slots;
+        for (const auto& [replica, _] : _ranges_per_replica.get_map()) {
+            for (size_t i = 0; i < _limit_per_replica; ++i) {
+                slots.push_back(replica);
+            }
+        }
+        return slots;
+    }
+
+    // Dispatches all ranges from `_ranges_left` and returns once it is empty.
+    // A worker stops as soon as it sees a token metadata version different from
+    // the one the mapping was built for; the mapping is then rebuilt for the
+    // ranges that are still pending.
+    future<> dispatch_work_and_wait_to_finish() {
+        while (_ranges_left.size() > 0) {
+            co_await prepare_ranges_per_replica();
+
+            co_await coroutine::parallel_for_each(get_processing_slots(),
+            [&] (locator::tablet_replica replica) -> future<> {
+                const auto& ranges = _ranges_per_replica.get_map().find(replica)->second;
+                for (const auto& range : ranges) {
+                    auto erm = _cf.get_effective_replication_map();
+                    if (!_ranges_per_replica.is_up_to_date(erm->get_token_metadata_ptr())) {
+                        co_return;
+                    }
+
+                    // Claim the range before suspending, so that no other worker
+                    // dispatches it as well.
+                    auto it = _ranges_left.find(range);
+                    if (it != _ranges_left.end()) {
+                        _ranges_left.erase(it);
+                        query::mapreduce_request req_with_modified_pr = _req;
+                        req_with_modified_pr.pr = dht::partition_range_vector{range};
+                        co_await _mapreducer.dispatch_range_and_reduce(erm, _dispatcher, _req, std::move(req_with_modified_pr), replica.host, _result, _tr_state);
+                    }
+
+                    // can potentially stall e.g. with a large tablet count.
+                    co_await coroutine::maybe_yield();
+                }
+            });
+        }
+    }
+
+private:
+    // The motivation for ranges_per_tablet_replica_t is to store
+    // a `tablet_replica -> ranges` mapping that is guaranteed to be
+    // consistent with the given topology version
+    class ranges_per_tablet_replica_t {
+    public:
+        ranges_per_tablet_replica_t() = default;
+        ranges_per_tablet_replica_t(topology::version_t topology_version, std::map<locator::tablet_replica, dht::partition_range_vector>&& map)
+            : _topology_version(topology_version)
+            , _map(std::move(map))
+        {}
+
+        ranges_per_tablet_replica_t& operator=(ranges_per_tablet_replica_t&& other) noexcept = default;
+
+        bool is_up_to_date(locator::token_metadata_ptr token_metadata_ptr) const {
+            return _topology_version == token_metadata_ptr->get_version();
+        }
+        const std::map<locator::tablet_replica, dht::partition_range_vector>& get_map() const {
+            return _map;
+        }
+
+    private:
+        // Value-initialized so that a default-constructed object never holds
+        // an indeterminate version (relevant if version_t is a scalar type).
+        topology::version_t _topology_version{};
+        std::map<locator::tablet_replica, dht::partition_range_vector> _map;
+    };
+
+    mapreduce_service& _mapreducer;
+    schema_ptr _schema;
+    replica::column_family& _cf;
+    query::mapreduce_request& _req;
+    query::mapreduce_result& _result;
+    tracing::trace_state_ptr _tr_state;
+    retrying_dispatcher _dispatcher;
+    size_t _limit_per_replica;
+
+    // Orders ranges by their end token.
+    struct partition_range_cmp {
+        bool operator() (const dht::partition_range& a, const dht::partition_range& b) const {
+            return end_token(a) < end_token(b);
+        }
+    };
+
+    std::set<dht::partition_range, partition_range_cmp> _ranges_left;
+    ranges_per_tablet_replica_t _ranges_per_replica;
+};
+
+// Runs the tablet splitting algorithm for `req`; `result` is handed to every
+// per-range dispatch issued by the algorithm.
+future<> mapreduce_service::dispatch_to_tablets(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state) {
+    mapreduce_tablet_algorithm algorithm(*this, schema, cf, req, result, tr_state);
+    co_await algorithm.initialize_ranges_left();
+    co_await algorithm.dispatch_work_and_wait_to_finish();
+}
+
 future mapreduce_service::dispatch(query::mapreduce_request req, tracing::trace_state_ptr tr_state) {
     schema_ptr schema = local_schema_registry().get(req.cmd.schema_version);
     replica::table& cf = _db.local().find_column_family(schema);
 
     query::mapreduce_result result;
-    co_await dispatch_to_vnodes(schema, cf, req, result, tr_state);
+    if (cf.uses_tablets()) {
+        co_await dispatch_to_tablets(schema, cf, req, result, tr_state);
+    } else {
+        co_await dispatch_to_vnodes(schema, cf, req, result, tr_state);
+    }
 
     mapreduce_aggregates aggrs(req);
     const bool requires_thread = aggrs.requires_thread();
diff --git a/service/mapreduce_service.hh b/service/mapreduce_service.hh
index ac4eb04020..7bed3985d8 100644
--- a/service/mapreduce_service.hh
+++ b/service/mapreduce_service.hh
@@ -47,7 +47,10 @@ class retrying_dispatcher;
 //  5. `dispatch` merges results from all coordinators and returns merged
 //     result.
 //
-// Splitting query into sub-queries in is implemented as:
+// Splitting query into sub-queries is implemented separately for vnodes
+// and for tablets.
+//
+// The splitting algorithm for vnodes works as follows:
 //  a. Partition ranges of the original query are split into a sequence of
 //     vnodes.
 //  b. Each vnode in the sequence is added to a set associated with some
@@ -58,7 +61,24 @@ class retrying_dispatcher;
 //     by the vnode set. This replacement will create a sub-query whose
 //     recipient is the endpoint that holds all vnodes in the set.
 //
-// Query splitting example (3 node cluster with num_tokens set to 3):
+// The splitting algorithm for tablets is more dynamic, and unlike
+// the algorithm for vnodes, it doesn't block topology changes (i.e. ERM) for
+// the whole duration of the query execution:
+//  a. Prepare a set of exclusive `partition_ranges`, where each range
+//     represents one tablet. This set is called `ranges_left`, because it
+//     contains ranges that still need processing.
+//  b. Loop until `ranges_left` is empty:
+//     I.  Create `tablet_replica` -> `ranges` mapping for the current ERM
+//         and `ranges_left`. Store this mapping and the number
+//         representing current ERM version as `ranges_per_replica`.
+//     II. In parallel, for each `tablet_replica`, iterate through its
+//         ranges in `ranges_per_replica`, processing up to two ranges
+//         that are still present in `ranges_left` at a time. Remove each
+//         range selected for processing from `ranges_left`. Before each
+//         iteration, verify that the ERM version has not changed. If it
+//         has, return to Step I.
+//
+// Query splitting (vnodes) example (3 node cluster with num_tokens set to 3):
 // Original query: mapreduce_request{
 //     reduction_types=[reduction_type{count}],
 //     cmd=read_command{contents omitted},
@@ -160,6 +180,7 @@ public:
 private:
     future<> dispatch_range_and_reduce(const locator::effective_replication_map_ptr& erm, retrying_dispatcher& dispatcher, query::mapreduce_request const& req, query::mapreduce_request&& req_with_modified_pr, locator::host_id addr, query::mapreduce_result& result_, tracing::trace_state_ptr tr_state);
     future<> dispatch_to_vnodes(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state);
+    future<> dispatch_to_tablets(schema_ptr schema, replica::column_family& cf, query::mapreduce_request& req, query::mapreduce_result& result, tracing::trace_state_ptr tr_state);
 
     // Used to distribute given `mapreduce_request` across shards.
     future dispatch_to_shards(query::mapreduce_request req, std::optional tr_info);
@@ -171,6 +192,7 @@ private:
     future<> uninit_messaging_service();
 
     friend class retrying_dispatcher;
+    friend class mapreduce_tablet_algorithm;
 };
 
 } // namespace service