corosio


Directory: ../corosio_lcov_PR-139/
Coverage: low: < 75.0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 81.4% 79 / 0 / 97
Functions: 80.0% 16 / 0 / 20
Branches: 50.0% 12 / 0 / 24

src/corosio/src/detail/epoll/op.hpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
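
As an illustrative sketch only (the real epoll_socket_impl lives in
sockets.cpp and carries more state), the fixed slots look roughly like:

    struct epoll_socket_impl
    {
        epoll_connect_op conn_;  // at most one pending connect
        epoll_read_op    rd_;    // at most one pending read
        epoll_write_op   wr_;    // at most one pending write
    };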
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
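
A registration sketch, assuming a reactor epoll fd named epfd, a
descriptor_state instance desc_state, and an edge-triggered mask (all
illustrative; the real registration code lives elsewhere):

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;   // assumed event mask
    ev.data.ptr = &desc_state;                  // maps events back to state
    ::epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);  // register once, keep until close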
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
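
In sketch form (illustrative; post_to_ready_queue and self_shared are
hypothetical names):

    // in cancel():
    op->impl_ptr = self_shared;   // pin the impl until the op completes
    post_to_ready_queue(op);
    // in operator()():
    h.resume();                   // resume the waiting coroutine
    impl_ptr.reset();             // now safe for the impl to go away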
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
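
A completion-side sketch of the distinction (illustrative; eof_error() is
a hypothetical stand-in for the real error mapping):

    if (bytes_transferred == 0 && errn == 0 && !empty_buffer_read)
        *ec_out = eof_error();   // peer closed: a true end-of-file
    // otherwise 0 bytes merely reflects an empty user buffer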
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
81 // Forward declarations
82 class epoll_socket_impl;
83 class epoll_acceptor_impl;
84 struct epoll_op;
85
86 // Forward declaration
87 class epoll_scheduler;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
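
    A reactor-side sketch of steps 1-2 (illustrative; ready_queue_ is a
    hypothetical name for the scheduler's queue):

        void on_event(descriptor_state* ds, std::uint32_t ev)
        {
            ds->add_ready_events(ev);
            if (!ds->is_enqueued_.exchange(true))  // enqueue at most once
                ready_queue_.push(ds);
        }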
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120     // Caches edge-triggered readiness events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Deferred cancellation: set by cancel() when the target op is not
125 // parked (e.g. completing inline via speculative I/O). Checked when
126 // the next op parks; if set, the op is immediately self-cancelled.
127 // This matches IOCP semantics where CancelIoEx always succeeds.
128 bool read_cancel_pending = false;
129 bool write_cancel_pending = false;
130 bool connect_cancel_pending = false;
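
    // Park-time sketch of the deferred-cancel check (illustrative; st names
    // the descriptor_state and op the parking epoll_op, and the real logic
    // lives where ops park, e.g. sockets.cpp):
    //
    //     if (st.read_cancel_pending) {
    //         st.read_cancel_pending = false;
    //         op->request_cancel();   // complete immediately as cancelled
    //     } else {
    //         st.read_op = op;        // park and wait for readiness
    //     }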
131
132 // Set during registration only (no mutex needed)
133 std::uint32_t registered_events = 0;
134 int fd = -1;
135
136 // For deferred I/O - set by reactor, read by scheduler
137 std::atomic<std::uint32_t> ready_events_{0};
138 std::atomic<bool> is_enqueued_{false};
139 epoll_scheduler const* scheduler_ = nullptr;
140
141 // Prevents impl destruction while this descriptor_state is queued.
142 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
143 std::shared_ptr<void> impl_ref_;
144
145 /// Add ready events atomically.
146 60961 void add_ready_events(std::uint32_t ev) noexcept
147 {
148 60961 ready_events_.fetch_or(ev, std::memory_order_relaxed);
149 60961 }
150
151 /// Perform deferred I/O and queue completions.
152 void operator()() override;
153
154 /// Destroy without invoking.
155     /// Called during scheduler::shutdown() drain. Clears impl_ref_ to break
156 /// the self-referential cycle set by close_socket().
157 void destroy() override { impl_ref_.reset(); }
158 };
159
160 struct epoll_op : scheduler_op
161 {
162 struct canceller
163 {
164 epoll_op* op;
165 void operator()() const noexcept;
166 };
167
168 std::coroutine_handle<> h;
169 capy::executor_ref ex;
170 std::error_code* ec_out = nullptr;
171 std::size_t* bytes_out = nullptr;
172
173 int fd = -1;
174 int errn = 0;
175 std::size_t bytes_transferred = 0;
176
177 std::atomic<bool> cancelled{false};
178 std::optional<std::stop_callback<canceller>> stop_cb;
179
180 // Prevents use-after-free when socket is closed with pending ops.
181 // See "Impl Lifetime Management" in file header.
182 std::shared_ptr<void> impl_ptr;
183
184 // For stop_token cancellation - pointer to owning socket/acceptor impl.
185 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
186 epoll_socket_impl* socket_impl_ = nullptr;
187 epoll_acceptor_impl* acceptor_impl_ = nullptr;
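
    // A plausible shape for canceller::operator() (sketch only; cancel(op)
    // is a hypothetical impl member, and the real definition lives in a
    // .cpp where the impl types are complete):
    //
    //     void epoll_op::canceller::operator()() const noexcept
    //     {
    //         if (op->socket_impl_)        op->socket_impl_->cancel(op);
    //         else if (op->acceptor_impl_) op->acceptor_impl_->cancel(op);
    //     }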
188
189 28840 epoll_op() = default;
190
191 236699 void reset() noexcept
192 {
193 236699 fd = -1;
194 236699 errn = 0;
195 236699 bytes_transferred = 0;
196 236699 cancelled.store(false, std::memory_order_relaxed);
197 236699 impl_ptr.reset();
198 236699 socket_impl_ = nullptr;
199 236699 acceptor_impl_ = nullptr;
200 236699 }
201
202 // Defined in sockets.cpp where epoll_socket_impl is complete
203 void operator()() override;
204
205 37749 virtual bool is_read_operation() const noexcept { return false; }
206 virtual void cancel() noexcept = 0;
207
208 void destroy() override
209 {
210 stop_cb.reset();
211 impl_ptr.reset();
212 }
213
214 43970 void request_cancel() noexcept
215 {
216 43970 cancelled.store(true, std::memory_order_release);
217 43970 }
218
219 80572 void start(std::stop_token token, epoll_socket_impl* impl)
220 {
221 80572 cancelled.store(false, std::memory_order_release);
222 80572 stop_cb.reset();
223 80572 socket_impl_ = impl;
224 80572 acceptor_impl_ = nullptr;
225
226
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 80473 times.
80572 if (token.stop_possible())
227 99 stop_cb.emplace(token, canceller{this});
228 80572 }
229
230 4799 void start(std::stop_token token, epoll_acceptor_impl* impl)
231 {
232 4799 cancelled.store(false, std::memory_order_release);
233 4799 stop_cb.reset();
234 4799 socket_impl_ = nullptr;
235 4799 acceptor_impl_ = impl;
236
237
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 4790 times.
4799 if (token.stop_possible())
238 9 stop_cb.emplace(token, canceller{this});
239 4799 }
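
    // Caller-side sketch (illustrative; the real call sites live in the
    // socket/acceptor await machinery):
    //
    //     rd_.reset();
    //     rd_.start(token, this);  // arm stop_token cancellation before parking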
240
241 85307 void complete(int err, std::size_t bytes) noexcept
242 {
243 85307 errn = err;
244 85307 bytes_transferred = bytes;
245 85307 }
246
247 virtual void perform_io() noexcept {}
248 };
249
250
251 struct epoll_connect_op : epoll_op
252 {
253 endpoint target_endpoint;
254
255 4791 void reset() noexcept
256 {
257 4791 epoll_op::reset();
258 4791 target_endpoint = endpoint{};
259 4791 }
260
261 4791 void perform_io() noexcept override
262 {
263         // connect() completion status is retrieved via SO_ERROR, not the return value
264 4791 int err = 0;
265 4791 socklen_t len = sizeof(err);
266
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4791 times.
4791 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
267 err = errno;
268 4791 complete(err, 0);
269 4791 }
270
271 // Defined in sockets.cpp where epoll_socket_impl is complete
272 void operator()() override;
273 void cancel() noexcept override;
274 };
275
276
277 struct epoll_read_op : epoll_op
278 {
279 static constexpr std::size_t max_buffers = 16;
280 iovec iovecs[max_buffers];
281 int iovec_count = 0;
282 bool empty_buffer_read = false;
283
284 37828 bool is_read_operation() const noexcept override
285 {
286 37828 return !empty_buffer_read;
287 }
288
289 113654 void reset() noexcept
290 {
291 113654 epoll_op::reset();
292 113654 iovec_count = 0;
293 113654 empty_buffer_read = false;
294 113654 }
295
296 145 void perform_io() noexcept override
297 {
298 ssize_t n;
299 do {
300 145 n = ::readv(fd, iovecs, iovec_count);
301
3/4
✓ Branch 0 taken 141 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 141 times.
145 } while (n < 0 && errno == EINTR);
302
303
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 141 times.
145 if (n >= 0)
304 4 complete(0, static_cast<std::size_t>(n));
305 else
306 141 complete(errno, 0);
307 145 }
308
309 void cancel() noexcept override;
310 };
311
312
313 struct epoll_write_op : epoll_op
314 {
315 static constexpr std::size_t max_buffers = 16;
316 iovec iovecs[max_buffers];
317 int iovec_count = 0;
318
319 113455 void reset() noexcept
320 {
321 113455 epoll_op::reset();
322 113455 iovec_count = 0;
323 113455 }
324
325 void perform_io() noexcept override
326 {
327 msghdr msg{};
328 msg.msg_iov = iovecs;
329 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
330
331 ssize_t n;
332 do {
333 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
334 } while (n < 0 && errno == EINTR);
335
336 if (n >= 0)
337 complete(0, static_cast<std::size_t>(n));
338 else
339 complete(errno, 0);
340 }
341
342 void cancel() noexcept override;
343 };
344
345
346 struct epoll_accept_op : epoll_op
347 {
348 int accepted_fd = -1;
349 io_object::io_object_impl** impl_out = nullptr;
350 sockaddr_in peer_addr{};
351
352 4799 void reset() noexcept
353 {
354 4799 epoll_op::reset();
355 4799 accepted_fd = -1;
356 4799 impl_out = nullptr;
357 4799 peer_addr = {};
358 4799 }
359
360 4788 void perform_io() noexcept override
361 {
362 4788 socklen_t addrlen = sizeof(peer_addr);
363 int new_fd;
364 do {
365 4788 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&peer_addr),
366 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
367
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 4788 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
4788 } while (new_fd < 0 && errno == EINTR);
368
369
1/2
✓ Branch 0 taken 4788 times.
✗ Branch 1 not taken.
4788 if (new_fd >= 0)
370 {
371 4788 accepted_fd = new_fd;
372 4788 complete(0, 0);
373 }
374 else
375 {
376 complete(errno, 0);
377 }
378 4788 }
379
380 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
381 void operator()() override;
382 void cancel() noexcept override;
383 };
384
385 } // namespace boost::corosio::detail
386
387 #endif // BOOST_COROSIO_HAS_EPOLL
388
389 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
390