From 354ce0b2c79f4b2c23e7d71bbe64e1beff2a43e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Thu, 9 Feb 2017 10:47:37 +0000 Subject: [PATCH] mutation_fragment: make write access more explicit mutation_fragments are going to be caching their size in memory. In order to be able to invalidate that correctly, they need to know when that size may change (but avoid invalidation when it is not necessary). --- db/view/view.cc | 16 ++++++++-------- partition_version.hh | 2 +- sstables/partition.cc | 24 ++++++++++++------------ streamed_mutation.cc | 4 ++-- streamed_mutation.hh | 16 ++++++++++------ 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index 5094e405e9..987ec93fbd 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -538,7 +538,7 @@ future view_update_builder::on_results() { if (_update->is_range_tombstone()) { _update_tombstone_tracker.apply(std::move(_update->as_range_tombstone())); } else { - auto& update = _update->as_clustering_row(); + auto& update = _update->as_mutable_clustering_row(); apply_tracked_tombstones(_update_tombstone_tracker, update); auto tombstone = _existing_tombstone_tracker.current_tombstone(); auto existing = tombstone @@ -554,7 +554,7 @@ future view_update_builder::on_results() { if (_existing->is_range_tombstone()) { _existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone())); } else { - auto& existing = _existing->as_clustering_row(); + auto& existing = _existing->as_mutable_clustering_row(); apply_tracked_tombstones(_existing_tombstone_tracker, existing); auto tombstone = _update_tombstone_tracker.current_tombstone(); // The way we build the read command used for existing rows, we should always have a non-empty @@ -570,13 +570,13 @@ future view_update_builder::on_results() { // We're updating a row that had pre-existing data if (_update->is_range_tombstone()) { assert(_existing->is_range_tombstone()); - _existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone())); - _update_tombstone_tracker.apply(std::move(_update->as_range_tombstone())); + _existing_tombstone_tracker.apply(std::move(*_existing).as_range_tombstone()); + _update_tombstone_tracker.apply(std::move(*_update).as_range_tombstone()); } else { assert(!_existing->is_range_tombstone()); - apply_tracked_tombstones(_update_tombstone_tracker, _update->as_clustering_row()); - apply_tracked_tombstones(_existing_tombstone_tracker, _existing->as_clustering_row()); - generate_update(std::move(_update->as_clustering_row()), { std::move(_existing->as_clustering_row()) }); + apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row()); + apply_tracked_tombstones(_existing_tombstone_tracker, _existing->as_mutable_clustering_row()); + generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() }); } return advance_all(); } @@ -594,7 +594,7 @@ future view_update_builder::on_results() { // If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it if (_update && !_update->is_range_tombstone()) { - generate_update(std::move(_update->as_clustering_row()), { }); + generate_update(std::move(*_update).as_clustering_row(), { }); return advance_updates(); } diff --git a/partition_version.hh b/partition_version.hh index bc74cb2acb..a02c975eca 100644 --- a/partition_version.hh +++ b/partition_version.hh @@ -383,7 +383,7 @@ private: if (!sr) { sr = mutation_fragment(static_row(v.partition().static_row())); } else { - sr->as_static_row().apply(*_schema, v.partition().static_row()); + sr->as_mutable_static_row().apply(*_schema, v.partition().static_row()); } } } diff --git a/sstables/partition.cc b/sstables/partition.cc index a8bad886a5..040e1c6c62 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -242,9 +242,9 @@ private: auto ctype = static_pointer_cast(_cdef->type); auto ac = atomic_cell_or_collection::from_collection_mutation(ctype->serialize_mutation_form(cm)); if (_cdef->is_static()) { - mf.as_static_row().set_cell(*_cdef, std::move(ac)); + mf.as_mutable_static_row().set_cell(*_cdef, std::move(ac)); } else { - mf.as_clustering_row().set_cell(*_cdef, std::move(ac)); + mf.as_mutable_clustering_row().set_cell(*_cdef, std::move(ac)); } } }; @@ -470,7 +470,7 @@ public: if (col.cell.size() == 0) { row_marker rm(timestamp, gc_clock::duration(ttl), gc_clock::time_point(gc_clock::duration(expiration))); - _in_progress->as_clustering_row().apply(std::move(rm)); + _in_progress->as_mutable_clustering_row().apply(std::move(rm)); return ret; } @@ -487,9 +487,9 @@ public: auto ac = make_counter_cell(timestamp, value); if (col.is_static) { - _in_progress->as_static_row().set_cell(*(col.cdef), std::move(ac)); + _in_progress->as_mutable_static_row().set_cell(*(col.cdef), std::move(ac)); } else { - _in_progress->as_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac))); + _in_progress->as_mutable_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac))); } }); } @@ -517,10 +517,10 @@ public: } if (col.is_static) { - _in_progress->as_static_row().set_cell(*(col.cdef), std::move(ac)); + _in_progress->as_mutable_static_row().set_cell(*(col.cdef), std::move(ac)); return; } - _in_progress->as_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac))); + _in_progress->as_mutable_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac))); }); } @@ -545,7 +545,7 @@ public: if (col.cell.size() == 0) { row_marker rm(tombstone(timestamp, ttl)); - _in_progress->as_clustering_row().apply(rm); + _in_progress->as_mutable_clustering_row().apply(rm); return ret; } if (!col.is_present) { @@ -562,9 +562,9 @@ public: if (is_multi_cell) { update_pending_collection(col.cdef, std::move(col.collection_extra_data), std::move(ac)); } else if (col.is_static) { - _in_progress->as_static_row().set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac))); + _in_progress->as_mutable_static_row().set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac))); } else { - _in_progress->as_clustering_row().set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac))); + _in_progress->as_mutable_clustering_row().set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac))); } return ret; } @@ -629,7 +629,7 @@ public: if (range_tombstone::is_single_clustering_row_tombstone(*_schema, start_ck, start_kind, end, end_kind)) { auto ret = flush_if_needed(std::move(start_ck)); if (!_skip_clustering_row) { - _in_progress->as_clustering_row().apply(tombstone(deltime)); + _in_progress->as_mutable_clustering_row().apply(tombstone(deltime)); } return ret; } else { @@ -757,7 +757,7 @@ private: // that mutation fragment are produced in ascending order. if (!_last_position || !_cmp(mf->position(), *_last_position)) { _last_position = position_in_partition(mf->position()); - _range_tombstones.apply(std::move(mf->as_range_tombstone())); + _range_tombstones.apply(std::move(*mf).as_range_tombstone()); } } else { // mp_row_consumer may produce mutation_fragments in parts if they are diff --git a/streamed_mutation.cc b/streamed_mutation.cc index 8929902762..8d052441a7 100644 --- a/streamed_mutation.cc +++ b/streamed_mutation.cc @@ -276,7 +276,7 @@ private: } boost::range::pop_heap(_readers, heap_compare); if (result.is_range_tombstone()) { - auto remainder = result.as_range_tombstone().apply(*_schema, std::move(_readers.back().row.as_range_tombstone())); + auto remainder = result.as_mutable_range_tombstone().apply(*_schema, std::move(_readers.back().row).as_range_tombstone()); if (remainder) { _deferred_tombstones.apply(std::move(*remainder)); } @@ -409,7 +409,7 @@ streamed_mutation reverse_streamed_mutation(streamed_mutation sm) { _static_row = std::move(mf); } else { if (mf->is_range_tombstone()) { - mf->as_range_tombstone().flip(); + mf->as_mutable_range_tombstone().flip(); } _mutation_fragments.emplace(std::move(*mf)); } diff --git a/streamed_mutation.hh b/streamed_mutation.hh index 2e71603d4f..486cc3fe68 100644 --- a/streamed_mutation.hh +++ b/streamed_mutation.hh @@ -211,13 +211,17 @@ public: bool is_clustering_row() const { return _kind == kind::clustering_row; } bool is_range_tombstone() const { return _kind == kind::range_tombstone; } - static_row& as_static_row() { return _data->_static_row; } - clustering_row& as_clustering_row() { return _data->_clustering_row; } - range_tombstone& as_range_tombstone() { return _data->_range_tombstone; } + static_row& as_mutable_static_row() { return _data->_static_row; } + clustering_row& as_mutable_clustering_row() { return _data->_clustering_row; } + range_tombstone& as_mutable_range_tombstone() { return _data->_range_tombstone; } - const static_row& as_static_row() const { return _data->_static_row; } - const clustering_row& as_clustering_row() const { return _data->_clustering_row; } - const range_tombstone& as_range_tombstone() const { return _data->_range_tombstone; } + static_row&& as_static_row() && { return std::move(_data->_static_row); } + clustering_row&& as_clustering_row() && { return std::move(_data->_clustering_row); } + range_tombstone&& as_range_tombstone() && { return std::move(_data->_range_tombstone); } + + const static_row& as_static_row() const & { return _data->_static_row; } + const clustering_row& as_clustering_row() const & { return _data->_clustering_row; } + const range_tombstone& as_range_tombstone() const & { return _data->_range_tombstone; } // Requirements: mutation_fragment_kind() == mf.mutation_fragment_kind() && !is_range_tombstone() void apply(const schema& s, mutation_fragment&& mf);