db/view: Calculate clustering ranges for MV read-before-write query

Introduce the calculate_affected_clustering_ranges() function to
calculate the smallest subject of affected clustering ranges that we
need to query for.

The update_requires_read_before_write() function checks whether
a view is potentially affected by the base update.

The patch also cleans up the may_be_affected_by() function.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-03-04 14:25:48 +01:00
parent ec681060a8
commit 8a77bfe35b
2 changed files with 92 additions and 3 deletions

View File

@@ -39,6 +39,9 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <vector>
#include <functional>
#include <boost/range/algorithm/transform.hpp>
#include <boost/range/adaptors.hpp>
@@ -47,6 +50,7 @@
#include "cql3/util.hh"
#include "db/view/view.hh"
#include "gms/inet_address.hh"
#include "keys.hh"
#include "locator/network_topology_strategy.hh"
#include "service/storage_service.hh"
#include "view_info.hh"
@@ -131,14 +135,14 @@ bool clustering_prefix_matches(const schema& base, const view_info& view, const
});
}
bool may_be_affected_by(const schema& base, const view_info view, const dht::decorated_key& key, const rows_entry& update) {
bool may_be_affected_by(const schema& base, const view_info& view, const dht::decorated_key& key, const rows_entry& update) {
// We can guarantee that the view won't be affected if:
// - the primary key is excluded by the view filter (note that this isn't true of the filter on regular columns:
// even if an update don't match a view condition on a regular column, that update can still invalidate a
// pre-existing entry);
// pre-existing entry) - note that the upper layers should already have checked the partition key;
// - the update doesn't modify any of the columns impacting the view (where "impacting" the view means that column
// is neither included in the view, nor used by the view filter).
if (!partition_key_matches(base, view, key) && !clustering_prefix_matches(base, view, key.key(), update.key())) {
if (!clustering_prefix_matches(base, view, key.key(), update.key())) {
return false;
}
@@ -157,6 +161,27 @@ bool may_be_affected_by(const schema& base, const view_info view, const dht::dec
return affected;
}
static bool update_requires_read_before_write(const schema& base,
const std::vector<view_ptr>& views,
const dht::decorated_key& key,
const rows_entry& update) {
for (auto&& v : views) {
view_info& vf = *v->view_info();
// A view whose primary key contains only the base's primary key columns doesn't require a read-before-write.
// However, if the view has restrictions on regular columns, then a write that doesn't match those filters
// needs to add a tombstone (assuming a previous update matched those filter and created a view entry); for
// now we just do a read-before-write in that case.
if (!vf.base_non_pk_column_in_view_pk(base)
&& vf.select_statement().get_restrictions()->get_non_pk_restriction().empty()) {
continue;
}
if (may_be_affected_by(base, vf, key, update)) {
return true;
}
}
return false;
}
bool matches_view_filter(const schema& base, const view_info& view, const partition_key& key, const clustering_row& update, gc_clock::time_point now) {
return clustering_prefix_matches(base, view, key, update.key())
&& boost::algorithm::all_of(
@@ -629,6 +654,64 @@ future<std::vector<mutation>> generate_view_updates(
return f.finally([builder = std::move(builder)] { });
}
query::clustering_row_ranges calculate_affected_clustering_ranges(const schema& base,
const dht::decorated_key& key,
const mutation_partition& mp,
const std::vector<view_ptr>& views) {
std::vector<nonwrapping_range<clustering_key_prefix_view>> row_ranges;
std::vector<nonwrapping_range<clustering_key_prefix_view>> view_row_ranges;
clustering_key_prefix_view::tri_compare cmp(base);
if (mp.partition_tombstone() || !mp.row_tombstones().empty()) {
for (auto&& v : views) {
// FIXME: #2371
if (v->view_info()->select_statement().get_restrictions()->has_unrestricted_clustering_columns()) {
view_row_ranges.push_back(nonwrapping_range<clustering_key_prefix_view>::make_open_ended_both_sides());
break;
}
for (auto&& r : v->view_info()->partition_slice().default_row_ranges()) {
view_row_ranges.push_back(r.transform(std::mem_fn(&clustering_key_prefix::view)));
}
}
}
if (mp.partition_tombstone()) {
std::swap(row_ranges, view_row_ranges);
} else {
// FIXME: Optimize, as most often than not clustering keys will not be restricted.
for (auto&& rt : mp.row_tombstones()) {
nonwrapping_range<clustering_key_prefix_view> rtr(
bound_view::to_range_bound<nonwrapping_range>(rt.start_bound()),
bound_view::to_range_bound<nonwrapping_range>(rt.end_bound()));
for (auto&& vr : view_row_ranges) {
auto overlap = rtr.intersection(vr, cmp);
if (overlap) {
row_ranges.push_back(std::move(overlap).value());
}
}
}
}
for (auto&& row : mp.clustered_rows()) {
if (update_requires_read_before_write(base, views, key, row)) {
row_ranges.emplace_back(row.key());
}
}
// Note that the views could have restrictions on regular columns,
// but even if that's the case we shouldn't apply those when we read,
// because even if an existing row doesn't match the view filter, the
// update can change that in which case we'll need to know the existing
// content, in case the view includes a column that is not included in
// this mutation.
//FIXME: Unfortunate copy.
return boost::copy_range<query::clustering_row_ranges>(
nonwrapping_range<clustering_key_prefix_view>::deoverlap(std::move(row_ranges), cmp)
| boost::adaptors::transformed([] (auto&& v) {
return std::move(v).transform([] (auto&& ckv) { return clustering_key_prefix(ckv); });
}));
}
// Calculate the node ("natural endpoint") to which this node should send
// a view update.
//

View File

@@ -85,6 +85,12 @@ future<std::vector<mutation>> generate_view_updates(
streamed_mutation&& updates,
streamed_mutation&& existings);
query::clustering_row_ranges calculate_affected_clustering_ranges(
const schema& base,
const dht::decorated_key& key,
const mutation_partition& mp,
const std::vector<view_ptr>& views);
void mutate_MV(const dht::token& base_token,
std::vector<mutation> mutations);