counters: drop revertability of apply()

Since 4cfcd8055e 'Merge "Drop reversible
apply() from mutation_partition" from Tomasz' it is no longer required
for apply() to be revertable.
This commit is contained in:
Paweł Dziepak
2018-02-20 11:45:15 +00:00
parent f7438a8b96
commit a2b5779714
5 changed files with 20 additions and 102 deletions

View File

@@ -58,7 +58,6 @@ private:
static constexpr int8_t LIVE_FLAG = 0x01;
static constexpr int8_t EXPIRY_FLAG = 0x02; // When present, expiry field is present. Set only for live cells
static constexpr int8_t COUNTER_UPDATE_FLAG = 0x08; // Cell is a counter update.
static constexpr int8_t COUNTER_IN_PLACE_REVERT = 0x10;
static constexpr unsigned flags_size = 1;
static constexpr unsigned timestamp_offset = flags_size;
static constexpr unsigned timestamp_size = 8;
@@ -73,13 +72,6 @@ private:
static bool is_counter_update(bytes_view cell) {
return cell[0] & COUNTER_UPDATE_FLAG;
}
static bool is_counter_in_place_revert_set(bytes_view cell) {
return cell[0] & COUNTER_IN_PLACE_REVERT;
}
template<typename BytesContainer>
static void set_counter_in_place_revert(BytesContainer& cell, bool flag) {
cell[0] = (cell[0] & ~COUNTER_IN_PLACE_REVERT) | (flag * COUNTER_IN_PLACE_REVERT);
}
static bool is_live(const bytes_view& cell) {
return cell[0] & LIVE_FLAG;
}
@@ -208,9 +200,6 @@ public:
bool is_counter_update() const {
return atomic_cell_type::is_counter_update(_data);
}
bool is_counter_in_place_revert_set() const {
return atomic_cell_type::is_counter_in_place_revert_set(_data);
}
bool is_live() const {
return atomic_cell_type::is_live(_data);
}
@@ -263,9 +252,6 @@ public:
bytes_view serialize() const {
return _data;
}
void set_counter_in_place_revert(bool flag) {
atomic_cell_type::set_counter_in_place_revert(_data, flag);
}
};
class atomic_cell_view final : public atomic_cell_base<bytes_view> {

View File

@@ -118,38 +118,10 @@ static bool apply_in_place(atomic_cell_or_collection& dst, atomic_cell_or_collec
auto src_ts = src_ccmv.timestamp();
dst_ccmv.set_timestamp(std::max(dst_ts, src_ts));
src_ccmv.set_timestamp(dst_ts);
src.as_mutable_atomic_cell().set_counter_in_place_revert(true);
return true;
}
static void revert_in_place_apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src)
{
assert(dst.can_use_mutable_view() && src.can_use_mutable_view());
auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell());
auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell());
auto dst_shards = dst_ccmv.shards();
auto src_shards = src_ccmv.shards();
auto dst_it = dst_shards.begin();
auto src_it = src_shards.begin();
while (src_it != src_shards.end()) {
while (dst_it != dst_shards.end() && dst_it->id() < src_it->id()) {
++dst_it;
}
assert(dst_it != dst_shards.end() && dst_it->id() == src_it->id());
dst_it->swap_value_and_clock(*src_it);
++src_it;
}
auto dst_ts = dst_ccmv.timestamp();
auto src_ts = src_ccmv.timestamp();
dst_ccmv.set_timestamp(src_ts);
src_ccmv.set_timestamp(dst_ts);
src.as_mutable_atomic_cell().set_counter_in_place_revert(false);
}
bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_cell_or_collection& src)
void counter_cell_view::apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src)
{
auto dst_ac = dst.as_atomic_cell();
auto src_ac = src.as_atomic_cell();
@@ -157,9 +129,8 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_
if (!dst_ac.is_live() || !src_ac.is_live()) {
if (dst_ac.is_live() || (!src_ac.is_live() && compare_atomic_cell_for_merge(dst_ac, src_ac) < 0)) {
std::swap(dst, src);
return true;
}
return false;
return;
}
if (dst_ac.is_counter_update() && src_ac.is_counter_update()) {
@@ -167,7 +138,7 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_
auto dst_v = dst_ac.counter_update_value();
dst = atomic_cell::make_live_counter_update(std::max(dst_ac.timestamp(), src_ac.timestamp()),
src_v + dst_v);
return true;
return;
}
assert(!dst_ac.is_counter_update());
@@ -176,11 +147,10 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_
if (counter_cell_view(dst_ac).shard_count() >= counter_cell_view(src_ac).shard_count()
&& dst.can_use_mutable_view() && src.can_use_mutable_view()) {
if (apply_in_place(dst, src)) {
return true;
return;
}
}
src.as_mutable_atomic_cell().set_counter_in_place_revert(false);
auto dst_shards = counter_cell_view(dst_ac).shards();
auto src_shards = counter_cell_view(src_ac).shards();
@@ -192,21 +162,6 @@ bool counter_cell_view::apply_reversibly(atomic_cell_or_collection& dst, atomic_
auto cell = result.build(std::max(dst_ac.timestamp(), src_ac.timestamp()));
src = std::exchange(dst, atomic_cell_or_collection(cell));
return true;
}
void counter_cell_view::revert_apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src)
{
if (dst.as_atomic_cell().is_counter_update()) {
auto src_v = src.as_atomic_cell().counter_update_value();
auto dst_v = dst.as_atomic_cell().counter_update_value();
dst = atomic_cell::make_live(dst.as_atomic_cell().timestamp(),
long_type->decompose(dst_v - src_v));
} else if (src.as_atomic_cell().is_counter_in_place_revert_set()) {
revert_in_place_apply(dst, src);
} else {
std::swap(dst, src);
}
}
stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, atomic_cell_view b)

