pager: Keep shared pointer to proxy onboard

Pagers are created by alternator and select statement, both
have the proxy reference at hands. Next, the pager's unique_ptr
is put on the lambda of its fetch_page() continuation and thus
it survives the fetch_page execution and then gets destroyed.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-12-22 16:46:16 +03:00
parent 05fa3e07f4
commit 095d93eaf8
6 changed files with 18 additions and 11 deletions

View File

@@ -3352,7 +3352,7 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(paging_state));
auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);
auto p = service::pager::query_pagers::pager(proxy, schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);
return p->fetch_page(limit, gc_clock::now(), executor::default_timeout()).then(
[p = std::move(p), schema, cql_stats, partition_slice = std::move(partition_slice),

View File

@@ -546,7 +546,7 @@ static future<> scan_table_ranges(
{
const schema_ptr& s = scan_ctx.s;
assert (partition_ranges.size() == 1); // otherwise issue #9167 will cause incorrect results.
auto p = service::pager::query_pagers::pager(s, scan_ctx.selection, *scan_ctx.query_state_ptr,
auto p = service::pager::query_pagers::pager(proxy, s, scan_ctx.selection, *scan_ctx.query_state_ptr,
*scan_ctx.query_options, scan_ctx.command, std::move(partition_ranges), nullptr);
while (!p->is_exhausted()) {
if (abort_source.abort_requested()) {

View File

@@ -379,7 +379,7 @@ select_statement::do_execute(query_processor& qp,
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = get_timeout(state.get_client_state(), options);
auto timeout = db::timeout_clock::now() + timeout_duration;
auto p = service::pager::query_pagers::pager(_schema, _selection,
auto p = service::pager::query_pagers::pager(qp.proxy(), _schema, _selection,
state, options, command, std::move(key_ranges), _restrictions_need_filtering ? _restrictions : nullptr);
if (aggregate || nonpaged_filtering) {
@@ -1233,7 +1233,7 @@ indexed_table_select_statement::read_posting_list(query_processor& qp,
});
}
auto p = service::pager::query_pagers::pager(_view_schema, selection,
auto p = service::pager::query_pagers::pager(qp.proxy(), _view_schema, selection,
state, options, cmd, std::move(partition_ranges), nullptr);
return p->fetch_page(options.get_page_size(), now, timeout).then([p = std::move(p), &options, limit, now] (std::unique_ptr<cql3::result_set> rs) {
rs->get_metadata().set_paging_state(p->state());

View File

@@ -48,6 +48,7 @@
namespace service {
class storage_proxy;
class storage_proxy_coordinator_query_result;
namespace pager {
@@ -89,6 +90,7 @@ protected:
std::optional<clustering_key> _last_ckey;
std::optional<utils::UUID> _query_uuid;
shared_ptr<service::storage_proxy> _proxy;
schema_ptr _schema;
shared_ptr<const cql3::selection::selection> _selection;
service::query_state& _state;
@@ -100,7 +102,7 @@ protected:
uint64_t _rows_fetched_for_last_partition = 0;
stats _stats;
public:
query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
query_pager(service::storage_proxy& p, schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,

View File

@@ -65,7 +65,8 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
}
query_pager::query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
query_pager::query_pager(service::storage_proxy& p, schema_ptr s,
shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
@@ -73,6 +74,7 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->get_row_limit())
, _per_partition_limit(cmd->slice.partition_row_limit())
, _proxy(p.shared_from_this())
, _schema(std::move(s))
, _selection(selection)
, _state(state)
@@ -241,13 +243,13 @@ future<cql3::result_generator> query_pager::fetch_page_generator(uint32_t page_s
class filtering_query_pager : public query_pager {
::shared_ptr<cql3::restrictions::statement_restrictions> _filtering_restrictions;
public:
filtering_query_pager(schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
filtering_query_pager(service::storage_proxy& p, schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges,
::shared_ptr<cql3::restrictions::statement_restrictions> filtering_restrictions)
: query_pager(s, selection, state, options, std::move(cmd), std::move(ranges))
: query_pager(p, s, selection, state, options, std::move(cmd), std::move(ranges))
, _filtering_restrictions(std::move(filtering_restrictions))
{}
virtual ~filtering_query_pager() {}
@@ -418,6 +420,7 @@ bool service::pager::query_pagers::may_need_paging(const schema& s, uint32_t pag
}
std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager(
service::storage_proxy& proxy,
schema_ptr s, shared_ptr<const cql3::selection::selection> selection,
service::query_state& state, const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
@@ -429,9 +432,9 @@ std::unique_ptr<service::pager::query_pager> service::pager::query_pagers::pager
filtering_restrictions = ::make_shared<cql3::restrictions::statement_restrictions>(s, true);
}
if (filtering_restrictions) {
return std::make_unique<filtering_query_pager>(std::move(s), std::move(selection), state,
return std::make_unique<filtering_query_pager>(proxy, std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges), std::move(filtering_restrictions));
}
return std::make_unique<query_pager>(std::move(s), std::move(selection), state,
return std::make_unique<query_pager>(proxy, std::move(s), std::move(selection), state,
options, std::move(cmd), std::move(ranges));
}

View File

@@ -54,13 +54,15 @@
namespace service {
class storage_proxy;
namespace pager {
class query_pagers {
public:
static bool may_need_paging(const schema& s, uint32_t page_size, const query::read_command&,
const dht::partition_range_vector&);
static std::unique_ptr<query_pager> pager(schema_ptr,
static std::unique_ptr<query_pager> pager(service::storage_proxy& p, schema_ptr,
shared_ptr<const cql3::selection::selection>,
service::query_state&,
const cql3::query_options&,