db,view: add delete ghost rows visitor
The visitor is used to traverse view rows, and if it detects a ghost row it qualifies it for deletion. Qualification is based on a base table read with cl=ALL: if the corresponding row is not present in the base table, it is considered a ghost.
This commit is contained in:
47
db/view/delete_ghost_rows_visitor.hh
Normal file
47
db/view/delete_ghost_rows_visitor.hh
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
/* Copyright 2022-present ScyllaDB */
|
||||
|
||||
#include "query-result-reader.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
|
||||
namespace service {
|
||||
class storage_proxy;
|
||||
class query_state;
|
||||
}
|
||||
|
||||
namespace db::view {
|
||||
|
||||
class delete_ghost_rows_visitor {
|
||||
service::storage_proxy& _proxy;
|
||||
service::query_state& _state;
|
||||
db::timeout_clock::duration _timeout_duration;
|
||||
view_ptr _view;
|
||||
replica::table& _view_table;
|
||||
schema_ptr _base_schema;
|
||||
std::optional<partition_key> _view_pk;
|
||||
public:
|
||||
delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration);
|
||||
|
||||
void add_value(const column_definition& def, query::result_row_view::iterator_type& i) {
|
||||
}
|
||||
|
||||
void accept_new_partition(const partition_key& key, uint32_t row_count);
|
||||
|
||||
void accept_new_partition(uint32_t row_count) {
|
||||
}
|
||||
|
||||
// Assumes running in seastar::thread
|
||||
void accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row);
|
||||
|
||||
void accept_new_row(const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
}
|
||||
|
||||
uint32_t accept_partition_end(const query::result_row_view& static_row) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
} //namespace db::view
|
||||
@@ -55,6 +55,7 @@
|
||||
#include "query-result-writer.hh"
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
#include "readers/evictable.hh"
|
||||
#include "delete_ghost_rows_visitor.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -2169,5 +2170,59 @@ std::vector<db::view::view_and_base> with_base_info_snapshot(std::vector<view_pt
|
||||
}));
|
||||
}
|
||||
|
||||
delete_ghost_rows_visitor::delete_ghost_rows_visitor(service::storage_proxy& proxy, service::query_state& state, view_ptr view, db::timeout_clock::duration timeout_duration)
|
||||
: _proxy(proxy)
|
||||
, _state(state)
|
||||
, _timeout_duration(timeout_duration)
|
||||
, _view(view)
|
||||
, _view_table(_proxy.get_db().local().find_column_family(view))
|
||||
, _base_schema(_proxy.get_db().local().find_schema(_view->view_info()->base_id()))
|
||||
, _view_pk()
|
||||
{}
|
||||
|
||||
void delete_ghost_rows_visitor::accept_new_partition(const partition_key& key, uint32_t row_count) {
|
||||
assert(thread::running_in_thread());
|
||||
_view_pk = key;
|
||||
}
|
||||
|
||||
// Assumes running in seastar::thread
|
||||
void delete_ghost_rows_visitor::accept_new_row(const clustering_key& ck, const query::result_row_view& static_row, const query::result_row_view& row) {
|
||||
auto view_exploded_pk = _view_pk->explode();
|
||||
auto view_exploded_ck = ck.explode();
|
||||
std::vector<bytes> base_exploded_pk(_base_schema->partition_key_size());
|
||||
std::vector<bytes> base_exploded_ck(_base_schema->clustering_key_size());
|
||||
for (const column_definition& view_cdef : _view->all_columns()) {
|
||||
const column_definition* base_cdef = _base_schema->get_column_definition(view_cdef.name());
|
||||
if (base_cdef) {
|
||||
std::vector<bytes>& view_exploded_key = view_cdef.is_partition_key() ? view_exploded_pk : view_exploded_ck;
|
||||
if (base_cdef->is_partition_key()) {
|
||||
base_exploded_pk[base_cdef->id] = view_exploded_key[view_cdef.id];
|
||||
} else if (base_cdef->is_clustering_key()) {
|
||||
base_exploded_ck[base_cdef->id] = view_exploded_key[view_cdef.id];
|
||||
}
|
||||
}
|
||||
}
|
||||
partition_key base_pk = partition_key::from_exploded(base_exploded_pk);
|
||||
clustering_key base_ck = clustering_key::from_exploded(base_exploded_ck);
|
||||
|
||||
dht::partition_range_vector partition_ranges({dht::partition_range::make_singular(dht::decorate_key(*_base_schema, base_pk))});
|
||||
auto selection = cql3::selection::selection::for_columns(_base_schema, std::vector<const column_definition*>({&_base_schema->partition_key_columns().front()}));
|
||||
|
||||
std::vector<query::clustering_range> bounds{query::clustering_range::make_singular(base_ck)};
|
||||
query::partition_slice partition_slice(std::move(bounds), {}, {}, selection->get_query_options());
|
||||
auto command = ::make_lw_shared<query::read_command>(_base_schema->id(), _base_schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice));
|
||||
auto timeout = db::timeout_clock::now() + _timeout_duration;
|
||||
service::storage_proxy::coordinator_query_options opts{timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state()};
|
||||
auto base_qr = _proxy.query(_base_schema, command, std::move(partition_ranges), db::consistency_level::ALL, opts).get0();
|
||||
query::result& result = *base_qr.query_result;
|
||||
if (result.row_count().value_or(0) == 0) {
|
||||
mutation m(_view, *_view_pk);
|
||||
auto& row = m.partition().clustered_row(*_view, ck);
|
||||
row.apply(tombstone(api::new_timestamp(), gc_clock::now()));
|
||||
timeout = db::timeout_clock::now() + _timeout_duration;
|
||||
_proxy.mutate({m}, db::consistency_level::ALL, timeout, _state.get_trace_state(), empty_service_permit()).get();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
Reference in New Issue
Block a user