tracktion-engine 3.0-10-g034fdde4aa5
Tracktion Engine — High level data model for audio applications

« « « Anklang Documentation
Loading...
Searching...
No Matches
tracktion_MultiThreadedNodePlayer.cpp
Go to the documentation of this file.
1 /*
2 ,--. ,--. ,--. ,--.
3 ,-' '-.,--.--.,--,--.,---.| |,-.,-' '-.`--' ,---. ,--,--, Copyright 2024
4 '-. .-'| .--' ,-. | .--'| /'-. .-',--.| .-. || \ Tracktion Software
5 | | | | \ '-' \ `--.| \ \ | | | |' '-' '| || | Corporation
6 `---' `--' `--`--'`---'`--'`--' `---' `--' `---' `--''--' www.tracktion.com
7
8 Tracktion Engine uses a GPL/commercial licence - see LICENCE.md for details.
9*/
10
11#pragma once
12
13#include <thread>
14#if JUCE_INTEL
15 #include <emmintrin.h>
16#endif
17
18#define RETURN_MID_NODES_OPTIMISATION 1
19
20namespace tracktion { inline namespace graph
21{
22
23//==============================================================================
24//==============================================================================
26{
27public:
28 //==============================================================================
30 : player (p)
31 {
32 }
33
/** Launches numThreads worker threads, each of which runs runThread().

    Returns immediately if the pool already holds exactly numThreads threads.
    Every new thread is handed to setThreadPriority() with a priority value of 10.

    The visible code only appends to the pool and never removes existing
    threads, so callers presumably call clearThreads() first - TODO confirm.

    NOTE(review): listing line 39 is absent from this view, so whatever runs
    between the early return and the launch loop is not shown here.
*/
34 void createThreads (size_t numThreads)
35 {
36 if (threads.size() == numThreads)
37 return;
38
40
41 for (size_t i = 0; i < numThreads; ++i)
42 {
43 threads.emplace_back ([this] { runThread(); });
44 setThreadPriority (threads.back(), 10);
45 }
46 }
47
/** Joins every thread in the pool and then empties the thread container.

    NOTE(review): listing line 50 is absent from this view. runThread() only
    returns once shouldExit() is true, so the missing line is presumably where
    the exit is signalled before joining - TODO confirm.
*/
48 void clearThreads()
49 {
51
52 for (auto& t : threads)
53 t.join();
54
55 threads.clear();
56 }
57
/** Sets the triggered flag and wakes one thread blocked on the condition variable.

    NOTE(review): listing line 61 is absent from this view; the inner scope
    presumably holds a lock on the mutex while the flag is stored - TODO confirm.
*/
58 void signalOne()
59 {
60 {
62 triggered.store (true, std::memory_order_release);
63 }
64
// Notify after the inner scope has closed
65 condition.notify_one();
66 }
67
/** Sets the triggered flag and wakes every thread blocked on the condition variable.

    NOTE(review): listing line 71 is absent from this view; the inner scope
    presumably holds a lock on the mutex while the flag is stored - TODO confirm.
*/
68 void signalAll()
69 {
70 {
72 triggered.store (true, std::memory_order_release);
73 }
74
// Notify after the inner scope has closed
75 condition.notify_all();
76 }
77
/** Backs off the calling thread while there is nothing for it to process.

    Returns straight away if shouldExit() is true. Otherwise, while shouldWait()
    keeps returning true on consecutive calls, the back-off escalates using a
    per-thread (thread_local) counter:
      - calls 1 to 24:  pause()
      - calls 25 to 49: the body of this branch is not visible (listing line 95
                        is absent from this view)
      - call 50:        the counter is reset and the thread blocks on the
                        condition variable until shouldWaitOrIsNotTriggered()
                        returns false

    Whenever shouldWait() returns false the counter is reset to zero.
*/
78 void wait()
79 {
80 thread_local int pauseCount = 0;
81
82 if (shouldExit())
83 return;
84
85 if (shouldWait())
86 {
87 ++pauseCount;
88
89 if (pauseCount < 25)
90 {
91 pause();
92 }
93 else if (pauseCount < 50)
94 {
96 }
97 else
98 {
99 pauseCount = 0;
100
101 // Fall back to locking
102 std::unique_lock<std::mutex> lock (mutex);
103 condition.wait (lock, [this] { return ! shouldWaitOrIsNotTriggered(); });
104 }
105 }
106 else
107 {
108 pauseCount = 0;
109 }
110 }
111
/** Calls pause() once and returns; it never blocks on the mutex or condition variable. */
112 void waitForFinalNode()
113 {
114 pause();
115 }
116
119 {
120 threadsShouldExit = true;
121 signalAll();
122 }
123
126 {
127 threadsShouldExit = false;
128 }
129
/** Returns the current value of the threadsShouldExit flag, read with acquire ordering. */
131 bool shouldExit() const
132 {
133 return threadsShouldExit.load (std::memory_order_acquire);
134 }
135
140 {
141 if (shouldExit())
142 return false;
143
144 return player.numNodesQueued == 0;
145 }
146
/** Forwards to player.processNextFreeNode() and returns its result. */
151 bool process()
152 {
153 return player.processNextFreeNode();
154 }
155
156private:
157 //==============================================================================
159
160 std::atomic<bool> threadsShouldExit { false };
162 mutable std::mutex mutex;
163 mutable std::condition_variable condition;
164 mutable std::atomic<bool> triggered { false };
165
/** Helper used (negated) as the predicate of the condition-variable wait in wait().

    Returns false when the triggered flag is not set, otherwise returns shouldWait().

    NOTE(review): the name suggests the result is true when the pool has not
    been triggered, but the code returns false in that case, which makes the
    wait predicate true and lets the waiting thread continue - confirm this is
    the intended behaviour.
*/
166 bool shouldWaitOrIsNotTriggered()
167 {
168 if (! triggered.load (std::memory_order_acquire))
169 return false;
170
171 return shouldWait();
172 }
173
/** Body of each worker thread.

    Loops until shouldExit() returns true. On every iteration it calls process(),
    and calls wait() whenever process() returns false.
*/
174 void runThread()
175 {
176 for (;;)
177 {
178 if (shouldExit())
179 return;
180
181 if (! process())
182 wait();
183 }
184 }
185};
186
187
188//==============================================================================
189//==============================================================================
194
199
/** Changes the number of threads to use.

    Does nothing when newNumThreads equals the current numThreadsToUse.
    Otherwise it calls clearThreads(), stores the new count and then calls
    createThreads().
*/
200void MultiThreadedNodePlayer::setNumThreads (size_t newNumThreads)
201{
202 if (newNumThreads == numThreadsToUse)
203 return;
204
205 clearThreads();
206 numThreadsToUse = newNumThreads;
207 createThreads();
208}
209
211{
212 setNode (std::move (newNode), getSampleRate(), blockSize);
213}
214
/** Prepares newNode with the given sample rate and block size and passes the
    result to setNewGraph().

    If a preparedNode currently exists, its graph is passed to the prepareToPlay
    overload as the old graph; otherwise nullptr is passed.
*/
215void MultiThreadedNodePlayer::setNode (std::unique_ptr<Node> newNode, double sampleRateToUse, int blockSizeToUse)
216{
217 setNewGraph (prepareToPlay (std::move (newNode), preparedNode ? preparedNode->graph.get() : nullptr,
218 sampleRateToUse, blockSizeToUse));
219}
220
/** Re-prepares the current graph for a new sample rate and/or block size.

    Does nothing when both values match the stored sampleRate and blockSize, or
    when there is no preparedNode. Otherwise the root node is moved out of the
    current graph, prepared again and passed to setNewGraph().

    NOTE(review): the root node is moved out of preparedNode->graph here without
    preparedNodeMutex being taken in this function - confirm that nothing can be
    processing the current graph at this point.
*/
221void MultiThreadedNodePlayer::prepareToPlay (double sampleRateToUse, int blockSizeToUse)
222{
223 if (sampleRateToUse == sampleRate && blockSizeToUse == blockSize)
224 return;
225
226 if (! preparedNode)
227 return;
228
229 // Don't pass in the old graph here as we're stealing the root from it
230 setNewGraph (prepareToPlay (std::move (preparedNode->graph->rootNode), nullptr,
231 sampleRateToUse, blockSizeToUse));
232}
233
235{
237
238 if (! tryLock.owns_lock())
239 return -1;
240
241 if (getNode() == nullptr)
242 return -1;
243
244 const std::lock_guard<RealTimeSpinLock> lock (preparedNodeMutex);
245
246 // Reset the stream range
247 numSamplesToProcess = pc.numSamples;
248 referenceSampleRange = pc.referenceSampleRange;
249
250 // Prepare all the nodes to be played back
251 for (auto node : preparedNode->graph->orderedNodes)
252 node->prepareForNextBlock (referenceSampleRange);
253
254 if (numThreadsToUse.load (std::memory_order_acquire) == 0)
255 {
256 for (auto node : preparedNode->graph->orderedNodes)
257 node->process (numSamplesToProcess, referenceSampleRange);
258 }
259 else
260 {
261 // Reset the queue to be processed
262 jassert (preparedNode->playbackNodes.size() == preparedNode->graph->orderedNodes.size());
263 resetProcessQueue();
264
265 // Try to process Nodes until the root is ready
266 for (;;)
267 {
268 if (preparedNode->graph->rootNode->hasProcessed())
269 break;
270
271 if (! processNextFreeNode())
272 threadPool->waitForFinalNode();
273 }
274 }
275
276 // Add output from graph to buffers
277 {
278 auto output = preparedNode->graph->rootNode->getProcessedOutput();
279 auto numAudioChannels = std::min (output.audio.getNumChannels(), pc.buffers.audio.getNumChannels());
280
281 if (numAudioChannels > 0)
282 add (pc.buffers.audio.getFirstChannels (numAudioChannels),
283 output.audio.getFirstChannels (numAudioChannels));
284
285 pc.buffers.midi.mergeFrom (output.midi);
286 }
287
288 return -1;
289}
290
292{
293 std::lock_guard<RealTimeSpinLock> sl (clearNodesLock);
294
295 // N.B. The threads will be trying to read the preparedNodes so we need to actually stop these first
296 clearThreads();
297 setNode (nullptr);
298 createThreads();
299
300 assert (preparedNode == nullptr);
301 assert (getNode() == nullptr);
302}
303
304//==============================================================================
305//==============================================================================
307 double sampleRateToUse, int blockSizeToUse)
308{
309 createThreads();
310
311 sampleRate.store (sampleRateToUse, std::memory_order_release);
312 blockSize = blockSizeToUse;
313
314 return node_player_utils::prepareToPlay (std::move (node), oldGraph,
315 sampleRateToUse, blockSizeToUse);
316}
317
318//==============================================================================
/** Forwards to threadPool->clearThreads(). */
319void MultiThreadedNodePlayer::clearThreads()
320{
321 threadPool->clearThreads();
322}
323
/** Asks the thread pool to create numThreadsToUse threads (the count is read with a default atomic load). */
324void MultiThreadedNodePlayer::createThreads()
325{
326 threadPool->createThreads (numThreadsToUse.load());
327}
328
/** Issues two CPU spin-wait hint instructions.

    When JUCE_INTEL is set this is two _mm_pause() calls; on every other target
    it is two inline-assembly "yield" instructions.

    NOTE(review): the fallback branch assumes the target assembler accepts the
    "yield" mnemonic (presumably ARM) and that GNU-style inline assembly is
    available - confirm for any other platform.
*/
329inline void MultiThreadedNodePlayer::pause()
330{
331 #if JUCE_INTEL
332 _mm_pause();
333 _mm_pause();
334 #else
335 __asm__ __volatile__ ("yield");
336 __asm__ __volatile__ ("yield");
337 #endif
338}
339
340//==============================================================================
/** Installs newGraph as the graph to be played, or clears the current one.

    With a null newGraph both currentPreparedNode and preparedNode are reset and
    the function returns. Otherwise the graph's orderedNodes are stably sorted so
    that Nodes reporting isReadyToProcess() come first, a new PreparedNode is
    built around the graph (ready-queue sized to the node count, playback nodes
    built by buildNodesOutputLists()), currentPreparedNode is pointed at it and
    finally preparedNode is replaced while holding preparedNodeMutex.

    NOTE(review): the null-graph branch resets preparedNode without taking
    preparedNodeMutex, unlike the non-null path - confirm this is safe for the
    callers that pass nullptr.
*/
341void MultiThreadedNodePlayer::setNewGraph (std::unique_ptr<NodeGraph> newGraph)
342{
343 if (! newGraph)
344 {
345 currentPreparedNode = {};
346 preparedNode = {};
347 return;
348 }
349
350 // Ensure the Nodes ready to be processed are at the front of the queue
351 std::stable_sort (newGraph->orderedNodes.begin(), newGraph->orderedNodes.end(),
352 [] (auto n1, auto n2)
353 {
354 return n1->isReadyToProcess() && ! n2->isReadyToProcess();
355 });
356
357 auto newPreparedNode = std::make_unique<PreparedNode>();
358 newPreparedNode->graph = std::move (newGraph);
359 newPreparedNode->nodesReadyToBeProcessed.reset (newPreparedNode->graph->orderedNodes.size());
360 buildNodesOutputLists (newPreparedNode->graph->orderedNodes, newPreparedNode->playbackNodes);
361
// The raw pointer is published before the owning pointer is swapped below
362 currentPreparedNode = newPreparedNode.get();
363
364 // Then swap the storage under the lock so the old Node isn't being processed whilst it is deleted
365 const std::lock_guard<RealTimeSpinLock> lock (preparedNodeMutex);
366 preparedNode = std::move (newPreparedNode);
367}
368
369//==============================================================================
/** Rebuilds playbackNodes so that it holds one PlaybackNode per entry of allNodes
    and records, for every Node, which Nodes consume its output.

    First pass: playbackNodes is cleared, then a PlaybackNode is created for each
    Node, stored in the Node's `internal` pointer, and the Node's numOutputNodes
    is reset to 0.
    Second pass: for each Node, every direct input gets this Node appended to its
    PlaybackNode's outputs list and has its numOutputNodes incremented.

    Debug assertions check that no Node appears twice in allNodes and that every
    direct input is itself in allNodes.

    NOTE(review): listing line 371 (the rest of the parameter list) is absent
    from this view; the body uses a second parameter named playbackNodes whose
    exact declared type is not visible here.
*/
370void MultiThreadedNodePlayer::buildNodesOutputLists (std::vector<Node*>& allNodes,
372{
373 playbackNodes.clear();
374 playbackNodes.reserve (allNodes.size());
375
376 // Create a PlaybackNode for each Node
377 for (auto n : allNodes)
378 {
379 #if JUCE_DEBUG
380 for (auto& pn : playbackNodes)
381 jassert (&pn->node != n);
382 #endif
383
384 // Ensure no Nodes are present twice in the list
385 jassert (std::count (allNodes.begin(), allNodes.end(), n) == 1);
386
387 playbackNodes.push_back (std::make_unique<PlaybackNode> (*n));
388 n->internal = playbackNodes.back().get();
389 n->numOutputNodes = 0;
390 }
391
392 // Iterate all Nodes, for each input, add to the current Nodes output list
393 for (auto node : allNodes)
394 {
395 for (auto inputNode : node->getDirectInputNodes())
396 {
397 // Check the input is actually still in the graph
398 jassert (std::find (allNodes.begin(), allNodes.end(), inputNode) != allNodes.end());
399 static_cast<PlaybackNode*> (inputNode->internal)->outputs.push_back (node);
400 ++inputNode->numOutputNodes;
401 }
402 }
403}
404
/** Resets the per-block scheduling state and queues the Nodes that can run immediately.

    Steps, in order:
      1. Asserts the ready queue is empty, resets it and sets numNodesQueued to 0.
      2. For every PlaybackNode, clears hasBeenQueued and restores
         numInputsToBeProcessed to numInputs.
      3. Pushes every PlaybackNode whose numInputsToBeProcessed is 0 on to the
         ready queue, marking it as queued.
      4. Adds the number of Nodes just pushed to numNodesQueued and then calls
         threadPool->signalAll().

    The counter is only updated after all pushes (step 4) because processNextFreeNode()
    returns early while numNodesQueued is 0.

    NOTE(review): the jassert at the top of the first loop requires hasBeenQueued
    to already be true on entry; on the first block after a graph is built this
    relies on the flag's initial value, which is not visible here - confirm.
*/
405void MultiThreadedNodePlayer::resetProcessQueue()
406{
407 assert (preparedNode->nodesReadyToBeProcessed.getUsedSlots() == 0);
408 preparedNode->nodesReadyToBeProcessed.reset();
409 numNodesQueued.store (0, std::memory_order_release);
410
411 // Reset all the counters
412 // And then move any Nodes that are ready to the correct queue
413 for (auto& playbackNode : preparedNode->playbackNodes)
414 {
415 jassert (playbackNode->hasBeenQueued);
416 playbackNode->hasBeenQueued = false;
417 playbackNode->numInputsToBeProcessed.store (playbackNode->numInputs, std::memory_order_release);
418
419 // Check only ready nodes will be queued
420 #if JUCE_DEBUG
421 if (playbackNode->node.isReadyToProcess())
422 jassert (playbackNode->numInputsToBeProcessed == 0);
423
424 if (playbackNode->numInputsToBeProcessed == 0)
425 jassert (playbackNode->node.isReadyToProcess());
426 #endif
427 }
428
// Debug-only pass: clears the dequeued markers and re-checks the state reset above
429 #if JUCE_DEBUG
430 for (auto& playbackNode : preparedNode->playbackNodes)
431 {
432 playbackNode->hasBeenDequeued = false;
433 jassert (! playbackNode->hasBeenQueued);
434 jassert (playbackNode->numInputsToBeProcessed == playbackNode->numInputs);
435 jassert (playbackNode->numInputsToBeProcessed.load (std::memory_order_acquire) == playbackNode->numInputs);
436 }
437 #endif
438
439 size_t numNodesJustQueued = 0;
440
441 // Make sure the counters are reset for all nodes before queueing any
442 for (auto& playbackNode : preparedNode->playbackNodes)
443 {
444 if (playbackNode->numInputsToBeProcessed.load (std::memory_order_acquire) == 0)
445 {
446 jassert (! playbackNode->hasBeenQueued);
447 playbackNode->hasBeenQueued = true;
448 preparedNode->nodesReadyToBeProcessed.push (&playbackNode->node);
449 ++numNodesJustQueued;
450 }
451 }
452
453 // Make sure this is only incremented after all the nodes have been queued
454 // or the threads will start queueing Nodes at the same time
455 numNodesQueued += numNodesJustQueued;
456 threadPool->signalAll();
457}
458
/** Called after `node` has been processed: decrements the pending-input counter of
    each of its outputs and schedules any output whose counter reaches zero.

    With RETURN_MID_NODES_OPTIMISATION enabled (it is defined as 1 at the top of
    this file) the first output that becomes ready is kept to be returned, and
    every further ready output is pushed on to the ready queue with numNodesQueued
    incremented. With it disabled, a ready output is returned immediately if it
    is the only output or the last one in the list, and is queued otherwise.

    @returns a Node the caller should process next on the same thread, or nullptr
             if no Node was kept back for the caller.
*/
459Node* MultiThreadedNodePlayer::updateProcessQueueForNode (Node& node)
460{
461 auto playbackNode = static_cast<PlaybackNode*> (node.internal);
462
463 #if RETURN_MID_NODES_OPTIMISATION
464 Node* nodeToReturn = nullptr;
465 #endif
466
467 for (auto output : playbackNode->outputs)
468 {
469 auto outputPlaybackNode = static_cast<PlaybackNode*> (output->internal);
470
471 // fetch_sub returns the previous value so it will now be 0
472 if (outputPlaybackNode->numInputsToBeProcessed.fetch_sub (1, std::memory_order_acq_rel) == 1)
473 {
474 jassert (outputPlaybackNode->node.isReadyToProcess());
475 jassert (! outputPlaybackNode->hasBeenQueued);
476 outputPlaybackNode->hasBeenQueued = true;
477
478 #if RETURN_MID_NODES_OPTIMISATION
479 // We can return one Node to be processed on this thread, otherwise we can
480 // queue it for another thread to possibly process
481 if (nodeToReturn == nullptr)
482 {
483 nodeToReturn = &outputPlaybackNode->node;
484 }
485 else
486 {
487 preparedNode->nodesReadyToBeProcessed.push (&outputPlaybackNode->node);
488 numNodesQueued.fetch_add (1, std::memory_order_acq_rel);
489 }
490 #else
491 // If there is only one Node or we're at the last Node we can return this to be processed by the same thread
492 if (playbackNode->outputs.size() == 1
493 || output == playbackNode->outputs.back())
494 return &outputPlaybackNode->node;
495
496 preparedNode->nodesReadyToBeProcessed.push (&outputPlaybackNode->node);
497 numNodesQueued.fetch_add (1, std::memory_order_acq_rel);
498 #endif
499 }
500 }
501
502 #if RETURN_MID_NODES_OPTIMISATION
503 return nodeToReturn;
504 #else
505 return nullptr;
506 #endif
507}
508
509//==============================================================================
/** Tries to take one Node from the ready queue and process it on the calling thread.

    @returns false if numNodesQueued is 0 or nothing could be popped from the
             queue; true after a Node has been popped, numNodesQueued has been
             decremented and the Node has been handed to processNode().
*/
510bool MultiThreadedNodePlayer::processNextFreeNode()
511{
512 Node* nodeToProcess = nullptr;
513
// Check the counter first so the queue is not touched while nothing is recorded as queued
514 if (numNodesQueued.load (std::memory_order_acquire) == 0)
515 return false;
516
517 if (! preparedNode->nodesReadyToBeProcessed.pop (nodeToProcess))
518 return false;
519
520 numNodesQueued.fetch_sub (1, std::memory_order_acq_rel);
521
522 assert (nodeToProcess != nullptr);
523 processNode (*nodeToProcess);
524
525 return true;
526}
527
/** Processes `node` and then keeps processing whichever Node
    updateProcessQueueForNode() returns, until it returns nullptr.

    Each Node is processed with the current numSamplesToProcess and
    referenceSampleRange. In debug builds every Node is asserted not to have
    been dequeued already and is then marked as dequeued.
*/
528void MultiThreadedNodePlayer::processNode (Node& node)
529{
530 auto* nodeToProcess = &node;
531
532 // Attempt to process serial Node chains on this thread
533 // to reduce context switches and overhead
534 for (;;)
535 {
536 #if JUCE_DEBUG
537 jassert (! static_cast<PlaybackNode*> (nodeToProcess->internal)->hasBeenDequeued);
538 static_cast<PlaybackNode*> (nodeToProcess->internal)->hasBeenDequeued = true;
539 #endif
540
541 // Process Node
542 nodeToProcess->process (numSamplesToProcess, referenceSampleRange);
543 nodeToProcess = updateProcessQueueForNode (*nodeToProcess);
544
545 if (! nodeToProcess)
546 break;
547 }
548}
549
550}}
assert
T back(T... args)
T begin(T... args)
bool shouldExit() const
Returns true if all the threads should exit.
bool shouldWait()
Returns true if there are no free Nodes to be processed and the calling thread should wait until ther...
void signalShouldExit()
Signals the pool that all the threads should exit.
void resetExitSignal()
Signals the pool that all the threads should continue to run and not exit.
double getSampleRate() const
Returns the current sample rate.
int process(const Node::ProcessContext &)
Process a block of the Node.
void setNode(std::unique_ptr< Node >)
Sets the Node to process.
MultiThreadedNodePlayer()
Creates an empty MultiThreadedNodePlayer.
void prepareToPlay(double sampleRateToUse, int blockSizeToUse)
Prepares the current Node to be played.
void setNumThreads(size_t)
Sets the number of threads to use for rendering.
Struct to describe a single iteration of a process call.
T clear(T... args)
T count(T... args)
T emplace_back(T... args)
T end(T... args)
T fetch_add(T... args)
T fetch_sub(T... args)
T find(T... args)
T get(T... args)
T is_pointer_v
#define jassert(expression)
T load(T... args)
T lock(T... args)
T min(T... args)
T owns_lock(T... args)
pause
T reset(T... args)
T size(T... args)
T stable_sort(T... args)
T store(T... args)
Holds a graph in an order ready for processing and a sorted map for quick lookups.
T yield(T... args)