streaming: Add get_progress_on_all_shards for plan_id

Get stream_bytes for a specific plan_id.
This commit is contained in:
Asias He
2016-02-25 16:24:45 +08:00
parent 97558b2cfe
commit 9dede89e07
2 changed files with 22 additions and 0 deletions

View File

@@ -141,6 +141,14 @@ stream_bytes stream_manager::get_progress(UUID plan_id, gms::inet_address peer)
return sbytes[peer];
}
stream_bytes stream_manager::get_progress(UUID plan_id) {
stream_bytes ret;
for (auto& x : _stream_bytes[plan_id]) {
ret += x.second;
}
return ret;
}
future<> stream_manager::remove_progress_on_all_shards(UUID plan_id) {
return get_stream_manager().invoke_on_all([plan_id] (auto& sm) {
sm.remove_progress(plan_id);
@@ -157,6 +165,16 @@ future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id, gm
);
}
future<stream_bytes> stream_manager::get_progress_on_all_shards(UUID plan_id) {
return get_stream_manager().map_reduce0(
[plan_id] (auto& sm) {
return sm.get_progress(plan_id);
},
stream_bytes(),
std::plus<stream_bytes>()
);
}
future<stream_bytes> stream_manager::get_progress_on_all_shards(gms::inet_address peer) {
return get_stream_manager().map_reduce0(
[peer] (auto& sm) {

View File

@@ -122,10 +122,14 @@ public:
stream_bytes get_progress(UUID plan_id, gms::inet_address peer);
stream_bytes get_progress(UUID plan_id);
future<> remove_progress_on_all_shards(UUID plan_id);
future<stream_bytes> get_progress_on_all_shards(UUID plan_id, gms::inet_address peer);
future<stream_bytes> get_progress_on_all_shards(UUID plan_id);
future<stream_bytes> get_progress_on_all_shards(gms::inet_address peer);
future<stream_bytes> get_progress_on_all_shards();