mutation_fragment: s/as_mutable_clustering_row/mutate_as_clustering_row/

We will soon want to update the memory consumption of mutation fragment
after each modification done to it, to do that safely we have to forbid
direct access to the underlying data and instead have callers pass a
lambda doing their modifications.

Uses where this method was just used to move the fragment away are
converted to use `as_clustering_row() &&`.
This commit is contained in:
Botond Dénes
2020-08-24 12:49:45 +03:00
parent f2b9cad4c6
commit 4f5ccf82cb
9 changed files with 75 additions and 45 deletions

View File

@@ -885,7 +885,7 @@ future<stop_iteration> view_update_builder::on_results() {
if (_update->is_range_tombstone()) {
_update_tombstone_tracker.apply(std::move(_update->as_range_tombstone()));
} else if (_update->is_clustering_row()) {
auto& update = _update->as_mutable_clustering_row();
auto update = std::move(*_update).as_clustering_row();
apply_tracked_tombstones(_update_tombstone_tracker, update);
auto tombstone = _existing_tombstone_tracker.current_tombstone();
auto existing = tombstone
@@ -901,7 +901,7 @@ future<stop_iteration> view_update_builder::on_results() {
if (_existing->is_range_tombstone()) {
_existing_tombstone_tracker.apply(std::move(_existing->as_range_tombstone()));
} else if (_existing->is_clustering_row()) {
auto& existing = _existing->as_mutable_clustering_row();
auto existing = std::move(*_existing).as_clustering_row();
apply_tracked_tombstones(_existing_tombstone_tracker, existing);
auto tombstone = _update_tombstone_tracker.current_tombstone();
// The way we build the read command used for existing rows, we should always have a non-empty
@@ -921,8 +921,12 @@ future<stop_iteration> view_update_builder::on_results() {
_update_tombstone_tracker.apply(std::move(*_update).as_range_tombstone());
} else if (_update->is_clustering_row()) {
assert(_existing->is_clustering_row());
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
apply_tracked_tombstones(_existing_tombstone_tracker, _existing->as_mutable_clustering_row());
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
apply_tracked_tombstones(_update_tombstone_tracker, cr);
});
_existing->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
apply_tracked_tombstones(_existing_tombstone_tracker, cr);
});
generate_update(std::move(*_update).as_clustering_row(), { std::move(*_existing).as_clustering_row() });
}
return advance_all();
@@ -942,7 +946,9 @@ future<stop_iteration> view_update_builder::on_results() {
// If we have updates and it's a range tombstone, it removes nothing pre-exisiting, so we can ignore it
if (_update && !_update->is_end_of_partition()) {
if (_update->is_clustering_row()) {
apply_tracked_tombstones(_update_tombstone_tracker, _update->as_mutable_clustering_row());
_update->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
apply_tracked_tombstones(_update_tombstone_tracker, cr);
});
auto existing_tombstone = _existing_tombstone_tracker.current_tombstone();
auto existing = existing_tombstone
? std::optional<clustering_row>(std::in_place, _update->as_clustering_row().key(), row_tombstone(std::move(existing_tombstone)), row_marker(), ::row())

View File

@@ -385,9 +385,9 @@ public:
_data->_size_in_bytes = std::nullopt;
fn(_data->_static_row);
}
clustering_row& as_mutable_clustering_row() {
void mutate_as_clustering_row(const schema& s, std::invocable<clustering_row&> auto&& fn) {
_data->_size_in_bytes = std::nullopt;
return _data->_clustering_row;
fn(_data->_clustering_row);
}
range_tombstone& as_mutable_range_tombstone() {
_data->_size_in_bytes = std::nullopt;

View File

@@ -303,10 +303,14 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s)
clustering_row_builder(const schema& s, clustering_key key, row_tombstone t, row_marker m)
: _s(s), _mf(mutation_fragment::clustering_row_tag_t(), std::move(key), std::move(t), std::move(m), row()) { }
void accept_atomic_cell(column_id id, atomic_cell ac) {
_mf.as_mutable_clustering_row().cells().append_cell(id, std::move(ac));
_mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable {
cr.cells().append_cell(id, std::move(ac));
});
}
void accept_collection(column_id id, const collection_mutation& cm) {
_mf.as_mutable_clustering_row().cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm));
_mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable {
cr.cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm));
});
}
mutation_fragment get_mutation_fragment() && { return std::move(_mf); }
};

View File

@@ -186,7 +186,9 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
}
result.as_mutable_clustering_row().apply(_schema, e);
result.mutate_as_clustering_row(_schema, [&] (clustering_row& cr) mutable {
cr.apply(_schema, e);
});
}
return result;
}

View File

