indexed_table_select_statement: return some exceptions as exception messages

Adjusts the indexed_table_select_statement so that it uses the
result-aware methods in storage_proxy and propagates failed results as
result_message::exception.
This commit is contained in:
Piotr Dulikowski
2022-02-17 03:08:25 +01:00
parent 091b20019b
commit ddf049738d
2 changed files with 66 additions and 64 deletions

View File

@@ -534,7 +534,7 @@ indexed_table_select_statement::prepare_command_for_base_query(query_processor&
return cmd;
}
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>
future<coordinator_result<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>>
indexed_table_select_statement::do_execute_base_query(
query_processor& qp,
dht::partition_range_vector&& partition_ranges,
@@ -568,7 +568,7 @@ indexed_table_select_statement::do_execute_base_query(
auto& ranges_to_vnodes = query_state.ranges_to_vnodes;
auto& concurrency = query_state.concurrency;
auto& previous_result_size = query_state.previous_result_size;
return repeat([this, is_paged, &previous_result_size, &ranges_to_vnodes, &merger, &qp, &state, &options, &concurrency, cmd, timeout]() {
return utils::result_repeat([this, is_paged, &previous_result_size, &ranges_to_vnodes, &merger, &qp, &state, &options, &concurrency, cmd, timeout]() {
// Starting with 1 range, we check if the result was a short read, and if not,
// we continue exponentially, asking for 2x more ranges than before
dht::partition_range_vector prange = ranges_to_vnodes(concurrency);
@@ -605,20 +605,18 @@ indexed_table_select_statement::do_execute_base_query(
if (previous_result_size < query::result_memory_limiter::maximum_result_size && concurrency < max_base_table_query_concurrency) {
concurrency *= 2;
}
return qp.proxy().query(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([is_paged, &previous_result_size, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) {
return qp.proxy().query_result(_schema, command, std::move(prange), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then(utils::result_wrap([is_paged, &previous_result_size, &ranges_to_vnodes, &merger] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result<stop_iteration> {
auto is_short_read = qr.query_result->is_short_read();
// Results larger than 1MB should be shipped to the client immediately
const bool page_limit_reached = is_paged && qr.query_result->buf().size() >= query::result_memory_limiter::maximum_result_size;
previous_result_size = qr.query_result->buf().size();
merger(std::move(qr.query_result));
return stop_iteration(is_short_read || ranges_to_vnodes.empty() || page_limit_reached);
});
}).then([&merger]() {
return merger.get();
});
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<value_type>(value_type(std::move(result), std::move(cmd)));
}));
}).then(utils::result_wrap([&merger, cmd]() mutable {
return make_ready_future<coordinator_result<value_type>>(value_type(merger.get(), std::move(cmd)));
}));
});
}
@@ -630,13 +628,14 @@ indexed_table_select_statement::execute_base_query(
const query_options& options,
gc_clock::time_point now,
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
return do_execute_base_query(qp, std::move(partition_ranges), state, options, now, paging_state).then_unpack(
[this, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return do_execute_base_query(qp, std::move(partition_ranges), state, options, now, paging_state).then(wrap_result_to_error_message(
[this, &state, &options, now, paging_state = std::move(paging_state)] (std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>> result_and_cmd) {
auto&& [result, cmd] = result_and_cmd;
return process_base_query_results(std::move(result), std::move(cmd), state, options, now, std::move(paging_state));
});
}));
}
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>
future<coordinator_result<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>>
indexed_table_select_statement::do_execute_base_query(
query_processor& qp,
std::vector<primary_key>&& primary_keys,
@@ -671,7 +670,7 @@ indexed_table_select_statement::do_execute_base_query(
auto &key_it = query_state.current_primary_key;
auto &previous_result_size = query_state.previous_result_size;
auto &next_iteration_size = query_state.next_iteration_size;
return repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() {
return utils::result_repeat([this, is_paged, &previous_result_size, &next_iteration_size, &keys, &key_it, &merger, &qp, &state, &options, cmd, timeout]() {
// Starting with 1 key, we check if the result was a short read, and if not,
// we continue exponentially, asking for 2x more key than before
auto already_done = std::distance(keys.begin(), key_it);
@@ -685,7 +684,7 @@ indexed_table_select_statement::do_execute_base_query(
auto command = ::make_lw_shared<query::read_command>(*cmd);
query::result_merger oneshot_merger(cmd->get_row_limit(), query::max_partitions);
return map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) {
return utils::result_map_reduce(key_it, key_it_end, [this, &qp, &state, &options, cmd, timeout] (auto& key) {
auto command = ::make_lw_shared<query::read_command>(*cmd);
// for each partition, read just one clustering row (TODO: can
// get all needed rows of one partition at once.)
@@ -693,11 +692,11 @@ indexed_table_select_statement::do_execute_base_query(
if (key.clustering) {
command->slice._row_ranges.push_back(query::clustering_range::make_singular(key.clustering));
}
return qp.proxy().query(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([] (service::storage_proxy::coordinator_query_result qr) {
return qp.proxy().query_result(_schema, command, {dht::partition_range::make_singular(key.partition)}, options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then(utils::result_wrap([] (service::storage_proxy::coordinator_query_result qr) -> coordinator_result<foreign_ptr<lw_shared_ptr<query::result>>> {
return std::move(qr.query_result);
});
}, std::move(oneshot_merger)).then([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) {
}));
}, std::move(oneshot_merger)).then(utils::result_wrap([is_paged, &previous_result_size, &key_it, key_it_end = std::move(key_it_end), &keys, &merger] (foreign_ptr<lw_shared_ptr<query::result>> result) -> coordinator_result<stop_iteration> {
auto is_short_read = result->is_short_read();
// Results larger than 1MB should be shipped to the client immediately
const bool page_limit_reached = is_paged && result->buf().size() >= query::result_memory_limiter::maximum_result_size;
@@ -705,12 +704,10 @@ indexed_table_select_statement::do_execute_base_query(
merger(std::move(result));
key_it = key_it_end;
return stop_iteration(is_short_read || key_it == keys.end() || page_limit_reached);
});
}).then([&merger] () {
return merger.get();
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<value_type>(value_type(std::move(result), std::move(cmd)));
});
}));
}).then(utils::result_wrap([&merger, cmd] () mutable {
return make_ready_future<coordinator_result<value_type>>(value_type(merger.get(), std::move(cmd)));
}));
});
}
@@ -722,10 +719,11 @@ indexed_table_select_statement::execute_base_query(
const query_options& options,
gc_clock::time_point now,
lw_shared_ptr<const service::pager::paging_state> paging_state) const {
return do_execute_base_query(qp, std::move(primary_keys), state, options, now, paging_state).then_unpack(
[this, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return do_execute_base_query(qp, std::move(primary_keys), state, options, now, paging_state).then(wrap_result_to_error_message(
[this, &state, &options, now, paging_state = std::move(paging_state)] (std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>> result_and_cmd){
auto&& [result, cmd] = result_and_cmd;
return process_base_query_results(std::move(result), std::move(cmd), state, options, now, std::move(paging_state));
});
}));
}
future<shared_ptr<cql_transport::messages::result_message>>
@@ -1102,8 +1100,8 @@ indexed_table_select_statement::do_execute(query_processor& qp,
[this, &options, &qp, &state, now, whole_partitions, partition_slices] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
// page size is set to the internal count page size, regardless of the user-provided value
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), internal_paging_size));
return repeat([this, &builder, &options, &internal_options, &qp, &state, now, whole_partitions, partition_slices] () {
auto consume_results = [this, &builder, &options, &internal_options, &state] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd, lw_shared_ptr<const service::pager::paging_state> paging_state) {
return utils::result_repeat([this, &builder, &options, &internal_options, &qp, &state, now, whole_partitions, partition_slices] () {
auto consume_results = [this, &builder, &options, &internal_options, &state] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd, lw_shared_ptr<const service::pager::paging_state> paging_state) -> coordinator_result<stop_iteration> {
if (paging_state) {
paging_state = generate_view_paging_state_from_base_query_results(paging_state, results, state, options);
}
@@ -1120,29 +1118,29 @@ indexed_table_select_statement::do_execute(query_processor& qp,
};
if (whole_partitions || partition_slices) {
return find_index_partition_ranges(qp, state, *internal_options).then_unpack(
return find_index_partition_ranges(qp, state, *internal_options).then(utils::result_wrap_unpack(
[this, now, &state, &internal_options, &qp, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, lw_shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(qp, std::move(partition_ranges), state, *internal_options, now, paging_state)
.then_unpack([paging_state, consume_results = std::move(consume_results)](foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
.then(utils::result_wrap_unpack([paging_state, consume_results = std::move(consume_results)](foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
return consume_results(std::move(results), std::move(cmd), std::move(paging_state));
});
});
}));
}));
} else {
return find_index_clustering_rows(qp, state, *internal_options).then_unpack(
return find_index_clustering_rows(qp, state, *internal_options).then(utils::result_wrap_unpack(
[this, now, &state, &internal_options, &qp, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, lw_shared_ptr<const service::pager::paging_state> paging_state) {
return this->do_execute_base_query(qp, std::move(primary_keys), state, *internal_options, now, paging_state)
.then_unpack([paging_state, consume_results = std::move(consume_results)](foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
.then(utils::result_wrap_unpack([paging_state, consume_results = std::move(consume_results)](foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
return consume_results(std::move(results), std::move(cmd), std::move(paging_state));
});
});
}));
}));
}
}).then([this, &builder] () {
}).then(wrap_result_to_error_message([this, &builder] () {
auto rs = builder.build();
update_stats_rows_read(rs->size());
_stats.filtered_rows_matched_total += _restrictions_need_filtering ? rs->size() : 0;
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
}));
});
}
@@ -1150,16 +1148,18 @@ indexed_table_select_statement::do_execute(query_processor& qp,
tracing::trace(state.get_trace_state(), "Consulting index {} for a single slice of keys", _index.metadata().name());
// In this case, can use our normal query machinery, which retrieves
// entire partitions or the same slice for many partitions.
return find_index_partition_ranges(qp, state, options).then_unpack([now, &state, &options, &qp, this] (dht::partition_range_vector partition_ranges, lw_shared_ptr<const service::pager::paging_state> paging_state) {
return find_index_partition_ranges(qp, state, options).then(wrap_result_to_error_message([now, &state, &options, &qp, this] (std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>> result) {
auto&& [partition_ranges, paging_state] = result;
return this->execute_base_query(qp, std::move(partition_ranges), state, options, now, std::move(paging_state));
});
}));
} else {
tracing::trace(state.get_trace_state(), "Consulting index {} for a list of rows containing keys", _index.metadata().name());
// In this case, we need to retrieve a list of rows (not entire
// partitions) and then retrieve those specific rows.
return find_index_clustering_rows(qp, state, options).then_unpack([now, &state, &options, &qp, this] (std::vector<primary_key> primary_keys, lw_shared_ptr<const service::pager::paging_state> paging_state) {
return find_index_clustering_rows(qp, state, options).then(wrap_result_to_error_message([now, &state, &options, &qp, this] (std::tuple<std::vector<primary_key>, lw_shared_ptr<const service::pager::paging_state>> result) {
auto&& [primary_keys, paging_state] = result;
return this->execute_base_query(qp, std::move(primary_keys), state, options, now, std::move(paging_state));
});
}));
}
}
@@ -1217,7 +1217,7 @@ query::partition_slice indexed_table_select_statement::get_partition_slice_for_l
// Utility function for reading from the index view (get_index_view()))
// the posting-list for a particular value of the indexed column.
// Remember a secondary index can only be created on a single column.
future<::shared_ptr<cql_transport::messages::result_message::rows>>
future<coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>>>
indexed_table_select_statement::read_posting_list(query_processor& qp,
const query_options& options,
uint64_t limit,
@@ -1255,27 +1255,29 @@ indexed_table_select_statement::read_posting_list(query_processor& qp,
int32_t page_size = options.get_page_size();
if (page_size <= 0 || !service::pager::query_pagers::may_need_paging(*_view_schema, page_size, *cmd, partition_ranges)) {
return qp.proxy().query(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([this, now, &options, selection = std::move(selection), partition_slice = std::move(partition_slice)] (service::storage_proxy::coordinator_query_result qr) {
return qp.proxy().query_result(_view_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then(utils::result_wrap([this, now, &options, selection = std::move(selection), partition_slice = std::move(partition_slice)] (service::storage_proxy::coordinator_query_result qr)
-> coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>> {
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
query::result_view::consume(*qr.query_result,
std::move(partition_slice),
cql3::selection::result_set_builder::visitor(builder, *_view_schema, *selection));
return ::make_shared<cql_transport::messages::result_message::rows>(result(builder.build()));
});
}));
}
auto p = service::pager::query_pagers::pager(qp.proxy(), _view_schema, selection,
state, options, cmd, std::move(partition_ranges), nullptr);
return p->fetch_page(options.get_page_size(), now, timeout).then([p = std::move(p), &options, limit, now] (std::unique_ptr<cql3::result_set> rs) {
return p->fetch_page_result(options.get_page_size(), now, timeout).then(utils::result_wrap([p = std::move(p), &options, limit, now] (std::unique_ptr<cql3::result_set> rs)
-> coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>> {
rs->get_metadata().set_paging_state(p->state());
return ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
});
}));
}
// Note: the partitions keys returned by this function are sorted
// in token order. See issue #3423.
future<std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>>
future<coordinator_result<std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>>>
indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
service::query_state& state,
const query_options& options) const
@@ -1283,7 +1285,7 @@ indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
using value_type = std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
return read_posting_list(qp, options, get_limit(options), state, now, timeout, false).then(
return read_posting_list(qp, options, get_limit(options), state, now, timeout, false).then(utils::result_wrap(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
dht::partition_range_vector partition_ranges;
@@ -1312,19 +1314,19 @@ indexed_table_select_statement::find_index_partition_ranges(query_processor& qp,
partition_ranges.emplace_back(range);
}
auto paging_state = rows->rs().get_metadata().paging_state();
return make_ready_future<value_type>(value_type(std::move(partition_ranges), std::move(paging_state)));
});
return make_ready_future<coordinator_result<value_type>>(value_type(std::move(partition_ranges), std::move(paging_state)));
}));
}
// Note: the partitions keys returned by this function are sorted
// in token order. See issue #3423.
future<std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>>
future<coordinator_result<std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>>>
indexed_table_select_statement::find_index_clustering_rows(query_processor& qp, service::query_state& state, const query_options& options) const
{
using value_type = std::tuple<std::vector<indexed_table_select_statement::primary_key>, lw_shared_ptr<const service::pager::paging_state>>;
auto now = gc_clock::now();
auto timeout = db::timeout_clock::now() + get_timeout(state.get_client_state(), options);
return read_posting_list(qp, options, get_limit(options), state, now, timeout, true).then(
return read_posting_list(qp, options, get_limit(options), state, now, timeout, true).then(utils::result_wrap(
[this, now, &options] (::shared_ptr<cql_transport::messages::result_message::rows> rows) {
auto rs = cql3::untyped_result_set(rows);
@@ -1344,8 +1346,8 @@ indexed_table_select_statement::find_index_clustering_rows(query_processor& qp,
primary_keys.emplace_back(primary_key{std::move(dk), std::move(ck)});
}
auto paging_state = rows->rs().get_metadata().paging_state();
return make_ready_future<value_type>(value_type(std::move(primary_keys), std::move(paging_state)));
});
return make_ready_future<coordinator_result<value_type>>(value_type(std::move(primary_keys), std::move(paging_state)));
}));
}

