partition_snapshot_row_cursor: handle multi-schema snapshots

To support gentle schema upgrades, each version has its own schema.
Currently this facility is unused, and the schema is equal for
all versions in a snapshot. But in upcoming commits this will change.

In the new design, after an entry upgrade, there will be a transitional
period where two versions with different schemas will coexist in a snapshot.
Eventually, these versions will be merged by mutation_cleaner into one
version with the current schema, but until then reads have to merge
multi-schema snapshots on the fly.

This commit implements in the cursor support for per-version schemas.
This commit is contained in:
Michał Chojnowski
2023-02-28 01:49:38 +01:00
parent f4e853b32d
commit 5f68409934

View File

@@ -111,6 +111,7 @@ class partition_snapshot_row_cursor final {
mutation_partition::rows_type::iterator it;
utils::immutable_collection<mutation_partition::rows_type> rows;
int version_no;
const schema* schema;
bool unique_owner = false;
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
@@ -202,7 +203,7 @@ class partition_snapshot_row_cursor final {
position_in_version& v = _heap.back();
rows_entry& e = *v.it;
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
e.row().cells().prepare_hash(*v.schema, column_kind::regular_column);
}
_dummy &= bool(e.dummy());
_continuous |= bool(v.continuous);
@@ -280,7 +281,7 @@ class partition_snapshot_row_cursor final {
rt = pos->range_tombstone();
}
if (pos) [[likely]] {
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont, rt});
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, cont, rt});
}
} else {
if (_reversed) [[unlikely]] {
@@ -293,7 +294,7 @@ class partition_snapshot_row_cursor final {
_background_continuity = true; // Default continuity past the last entry
}
if (pos) [[likely]] {
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, is_continuous::yes});
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, v.get_schema().get(), unique_owner, is_continuous::yes});
}
}
++version_no;
@@ -469,7 +470,7 @@ public:
}
} else if (match) {
_current_row.insert(_current_row.begin(), position_in_version{
it, std::move(rows), 0, _unique_owner, cont, rt});
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
if (heap_i != _heap.end()) {
_heap.erase(heap_i);
boost::range::make_heap(_heap, heap_less);
@@ -482,7 +483,7 @@ public:
boost::range::make_heap(_heap, heap_less);
} else {
_heap.push_back(position_in_version{
it, std::move(rows), 0, _unique_owner, cont, rt});
it, std::move(rows), 0, _snp.version()->get_schema().get(), _unique_owner, cont, rt});
boost::range::push_heap(_heap, heap_less);
}
}
@@ -556,9 +557,9 @@ public:
clustering_row row() const {
// Note: if the precondition ("cursor is valid and pointing at a row") is fulfilled
// then _current_row is not empty, so the below is valid.
clustering_row cr(key(), deletable_row(_schema, _current_row[0].it->row()));
clustering_row cr(key(), deletable_row(_schema, *_current_row[0].schema, _current_row[0].it->row()));
for (size_t i = 1; i < _current_row.size(); ++i) {
cr.apply(_schema, _current_row[i].it->row());
cr.apply(_schema, *_current_row[i].schema, _current_row[i].it->row());
}
return cr;
}
@@ -571,32 +572,23 @@ public:
// Can be called only when cursor is valid and pointing at a row.
// Monotonic exception guarantees.
template <typename Consumer>
requires std::is_invocable_v<Consumer, deletable_row>
requires std::is_invocable_v<Consumer, deletable_row&&>
void consume_row(Consumer&& consumer) {
for (position_in_version& v : _current_row) {
if (v.unique_owner) {
if (v.unique_owner && (_schema.version() == v.schema->version())) [[likely]] {
consumer(std::move(v.it->row()));
} else {
consumer(deletable_row(_schema, v.it->row()));
consumer(deletable_row(_schema, *v.schema, v.it->row()));
}
}
}
// Can be called only when cursor is valid and pointing at a row.
template <typename Consumer>
requires std::is_invocable_v<Consumer, const deletable_row&>
void consume_row(Consumer&& consumer) const {
for (const position_in_version& v : _current_row) {
consumer(v.it->row());
}
}
// Returns memory footprint of row entries under the cursor.
// Can be called only when cursor is valid and pointing at a row.
size_t memory_usage() const {
size_t result = 0;
for (const position_in_version& v : _current_row) {
result += v.it->memory_usage(_schema);
result += v.it->memory_usage(*v.schema);
}
return result;
}
@@ -631,7 +623,7 @@ public:
is_dummy(!_position.is_clustering_row()), is_continuous::no));
} else {
return alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].it));
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].schema, *_current_row[0].it));
}
}();
rows_entry& re = *e;
@@ -707,7 +699,7 @@ public:
}
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(_schema, pos,
current_allocator().construct<rows_entry>(*_snp.version()->get_schema(), pos,
is_dummy(!pos.is_clustering_row()),
is_continuous::no));
if (latest_i && latest_i->continuous()) {