corosio

Directory: ../corosio_lcov_PR-139/
Coverage thresholds: low ≥ 0%, medium ≥ 75.0%, high ≥ 90.0%

            Coverage    Exec / Excl / Total
Lines:        79.9%      397 /    0 /   497
Functions:    89.6%       43 /    0 /    48
Branches:     66.7%      202 /    0 /   303

src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler coordinates its threads to provide handler parallelism
41 while avoiding the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
73 - Waiters check if already signaled before blocking (fast-path);
a standalone sketch of this protocol follows this comment block
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it reaches zero, run()
88 returns. Each operation increments the count on start and decrements it
on completion.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor adjusts the epoll_wait
93 timeout to wake at the nearest timer expiry. When a new timer is
94 scheduled earlier than the current nearest expiry, timer_service calls
95 interrupt_reactor() so the timeout is re-evaluated.
96 */
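To make the signaling protocol above concrete, here is a minimal standalone
sketch of the same state encoding. The type and member names (event_state,
state, cv) are illustrative only, not corosio API; the real logic lives in
wait_for_signal() and unlock_and_signal_one() further down this file.

#include <condition_variable>
#include <cstddef>
#include <mutex>

// Bit 0 is the sticky "signaled" flag; each blocked waiter adds 2,
// so state > 1 means at least one waiter is blocked.
struct event_state
{
    std::condition_variable cv;
    std::size_t state = 0;

    // lk must hold the mutex guarding `state` (the scheduler mutex
    // in the real code).
    void wait(std::unique_lock<std::mutex>& lk)
    {
        while ((state & 1) == 0)      // fast path: already signaled
        {
            state += 2;               // register as a waiter
            cv.wait(lk);
            state -= 2;
        }
    }

    void signal_one(std::unique_lock<std::mutex>& lk)
    {
        state |= 1;                   // signal persists until cleared
        bool have_waiters = state > 1;
        lk.unlock();
        if (have_waiters)             // notify only when someone blocks
            cv.notify_one();
    }
};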
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107
108 183 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
109 183 : key(k)
110 183 , next(n)
111 183 , private_outstanding_work(0)
112 183 , inline_budget(0)
113 {
114 183 }
115 };
116
117 namespace {
118
119 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
120
121 struct thread_context_guard
122 {
123 scheduler_context frame_;
124
125 183 explicit thread_context_guard(
126 epoll_scheduler const* ctx) noexcept
127 183 : frame_(ctx, context_stack.get())
128 {
129 183 context_stack.set(&frame_);
130 183 }
131
132 183 ~thread_context_guard() noexcept
133 {
134
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 183 times.
183 if (!frame_.private_queue.empty())
135 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
136 183 context_stack.set(frame_.next);
137 183 }
138 };
139
140 scheduler_context*
141 451335 find_context(epoll_scheduler const* self) noexcept
142 {
143
2/2
✓ Branch 1 taken 449656 times.
✓ Branch 2 taken 1679 times.
451335 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
144
1/2
✓ Branch 0 taken 449656 times.
✗ Branch 1 not taken.
449656 if (c->key == self)
145 449656 return c;
146 1679 return nullptr;
147 }
148
149 } // namespace
150
151 void
152 85371 epoll_scheduler::
153 reset_inline_budget() const noexcept
154 {
155
1/2
✓ Branch 1 taken 85371 times.
✗ Branch 2 not taken.
85371 if (auto* ctx = find_context(this))
156 85371 ctx->inline_budget = max_inline_budget_;
157 85371 }
158
159 bool
160 226909 epoll_scheduler::
161 try_consume_inline_budget() const noexcept
162 {
163
1/2
✓ Branch 1 taken 226909 times.
✗ Branch 2 not taken.
226909 if (auto* ctx = find_context(this))
164 {
165
2/2
✓ Branch 0 taken 151328 times.
✓ Branch 1 taken 75581 times.
226909 if (ctx->inline_budget > 0)
166 {
167 151328 --ctx->inline_budget;
168 151328 return true;
169 }
170 }
171 75581 return false;
172 }
173
174 void
175 60961 descriptor_state::
176 operator()()
177 {
178 60961 is_enqueued_.store(false, std::memory_order_relaxed);
179
180 // Take ownership of impl ref set by close_socket() to prevent
181 // the owning impl from being freed while we're executing
182 60961 auto prevent_impl_destruction = std::move(impl_ref_);
183
184 60961 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 60961 times.
60961 if (ev == 0)
186 {
187 scheduler_->compensating_work_started();
188 return;
189 }
190
191 60961 op_queue local_ops;
192
193 60961 int err = 0;
194
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 60960 times.
60961 if (ev & EPOLLERR)
195 {
196 1 socklen_t len = sizeof(err);
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
198 err = errno;
199
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if (err == 0)
200 1 err = EIO;
201 }
202
203 {
204
1/1
✓ Branch 1 taken 60961 times.
60961 std::lock_guard lock(mutex);
205
2/2
✓ Branch 0 taken 18273 times.
✓ Branch 1 taken 42688 times.
60961 if (ev & EPOLLIN)
206 {
207
2/2
✓ Branch 0 taken 4792 times.
✓ Branch 1 taken 13481 times.
18273 if (read_op)
208 {
209 4792 auto* rd = read_op;
210
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4792 times.
4792 if (err)
211 rd->complete(err, 0);
212 else
213 4792 rd->perform_io();
214
215
2/4
✓ Branch 0 taken 4792 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 4792 times.
4792 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
216 {
217 rd->errn = 0;
218 }
219 else
220 {
221 4792 read_op = nullptr;
222 4792 local_ops.push(rd);
223 }
224 }
225 else
226 {
227 13481 read_ready = true;
228 }
229 }
230
2/2
✓ Branch 0 taken 56173 times.
✓ Branch 1 taken 4788 times.
60961 if (ev & EPOLLOUT)
231 {
232
3/4
✓ Branch 0 taken 51382 times.
✓ Branch 1 taken 4791 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 51382 times.
56173 bool had_write_op = (connect_op || write_op);
233
2/2
✓ Branch 0 taken 4791 times.
✓ Branch 1 taken 51382 times.
56173 if (connect_op)
234 {
235 4791 auto* cn = connect_op;
236
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 4791 times.
4791 if (err)
237 cn->complete(err, 0);
238 else
239 4791 cn->perform_io();
240 4791 connect_op = nullptr;
241 4791 local_ops.push(cn);
242 }
243
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 56173 times.
56173 if (write_op)
244 {
245 auto* wr = write_op;
246 if (err)
247 wr->complete(err, 0);
248 else
249 wr->perform_io();
250
251 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
252 {
253 wr->errn = 0;
254 }
255 else
256 {
257 write_op = nullptr;
258 local_ops.push(wr);
259 }
260 }
261
2/2
✓ Branch 0 taken 51382 times.
✓ Branch 1 taken 4791 times.
56173 if (!had_write_op)
262 51382 write_ready = true;
263 }
264
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 60960 times.
60961 if (err)
265 {
266
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (read_op)
267 {
268 read_op->complete(err, 0);
269 local_ops.push(std::exchange(read_op, nullptr));
270 }
271
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (write_op)
272 {
273 write_op->complete(err, 0);
274 local_ops.push(std::exchange(write_op, nullptr));
275 }
276
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (connect_op)
277 {
278 connect_op->complete(err, 0);
279 local_ops.push(std::exchange(connect_op, nullptr));
280 }
281 }
282 60961 }
283
284 // Execute first handler inline — the scheduler's work_cleanup
285 // accounts for this as the "consumed" work item
286 60961 scheduler_op* first = local_ops.pop();
287
2/2
✓ Branch 0 taken 9583 times.
✓ Branch 1 taken 51378 times.
60961 if (first)
288 {
289
1/1
✓ Branch 1 taken 9583 times.
9583 scheduler_->post_deferred_completions(local_ops);
290
1/1
✓ Branch 1 taken 9583 times.
9583 (*first)();
291 }
292 else
293 {
294 51378 scheduler_->compensating_work_started();
295 }
296 60961 }
297
298 203 epoll_scheduler::
299 epoll_scheduler(
300 capy::execution_context& ctx,
301 203 int)
302 203 : epoll_fd_(-1)
303 203 , event_fd_(-1)
304 203 , timer_fd_(-1)
305 203 , outstanding_work_(0)
306 203 , stopped_(false)
307 203 , shutdown_(false)
308 203 , task_running_{false}
309 203 , task_interrupted_(false)
310 406 , state_(0)
311 {
312 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
313
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (epoll_fd_ < 0)
314 detail::throw_system_error(make_err(errno), "epoll_create1");
315
316 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
317
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (event_fd_ < 0)
318 {
319 int errn = errno;
320 ::close(epoll_fd_);
321 detail::throw_system_error(make_err(errn), "eventfd");
322 }
323
324 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
325
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 203 times.
203 if (timer_fd_ < 0)
326 {
327 int errn = errno;
328 ::close(event_fd_);
329 ::close(epoll_fd_);
330 detail::throw_system_error(make_err(errn), "timerfd_create");
331 }
332
333 203 epoll_event ev{};
334 203 ev.events = EPOLLIN | EPOLLET;
335 203 ev.data.ptr = nullptr;
336
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
337 {
338 int errn = errno;
339 ::close(timer_fd_);
340 ::close(event_fd_);
341 ::close(epoll_fd_);
342 detail::throw_system_error(make_err(errn), "epoll_ctl");
343 }
344
345 203 epoll_event timer_ev{};
346 203 timer_ev.events = EPOLLIN | EPOLLERR;
347 203 timer_ev.data.ptr = &timer_fd_;
348
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 203 times.
203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
349 {
350 int errn = errno;
351 ::close(timer_fd_);
352 ::close(event_fd_);
353 ::close(epoll_fd_);
354 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
355 }
356
357
1/1
✓ Branch 1 taken 203 times.
203 timer_svc_ = &get_timer_service(ctx, *this);
358
1/1
✓ Branch 3 taken 203 times.
203 timer_svc_->set_on_earliest_changed(
359 timer_service::callback(
360 this,
361 [](void* p) {
362 5002 auto* self = static_cast<epoll_scheduler*>(p);
363 5002 self->timerfd_stale_.store(true, std::memory_order_release);
364
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5002 times.
5002 if (self->task_running_.load(std::memory_order_acquire))
365 self->interrupt_reactor();
366 5002 }));
367
368 // Initialize resolver service
369
1/1
✓ Branch 1 taken 203 times.
203 get_resolver_service(ctx, *this);
370
371 // Initialize signal service
372
1/1
✓ Branch 1 taken 203 times.
203 get_signal_service(ctx, *this);
373
374 // Push task sentinel to interleave reactor runs with handler execution
375 203 completed_ops_.push(&task_op_);
376 203 }
377
378 406 epoll_scheduler::
379 203 ~epoll_scheduler()
380 {
381
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (timer_fd_ >= 0)
382 203 ::close(timer_fd_);
383
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
384 203 ::close(event_fd_);
385
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (epoll_fd_ >= 0)
386 203 ::close(epoll_fd_);
387 406 }
388
389 void
390 203 epoll_scheduler::
391 shutdown()
392 {
393 {
394
1/1
✓ Branch 1 taken 203 times.
203 std::unique_lock lock(mutex_);
395 203 shutdown_ = true;
396
397
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 203 times.
406 while (auto* h = completed_ops_.pop())
398 {
399
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (h == &task_op_)
400 203 continue;
401 lock.unlock();
402 h->destroy();
403 lock.lock();
404 203 }
405
406 203 signal_all(lock);
407 203 }
408
409 203 outstanding_work_.store(0, std::memory_order_release);
410
411
1/2
✓ Branch 0 taken 203 times.
✗ Branch 1 not taken.
203 if (event_fd_ >= 0)
412 203 interrupt_reactor();
413 203 }
414
415 void
416 6835 epoll_scheduler::
417 post(std::coroutine_handle<> h) const
418 {
419 struct post_handler final
420 : scheduler_op
421 {
422 std::coroutine_handle<> h_;
423
424 explicit
425 6835 post_handler(std::coroutine_handle<> h)
426 6835 : h_(h)
427 {
428 6835 }
429
430 13670 ~post_handler() = default;
431
432 6835 void operator()() override
433 {
434 6835 auto h = h_;
435
1/2
✓ Branch 0 taken 6835 times.
✗ Branch 1 not taken.
6835 delete this;
436
1/1
✓ Branch 1 taken 6835 times.
6835 h.resume();
437 6835 }
438
439 void destroy() override
440 {
441 delete this;
442 }
443 };
444
445
1/1
✓ Branch 1 taken 6835 times.
6835 auto ph = std::make_unique<post_handler>(h);
446
447 // Fast path: same thread posts to private queue
448 // Only count locally; work_cleanup batches to global counter
449
2/2
✓ Branch 1 taken 5182 times.
✓ Branch 2 taken 1653 times.
6835 if (auto* ctx = find_context(this))
450 {
451 5182 ++ctx->private_outstanding_work;
452 5182 ctx->private_queue.push(ph.release());
453 5182 return;
454 }
455
456 // Slow path: cross-thread post requires mutex
457 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
458
459
1/1
✓ Branch 1 taken 1653 times.
1653 std::unique_lock lock(mutex_);
460 1653 completed_ops_.push(ph.release());
461
1/1
✓ Branch 1 taken 1653 times.
1653 wake_one_thread_and_unlock(lock);
462 6835 }
463
464 void
465 80842 epoll_scheduler::
466 post(scheduler_op* h) const
467 {
468 // Fast path: same thread posts to private queue
469 // Only count locally; work_cleanup batches to global counter
470
2/2
✓ Branch 1 taken 80816 times.
✓ Branch 2 taken 26 times.
80842 if (auto* ctx = find_context(this))
471 {
472 80816 ++ctx->private_outstanding_work;
473 80816 ctx->private_queue.push(h);
474 80816 return;
475 }
476
477 // Slow path: cross-thread post requires mutex
478 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
479
480
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
481 26 completed_ops_.push(h);
482
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
483 26 }
484
485 void
486 5723 epoll_scheduler::
487 on_work_started() noexcept
488 {
489 5723 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
490 5723 }
491
492 void
493 5691 epoll_scheduler::
494 on_work_finished() noexcept
495 {
496
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5691 times.
11382 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
497 stop();
498 5691 }
499
500 bool
501 152021 epoll_scheduler::
502 running_in_this_thread() const noexcept
503 {
504
2/2
✓ Branch 1 taken 151781 times.
✓ Branch 2 taken 240 times.
152021 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
505
1/2
✓ Branch 0 taken 151781 times.
✗ Branch 1 not taken.
151781 if (c->key == this)
506 151781 return true;
507 240 return false;
508 }
509
510 void
511 43 epoll_scheduler::
512 stop()
513 {
514
1/1
✓ Branch 1 taken 43 times.
43 std::unique_lock lock(mutex_);
515
2/2
✓ Branch 0 taken 22 times.
✓ Branch 1 taken 21 times.
43 if (!stopped_)
516 {
517 22 stopped_ = true;
518 22 signal_all(lock);
519
1/1
✓ Branch 1 taken 22 times.
22 interrupt_reactor();
520 }
521 43 }
522
523 bool
524 18 epoll_scheduler::
525 stopped() const noexcept
526 {
527 18 std::unique_lock lock(mutex_);
528 36 return stopped_;
529 18 }
530
531 void
532 49 epoll_scheduler::
533 restart()
534 {
535
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
536 49 stopped_ = false;
537 49 }
538
539 std::size_t
540 183 epoll_scheduler::
541 run()
542 {
543
2/2
✓ Branch 1 taken 32 times.
✓ Branch 2 taken 151 times.
366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545
1/1
✓ Branch 1 taken 32 times.
32 stop();
546 32 return 0;
547 }
548
549 151 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 151 times.
151 std::unique_lock lock(mutex_);
551
552 151 std::size_t n = 0;
553 for (;;)
554 {
555
3/3
✓ Branch 1 taken 148756 times.
✓ Branch 3 taken 151 times.
✓ Branch 4 taken 148605 times.
148756 if (!do_one(lock, -1, &ctx.frame_))
556 151 break;
557
1/2
✓ Branch 1 taken 148605 times.
✗ Branch 2 not taken.
148605 if (n != (std::numeric_limits<std::size_t>::max)())
558 148605 ++n;
559
2/2
✓ Branch 1 taken 67714 times.
✓ Branch 2 taken 80891 times.
148605 if (!lock.owns_lock())
560
1/1
✓ Branch 1 taken 67714 times.
67714 lock.lock();
561 }
562 151 return n;
563 151 }
564
565 std::size_t
566 2 epoll_scheduler::
567 run_one()
568 {
569
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
570 {
571 stop();
572 return 0;
573 }
574
575 2 thread_context_guard ctx(this);
576
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
577
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
578 2 }
579
580 std::size_t
581 34 epoll_scheduler::
582 wait_one(long usec)
583 {
584
2/2
✓ Branch 1 taken 7 times.
✓ Branch 2 taken 27 times.
68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
585 {
586
1/1
✓ Branch 1 taken 7 times.
7 stop();
587 7 return 0;
588 }
589
590 27 thread_context_guard ctx(this);
591
1/1
✓ Branch 1 taken 27 times.
27 std::unique_lock lock(mutex_);
592
1/1
✓ Branch 1 taken 27 times.
27 return do_one(lock, usec, &ctx.frame_);
593 27 }
594
595 std::size_t
596 2 epoll_scheduler::
597 poll()
598 {
599
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 1 time.
1 stop();
602 1 return 0;
603 }
604
605 1 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
607
608 1 std::size_t n = 0;
609 for (;;)
610 {
611
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
612 1 break;
613
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
614 2 ++n;
615
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
616
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
617 }
618 1 return n;
619 1 }
620
621 std::size_t
622 4 epoll_scheduler::
623 poll_one()
624 {
625
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
626 {
627
1/1
✓ Branch 1 taken 2 times.
2 stop();
628 2 return 0;
629 }
630
631 2 thread_context_guard ctx(this);
632
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
633
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
634 2 }
635
636 void
637 9654 epoll_scheduler::
638 register_descriptor(int fd, descriptor_state* desc) const
639 {
640 9654 epoll_event ev{};
641 9654 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
642 9654 ev.data.ptr = desc;
643
644
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9654 times.
9654 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
645 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
646
647 9654 desc->registered_events = ev.events;
648 9654 desc->fd = fd;
649 9654 desc->scheduler_ = this;
650
651
1/1
✓ Branch 1 taken 9654 times.
9654 std::lock_guard lock(desc->mutex);
652 9654 desc->read_ready = false;
653 9654 desc->write_ready = false;
654 9654 }
655
656 void
657 9654 epoll_scheduler::
658 deregister_descriptor(int fd) const
659 {
660 9654 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
661 9654 }
662
663 void
664 9788 epoll_scheduler::
665 work_started() const noexcept
666 {
667 9788 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
668 9788 }
669
670 void
671 16595 epoll_scheduler::
672 work_finished() const noexcept
673 {
674
2/2
✓ Branch 0 taken 158 times.
✓ Branch 1 taken 16437 times.
33190 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
675 {
676 // Last work item completed - wake all threads so they can exit.
677 // signal_all() wakes threads waiting on the condvar.
678 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
679 // Both are needed because they target different blocking mechanisms.
680 158 std::unique_lock lock(mutex_);
681 158 signal_all(lock);
682
2/6
✗ Branch 1 not taken.
✓ Branch 2 taken 158 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 158 times.
158 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
683 {
684 task_interrupted_ = true;
685 lock.unlock();
686 interrupt_reactor();
687 }
688 158 }
689 16595 }
690
691 void
692 51378 epoll_scheduler::
693 compensating_work_started() const noexcept
694 {
695 51378 auto* ctx = find_context(this);
696
1/2
✓ Branch 0 taken 51378 times.
✗ Branch 1 not taken.
51378 if (ctx)
697 51378 ++ctx->private_outstanding_work;
698 51378 }
699
700 void
701 epoll_scheduler::
702 drain_thread_queue(op_queue& queue, long count) const
703 {
704 // Note: outstanding_work_ was already incremented when posting
705 std::unique_lock lock(mutex_);
706 completed_ops_.splice(queue);
707 if (count > 0)
708 maybe_unlock_and_signal_one(lock);
709 }
710
711 void
712 9583 epoll_scheduler::
713 post_deferred_completions(op_queue& ops) const
714 {
715
1/2
✓ Branch 1 taken 9583 times.
✗ Branch 2 not taken.
9583 if (ops.empty())
716 9583 return;
717
718 // Fast path: if on scheduler thread, use private queue
719 if (auto* ctx = find_context(this))
720 {
721 ctx->private_queue.splice(ops);
722 return;
723 }
724
725 // Slow path: add to global queue and wake a thread
726 std::unique_lock lock(mutex_);
727 completed_ops_.splice(ops);
728 wake_one_thread_and_unlock(lock);
729 }
730
731 void
732 251 epoll_scheduler::
733 interrupt_reactor() const
734 {
735 // Only write if not already armed to avoid redundant writes
736 251 bool expected = false;
737
2/2
✓ Branch 1 taken 235 times.
✓ Branch 2 taken 16 times.
251 if (eventfd_armed_.compare_exchange_strong(expected, true,
738 std::memory_order_release, std::memory_order_relaxed))
739 {
740 235 std::uint64_t val = 1;
741
1/1
✓ Branch 1 taken 235 times.
235 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
742 }
743 251 }
744
745 void
746 383 epoll_scheduler::
747 signal_all(std::unique_lock<std::mutex>&) const
748 {
749 383 state_ |= 1;
750 383 cond_.notify_all();
751 383 }
752
753 bool
754 1679 epoll_scheduler::
755 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
756 {
757 1679 state_ |= 1;
758
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1679 times.
1679 if (state_ > 1)
759 {
760 lock.unlock();
761 cond_.notify_one();
762 return true;
763 }
764 1679 return false;
765 }
766
767 void
768 186948 epoll_scheduler::
769 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
770 {
771 186948 state_ |= 1;
772 186948 bool have_waiters = state_ > 1;
773 186948 lock.unlock();
774
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 186948 times.
186948 if (have_waiters)
775 cond_.notify_one();
776 186948 }
777
778 void
779 epoll_scheduler::
780 clear_signal() const
781 {
782 state_ &= ~std::size_t(1);
783 }
784
785 void
786 epoll_scheduler::
787 wait_for_signal(std::unique_lock<std::mutex>& lock) const
788 {
789 while ((state_ & 1) == 0)
790 {
791 state_ += 2;
792 cond_.wait(lock);
793 state_ -= 2;
794 }
795 }
796
797 void
798 epoll_scheduler::
799 wait_for_signal_for(
800 std::unique_lock<std::mutex>& lock,
801 long timeout_us) const
802 {
803 if ((state_ & 1) == 0)
804 {
805 state_ += 2;
806 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
807 state_ -= 2;
808 }
809 }
810
811 void
812 1679 epoll_scheduler::
813 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
814 {
815
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1679 times.
1679 if (maybe_unlock_and_signal_one(lock))
816 return;
817
818
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1653 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1653 times.
1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
819 {
820 26 task_interrupted_ = true;
821 26 lock.unlock();
822 26 interrupt_reactor();
823 }
824 else
825 {
826 1653 lock.unlock();
827 }
828 }
829
830 /** RAII guard for handler execution work accounting.
831
832 Handler consumes 1 work item, may produce N new items via fast-path posts.
833 Net change = N - 1:
834 - If N > 1: add (N-1) to global (more work produced than consumed)
835 - If N == 1: net zero, do nothing
836 - If N < 1: call work_finished() (work consumed, may trigger stop)
837
838 Also drains private queue to global for other threads to process.
839 */
840 struct work_cleanup
841 {
842 epoll_scheduler const* scheduler;
843 std::unique_lock<std::mutex>* lock;
844 scheduler_context* ctx;
845
846 148638 ~work_cleanup()
847 {
848
1/2
✓ Branch 0 taken 148638 times.
✗ Branch 1 not taken.
148638 if (ctx)
849 {
850 148638 long produced = ctx->private_outstanding_work;
851
2/2
✓ Branch 0 taken 95 times.
✓ Branch 1 taken 148543 times.
148638 if (produced > 1)
852 95 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
853
2/2
✓ Branch 0 taken 16358 times.
✓ Branch 1 taken 132185 times.
148543 else if (produced < 1)
854 16358 scheduler->work_finished();
855 // produced == 1: net zero, handler consumed what it produced
856 148638 ctx->private_outstanding_work = 0;
857
858
2/2
✓ Branch 1 taken 80902 times.
✓ Branch 2 taken 67736 times.
148638 if (!ctx->private_queue.empty())
859 {
860 80902 lock->lock();
861 80902 scheduler->completed_ops_.splice(ctx->private_queue);
862 }
863 }
864 else
865 {
866 // No thread context - slow-path op was already counted globally
867 scheduler->work_finished();
868 }
869 148638 }
870 };
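A standalone model of the N - 1 rule above may help; the names here (settle,
outstanding) are illustrative, not corosio API.

#include <atomic>
#include <cassert>

// One handler was consumed; `produced` new items were posted on the
// fast path during its execution. Apply the net change produced - 1.
void settle(std::atomic<long>& outstanding, long produced)
{
    if (produced > 1)
        outstanding.fetch_add(produced - 1, std::memory_order_relaxed);
    else if (produced < 1)
        outstanding.fetch_sub(1, std::memory_order_acq_rel); // work_finished()
    // produced == 1: net zero, nothing to do
}

int main()
{
    std::atomic<long> outstanding{1};  // one handler pending
    settle(outstanding, 3);            // it posted 3 new items: net +2
    assert(outstanding.load() == 3);   // the 3 it posted remain
    settle(outstanding, 0);            // a handler that posted nothing
    assert(outstanding.load() == 2);
}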
871
872 /** RAII guard for reactor work accounting.
873
874 Reactor only produces work via timer/signal callbacks posting handlers.
875 Unlike handler execution which consumes 1, the reactor consumes nothing.
876 All produced work must be flushed to global counter.
877 */
878 struct task_cleanup
879 {
880 epoll_scheduler const* scheduler;
881 std::unique_lock<std::mutex>* lock;
882 scheduler_context* ctx;
883
884 48122 ~task_cleanup()
885 48122 {
886
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 48122 times.
48122 if (!ctx)
887 return;
888
889
2/2
✓ Branch 0 taken 4996 times.
✓ Branch 1 taken 43126 times.
48122 if (ctx->private_outstanding_work > 0)
890 {
891 4996 scheduler->outstanding_work_.fetch_add(
892 4996 ctx->private_outstanding_work, std::memory_order_relaxed);
893 4996 ctx->private_outstanding_work = 0;
894 }
895
896
2/2
✓ Branch 1 taken 4996 times.
✓ Branch 2 taken 43126 times.
48122 if (!ctx->private_queue.empty())
897 {
898
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4996 times.
4996 if (!lock->owns_lock())
899 lock->lock();
900 4996 scheduler->completed_ops_.splice(ctx->private_queue);
901 }
902 48122 }
903 };
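For contrast, the reactor-side rule modeled in the same illustrative style:
nothing is consumed during a reactor pass, so any produced work is flushed
to the global counter in full.

#include <atomic>

// The reactor consumed no work item of its own, so all `produced` items
// (timer/signal completions it queued) are added unchanged.
void settle_reactor(std::atomic<long>& outstanding, long produced)
{
    if (produced > 0)
        outstanding.fetch_add(produced, std::memory_order_relaxed);
}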
904
905 void
906 9986 epoll_scheduler::
907 update_timerfd() const
908 {
909 9986 auto nearest = timer_svc_->nearest_expiry();
910
911 9986 itimerspec ts{};
912 9986 int flags = 0;
913
914
3/3
✓ Branch 2 taken 9986 times.
✓ Branch 4 taken 9942 times.
✓ Branch 5 taken 44 times.
9986 if (nearest == timer_service::time_point::max())
915 {
916 // No timers - disarm by setting to 0 (relative)
917 }
918 else
919 {
920 9942 auto now = std::chrono::steady_clock::now();
921
3/3
✓ Branch 1 taken 9942 times.
✓ Branch 4 taken 59 times.
✓ Branch 5 taken 9883 times.
9942 if (nearest <= now)
922 {
923 // Use 1ns instead of 0 - zero disarms the timerfd
924 59 ts.it_value.tv_nsec = 1;
925 }
926 else
927 {
928 9883 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
929
1/1
✓ Branch 1 taken 9883 times.
19766 nearest - now).count();
930 9883 ts.it_value.tv_sec = nsec / 1000000000;
931 9883 ts.it_value.tv_nsec = nsec % 1000000000;
932 // Ensure non-zero to avoid disarming if duration rounds to 0
933
3/4
✓ Branch 0 taken 9872 times.
✓ Branch 1 taken 11 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9872 times.
9883 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
934 ts.it_value.tv_nsec = 1;
935 }
936 }
937
938
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 9986 times.
9986 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
939 detail::throw_system_error(make_err(errno), "timerfd_settime");
940 9986 }
941
942 void
943 48122 epoll_scheduler::
944 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
945 {
946
2/2
✓ Branch 0 taken 38310 times.
✓ Branch 1 taken 9812 times.
48122 int timeout_ms = task_interrupted_ ? 0 : -1;
947
948
2/2
✓ Branch 1 taken 9812 times.
✓ Branch 2 taken 38310 times.
48122 if (lock.owns_lock())
949
1/1
✓ Branch 1 taken 9812 times.
9812 lock.unlock();
950
951 48122 task_cleanup on_exit{this, &lock, ctx};
952
953 // Flush deferred timerfd programming before blocking
954
2/2
✓ Branch 1 taken 4990 times.
✓ Branch 2 taken 43132 times.
48122 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
955
1/1
✓ Branch 1 taken 4990 times.
4990 update_timerfd();
956
957 // Event loop runs without mutex held
958 epoll_event events[128];
959
1/1
✓ Branch 1 taken 48122 times.
48122 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
960
961
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 48122 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
48122 if (nfds < 0 && errno != EINTR)
962 detail::throw_system_error(make_err(errno), "epoll_wait");
963
964 48122 bool check_timers = false;
965 48122 op_queue local_ops;
966
967 // Process events without holding the mutex
968
2/2
✓ Branch 0 taken 65989 times.
✓ Branch 1 taken 48122 times.
114111 for (int i = 0; i < nfds; ++i)
969 {
970
2/2
✓ Branch 0 taken 32 times.
✓ Branch 1 taken 65957 times.
65989 if (events[i].data.ptr == nullptr)
971 {
972 std::uint64_t val;
973
1/1
✓ Branch 1 taken 32 times.
32 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
974 32 eventfd_armed_.store(false, std::memory_order_relaxed);
975 32 continue;
976 32 }
977
978
2/2
✓ Branch 0 taken 4996 times.
✓ Branch 1 taken 60961 times.
65957 if (events[i].data.ptr == &timer_fd_)
979 {
980 std::uint64_t expirations;
981
1/1
✓ Branch 1 taken 4996 times.
4996 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
982 4996 check_timers = true;
983 4996 continue;
984 4996 }
985
986 // Deferred I/O: just set ready events and enqueue descriptor
987 // No per-descriptor mutex locking in reactor hot path!
988 60961 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
989 60961 desc->add_ready_events(events[i].events);
990
991 // Only enqueue if not already enqueued
992 60961 bool expected = false;
993
1/2
✓ Branch 1 taken 60961 times.
✗ Branch 2 not taken.
60961 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
994 std::memory_order_release, std::memory_order_relaxed))
995 {
996 60961 local_ops.push(desc);
997 }
998 }
999
1000 // Process timers only when timerfd fires
1001
2/2
✓ Branch 0 taken 4996 times.
✓ Branch 1 taken 43126 times.
48122 if (check_timers)
1002 {
1003
1/1
✓ Branch 1 taken 4996 times.
4996 timer_svc_->process_expired();
1004
1/1
✓ Branch 1 taken 4996 times.
4996 update_timerfd();
1005 }
1006
1007
1/1
✓ Branch 1 taken 48122 times.
48122 lock.lock();
1008
1009
2/2
✓ Branch 1 taken 37856 times.
✓ Branch 2 taken 10266 times.
48122 if (!local_ops.empty())
1010 37856 completed_ops_.splice(local_ops);
1011 48122 }
1012
1013 std::size_t
1014 148790 epoll_scheduler::
1015 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1016 {
1017 for (;;)
1018 {
1019
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 196910 times.
196912 if (stopped_)
1020 2 return 0;
1021
1022 196910 scheduler_op* op = completed_ops_.pop();
1023
1024 // Handle reactor sentinel - time to poll for I/O
1025
2/2
✓ Branch 0 taken 48272 times.
✓ Branch 1 taken 148638 times.
196910 if (op == &task_op_)
1026 {
1027 48272 bool more_handlers = !completed_ops_.empty();
1028
1029 // Nothing to run the reactor for: no pending work to wait on,
1030 // or caller requested a non-blocking poll
1031
4/4
✓ Branch 0 taken 9962 times.
✓ Branch 1 taken 38310 times.
✓ Branch 2 taken 150 times.
✓ Branch 3 taken 48122 times.
58234 if (!more_handlers &&
1032
3/4
✓ Branch 1 taken 9812 times.
✓ Branch 2 taken 150 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 9812 times.
19924 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1033 timeout_us == 0))
1034 {
1035 150 completed_ops_.push(&task_op_);
1036 150 return 0;
1037 }
1038
1039
3/4
✓ Branch 0 taken 9812 times.
✓ Branch 1 taken 38310 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 9812 times.
48122 task_interrupted_ = more_handlers || timeout_us == 0;
1040 48122 task_running_.store(true, std::memory_order_release);
1041
1042
2/2
✓ Branch 0 taken 38310 times.
✓ Branch 1 taken 9812 times.
48122 if (more_handlers)
1043 38310 unlock_and_signal_one(lock);
1044
1045 48122 run_task(lock, ctx);
1046
1047 48122 task_running_.store(false, std::memory_order_relaxed);
1048 48122 completed_ops_.push(&task_op_);
1049 48122 continue;
1050 48122 }
1051
1052 // Handle operation
1053
1/2
✓ Branch 0 taken 148638 times.
✗ Branch 1 not taken.
148638 if (op != nullptr)
1054 {
1055
1/2
✓ Branch 1 taken 148638 times.
✗ Branch 2 not taken.
148638 if (!completed_ops_.empty())
1056
1/1
✓ Branch 1 taken 148638 times.
148638 unlock_and_signal_one(lock);
1057 else
1058 lock.unlock();
1059
1060 148638 work_cleanup on_exit{this, &lock, ctx};
1061
1062
1/1
✓ Branch 1 taken 148638 times.
148638 (*op)();
1063 148638 return 1;
1064 148638 }
1065
1066 // No pending work to wait on, or caller requested non-blocking poll
1067 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1068 timeout_us == 0)
1069 return 0;
1070
1071 clear_signal();
1072 if (timeout_us < 0)
1073 wait_for_signal(lock);
1074 else
1075 wait_for_signal_for(lock, timeout_us);
1076 48122 }
1077 }
1078
1079 } // namespace boost::corosio::detail
1080
1081 #endif
1082