corosio


Directory: ../corosio_lcov_PR-139/
Coverage: low: ≥ 0% medium: ≥ 75.0% high: ≥ 90.0%
Coverage Exec / Excl / Total
Lines: 74.2% 95 / 0 / 128
Functions: 84.2% 16 / 0 / 19
Branches: 66.7% 24 / 0 / 36

src/corosio/src/detail/select/op.hpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <coroutine>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/dispatch_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32 #include <fcntl.h>
33
34 #include <atomic>
35 #include <cstddef>
36 #include <memory>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/select.h>
42 #include <sys/socket.h>
43 #include <sys/uio.h>
44
45 /*
46 select Operation State
47 ======================
48
49 Each async I/O operation has a corresponding select_op-derived struct that
50 holds the operation's state while it's in flight. The socket impl owns
51 fixed slots for each operation type (conn_, rd_, wr_), so only one
52 operation of each type can be pending per socket at a time.
53
54 This mirrors the epoll_op design for consistency across backends.
55
56 Completion vs Cancellation Race
57 -------------------------------
58 The `registered` atomic uses a tri-state (unregistered, registering,
59 registered) to handle two races: (1) between register_fd() and the
60 reactor seeing an event, and (2) between reactor completion and cancel().
61
62 The registering state closes the window where an event could arrive
63 after register_fd() but before the boolean was set. The reactor and
64 cancel() both treat registering the same as registered when claiming.
65
66 Whoever atomically exchanges to unregistered "claims" the operation
67 and is responsible for completing it. The loser sees unregistered and
68 does nothing. The initiating thread uses compare_exchange to transition
69 from registering to registered; if this fails, the reactor or cancel
70 already claimed the op.
71
72 Impl Lifetime Management
73 ------------------------
74 When cancel() posts an op to the scheduler's ready queue, the socket impl
75 might be destroyed before the scheduler processes the op. The `impl_ptr`
76 member holds a shared_ptr to the impl, keeping it alive until the op
77 completes.
78
79 EOF Detection
80 -------------
81 For reads, 0 bytes with no error means EOF. But an empty user buffer also
82 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
83
84 SIGPIPE Prevention
85 ------------------
86 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
87 SIGPIPE when the peer has closed.
88 */
89
90 namespace boost::corosio::detail {
91
92 // Forward declarations for cancellation support
93 class select_socket_impl;
94 class select_acceptor_impl;
95
96 /** Registration state for async operations.
97
98 Tri-state enum to handle the race between register_fd() and
99 run_reactor() seeing an event. Setting REGISTERING before
100 calling register_fd() ensures events delivered during the
101 registration window are not dropped.
102 */
103 enum class select_registration_state : std::uint8_t
104 {
105 unregistered, ///< Not registered with reactor
106 registering, ///< register_fd() called, not yet confirmed
107 registered ///< Fully registered, ready for events
108 };
109
110 struct select_op : scheduler_op
111 {
112 struct canceller
113 {
114 select_op* op;
115 void operator()() const noexcept;
116 };
117
118 std::coroutine_handle<> h;
119 capy::executor_ref ex;
120 std::error_code* ec_out = nullptr;
121 std::size_t* bytes_out = nullptr;
122
123 int fd = -1;
124 int errn = 0;
125 std::size_t bytes_transferred = 0;
126
127 std::atomic<bool> cancelled{false};
128 std::atomic<select_registration_state> registered{select_registration_state::unregistered};
129 std::optional<std::stop_callback<canceller>> stop_cb;
130
131 // Prevents use-after-free when socket is closed with pending ops.
132 std::shared_ptr<void> impl_ptr;
133
134 // For stop_token cancellation - pointer to owning socket/acceptor impl.
135 select_socket_impl* socket_impl_ = nullptr;
136 select_acceptor_impl* acceptor_impl_ = nullptr;
137
138 16134 select_op() = default;
139
140 154773 void reset() noexcept
141 {
142 154773 fd = -1;
143 154773 errn = 0;
144 154773 bytes_transferred = 0;
145 154773 cancelled.store(false, std::memory_order_relaxed);
146 154773 registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
147 154773 impl_ptr.reset();
148 154773 socket_impl_ = nullptr;
149 154773 acceptor_impl_ = nullptr;
150 154773 }
151
152 149417 void operator()() override
153 {
154 149417 stop_cb.reset();
155
156
1/2
✓ Branch 0 taken 149417 times.
✗ Branch 1 not taken.
149417 if (ec_out)
157 {
158
2/2
✓ Branch 1 taken 199 times.
✓ Branch 2 taken 149218 times.
149417 if (cancelled.load(std::memory_order_acquire))
159 199 *ec_out = capy::error::canceled;
160
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 149217 times.
149218 else if (errn != 0)
161 1 *ec_out = make_err(errn);
162
6/6
✓ Branch 1 taken 74594 times.
✓ Branch 2 taken 74623 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 74589 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 149212 times.
149217 else if (is_read_operation() && bytes_transferred == 0)
163 5 *ec_out = capy::error::eof;
164 else
165 149212 *ec_out = {};
166 }
167
168
1/2
✓ Branch 0 taken 149417 times.
✗ Branch 1 not taken.
149417 if (bytes_out)
169 149417 *bytes_out = bytes_transferred;
170
171 // Move to stack before destroying the frame
172 149417 capy::executor_ref saved_ex( std::move( ex ) );
173 149417 std::coroutine_handle<> saved_h( std::move( h ) );
174 149417 impl_ptr.reset();
175
2/2
✓ Branch 1 taken 149417 times.
✓ Branch 4 taken 149417 times.
149417 dispatch_coro(saved_ex, saved_h).resume();
176 149417 }
177
178 74622 virtual bool is_read_operation() const noexcept { return false; }
179 virtual void cancel() noexcept = 0;
180
181 void destroy() override
182 {
183 stop_cb.reset();
184 impl_ptr.reset();
185 }
186
187 24870 void request_cancel() noexcept
188 {
189 24870 cancelled.store(true, std::memory_order_release);
190 24870 }
191
192 void start(std::stop_token token)
193 {
194 cancelled.store(false, std::memory_order_release);
195 stop_cb.reset();
196 socket_impl_ = nullptr;
197 acceptor_impl_ = nullptr;
198
199 if (token.stop_possible())
200 stop_cb.emplace(token, canceller{this});
201 }
202
203 152094 void start(std::stop_token token, select_socket_impl* impl)
204 {
205 152094 cancelled.store(false, std::memory_order_release);
206 152094 stop_cb.reset();
207 152094 socket_impl_ = impl;
208 152094 acceptor_impl_ = nullptr;
209
210
2/2
✓ Branch 1 taken 99 times.
✓ Branch 2 taken 151995 times.
152094 if (token.stop_possible())
211 99 stop_cb.emplace(token, canceller{this});
212 152094 }
213
214 2679 void start(std::stop_token token, select_acceptor_impl* impl)
215 {
216 2679 cancelled.store(false, std::memory_order_release);
217 2679 stop_cb.reset();
218 2679 socket_impl_ = nullptr;
219 2679 acceptor_impl_ = impl;
220
221
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2679 times.
2679 if (token.stop_possible())
222 stop_cb.emplace(token, canceller{this});
223 2679 }
224
225 154611 void complete(int err, std::size_t bytes) noexcept
226 {
227 154611 errn = err;
228 154611 bytes_transferred = bytes;
229 154611 }
230
231 virtual void perform_io() noexcept {}
232 };
233
234
235 struct select_connect_op : select_op
236 {
237 endpoint target_endpoint;
238
239 2677 void reset() noexcept
240 {
241 2677 select_op::reset();
242 2677 target_endpoint = endpoint{};
243 2677 }
244
245 2677 void perform_io() noexcept override
246 {
247 // connect() completion status is retrieved via SO_ERROR, not return value
248 2677 int err = 0;
249 2677 socklen_t len = sizeof(err);
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2677 times.
2677 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
251 err = errno;
252 2677 complete(err, 0);
253 2677 }
254
255 // Defined in sockets.cpp where select_socket_impl is complete
256 void operator()() override;
257 void cancel() noexcept override;
258 };
259
260
261 struct select_read_op : select_op
262 {
263 static constexpr std::size_t max_buffers = 16;
264 iovec iovecs[max_buffers];
265 int iovec_count = 0;
266 bool empty_buffer_read = false;
267
268 74595 bool is_read_operation() const noexcept override
269 {
270 74595 return !empty_buffer_read;
271 }
272
273 74790 void reset() noexcept
274 {
275 74790 select_op::reset();
276 74790 iovec_count = 0;
277 74790 empty_buffer_read = false;
278 74790 }
279
280 123 void perform_io() noexcept override
281 {
282 123 ssize_t n = ::readv(fd, iovecs, iovec_count);
283
1/2
✓ Branch 0 taken 123 times.
✗ Branch 1 not taken.
123 if (n >= 0)
284 123 complete(0, static_cast<std::size_t>(n));
285 else
286 complete(errno, 0);
287 123 }
288
289 void cancel() noexcept override;
290 };
291
292
293 struct select_write_op : select_op
294 {
295 static constexpr std::size_t max_buffers = 16;
296 iovec iovecs[max_buffers];
297 int iovec_count = 0;
298
299 74627 void reset() noexcept
300 {
301 74627 select_op::reset();
302 74627 iovec_count = 0;
303 74627 }
304
305 void perform_io() noexcept override
306 {
307 msghdr msg{};
308 msg.msg_iov = iovecs;
309 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
310
311 ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
312 if (n >= 0)
313 complete(0, static_cast<std::size_t>(n));
314 else
315 complete(errno, 0);
316 }
317
318 void cancel() noexcept override;
319 };
320
321
322 struct select_accept_op : select_op
323 {
324 int accepted_fd = -1;
325 io_object::io_object_impl* peer_impl = nullptr;
326 io_object::io_object_impl** impl_out = nullptr;
327
328 2679 void reset() noexcept
329 {
330 2679 select_op::reset();
331 2679 accepted_fd = -1;
332 2679 peer_impl = nullptr;
333 2679 impl_out = nullptr;
334 2679 }
335
336 2674 void perform_io() noexcept override
337 {
338 2674 sockaddr_in addr{};
339 2674 socklen_t addrlen = sizeof(addr);
340
341 // Note: select backend uses accept() + fcntl instead of accept4()
342 // for broader POSIX compatibility
343 2674 int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
344
345
1/2
✓ Branch 0 taken 2674 times.
✗ Branch 1 not taken.
2674 if (new_fd >= 0)
346 {
347 // Reject fds that exceed select()'s FD_SETSIZE limit.
348 // Better to fail now than during later async operations.
349
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2674 times.
2674 if (new_fd >= FD_SETSIZE)
350 {
351 ::close(new_fd);
352 complete(EINVAL, 0);
353 return;
354 }
355
356 // Set non-blocking and close-on-exec flags.
357 // A non-blocking socket is essential for the async reactor;
358 // if we can't configure it, fail rather than risk blocking.
359 2674 int flags = ::fcntl(new_fd, F_GETFL, 0);
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2674 times.
2674 if (flags == -1)
361 {
362 int err = errno;
363 ::close(new_fd);
364 complete(err, 0);
365 return;
366 }
367
368
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2674 times.
2674 if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
369 {
370 int err = errno;
371 ::close(new_fd);
372 complete(err, 0);
373 return;
374 }
375
376
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2674 times.
2674 if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
377 {
378 int err = errno;
379 ::close(new_fd);
380 complete(err, 0);
381 return;
382 }
383
384 2674 accepted_fd = new_fd;
385 2674 complete(0, 0);
386 }
387 else
388 {
389 complete(errno, 0);
390 }
391 }
392
393 // Defined in acceptors.cpp where select_acceptor_impl is complete
394 void operator()() override;
395 void cancel() noexcept override;
396 };
397
398 } // namespace boost::corosio::detail
399
400 #endif // BOOST_COROSIO_HAS_SELECT
401
402 #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
403