TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/native_scheduler.hpp>
21 : #include <boost/corosio/detail/scheduler_op.hpp>
22 :
23 : #include <boost/corosio/native/detail/select/select_op.hpp>
24 : #include <boost/corosio/detail/timer_service.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28 :
29 : #include <boost/corosio/detail/except.hpp>
30 : #include <boost/corosio/detail/thread_local_ptr.hpp>
31 :
32 : #include <sys/select.h>
33 : #include <sys/socket.h>
34 : #include <unistd.h>
35 : #include <errno.h>
36 : #include <fcntl.h>
37 :
38 : #include <algorithm>
39 : #include <atomic>
40 : #include <chrono>
41 : #include <condition_variable>
42 : #include <cstddef>
43 : #include <limits>
44 : #include <mutex>
45 : #include <unordered_map>
46 :
47 : namespace boost::corosio::detail {
48 :
49 : struct select_op;
50 :
51 : /** POSIX scheduler using select() for I/O multiplexing.
52 :
53 : This scheduler implements the scheduler interface using the POSIX select()
54 : call for I/O event notification. It uses a single reactor model
55 : where one thread runs select() while other threads wait on a condition
56 : variable for handler work. This design provides:
57 :
58 : - Handler parallelism: N posted handlers can execute on N threads
59 : - No thundering herd: condition_variable wakes exactly one thread
60 : - Portability: Works on all POSIX systems
61 :
62 : The design mirrors epoll_scheduler for behavioral consistency:
63 : - Same single-reactor thread coordination model
64 : - Same work counting semantics
65 : - Same timer integration pattern
66 :
67 : Known Limitations:
68 : - FD_SETSIZE (~1024) limits maximum concurrent connections
69 : - O(n) scanning: rebuilds fd_sets each iteration
70 : - Level-triggered only (no edge-triggered mode)
71 :
72 : @par Thread Safety
73 : All public member functions are thread-safe.
74 : */
75 : class BOOST_COROSIO_DECL select_scheduler final
76 : : public native_scheduler
77 : , public capy::execution_context::service
78 : {
79 : public:
80 : using key_type = scheduler;
81 :
82 : /** Construct the scheduler.
83 :
84 : Creates a self-pipe for reactor interruption.
85 :
86 : @param ctx Reference to the owning execution_context.
87 : @param concurrency_hint Hint for expected thread count (unused).
88 : */
89 : select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90 :
91 : ~select_scheduler() override;
92 :
93 : select_scheduler(select_scheduler const&) = delete;
94 : select_scheduler& operator=(select_scheduler const&) = delete;
95 :
96 : void shutdown() override;
97 : void post(std::coroutine_handle<> h) const override;
98 : void post(scheduler_op* h) const override;
99 : bool running_in_this_thread() const noexcept override;
100 : void stop() override;
101 : bool stopped() const noexcept override;
102 : void restart() override;
103 : std::size_t run() override;
104 : std::size_t run_one() override;
105 : std::size_t wait_one(long usec) override;
106 : std::size_t poll() override;
107 : std::size_t poll_one() override;
108 :
109 : /** Return the maximum file descriptor value supported.
110 :
111 : Returns FD_SETSIZE - 1, the maximum fd value that can be
112 : monitored by select(). Operations with fd >= FD_SETSIZE
113 : will fail with EINVAL.
114 :
115 : @return The maximum supported file descriptor value.
116 : */
117 : static constexpr int max_fd() noexcept
118 : {
119 : return FD_SETSIZE - 1;
120 : }
121 :
122 : /** Register a file descriptor for monitoring.
123 :
124 : @param fd The file descriptor to register.
125 : @param op The operation associated with this fd.
126 : @param events Event mask: 1 = read, 2 = write, 3 = both.
127 : */
128 : void register_fd(int fd, select_op* op, int events) const;
129 :
130 : /** Unregister a file descriptor from monitoring.
131 :
132 : @param fd The file descriptor to unregister.
133 : @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
134 : */
135 : void deregister_fd(int fd, int events) const;
136 :
137 : void work_started() noexcept override;
138 : void work_finished() noexcept override;
139 :
140 : // Event flags for register_fd/deregister_fd
141 : static constexpr int event_read = 1;
142 : static constexpr int event_write = 2;
143 :
144 : private:
145 : std::size_t do_one(long timeout_us);
146 : void run_reactor(std::unique_lock<std::mutex>& lock);
147 : void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
148 : void interrupt_reactor() const;
149 : long calculate_timeout(long requested_timeout_us) const;
150 :
151 : // Self-pipe for interrupting select()
152 : int pipe_fds_[2]; // [0]=read, [1]=write
153 :
154 : mutable std::mutex mutex_;
155 : mutable std::condition_variable wakeup_event_;
156 : mutable op_queue completed_ops_;
157 : mutable std::atomic<long> outstanding_work_;
158 : std::atomic<bool> stopped_;
159 : bool shutdown_;
160 :
161 : // Per-fd state for tracking registered operations
162 : struct fd_state
163 : {
164 : select_op* read_op = nullptr;
165 : select_op* write_op = nullptr;
166 : };
167 : mutable std::unordered_map<int, fd_state> registered_fds_;
168 : mutable int max_fd_ = -1;
169 :
170 : // Single reactor thread coordination
171 : mutable bool reactor_running_ = false;
172 : mutable bool reactor_interrupted_ = false;
173 : mutable int idle_thread_count_ = 0;
174 :
175 : // Sentinel operation for interleaving reactor runs with handler execution.
176 : // Ensures the reactor runs periodically even when handlers are continuously
177 : // posted, preventing timer starvation.
178 : struct task_op final : scheduler_op
179 : {
180 MIS 0 : void operator()() override {}
181 0 : void destroy() override {}
182 : };
183 : task_op task_op_;
184 : };
185 :
186 : /*
187 : select Scheduler - Single Reactor Model
188 : =======================================
189 :
190 : This scheduler mirrors the epoll_scheduler design but uses select() instead
191 : of epoll for I/O multiplexing. The thread coordination strategy is identical:
192 : one thread becomes the "reactor" while others wait on a condition variable.
193 :
194 : Thread Model
195 : ------------
196 : - ONE thread runs select() at a time (the reactor thread)
197 : - OTHER threads wait on wakeup_event_ (condition variable) for handlers
198 : - When work is posted, exactly one waiting thread wakes via notify_one()
199 :
200 : Key Differences from epoll
201 : --------------------------
202 : - Uses self-pipe instead of eventfd for interruption (more portable)
203 : - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
204 : - FD_SETSIZE limit (~1024 fds on most systems)
205 : - Level-triggered only (no edge-triggered mode)
206 :
207 : Self-Pipe Pattern
208 : -----------------
To interrupt a blocking select() call (when work is posted, a new fd is
registered, the earliest timer deadline changes, or the scheduler is stopped
or shut down), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
always in the read_fds set, so select() returns immediately. The reactor then
drains the pipe so it stops reporting readable.
213 :
214 : fd-to-op Mapping
215 : ----------------
216 : We use an unordered_map<int, fd_state> to track which operations are
217 : registered for each fd. This allows O(1) lookup when select() returns
218 : ready fds. Each fd can have at most one read op and one write op registered.
219 : */
220 :
221 : namespace select {
222 :
223 : struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
224 : {
225 : select_scheduler const* key;
226 : scheduler_context* next;
227 : };
228 :
229 : inline thread_local_ptr<scheduler_context> context_stack;
230 :
231 : struct thread_context_guard
232 : {
233 : scheduler_context frame_;
234 :
235 HIT 124 : explicit thread_context_guard(select_scheduler const* ctx) noexcept
236 124 : : frame_{ctx, context_stack.get()}
237 : {
238 124 : context_stack.set(&frame_);
239 124 : }
240 :
241 124 : ~thread_context_guard() noexcept
242 : {
243 124 : context_stack.set(frame_.next);
244 124 : }
245 : };
246 :
247 : struct work_guard
248 : {
249 : select_scheduler* self;
250 171486 : ~work_guard()
251 : {
252 171486 : self->work_finished();
253 171486 : }
254 : };
255 :
256 : } // namespace select
257 :
258 135 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
259 135 : : pipe_fds_{-1, -1}
260 135 : , outstanding_work_(0)
261 135 : , stopped_(false)
262 135 : , shutdown_(false)
263 135 : , max_fd_(-1)
264 135 : , reactor_running_(false)
265 135 : , reactor_interrupted_(false)
266 270 : , idle_thread_count_(0)
267 : {
268 : // Create self-pipe for interrupting select()
269 135 : if (::pipe(pipe_fds_) < 0)
270 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
271 :
272 : // Set both ends to non-blocking and close-on-exec
273 HIT 405 : for (int i = 0; i < 2; ++i)
274 : {
275 270 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
276 270 : if (flags == -1)
277 : {
278 MIS 0 : int errn = errno;
279 0 : ::close(pipe_fds_[0]);
280 0 : ::close(pipe_fds_[1]);
281 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
282 : }
283 HIT 270 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
284 : {
285 MIS 0 : int errn = errno;
286 0 : ::close(pipe_fds_[0]);
287 0 : ::close(pipe_fds_[1]);
288 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
289 : }
290 HIT 270 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
291 : {
292 MIS 0 : int errn = errno;
293 0 : ::close(pipe_fds_[0]);
294 0 : ::close(pipe_fds_[1]);
295 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
296 : }
297 : }
298 :
299 HIT 135 : timer_svc_ = &get_timer_service(ctx, *this);
300 135 : timer_svc_->set_on_earliest_changed(
301 3483 : timer_service::callback(this, [](void* p) {
302 3348 : static_cast<select_scheduler*>(p)->interrupt_reactor();
303 3348 : }));
304 :
305 : // Initialize resolver service
306 135 : get_resolver_service(ctx, *this);
307 :
308 : // Initialize signal service
309 135 : get_signal_service(ctx, *this);
310 :
311 : // Push task sentinel to interleave reactor runs with handler execution
312 135 : completed_ops_.push(&task_op_);
313 135 : }
314 :
315 270 : inline select_scheduler::~select_scheduler()
316 : {
317 135 : if (pipe_fds_[0] >= 0)
318 135 : ::close(pipe_fds_[0]);
319 135 : if (pipe_fds_[1] >= 0)
320 135 : ::close(pipe_fds_[1]);
321 270 : }
322 :
323 : inline void
324 135 : select_scheduler::shutdown()
325 : {
326 : {
327 135 : std::unique_lock lock(mutex_);
328 135 : shutdown_ = true;
329 :
330 270 : while (auto* h = completed_ops_.pop())
331 : {
332 135 : if (h == &task_op_)
333 135 : continue;
334 MIS 0 : lock.unlock();
335 0 : h->destroy();
336 0 : lock.lock();
337 HIT 135 : }
338 135 : }
339 :
340 135 : outstanding_work_.store(0, std::memory_order_release);
341 :
342 135 : if (pipe_fds_[1] >= 0)
343 135 : interrupt_reactor();
344 :
345 135 : wakeup_event_.notify_all();
346 135 : }
347 :
348 : inline void
349 3689 : select_scheduler::post(std::coroutine_handle<> h) const
350 : {
351 : struct post_handler final : scheduler_op
352 : {
353 : std::coroutine_handle<> h_;
354 :
355 3689 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
356 :
357 7378 : ~post_handler() override = default;
358 :
359 3689 : void operator()() override
360 : {
361 3689 : auto h = h_;
362 3689 : delete this;
363 3689 : h.resume();
364 3689 : }
365 :
366 MIS 0 : void destroy() override
367 : {
368 0 : delete this;
369 0 : }
370 : };
371 :
372 HIT 3689 : auto ph = std::make_unique<post_handler>(h);
373 3689 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
374 :
375 3689 : std::unique_lock lock(mutex_);
376 3689 : completed_ops_.push(ph.release());
377 3689 : wake_one_thread_and_unlock(lock);
378 3689 : }
379 :
380 : inline void
381 161405 : select_scheduler::post(scheduler_op* h) const
382 : {
383 161405 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
384 :
385 161405 : std::unique_lock lock(mutex_);
386 161405 : completed_ops_.push(h);
387 161405 : wake_one_thread_and_unlock(lock);
388 161405 : }
389 :
390 : inline bool
391 551 : select_scheduler::running_in_this_thread() const noexcept
392 : {
393 551 : for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
394 361 : if (c->key == this)
395 361 : return true;
396 190 : return false;
397 : }
398 :
399 : inline void
400 103 : select_scheduler::stop()
401 : {
402 103 : bool expected = false;
403 103 : if (stopped_.compare_exchange_strong(
404 : expected, true, std::memory_order_release,
405 : std::memory_order_relaxed))
406 : {
407 : // Wake all threads so they notice stopped_ and exit
408 : {
409 103 : std::lock_guard lock(mutex_);
410 103 : wakeup_event_.notify_all();
411 103 : }
412 103 : interrupt_reactor();
413 : }
414 103 : }
415 :
416 : inline bool
417 3 : select_scheduler::stopped() const noexcept
418 : {
419 3 : return stopped_.load(std::memory_order_acquire);
420 : }
421 :
422 : inline void
423 37 : select_scheduler::restart()
424 : {
425 37 : stopped_.store(false, std::memory_order_release);
426 37 : }
427 :
428 : inline std::size_t
429 100 : select_scheduler::run()
430 : {
431 100 : if (stopped_.load(std::memory_order_acquire))
432 MIS 0 : return 0;
433 :
434 HIT 200 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
435 : {
436 MIS 0 : stop();
437 0 : return 0;
438 : }
439 :
440 HIT 100 : select::thread_context_guard ctx(this);
441 :
442 100 : std::size_t n = 0;
443 171562 : while (do_one(-1))
444 171462 : if (n != (std::numeric_limits<std::size_t>::max)())
445 171462 : ++n;
446 100 : return n;
447 100 : }
448 :
449 : inline std::size_t
450 MIS 0 : select_scheduler::run_one()
451 : {
452 0 : if (stopped_.load(std::memory_order_acquire))
453 0 : return 0;
454 :
455 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
456 : {
457 0 : stop();
458 0 : return 0;
459 : }
460 :
461 0 : select::thread_context_guard ctx(this);
462 0 : return do_one(-1);
463 0 : }
464 :
465 : inline std::size_t
466 HIT 27 : select_scheduler::wait_one(long usec)
467 : {
468 27 : if (stopped_.load(std::memory_order_acquire))
469 3 : return 0;
470 :
471 48 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
472 : {
473 MIS 0 : stop();
474 0 : return 0;
475 : }
476 :
477 HIT 24 : select::thread_context_guard ctx(this);
478 24 : return do_one(usec);
479 24 : }
480 :
481 : inline std::size_t
482 MIS 0 : select_scheduler::poll()
483 : {
484 0 : if (stopped_.load(std::memory_order_acquire))
485 0 : return 0;
486 :
487 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
488 : {
489 0 : stop();
490 0 : return 0;
491 : }
492 :
493 0 : select::thread_context_guard ctx(this);
494 :
495 0 : std::size_t n = 0;
496 0 : while (do_one(0))
497 0 : if (n != (std::numeric_limits<std::size_t>::max)())
498 0 : ++n;
499 0 : return n;
500 0 : }
501 :
502 : inline std::size_t
503 0 : select_scheduler::poll_one()
504 : {
505 0 : if (stopped_.load(std::memory_order_acquire))
506 0 : return 0;
507 :
508 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
509 : {
510 0 : stop();
511 0 : return 0;
512 : }
513 :
514 0 : select::thread_context_guard ctx(this);
515 0 : return do_one(0);
516 0 : }
517 :
518 : inline void
519 HIT 6551 : select_scheduler::register_fd(int fd, select_op* op, int events) const
520 : {
521 : // Validate fd is within select() limits
522 6551 : if (fd < 0 || fd >= FD_SETSIZE)
523 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
524 :
525 : {
526 HIT 6551 : std::lock_guard lock(mutex_);
527 :
528 6551 : auto& state = registered_fds_[fd];
529 6551 : if (events & event_read)
530 3414 : state.read_op = op;
531 6551 : if (events & event_write)
532 3137 : state.write_op = op;
533 :
534 6551 : if (fd > max_fd_)
535 229 : max_fd_ = fd;
536 6551 : }
537 :
538 : // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
539 : // with the newly registered fd.
540 6551 : interrupt_reactor();
541 6551 : }
542 :
543 : inline void
544 6487 : select_scheduler::deregister_fd(int fd, int events) const
545 : {
546 6487 : std::lock_guard lock(mutex_);
547 :
548 6487 : auto it = registered_fds_.find(fd);
549 6487 : if (it == registered_fds_.end())
550 6328 : return;
551 :
552 159 : if (events & event_read)
553 159 : it->second.read_op = nullptr;
554 159 : if (events & event_write)
555 MIS 0 : it->second.write_op = nullptr;
556 :
557 : // Remove entry if both are null
558 HIT 159 : if (!it->second.read_op && !it->second.write_op)
559 : {
560 159 : registered_fds_.erase(it);
561 :
562 : // Recalculate max_fd_ if needed
563 159 : if (fd == max_fd_)
564 : {
565 158 : max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
566 158 : for (auto& [registered_fd, state] : registered_fds_)
567 : {
568 MIS 0 : if (registered_fd > max_fd_)
569 0 : max_fd_ = registered_fd;
570 : }
571 : }
572 : }
573 HIT 6487 : }
574 :
575 : inline void
576 10472 : select_scheduler::work_started() noexcept
577 : {
578 10472 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
579 10472 : }
580 :
581 : inline void
582 175566 : select_scheduler::work_finished() noexcept
583 : {
584 351132 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
585 103 : stop();
586 175566 : }
587 :
588 : inline void
589 13476 : select_scheduler::interrupt_reactor() const
590 : {
591 13476 : char byte = 1;
592 13476 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
593 13476 : }
594 :
595 : inline void
596 165094 : select_scheduler::wake_one_thread_and_unlock(
597 : std::unique_lock<std::mutex>& lock) const
598 : {
599 165094 : if (idle_thread_count_ > 0)
600 : {
601 : // Idle worker exists - wake it via condvar
602 MIS 0 : wakeup_event_.notify_one();
603 0 : lock.unlock();
604 : }
605 HIT 165094 : else if (reactor_running_ && !reactor_interrupted_)
606 : {
607 : // No idle workers but reactor is running - interrupt it
608 3339 : reactor_interrupted_ = true;
609 3339 : lock.unlock();
610 3339 : interrupt_reactor();
611 : }
612 : else
613 : {
614 : // No one to wake
615 161755 : lock.unlock();
616 : }
617 165094 : }
618 :
619 : inline long
620 9618 : select_scheduler::calculate_timeout(long requested_timeout_us) const
621 : {
622 9618 : if (requested_timeout_us == 0)
623 MIS 0 : return 0;
624 :
625 HIT 9618 : auto nearest = timer_svc_->nearest_expiry();
626 9618 : if (nearest == timer_service::time_point::max())
627 37 : return requested_timeout_us;
628 :
629 9581 : auto now = std::chrono::steady_clock::now();
630 9581 : if (nearest <= now)
631 120 : return 0;
632 :
633 : auto timer_timeout_us =
634 9461 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
635 9461 : .count();
636 :
637 : // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
638 9461 : constexpr auto long_max =
639 : static_cast<long long>((std::numeric_limits<long>::max)());
640 : auto capped_timer_us =
641 9461 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
642 9461 : static_cast<long long>(0)),
643 9461 : long_max);
644 :
645 9461 : if (requested_timeout_us < 0)
646 9461 : return static_cast<long>(capped_timer_us);
647 :
648 : // requested_timeout_us is already long, so min() result fits in long
649 : return static_cast<long>(
650 MIS 0 : (std::min)(static_cast<long long>(requested_timeout_us),
651 0 : capped_timer_us));
652 : }
653 :
654 : inline void
655 HIT 81962 : select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
656 : {
657 : // Calculate timeout considering timers, use 0 if interrupted
658 : long effective_timeout_us =
659 81962 : reactor_interrupted_ ? 0 : calculate_timeout(-1);
660 :
661 : // Build fd_sets from registered_fds_
662 : fd_set read_fds, write_fds, except_fds;
663 1393354 : FD_ZERO(&read_fds);
664 1393354 : FD_ZERO(&write_fds);
665 1393354 : FD_ZERO(&except_fds);
666 :
667 : // Always include the interrupt pipe
668 81962 : FD_SET(pipe_fds_[0], &read_fds);
669 81962 : int nfds = pipe_fds_[0];
670 :
671 : // Add registered fds
672 97588 : for (auto& [fd, state] : registered_fds_)
673 : {
674 15626 : if (state.read_op)
675 12489 : FD_SET(fd, &read_fds);
676 15626 : if (state.write_op)
677 : {
678 3137 : FD_SET(fd, &write_fds);
679 : // Also monitor for errors on connect operations
680 3137 : FD_SET(fd, &except_fds);
681 : }
682 15626 : if (fd > nfds)
683 12492 : nfds = fd;
684 : }
685 :
686 : // Convert timeout to timeval
687 : struct timeval tv;
688 81962 : struct timeval* tv_ptr = nullptr;
689 81962 : if (effective_timeout_us >= 0)
690 : {
691 81925 : tv.tv_sec = effective_timeout_us / 1000000;
692 81925 : tv.tv_usec = effective_timeout_us % 1000000;
693 81925 : tv_ptr = &tv;
694 : }
695 :
696 81962 : lock.unlock();
697 :
698 81962 : int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
699 81962 : int saved_errno = errno;
700 :
701 : // Process timers outside the lock
702 81962 : timer_svc_->process_expired();
703 :
704 81962 : if (ready < 0 && saved_errno != EINTR)
705 MIS 0 : detail::throw_system_error(make_err(saved_errno), "select");
706 :
707 : // Re-acquire lock before modifying completed_ops_
708 HIT 81962 : lock.lock();
709 :
710 : // Drain the interrupt pipe if readable
711 81962 : if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
712 : {
713 : char buf[256];
714 19950 : while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
715 : {
716 : }
717 : }
718 :
719 : // Process I/O completions
720 81962 : int completions_queued = 0;
721 81962 : if (ready > 0)
722 : {
723 : // Iterate over registered fds (copy keys to avoid iterator invalidation)
724 9975 : std::vector<int> fds_to_check;
725 9975 : fds_to_check.reserve(registered_fds_.size());
726 22500 : for (auto& [fd, state] : registered_fds_)
727 12525 : fds_to_check.push_back(fd);
728 :
729 22500 : for (int fd : fds_to_check)
730 : {
731 12525 : auto it = registered_fds_.find(fd);
732 12525 : if (it == registered_fds_.end())
733 MIS 0 : continue;
734 :
735 HIT 12525 : auto& state = it->second;
736 :
737 : // Check for errors (especially for connect operations)
738 12525 : bool has_error = FD_ISSET(fd, &except_fds);
739 :
740 : // Process read readiness
741 12525 : if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
742 : {
743 3255 : auto* op = state.read_op;
744 : // Claim the op by exchanging to unregistered. Both registering and
745 : // registered states mean the op is ours to complete.
746 3255 : auto prev = op->registered.exchange(
747 : select_registration_state::unregistered,
748 : std::memory_order_acq_rel);
749 3255 : if (prev != select_registration_state::unregistered)
750 : {
751 3255 : state.read_op = nullptr;
752 :
753 3255 : if (has_error)
754 : {
755 MIS 0 : int errn = 0;
756 0 : socklen_t len = sizeof(errn);
757 0 : if (::getsockopt(
758 0 : fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
759 0 : errn = errno;
760 0 : if (errn == 0)
761 0 : errn = EIO;
762 0 : op->complete(errn, 0);
763 : }
764 : else
765 : {
766 HIT 3255 : op->perform_io();
767 : }
768 :
769 3255 : completed_ops_.push(op);
770 3255 : ++completions_queued;
771 : }
772 : }
773 :
774 : // Process write readiness
775 12525 : if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
776 : {
777 3137 : auto* op = state.write_op;
778 : // Claim the op by exchanging to unregistered. Both registering and
779 : // registered states mean the op is ours to complete.
780 3137 : auto prev = op->registered.exchange(
781 : select_registration_state::unregistered,
782 : std::memory_order_acq_rel);
783 3137 : if (prev != select_registration_state::unregistered)
784 : {
785 3137 : state.write_op = nullptr;
786 :
787 3137 : if (has_error)
788 : {
789 MIS 0 : int errn = 0;
790 0 : socklen_t len = sizeof(errn);
791 0 : if (::getsockopt(
792 0 : fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
793 0 : errn = errno;
794 0 : if (errn == 0)
795 0 : errn = EIO;
796 0 : op->complete(errn, 0);
797 : }
798 : else
799 : {
800 HIT 3137 : op->perform_io();
801 : }
802 :
803 3137 : completed_ops_.push(op);
804 3137 : ++completions_queued;
805 : }
806 : }
807 :
808 : // Clean up empty entries
809 12525 : if (!state.read_op && !state.write_op)
810 6392 : registered_fds_.erase(it);
811 : }
812 9975 : }
813 :
814 81962 : if (completions_queued > 0)
815 : {
816 3258 : if (completions_queued == 1)
817 124 : wakeup_event_.notify_one();
818 : else
819 3134 : wakeup_event_.notify_all();
820 : }
821 81962 : }
822 :
823 : inline std::size_t
824 171586 : select_scheduler::do_one(long timeout_us)
825 : {
826 171586 : std::unique_lock lock(mutex_);
827 :
828 : for (;;)
829 : {
830 253548 : if (stopped_.load(std::memory_order_acquire))
831 100 : return 0;
832 :
833 253448 : scheduler_op* op = completed_ops_.pop();
834 :
835 253448 : if (op == &task_op_)
836 : {
837 81962 : bool more_handlers = !completed_ops_.empty();
838 :
839 81962 : if (!more_handlers)
840 : {
841 19236 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
842 : {
843 MIS 0 : completed_ops_.push(&task_op_);
844 0 : return 0;
845 : }
846 HIT 9618 : if (timeout_us == 0)
847 : {
848 MIS 0 : completed_ops_.push(&task_op_);
849 0 : return 0;
850 : }
851 : }
852 :
853 HIT 81962 : reactor_interrupted_ = more_handlers || timeout_us == 0;
854 81962 : reactor_running_ = true;
855 :
856 81962 : if (more_handlers && idle_thread_count_ > 0)
857 MIS 0 : wakeup_event_.notify_one();
858 :
859 HIT 81962 : run_reactor(lock);
860 :
861 81962 : reactor_running_ = false;
862 81962 : completed_ops_.push(&task_op_);
863 81962 : continue;
864 81962 : }
865 :
866 171486 : if (op != nullptr)
867 : {
868 171486 : lock.unlock();
869 171486 : select::work_guard g{this};
870 171486 : (*op)();
871 171486 : return 1;
872 171486 : }
873 :
874 MIS 0 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
875 0 : return 0;
876 :
877 0 : if (timeout_us == 0)
878 0 : return 0;
879 :
880 0 : ++idle_thread_count_;
881 0 : if (timeout_us < 0)
882 0 : wakeup_event_.wait(lock);
883 : else
884 0 : wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
885 0 : --idle_thread_count_;
886 HIT 81962 : }
887 171586 : }
888 :
889 : } // namespace boost::corosio::detail
890 :
891 : #endif // BOOST_COROSIO_HAS_SELECT
892 :
893 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|