mutation_partition: Use row_tombstone
This patch replaces the current row tombstone representation with a row_tombstone. The intent of the patch is thus to reify the idea of shadowable tombstones, which up until now we considered all materialized view row tombstones to be. We need to distinguish shadowable from non-shadowable row tombstones to support scenarios such as the following, when writing to a table with a materialized view: 1. insert into base (p, v1, v2) values (3, 1, 3) using timestamp 1 2. delete from base using timestamp 2 where p = 3 3. insert into base (p, v1) values (3, 1) using timestamp 3 These should yield a view row where v2 is definitely null, but with the current implementation, v2 will pop back with its value v2=3@TS=1, even though it's dead in the base row. This is because the row tombstone inserted in step 2 is a shadowable one. This patch only addresses the in-memory representation of such row_tombstones. Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
@@ -94,7 +94,7 @@ public:
|
||||
_p.apply_row_tombstone(_p_schema, rt);
|
||||
}
|
||||
|
||||
virtual void accept_row(clustering_key_view key, tombstone deleted_at, const row_marker& rm) override {
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
|
||||
deletable_row& r = _p.clustered_row(_p_schema, key);
|
||||
r.apply(rm);
|
||||
r.apply(deleted_at);
|
||||
|
||||
@@ -233,7 +233,7 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
|
||||
size += v.data.size();
|
||||
}
|
||||
void accept_row_tombstone(const range_tombstone&) override {}
|
||||
void accept_row(clustering_key_view, tombstone, const row_marker&) override {}
|
||||
void accept_row(clustering_key_view, const row_tombstone&, const row_marker&) override {}
|
||||
void accept_row_cell(column_id, atomic_cell_view v) override {
|
||||
size += v.value().size();
|
||||
}
|
||||
|
||||
@@ -355,7 +355,7 @@ void view_updates::do_delete_old_entry(const partition_key& base_key, const clus
|
||||
static_pointer_cast<const collection_type_impl>(def->type)->for_each_cell(cell.as_collection_mutation(), set_max_ts);
|
||||
}
|
||||
});
|
||||
get_view_row(base_key, existing).apply(tombstone(ts, now));
|
||||
get_view_row(base_key, existing).apply(shadowable_tombstone(ts, now));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -528,12 +528,12 @@ void view_update_builder::generate_update(clustering_row&& update, stdx::optiona
|
||||
// We allow existing to be disengaged, which we treat the same as an empty row.
|
||||
if (existing) {
|
||||
existing->marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
|
||||
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, tombstone(), _now, always_gc, gc_before);
|
||||
existing->cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before);
|
||||
update.apply(*_schema, *existing);
|
||||
}
|
||||
|
||||
update.marker().compact_and_expire(tombstone(), _now, always_gc, gc_before);
|
||||
update.cells().compact_and_expire(*_schema, column_kind::regular_column, tombstone(), _now, always_gc, gc_before);
|
||||
update.cells().compact_and_expire(*_schema, column_kind::regular_column, row_tombstone(), _now, always_gc, gc_before);
|
||||
|
||||
for (auto&& v : _view_updates) {
|
||||
v.generate_update(_updates.key(), update, existing, _now);
|
||||
@@ -558,7 +558,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
apply_tracked_tombstones(_update_tombstone_tracker, update);
|
||||
auto tombstone = _existing_tombstone_tracker.current_tombstone();
|
||||
auto existing = tombstone
|
||||
? stdx::optional<clustering_row>(stdx::in_place, update.key(), std::move(tombstone), row_marker(), ::row())
|
||||
? stdx::optional<clustering_row>(stdx::in_place, update.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row())
|
||||
: stdx::nullopt;
|
||||
generate_update(std::move(update), std::move(existing));
|
||||
}
|
||||
@@ -577,7 +577,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
// tombstone, since we wouldn't have read the existing row otherwise. We don't assert that in case the
|
||||
// read method ever changes.
|
||||
if (tombstone) {
|
||||
auto update = clustering_row(existing.key(), std::move(tombstone), row_marker(), ::row());
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
}
|
||||
@@ -602,7 +602,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
|
||||
if (!_existing->is_range_tombstone()) {
|
||||
auto& existing = _existing->as_clustering_row();
|
||||
auto update = clustering_row(existing.key(), std::move(tombstone), row_marker(), ::row());
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
return advance_existings();
|
||||
|
||||
@@ -63,7 +63,7 @@ public:
|
||||
rt.feed_hash(_h, _s);
|
||||
}
|
||||
|
||||
virtual void accept_row(clustering_key_view key, tombstone deleted_at, const row_marker& rm) {
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) {
|
||||
key.feed_hash(_h, _s);
|
||||
feed_hash(_h, deleted_at);
|
||||
feed_hash(_h, rm);
|
||||
|
||||
@@ -103,6 +103,7 @@ class deletable_row stub [[writable]] {
|
||||
boost::variant<live_marker, expiring_marker, dead_marker, no_marker> marker;
|
||||
tombstone deleted_at;
|
||||
row cells;
|
||||
tombstone shadowable_deleted_at [[version 1.8]] = deleted_at;
|
||||
};
|
||||
|
||||
enum class bound_kind : uint8_t {
|
||||
|
||||
@@ -104,6 +104,10 @@ private:
|
||||
return t.deletion_time < _gc_before && can_gc(t);
|
||||
};
|
||||
|
||||
bool can_purge_tombstone(const row_tombstone& t) {
|
||||
return t.max_deletion_time() < _gc_before && can_gc(t.tomb());
|
||||
};
|
||||
|
||||
bool can_gc(tombstone t) {
|
||||
if (!sstable_compaction()) {
|
||||
return true;
|
||||
@@ -172,7 +176,7 @@ public:
|
||||
stop_iteration consume(static_row&& sr) {
|
||||
auto current_tombstone = _range_tombstones.get_partition_tombstone();
|
||||
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column,
|
||||
current_tombstone,
|
||||
row_tombstone(current_tombstone),
|
||||
_query_time, _can_gc, _gc_before);
|
||||
_static_row_live = is_live;
|
||||
if (is_live || (!only_live() && !sr.empty())) {
|
||||
@@ -184,12 +188,12 @@ public:
|
||||
|
||||
stop_iteration consume(clustering_row&& cr) {
|
||||
auto current_tombstone = _range_tombstones.tombstone_for_row(cr.key());
|
||||
auto t = current_tombstone;
|
||||
t.apply(cr.tomb());
|
||||
if (cr.tomb() <= current_tombstone || can_purge_tombstone(cr.tomb())) {
|
||||
auto t = cr.tomb();
|
||||
if (t.tomb() <= current_tombstone || can_purge_tombstone(t)) {
|
||||
cr.remove_tombstone();
|
||||
}
|
||||
bool is_live = cr.marker().compact_and_expire(t, _query_time, _can_gc, _gc_before);
|
||||
t.apply(current_tombstone);
|
||||
bool is_live = cr.marker().compact_and_expire(t.tomb(), _query_time, _can_gc, _gc_before);
|
||||
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before);
|
||||
if (only_live() && is_live) {
|
||||
partition_is_not_empty();
|
||||
|
||||
@@ -387,22 +387,22 @@ mutation_partition::range_tombstone_for_row(const schema& schema, const clusteri
|
||||
return t;
|
||||
}
|
||||
|
||||
tombstone
|
||||
row_tombstone
|
||||
mutation_partition::tombstone_for_row(const schema& schema, const clustering_key& key) const {
|
||||
tombstone t = range_tombstone_for_row(schema, key);
|
||||
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
|
||||
|
||||
auto j = _rows.find(key, rows_entry::compare(schema));
|
||||
if (j != _rows.end()) {
|
||||
t.apply(j->row().deleted_at());
|
||||
t.apply(j->row().deleted_at(), j->row().marker());
|
||||
}
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
tombstone
|
||||
row_tombstone
|
||||
mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const {
|
||||
tombstone t = range_tombstone_for_row(schema, e.key());
|
||||
t.apply(e.row().deleted_at());
|
||||
row_tombstone t = e.row().deleted_at();
|
||||
t.apply(range_tombstone_for_row(schema, e.key()));
|
||||
return t;
|
||||
}
|
||||
|
||||
@@ -754,7 +754,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
e.key().feed_hash(pw.digest(), s);
|
||||
::feed_hash(pw.digest(), row_tombstone);
|
||||
auto t = hash_row_slice(pw.digest(), s, column_kind::regular_column, row.cells(), slice.regular_columns);
|
||||
pw.last_modified() = std::max({pw.last_modified(), row_tombstone.timestamp, t});
|
||||
pw.last_modified() = std::max({pw.last_modified(), row_tombstone.tomb().timestamp, t});
|
||||
}
|
||||
|
||||
if (row.is_live(s)) {
|
||||
@@ -882,11 +882,8 @@ deletable_row::equal(column_kind kind, const schema& s, const deletable_row& oth
|
||||
|
||||
void deletable_row::apply_reversibly(const schema& s, deletable_row& src) {
|
||||
_cells.apply_reversibly(s, column_kind::regular_column, src._cells);
|
||||
_deleted_at.apply_reversibly(src._deleted_at); // noexcept
|
||||
_marker.apply_reversibly(src._marker); // noexcept
|
||||
if (row_tombstone_is_shadowed(s, _deleted_at, _marker)) {
|
||||
remove_tombstone();
|
||||
}
|
||||
_deleted_at.apply_reversibly(src._deleted_at, _marker); // noexcept
|
||||
}
|
||||
|
||||
void deletable_row::revert(const schema& s, deletable_row& src) {
|
||||
@@ -1196,8 +1193,11 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
auto should_purge_tombstone = [&] (const tombstone& t) {
|
||||
return t.deletion_time < gc_before && can_gc(t);
|
||||
};
|
||||
auto should_purge_row_tombstone = [&] (const row_tombstone& t) {
|
||||
return t.max_deletion_time() < gc_before && can_gc(t.tomb());
|
||||
};
|
||||
|
||||
bool static_row_live = _static_row.compact_and_expire(s, column_kind::static_column, _tombstone,
|
||||
bool static_row_live = _static_row.compact_and_expire(s, column_kind::static_column, row_tombstone(_tombstone),
|
||||
query_time, can_gc, gc_before);
|
||||
|
||||
uint32_t row_count = 0;
|
||||
@@ -1205,12 +1205,12 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
auto row_callback = [&] (rows_entry& e) {
|
||||
deletable_row& row = e.row();
|
||||
|
||||
tombstone tomb = tombstone_for_row(s, e);
|
||||
row_tombstone tomb = tombstone_for_row(s, e);
|
||||
|
||||
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before);
|
||||
is_live |= row.marker().compact_and_expire(tomb, query_time, can_gc, gc_before);
|
||||
is_live |= row.marker().compact_and_expire(tomb.tomb(), query_time, can_gc, gc_before);
|
||||
|
||||
if (should_purge_tombstone(row.deleted_at())) {
|
||||
if (should_purge_row_tombstone(row.deleted_at())) {
|
||||
row.remove_tombstone();
|
||||
}
|
||||
|
||||
@@ -1288,7 +1288,7 @@ deletable_row::is_live(const schema& s, tombstone base_tombstone, gc_clock::time
|
||||
// created with the 'insert' statement. If row marker is live, we know the
|
||||
// row is live. Otherwise, a row is considered live if it has any cell
|
||||
// which is live.
|
||||
base_tombstone.apply(_deleted_at);
|
||||
base_tombstone.apply(_deleted_at.tomb());
|
||||
return _marker.is_live(base_tombstone, query_time)
|
||||
|| has_any_live_data(s, column_kind::regular_column, _cells, base_tombstone, query_time);
|
||||
}
|
||||
@@ -1525,7 +1525,7 @@ void row::revert(const schema& s, column_kind kind, row& other) noexcept {
|
||||
});
|
||||
}
|
||||
|
||||
bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb, gc_clock::time_point query_time,
|
||||
bool row::compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn& can_gc, gc_clock::time_point gc_before)
|
||||
{
|
||||
bool any_live = false;
|
||||
@@ -1538,7 +1538,7 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
|
||||
return cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
|
||||
};
|
||||
|
||||
if (cell.is_covered_by(tomb, def.is_counter())) {
|
||||
if (cell.is_covered_by(tomb.regular(), def.is_counter())) {
|
||||
erase = true;
|
||||
} else if (cell.has_expired(query_time)) {
|
||||
erase = can_erase_cell();
|
||||
@@ -1547,8 +1547,10 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
|
||||
}
|
||||
} else if (!cell.is_live()) {
|
||||
erase = can_erase_cell();
|
||||
} else if (cell.is_covered_by(tomb.shadowable().tomb(), def.is_counter())) {
|
||||
erase = true;
|
||||
} else {
|
||||
any_live |= true;
|
||||
any_live = true;
|
||||
}
|
||||
} else {
|
||||
auto&& cell = c.as_collection_mutation();
|
||||
@@ -1556,7 +1558,7 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
|
||||
auto m_view = ctype->deserialize_mutation_form(cell);
|
||||
collection_type_impl::mutation m = m_view.materialize();
|
||||
any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before);
|
||||
if (m.cells.empty() && m.tomb <= tomb) {
|
||||
if (m.cells.empty() && m.tomb <= tomb.tomb()) {
|
||||
erase = true;
|
||||
} else {
|
||||
c = ctype->serialize_mutation_form(m);
|
||||
@@ -1709,7 +1711,7 @@ public:
|
||||
// Requires that sr.has_any_live_data()
|
||||
stop_iteration consume(static_row&& sr, tombstone current_tombstone);
|
||||
// Requires that cr.has_any_live_data()
|
||||
stop_iteration consume(clustering_row&& cr, tombstone current_tombstone);
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone current_tombstone);
|
||||
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
|
||||
uint32_t consume_end_of_stream();
|
||||
};
|
||||
@@ -1764,7 +1766,7 @@ void mutation_querier::prepare_writers() {
|
||||
}
|
||||
}
|
||||
|
||||
stop_iteration mutation_querier::consume(clustering_row&& cr, tombstone current_tombstone) {
|
||||
stop_iteration mutation_querier::consume(clustering_row&& cr, row_tombstone current_tombstone) {
|
||||
prepare_writers();
|
||||
|
||||
const query::partition_slice& slice = _pw.slice();
|
||||
@@ -1773,7 +1775,7 @@ stop_iteration mutation_querier::consume(clustering_row&& cr, tombstone current_
|
||||
cr.key().feed_hash(_pw.digest(), _schema);
|
||||
::feed_hash(_pw.digest(), current_tombstone);
|
||||
auto t = hash_row_slice(_pw.digest(), _schema, column_kind::regular_column, cr.cells(), slice.regular_columns);
|
||||
_pw.last_modified() = std::max({_pw.last_modified(), current_tombstone.timestamp, t});
|
||||
_pw.last_modified() = std::max({_pw.last_modified(), current_tombstone.tomb().timestamp, t});
|
||||
}
|
||||
|
||||
auto write_row = [&] (auto& rows_writer) {
|
||||
@@ -1850,7 +1852,7 @@ public:
|
||||
_stop = _mutation_consumer->consume(std::move(sr), t) && _short_read_allowed;
|
||||
return _stop;
|
||||
}
|
||||
stop_iteration consume(clustering_row&& cr, tombstone t, bool) {
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool) {
|
||||
_stop = _mutation_consumer->consume(std::move(cr), t) && _short_read_allowed;
|
||||
return _stop;
|
||||
}
|
||||
@@ -1937,7 +1939,7 @@ public:
|
||||
_memory_accounter.update(sr.memory_usage());
|
||||
return _mutation_consumer->consume(std::move(sr));
|
||||
}
|
||||
stop_iteration consume(clustering_row&& cr, tombstone, bool is_alive) {
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive) {
|
||||
_live_rows += is_alive;
|
||||
auto stop = _memory_accounter.update_and_check(cr.memory_usage());
|
||||
if (is_alive) {
|
||||
@@ -2020,10 +2022,6 @@ mutation_query(schema_ptr s,
|
||||
row_limit, partition_limit, query_time, std::move(accounter), std::move(trace_ptr));
|
||||
}
|
||||
|
||||
bool row_tombstone_is_shadowed(const schema& schema, const tombstone& row_tombstone, const row_marker& marker) {
|
||||
return schema.is_view() && marker.timestamp() > row_tombstone.timestamp;
|
||||
}
|
||||
|
||||
deletable_row::deletable_row(clustering_row&& cr)
|
||||
: _deleted_at(cr.tomb())
|
||||
, _marker(std::move(cr.marker()))
|
||||
@@ -2043,7 +2041,7 @@ public:
|
||||
_mutation->partition().static_row() = std::move(sr.cells());
|
||||
return stop_iteration::no;
|
||||
}
|
||||
stop_iteration consume(clustering_row&& cr, tombstone, bool) {
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
|
||||
_mutation->partition().insert_row(_schema, cr.key(), deletable_row(std::move(cr)));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -267,7 +267,7 @@ public:
|
||||
// Expires cells based on query_time. Expires tombstones based on gc_before
|
||||
// and max_purgeable. Removes cells covered by tomb.
|
||||
// Returns true iff there are any live cells left.
|
||||
bool compact_and_expire(const schema& s, column_kind kind, tombstone tomb, gc_clock::time_point query_time,
|
||||
bool compact_and_expire(const schema& s, column_kind kind, row_tombstone tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn&, gc_clock::time_point gc_before);
|
||||
|
||||
row difference(const schema&, column_kind, const row& other) const;
|
||||
@@ -454,7 +454,7 @@ public:
|
||||
|
||||
void maybe_shadow(tombstone t, row_marker marker) noexcept {
|
||||
if (is_shadowed_by(marker)) {
|
||||
_tomb = t;
|
||||
_tomb = std::move(t);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -592,7 +592,7 @@ struct appending_hash<row_tombstone> {
|
||||
};
|
||||
|
||||
class deletable_row final {
|
||||
tombstone _deleted_at;
|
||||
row_tombstone _deleted_at;
|
||||
row_marker _marker;
|
||||
row _cells;
|
||||
public:
|
||||
@@ -603,12 +603,21 @@ public:
|
||||
_deleted_at.apply(deleted_at);
|
||||
}
|
||||
|
||||
void apply(shadowable_tombstone deleted_at) {
|
||||
_deleted_at.apply(deleted_at, _marker);
|
||||
}
|
||||
|
||||
void apply(row_tombstone deleted_at) {
|
||||
_deleted_at.apply(deleted_at, _marker);
|
||||
}
|
||||
|
||||
void apply(const row_marker& rm) {
|
||||
_marker.apply(rm);
|
||||
_deleted_at.maybe_shadow(_marker);
|
||||
}
|
||||
|
||||
void remove_tombstone() {
|
||||
_deleted_at = tombstone();
|
||||
_deleted_at = {};
|
||||
}
|
||||
|
||||
// See reversibly_mergeable.hh
|
||||
@@ -616,7 +625,7 @@ public:
|
||||
// See reversibly_mergeable.hh
|
||||
void revert(const schema& s, deletable_row& src);
|
||||
public:
|
||||
tombstone deleted_at() const { return _deleted_at; }
|
||||
row_tombstone deleted_at() const { return _deleted_at; }
|
||||
api::timestamp_type created_at() const { return _marker.timestamp(); }
|
||||
row_marker& marker() { return _marker; }
|
||||
const row_marker& marker() const { return _marker; }
|
||||
@@ -664,7 +673,7 @@ public:
|
||||
const deletable_row& row() const {
|
||||
return _row;
|
||||
}
|
||||
void apply(tombstone t) {
|
||||
void apply(row_tombstone t) {
|
||||
_row.apply(t);
|
||||
}
|
||||
// See reversibly_mergeable.hh
|
||||
@@ -876,8 +885,8 @@ public:
|
||||
range_tombstone_list& row_tombstones() { return _row_tombstones; }
|
||||
const row* find_row(const schema& s, const clustering_key& key) const;
|
||||
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
|
||||
tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
|
||||
tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
|
||||
row_tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
|
||||
row_tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
|
||||
boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::clustering_range& r) const;
|
||||
rows_type::const_iterator lower_bound(const schema& schema, const query::clustering_range& r) const;
|
||||
rows_type::const_iterator upper_bound(const schema& schema, const query::clustering_range& r) const;
|
||||
@@ -907,15 +916,3 @@ private:
|
||||
void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
|
||||
friend class counter_write_query_result_builder;
|
||||
};
|
||||
|
||||
// A shadowable row tombstone is valid only if the row has no live marker. In other words,
|
||||
// the row tombstone is only valid as long as no newer insert is done (thus setting a
|
||||
// live row marker; note that if the row timestamp set is lower than the tombstone's,
|
||||
// then the tombstone remains in effect as usual). If a row has a shadowable tombstone
|
||||
// with timestamp Ti and that row is updated with a timestamp Tj, such that Tj > Ti
|
||||
// (and that update sets the row marker), then the shadowable tombstone is shadowed by
|
||||
// that update. A concrete consequence is that if the update has cells with timestamp
|
||||
// lower than Ti, then those cells are preserved (since the deletion is removed), and
|
||||
// this is contrary to a regular, non-shadowable row tombstone where the tombstone is
|
||||
// preserved and such cells are removed.
|
||||
bool row_tombstone_is_shadowed(const schema& schema, const tombstone& row_tombstone, const row_marker& marker);
|
||||
|
||||
@@ -50,7 +50,7 @@ public:
|
||||
_p.apply_row_tombstone(_schema, rt);
|
||||
}
|
||||
|
||||
virtual void accept_row(clustering_key_view key, tombstone deleted_at, const row_marker& rm) override {
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
|
||||
deletable_row& r = _p.clustered_row(_schema, key);
|
||||
r.apply(rm);
|
||||
r.apply(deleted_at);
|
||||
|
||||
@@ -188,6 +188,20 @@ static void write_tombstones(const schema& s, RowTombstones& row_tombstones, con
|
||||
}
|
||||
}
|
||||
|
||||
template<typename Writer>
|
||||
static auto write_tombstone(Writer&& writer, const tombstone& t) {
|
||||
return std::move(writer).write_timestamp(t.timestamp).write_deletion_time(t.deletion_time);
|
||||
}
|
||||
|
||||
template<typename Writer>
|
||||
static auto write_row(Writer&& writer, const schema& s, const clustering_key_prefix& key, const row& cells, const row_marker& m, const row_tombstone& t) {
|
||||
auto marker_writer = std::move(writer).write_key(key);
|
||||
auto deleted_at_writer = write_row_marker(std::move(marker_writer), m).start_deleted_at();
|
||||
auto row_writer = write_tombstone(std::move(deleted_at_writer), t.regular()).end_deleted_at().start_cells();
|
||||
auto shadowable_deleted_at_writer = write_row_cells(std::move(row_writer), cells, s, column_kind::regular_column).end_cells().start_shadowable_deleted_at();
|
||||
return write_tombstone(std::move(shadowable_deleted_at_writer), t.shadowable().tomb()).end_shadowable_deleted_at().end_deletable_row();
|
||||
}
|
||||
|
||||
template<typename Writer>
|
||||
void mutation_partition_serializer::write_serialized(Writer&& writer, const schema& s, const mutation_partition& mp)
|
||||
{
|
||||
@@ -196,15 +210,7 @@ void mutation_partition_serializer::write_serialized(Writer&& writer, const sche
|
||||
write_tombstones(s, row_tombstones, mp.row_tombstones());
|
||||
auto clustering_rows = std::move(row_tombstones).end_range_tombstones().start_rows();
|
||||
for (auto&& cr : mp.clustered_rows()) {
|
||||
auto marker_writer = clustering_rows.add().write_key(cr.key());
|
||||
auto deleted_at_writer = write_row_marker(std::move(marker_writer), cr.row().marker());
|
||||
auto&& dt = cr.row().deleted_at();
|
||||
auto row_writer = std::move(deleted_at_writer).start_deleted_at()
|
||||
.write_timestamp(dt.timestamp)
|
||||
.write_deletion_time(dt.deletion_time)
|
||||
.end_deleted_at()
|
||||
.start_cells();
|
||||
write_row_cells(std::move(row_writer), cr.row().cells(), s, column_kind::regular_column).end_cells().end_deletable_row();
|
||||
write_row(clustering_rows.add(), s, cr.key(), cr.row().cells(), cr.row().marker(), cr.row().deleted_at());
|
||||
}
|
||||
std::move(clustering_rows).end_rows().end_mutation_partition();
|
||||
}
|
||||
@@ -243,15 +249,7 @@ void serialize_mutation_fragments(const schema& s, tombstone partition_tombstone
|
||||
auto clustering_rows = std::move(row_tombstones).end_range_tombstones().start_rows();
|
||||
while (!crs.empty()) {
|
||||
auto& cr = crs.front();
|
||||
auto marker_writer = clustering_rows.add().write_key(cr.key());
|
||||
auto deleted_at_writer = write_row_marker(std::move(marker_writer), cr.marker());
|
||||
auto&& dt = cr.tomb();
|
||||
auto row_writer = std::move(deleted_at_writer).start_deleted_at()
|
||||
.write_timestamp(dt.timestamp)
|
||||
.write_deletion_time(dt.deletion_time)
|
||||
.end_deleted_at()
|
||||
.start_cells();
|
||||
write_row_cells(std::move(row_writer), cr.cells(), s, column_kind::regular_column).end_cells().end_deletable_row();
|
||||
write_row(clustering_rows.add(), s, cr.key(), cr.cells(), cr.marker(), cr.tomb());
|
||||
crs.pop_front();
|
||||
}
|
||||
std::move(clustering_rows).end_rows().end_mutation_partition();
|
||||
|
||||
@@ -209,7 +209,8 @@ mutation_partition_view::accept(const column_mapping& cm, mutation_partition_vis
|
||||
}
|
||||
|
||||
for (auto&& cr : mpv.rows()) {
|
||||
visitor.accept_row(cr.key(), cr.deleted_at(), read_row_marker(cr.marker()));
|
||||
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
|
||||
visitor.accept_row(cr.key(), t, read_row_marker(cr.marker()));
|
||||
|
||||
struct cell_visitor {
|
||||
mutation_partition_visitor& _visitor;
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include "keys.hh"
|
||||
|
||||
class row_marker;
|
||||
class row_tombstone;
|
||||
|
||||
// Guarantees:
|
||||
//
|
||||
@@ -55,7 +56,7 @@ public:
|
||||
|
||||
virtual void accept_row_tombstone(const range_tombstone&) = 0;
|
||||
|
||||
virtual void accept_row(clustering_key_view key, tombstone deleted_at, const row_marker& rm) = 0;
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) = 0;
|
||||
|
||||
virtual void accept_row_cell(column_id id, atomic_cell_view) = 0;
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ public:
|
||||
_partition.apply_row_tombstone(_schema, rt);
|
||||
}
|
||||
|
||||
virtual void accept_row(clustering_key_view key, tombstone deleted_at, const row_marker& rm) override {
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
|
||||
deletable_row& r = _partition.clustered_row(_schema, key);
|
||||
r.apply(rm);
|
||||
r.apply(deleted_at);
|
||||
|
||||
@@ -2031,7 +2031,7 @@ private:
|
||||
virtual void accept_static_cell(column_id, atomic_cell_view) override { }
|
||||
virtual void accept_static_cell(column_id, collection_mutation_view) override { }
|
||||
virtual void accept_row_tombstone(const range_tombstone&) override { }
|
||||
virtual void accept_row(clustering_key_view key, tombstone, const row_marker&) override {
|
||||
virtual void accept_row(clustering_key_view key, const row_tombstone&, const row_marker&) override {
|
||||
if (!_is_reversed || !_last_ck) {
|
||||
_last_ck = clustering_key(key);
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ public:
|
||||
|
||||
void consume(tombstone t) { _writer->consume(t); }
|
||||
stop_iteration consume(static_row&& sr, tombstone, bool) { return _writer->consume(std::move(sr)); }
|
||||
stop_iteration consume(clustering_row&& cr, tombstone, bool) { return _writer->consume(std::move(cr)); }
|
||||
stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _writer->consume(std::move(cr)); }
|
||||
stop_iteration consume(range_tombstone&& rt) { return _writer->consume(std::move(rt)); }
|
||||
|
||||
stop_iteration consume_end_of_partition();
|
||||
|
||||
@@ -1535,7 +1535,8 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
|
||||
// Before writing cells, range tombstone must be written if the row has any (deletable_row::t).
|
||||
if (clustered_row.tomb()) {
|
||||
maybe_flush_pi_block(out, clustering_key, {});
|
||||
write_range_tombstone(out, clustering_key, clustering_key, {}, clustered_row.tomb());
|
||||
//FIXME: Write a row_tombstone
|
||||
write_range_tombstone(out, clustering_key, clustering_key, {}, clustered_row.tomb().tomb());
|
||||
// Because we currently may break a partition to promoted-index blocks
|
||||
// in the middle of a clustered row, we also need to track the current
|
||||
// row's tombstone - not just range tombstones - which may affect the
|
||||
@@ -1544,7 +1545,7 @@ void sstable::write_clustered_row(file_writer& out, const schema& schema, const
|
||||
// following code can be dropped:
|
||||
_pi_write.tombstone_accumulator->apply(range_tombstone(
|
||||
clustered_row.key(), bound_kind::incl_start,
|
||||
clustered_row.key(), bound_kind::incl_end, clustered_row.tomb()));
|
||||
clustered_row.key(), bound_kind::incl_end, clustered_row.tomb().tomb()));
|
||||
}
|
||||
|
||||
if (schema.clustering_key_size()) {
|
||||
|
||||
@@ -42,23 +42,25 @@ class position_in_partition_view;
|
||||
|
||||
class clustering_row {
|
||||
clustering_key_prefix _ck;
|
||||
tombstone _t;
|
||||
row_tombstone _t;
|
||||
row_marker _marker;
|
||||
row _cells;
|
||||
public:
|
||||
explicit clustering_row(clustering_key_prefix ck) : _ck(std::move(ck)) { }
|
||||
clustering_row(clustering_key_prefix ck, tombstone t, row_marker marker, row cells)
|
||||
: _ck(std::move(ck)), _t(t), _marker(std::move(marker)), _cells(std::move(cells)) { }
|
||||
clustering_row(clustering_key_prefix ck, row_tombstone t, row_marker marker, row cells)
|
||||
: _ck(std::move(ck)), _t(t), _marker(std::move(marker)), _cells(std::move(cells)) {
|
||||
_t.maybe_shadow(marker);
|
||||
}
|
||||
clustering_row(const rows_entry& re)
|
||||
: _ck(re.key()), _t(re.row().deleted_at()), _marker(re.row().marker()), _cells(re.row().cells()) { }
|
||||
: clustering_row(re.key(), re.row().deleted_at(), re.row().marker(), re.row().cells()) { }
|
||||
clustering_row(rows_entry&& re)
|
||||
: _ck(std::move(re.key())), _t(re.row().deleted_at()), _marker(re.row().marker()), _cells(std::move(re.row().cells())) { }
|
||||
: clustering_row(std::move(re.key()), re.row().deleted_at(), re.row().marker(), std::move(re.row().cells())) { }
|
||||
|
||||
clustering_key_prefix& key() { return _ck; }
|
||||
const clustering_key_prefix& key() const { return _ck; }
|
||||
|
||||
tombstone tomb() const { return _t; }
|
||||
void remove_tombstone() { _t = tombstone(); }
|
||||
void remove_tombstone() { _t = {}; }
|
||||
row_tombstone tomb() const { return _t; }
|
||||
|
||||
const row_marker& marker() const { return _marker; }
|
||||
row_marker& marker() { return _marker; }
|
||||
@@ -71,28 +73,32 @@ public:
|
||||
}
|
||||
|
||||
void apply(const schema& s, clustering_row&& cr) {
|
||||
_t.apply(cr._t);
|
||||
_marker.apply(std::move(cr._marker));
|
||||
_t.apply(cr._t, _marker);
|
||||
_cells.apply(s, column_kind::regular_column, std::move(cr._cells));
|
||||
maybe_shadow_deletion(s);
|
||||
}
|
||||
void apply(const schema& s, const clustering_row& cr) {
|
||||
_t.apply(cr._t);
|
||||
_marker.apply(cr._marker);
|
||||
_t.apply(cr._t, _marker);
|
||||
_cells.apply(s, column_kind::regular_column, cr._cells);
|
||||
maybe_shadow_deletion(s);
|
||||
}
|
||||
void set_cell(const column_definition& def, atomic_cell_or_collection&& value) {
|
||||
_cells.apply(def, std::move(value));
|
||||
}
|
||||
void apply(row_marker rm) { _marker.apply(std::move(rm)); }
|
||||
void apply(tombstone t) { _t.apply(t); }
|
||||
|
||||
// Applies `rm` to this row's marker, then passes the resulting marker to
// the row tombstone's maybe_shadow().
void apply(row_marker rm)
{
    _marker.apply(std::move(rm));
    // Must run after the marker has been updated: maybe_shadow() is given
    // the merged marker, not the incoming one.
    _t.maybe_shadow(_marker);
}
|
||||
// Applies the tombstone `t` to this row's tombstone.
// The row marker is not consulted by this overload.
void apply(tombstone t)
{
    _t.apply(t);
}
|
||||
// Applies the shadowable tombstone `t` to this row's tombstone, passing the
// current row marker along with it.
void apply(shadowable_tombstone t)
{
    _t.apply(t, _marker);
}
|
||||
void apply(const schema& s, const rows_entry& r) {
|
||||
_t.apply(r.row().deleted_at());
|
||||
_marker.apply(r.row().marker());
|
||||
_t.apply(r.row().deleted_at(), _marker);
|
||||
_cells.apply(s, column_kind::regular_column, r.row().cells());
|
||||
maybe_shadow_deletion(s);
|
||||
}
|
||||
|
||||
position_in_partition_view position() const;
|
||||
@@ -106,13 +112,6 @@ public:
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const clustering_row& row);
|
||||
|
||||
private:
|
||||
void maybe_shadow_deletion(const schema& s) {
|
||||
if (row_tombstone_is_shadowed(s, _t, _marker)) {
|
||||
_t = tombstone();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class static_row {
|
||||
|
||||
@@ -126,20 +126,20 @@ SEASTAR_TEST_CASE(test_multi_level_row_tombstones) {
|
||||
};
|
||||
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 2}), tombstone(9, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 3})), tombstone(9, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 3})), row_tombstone(tombstone(9, ttl)));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 3}), tombstone(8, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(9, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(8, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(9, ttl)));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(8, ttl)));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1}), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(11, ttl)));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(11, ttl)));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 4}), tombstone(6, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 4, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(11, ttl)));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(11, ttl)));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 4, 0})), row_tombstone(tombstone(11, ttl)));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
@@ -159,11 +159,11 @@ SEASTAR_TEST_CASE(test_row_tombstone_updates) {
|
||||
m.partition().apply_row_tombstone(*s, c_key1_prefix, tombstone(1, ttl));
|
||||
m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(0, ttl));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key1), tombstone(1, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(0, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key1), row_tombstone(tombstone(1, ttl)));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), row_tombstone(tombstone(0, ttl)));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(1, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(1, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), row_tombstone(tombstone(1, ttl)));
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
|
||||
@@ -1581,7 +1581,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
|
||||
auto& mp = mutation->partition();
|
||||
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
|
||||
auto c_row = *(mp.clustered_rows().begin());
|
||||
BOOST_REQUIRE(c_row.row().deleted_at() == tomb);
|
||||
BOOST_REQUIRE(c_row.row().deleted_at().tomb() == tomb);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -598,7 +598,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) {
|
||||
BOOST_REQUIRE(rows.calculate_size() == 1);
|
||||
for (auto e : rows) {
|
||||
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb")));
|
||||
BOOST_REQUIRE(e.row().deleted_at().timestamp == 1459334681244989LL);
|
||||
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459334681244989LL);
|
||||
}
|
||||
|
||||
return stop_iteration::no;
|
||||
@@ -780,7 +780,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
|
||||
BOOST_REQUIRE(rows.calculate_size() == 1);
|
||||
for (auto e : rows) {
|
||||
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb", "ccc")));
|
||||
BOOST_REQUIRE(e.row().deleted_at().timestamp == 1459438519958850LL);
|
||||
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459438519958850LL);
|
||||
}
|
||||
|
||||
return stop_iteration::no;
|
||||
|
||||
7
types.cc
7
types.cc
@@ -45,6 +45,7 @@
|
||||
#include <boost/multiprecision/cpp_int.hpp>
|
||||
#include "utils/big_decimal.hh"
|
||||
#include "utils/date.h"
|
||||
#include "mutation_partition.hh"
|
||||
|
||||
template<typename T>
|
||||
sstring time_point_to_string(const T& tp)
|
||||
@@ -2141,11 +2142,11 @@ do_serialize_mutation_form(
|
||||
return collection_mutation{std::move(ret)};
|
||||
}
|
||||
|
||||
bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_clock::time_point query_time,
|
||||
bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn& can_gc, gc_clock::time_point gc_before)
|
||||
{
|
||||
bool any_live = false;
|
||||
tomb.apply(base_tomb);
|
||||
tomb.apply(base_tomb.regular());
|
||||
std::vector<std::pair<bytes, atomic_cell>> survivors;
|
||||
for (auto&& name_and_cell : cells) {
|
||||
atomic_cell& cell = name_and_cell.second;
|
||||
@@ -2165,7 +2166,7 @@ bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_
|
||||
if (cannot_erase_cell()) {
|
||||
survivors.emplace_back(std::move(name_and_cell));
|
||||
}
|
||||
} else {
|
||||
} else if (!cell.is_covered_by(base_tomb.shadowable().tomb(), false)) {
|
||||
any_live |= true;
|
||||
survivors.emplace_back(std::move(name_and_cell));
|
||||
}
|
||||
|
||||
4
types.hh
4
types.hh
@@ -735,6 +735,8 @@ bool equal(data_type t, bytes_view e1, bytes_view e2) {
|
||||
return t->equal(e1, e2);
|
||||
}
|
||||
|
||||
class row_tombstone;
|
||||
|
||||
class collection_type_impl : public abstract_type {
|
||||
static logging::logger _logger;
|
||||
static thread_local std::unordered_map<data_type, shared_ptr<cql3::cql3_type>> _cql3_type_cache; // initialized on demand
|
||||
@@ -765,7 +767,7 @@ public:
|
||||
std::vector<std::pair<bytes, atomic_cell>> cells;
|
||||
// Expires cells based on query_time. Expires tombstones based on max_purgeable and gc_before.
|
||||
// Removes cells covered by tomb or this->tomb.
|
||||
bool compact_and_expire(tombstone tomb, gc_clock::time_point query_time,
|
||||
bool compact_and_expire(row_tombstone tomb, gc_clock::time_point query_time,
|
||||
can_gc_fn&, gc_clock::time_point gc_before);
|
||||
};
|
||||
struct mutation_view {
|
||||
|
||||
Reference in New Issue
Block a user