diff --git a/Makefile b/Makefile index bd4578fed4..07db558b85 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,16 @@ +mode = release -sanitize = -fsanitize=address -fsanitize=leak -fsanitize=undefined -CXXFLAGS = -std=gnu++1y -g -Wall -O0 -MD -MT $@ -MP -flto $(sanitize) +sanitize.debug = -fsanitize=address -fsanitize=leak -fsanitize=undefined +sanitize.release = + +opt.debug = -O0 +opt.release = -O2 -flto + +sanitize = $(sanitize.$(mode)) +opt = $(opt.$(mode)) + +CXXFLAGS = -std=gnu++1y -g -Wall $(opt) -MD -MT $@ -MP -flto $(sanitize) -fvisibility=hidden tests = test-reactor diff --git a/reactor.cc b/reactor.cc index e4b4f73a6d..c0e93f46b1 100644 --- a/reactor.cc +++ b/reactor.cc @@ -21,7 +21,7 @@ reactor::~reactor() { void reactor::epoll_add_in(pollable_fd& pfd, std::unique_ptr t) { auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - pfd.events |= EPOLLIN | EPOLLONESHOT; + pfd.events |= EPOLLIN; assert(!pfd.pollin); pfd.pollin = std::move(t); ::epoll_event eevt; @@ -33,7 +33,7 @@ void reactor::epoll_add_in(pollable_fd& pfd, std::unique_ptr t) { void reactor::epoll_add_out(pollable_fd& pfd, std::unique_ptr t) { auto ctl = pfd.events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD; - pfd.events |= EPOLLOUT | EPOLLONESHOT; + pfd.events |= EPOLLOUT; assert(!pfd.pollout); pfd.pollout = std::move(t); ::epoll_event eevt; @@ -43,9 +43,14 @@ void reactor::epoll_add_out(pollable_fd& pfd, std::unique_ptr t) { assert(r == 0); } +void reactor::forget(pollable_fd& fd) { + if (fd.events) { + ::epoll_ctl(_epollfd, EPOLL_CTL_DEL, fd.fd, nullptr); + } +} + std::unique_ptr -reactor::listen(socket_address sa, listen_options opts) -{ +reactor::listen(socket_address sa, listen_options opts) { int fd = ::socket(sa.u.sa.sa_family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); assert(fd != -1); if (opts.reuse_address) { @@ -55,7 +60,7 @@ reactor::listen(socket_address sa, listen_options opts) int r = ::bind(fd, &sa.u.sa, sizeof(sa.u.sas)); assert(r != -1); ::listen(fd, 100); - return std::unique_ptr(new pollable_fd(fd)); + return std::unique_ptr(new pollable_fd(*this, fd)); } void reactor::run() { @@ -67,13 +72,20 @@ void reactor::run() { auto& evt = eevt[i]; auto pfd = reinterpret_cast(evt.data.ptr); auto events = evt.events; + pfd->events = 0; + ::epoll_ctl(_epollfd, EPOLL_CTL_DEL, pfd->fd, &evt); + std::unique_ptr t_in, t_out; if (events & EPOLLIN) { - auto t = std::move(pfd->pollin); - t->run(); + t_in = std::move(pfd->pollin); } if (events & EPOLLOUT) { - auto t = std::move(pfd->pollout); - t->run(); + t_out = std::move(pfd->pollout); + } + if (t_in) { + t_in->run(); + } + if (t_out) { + t_out->run(); } } } diff --git a/reactor.hh b/reactor.hh index c0f01ba557..c1d5893b89 100644 --- a/reactor.hh +++ b/reactor.hh @@ -19,6 +19,7 @@ #include #include #include +#include class socket_address; class reactor; @@ -131,7 +132,6 @@ struct future_state { assert(_state == state::future); _state = state::result; new (&_u.value) T(std::forward(a)...); - std::cout << "checking task at " << &_task << "\n"; if (_task) { _task->run(); } @@ -155,7 +155,6 @@ struct future_state { } template void schedule(Func&& func) { - std::cout << "scheduling task at " << &_task << "\n"; _task = make_task(std::forward(func)); } }; @@ -203,14 +202,26 @@ public: T get() { return _state->get(); } + + template + void then(Func, Enable); + template - void then(Func&& func) { + void then(Func&& func, std::enable_if_t, void>::value, void*> = nullptr) { auto state = _state; state->schedule([fut = std::move(*this), func = std::forward(func)] () mutable { - std::cout << "running task\n"; func(std::move(fut)); }); } +#if 0 + template + auto then(Func&& func) -> std::enable_if::value, decltype(func(*this))>::type { + auto state = _state; + state->schedule([fut = std::move(*this), func = std::forward(func)] () mutable { + func(std::move(fut)); + }); + } +#endif friend class promise; }; @@ -236,6 +247,7 @@ public: private: void epoll_add_in(pollable_fd& fd, std::unique_ptr t); void epoll_add_out(pollable_fd& fd, std::unique_ptr t); + void forget(pollable_fd& fd); void abort_on_error(int ret); public: reactor(); @@ -249,16 +261,27 @@ public: future read_some(pollable_fd& fd, void* buffer, size_t size); + future write_some(pollable_fd& fd, void* buffer, size_t size); + + future write_all(pollable_fd& fd, void* buffer, size_t size); + void run(); +private: + void write_all_part(pollable_fd& fd, void* buffer, size_t size, + promise result, size_t completed); + friend class pollable_fd; }; class pollable_fd { +public: + ~pollable_fd() { r.forget(*this); ::close(fd); } protected: - explicit pollable_fd(int fd) : fd(fd) {} + explicit pollable_fd(reactor& r, int fd) : r(r), fd(fd) {} pollable_fd(const pollable_fd&) = delete; void operator=(const pollable_fd&) = delete; + reactor& r; int fd; int events = 0; std::unique_ptr pollin; @@ -271,12 +294,12 @@ future reactor::accept(pollable_fd& listenfd) { promise pr; future fut = pr.get_future(); - epoll_add_in(listenfd, make_task([pr = std::move(pr), lfd = listenfd.fd] () mutable { + epoll_add_in(listenfd, make_task([this, pr = std::move(pr), lfd = listenfd.fd] () mutable { socket_address sa; socklen_t sl = sizeof(&sa.u.sas); int fd = ::accept4(lfd, &sa.u.sa, &sl, SOCK_NONBLOCK | SOCK_CLOEXEC); assert(fd != -1); - pr.set_value(accept_result{std::unique_ptr(new pollable_fd(fd)), sa}); + pr.set_value(accept_result{std::unique_ptr(new pollable_fd(*this, fd)), sa}); })); return fut; } @@ -294,5 +317,43 @@ reactor::read_some(pollable_fd& fd, void* buffer, size_t len) { return fut; } +inline +future +reactor::write_some(pollable_fd& fd, void* buffer, size_t len) { + promise pr; + auto fut = pr.get_future(); + epoll_add_out(fd, make_task([pr = std::move(pr), sfd = fd.fd, buffer, len] () mutable { + ssize_t r = ::send(sfd, buffer, len, 0); + assert(r != -1); + pr.set_value(r); + })); + return fut; +} + +inline +void +reactor::write_all_part(pollable_fd& fd, void* buffer, size_t len, + promise result, size_t completed) { + if (completed == len) { + result.set_value(completed); + } else { + write_some(fd, static_cast(buffer) + completed, len - completed).then( + [&fd, buffer, len, result = std::move(result), completed, this] (future part) mutable { + write_all_part(fd, buffer, len, std::move(result), completed + part.get()); + }); + } +} + +inline +future +reactor::write_all(pollable_fd& fd, void* buffer, size_t len) { + assert(len); + promise pr; + auto fut = pr.get_future(); + write_all_part(fd, buffer, len, std::move(pr), 0); + return fut; +} + + #endif /* REACTOR_HH_ */ diff --git a/test-reactor.cc b/test-reactor.cc index db7b9133ba..1d18f07b68 100644 --- a/test-reactor.cc +++ b/test-reactor.cc @@ -12,23 +12,31 @@ struct test { reactor r; std::unique_ptr listener; struct connection { + connection(reactor& r, std::unique_ptr fd) : r(r), fd(std::move(fd)) {} reactor& r; std::unique_ptr fd; + char buffer[8192]; + void copy_data() { + r.read_some(*fd, buffer, sizeof(buffer)).then([this] (future fut) { + auto n = fut.get(); + if (n) { + r.write_all(*fd, buffer, n).then([this, n] (future fut) { + if (fut.get() == n) { + copy_data(); + } + }); + } else { + delete this; + } + }); + } }; void new_connection(accept_result&& accepted) { - std::cout << "got connection\n"; - copy_data(std::move(std::get<0>(accepted))); - } - void copy_data(std::unique_ptr fd) { - char buffer[8192]; - r.read_some(*fd, buffer, sizeof(buffer)).then([this] (future fut) { - auto n = fut.get(); - std::cout << "got data: " << n << "\n"; - }); + auto c = new connection(r, std::move(std::get<0>(accepted))); + c->copy_data(); } void start_accept() { r.accept(*listener).then([this] (future fut) { - std::cout << "accept future returned\n"; new_connection(fut.get()); start_accept(); });