@@ -361,12 +361,11 @@ public:
if (digest_requested) {
row->row().cells().prepare_hash(_schema, column_kind::regular_column);
}
auto mf = mutation_fragment(clustering_row(_schema, *row));
auto& cr = mf.as_mutable_clustering_row();
auto cr = clustering_row(_schema, *row);
for (++it; it != _current_row.end(); ++it) {
cr.apply(_schema, *it->it);
}
return mf;
return mutation_fragment(std::move(cr));
}
// Can be called only when cursor is valid and pointing at a row.

View File

@@ -264,7 +264,9 @@ private:
sr.set_cell(*_cdef, std::move(ac));
});
} else {
mf.as_mutable_clustering_row().set_cell(*_cdef, std::move(ac));
mf.mutate_as_clustering_row(s, [&] (clustering_row& cr) {
cr.set_cell(*_cdef, std::move(ac));
});
}
}
};
@@ -522,7 +524,9 @@ public:
if (col.cell.size() == 0) {
row_marker rm(timestamp, gc_clock::duration(ttl), gc_clock::time_point(gc_clock::duration(expiration)));
_in_progress->as_mutable_clustering_row().apply(std::move(rm));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) {
cr.apply(std::move(rm));
});
return ret;
}
@@ -543,7 +547,9 @@ public:
sr.set_cell(*(col.cdef), std::move(ac));
});
} else {
_in_progress->as_mutable_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
});
}
});
}
@@ -597,7 +603,9 @@ public:
});
return;
}
_in_progress->as_mutable_clustering_row().set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*(col.cdef), atomic_cell_or_collection(std::move(ac)));
});
});
}
@@ -617,7 +625,9 @@ public:
if (col.cell.size() == 0) {
row_marker rm(tombstone(timestamp, local_deletion_time));
_in_progress->as_mutable_clustering_row().apply(rm);
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(rm);
});
return ret;
}
if (!col.is_present) {
@@ -638,7 +648,9 @@ public:
sr.set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac)));
});
} else {
_in_progress->as_mutable_clustering_row().set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac)));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.set_cell(*col.cdef, atomic_cell_or_collection(std::move(ac)));
});
}
return ret;
}
@@ -656,7 +668,9 @@ public:
auto ck = clustering_key_prefix::from_exploded_view(key);
auto ret = flush_if_needed(std::move(ck));
if (!_skip_in_progress) {
_in_progress->as_mutable_clustering_row().apply(shadowable_tombstone(tombstone(deltime)));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(shadowable_tombstone(tombstone(deltime)));
});
}
return ret;
}
@@ -710,7 +724,9 @@ public:
if (range_tombstone::is_single_clustering_row_tombstone(*_schema, start_ck, start_kind, end, end_kind)) {
auto ret = flush_if_needed(std::move(start_ck));
if (!_skip_in_progress) {
_in_progress->as_mutable_clustering_row().apply(tombstone(deltime));
_in_progress->mutate_as_clustering_row(*_schema, [&] (clustering_row& cr) mutable {
cr.apply(tombstone(deltime));
});
}
return ret;
} else {

View File

@@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -514,14 +514,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, tests::make_permit(), query::full_partition_range, slice);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -531,7 +531,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -540,14 +540,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, tests::make_permit(), query::full_partition_range, slice);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});

View File

@@ -3326,7 +3326,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3335,14 +3335,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -3353,7 +3353,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3362,14 +3362,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, tests::make_permit(), query::full_partition_range, slice);
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s, tests::make_permit());
rd(db::no_timeout).get0()->as_partition_start();
clustering_row row = std::move(rd(db::no_timeout).get0()->as_mutable_clustering_row());
clustering_row row = std::move(*rd(db::no_timeout).get0()).as_clustering_row();
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});

View File

@@ -115,26 +115,29 @@ PERF_TEST_F(clustering_row, copy_1M)
PERF_TEST_F(clustering_row, hash_4)
{
auto& cr = clustering_row_4().as_mutable_clustering_row();
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
clustering_row_4().mutate_as_clustering_row(*schema(), [&] (::clustering_row& cr) mutable {
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
});
}
PERF_TEST_F(clustering_row, hash_4k)
{
auto& cr = clustering_row_4k().as_mutable_clustering_row();
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
clustering_row_4k().mutate_as_clustering_row(*schema(), [&] (::clustering_row& cr) mutable {
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
});
}
PERF_TEST_F(clustering_row, hash_1M)
{
auto& cr = clustering_row_1M().as_mutable_clustering_row();
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
clustering_row_1M().mutate_as_clustering_row(*schema(), [&] (::clustering_row& cr) mutable {
cr.cells().prepare_hash(*schema(), column_kind::regular_column);
perf_tests::do_not_optimize(cr);
cr.cells().clear_hash();
});
}
}