Anklang-0.3.0.dev712+gdc4e642f anklang-0.3.0.dev712+gdc4e642f
ASE — Anklang Sound Engine (C++)

« « « Anklang Documentation
Loading...
Searching...
No Matches
loop.cc
Go to the documentation of this file.
1 // This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0
2#include "loop.hh"
3#include "atomics.hh"
4#include "utils.hh"
5#include "platform.hh"
6#include "internal.hh"
7#include "strings.hh"
8#include <sys/poll.h>
9#include <sys/wait.h>
10#include <errno.h>
11#include <atomic>
12#include <unistd.h>
13#include <fcntl.h>
14#include <sys/time.h>
15#include <unistd.h>
16#include <signal.h>
17#include <algorithm>
18#include <list>
19
20namespace Ase {
21
// Scheduling states that the loop stores in LoopSource::loop_state_.
22enum {
23 WAITING = 0, // initial state; also set after check() returned false and right before dispatching
24 PREPARED, // prepare() returned false, the source awaits check() after polling
25 NEEDS_DISPATCH, // prepare() or check() returned true, the source is ready for dispatch()
26};
27
 // Flag bits that are combined with the user supplied quit code in quit_code_.
 // Note that 0x8000 exceeds INT16_MAX, so WILLQUIT ends up as the sign bit of an int16_t.
28static constexpr int16_t WILLQUIT = 0x8000; // quit() called before or during run()
29static constexpr int16_t HASQUIT = 0x4000; // last run() or iterate() was quit
 // Sentinel for dispatch_priority_, meaning no dispatchable source has been found yet.
30static constexpr int16_t UNDEFINED_PRIORITY = -32768;
 // Highest priority accepted by add_source() and exec_once().
31static constexpr auto PRIORITY_CEILING = LoopPriority::SYSALLOC;
32
33// == PollFD invariants ==
34static_assert (PollFD::IN == POLLIN);
35static_assert (PollFD::PRI == POLLPRI);
36static_assert (PollFD::OUT == POLLOUT);
37static_assert (PollFD::RDNORM == POLLRDNORM);
38static_assert (PollFD::RDBAND == POLLRDBAND);
39static_assert (PollFD::WRNORM == POLLWRNORM);
40static_assert (PollFD::WRBAND == POLLWRBAND);
41static_assert (PollFD::ERR == POLLERR);
42static_assert (PollFD::HUP == POLLHUP);
43static_assert (PollFD::NVAL == POLLNVAL);
44static_assert (sizeof (PollFD) == sizeof (struct pollfd));
45static_assert (offsetof (PollFD, fd) == offsetof (struct pollfd, fd));
46static_assert (sizeof (((PollFD*) 0)->fd) == sizeof (((struct pollfd*) 0)->fd));
47static_assert (offsetof (PollFD, events) == offsetof (struct pollfd, events));
48static_assert (sizeof (((PollFD*) 0)->events) == sizeof (((struct pollfd*) 0)->events));
49static_assert (offsetof (PollFD, revents) == offsetof (struct pollfd, revents));
50static_assert (sizeof (((PollFD*) 0)->revents) == sizeof (((struct pollfd*) 0)->revents));
51
52// === Stupid ID allocator ===
53static std::atomic<uint64_t> global_id_counter { 101000000 };
54static LoopID
55alloc_id ()
56{
57 const uint64_t id = global_id_counter.fetch_add (+1, std::memory_order_relaxed);
58 assert_return (id != 0, {});
59 return LoopID (id);
60}
61
62static void
63release_id (LoopID id)
64{
65 assert_return (id != LoopID::INVALID);
66}
67
68// === Loop ==
69Loop::Loop() {}
70Loop::~Loop() {}
71
72// === LoopImpl ===
74class LoopImpl : public Loop
75{
77 std::vector<PollFD> cached_pollfd_vector_;
78 std::vector<LoopSourceP*> cached_poll_candidates_;
79public:
82 SourceList sources_;
83 std::vector<LoopSourceP> poll_sources_;
84 AtomicStack<LoopSourceP> pending_add_stack_;
85 AtomicStack<LoopID> pending_cancel_stack_;
86 std::atomic<int16_t> quit_code_ = 0;
87 uint running_ = 0;
88 uint rr_index_ = 0;
89 int16 dispatch_priority_ = 0;
90 EventFd eventfd_;
91 GlibGMainContext *gcontext_;
92 bool finishable_L ();
93 int run () override;
94 bool running () override;
95 void wakeup () override;
96 void quit (int quit_code) override;
97 bool finishable () override;
98 bool iterate (bool may_block) override;
99 void iterate_pending () override;
100 bool pending () override;
101 bool set_g_main_context (GlibGMainContext *glib_main_context) override;
102 bool has_quit () override;
103 bool iterate_loops_Lm (LoopState&, bool b, bool d);
104 void destroy_loop () override;
105 LoopSourceP& find_first_L ();
106 LoopSourceP& find_source_L (LoopID id);
107 bool has_primary_L (void);
108 void remove_source_Lm (LoopSourceP source);
109 void kill_sources_Lm (void);
110 void unpoll_sources_U ();
111 void collect_sources_Lm (LoopState&);
112 bool prepare_sources_Lm (LoopState&, std::vector<PollFD>&);
113 bool check_sources_Lm (LoopState&, const std::vector<PollFD>&);
114 void dispatch_source_Lm (LoopState&);
115 void process_atomic_stacks ();
116 LoopID add_source (LoopSourceP loop_source, LoopPriority priority) override;
117 void cancel (LoopID id) override;
118 void cancel (LoopID *idp) override;
119 bool has_primary () override;
120 LoopID exec_sigchld (int64_t pid, const SigchldSlot &vfunc, LoopPriority priority) override;
121 bool exec_once (uint delay_ms, LoopID *once_id, const VoidSlot &vfunc, LoopPriority priority) override;
122 explicit LoopImpl ();
123 virtual ~LoopImpl ();
124};
125
126// === LoopImpl ===
127inline LoopSourceP&
128LoopImpl::find_first_L()
129{
130 static LoopSourceP null_source;
131 return sources_.empty() ? null_source : sources_[0];
132}
133
134inline LoopSourceP&
135LoopImpl::find_source_L (LoopID id)
136{
137 for (SourceList::iterator lit = sources_.begin(); lit != sources_.end(); lit++)
138 if (id == (*lit)->id_)
139 return *lit;
140 static LoopSourceP null_source;
141 return null_source;
142}
143
144bool
145LoopImpl::has_primary_L()
146{
147 for (SourceList::iterator lit = sources_.begin(); lit != sources_.end(); lit++)
148 if ((*lit)->primary())
149 return true;
150 return false;
151}
152
153void
154LoopImpl::process_atomic_stacks ()
155{
156 LoopSourceP source;
157 while (pending_add_stack_.pop (source))
158 sources_.push_back (source);
159 LoopID cancel_id;
160 while (pending_cancel_stack_.pop (cancel_id))
161 {
162 LoopSourceP &src = find_source_L (cancel_id);
163 if (src)
164 remove_source_Lm (src);
165 }
166}
167
168bool
170{
171 return has_primary_L();
172}
173
184LoopID
186{
187 static_assert (UNDEFINED_PRIORITY < 1, "");
188 assert_return (static_cast<uint16_t> (priority) >= 1 && priority <= PRIORITY_CEILING, LoopID::INVALID);
189 assert_return (source != NULL, LoopID::INVALID);
190 assert_return (source->loop_ == NULL, LoopID::INVALID);
191 source->loop_ = this;
192 const auto source_id = source->id_ = alloc_id();
193 source->loop_state_ = WAITING;
194 source->priority_ = static_cast<uint16_t> (priority);
195 if (pending_add_stack_.push (source))
196 wakeup();
197 return source_id;
198}
199
200void
201LoopImpl::remove_source_Lm (LoopSourceP source)
202{
203 assert_return (source->loop_ == this);
204 source->loop_ = NULL;
205 source->loop_state_ = WAITING;
206 auto pos = find (sources_.begin(), sources_.end(), source);
207 assert_return (pos != sources_.end());
208 sources_.erase (pos);
209 release_id (source->id_);
210 source->id_ = LoopID::INVALID;
211 source->destroy();
212}
213
221void
223{
224 return_unless (id != LoopID::INVALID);
225 if (pending_cancel_stack_.push (id))
226 wakeup();
227}
228
236void
237LoopImpl::cancel (LoopID *idp)
238{
239 if (idp) {
240 if (*idp != LoopID::INVALID)
241 cancel (*idp);
242 *idp = LoopID::INVALID;
243 }
244}
245
246bool
247LoopImpl::exec_once (uint delay_ms, LoopID *once_id, const VoidSlot &vfunc, LoopPriority priority)
248{
249 assert_return (once_id != nullptr, false);
250 assert_return (static_cast<uint16_t> (priority) >= 1 && priority <= PRIORITY_CEILING, false);
251 if (!vfunc) {
252 cancel (once_id);
253 return false;
254 }
255 auto once_handler = [vfunc,once_id]() { *once_id = LoopID::INVALID; vfunc(); };
256 LoopSourceP source = TimedSource::create (once_handler, delay_ms, 0);
257 source->loop_ = this;
258 source->id_ = alloc_id();
259 source->loop_state_ = WAITING;
260 source->priority_ = static_cast<uint16_t> (priority);
261 LoopID warn_id = LoopID::INVALID;
262 {
263 if (*once_id != LoopID::INVALID) {
264 LoopSourceP &source = find_source_L (*once_id);
265 if (source)
266 remove_source_Lm (source);
267 else
268 warn_id = *once_id;
269 }
270 sources_.push_back (source);
271 *once_id = source->id_;
272 }
273 if (warn_id != LoopID::INVALID)
274 warning ("%s: failed to remove loop source: %lu", __func__, static_cast<uint64_t> (warn_id));
275 wakeup();
276 return true;
277}
278
279LoopID
280LoopImpl::exec_sigchld (int64_t pid, const SigchldSlot &slot, LoopPriority priority)
281{
282 return add_source (SigchldSource::create (pid, slot), priority);
283}
284
285void
286LoopImpl::kill_sources_Lm()
287{
288 for (;;)
289 {
290 LoopSourceP &source = find_first_L();
291 if (source == NULL)
292 break;
293 remove_source_Lm (source);
294 }
295 unpoll_sources_U(); // unlocked
296}
297
305LoopP
307{
308 static thread_local LoopP thread_loop = LoopImpl::make_shared();
309 return thread_loop;
310}
311
315{
316 const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
317 auto promise = this->make_promise<uint64_t> ([&] (auto resolve) // std::function<void(uint64_t)>
318 {
319 this->add ([resolve, start]()
320 {
321 const std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
322 const uint64_t elapsed = std::chrono::duration_cast<std::chrono::milliseconds> (end - start).count();
323 resolve (elapsed);
324 return false;
325 }, ms);
326 });
327 return promise;
328}
329
330// === LoopImpl ===
331LoopImpl::LoopImpl() :
332 gcontext_ (nullptr)
333{
334 cached_pollfd_vector_.reserve (1);
335 cached_poll_candidates_.reserve (1);
336 const int err = eventfd_.open();
337 if (err < 0)
338 fatal_error ("LoopImpl: failed to create wakeup pipe: %s", strerror (-err));
339 // eventfd_ must work upfront for wakeup() to work
340}
341
342LoopImpl::~LoopImpl()
343{
344 destroy_loop();
345 assert_return (sources_.empty() == true);
346}
347
357void
359{
360 set_g_main_context (NULL);
361 // guard main_loop_ pointer across callbacks
362 LoopP main_loop_guard = shared_ptr_cast<Loop*> (this);
363 process_atomic_stacks();
364 kill_sources_Lm();
365}
366
371void
373{
374 if (eventfd_.opened())
375 eventfd_.wakeup();
376}
377
384int
386{
387 LoopP main_loop_guard = shared_ptr_cast<Loop> (this);
388 LoopState state;
389 quit_code_ &= ~HASQUIT; // reset old quit code, from former run()
390 running_ += 1;
391 while (ISLIKELY (!(WILLQUIT & quit_code_)))
392 iterate_loops_Lm (state, true, true);
393 running_ -= 1;
394 if (quit_code_ & WILLQUIT) // apply quit code from this run()
395 quit_code_ = HASQUIT | (quit_code_ & ~WILLQUIT);
396 return quit_code_;
397}
398
399bool
401{
402 return running_ > 0;
403}
404
405bool
407{
408 return HASQUIT & quit_code_; // last run() was quit
409}
410
419void
420LoopImpl::quit (int quit_code)
421{
422 if (!(WILLQUIT & quit_code_)) {
423 quit_code_ = quit_code | WILLQUIT;
424 wakeup ();
425 }
426}
427
428bool
429LoopImpl::finishable_L()
430{
431 // finishable if no primary sources remain
432 return !has_primary_L();
433}
434
435bool
437{
438 return finishable_L();
439}
440
451bool
452LoopImpl::iterate (bool may_block)
453{
454 LoopP main_loop_guard = shared_ptr_cast<Loop> (this);
455 LoopState state;
456 running_ += 1;
457 const bool sources_pending = iterate_loops_Lm (state, may_block, true);
458 running_ -= 1;
459 return sources_pending;
460}
461
468void
470{
471 LoopP main_loop_guard = shared_ptr_cast<Loop> (this);
472 LoopState state;
473 const uint16_t saved_quit_code_ = quit_code_; // save former quit code
474 running_ += 1;
475 while (ISLIKELY (!(WILLQUIT & quit_code_))) // abort on quitting twice
476 if (!iterate_loops_Lm (state, false, true))
477 break;
478 running_ -= 1;
479 if (saved_quit_code_ & HASQUIT) // preserve former quit code
480 quit_code_ = saved_quit_code_ & ~WILLQUIT;
481 else if (quit_code_ & WILLQUIT) // or apply recent quit code
482 quit_code_ = HASQUIT | (quit_code_ & ~WILLQUIT);
483}
484
485bool
487{
488 LoopState state;
489 LoopP main_loop_guard = shared_ptr_cast<Loop> (this);
490 running_ += 1;
491 const bool more = iterate_loops_Lm (state, false, false);
492 running_ -= 1;
493 return more;
494}
495
496bool
497LoopImpl::set_g_main_context (GlibGMainContext *glib_main_context)
498{
499#ifdef __G_LIB_H__
500 if (glib_main_context)
501 {
502 if (gcontext_)
503 return false;
504 if (!g_main_context_acquire (glib_main_context))
505 return false;
506 gcontext_ = g_main_context_ref (glib_main_context);
507 }
508 else if (gcontext_)
509 {
510 glib_main_context = gcontext_;
511 gcontext_ = NULL;
512 g_main_context_release (glib_main_context);
513 g_main_context_unref (glib_main_context);
514 }
515 return true;
516#else
517 return false;
518#endif
519}
520
#ifdef __G_LIB_H__
/// Reinterpret a PollFD pointer as GPollFD pointer, the static assertions
/// verify at compile time that both structures and their flags match.
static GPollFD*
mk_gpollfd (PollFD *pfd)
{
  // structure layout
  static_assert (sizeof (GPollFD) == sizeof (PollFD), "");
  static_assert (offsetof (GPollFD, fd) == offsetof (PollFD, fd), "");
  static_assert (offsetof (GPollFD, events) == offsetof (PollFD, events), "");
  static_assert (offsetof (GPollFD, revents) == offsetof (PollFD, revents), "");
  static_assert (sizeof (GPollFD::fd) == sizeof (PollFD::fd), "");
  static_assert (sizeof (GPollFD::events) == sizeof (PollFD::events), "");
  static_assert (sizeof (GPollFD::revents) == sizeof (PollFD::revents), "");
  // event flags
  static_assert (PollFD::IN == int (G_IO_IN), "");
  static_assert (PollFD::PRI == int (G_IO_PRI), "");
  static_assert (PollFD::OUT == int (G_IO_OUT), "");
  static_assert (PollFD::ERR == int (G_IO_ERR), "");
  static_assert (PollFD::HUP == int (G_IO_HUP), "");
  static_assert (PollFD::NVAL == int (G_IO_NVAL), "");
  // not asserted: RDNORM == G_IO_RDNORM, RDBAND == G_IO_RDBAND,
  //               WRNORM == G_IO_WRNORM, WRBAND == G_IO_WRBAND
  return reinterpret_cast<GPollFD*> (pfd);
}
#endif
546
547void
548LoopImpl::unpoll_sources_U() // must be unlocked!
549{
550 // clear poll sources
551 poll_sources_.resize (0);
552}
553
// Rebuild poll_sources_ for the current iteration. dispatch_priority_ is set to the
// highest priority of any source left in NEEDS_DISPATCH state (UNDEFINED_PRIORITY if
// there is none) and poll_sources_ receives references to all sources above that
// priority plus the NEEDS_DISPATCH sources at that priority.
554void
555LoopImpl::collect_sources_Lm (LoopState &state)
556{
557 // enforce clean slate
558 if (UNLIKELY (!poll_sources_.empty()))
559 {
560 unpoll_sources_U(); // unlocked
561 assert_return (poll_sources_.empty());
562 }
 // NOTE(review): this sets seen_primary unconditionally, so the per-source primary_
 // test in the loop below can never change it - confirm that this is intended.
563 if (UNLIKELY (!state.seen_primary))
564 state.seen_primary = true;
565 // cache vector, otherwise malloc shows up in the profiles
566 std::vector<LoopSourceP*> &poll_candidates = cached_poll_candidates_;
567 poll_candidates.resize (0);
568 // determine dispatch priority & collect sources for preparing
569 dispatch_priority_ = UNDEFINED_PRIORITY; // initially, consider sources at *all* priorities
570 for (SourceList::iterator lit = sources_.begin(); lit != sources_.end(); lit++)
571 {
572 LoopSource &source = **lit;
573 if (UNLIKELY (!state.seen_primary && source.primary_))
574 state.seen_primary = true;
575 if (source.loop_ != this || // ignore destroyed and
576 (source.dispatching_ && !source.may_recurse_)) // avoid unallowed recursion
577 continue;
578 if (source.priority_ > dispatch_priority_ && // ignore lower priority sources
579 source.loop_state_ == NEEDS_DISPATCH) // if NEEDS_DISPATCH sources remain
580 dispatch_priority_ = source.priority_; // so raise dispatch_priority_
581 if (source.priority_ > dispatch_priority_ || // add source if it is an eligible
582 (source.priority_ == dispatch_priority_ && // candidate, baring future raises
583 source.loop_state_ == NEEDS_DISPATCH)) // of dispatch_priority_...
584 poll_candidates.push_back (&*lit); // collect only, adding ref() later
585 }
 // second pass: dispatch_priority_ is final now, so candidates that were collected
 // before it was raised are filtered out here
586 // ensure ref counts on all prepare sources
587 assert_return (poll_sources_.empty());
588 for (size_t i = 0; i < poll_candidates.size(); i++)
589 if ((*poll_candidates[i])->priority_ > dispatch_priority_ || // throw away lower priority sources
590 ((*poll_candidates[i])->priority_ == dispatch_priority_ &&
591 (*poll_candidates[i])->loop_state_ == NEEDS_DISPATCH)) // re-poll sources that need dispatching
592 poll_sources_.push_back (*poll_candidates[i]);
593 /* here, poll_sources_ contains either all sources, or only the highest priority
594 * NEEDS_DISPATCH sources plus higher priority sources. giving precedence to the
595 * remaining NEEDS_DISPATCH sources ensures round-robin processing.
596 */
597}
598
// Call prepare() on all collected sources. Sources that return true are marked
// NEEDS_DISPATCH and may raise dispatch_priority_. All others become PREPARED, may
// shorten state.timeout_usecs and get their valid PollFDs appended to pfda.
// Returns true if any source needs dispatching.
599bool
600LoopImpl::prepare_sources_Lm (LoopState &state, std::vector<PollFD> &pfda)
601{
602 // prepare sources, up to NEEDS_DISPATCH priority
603 for (auto lit = poll_sources_.begin(); lit != poll_sources_.end(); lit++)
604 {
605 LoopSource &source = **lit;
606 if (source.loop_ != this) // test undestroyed
607 continue;
608 int64 timeout = -1;
609 const bool need_dispatch = source.prepare (state, &timeout);
610 if (source.loop_ != this)
611 continue; // ignore newly destroyed sources
612 if (need_dispatch)
613 {
614 dispatch_priority_ = std::max (dispatch_priority_, source.priority_); // upgrade dispatch priority
615 source.loop_state_ = NEEDS_DISPATCH;
 // sources that are ready already skip the PollFD registration below
616 continue;
617 }
618 source.loop_state_ = PREPARED;
 // a negative timeout means that prepare() did not request a timeout
619 if (timeout >= 0)
620 state.timeout_usecs = std::min (state.timeout_usecs, timeout);
621 uint npfds = source.n_pfds();
622 for (uint i = 0; i < npfds; i++)
623 if (source.pfds_[i].pfd->fd >= 0)
624 {
 // remember the pfda slot, so that check_sources_Lm() can find the revents
625 uint idx = pfda.size();
626 source.pfds_[i].idx = idx;
627 pfda.push_back (*source.pfds_[i].pfd);
628 pfda[idx].revents = 0;
629 }
630 else
631 source.pfds_[i].idx = 4294967295U; // UINT_MAX
632 }
633 return dispatch_priority_ > UNDEFINED_PRIORITY;
634}
635
636bool
637LoopImpl::check_sources_Lm (LoopState &state, const std::vector<PollFD> &pfda)
638{
639 // check polled sources
640 for (auto lit = poll_sources_.begin(); lit != poll_sources_.end(); lit++)
641 {
642 LoopSource &source = **lit;
643 if (source.loop_ != this && // test undestroyed
644 source.loop_state_ != PREPARED)
645 continue; // only check prepared sources
646 uint npfds = source.n_pfds();
647 for (uint i = 0; i < npfds; i++)
648 {
649 uint idx = source.pfds_[i].idx;
650 if (idx < pfda.size() &&
651 source.pfds_[i].pfd->fd == pfda[idx].fd)
652 source.pfds_[i].pfd->revents = pfda[idx].revents;
653 else
654 source.pfds_[i].idx = 4294967295U; // UINT_MAX
655 }
656 bool need_dispatch = source.check (state);
657 if (source.loop_ != this)
658 continue; // ignore newly destroyed sources
659 if (need_dispatch)
660 {
661 dispatch_priority_ = std::max (dispatch_priority_, source.priority_); // upgrade dispatch priority
662 source.loop_state_ = NEEDS_DISPATCH;
663 }
664 else
665 source.loop_state_ = WAITING;
666 }
667 return dispatch_priority_ > UNDEFINED_PRIORITY;
668}
669
// Dispatch at most one source: the first collected source that is in NEEDS_DISPATCH
// state at dispatch_priority_. The source is removed from the loop if its dispatch()
// returns false and it has not been removed in the meantime.
670void
671LoopImpl::dispatch_source_Lm (LoopState &state)
672{
673 // find a source to dispatch at dispatch_priority_
674 LoopSourceP dispatch_source = NULL; // shared_ptr to keep alive even if everything else is destroyed
675 for (auto lit = poll_sources_.begin(); lit != poll_sources_.end(); lit++)
676 {
677 LoopSourceP &source = *lit;
678 if (source->loop_ == this && // test undestroyed
679 source->priority_ == dispatch_priority_ && // only dispatch at dispatch priority
680 source->loop_state_ == NEEDS_DISPATCH)
681 {
682 dispatch_source = source;
683 break;
684 }
685 }
686 dispatch_priority_ = UNDEFINED_PRIORITY;
687 // dispatch single source
688 if (dispatch_source)
689 {
690 dispatch_source->loop_state_ = WAITING;
 // save both dispatching flags around the call and restore them afterwards, during
 // a nested dispatch of the same source both flags are set (see dispatching_ above)
691 const bool old_was_dispatching = dispatch_source->was_dispatching_;
692 dispatch_source->was_dispatching_ = dispatch_source->dispatching_;
693 dispatch_source->dispatching_ = true;
694 const bool keep_alive = dispatch_source->dispatch (state);
695 dispatch_source->dispatching_ = dispatch_source->was_dispatching_;
696 dispatch_source->was_dispatching_ = old_was_dispatching;
 // loop_ differs from this if the source got removed during dispatch()
697 if (dispatch_source->loop_ == this && !keep_alive)
698 remove_source_Lm (dispatch_source);
699 }
700}
701
// Run one loop iteration through the phases COLLECT, PREPARE, poll(), CHECK and
// DISPATCH. Polling only blocks if may_block is true and nothing is dispatchable
// yet, dispatching only happens if may_dispatch is true. An installed GLib context
// is prepared, polled, checked and dispatched along with the loop sources.
// Returns true if any source or the GLib context was found to be dispatchable.
702bool
703LoopImpl::iterate_loops_Lm (LoopState &state, bool may_block, bool may_dispatch)
704{
705 assert_return (state.phase == state.NONE, false);
 // apply queued add_source() and cancel() requests first
706 process_atomic_stacks ();
707 std::vector<PollFD> &pfda = cached_pollfd_vector_;
708 pfda.resize (0); // cache array between iterations, to reduce malloc overhead
709 // allow poll wakeups
710 const PollFD wakeup = { eventfd_.inputfd(), PollFD::IN, 0 };
711 const uint wakeup_idx = 0; // wakeup_idx = pfda.size();
712 pfda.push_back (wakeup);
713 // collect
714 state.phase = state.COLLECT;
715 state.seen_primary = false;
716 collect_sources_Lm (state);
717 // prepare
718 bool any_dispatchable = false;
719 state.phase = state.PREPARE;
720 state.timeout_usecs = INT64_MAX;
721 state.current_time_usecs = timestamp_realtime();
722 bool adispatchable = false;
723 bool gdispatchable = false;
724 adispatchable = prepare_sources_Lm (state, pfda);
725 any_dispatchable |= adispatchable;
726 // prepare GLib
727 ASE_UNUSED const int gfirstfd = pfda.size();
728 ASE_UNUSED int gpriority = INT32_MIN;
729 if (ASE_UNLIKELY (gcontext_))
730 {
731#ifdef __G_LIB_H__
732 gdispatchable = g_main_context_prepare (gcontext_, &gpriority) != 0;
733 any_dispatchable |= gdispatchable;
734 int gtimeout = INT32_MAX;
 // NOTE(review): pfda.size() equals gfirstfd for this first query, so &pfda[gfirstfd]
 // indexes one past the last element - confirm that this is acceptable here.
735 int gnfds = g_main_context_query (gcontext_, gpriority, &gtimeout, mk_gpollfd (&pfda[gfirstfd]), pfda.size() - gfirstfd);
 // resize and query again until pfda provides exactly the number of fds GLib reports
736 while (gnfds >= 0 && size_t (gnfds) != pfda.size() - gfirstfd)
737 {
738 pfda.resize (gfirstfd + gnfds);
739 gtimeout = INT32_MAX;
740 gnfds = g_main_context_query (gcontext_, gpriority, &gtimeout, mk_gpollfd (&pfda[gfirstfd]), pfda.size() - gfirstfd);
741 }
742 if (gtimeout >= 0)
743 state.timeout_usecs = MIN (state.timeout_usecs, gtimeout * int64 (1000));
744#endif
745 }
746 // poll file descriptors
 // convert to milliseconds for poll(), sub-millisecond timeouts are rounded up to 1ms
747 int64 timeout_msecs = state.timeout_usecs / 1000;
748 if (state.timeout_usecs > 0 && timeout_msecs <= 0)
749 timeout_msecs = 1;
750 if (!may_block || any_dispatchable)
751 timeout_msecs = 0;
752 state.timeout_usecs = 0;
753 int presult;
754 do
755 presult = poll ((struct pollfd*) &pfda[0], pfda.size(), std::min (timeout_msecs, int64 (2147483647))); // INT_MAX
756 while (presult < 0 && errno == EAGAIN); // EINTR may indicate a signal
757 if (presult < 0 && errno != EINTR)
758 warning ("LoopImpl: poll() failed: %s", strerror());
759 else if (pfda[wakeup_idx].revents)
760 eventfd_.flush(); // restart queueing wakeups, possibly triggered by dispatching
761 // check
762 state.phase = state.CHECK;
763 state.current_time_usecs = timestamp_realtime();
764 int16 max_dispatch_priority = -32768;
765 adispatchable |= check_sources_Lm (state, pfda);
766 if (adispatchable)
767 {
768 any_dispatchable = true;
769 max_dispatch_priority = std::max (max_dispatch_priority, dispatch_priority_);
770 }
771 // check GLib
772 if (ASE_UNLIKELY (gcontext_))
773 {
774#ifdef __G_LIB_H__
775 gdispatchable = g_main_context_check (gcontext_, gpriority, mk_gpollfd (&pfda[gfirstfd]), pfda.size() - gfirstfd);
776 any_dispatchable |= gdispatchable;
777#endif
778 }
779 // dispatch
780 if (may_dispatch && any_dispatchable)
781 {
782 state.phase = state.DISPATCH;
 // if both are dispatchable, rr_index_ alternates between GLib and loop sources
783 if (gdispatchable && (!adispatchable || (rr_index_++ & 1)))
784 {
785#ifdef __G_LIB_H__
786 g_main_context_dispatch (gcontext_);
787#endif
788 }
789 else if (adispatchable)
790 dispatch_source_Lm (state); // passes on shared_ptr to keep alive while locked
791 }
792 // cleanup
793 state.phase = state.NONE;
794 unpoll_sources_U(); // unlocked
795 return any_dispatchable; // need to dispatch or recheck
796}
797
798// === LoopSource ===
799LoopSource::LoopSource () :
800 loop_ (NULL),
801 pfds_ (NULL),
802 id_ (LoopID::INVALID),
803 priority_ (UNDEFINED_PRIORITY),
804 loop_state_ (0),
805 may_recurse_ (0),
806 dispatching_ (0),
807 was_dispatching_ (0),
808 primary_ (true)
809{}
810
811uint
812LoopSource::n_pfds ()
813{
814 uint i = 0;
815 if (pfds_)
816 while (pfds_[i].pfd)
817 i++;
818 return i;
819}
820
821void
822LoopSource::may_recurse (bool may_recurse)
823{
824 may_recurse_ = may_recurse;
825}
826
827bool
829{
830 return may_recurse_;
831}
832
833bool
835{
836 return primary_;
837}
838
839void
840LoopSource::primary (bool is_primary)
841{
842 primary_ = is_primary;
843}
844
845bool
847{
848 return dispatching_ && was_dispatching_;
849}
850
851void
853{
854 const uint idx = n_pfds();
855 uint npfds = idx + 1;
856 pfds_ = (typeof (pfds_)) realloc (pfds_, sizeof (pfds_[0]) * (npfds + 1));
857 if (!pfds_)
858 fatal_error ("LoopSource: out of memory");
859 pfds_[npfds].idx = 4294967295U; // UINT_MAX
860 pfds_[npfds].pfd = NULL;
861 pfds_[idx].idx = 4294967295U; // UINT_MAX
862 pfds_[idx].pfd = pfd;
863}
864
865void
867{
868 uint idx, npfds = n_pfds();
869 for (idx = 0; idx < npfds; idx++)
870 if (pfds_[idx].pfd == pfd)
871 break;
872 if (idx < npfds)
873 {
874 pfds_[idx].idx = 4294967295U; // UINT_MAX
875 pfds_[idx].pfd = pfds_[npfds - 1].pfd;
876 pfds_[idx].idx = pfds_[npfds - 1].idx;
877 pfds_[npfds - 1].idx = 4294967295U; // UINT_MAX
878 pfds_[npfds - 1].pfd = NULL;
879 }
880 else
881 warning ("LoopSource: unremovable PollFD: %p (fd=%d)", pfd, pfd->fd);
882}
883
884void
885LoopSource::destroy ()
886{}
887
888void
890{
891 if (loop_)
892 loop_->cancel (source_id());
893}
894
895LoopSource::~LoopSource ()
896{
897 assert_return (loop_ == NULL);
898 if (pfds_)
899 free (pfds_);
900}
901
902// == DispatcherSource ==
903DispatcherSource::DispatcherSource (const DispatcherSlot &slot) :
904 slot_ (slot)
905{}
906
907DispatcherSource::~DispatcherSource ()
908{
909 slot_ = NULL;
910}
911
912bool
913DispatcherSource::prepare (const LoopState &state, int64 *timeout_usecs_p)
914{
915 return slot_ (state);
916}
917
918bool
920{
921 return slot_ (state);
922}
923
924bool
926{
927 return slot_ (state);
928}
929
930void
931DispatcherSource::destroy()
932{
933 LoopState state;
934 state.phase = state.DESTROY;
935 slot_ (state);
936}
937
938// == USignalSource ==
939USignalSource::USignalSource (int8 signum, const USignalSlot &slot) :
940 slot_ (slot), signum_ (signum)
941{
942 const uint s = 128 + signum_;
943 index_ = s / 32;
944 shift_ = s % 32;
945}
946
947USignalSource::~USignalSource ()
948{
949 slot_ = NULL;
950}
951
952static std::atomic<uint32> usignals_notified[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
953
955void
957{
958 const uint s = 128 + signum;
959 const uint index = s / 32;
960 const uint shift = s % 32;
961 usignals_notified[index] |= 1 << shift;
962}
963
964bool
965USignalSource::prepare (const LoopState &state, int64 *timeout_usecs_p)
966{
967 return usignals_notified[index_] & (1 << shift_);
968}
969
970bool
972{
973 return usignals_notified[index_] & (1 << shift_);
974}
975
976bool
978{
979 usignals_notified[index_] &= ~(1 << shift_);
980 return slot_ (signum_);
981}
982
983void
984USignalSource::destroy()
985{}
986
988write_uint (uint32_t i)
989{
991 char *c = &a.back();
992 ASE_ASSERT (c>=&a[0] && c<&a[a.size()]);
993 *c-- = 0;
994 *c = '0' + (i % 10);
995 i /= 10;
996 while (i != 0) {
997 *(--c) = '0' + (i % 10);
998 i /= 10;
999 }
1000 if (c > &a[0])
1001 memmove (&a[0], c, &a.back() + 1 - c);
1002 ASE_ASSERT (c>=&a[0] && c<&a[a.size()]);
1003 return a;
1004}
1005
1006void
1007USignalSource::install_sigaction (int8 signum)
1008{
1009 struct sigaction action;
1010 action.sa_handler = [] (int signum) {
1011 if (0) { // DEBUG
1012 constexpr size_t N = 1024;
1013 char buf[N] = __FILE__ ":";
1014 strncat (buf, &write_uint (__LINE__)[0], N);
1015 strncat (buf, ": sa_handler: signal=", N);
1016 strncat (buf, &write_uint (signum)[0], N);
1017 strncat (buf, "\n", N);
1018 ::write (2, buf, strlen (buf));
1019 }
1020 USignalSource::raise (signum);
1021 };
1022 sigemptyset (&action.sa_mask);
1023 action.sa_flags = SA_NOMASK;
1024 sigaction (signum, &action, nullptr);
1025}
1026
1027// === SigchldSource ===
1028static std::atomic<uint64_t> sigchld_counter = 0;
1029
1030SigchldSource::SigchldSource (int64_t pid, const SigchldSlot &slot) :
1031 slot_ (slot), pid_ (pid)
1032{
1033 if (uint64_t unused = 0; sigchld_counter.compare_exchange_strong (unused, 1)) {
1034 struct sigaction action;
1035 action.sa_handler = [] (int signum) {
1036 sigchld_counter++;
1037 };
1038 sigemptyset (&action.sa_mask);
1039 action.sa_flags = SA_NOMASK;
1040 sigaction (SIGCHLD, &action, nullptr);
1041 }
1042}
1043
1044SigchldSource::~SigchldSource()
1045{}
1046
1047bool
1048SigchldSource::prepare (const LoopState &state, int64 *timeout_usecs_p)
1049{
1050 return pid_ && sigchld_counter_ != sigchld_counter;
1051}
1052
1053bool
1055{
1056 return pid_ && sigchld_counter_ != sigchld_counter;
1057}
1058
1059bool
1061{
1062 if (pid_) {
1063 sigchld_counter_ = sigchld_counter;
1064 // Use pid_ to avoid reaping unknown children
1065 int status = 0;
1066 const pid_t child_pid = wait4 (pid_, &status, WNOHANG, nullptr);
1067 if (child_pid > 0) {
1068 slot_ (pid_, status);
1069#if 0
1070 struct rusage ru {}; // wait4 (..., &ru);
1071 printf (" Child Pid %d user time: %ld.%06ld sec\n", child_pid, ru.ru_utime.tv_sec, ru.ru_utime.tv_usec);
1072 printf (" System time: %ld.%06ld sec\n", ru.ru_stime.tv_sec, ru.ru_stime.tv_usec);
1073 printf (" Max RSS: %ld KB\n", ru.ru_maxrss);
1074 printf (" Page faults: %ld\n", ru.ru_minflt);
1075 printf (" I/O operations: %ld\n", ru.ru_inblock + ru.ru_oublock);
1076 printf (" Voluntary context switches: %ld\n", ru.ru_nvcsw);
1077 printf (" Involuntary context switches: %ld\n", ru.ru_nivcsw);
1078 printf ("\n");
1079#endif
1080 if (WIFEXITED (status) || WIFSIGNALED (status)) {
1081 // Child exited
1082 pid_ = 0;
1083 return false; // destroy
1084 }
1085 }
1086 }
1087 return true; // keep_alive
1088}
1089
1090void
1091SigchldSource::destroy ()
1092{
1093 pid_ = 0;
1094}
1095
1096// == TimedSource ==
1097TimedSource::TimedSource (const VoidSlot &slot, uint initial_interval_msecs, uint repeat_interval_msecs) :
1098 expiration_usecs_ (timestamp_realtime() + 1000ULL * initial_interval_msecs),
1099 interval_msecs_ (repeat_interval_msecs), first_interval_ (true),
1100 oneshot_ (true), void_slot_ (slot)
1101{}
1102
1103TimedSource::TimedSource (const BoolSlot &slot, uint initial_interval_msecs, uint repeat_interval_msecs) :
1104 expiration_usecs_ (timestamp_realtime() + 1000ULL * initial_interval_msecs),
1105 interval_msecs_ (repeat_interval_msecs), first_interval_ (true),
1106 oneshot_ (false), bool_slot_ (slot)
1107{}
1108
1109bool
1110TimedSource::prepare (const LoopState &state, int64 *timeout_usecs_p)
1111{
1112 if (state.current_time_usecs >= expiration_usecs_)
1113 return true; /* timeout expired */
1114 if (!first_interval_)
1115 {
1116 uint64 interval = interval_msecs_ * 1000ULL;
1117 if (state.current_time_usecs + interval < expiration_usecs_)
1118 expiration_usecs_ = state.current_time_usecs + interval; /* clock warped back in time */
1119 }
1120 *timeout_usecs_p = std::min (expiration_usecs_ - state.current_time_usecs, uint64 (2147483647)); // INT_MAX
1121 return 0 == *timeout_usecs_p;
1122}
1123
1124bool
1126{
1127 return state.current_time_usecs >= expiration_usecs_;
1128}
1129
1130bool
1132{
1133 bool repeat = false;
1134 first_interval_ = false;
1135 if (oneshot_ && void_slot_ != NULL)
1136 void_slot_ ();
1137 else if (!oneshot_ && bool_slot_ != NULL)
1138 repeat = bool_slot_ ();
1139 if (repeat)
1140 expiration_usecs_ = timestamp_realtime() + 1000ULL * interval_msecs_;
1141 return repeat;
1142}
1143
1144TimedSource::~TimedSource ()
1145{
1146 if (oneshot_)
1147 void_slot_.~VoidSlot();
1148 else
1149 bool_slot_.~BoolSlot();
1150}
1151
1152// == PollFDSource ==
1165PollFDSource::PollFDSource (const BPfdSlot &slot, int fd, const String &mode) :
1166 pfd_ ((PollFD) { fd, 0, 0 }),
1167 never_close_ (strchr (mode.c_str(), 'C') != NULL),
1168 oneshot_ (false), bool_poll_slot_ (slot)
1169{
1170 construct (mode);
1171}
1172
1173PollFDSource::PollFDSource (const VPfdSlot &slot, int fd, const String &mode) :
1174 pfd_ ((PollFD) { fd, 0, 0 }),
1175 never_close_ (strchr (mode.c_str(), 'C') != NULL),
1176 oneshot_ (true), void_poll_slot_ (slot)
1177{
1178 construct (mode);
1179}
1180
1181void
1182PollFDSource::construct (const String &mode)
1183{
1184 add_poll (&pfd_);
1185 pfd_.events |= strchr (mode.c_str(), 'w') ? PollFD::OUT : 0;
1186 pfd_.events |= strchr (mode.c_str(), 'r') ? PollFD::IN : 0;
1187 pfd_.events |= strchr (mode.c_str(), 'p') ? PollFD::PRI : 0;
1188 pfd_.events |= strchr (mode.c_str(), 'd') ? PollFD::WRBAND : 0;
1189 if (pfd_.fd >= 0)
1190 {
1191 const long lflags = fcntl (pfd_.fd, F_GETFL, 0);
1192 long nflags = lflags;
1193 if (strchr (mode.c_str(), 'b'))
1194 nflags &= ~long (O_NONBLOCK);
1195 else if (strchr (mode.c_str(), 'B'))
1196 nflags |= O_NONBLOCK;
1197 if (nflags != lflags)
1198 {
1199 int err;
1200 do
1201 err = fcntl (pfd_.fd, F_SETFL, nflags);
1202 while (err < 0 && (errno == EINTR || errno == EAGAIN));
1203 }
1204 }
1205}
1206
1207bool
1208PollFDSource::prepare (const LoopState &state, int64 *timeout_usecs_p)
1209{
1210 pfd_.revents = 0;
1211 return pfd_.fd < 0;
1212}
1213
1214bool
1216{
1217 return pfd_.fd < 0 || pfd_.revents != 0;
1218}
1219
1220bool
1222{
1223 bool keep_alive = !oneshot_;
1224 if (oneshot_ && void_poll_slot_ != NULL)
1225 void_poll_slot_ (pfd_);
1226 else if (!oneshot_ && bool_poll_slot_ != NULL)
1227 keep_alive = bool_poll_slot_ (pfd_);
1228 /* close down */
1229 if (!keep_alive)
1230 {
1231 if (!never_close_ && pfd_.fd >= 0)
1232 close (pfd_.fd);
1233 pfd_.fd = -1;
1234 }
1235 return keep_alive;
1236}
1237
1238void
1239PollFDSource::destroy()
1240{
1241 /* close down */
1242 if (!never_close_ && pfd_.fd >= 0)
1243 close (pfd_.fd);
1244 pfd_.fd = -1;
1245}
1246
1247PollFDSource::~PollFDSource ()
1248{
1249 if (oneshot_)
1250 void_poll_slot_.~VPfdSlot();
1251 else
1252 bool_poll_slot_.~BPfdSlot();
1253}
1254
1255} // Ase
1256
1257// == Loop Description ==
T begin(T... args)
virtual bool dispatch(const LoopState &state)
Dispatch source, returns if it should be kept alive.
Definition loop.cc:925
virtual bool prepare(const LoopState &state, int64 *timeout_usecs_p)
Prepare the source for dispatching (true return) or polling (false).
Definition loop.cc:913
virtual bool check(const LoopState &state)
Check the source and its PollFD descriptors for dispatching (true return).
Definition loop.cc:919
Wakeup facility for IPC.
Definition utils.hh:52
bool opened()
Indicates whether eventfd has been opened.
Definition utils.cc:153
void flush()
Clear pending wakeups.
Definition utils.cc:187
int inputfd()
Returns the file descriptor for POLLIN.
Definition utils.cc:147
void wakeup()
Wakeup polling end.
Definition utils.cc:170
Loop implementation with internal state.
Definition loop.cc:75
void quit(int quit_code) override
Stop the event loop.
Definition loop.cc:420
bool has_quit() override
Check if quit() has been called.
Definition loop.cc:406
void wakeup() override
Wake up the event loop.
Definition loop.cc:372
void cancel(LoopID id) override
Cancel an event source.
Definition loop.cc:222
int run() override
Run the event loop.
Definition loop.cc:385
bool pending() override
Check if iterate() needs to be called for dispatching.
Definition loop.cc:486
LoopID exec_sigchld(int64_t pid, const SigchldSlot &vfunc, LoopPriority priority) override
Execute a callback once on SIGCHLD for pid.
Definition loop.cc:280
bool set_g_main_context(GlibGMainContext *glib_main_context) override
Set context to integrate with a GLib GMainContext loop.
Definition loop.cc:497
void iterate_pending() override
Iterate pending sources.
Definition loop.cc:469
LoopID add_source(LoopSourceP loop_source, LoopPriority priority) override
Add an event source to the loop.
Definition loop.cc:185
bool exec_once(uint delay_ms, LoopID *once_id, const VoidSlot &vfunc, LoopPriority priority) override
Execute a callback once, after a delay of delay_ms milliseconds.
Definition loop.cc:247
bool has_primary() override
Indicates whether loop contains primary sources.
Definition loop.cc:169
bool finishable() override
Indicates whether this loop has no primary sources left to process.
Definition loop.cc:436
bool running() override
Indicates if quit() has been called already.
Definition loop.cc:400
void destroy_loop() override
Remove all sources from a loop and prevent any further execution.
Definition loop.cc:358
bool iterate(bool may_block) override
Iterate the main loop once.
Definition loop.cc:452
bool recursion() const
Indicates whether the source is currently in recursion.
Definition loop.cc:846
void add_poll(PollFD *const pfd)
Add a PollFD descriptor for poll(2) and check().
Definition loop.cc:852
bool primary() const
Indicate whether this source is primary.
Definition loop.cc:834
bool may_recurse() const
Indicates if this source may recurse.
Definition loop.cc:828
void loop_remove()
Remove this source from its event loop if any.
Definition loop.cc:889
void remove_poll(PollFD *const pfd)
Remove a previously added PollFD.
Definition loop.cc:866
Loop object, polling for events and executing callbacks in accordance.
Definition loop.hh:77
std::shared_ptr< Promise< uint64_t > > delay(std::chrono::milliseconds ms)
Create a promise that resolves after ms milliseconds and returns the elapsed delay.
Definition loop.cc:314
virtual void cancel(LoopID id)=0
Cancel a source and remove it from the loop.
static LoopP current()
Return the thread-local singleton loop, created on first call.
Definition loop.cc:306
virtual bool dispatch(const LoopState &state)
Dispatch source, returns if it should be kept alive.
Definition loop.cc:1221
virtual bool prepare(const LoopState &state, int64 *timeout_usecs_p)
Prepare the source for dispatching (true return) or polling (false).
Definition loop.cc:1208
virtual bool check(const LoopState &state)
Check the source and its PollFD descriptors for dispatching (true return).
Definition loop.cc:1215
virtual bool dispatch(const LoopState &state)
Dispatch source, returns if it should be kept alive.
Definition loop.cc:1060
virtual bool prepare(const LoopState &state, int64 *timeout_usecs_p)
Prepare the source for dispatching (true return) or polling (false).
Definition loop.cc:1048
virtual bool check(const LoopState &state)
Check the source and its PollFD descriptors for dispatching (true return).
Definition loop.cc:1054
virtual bool check(const LoopState &state)
Check the source and its PollFD descriptors for dispatching (true return).
Definition loop.cc:1125
virtual bool prepare(const LoopState &state, int64 *timeout_usecs_p)
Prepare the source for dispatching (true return) or polling (false).
Definition loop.cc:1110
virtual bool dispatch(const LoopState &state)
Dispatch source, returns if it should be kept alive.
Definition loop.cc:1131
virtual bool dispatch(const LoopState &state)
Dispatch source, returns if it should be kept alive.
Definition loop.cc:977
virtual bool prepare(const LoopState &state, int64 *timeout_usecs_p)
Prepare the source for dispatching (true return) or polling (false).
Definition loop.cc:965
static void raise(int8 signum)
Flag a unix signal being raised, this function may be called from any thread at any time.
Definition loop.cc:956
virtual bool check(const LoopState &state)
Check the source and its PollFD descriptors for dispatching (true return).
Definition loop.cc:971
close
T compare_exchange_strong(T... args)
#define ASE_ASSERT(expr)
Issue an assertion warning if expr evaluates to false.
Definition cxxaux.hh:91
#define ASE_DEFINE_MAKE_SHARED(CLASS)
Define a member function static shared_ptr<CLASS> make_shared(ctorargs...);.
Definition cxxaux.hh:274
#define ASE_UNLIKELY(expr)
Compiler hint to optimize for expr evaluating to false.
Definition cxxaux.hh:46
#define ASE_CLASS_NON_COPYABLE(ClassName)
Delete copy ctor and assignment operator.
Definition cxxaux.hh:111
printf
T empty(T... args)
T end(T... args)
T erase(T... args)
fcntl
T fetch_add(T... args)
free
#define assert_return(expr,...)
Return from the current function if expr is unmet and issue an assertion warning.
Definition internal.hh:29
#define MIN(a, b)
Yield minimum of a and b.
Definition internal.hh:57
#define return_unless(cond,...)
Return silently if cond does not evaluate to true with return value ...
Definition internal.hh:73
#define UNLIKELY(cond)
Hint to the compiler to optimize for cond == FALSE.
Definition internal.hh:65
#define ISLIKELY(cond)
Hint to the compiler to optimize for cond == TRUE.
Definition internal.hh:63
typedef int
T max(T... args)
memmove
T min(T... args)
The Anklang C++ API namespace.
Definition api.hh:9
uint64_t uint64
A 64-bit unsigned integer, use PRI*64 in format strings.
Definition cxxaux.hh:25
int16_t int16
A 16-bit signed integer.
Definition cxxaux.hh:27
int8_t int8
An 8-bit signed integer.
Definition cxxaux.hh:26
int64_t int64
A 64-bit signed integer, use PRI*64 in format strings.
Definition cxxaux.hh:29
LoopPriority
Definition loop.hh:35
@ SYSALLOC
Internal maintenance, don't use.
std::string String
Convenience alias for std::string.
Definition cxxaux.hh:35
uint32_t uint
Provide 'uint' as convenience type.
Definition cxxaux.hh:18
uint64 timestamp_realtime()
Return the current time as uint64 in µseconds.
Definition platform.cc:579
poll
T push_back(T... args)
realloc
T resize(T... args)
sigaction
sigemptyset
typedef int16_t
strchr
strerror
strncat
Thread-safe, lock-free stack based on MpmcStack and an Allocator with allows_read_after_free.
Definition atomics.hh:179
bool pop(Value &value)
Pop value from top of the stack, returns if value was reassigned (true), otherwise the stack was empty.
Definition atomics.hh:198
bool push(Value &&value)
Add value to top of the stack, returns if the stack was empty (true).
Definition atomics.hh:184
uint64 current_time_usecs
Equals timestamp_realtime() as of prepare() and check().
Definition loop.hh:139
Mirrors struct pollfd for poll(3posix)
Definition loop.hh:14
@ RDNORM
reading data will not block
Definition loop.hh:23
@ HUP
file descriptor closed
Definition loop.hh:29
@ IN
RDNORM || RDBAND.
Definition loop.hh:20
@ WRNORM
writing data will not block
Definition loop.hh:25
@ PRI
urgent data available
Definition loop.hh:21
@ NVAL
invalid PollFD
Definition loop.hh:30
@ RDBAND
reading priority data will not block
Definition loop.hh:24
@ OUT
writing data will not block
Definition loop.hh:22
@ ERR
error condition
Definition loop.hh:28
@ WRBAND
writing priority data will not block
Definition loop.hh:26
typedef pid_t