View File

@@ -388,11 +388,7 @@ struct counter_cell_view : basic_counter_cell_view<bytes_view> {
std::vector<counter_shard> shards_compatible_with_1_7_4() const;
// Reversibly applies two counter cells, at least one of them must be live.
// Returns true iff dst was modified.
static bool apply_reversibly(atomic_cell_or_collection& dst, atomic_cell_or_collection& src);
// Reverts apply performed by apply_reversible().
static void revert_apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src);
static void apply(atomic_cell_or_collection& dst, atomic_cell_or_collection& src);
// Computes a counter cell containing minimal amount of data which, when
// applied to 'b' returns the same cell as 'a' and 'b' applied together.

View File

@@ -1036,7 +1036,7 @@ apply_monotonically(const column_definition& def, cell_and_hash& dst,
// provided via an upper layer
if (def.is_atomic()) {
if (def.is_counter()) {
counter_cell_view::apply_reversibly(dst.cell, src); // FIXME: Optimize
counter_cell_view::apply(dst.cell, src); // FIXME: Optimize
dst.hash = { };
} else if (compare_atomic_cell_for_merge(dst.cell.as_atomic_cell(), src.as_atomic_cell()) < 0) {
std::swap(dst.cell, src);

View File

@@ -79,40 +79,21 @@ SEASTAR_TEST_CASE(test_counter_cell) {
BOOST_REQUIRE_EQUAL(cv.total_value(), 8);
verify_shard_order(cv);
counter_cell_view::apply_reversibly(c1, c2);
counter_cell_view::apply(c1, c2);
cv = counter_cell_view(c1.as_atomic_cell());
BOOST_REQUIRE_EQUAL(cv.total_value(), 4);
verify_shard_order(cv);
});
}
SEASTAR_TEST_CASE(test_reversability_of_apply) {
SEASTAR_TEST_CASE(test_apply) {
return seastar::async([] {
auto verify_applies_reversibly = [] (atomic_cell_or_collection dst, atomic_cell_or_collection src, int64_t value) {
auto original_dst = dst;
auto applied = counter_cell_view::apply_reversibly(dst, src);
auto applied_dst = dst;
auto verify_apply = [] (atomic_cell_or_collection dst, atomic_cell_or_collection src, int64_t value) {
counter_cell_view::apply(dst, src);
auto cv = counter_cell_view(dst.as_atomic_cell());
BOOST_REQUIRE_EQUAL(cv.total_value(), value);
BOOST_REQUIRE_EQUAL(cv.timestamp(), std::max(dst.as_atomic_cell().timestamp(), src.as_atomic_cell().timestamp()));
if (applied) {
counter_cell_view::revert_apply(dst, src);
}
BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()),
counter_cell_view(original_dst.as_atomic_cell()));
applied = counter_cell_view::apply_reversibly(dst, src);
BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()),
counter_cell_view(applied_dst.as_atomic_cell()));
if (applied) {
counter_cell_view::revert_apply(dst, src);
}
BOOST_REQUIRE_EQUAL(counter_cell_view(dst.as_atomic_cell()),
counter_cell_view(original_dst.as_atomic_cell()));
};
auto id = generate_ids(5);
@@ -124,21 +105,21 @@ SEASTAR_TEST_CASE(test_reversability_of_apply) {
auto c2 = counter_cell_builder::from_single_shard(2, counter_shard(id[2], 8, 3));
verify_applies_reversibly(c1, c2, 12);
verify_applies_reversibly(c2, c1, 12);
verify_apply(c1, c2, 12);
verify_apply(c2, c1, 12);
counter_cell_builder b2;
b2.add_shard(counter_shard(id[1], 4, 5));
b2.add_shard(counter_shard(id[3], 5, 4));
auto c3 = atomic_cell_or_collection(b2.build(2));
verify_applies_reversibly(c1, c3, 15);
verify_applies_reversibly(c3, c1, 15);
verify_apply(c1, c3, 15);
verify_apply(c3, c1, 15);
auto c4 = counter_cell_builder::from_single_shard(0, counter_shard(id[2], 8, 1));
verify_applies_reversibly(c1, c4, 6);
verify_applies_reversibly(c4, c1, 6);
verify_apply(c1, c4, 6);
verify_apply(c4, c1, 6);
counter_cell_builder b3;
b3.add_shard(counter_shard(id[0], 9, 0));
@@ -146,13 +127,13 @@ SEASTAR_TEST_CASE(test_reversability_of_apply) {
b3.add_shard(counter_shard(id[3], 5, 4));
auto c5 = atomic_cell_or_collection(b3.build(2));
verify_applies_reversibly(c1, c5, 21);
verify_applies_reversibly(c5, c1, 21);
verify_apply(c1, c5, 21);
verify_apply(c5, c1, 21);
auto c6 = counter_cell_builder::from_single_shard(3, counter_shard(id[2], 8, 1));
verify_applies_reversibly(c1, c6, 6);
verify_applies_reversibly(c6, c1, 6);
verify_apply(c1, c6, 6);
verify_apply(c6, c1, 6);
});
}