tracktion-engine 3.0-10-g034fdde4aa5
Tracktion Engine — High level data model for audio applications

« « « Anklang Documentation
Loading...
Searching...
No Matches
tracktion_NodePlayerThreadPools.cpp
Go to the documentation of this file.
1 /*
2 ,--. ,--. ,--. ,--.
3 ,-' '-.,--.--.,--,--.,---.| |,-.,-' '-.`--' ,---. ,--,--, Copyright 2024
4 '-. .-'| .--' ,-. | .--'| /'-. .-',--.| .-. || \ Tracktion Software
5 | | | | \ '-' \ `--.| \ \ | | | |' '-' '| || | Corporation
6 `---' `--' `--`--'`---'`--'`--' `---' `--' `---' `--''--' www.tracktion.com
7
8 Tracktion Engine uses a GPL/commercial licence - see LICENCE.md for details.
9*/
10
11#include <thread>
12
13#ifdef _MSC_VER
14 #pragma warning (push)
15 #pragma warning (disable: 4127)
16#endif
17
18namespace tracktion { inline namespace graph
19{
20
21//==============================================================================
22//==============================================================================
23namespace
24{
25 constexpr int timeOutMilliseconds = -1;
26
27 inline void pause()
28 {
29 #if JUCE_INTEL
30 _mm_pause();
31 _mm_pause();
32 #else
33 __asm__ __volatile__ ("yield");
34 __asm__ __volatile__ ("yield");
35 #endif
36 }
37}
38
39
40//==============================================================================
41//==============================================================================
43{
45 : ThreadPool (p)
46 {
47 }
48
49 void createThreads (size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
50 {
51 if (threads.size() == numThreads)
52 return;
53
55 workgroup = workgroupToUse;
56
57 const auto rtOpts = juce::Thread::RealtimeOptions()
58 .withPriority (10)
60
61 for (size_t i = 0; i < numThreads; ++i)
62 {
63 threads.emplace_back ([this] { runThread(); });
64 setThreadPriority (threads.back(), 10);
66 }
67 }
68
69 void clearThreads() override
70 {
72
73 for (auto& t : threads)
74 t.join();
75
76 threads.clear();
77 }
78
79 void signalOne() override
80 {
81 {
83 triggered.store (true, std::memory_order_release);
84 }
85
86 condition.notify_one();
87 }
88
89 void signal (int numToSignal) override
90 {
91 {
93 triggered.store (true, std::memory_order_release);
94 }
95
96 for (int i = std::min ((int) threads.size(), numToSignal); --i >= 0;)
97 condition.notify_one();
98 }
99
100 void signalAll() override
101 {
102 {
103 std::unique_lock<std::mutex> lock (mutex);
104 triggered.store (true, std::memory_order_release);
105 }
106
107 condition.notify_all();
108 }
109
110 void wait()
111 {
112 if (! shouldWait())
113 return;
114
115 std::unique_lock<std::mutex> lock (mutex);
116
117 if (timeOutMilliseconds < 0)
118 {
119 condition.wait (lock, [this] { return ! shouldWait(); });
120 }
121 else
122 {
123 if (! condition.wait_for (lock, std::chrono::milliseconds (timeOutMilliseconds),
124 [this] { return ! shouldWait(); }))
125 {
126 return;
127 }
128 }
129 }
130
131 void waitForFinalNode() override
132 {
133 if (isFinalNodeReady())
134 return;
135
136 if (! shouldWait())
137 return;
138
139 pause();
140 return;
141 }
142
143private:
145 mutable std::mutex mutex;
146 mutable std::condition_variable condition;
147 mutable std::atomic<bool> triggered { false };
148 juce::AudioWorkgroup workgroup;
149
150 bool shouldWaitOrIsNotTriggered()
151 {
152 if (! triggered.load (std::memory_order_acquire))
153 return false;
154
155 return shouldWait();
156 }
157
158 void runThread()
159 {
161 workgroup.join (token);
162
163 for (;;)
164 {
165 if (shouldExit())
166 return;
167
168 if (! process())
169 wait();
170 }
171 }
172};
173
174
175//==============================================================================
176//==============================================================================
178{
180 : ThreadPool (p)
181 {
182 }
183
184 void createThreads (size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
185 {
186 if (threads.size() == numThreads)
187 return;
188
190 workgroup = workgroupToUse;
191
192 const auto rtOpts = juce::Thread::RealtimeOptions()
193 .withPriority (10)
195
196 for (size_t i = 0; i < numThreads; ++i)
197 {
198 threads.emplace_back ([this] { runThread(); });
199 setThreadPriority (threads.back(), 10);
201 }
202 }
203
204 void clearThreads() override
205 {
207
208 for (auto& t : threads)
209 t.join();
210
211 threads.clear();
212 }
213
214 void signalOne() override
215 {
216 }
217
218 void signal (int) override
219 {
220 }
221
222 void signalAll() override
223 {
224 }
225
226 void wait()
227 {
228 // The pause and sleep counts avoid starving the CPU if there aren't enough queued nodes
229 // This only happens on the worker threads so the main audio thread never interacts with the thread scheduler
230 thread_local int pauseCount = 0;
231
232 if (shouldWait())
233 {
234 ++pauseCount;
235
236 if (pauseCount < 50)
237 pause();
238 else if (pauseCount < 100)
240 else if (pauseCount < 150)
242 else if (pauseCount < 200)
244 else
245 pauseCount = 0;
246 }
247 else
248 {
249 pauseCount = 0;
250 }
251 }
252
253 void waitForFinalNode() override
254 {
255 pause();
256 }
257
258private:
260 juce::AudioWorkgroup workgroup;
261
262 void runThread()
263 {
265 workgroup.join (token);
266
267 for (;;)
268 {
269 if (shouldExit())
270 return;
271
272 if (! process())
273 wait();
274 }
275 }
276
277 inline void pause()
278 {
279 #if JUCE_INTEL
280 _mm_pause();
281 _mm_pause();
282 #else
283 __asm__ __volatile__ ("yield");
284 __asm__ __volatile__ ("yield");
285 #endif
286 }
287};
288
289
290//==============================================================================
291//==============================================================================
293{
295 : ThreadPool (p)
296 {
297 }
298
299 void createThreads (size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
300 {
301 if (threads.size() == numThreads)
302 return;
303
305 workgroup = workgroupToUse;
306
307 const auto rtOpts = juce::Thread::RealtimeOptions()
308 .withPriority (10)
310
311 for (size_t i = 0; i < numThreads; ++i)
312 {
313 threads.emplace_back ([this] { runThread(); });
314 setThreadPriority (threads.back(), 10);
316 }
317 }
318
319 void clearThreads() override
320 {
322
323 for (auto& t : threads)
324 t.join();
325
326 threads.clear();
327 }
328
329 void signalOne() override
330 {
331 {
332 std::unique_lock<std::mutex> lock (mutex);
333 triggered.store (true, std::memory_order_release);
334 }
335
336 condition.notify_one();
337 }
338
339 void signal (int numToSignal) override
340 {
341 {
342 std::unique_lock<std::mutex> lock (mutex);
343 triggered.store (true, std::memory_order_release);
344 }
345
346 for (int i = std::min ((int) threads.size(), numToSignal); --i >= 0;)
347 condition.notify_one();
348 }
349
350 void signalAll() override
351 {
352 {
353 std::unique_lock<std::mutex> lock (mutex);
354 triggered.store (true, std::memory_order_release);
355 }
356
357 condition.notify_all();
358 }
359
360 void wait()
361 {
362 thread_local int pauseCount = 0;
363
364 if (shouldExit())
365 return;
366
367 if (shouldWait())
368 {
369 ++pauseCount;
370
371 if (pauseCount < 25)
372 {
373 pause();
374 }
375 else if (pauseCount < 50)
376 {
378 }
379 else
380 {
381 pauseCount = 0;
382
383 // Fall back to locking
384 std::unique_lock<std::mutex> lock (mutex);
385
386 if (timeOutMilliseconds < 0)
387 {
388 condition.wait (lock, [this] { return ! shouldWaitOrIsNotTriggered(); });
389 }
390 else
391 {
392 if (! condition.wait_for (lock, std::chrono::milliseconds (timeOutMilliseconds),
393 [this] { return ! shouldWaitOrIsNotTriggered(); }))
394 {
395 return;
396 }
397 }
398 }
399 }
400 else
401 {
402 pauseCount = 0;
403 }
404 }
405
406 void waitForFinalNode() override
407 {
408 pause();
409 }
410
411private:
413 mutable std::mutex mutex;
414 mutable std::condition_variable condition;
415 mutable std::atomic<bool> triggered { false };
416 juce::AudioWorkgroup workgroup;
417
418
419 bool shouldWaitOrIsNotTriggered()
420 {
421 if (! triggered.load (std::memory_order_acquire))
422 return false;
423
424 return shouldWait();
425 }
426
427 void runThread()
428 {
430 workgroup.join (token);
431
432 for (;;)
433 {
434 if (shouldExit())
435 return;
436
437 if (! process())
438 wait();
439 }
440 }
441};
442
443
444//==============================================================================
445//==============================================================================
446template<typename SemaphoreType>
448{
450 : ThreadPool (p)
451 {
452 }
453
454 void createThreads (size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
455 {
456 if (threads.size() == numThreads)
457 return;
458
460 semaphore = std::make_unique<SemaphoreType> ((int) numThreads);
461 workgroup = workgroupToUse;
462
463 const auto rtOpts = juce::Thread::RealtimeOptions()
464 .withPriority (10)
466
467 for (size_t i = 0; i < numThreads; ++i)
468 {
469 threads.emplace_back ([this] { runThread(); });
470 setThreadPriority (threads.back(), 10);
472 }
473 }
474
475 void clearThreads() override
476 {
478
479 for (auto& t : threads)
480 t.join();
481
482 threads.clear();
483 semaphore.reset();
484 }
485
486 void signalOne() override
487 {
488 if (semaphore) semaphore->signal();
489 }
490
491 void signal (int numToSignal) override
492 {
493 if (semaphore) semaphore->signal (std::min (numToSignal, (int) threads.size()));
494 }
495
496 void signalAll() override
497 {
498 if (semaphore) semaphore->signal ((int) threads.size());
499 }
500
501 void wait()
502 {
503 if (! shouldWait())
504 return;
505
506 if (timeOutMilliseconds < 0)
507 {
508 semaphore->wait();
509 }
510 else
511 {
512 using namespace std::chrono;
513 semaphore->timed_wait ((std::uint64_t) duration_cast<microseconds> (milliseconds (timeOutMilliseconds)).count());
514 }
515 }
516
517 void waitForFinalNode() override
518 {
519 if (isFinalNodeReady())
520 return;
521
522 if (! shouldWait())
523 return;
524
525 pause();
526 return;
527 }
528
529private:
532 juce::AudioWorkgroup workgroup;
533
534 void runThread()
535 {
537 workgroup.join (token);
538
539 for (;;)
540 {
541 if (shouldExit())
542 return;
543
544 if (! process())
545 wait();
546 }
547 }
548};
549
550
551//==============================================================================
552//==============================================================================
553template<typename SemaphoreType>
555{
557 : ThreadPool (p)
558 {
559 }
560
561 void createThreads (size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
562 {
563 if (threads.size() == numThreads)
564 return;
565
567 semaphore = std::make_unique<SemaphoreType> ((int) numThreads);
568 workgroup = workgroupToUse;
569
570 const auto rtOpts = juce::Thread::RealtimeOptions()
571 .withPriority (10)
573
574 for (size_t i = 0; i < numThreads; ++i)
575 {
576 threads.emplace_back ([this] { runThread(); });
577 setThreadPriority (threads.back(), 10);
579 }
580 }
581
582 void clearThreads() override
583 {
585
586 for (auto& t : threads)
587 t.join();
588
589 threads.clear();
590 semaphore.reset();
591 }
592
593 void signalOne() override
594 {
595 if (semaphore) semaphore->signal();
596 }
597
598 void signal (int numToSignal) override
599 {
600 if (semaphore) semaphore->signal (std::min (numToSignal, (int) threads.size()));
601 }
602
603 void signalAll() override
604 {
605 if (semaphore) semaphore->signal ((int) threads.size());
606 }
607
608 void wait()
609 {
610 thread_local int pauseCount = 0;
611
612 if (shouldExit())
613 return;
614
615 if (shouldWait())
616 {
617 ++pauseCount;
618
619 if (pauseCount < 25)
620 {
621 pause();
622 }
623 else if (pauseCount < 50)
624 {
626 }
627 else
628 {
629 pauseCount = 0;
630
631 // Fall back to locking
632 if (timeOutMilliseconds < 0)
633 {
634 semaphore->wait();
635 }
636 else
637 {
638 using namespace std::chrono;
639 semaphore->timed_wait ((std::uint64_t) duration_cast<microseconds> (milliseconds (timeOutMilliseconds)).count());
640 }
641 }
642 }
643 else
644 {
645 pauseCount = 0;
646 }
647 }
648
649 void waitForFinalNode() override
650 {
651 if (isFinalNodeReady())
652 return;
653
654 if (! shouldWait())
655 return;
656
657 pause();
658 return;
659 }
660
661private:
664 juce::AudioWorkgroup workgroup;
665
666 void runThread()
667 {
669 workgroup.join (token);
670
672
673 for (;;)
674 {
675 if (shouldExit())
676 return;
677
678 if (! process())
679 wait();
680 }
681 }
682};
683//==============================================================================
684//==============================================================================
686{
687 switch (poolType)
688 {
689 case ThreadPoolStrategy::conditionVariable:
691 case ThreadPoolStrategy::hybrid:
693 case ThreadPoolStrategy::semaphore:
695 case ThreadPoolStrategy::lightweightSemaphore:
697 case ThreadPoolStrategy::lightweightSemHybrid:
699 case ThreadPoolStrategy::realTime:
700 default:
702 }
703}
704
705
706#ifdef _MSC_VER
707 #pragma warning (pop)
708#endif
709
710}}
T back(T... args)
void join(WorkgroupToken &token) const
static void JUCE_CALLTYPE disableDenormalisedNumberSupport(bool shouldDisable=true) noexcept
T clear(T... args)
T count(T... args)
T emplace_back(T... args)
T is_pointer_v
T load(T... args)
T min(T... args)
LockFreeMultiThreadedNodePlayer::ThreadPoolCreator getPoolCreatorFunction(ThreadPoolStrategy poolType)
Returns a function to create a ThreadPool for the given stategy.
ThreadPoolStrategy
Available strategies for thread pools.
@ semaphore
Uses a semaphore to suspend threads.
bool tryToUpgradeCurrentThreadToRealtime(const juce::Thread::RealtimeOptions &)
Tries to upgrade the current thread to realtime priority.
pause
T reset(T... args)
T size(T... args)
T sleep_for(T... args)
T store(T... args)
RealtimeOptions withApproximateAudioProcessingTime(int samplesPerFrame, double sampleRate) const
RealtimeOptions withPriority(int newPriority) const
Base class for thread pools which can be customised to determine how cooperative threads should behav...
bool shouldWait()
Returns true if there are no free Nodes to be processed and the calling thread should wait until ther...
void signalShouldExit()
Signals the pool that all the threads should exit.
bool isFinalNodeReady()
Returns true if all the Nodes have been processed.
void resetExitSignal()
Signals the pool that all the threads should continue to run and not exit.
ThreadPool(LockFreeMultiThreadedNodePlayer &p)
Constructs a ThreadPool for a given LockFreeMultiThreadedNodePlayer.
void clearThreads() override
Subclasses should implement this to clear all the threads.
void signal(int numToSignal) override
Called by the player when more than one Node becomes available to process.
void createThreads(size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
Subclasses should implement this to create the given number of threads.
void signalOne() override
Called by the player when a Node becomes available to process.
void waitForFinalNode() override
Called by the player when the audio thread has no free Nodes to process.
void signalAll() override
Called by the player when more than one Node becomes available to process.
void signalAll() override
Called by the player when more than one Node becomes available to process.
void signalOne() override
Called by the player when a Node becomes available to process.
void waitForFinalNode() override
Called by the player when the audio thread has no free Nodes to process.
void clearThreads() override
Subclasses should implement this to clear all the threads.
void signal(int numToSignal) override
Called by the player when more than one Node becomes available to process.
void createThreads(size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
Subclasses should implement this to create the given number of threads.
void clearThreads() override
Subclasses should implement this to clear all the threads.
void signal(int) override
Called by the player when more than one Node becomes available to process.
void signalAll() override
Called by the player when more than one Node becomes available to process.
void createThreads(size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
Subclasses should implement this to create the given number of threads.
void waitForFinalNode() override
Called by the player when the audio thread has no free Nodes to process.
void signalOne() override
Called by the player when a Node becomes available to process.
void signalAll() override
Called by the player when more than one Node becomes available to process.
void createThreads(size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
Subclasses should implement this to create the given number of threads.
void waitForFinalNode() override
Called by the player when the audio thread has no free Nodes to process.
void signal(int numToSignal) override
Called by the player when more than one Node becomes available to process.
void clearThreads() override
Subclasses should implement this to clear all the threads.
void signalOne() override
Called by the player when a Node becomes available to process.
void clearThreads() override
Subclasses should implement this to clear all the threads.
void waitForFinalNode() override
Called by the player when the audio thread has no free Nodes to process.
void createThreads(size_t numThreads, juce::AudioWorkgroup workgroupToUse) override
Subclasses should implement this to create the given number of threads.
void signal(int numToSignal) override
Called by the player when more than one Node becomes available to process.
void signalOne() override
Called by the player when a Node becomes available to process.
void signalAll() override
Called by the player when more than one Node becomes available to process.
T yield(T... args)