View File

@@ -216,11 +216,11 @@ private:
lw_shared_ptr<const service::pager::paging_state> generate_view_paging_state_from_base_query_results(lw_shared_ptr<const service::pager::paging_state> paging_state,
const foreign_ptr<lw_shared_ptr<query::result>>& results, service::query_state& state, const query_options& options) const;
future<std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>> find_index_partition_ranges(query_processor& qp,
future<coordinator_result<std::tuple<dht::partition_range_vector, lw_shared_ptr<const service::pager::paging_state>>>> find_index_partition_ranges(query_processor& qp,
service::query_state& state,
const query_options& options) const;
future<std::tuple<std::vector<primary_key>, lw_shared_ptr<const service::pager::paging_state>>> find_index_clustering_rows(query_processor& qp,
future<coordinator_result<std::tuple<std::vector<primary_key>, lw_shared_ptr<const service::pager::paging_state>>>> find_index_clustering_rows(query_processor& qp,
service::query_state& state,
const query_options& options) const;
@@ -237,7 +237,7 @@ private:
prepare_command_for_base_query(query_processor& qp, const query_options& options, service::query_state& state, gc_clock::time_point now,
bool use_paging) const;
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>
future<coordinator_result<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>>
do_execute_base_query(
query_processor& qp,
dht::partition_range_vector&& partition_ranges,
@@ -263,7 +263,7 @@ private:
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>
future<coordinator_result<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>>>
do_execute_base_query(
query_processor& qp,
std::vector<primary_key>&& primary_keys,
@@ -285,7 +285,7 @@ private:
_stats.secondary_index_rows_read += rows_read;
}
future<::shared_ptr<cql_transport::messages::result_message::rows>>read_posting_list(
future<coordinator_result<::shared_ptr<cql_transport::messages::result_message::rows>>> read_posting_list(
query_processor& qp,
const query_options& options,
uint64_t limit,