treewide: require type to copy atomic_cell
This commit is contained in:
@@ -30,6 +30,7 @@
|
||||
#include <cstdint>
|
||||
#include <iosfwd>
|
||||
#include <seastar/util/gcc6-concepts.hh>
|
||||
#include "data/cell.hh"
|
||||
|
||||
class abstract_type;
|
||||
|
||||
@@ -259,7 +260,7 @@ public:
|
||||
class atomic_cell_view final : public atomic_cell_base<bytes_view> {
|
||||
atomic_cell_view(bytes_view data) : atomic_cell_base(std::move(data)) {}
|
||||
public:
|
||||
static atomic_cell_view from_bytes(bytes_view data) { return atomic_cell_view(data); }
|
||||
static atomic_cell_view from_bytes(const data::type_info&, bytes_view data) { return atomic_cell_view(data); }
|
||||
|
||||
friend class atomic_cell;
|
||||
friend std::ostream& operator<<(std::ostream& os, const atomic_cell_view& acv);
|
||||
@@ -268,7 +269,7 @@ public:
|
||||
class atomic_cell_mutable_view final : public atomic_cell_base<bytes_mutable_view> {
|
||||
atomic_cell_mutable_view(bytes_mutable_view data) : atomic_cell_base(std::move(data)) {}
|
||||
public:
|
||||
static atomic_cell_mutable_view from_bytes(bytes_mutable_view data) { return atomic_cell_mutable_view(data); }
|
||||
static atomic_cell_mutable_view from_bytes(const data::type_info&, bytes_mutable_view data) { return atomic_cell_mutable_view(data); }
|
||||
|
||||
friend class atomic_cell;
|
||||
};
|
||||
@@ -281,11 +282,11 @@ public:
|
||||
class atomic_cell final : public atomic_cell_base<managed_bytes> {
|
||||
atomic_cell(managed_bytes b) : atomic_cell_base(std::move(b)) {}
|
||||
public:
|
||||
atomic_cell(const atomic_cell&) = default;
|
||||
atomic_cell(const abstract_type&, const atomic_cell& ac) : atomic_cell_base(managed_bytes(ac._data)) { }
|
||||
atomic_cell(atomic_cell&&) = default;
|
||||
atomic_cell& operator=(const atomic_cell&) = default;
|
||||
atomic_cell& operator=(const atomic_cell&) = delete;
|
||||
atomic_cell& operator=(atomic_cell&&) = default;
|
||||
atomic_cell(atomic_cell_view other) : atomic_cell_base(managed_bytes{other._data}) {}
|
||||
atomic_cell(const abstract_type&, atomic_cell_view other) : atomic_cell_base(managed_bytes{other._data}) {}
|
||||
operator atomic_cell_view() const {
|
||||
return atomic_cell_view(_data);
|
||||
}
|
||||
|
||||
@@ -40,10 +40,11 @@ public:
|
||||
atomic_cell_or_collection& operator=(atomic_cell_or_collection&&) = default;
|
||||
atomic_cell_or_collection& operator=(const atomic_cell_or_collection&) = delete;
|
||||
atomic_cell_or_collection(atomic_cell ac) : _data(std::move(ac._data)) {}
|
||||
atomic_cell_or_collection(const abstract_type& at, atomic_cell_view acv) : atomic_cell_or_collection(atomic_cell(at, acv)) { }
|
||||
static atomic_cell_or_collection from_atomic_cell(atomic_cell data) { return { std::move(data._data) }; }
|
||||
atomic_cell_view as_atomic_cell(const column_definition&) const { return atomic_cell_view::from_bytes(_data); }
|
||||
atomic_cell_view as_atomic_cell(const column_definition& cdef) const { return atomic_cell_view::from_bytes(cdef.type->imr_state().type_info(), _data); }
|
||||
atomic_cell_ref as_atomic_cell_ref(const column_definition&) { return { _data }; }
|
||||
atomic_cell_mutable_view as_mutable_atomic_cell(const column_definition&) { return atomic_cell_mutable_view::from_bytes(_data); }
|
||||
atomic_cell_mutable_view as_mutable_atomic_cell(const column_definition& cdef) { return atomic_cell_mutable_view::from_bytes(cdef.type->imr_state().type_info(), _data); }
|
||||
atomic_cell_or_collection(collection_mutation cm) : _data(std::move(cm.data)) {}
|
||||
atomic_cell_or_collection copy(const abstract_type&) const { return managed_bytes(_data); }
|
||||
explicit operator bool() const {
|
||||
|
||||
@@ -40,7 +40,7 @@ private:
|
||||
}
|
||||
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
|
||||
if (is_compatible(new_def, old_type, kind) && cell.timestamp() > new_def.dropped_at()) {
|
||||
dst.apply(new_def, atomic_cell_or_collection(cell));
|
||||
dst.apply(new_def, atomic_cell_or_collection(*new_def.type, cell));
|
||||
}
|
||||
}
|
||||
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
|
||||
|
||||
@@ -161,7 +161,7 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll
|
||||
});
|
||||
|
||||
auto cell = result.build(std::max(dst_ac.timestamp(), src_ac.timestamp()));
|
||||
src = std::exchange(dst, atomic_cell_or_collection(cell));
|
||||
src = std::exchange(dst, atomic_cell_or_collection(*counter_type, cell));
|
||||
}
|
||||
|
||||
stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, atomic_cell_view b)
|
||||
@@ -171,7 +171,7 @@ stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, at
|
||||
|
||||
if (!b.is_live() || !a.is_live()) {
|
||||
if (b.is_live() || (!a.is_live() && compare_atomic_cell_for_merge(b, a) < 0)) {
|
||||
return atomic_cell(a);
|
||||
return atomic_cell(*counter_type, a);
|
||||
}
|
||||
return { };
|
||||
}
|
||||
|
||||
@@ -302,7 +302,8 @@ maps::setter_by_key::execute(mutation& m, const clustering_key_prefix& prefix, c
|
||||
}
|
||||
auto ctype = static_pointer_cast<const map_type_impl>(column.type);
|
||||
auto avalue = value ? params.make_cell(*ctype->get_values_type(), *value) : params.make_dead_cell();
|
||||
map_type_impl::mutation update = { {}, { { std::move(to_bytes(*key)), std::move(avalue) } } };
|
||||
map_type_impl::mutation update;
|
||||
update.cells.emplace_back(std::move(to_bytes(*key)), std::move(avalue));
|
||||
// should have been verified as map earlier?
|
||||
auto col_mut = ctype->serialize_mutation_form(std::move(update));
|
||||
m.set_cell(prefix, column, std::move(col_mut));
|
||||
|
||||
@@ -1623,7 +1623,7 @@ bool row::compact_and_expire(
|
||||
auto&& cell = c.as_collection_mutation();
|
||||
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
|
||||
auto m_view = ctype->deserialize_mutation_form(cell);
|
||||
collection_type_impl::mutation m = m_view.materialize();
|
||||
collection_type_impl::mutation m = m_view.materialize(*ctype);
|
||||
any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before);
|
||||
if (m.cells.empty() && m.tomb <= tomb.tomb()) {
|
||||
erase = true;
|
||||
|
||||
@@ -39,7 +39,8 @@ public:
|
||||
}
|
||||
|
||||
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
|
||||
_p._static_row.apply(_schema.column_at(column_kind::static_column, id), atomic_cell_or_collection(cell));
|
||||
auto& cdef = _schema.static_column_at(id);
|
||||
_p._static_row.apply(_schema.column_at(column_kind::static_column, id), atomic_cell_or_collection(*cdef.type, cell));
|
||||
}
|
||||
|
||||
virtual void accept_static_cell(column_id id, collection_mutation_view collection) override {
|
||||
@@ -58,7 +59,8 @@ public:
|
||||
}
|
||||
|
||||
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
|
||||
_current_row->cells().apply(_schema.column_at(column_kind::regular_column, id), atomic_cell_or_collection(cell));
|
||||
auto& cdef = _schema.regular_column_at(id);
|
||||
_current_row->cells().apply(_schema.column_at(column_kind::regular_column, id), atomic_cell_or_collection(*cdef.type, cell));
|
||||
}
|
||||
|
||||
virtual void accept_row_cell(column_id id, collection_mutation_view collection) override {
|
||||
|
||||
@@ -243,12 +243,14 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s)
|
||||
return seastar::visit(view.fragment(),
|
||||
[&] (ser::clustering_row_view crv) {
|
||||
class clustering_row_builder {
|
||||
const schema& _s;
|
||||
mutation_fragment _mf;
|
||||
public:
|
||||
clustering_row_builder(clustering_key key, row_tombstone t, row_marker m)
|
||||
: _mf(mutation_fragment::clustering_row_tag_t(), std::move(key), std::move(t), std::move(m), row()) { }
|
||||
clustering_row_builder(const schema& s, clustering_key key, row_tombstone t, row_marker m)
|
||||
: _s(s), _mf(mutation_fragment::clustering_row_tag_t(), std::move(key), std::move(t), std::move(m), row()) { }
|
||||
void accept_atomic_cell(column_id id, const atomic_cell& ac) {
|
||||
_mf.as_mutable_clustering_row().cells().append_cell(id, atomic_cell_or_collection(ac));
|
||||
auto& type = *_s.regular_column_at(id).type;
|
||||
_mf.as_mutable_clustering_row().cells().append_cell(id, atomic_cell_or_collection(atomic_cell(type, ac)));
|
||||
}
|
||||
void accept_collection(column_id id, const collection_mutation& cm) {
|
||||
_mf.as_mutable_clustering_row().cells().append_cell(id, atomic_cell_or_collection(cm));
|
||||
@@ -258,17 +260,19 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s)
|
||||
|
||||
auto cr = crv.row();
|
||||
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
|
||||
clustering_row_builder builder(cr.key(), std::move(t), read_row_marker(cr.marker()));
|
||||
clustering_row_builder builder(s, cr.key(), std::move(t), read_row_marker(cr.marker()));
|
||||
read_and_visit_row(cr.cells(), s.get_column_mapping(), column_kind::regular_column, builder);
|
||||
return std::move(builder).get_mutation_fragment();
|
||||
},
|
||||
[&] (ser::static_row_view sr) {
|
||||
class static_row_builder {
|
||||
const schema& _s;
|
||||
mutation_fragment _mf;
|
||||
public:
|
||||
static_row_builder() : _mf(static_row()) { }
|
||||
explicit static_row_builder(const schema& s) : _s(s), _mf(static_row()) { }
|
||||
void accept_atomic_cell(column_id id, const atomic_cell& ac) {
|
||||
_mf.as_mutable_static_row().cells().append_cell(id, atomic_cell_or_collection(ac));
|
||||
auto& type = *_s.static_column_at(id).type;
|
||||
_mf.as_mutable_static_row().cells().append_cell(id, atomic_cell_or_collection(atomic_cell(type, ac)));
|
||||
}
|
||||
void accept_collection(column_id id, const collection_mutation& cm) {
|
||||
_mf.as_mutable_static_row().cells().append_cell(id, atomic_cell_or_collection(cm));
|
||||
@@ -276,7 +280,7 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s)
|
||||
mutation_fragment get_mutation_fragment() && { return std::move(_mf); }
|
||||
};
|
||||
|
||||
static_row_builder builder;
|
||||
static_row_builder builder(s);
|
||||
read_and_visit_row(sr.cells(), s.get_column_mapping(), column_kind::static_column, builder);
|
||||
return std::move(builder).get_mutation_fragment();
|
||||
},
|
||||
|
||||
@@ -44,7 +44,8 @@ public:
|
||||
|
||||
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
|
||||
row& r = _partition.static_row();
|
||||
r.append_cell(id, atomic_cell_or_collection(cell));
|
||||
auto& cdef = _schema.static_column_at(id);
|
||||
r.append_cell(id, atomic_cell_or_collection(*cdef.type, cell));
|
||||
}
|
||||
|
||||
virtual void accept_static_cell(column_id id, collection_mutation_view collection) override {
|
||||
@@ -65,7 +66,8 @@ public:
|
||||
|
||||
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
|
||||
row& r = _current_row->cells();
|
||||
r.append_cell(id, atomic_cell_or_collection(cell));
|
||||
auto& cdef = _schema.regular_column_at(id);
|
||||
r.append_cell(id, atomic_cell_or_collection(*cdef.type, cell));
|
||||
}
|
||||
|
||||
virtual void accept_row_cell(column_id id, collection_mutation_view collection) override {
|
||||
|
||||
@@ -109,7 +109,9 @@ SEASTAR_TEST_CASE(test_apply) {
|
||||
b1.add_shard(counter_shard(id[4], 1, 3));
|
||||
auto c1 = atomic_cell_or_collection(b1.build(1));
|
||||
|
||||
auto c2 = counter_cell_builder::from_single_shard(2, counter_shard(id[2], 8, 3));
|
||||
auto c2 = atomic_cell_or_collection(
|
||||
counter_cell_builder::from_single_shard(2, counter_shard(id[2], 8, 3))
|
||||
);
|
||||
|
||||
verify_apply(c1, c2, 12);
|
||||
verify_apply(c2, c1, 12);
|
||||
@@ -122,7 +124,9 @@ SEASTAR_TEST_CASE(test_apply) {
|
||||
verify_apply(c1, c3, 15);
|
||||
verify_apply(c3, c1, 15);
|
||||
|
||||
auto c4 = counter_cell_builder::from_single_shard(0, counter_shard(id[2], 8, 1));
|
||||
auto c4 = atomic_cell_or_collection(
|
||||
counter_cell_builder::from_single_shard(0, counter_shard(id[2], 8, 1))
|
||||
);
|
||||
|
||||
verify_apply(c1, c4, 6);
|
||||
verify_apply(c4, c1, 6);
|
||||
@@ -136,7 +140,9 @@ SEASTAR_TEST_CASE(test_apply) {
|
||||
verify_apply(c1, c5, 21);
|
||||
verify_apply(c5, c1, 21);
|
||||
|
||||
auto c6 = counter_cell_builder::from_single_shard(3, counter_shard(id[2], 8, 1));
|
||||
auto c6 = atomic_cell_or_collection(
|
||||
counter_cell_builder::from_single_shard(3, counter_shard(id[2], 8, 1))
|
||||
);
|
||||
|
||||
verify_apply(c1, c6, 6);
|
||||
verify_apply(c6, c1, 6);
|
||||
@@ -340,19 +346,19 @@ SEASTAR_TEST_CASE(test_counter_update_mutations) {
|
||||
auto c1 = atomic_cell::make_live_counter_update(api::new_timestamp(), 5);
|
||||
auto s1 = atomic_cell::make_live_counter_update(api::new_timestamp(), 4);
|
||||
mutation m1(s, pk);
|
||||
m1.set_clustered_cell(ck, col, c1);
|
||||
m1.set_static_cell(scol, s1);
|
||||
m1.set_clustered_cell(ck, col, std::move(c1));
|
||||
m1.set_static_cell(scol, std::move(s1));
|
||||
|
||||
auto c2 = atomic_cell::make_live_counter_update(api::new_timestamp(), 9);
|
||||
auto s2 = atomic_cell::make_live_counter_update(api::new_timestamp(), 8);
|
||||
mutation m2(s, pk);
|
||||
m2.set_clustered_cell(ck, col, c2);
|
||||
m2.set_static_cell(scol, s2);
|
||||
m2.set_clustered_cell(ck, col, std::move(c2));
|
||||
m2.set_static_cell(scol, std::move(s2));
|
||||
|
||||
auto c3 = atomic_cell::make_dead(api::new_timestamp() / 2, gc_clock::now());
|
||||
mutation m3(s, pk);
|
||||
m3.set_clustered_cell(ck, col, c3);
|
||||
m3.set_static_cell(scol, c3);
|
||||
m3.set_clustered_cell(ck, col, atomic_cell(*counter_type, c3));
|
||||
m3.set_static_cell(scol, std::move(c3));
|
||||
|
||||
auto m12 = m1;
|
||||
m12.apply(m2);
|
||||
@@ -390,19 +396,19 @@ SEASTAR_TEST_CASE(test_transfer_updates_to_shards) {
|
||||
auto c1 = atomic_cell::make_live_counter_update(api::new_timestamp(), 5);
|
||||
auto s1 = atomic_cell::make_live_counter_update(api::new_timestamp(), 4);
|
||||
mutation m1(s, pk);
|
||||
m1.set_clustered_cell(ck, col, c1);
|
||||
m1.set_static_cell(scol, s1);
|
||||
m1.set_clustered_cell(ck, col, std::move(c1));
|
||||
m1.set_static_cell(scol, std::move(s1));
|
||||
|
||||
auto c2 = atomic_cell::make_live_counter_update(api::new_timestamp(), 9);
|
||||
auto s2 = atomic_cell::make_live_counter_update(api::new_timestamp(), 8);
|
||||
mutation m2(s, pk);
|
||||
m2.set_clustered_cell(ck, col, c2);
|
||||
m2.set_static_cell(scol, s2);
|
||||
m2.set_clustered_cell(ck, col, std::move(c2));
|
||||
m2.set_static_cell(scol, std::move(s2));
|
||||
|
||||
auto c3 = atomic_cell::make_dead(api::new_timestamp() / 2, gc_clock::now());
|
||||
mutation m3(s, pk);
|
||||
m3.set_clustered_cell(ck, col, c3);
|
||||
m3.set_static_cell(scol, c3);
|
||||
m3.set_clustered_cell(ck, col, atomic_cell(*counter_type, c3));
|
||||
m3.set_static_cell(scol, std::move(c3));
|
||||
|
||||
auto m0 = m1;
|
||||
transform_counter_updates_to_shards(m0, nullptr, 0);
|
||||
|
||||
@@ -64,6 +64,15 @@ static db::nop_large_partition_handler nop_lp_handler;
|
||||
|
||||
static atomic_cell make_atomic_cell(bytes value) {
|
||||
return atomic_cell::make_live(*bytes_type, 0, std::move(value));
|
||||
}
|
||||
|
||||
static atomic_cell make_atomic_cell() {
|
||||
return atomic_cell::make_live(*bytes_type, 0, bytes_view());
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
static atomic_cell make_atomic_cell(data_type dt, T value) {
|
||||
return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value)));
|
||||
};
|
||||
|
||||
static mutation_partition get_partition(memtable& mt, const partition_key& key) {
|
||||
@@ -100,7 +109,8 @@ SEASTAR_TEST_CASE(test_mutation_is_applied) {
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
|
||||
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(3)));
|
||||
auto c = make_atomic_cell(int32_type, 3);
|
||||
m.set_clustered_cell(c_key, r1_col, std::move(c));
|
||||
mt->apply(std::move(m));
|
||||
|
||||
auto p = get_partition(*mt, key);
|
||||
@@ -172,6 +182,23 @@ SEASTAR_TEST_CASE(test_row_tombstone_updates) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
collection_type_impl::mutation make_collection_mutation(tombstone t, bytes key, atomic_cell cell)
|
||||
{
|
||||
collection_type_impl::mutation m;
|
||||
m.tomb = t;
|
||||
m.cells.emplace_back(std::move(key), std::move(cell));
|
||||
return m;
|
||||
}
|
||||
|
||||
collection_type_impl::mutation make_collection_mutation(tombstone t, bytes key1, atomic_cell cell1, bytes key2, atomic_cell cell2)
|
||||
{
|
||||
collection_type_impl::mutation m;
|
||||
m.tomb = t;
|
||||
m.cells.emplace_back(std::move(key1), std::move(cell1));
|
||||
m.cells.emplace_back(std::move(key2), std::move(cell2));
|
||||
return m;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_map_mutations) {
|
||||
return seastar::async([] {
|
||||
auto my_map_type = map_type_impl::get_instance(int32_type, utf8_type, true);
|
||||
@@ -180,19 +207,19 @@ SEASTAR_TEST_CASE(test_map_mutations) {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto& column = *s->get_column_definition("s1");
|
||||
map_type_impl::mutation mmut1{{}, {{int32_type->decompose(101), make_atomic_cell(utf8_type->decompose(sstring("101")))}}};
|
||||
auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell(utf8_type, sstring("101")));
|
||||
mutation m1(s, key);
|
||||
m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1));
|
||||
mt->apply(m1);
|
||||
map_type_impl::mutation mmut2{{}, {{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102")))}}};
|
||||
auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102")));
|
||||
mutation m2(s, key);
|
||||
m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2));
|
||||
mt->apply(m2);
|
||||
map_type_impl::mutation mmut3{{}, {{int32_type->decompose(103), make_atomic_cell(utf8_type->decompose(sstring("103")))}}};
|
||||
auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell(utf8_type, sstring("103")));
|
||||
mutation m3(s, key);
|
||||
m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3));
|
||||
mt->apply(m3);
|
||||
map_type_impl::mutation mmut2o{{}, {{int32_type->decompose(102), make_atomic_cell(utf8_type->decompose(sstring("102 override")))}}};
|
||||
auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102 override")));
|
||||
mutation m2o(s, key);
|
||||
m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o));
|
||||
mt->apply(m2o);
|
||||
@@ -216,19 +243,19 @@ SEASTAR_TEST_CASE(test_set_mutations) {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto& column = *s->get_column_definition("s1");
|
||||
map_type_impl::mutation mmut1{{}, {{int32_type->decompose(101), make_atomic_cell({})}}};
|
||||
auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell());
|
||||
mutation m1(s, key);
|
||||
m1.set_static_cell(column, my_set_type->serialize_mutation_form(mmut1));
|
||||
mt->apply(m1);
|
||||
map_type_impl::mutation mmut2{{}, {{int32_type->decompose(102), make_atomic_cell({})}}};
|
||||
auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell());
|
||||
mutation m2(s, key);
|
||||
m2.set_static_cell(column, my_set_type->serialize_mutation_form(mmut2));
|
||||
mt->apply(m2);
|
||||
map_type_impl::mutation mmut3{{}, {{int32_type->decompose(103), make_atomic_cell({})}}};
|
||||
auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell());
|
||||
mutation m3(s, key);
|
||||
m3.set_static_cell(column, my_set_type->serialize_mutation_form(mmut3));
|
||||
mt->apply(m3);
|
||||
map_type_impl::mutation mmut2o{{}, {{int32_type->decompose(102), make_atomic_cell({})}}};
|
||||
auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell());
|
||||
mutation m2o(s, key);
|
||||
m2o.set_static_cell(column, my_set_type->serialize_mutation_form(mmut2o));
|
||||
mt->apply(m2o);
|
||||
@@ -253,19 +280,19 @@ SEASTAR_TEST_CASE(test_list_mutations) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto& column = *s->get_column_definition("s1");
|
||||
auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); };
|
||||
collection_type_impl::mutation mmut1{{}, {{make_key(), make_atomic_cell(int32_type->decompose(101))}}};
|
||||
auto mmut1 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 101));
|
||||
mutation m1(s, key);
|
||||
m1.set_static_cell(column, my_list_type->serialize_mutation_form(mmut1));
|
||||
mt->apply(m1);
|
||||
collection_type_impl::mutation mmut2{{}, {{make_key(), make_atomic_cell(int32_type->decompose(102))}}};
|
||||
auto mmut2 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102));
|
||||
mutation m2(s, key);
|
||||
m2.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2));
|
||||
mt->apply(m2);
|
||||
collection_type_impl::mutation mmut3{{}, {{make_key(), make_atomic_cell(int32_type->decompose(103))}}};
|
||||
auto mmut3 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 103));
|
||||
mutation m3(s, key);
|
||||
m3.set_static_cell(column, my_list_type->serialize_mutation_form(mmut3));
|
||||
mt->apply(m3);
|
||||
collection_type_impl::mutation mmut2o{{}, {{make_key(), make_atomic_cell(int32_type->decompose(102))}}};
|
||||
auto mmut2o = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102));
|
||||
mutation m2o(s, key);
|
||||
m2o.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2o));
|
||||
mt->apply(m2o);
|
||||
@@ -302,7 +329,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
auto insert_row = [&] (int32_t c1, int32_t r1) {
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(r1)));
|
||||
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, r1));
|
||||
cf.apply(std::move(m));
|
||||
return cf.flush();
|
||||
};
|
||||
@@ -479,10 +506,12 @@ SEASTAR_TEST_CASE(test_cell_ordering) {
|
||||
|
||||
auto assert_order = [] (atomic_cell_view first, atomic_cell_view second) {
|
||||
if (compare_atomic_cell_for_merge(first, second) >= 0) {
|
||||
BOOST_FAIL(sprint("Expected %s < %s", first, second));
|
||||
BOOST_TEST_MESSAGE(sprint("Expected %s < %s", first, second));
|
||||
abort();
|
||||
}
|
||||
if (compare_atomic_cell_for_merge(second, first) <= 0) {
|
||||
BOOST_FAIL(sprint("Expected %s < %s", second, first));
|
||||
BOOST_TEST_MESSAGE(sprint("Expected %s < %s", second, first));
|
||||
abort();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -829,7 +858,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
m1.partition().clustered_row(*s, ckey2).apply(row_marker(3));
|
||||
m1.set_clustered_cell(ckey2, *s->get_column_definition("v2"),
|
||||
atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v2:value4")))));
|
||||
map_type_impl::mutation mset1 {{}, {{int32_type->decompose(1), make_atomic_cell({})}, {int32_type->decompose(2), make_atomic_cell({})}}};
|
||||
auto mset1 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell(), int32_type->decompose(2), make_atomic_cell());
|
||||
m1.set_clustered_cell(ckey2, *s->get_column_definition("v3"),
|
||||
my_set_type->serialize_mutation_form(mset1));
|
||||
|
||||
@@ -843,7 +872,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value3")))));
|
||||
m2.set_clustered_cell(ckey2, *s->get_column_definition("v2"),
|
||||
atomic_cell::make_live(*bytes_type, 3, bytes_type->decompose(data_value(bytes("v2:value4a")))));
|
||||
map_type_impl::mutation mset2 {{}, {{int32_type->decompose(1), make_atomic_cell({})}, {int32_type->decompose(3), make_atomic_cell({})}}};
|
||||
auto mset2 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell(), int32_type->decompose(3), make_atomic_cell());
|
||||
m2.set_clustered_cell(ckey2, *s->get_column_definition("v3"),
|
||||
my_set_type->serialize_mutation_form(mset2));
|
||||
|
||||
@@ -855,7 +884,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value3")))));
|
||||
m3.set_clustered_cell(ckey2, *s->get_column_definition("v2"),
|
||||
atomic_cell::make_live(*bytes_type, 3, bytes_type->decompose(data_value(bytes("v2:value4a")))));
|
||||
map_type_impl::mutation mset3 {{}, {{int32_type->decompose(1), make_atomic_cell({})}}};
|
||||
auto mset3 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell());
|
||||
m3.set_clustered_cell(ckey2, *s->get_column_definition("v3"),
|
||||
my_set_type->serialize_mutation_form(mset3));
|
||||
|
||||
@@ -924,7 +953,7 @@ SEASTAR_TEST_CASE(test_large_blobs) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
|
||||
mutation m(s, key);
|
||||
m.set_static_cell(s1_col, make_atomic_cell(bytes_type->decompose(data_value(blob1))));
|
||||
m.set_static_cell(s1_col, make_atomic_cell(bytes_type, data_value(blob1)));
|
||||
mt->apply(std::move(m));
|
||||
|
||||
auto p = get_partition(*mt, key);
|
||||
@@ -1251,7 +1280,7 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
|
||||
const column_definition& col = *s->get_column_definition("value");
|
||||
|
||||
mutation m(s, key);
|
||||
m.set_clustered_cell(clustering_key::make_empty(), col, make_atomic_cell(int32_type->decompose(1)));
|
||||
m.set_clustered_cell(clustering_key::make_empty(), col, make_atomic_cell(int32_type, 1));
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1));
|
||||
m.partition().apply(tomb);
|
||||
BOOST_REQUIRE(!m.partition().empty());
|
||||
@@ -1397,6 +1426,7 @@ SEASTAR_TEST_CASE(test_collection_cell_diff) {
|
||||
{{"p", utf8_type}}, {}, {{"v", list_type_impl::get_instance(bytes_type, true)}}, {}, utf8_type));
|
||||
|
||||
auto& col = s->column_at(column_kind::regular_column, 0);
|
||||
auto& ctype = *static_pointer_cast<const collection_type_impl>(col.type);
|
||||
auto k = dht::global_partitioner().decorate_key(*s, partition_key::from_single_value(*s, to_bytes("key")));
|
||||
mutation m1(s, k);
|
||||
auto uuid = utils::UUID_gen::get_time_UUID_bytes();
|
||||
@@ -1404,12 +1434,12 @@ SEASTAR_TEST_CASE(test_collection_cell_diff) {
|
||||
mcol1.cells.emplace_back(
|
||||
bytes(reinterpret_cast<const int8_t*>(uuid.data()), uuid.size()),
|
||||
atomic_cell::make_live(*bytes_type, api::timestamp_type(1), to_bytes("element")));
|
||||
m1.set_clustered_cell(clustering_key::make_empty(), col, list_type_impl::serialize_mutation_form(mcol1));
|
||||
m1.set_clustered_cell(clustering_key::make_empty(), col, ctype.serialize_mutation_form(mcol1));
|
||||
|
||||
mutation m2(s, k);
|
||||
collection_type_impl::mutation mcol2;
|
||||
mcol2.tomb = tombstone(api::timestamp_type(2), gc_clock::now());
|
||||
m2.set_clustered_cell(clustering_key::make_empty(), col, list_type_impl::serialize_mutation_form(mcol2));
|
||||
m2.set_clustered_cell(clustering_key::make_empty(), col, ctype.serialize_mutation_form(mcol2));
|
||||
|
||||
mutation m12 = m1;
|
||||
m12.apply(m2);
|
||||
|
||||
@@ -791,11 +791,11 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
|
||||
mutation m(s, key);
|
||||
|
||||
tombstone tomb(api::new_timestamp(), gc_clock::now());
|
||||
set_type_impl::mutation set_mut{{ tomb }, {
|
||||
{ to_bytes("1"), make_atomic_cell(bytes_type, {}) },
|
||||
{ to_bytes("2"), make_atomic_cell(bytes_type, {}) },
|
||||
{ to_bytes("3"), make_atomic_cell(bytes_type, {}) }
|
||||
}};
|
||||
set_type_impl::mutation set_mut;
|
||||
set_mut.tomb = tomb;
|
||||
set_mut.cells.emplace_back(to_bytes("1"), make_atomic_cell(bytes_type, {}));
|
||||
set_mut.cells.emplace_back(to_bytes("2"), make_atomic_cell(bytes_type, {}));
|
||||
set_mut.cells.emplace_back(to_bytes("3"), make_atomic_cell(bytes_type, {}));
|
||||
|
||||
auto set_type = static_pointer_cast<const set_type_impl>(set_col.type);
|
||||
m.set_clustered_cell(c_key, set_col, set_type->serialize_mutation_form(set_mut));
|
||||
@@ -805,7 +805,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
|
||||
|
||||
auto key2 = partition_key::from_exploded(*s, {to_bytes("key2")});
|
||||
mutation m2(s, key2);
|
||||
set_type_impl::mutation set_mut_single{{}, {{ to_bytes("4"), make_atomic_cell(bytes_type, {}) }}};
|
||||
set_type_impl::mutation set_mut_single;
|
||||
set_mut_single.cells.emplace_back(to_bytes("4"), make_atomic_cell(bytes_type, {}));
|
||||
|
||||
m2.set_clustered_cell(c_key, set_col, set_type->serialize_mutation_form(set_mut_single));
|
||||
|
||||
|
||||
@@ -822,16 +822,16 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic) {
|
||||
mutation m(s, k);
|
||||
|
||||
auto ck = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(2)});
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
ck = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(4)});
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
ck = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(6)});
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
ck = clustering_key::from_exploded(*s, {int32_type->decompose(3), int32_type->decompose(9)});
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone(
|
||||
clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
|
||||
@@ -874,16 +874,16 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_compound_dense) {
|
||||
mutation m(s, dk);
|
||||
|
||||
auto ck1 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(2)});
|
||||
m.set_clustered_cell(ck1, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck1, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto ck2 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(4)});
|
||||
m.set_clustered_cell(ck2, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck2, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto ck3 = clustering_key::from_exploded(*s, {int32_type->decompose(1), int32_type->decompose(6)});
|
||||
m.set_clustered_cell(ck3, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck3, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto ck4 = clustering_key::from_exploded(*s, {int32_type->decompose(3), int32_type->decompose(9)});
|
||||
m.set_clustered_cell(ck4, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck4, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone(
|
||||
clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
|
||||
@@ -936,13 +936,13 @@ SEASTAR_TEST_CASE(test_promoted_index_blocks_are_monotonic_non_compound_dense) {
|
||||
mutation m(s, dk);
|
||||
|
||||
auto ck1 = clustering_key::from_exploded(*s, {int32_type->decompose(1)});
|
||||
m.set_clustered_cell(ck1, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck1, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto ck2 = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
|
||||
m.set_clustered_cell(ck2, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck2, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto ck3 = clustering_key::from_exploded(*s, {int32_type->decompose(3)});
|
||||
m.set_clustered_cell(ck3, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck3, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
m.partition().apply_row_tombstone(*s, range_tombstone(
|
||||
clustering_key_prefix::from_exploded(*s, {int32_type->decompose(1)}),
|
||||
@@ -1005,7 +1005,7 @@ SEASTAR_TEST_CASE(test_promoted_index_repeats_open_tombstones) {
|
||||
{1, gc_clock::now()}));
|
||||
|
||||
auto ck = clustering_key::from_exploded(*s, {bytes_type->decompose(data_value(to_bytes("ck3")))});
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(ck, *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mt->apply(m);
|
||||
@@ -1090,7 +1090,7 @@ SEASTAR_TEST_CASE(test_promoted_index_is_absent_for_schemas_without_clustering_k
|
||||
mutation m(s, dk);
|
||||
for (auto&& v : { 1, 2, 3, 4 }) {
|
||||
auto cell = atomic_cell::make_live(*int32_type, 1, int32_type->decompose(v), { });
|
||||
m.set_clustered_cell(clustering_key_prefix::make_empty(), *s->get_column_definition("v"), cell);
|
||||
m.set_clustered_cell(clustering_key_prefix::make_empty(), *s->get_column_definition("v"), atomic_cell(*int32_type, cell));
|
||||
}
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
mt->apply(m);
|
||||
|
||||
@@ -538,7 +538,7 @@ enum class status {
|
||||
ttl,
|
||||
};
|
||||
|
||||
inline bool check_status_and_done(const atomic_cell &c, status expected) {
|
||||
inline bool check_status_and_done(atomic_cell_view c, status expected) {
|
||||
if (expected == status::dead) {
|
||||
BOOST_REQUIRE(c.is_live() == false);
|
||||
return true;
|
||||
@@ -594,7 +594,7 @@ match_collection(const row& row, const schema& s, bytes col, const tombstone& t)
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(cdef->type);
|
||||
auto&& mut = ctype->deserialize_mutation_form(c);
|
||||
BOOST_REQUIRE(mut.tomb == t);
|
||||
return mut.materialize();
|
||||
return mut.materialize(*ctype);
|
||||
}
|
||||
|
||||
template <status Status>
|
||||
|
||||
11
types.cc
11
types.cc
@@ -2102,12 +2102,12 @@ collection_type_impl::to_value(collection_mutation_view mut, cql_serialization_f
|
||||
}
|
||||
|
||||
collection_type_impl::mutation
|
||||
collection_type_impl::mutation_view::materialize() const {
|
||||
collection_type_impl::mutation_view::materialize(const collection_type_impl& ctype) const {
|
||||
collection_type_impl::mutation m;
|
||||
m.tomb = tomb;
|
||||
m.cells.reserve(cells.size());
|
||||
for (auto&& e : cells) {
|
||||
m.cells.emplace_back(bytes(e.first.begin(), e.first.end()), e.second);
|
||||
m.cells.emplace_back(bytes(e.first.begin(), e.first.end()), atomic_cell(*ctype.value_comparator(), e.second));
|
||||
}
|
||||
return m;
|
||||
}
|
||||
@@ -2493,7 +2493,8 @@ auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm
|
||||
auto ksize = read_simple<uint32_t>(in);
|
||||
auto key = read_simple_bytes(in, ksize);
|
||||
auto vsize = read_simple<uint32_t>(in);
|
||||
auto value = atomic_cell_view::from_bytes(read_simple_bytes(in, vsize));
|
||||
// value_comparator(), ugh
|
||||
auto value = atomic_cell_view::from_bytes(value_comparator()->imr_state().type_info(), read_simple_bytes(in, vsize));
|
||||
ret.cells.emplace_back(key, value);
|
||||
}
|
||||
assert(in.empty());
|
||||
@@ -2519,7 +2520,7 @@ bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone to
|
||||
auto ksize = read_simple<uint32_t>(in);
|
||||
in.remove_prefix(ksize);
|
||||
auto vsize = read_simple<uint32_t>(in);
|
||||
auto value = atomic_cell_view::from_bytes(read_simple_bytes(in, vsize));
|
||||
auto value = atomic_cell_view::from_bytes(value_comparator()->imr_state().type_info(), read_simple_bytes(in, vsize));
|
||||
if (value.is_live(tomb, now, false)) {
|
||||
return true;
|
||||
}
|
||||
@@ -2540,7 +2541,7 @@ api::timestamp_type collection_type_impl::last_update(collection_mutation_view c
|
||||
auto ksize = read_simple<uint32_t>(in);
|
||||
in.remove_prefix(ksize);
|
||||
auto vsize = read_simple<uint32_t>(in);
|
||||
auto value = atomic_cell_view::from_bytes(read_simple_bytes(in, vsize));
|
||||
auto value = atomic_cell_view::from_bytes(value_comparator()->imr_state().type_info(), read_simple_bytes(in, vsize));
|
||||
max = std::max(value.timestamp(), max);
|
||||
}
|
||||
return max;
|
||||
|
||||
2
types.hh
2
types.hh
@@ -799,7 +799,7 @@ public:
|
||||
struct mutation_view {
|
||||
tombstone tomb;
|
||||
std::vector<std::pair<bytes_view, atomic_cell_view>> cells;
|
||||
mutation materialize() const;
|
||||
mutation materialize(const collection_type_impl&) const;
|
||||
};
|
||||
virtual data_type name_comparator() const = 0;
|
||||
virtual data_type value_comparator() const = 0;
|
||||
|
||||
Reference in New Issue
Block a user