Speculate epoll results
In many cases, we can guess the result of an epoll_wait() before it happens: - if a read() or write() consumes the entire buffer, a following call will likely succeed (and if it doesn't, it likely won't) - after an accept() completes, a write() will likely succeed Speculatively add these events to events_known; if we mispredict and fail with EAGAIN, all we need to do is retry.
This commit is contained in:
63
reactor.hh
63
reactor.hh
@@ -467,10 +467,16 @@ extern reactor the_reactor;
|
||||
|
||||
class pollable_fd_state {
|
||||
public:
|
||||
struct speculation {
|
||||
int events = 0;
|
||||
explicit speculation(int epoll_events_guessed = 0) : events(epoll_events_guessed) {}
|
||||
};
|
||||
~pollable_fd_state() { the_reactor.forget(*this); ::close(fd); }
|
||||
explicit pollable_fd_state(int fd) : fd(fd) {}
|
||||
explicit pollable_fd_state(int fd, speculation speculate = speculation())
|
||||
: fd(fd), events_known(speculate.events) {}
|
||||
pollable_fd_state(const pollable_fd_state&) = delete;
|
||||
void operator=(const pollable_fd_state&) = delete;
|
||||
void speculate_epoll(int events) { events_known |= events; }
|
||||
int fd;
|
||||
int events_requested = 0; // wanted by pollin/pollout promises
|
||||
int events_epoll = 0; // installed in epoll
|
||||
@@ -482,8 +488,11 @@ public:
|
||||
};
|
||||
|
||||
class pollable_fd {
|
||||
public:
|
||||
using speculation = pollable_fd_state::speculation;
|
||||
std::unique_ptr<pollable_fd_state> _s;
|
||||
pollable_fd(int fd) : _s(std::make_unique<pollable_fd_state>(fd)) {}
|
||||
pollable_fd(int fd, speculation speculate = speculation())
|
||||
: _s(std::make_unique<pollable_fd_state>(fd, speculate)) {}
|
||||
public:
|
||||
pollable_fd(pollable_fd&&) = default;
|
||||
pollable_fd& operator=(pollable_fd&&) = default;
|
||||
@@ -597,6 +606,15 @@ public:
|
||||
private:
|
||||
};
|
||||
|
||||
inline
|
||||
size_t iovec_len(const std::vector<iovec>& iov)
|
||||
{
|
||||
size_t ret = 0;
|
||||
for (auto&& e : iov) {
|
||||
ret += e.iov_len;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline
|
||||
future<accept_result>
|
||||
@@ -608,7 +626,7 @@ reactor::accept(pollable_fd_state& listenfd) {
|
||||
socklen_t sl = sizeof(&sa.u.sas);
|
||||
int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||
assert(fd != -1);
|
||||
pr.set_value(accept_result{pollable_fd(fd), sa});
|
||||
pr.set_value(accept_result{pollable_fd(fd, pollable_fd::speculation(EPOLLOUT)), sa});
|
||||
});
|
||||
return fut;
|
||||
}
|
||||
@@ -618,9 +636,18 @@ future<size_t>
|
||||
reactor::read_some(pollable_fd_state& fd, void* buffer, size_t len) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
readable(fd).then([pr = std::move(pr), rfd = fd.fd, buffer, len] () mutable {
|
||||
ssize_t r = ::recv(rfd, buffer, len, 0);
|
||||
readable(fd).then([this, pr = std::move(pr), &fd, buffer, len] () mutable {
|
||||
ssize_t r = ::recv(fd.fd, buffer, len, 0);
|
||||
if (r == -1 && errno == EAGAIN) {
|
||||
read_some(fd, buffer, len).then([pr = std::move(pr)] (size_t r) mutable {
|
||||
pr.set_value(r);
|
||||
});
|
||||
return;
|
||||
}
|
||||
assert(r != -1);
|
||||
if (size_t(r) == len) {
|
||||
fd.speculate_epoll(EPOLLIN);
|
||||
}
|
||||
pr.set_value(r);
|
||||
});
|
||||
return fut;
|
||||
@@ -631,12 +658,21 @@ future<size_t>
|
||||
reactor::read_some(pollable_fd_state& fd, const std::vector<iovec>& iov) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
readable(fd).then([pr = std::move(pr), rfd = fd.fd, iov = iov] () mutable {
|
||||
readable(fd).then([this, pr = std::move(pr), &fd, iov = iov] () mutable {
|
||||
::msghdr mh = {};
|
||||
mh.msg_iov = &iov[0];
|
||||
mh.msg_iovlen = iov.size();
|
||||
ssize_t r = ::recvmsg(rfd, &mh, 0);
|
||||
ssize_t r = ::recvmsg(fd.fd, &mh, 0);
|
||||
if (r == -1 && errno == EAGAIN) {
|
||||
read_some(fd, iov).then([pr = std::move(pr)] (size_t r) mutable {
|
||||
pr.set_value(r);
|
||||
});
|
||||
return;
|
||||
}
|
||||
assert(r != -1);
|
||||
if (size_t(r) == iovec_len(iov)) {
|
||||
fd.speculate_epoll(EPOLLIN);
|
||||
}
|
||||
pr.set_value(r);
|
||||
});
|
||||
return fut;
|
||||
@@ -647,9 +683,18 @@ future<size_t>
|
||||
reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
|
||||
promise<size_t> pr;
|
||||
auto fut = pr.get_future();
|
||||
writeable(fd).then([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable {
|
||||
ssize_t r = ::send(sfd, buffer, len, 0);
|
||||
writeable(fd).then([this, pr = std::move(pr), &fd, buffer, len] () mutable {
|
||||
ssize_t r = ::send(fd.fd, buffer, len, 0);
|
||||
if (r == -1 && errno == EAGAIN) {
|
||||
write_some(fd, buffer, len).then([pr = std::move(pr)] (size_t r) mutable {
|
||||
pr.set_value(r);
|
||||
});
|
||||
return;
|
||||
}
|
||||
assert(r != -1);
|
||||
if (size_t(r) == len) {
|
||||
fd.speculate_epoll(EPOLLOUT);
|
||||
}
|
||||
pr.set_value(r);
|
||||
});
|
||||
return fut;
|
||||
|
||||
Reference in New Issue
Block a user