db,views: unify time points used for update generation
Until now, view updates were generated with a bunch of random
time points, because the interface was not adjusted for passing
a single time point. The time points were used to determine
whether cells were alive (e.g. because of TTL), so it's better
to unify the process:
1. when generating view updates from user writes, a single time point
is used for the whole operation
2. when generating view updates via the view building process,
a single time point is used for each build step
NOTE: I don't see any reliable and deterministic way of writing
test scenarios which trigger problems with the old code.
After #6488 is resolved and error injection is integrated
into view.cc, tests can be added.
Fixes #6429
Tests: unit(dev)
Message-Id: <f864e965eb2e27ffc13d50359ad1e228894f7121.1590070130.git.sarna@scylladb.com>
This commit is contained in:
committed by
Nadav Har'El
parent
d7fb51a094
commit
77e943e9a3
@@ -1017,7 +1017,8 @@ public:
|
||||
future<> populate_views(
|
||||
std::vector<view_ptr>,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&&);
|
||||
flat_mutation_reader&&,
|
||||
gc_clock::time_point);
|
||||
|
||||
reader_concurrency_semaphore& read_concurrency_semaphore() {
|
||||
return *_config.read_concurrency_semaphore;
|
||||
@@ -1030,12 +1031,13 @@ public:
|
||||
private:
|
||||
future<row_locker::lock_holder> do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source,
|
||||
tracing::trace_state_ptr tr_state, const io_priority_class& io_priority, query::partition_slice::option_set custom_opts) const;
|
||||
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update) const;
|
||||
std::vector<view_ptr> affected_views(const schema_ptr& base, const mutation& update, gc_clock::time_point now) const;
|
||||
future<> generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings,
|
||||
tracing::trace_state_ptr tr_state) const;
|
||||
tracing::trace_state_ptr tr_state,
|
||||
gc_clock::time_point now) const;
|
||||
|
||||
mutable row_locker _row_locker;
|
||||
future<row_locker::lock_holder> local_base_lock(
|
||||
|
||||
@@ -169,31 +169,32 @@ void stats::register_stats() {
|
||||
});
|
||||
}
|
||||
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key) {
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key, gc_clock::time_point now) {
|
||||
return view.select_statement().get_restrictions()->get_partition_key_restrictions()->is_satisfied_by(
|
||||
base, key.key(), clustering_key_prefix::make_empty(), row(), cql3::query_options({ }), gc_clock::now());
|
||||
base, key.key(), clustering_key_prefix::make_empty(), row(), cql3::query_options({ }), now);
|
||||
}
|
||||
|
||||
bool clustering_prefix_matches(const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck) {
|
||||
bool clustering_prefix_matches(const schema& base, const view_info& view, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now) {
|
||||
return view.select_statement().get_restrictions()->get_clustering_columns_restrictions()->is_satisfied_by(
|
||||
base, key, ck, row(), cql3::query_options({ }), gc_clock::now());
|
||||
base, key, ck, row(), cql3::query_options({ }), now);
|
||||
}
|
||||
|
||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update) {
|
||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update, gc_clock::time_point now) {
|
||||
// We can guarantee that the view won't be affected if:
|
||||
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
|
||||
// even if an update don't match a view condition on a regular column, that update can still invalidate a
|
||||
// pre-existing entry) - note that the upper layers should already have checked the partition key;
|
||||
return clustering_prefix_matches(base, view, key.key(), update.key());
|
||||
return clustering_prefix_matches(base, view, key.key(), update.key(), now);
|
||||
}
|
||||
|
||||
static bool update_requires_read_before_write(const schema& base,
|
||||
const std::vector<view_ptr>& views,
|
||||
const dht::decorated_key& key,
|
||||
const rows_entry& update) {
|
||||
const rows_entry& update,
|
||||
gc_clock::time_point now) {
|
||||
for (auto&& v : views) {
|
||||
view_info& vf = *v->view_info();
|
||||
if (may_be_affected_by(base, vf, key, update)) {
|
||||
if (may_be_affected_by(base, vf, key, update, now)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -227,7 +228,7 @@ static bool is_partition_key_empty(
|
||||
}
|
||||
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) {
|
||||
return clustering_prefix_matches(base, view, key, update.key())
|
||||
return clustering_prefix_matches(base, view, key, update.key(), now)
|
||||
&& boost::algorithm::all_of(
|
||||
view.select_statement().get_restrictions()->get_non_pk_restriction() | boost::adaptors::map_values,
|
||||
[&] (auto&& r) {
|
||||
@@ -755,14 +756,15 @@ public:
|
||||
view_update_builder(schema_ptr s,
|
||||
std::vector<view_updates>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings)
|
||||
flat_mutation_reader_opt&& existings,
|
||||
gc_clock::time_point now)
|
||||
: _schema(std::move(s))
|
||||
, _view_updates(std::move(views_to_update))
|
||||
, _updates(std::move(updates))
|
||||
, _existings(std::move(existings))
|
||||
, _update_tombstone_tracker(*_schema, false)
|
||||
, _existing_tombstone_tracker(*_schema, false)
|
||||
, _now(gc_clock::now()) {
|
||||
, _now(now) {
|
||||
}
|
||||
|
||||
future<std::vector<frozen_mutation_and_schema>> build();
|
||||
@@ -934,11 +936,12 @@ future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings) {
|
||||
flat_mutation_reader_opt&& existings,
|
||||
gc_clock::time_point now) {
|
||||
auto vs = boost::copy_range<std::vector<view_updates>>(views_to_update | boost::adaptors::transformed([&] (auto&& v) {
|
||||
return view_updates(std::move(v), base);
|
||||
}));
|
||||
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings));
|
||||
auto builder = std::make_unique<view_update_builder>(base, std::move(vs), std::move(updates), std::move(existings), now);
|
||||
auto f = builder->build();
|
||||
return f.finally([builder = std::move(builder)] { });
|
||||
}
|
||||
@@ -946,7 +949,8 @@ future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
query::clustering_row_ranges calculate_affected_clustering_ranges(const schema& base,
|
||||
const dht::decorated_key& key,
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views) {
|
||||
const std::vector<view_ptr>& views,
|
||||
gc_clock::time_point now) {
|
||||
std::vector<nonwrapping_range<clustering_key_prefix_view>> row_ranges;
|
||||
std::vector<nonwrapping_range<clustering_key_prefix_view>> view_row_ranges;
|
||||
clustering_key_prefix_view::tri_compare cmp(base);
|
||||
@@ -980,7 +984,7 @@ query::clustering_row_ranges calculate_affected_clustering_ranges(const schema&
|
||||
}
|
||||
|
||||
for (auto&& row : mp.clustered_rows()) {
|
||||
if (update_requires_read_before_write(base, views, key, row)) {
|
||||
if (update_requires_read_before_write(base, views, key, row, now)) {
|
||||
row_ranges.emplace_back(row.key());
|
||||
}
|
||||
}
|
||||
@@ -1659,6 +1663,7 @@ private:
|
||||
view_builder& _builder;
|
||||
build_step& _step;
|
||||
built_views _built_views;
|
||||
gc_clock::time_point _now;
|
||||
std::vector<view_ptr> _views_to_build;
|
||||
std::deque<mutation_fragment> _fragments;
|
||||
// The compact_for_query<> that feeds this consumer is already configured
|
||||
@@ -1672,10 +1677,11 @@ private:
|
||||
// beyond our limit on mutation size (by default 32 MB).
|
||||
size_t _fragments_memory_usage = 0;
|
||||
public:
|
||||
consumer(view_builder& builder, build_step& step)
|
||||
consumer(view_builder& builder, build_step& step, gc_clock::time_point now)
|
||||
: _builder(builder)
|
||||
, _step(step)
|
||||
, _built_views{step} {
|
||||
, _built_views{step}
|
||||
, _now(now) {
|
||||
if (!step.current_key.key().is_empty(*_step.reader.schema())) {
|
||||
load_views_to_build();
|
||||
}
|
||||
@@ -1684,7 +1690,7 @@ public:
|
||||
void load_views_to_build() {
|
||||
for (auto&& vs : _step.build_status) {
|
||||
if (_step.current_token() >= vs.next_token) {
|
||||
if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key)) {
|
||||
if (partition_key_matches(*_step.reader.schema(), *vs.view->view_info(), _step.current_key, _now)) {
|
||||
_views_to_build.push_back(vs.view);
|
||||
}
|
||||
if (vs.next_token || _step.current_token() != vs.first_token) {
|
||||
@@ -1756,7 +1762,8 @@ public:
|
||||
_step.base->populate_views(
|
||||
_views_to_build,
|
||||
_step.current_token(),
|
||||
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments))).get();
|
||||
make_flat_mutation_reader_from_fragments(_step.base->schema(), std::move(_fragments)),
|
||||
_now).get();
|
||||
_fragments.clear();
|
||||
_fragments_memory_usage = 0;
|
||||
}
|
||||
@@ -1790,13 +1797,14 @@ public:
|
||||
|
||||
// Called in the context of a seastar::thread.
|
||||
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto consumer = compact_for_query<emit_only_live_rows::yes, view_builder::consumer>(
|
||||
*step.reader.schema(),
|
||||
gc_clock::now(),
|
||||
now,
|
||||
step.pslice,
|
||||
batch_size,
|
||||
query::max_partitions,
|
||||
view_builder::consumer{*this, step});
|
||||
view_builder::consumer{*this, step, now});
|
||||
consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
|
||||
auto built = step.reader.consume_in_thread(std::move(consumer), db::no_timeout);
|
||||
|
||||
|
||||
@@ -49,10 +49,11 @@ namespace view {
|
||||
* @param base the base table schema.
|
||||
* @param view_info the view info.
|
||||
* @param key the partition key that is updated.
|
||||
* @param time_point time of the operation (for handling liveness: TTL, tombstones, etc).
|
||||
* @return false if we can guarantee that inserting an update for specified key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key);
|
||||
bool partition_key_matches(const schema& base, const view_info& view, const dht::decorated_key& key, gc_clock::time_point now);
|
||||
|
||||
/**
|
||||
* Whether the view might be affected by the provided update.
|
||||
@@ -64,10 +65,11 @@ bool partition_key_matches(const schema& base, const view_info& view, const dht:
|
||||
* @param view_info the view info.
|
||||
* @param key the partition key that is updated.
|
||||
* @param update the base table update being applied.
|
||||
* @param time_point time of the operation (for handling liveness: TTL, tombstones, etc).
|
||||
* @return false if we can guarantee that inserting update for key
|
||||
* won't affect the view in any way, true otherwise.
|
||||
*/
|
||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update);
|
||||
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update, gc_clock::time_point now);
|
||||
|
||||
/**
|
||||
* Whether a given base row matches the view filter (and thus if the view should have a corresponding entry).
|
||||
@@ -88,19 +90,21 @@ bool may_be_affected_by(const schema& base, const view_info& view, const dht::de
|
||||
*/
|
||||
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now);
|
||||
|
||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck);
|
||||
bool clustering_prefix_matches(const schema& base, const partition_key& key, const clustering_key_prefix& ck, gc_clock::time_point now);
|
||||
|
||||
future<std::vector<frozen_mutation_and_schema>> generate_view_updates(
|
||||
const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views_to_update,
|
||||
flat_mutation_reader&& updates,
|
||||
flat_mutation_reader_opt&& existings);
|
||||
flat_mutation_reader_opt&& existings,
|
||||
gc_clock::time_point now);
|
||||
|
||||
query::clustering_row_ranges calculate_affected_clustering_ranges(
|
||||
const schema& base,
|
||||
const dht::decorated_key& key,
|
||||
const mutation_partition& mp,
|
||||
const std::vector<view_ptr>& views);
|
||||
const std::vector<view_ptr>& views,
|
||||
gc_clock::time_point now);
|
||||
|
||||
struct wait_for_all_updates_tag {};
|
||||
using wait_for_all_updates = bool_class<wait_for_all_updates_tag>;
|
||||
|
||||
29
table.cc
29
table.cc
@@ -2038,10 +2038,10 @@ const std::vector<view_ptr>& table::views() const {
|
||||
return _views;
|
||||
}
|
||||
|
||||
std::vector<view_ptr> table::affected_views(const schema_ptr& base, const mutation& update) const {
|
||||
std::vector<view_ptr> table::affected_views(const schema_ptr& base, const mutation& update, gc_clock::time_point now) const {
|
||||
//FIXME: Avoid allocating a vector here; consider returning the boost iterator.
|
||||
return boost::copy_range<std::vector<view_ptr>>(_views | boost::adaptors::filtered([&, this] (auto&& view) {
|
||||
return db::view::partition_key_matches(*base, *view->view_info(), update.decorated_key());
|
||||
return db::view::partition_key_matches(*base, *view->view_info(), update.decorated_key(), now);
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -2070,13 +2070,15 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,
|
||||
std::vector<view_ptr>&& views,
|
||||
mutation&& m,
|
||||
flat_mutation_reader_opt existings,
|
||||
tracing::trace_state_ptr tr_state) const {
|
||||
tracing::trace_state_ptr tr_state,
|
||||
gc_clock::time_point now) const {
|
||||
auto base_token = m.token();
|
||||
return db::view::generate_view_updates(
|
||||
base,
|
||||
std::move(views),
|
||||
flat_mutation_reader_from_mutations({std::move(m)}),
|
||||
std::move(existings)).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
|
||||
std::move(existings),
|
||||
now).then([this, base_token = std::move(base_token), tr_state = std::move(tr_state)] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
|
||||
tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
|
||||
auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
|
||||
return db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, *_config.cf_stats, std::move(tr_state),
|
||||
@@ -2179,13 +2181,15 @@ table::local_base_lock(
|
||||
future<> table::populate_views(
|
||||
std::vector<view_ptr> views,
|
||||
dht::token base_token,
|
||||
flat_mutation_reader&& reader) {
|
||||
flat_mutation_reader&& reader,
|
||||
gc_clock::time_point now) {
|
||||
auto& schema = reader.schema();
|
||||
return db::view::generate_view_updates(
|
||||
schema,
|
||||
std::move(views),
|
||||
std::move(reader),
|
||||
{ }).then([base_token = std::move(base_token), this] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
|
||||
{ },
|
||||
now).then([base_token = std::move(base_token), this] (std::vector<frozen_mutation_and_schema>&& updates) mutable {
|
||||
size_t update_size = memory_usage_of(updates);
|
||||
size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
|
||||
return seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for).then(
|
||||
@@ -2539,14 +2543,15 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
}
|
||||
auto& base = schema();
|
||||
m.upgrade(base);
|
||||
auto views = affected_views(base, m);
|
||||
gc_clock::time_point now = gc_clock::now();
|
||||
auto views = affected_views(base, m, now);
|
||||
if (views.empty()) {
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
}
|
||||
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views);
|
||||
auto cr_ranges = db::view::calculate_affected_clustering_ranges(*base, m.decorated_key(), m.partition(), views, now);
|
||||
if (cr_ranges.empty()) {
|
||||
tracing::trace(tr_state, "View updates do not require read-before-write");
|
||||
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }, std::move(tr_state)).then([] {
|
||||
return generate_and_propagate_view_updates(base, std::move(views), std::move(m), { }, std::move(tr_state), now).then([] {
|
||||
// In this case we are not doing a read-before-write, just a
|
||||
// write, so no lock is needed.
|
||||
return make_ready_future<row_locker::lock_holder>();
|
||||
@@ -2569,15 +2574,15 @@ future<row_locker::lock_holder> table::do_push_view_replica_updates(const schema
|
||||
// We'll return this lock to the caller, which will release it after
|
||||
// writing the base-table update.
|
||||
future<row_locker::lock_holder> lockf = local_base_lock(base, m.decorated_key(), slice.default_row_ranges(), timeout);
|
||||
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, source = std::move(source), tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
|
||||
return lockf.then([m = std::move(m), slice = std::move(slice), views = std::move(views), base, this, timeout, now, source = std::move(source), tr_state = std::move(tr_state), &io_priority] (row_locker::lock_holder lock) mutable {
|
||||
tracing::trace(tr_state, "View updates for {}.{} require read-before-write - base table reader is created", base->ks_name(), base->cf_name());
|
||||
return do_with(
|
||||
dht::partition_range::make_singular(m.decorated_key()),
|
||||
std::move(slice),
|
||||
std::move(m),
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, source = std::move(source), &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
|
||||
[base, views = std::move(views), lock = std::move(lock), this, timeout, now, source = std::move(source), &io_priority, tr_state = std::move(tr_state)] (auto& pk, auto& slice, auto& m) mutable {
|
||||
auto reader = source.make_reader(base, no_reader_permit(), pk, slice, io_priority, tr_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader), tr_state).then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
|
||||
return this->generate_and_propagate_view_updates(base, std::move(views), std::move(m), std::move(reader), tr_state, now).then([base, tr_state = std::move(tr_state), lock = std::move(lock)] () mutable {
|
||||
tracing::trace(tr_state, "View updates for {}.{} were generated and propagated", base->ks_name(), base->cf_name());
|
||||
// return the local partition/row lock we have taken so it
|
||||
// remains locked until the caller is done modifying this
|
||||
|
||||
Reference in New Issue
Block a user