Merge 'Coroutinize system_keyspace::get_compaction_history' from Pavel Emelyanov

Closes #13620

* github.com:scylladb/scylladb:
  system_keyspace: Fix indentation after previous patch
  system_keyspace: Coroutinize get_compaction_history()
This commit is contained in:
Botond Dénes
2023-04-24 09:48:01 +03:00
2 changed files with 16 additions and 20 deletions

View File

@@ -3074,25 +3074,21 @@ future<> system_keyspace::update_compaction_history(utils::UUID uuid, sstring ks
});
}
future<> system_keyspace::get_compaction_history(compaction_history_consumer&& f) {
return do_with(compaction_history_consumer(std::move(f)),
[this](compaction_history_consumer& consumer) mutable {
sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY);
return _qp.local().query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable {
compaction_history_entry entry;
entry.id = row.get_as<utils::UUID>("id");
entry.ks = row.get_as<sstring>("keyspace_name");
entry.cf = row.get_as<sstring>("columnfamily_name");
entry.compacted_at = row.get_as<int64_t>("compacted_at");
entry.bytes_in = row.get_as<int64_t>("bytes_in");
entry.bytes_out = row.get_as<int64_t>("bytes_out");
if (row.has("rows_merged")) {
entry.rows_merged = row.get_map<int32_t, int64_t>("rows_merged");
}
return consumer(std::move(entry)).then([] {
return stop_iteration::no;
});
});
future<> system_keyspace::get_compaction_history(compaction_history_consumer consumer) {
sstring req = format("SELECT * from system.{}", COMPACTION_HISTORY);
co_await _qp.local().query_internal(req, [&consumer] (const cql3::untyped_result_set::row& row) mutable -> future<stop_iteration> {
compaction_history_entry entry;
entry.id = row.get_as<utils::UUID>("id");
entry.ks = row.get_as<sstring>("keyspace_name");
entry.cf = row.get_as<sstring>("columnfamily_name");
entry.compacted_at = row.get_as<int64_t>("compacted_at");
entry.bytes_in = row.get_as<int64_t>("bytes_in");
entry.bytes_out = row.get_as<int64_t>("bytes_out");
if (row.has("rows_merged")) {
entry.rows_merged = row.get_map<int32_t, int64_t>("rows_merged");
}
co_await consumer(std::move(entry));
co_return stop_iteration::no;
});
}

View File

@@ -321,7 +321,7 @@ public:
future<> update_compaction_history(utils::UUID uuid, sstring ksname, sstring cfname, int64_t compacted_at, int64_t bytes_in, int64_t bytes_out,
std::unordered_map<int32_t, int64_t> rows_merged);
using compaction_history_consumer = noncopyable_function<future<>(const compaction_history_entry&)>;
future<> get_compaction_history(compaction_history_consumer&& f);
future<> get_compaction_history(compaction_history_consumer f);
struct repair_history_entry {
tasks::task_id id;