Merge 'commitlog allocation/deletion/flush request rate counters + footprint projection' from Calle Wilund
Adds measuring the apparent delta vector of footprint added/removed within the timer time slice, and potentially include this (if influx is greater than data removed) in threshold calculation. The idea is to anticipate crossing usage threshold within a time slice, so request a flush slightly earlier, hoping this will give all involved more time to do their disk work. Obviously, this is very akin to just adjusting the threshold downwards, but the slight difference is that we take actual transaction rate vs. segment free rate into account, not just static footprint. Note: this is a very simplistic version of this anticipation scheme, we just use the "raw" delta for the timer slice. A more sophisiticated approach would perhaps do either a lowpass filtered rate (adjust over longer time), or a regression or whatnot. But again, the default persiod of 10s is something of an eternity, so maybe that is superfluous... Closes #10651 * github.com:scylladb/scylla: commitlog: Add (internal) measurement of byte rates add/release/flush-req commitlog: Add counters for # bytes released/flush requested commitlog: Keep track of last flush high position to avoid double request commitlog: Fix counter descriptor language
This commit is contained in:
@@ -325,11 +325,47 @@ public:
|
||||
requires std::derived_from<T, db::commitlog::entry_writer> && std::same_as<R, decltype(std::declval<T>().result())>
|
||||
future<R> allocate_when_possible(T writer, db::timeout_clock::time_point timeout);
|
||||
|
||||
struct stats {
|
||||
template<typename T>
|
||||
struct byte_flow {
|
||||
T bytes_written = 0;
|
||||
T bytes_released = 0;
|
||||
T bytes_flush_requested = 0;
|
||||
|
||||
byte_flow operator+(const byte_flow& rhs) const {
|
||||
return byte_flow{
|
||||
.bytes_written = bytes_written + rhs.bytes_written,
|
||||
.bytes_released = bytes_released + rhs.bytes_released,
|
||||
.bytes_flush_requested = bytes_flush_requested + rhs.bytes_flush_requested,
|
||||
};
|
||||
}
|
||||
byte_flow operator-(const byte_flow& rhs) const {
|
||||
return byte_flow{
|
||||
.bytes_written = bytes_written - rhs.bytes_written,
|
||||
.bytes_released = bytes_released - rhs.bytes_released,
|
||||
.bytes_flush_requested = bytes_flush_requested - rhs.bytes_flush_requested,
|
||||
};
|
||||
}
|
||||
byte_flow<double> operator/(double d) const {
|
||||
return byte_flow<double>{
|
||||
.bytes_written = bytes_written / d,
|
||||
.bytes_released = bytes_released / d,
|
||||
.bytes_flush_requested = bytes_flush_requested / d,
|
||||
};
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const byte_flow& r) {
|
||||
return os << std::fixed
|
||||
<< "[written=" << r.bytes_written
|
||||
<< ", released=" << r.bytes_released
|
||||
<< ", flush_req=" << r.bytes_flush_requested
|
||||
<< "]";
|
||||
}
|
||||
};
|
||||
|
||||
struct stats : public byte_flow<uint64_t> {
|
||||
uint64_t cycle_count = 0;
|
||||
uint64_t flush_count = 0;
|
||||
uint64_t allocation_count = 0;
|
||||
uint64_t bytes_written = 0;
|
||||
uint64_t bytes_slack = 0;
|
||||
uint64_t segments_created = 0;
|
||||
uint64_t segments_destroyed = 0;
|
||||
@@ -360,6 +396,10 @@ public:
|
||||
};
|
||||
|
||||
stats totals;
|
||||
byte_flow<uint64_t> last_bytes;
|
||||
byte_flow<double> bytes_rate;
|
||||
|
||||
typename std::chrono::high_resolution_clock::time_point last_time;
|
||||
|
||||
size_t pending_allocations() const {
|
||||
return _request_controller.waiters();
|
||||
@@ -713,6 +753,7 @@ public:
|
||||
clogger.debug("Segment {} is no longer active and will submitted for delete now", *this);
|
||||
++_segment_manager->totals.segments_destroyed;
|
||||
_segment_manager->totals.active_size_on_disk -= file_position();
|
||||
_segment_manager->totals.bytes_released += file_position();
|
||||
_segment_manager->totals.wasted_size_on_disk -= _waste;
|
||||
mode = dispose_mode::Delete;
|
||||
} else if (_segment_manager->cfg.warn_about_segments_left_on_disk_after_shutdown) {
|
||||
@@ -1463,49 +1504,55 @@ void db::commitlog::segment_manager::create_counters(const sstring& metrics_cate
|
||||
"A non-zero value indicates that the disk write path became temporary slow.")),
|
||||
|
||||
sm::make_counter("alloc", totals.allocation_count,
|
||||
sm::description("Counts a number of times a new mutation has been added to a segment. "
|
||||
sm::description("Counts number of times a new mutation has been added to a segment. "
|
||||
"Divide bytes_written by this value to get the average number of bytes per mutation written to the disk.")),
|
||||
|
||||
sm::make_counter("cycle", totals.cycle_count,
|
||||
sm::description("Counts a number of commitlog write cycles - when the data is written from the internal memory buffer to the disk.")),
|
||||
sm::description("Counts number of commitlog write cycles - when the data is written from the internal memory buffer to the disk.")),
|
||||
|
||||
sm::make_counter("flush", totals.flush_count,
|
||||
sm::description("Counts a number of times the flush() method was called for a file.")),
|
||||
sm::description("Counts number of times the flush() method was called for a file.")),
|
||||
|
||||
sm::make_counter("bytes_written", totals.bytes_written,
|
||||
sm::description("Counts a number of bytes written to the disk. "
|
||||
sm::description("Counts number of bytes written to the disk. "
|
||||
"Divide this value by \"alloc\" to get the average number of bytes per mutation written to the disk.")),
|
||||
|
||||
sm::make_counter("bytes_released", totals.bytes_released,
|
||||
sm::description("Counts number of bytes released from disk. (Deleted/recycled)")),
|
||||
|
||||
sm::make_counter("bytes_flush_requested", totals.bytes_flush_requested,
|
||||
sm::description("Counts number of bytes requested to be flushed (persisted).")),
|
||||
|
||||
sm::make_counter("slack", totals.bytes_slack,
|
||||
sm::description("Counts a number of unused bytes written to the disk due to disk segment alignment.")),
|
||||
sm::description("Counts number of unused bytes written to the disk due to disk segment alignment.")),
|
||||
|
||||
sm::make_gauge("pending_flushes", totals.pending_flushes,
|
||||
sm::description("Holds a number of currently pending flushes. See the related flush_limit_exceeded metric.")),
|
||||
sm::description("Holds number of currently pending flushes. See the related flush_limit_exceeded metric.")),
|
||||
|
||||
sm::make_gauge("pending_allocations", [this] { return pending_allocations(); },
|
||||
sm::description("Holds a number of currently pending allocations. "
|
||||
sm::description("Holds number of currently pending allocations. "
|
||||
"A non-zero value indicates that we have a bottleneck in the disk write flow.")),
|
||||
|
||||
sm::make_counter("requests_blocked_memory", totals.requests_blocked_memory,
|
||||
sm::description("Counts a number of requests blocked due to memory pressure. "
|
||||
sm::description("Counts number of requests blocked due to memory pressure. "
|
||||
"A non-zero value indicates that the commitlog memory quota is not enough to serve the required amount of requests.")),
|
||||
|
||||
sm::make_counter("flush_limit_exceeded", totals.flush_limit_exceeded,
|
||||
sm::description(
|
||||
seastar::format("Counts a number of times a flush limit was exceeded. "
|
||||
seastar::format("Counts number of times a flush limit was exceeded. "
|
||||
"A non-zero value indicates that there are too many pending flush operations (see pending_flushes) and some of "
|
||||
"them will be blocked till the total amount of pending flush operations drops below {}.", cfg.max_active_flushes))),
|
||||
|
||||
sm::make_gauge("disk_total_bytes", totals.total_size_on_disk,
|
||||
sm::description("Holds a size of disk space in bytes reserved for data so far. "
|
||||
sm::description("Holds size of disk space in bytes reserved for data so far. "
|
||||
"A too high value indicates that we have some bottleneck in the writing to sstables path.")),
|
||||
|
||||
sm::make_gauge("disk_active_bytes", totals.active_size_on_disk,
|
||||
sm::description("Holds a size of disk space in bytes used for data so far. "
|
||||
sm::description("Holds size of disk space in bytes used for data so far. "
|
||||
"A too high value indicates that we have some bottleneck in the writing to sstables path.")),
|
||||
|
||||
sm::make_gauge("disk_slack_end_bytes", totals.wasted_size_on_disk,
|
||||
sm::description("Holds a size of disk space in bytes unused because of segment switching (end slack). "
|
||||
sm::description("Holds size of disk space in bytes unused because of segment switching (end slack). "
|
||||
"A too high value indicates that we do not write enough data to each segment.")),
|
||||
|
||||
sm::make_gauge("memory_buffer_bytes", totals.buffer_list_bytes,
|
||||
@@ -1536,28 +1583,45 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
|
||||
high = replay_position(high.id + 1, 0);
|
||||
}
|
||||
|
||||
auto n = size_to_remove;
|
||||
// Now get a set of used CF ids:
|
||||
std::unordered_set<cf_id_type> ids;
|
||||
|
||||
if (size_to_remove != 0) {
|
||||
for (auto& s : _segments) {
|
||||
if (n <= s->size_on_disk()) {
|
||||
high = replay_position(s->_desc.id, db::position_type(s->size_on_disk()));
|
||||
uint64_t n = size_to_remove;
|
||||
uint64_t flushing = 0;
|
||||
|
||||
for (auto& s : _segments) {
|
||||
// if a segment is allocating, it should be included in flush request,
|
||||
// because we cannot free anything there anyway. If a segment is allocating,
|
||||
// it is the last one, so just break.
|
||||
if (s->is_still_allocating()) {
|
||||
break;
|
||||
}
|
||||
|
||||
auto rp = replay_position(s->_desc.id, db::position_type(s->size_on_disk()));
|
||||
if (rp <= _flush_position) {
|
||||
// already requested.
|
||||
continue;
|
||||
}
|
||||
|
||||
auto size = s->size_on_disk();
|
||||
auto waste = s->_waste;
|
||||
|
||||
flushing += size - waste;
|
||||
|
||||
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
|
||||
ids.insert(id);
|
||||
}
|
||||
|
||||
if (size_to_remove != 0) {
|
||||
if (n <= size) {
|
||||
high = rp;
|
||||
break;
|
||||
}
|
||||
n -= s->size_on_disk();
|
||||
}
|
||||
}
|
||||
|
||||
// Now get a set of used CF ids:
|
||||
std::unordered_set<cf_id_type> ids;
|
||||
auto e = std::find_if(_segments.begin(), _segments.end(), std::mem_fn(&segment::is_still_allocating));
|
||||
std::for_each(_segments.begin(), e, [&ids](sseg_ptr& s) {
|
||||
for (auto& id : s->_cf_dirty | boost::adaptors::map_keys) {
|
||||
ids.insert(id);
|
||||
}
|
||||
});
|
||||
|
||||
clogger.debug("Flushing ({} MB) to {}", size_to_remove/(1024*1024), high);
|
||||
clogger.debug("Flushing ({} MB) to {}", flushing/(1024*1024), high);
|
||||
|
||||
// For each CF id: for each callback c: call c(id, high)
|
||||
for (auto& f : callbacks) {
|
||||
@@ -1569,6 +1633,9 @@ void db::commitlog::segment_manager::flush_segments(uint64_t size_to_remove) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_flush_position = high;
|
||||
totals.bytes_flush_requested += flushing;
|
||||
}
|
||||
|
||||
future<db::commitlog::segment_manager::sseg_ptr> db::commitlog::segment_manager::allocate_segment_ex(descriptor d, named_file f, open_flags flags) {
|
||||
@@ -2133,16 +2200,46 @@ void db::commitlog::segment_manager::on_timer() {
|
||||
if (cfg.mode != sync_mode::BATCH) {
|
||||
sync();
|
||||
}
|
||||
|
||||
byte_flow<uint64_t> curr = totals;
|
||||
auto diff = curr - std::exchange(last_bytes, curr);
|
||||
auto now = std::chrono::high_resolution_clock::now();
|
||||
auto seconds = std::chrono::duration_cast<std::chrono::duration<double>>(now - last_time).count();
|
||||
auto rate = diff / seconds;
|
||||
|
||||
// not using yet. but should maybe, adjust for time windows etc.
|
||||
// for now, use simple "timer frequency" based diffs (i.e. rate per 10s)
|
||||
// to try to predict where disk foot print will be by the next timer call.
|
||||
bytes_rate = rate;
|
||||
last_time = now;
|
||||
|
||||
clogger.debug("Rate: {} / s ({} s)", rate, seconds);
|
||||
|
||||
// IFF a new segment was put in use since last we checked, and we're
|
||||
// above threshold, request flush.
|
||||
if (_new_counter > 0) {
|
||||
auto max = disk_usage_threshold;
|
||||
auto cur = totals.active_size_on_disk + totals.wasted_size_on_disk;
|
||||
uint64_t extra = 0;
|
||||
|
||||
if (max != 0 && cur >= max) {
|
||||
clogger.debug("Used size on disk {} MB exceeds local threshold {} MB", cur / (1024 * 1024), max / (1024 * 1024));
|
||||
// TODO: better heuristics? Do a semi-pessimistic approach, guess that half
|
||||
// of flush request will manage to finish by next lap, so count it as half.
|
||||
auto returned = diff.bytes_released + diff.bytes_flush_requested/2;
|
||||
if (diff.bytes_written > returned) {
|
||||
// we are guessing we are gonna add at least this.
|
||||
extra = (diff.bytes_written - returned);
|
||||
}
|
||||
|
||||
// do not just measure current footprint, but maybe include expected
|
||||
// footprint that will be added.
|
||||
if (max != 0 && (cur + extra) >= max) {
|
||||
clogger.debug("Used size on disk {} MB ({} MB projected) exceeds local threshold {} MB"
|
||||
, (cur) / (1024 * 1024)
|
||||
, (cur+extra) / (1024 * 1024)
|
||||
, max / (1024 * 1024)
|
||||
);
|
||||
_new_counter = 0;
|
||||
flush_segments(cur - max);
|
||||
flush_segments(cur + extra - max);
|
||||
}
|
||||
}
|
||||
return do_pending_deletes();
|
||||
|
||||
Reference in New Issue
Block a user