counters: implement transforming counter deltas to shards

The leader receives counter updates as deltas which have to be
transformed to counter shards. In order to do that, current local shard
of the modified counter cell needs to be read, logical clock incremented
and the value modified by the specified delta.
This commit is contained in:
Paweł Dziepak
2017-01-30 17:17:32 +00:00
parent 55277b3182
commit 8d889082bf
2 changed files with 100 additions and 0 deletions

View File

@@ -23,6 +23,7 @@
#include "counters.hh"
#include "mutation.hh"
#include "combine.hh"
counter_id counter_id::local()
{
return counter_id(service::get_local_storage_service().get_local_id());
@@ -133,3 +134,95 @@ stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, at
}
return diff;
}
void transform_counter_updates_to_shards(mutation& m, const mutation* current_state, uint64_t clock_offset) {
// FIXME: allow current_state to be frozen_mutation
auto transform_new_row_to_shards = [clock_offset] (auto& cr) {
cr.row().cells().for_each_cell([clock_offset] (auto, atomic_cell_or_collection& ac_o_c) {
auto acv = ac_o_c.as_atomic_cell();
if (!acv.is_live()) {
return; // continue -- we are in lambda
}
auto delta = value_cast<int64_t>(long_type->deserialize_value(acv.value()));
counter_cell_builder ccb;
ccb.add_shard(counter_shard(counter_id::local(), delta, clock_offset + 1));
ac_o_c = ccb.build(acv.timestamp());
});
};
if (!current_state) {
for (auto& cr : m.partition().clustered_rows()) {
transform_new_row_to_shards(cr);
}
return;
}
clustering_key::less_compare cmp(*m.schema());
auto& cstate = current_state->partition();
auto it = cstate.clustered_rows().begin();
auto end = cstate.clustered_rows().end();
for (auto& cr : m.partition().clustered_rows()) {
while (it != end && cmp(it->key(), cr.key())) {
++it;
}
if (it == end || cmp(cr.key(), it->key())) {
transform_new_row_to_shards(cr);
continue;
}
struct counter_shard_or_tombstone {
stdx::optional<counter_shard> shard;
tombstone tomb;
};
std::deque<std::pair<column_id, counter_shard_or_tombstone>> shards;
it->row().cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) {
auto acv = ac_o_c.as_atomic_cell();
if (!acv.is_live()) {
counter_shard_or_tombstone cs_o_t { { },
tombstone(acv.timestamp(), acv.deletion_time()) };
shards.emplace_back(std::make_pair(id, cs_o_t));
return; // continue -- we are in lambda
}
counter_cell_view ccv(acv);
auto cs = ccv.local_shard();
if (!cs) {
return; // continue
}
shards.emplace_back(std::make_pair(id, counter_shard_or_tombstone { counter_shard(*cs), tombstone() }));
});
cr.row().cells().for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) {
auto acv = ac_o_c.as_atomic_cell();
if (!acv.is_live()) {
return; // continue -- we are in lambda
}
while (!shards.empty() && shards.front().first < id) {
shards.pop_front();
}
auto delta = value_cast<int64_t>(long_type->deserialize_value(acv.value()));
counter_cell_builder ccb;
if (shards.empty() || shards.front().first > id) {
ccb.add_shard(counter_shard(counter_id::local(), delta, clock_offset + 1));
} else if (shards.front().second.tomb.timestamp == api::missing_timestamp) {
auto& cs = *shards.front().second.shard;
cs.update(delta, clock_offset + 1);
ccb.add_shard(cs);
shards.pop_front();
} else {
// We are apply the tombstone that's already there second time.
// It is not necessary but there is no easy way to remove cell
// from a mutation.
tombstone t = shards.front().second.tomb;
ac_o_c = atomic_cell::make_dead(t.timestamp, t.deletion_time);
shards.pop_front();
return; // continue -- we are in lambda
}
ac_o_c = ccb.build(acv.timestamp());
});
}
}

View File

@@ -30,6 +30,8 @@
class mutation;
class mutation;
class counter_id {
int64_t _least_significant;
int64_t _most_significant;
@@ -299,6 +301,11 @@ public:
friend std::ostream& operator<<(std::ostream& os, counter_cell_view ccv);
};
// Transforms mutation dst from counter updates to counter shards using state
// stored in current_state.
// If current_state is present it has to be in the same schema as dst.
void transform_counter_updates_to_shards(mutation& dst, const mutation* current_state, uint64_t clock_offset);
template<>
struct appending_hash<counter_shard_view> {
template<typename Hasher>