15 #include <emmintrin.h>
18#define RETURN_MID_NODES_OPTIMISATION 1
20namespace tracktion {
inline namespace graph
34 void createThreads (
size_t numThreads)
36 if (threads.
size() == numThreads)
41 for (
size_t i = 0; i < numThreads; ++i)
44 setThreadPriority (threads.
back(), 10);
52 for (
auto& t : threads)
80 thread_local int pauseCount = 0;
93 else if (pauseCount < 50)
103 condition.
wait (lock, [
this] {
return ! shouldWaitOrIsNotTriggered(); });
112 void waitForFinalNode()
120 threadsShouldExit =
true;
127 threadsShouldExit =
false;
144 return player.numNodesQueued == 0;
153 return player.processNextFreeNode();
166 bool shouldWaitOrIsNotTriggered()
202 if (newNumThreads == numThreadsToUse)
206 numThreadsToUse = newNumThreads;
217 setNewGraph (
prepareToPlay (std::move (newNode), preparedNode ? preparedNode->graph.
get() :
nullptr,
218 sampleRateToUse, blockSizeToUse));
223 if (sampleRateToUse == sampleRate && blockSizeToUse == blockSize)
230 setNewGraph (
prepareToPlay (std::move (preparedNode->graph->rootNode),
nullptr,
231 sampleRateToUse, blockSizeToUse));
247 numSamplesToProcess = pc.numSamples;
248 referenceSampleRange = pc.referenceSampleRange;
251 for (
auto node : preparedNode->graph->orderedNodes)
252 node->prepareForNextBlock (referenceSampleRange);
256 for (
auto node : preparedNode->graph->orderedNodes)
257 node->process (numSamplesToProcess, referenceSampleRange);
262 jassert (preparedNode->playbackNodes.size() == preparedNode->graph->orderedNodes.size());
268 if (preparedNode->graph->rootNode->hasProcessed())
271 if (! processNextFreeNode())
272 threadPool->waitForFinalNode();
278 auto output = preparedNode->graph->rootNode->getProcessedOutput();
279 auto numAudioChannels =
std::min (output.audio.getNumChannels(), pc.buffers.audio.getNumChannels());
281 if (numAudioChannels > 0)
282 add (pc.buffers.audio.getFirstChannels (numAudioChannels),
283 output.audio.getFirstChannels (numAudioChannels));
285 pc.buffers.midi.mergeFrom (output.midi);
300 assert (preparedNode ==
nullptr);
307 double sampleRateToUse,
int blockSizeToUse)
312 blockSize = blockSizeToUse;
314 return node_player_utils::prepareToPlay (std::move (node), oldGraph,
315 sampleRateToUse, blockSizeToUse);
319void MultiThreadedNodePlayer::clearThreads()
321 threadPool->clearThreads();
324void MultiThreadedNodePlayer::createThreads()
326 threadPool->createThreads (numThreadsToUse.
load());
329inline void MultiThreadedNodePlayer::pause()
335 __asm__ __volatile__ (
"yield");
336 __asm__ __volatile__ (
"yield");
345 currentPreparedNode = {};
351 std::stable_sort (newGraph->orderedNodes.begin(), newGraph->orderedNodes.end(),
352 [] (
auto n1,
auto n2)
354 return n1->isReadyToProcess() && ! n2->isReadyToProcess();
358 newPreparedNode->graph = std::move (newGraph);
359 newPreparedNode->nodesReadyToBeProcessed.reset (newPreparedNode->graph->orderedNodes.size());
360 buildNodesOutputLists (newPreparedNode->graph->orderedNodes, newPreparedNode->playbackNodes);
362 currentPreparedNode = newPreparedNode.get();
366 preparedNode = std::move (newPreparedNode);
373 playbackNodes.clear();
374 playbackNodes.reserve (allNodes.
size());
377 for (
auto n : allNodes)
380 for (
auto& pn : playbackNodes)
388 n->internal = playbackNodes.back().get();
389 n->numOutputNodes = 0;
393 for (
auto node : allNodes)
395 for (
auto inputNode : node->getDirectInputNodes())
399 static_cast<PlaybackNode*
> (inputNode->internal)->outputs.push_back (node);
400 ++inputNode->numOutputNodes;
405void MultiThreadedNodePlayer::resetProcessQueue()
407 assert (preparedNode->nodesReadyToBeProcessed.getUsedSlots() == 0);
408 preparedNode->nodesReadyToBeProcessed.
reset();
413 for (
auto& playbackNode : preparedNode->playbackNodes)
415 jassert (playbackNode->hasBeenQueued);
416 playbackNode->hasBeenQueued =
false;
421 if (playbackNode->node.isReadyToProcess())
422 jassert (playbackNode->numInputsToBeProcessed == 0);
424 if (playbackNode->numInputsToBeProcessed == 0)
425 jassert (playbackNode->node.isReadyToProcess());
430 for (
auto& playbackNode : preparedNode->playbackNodes)
432 playbackNode->hasBeenDequeued =
false;
433 jassert (! playbackNode->hasBeenQueued);
434 jassert (playbackNode->numInputsToBeProcessed == playbackNode->numInputs);
439 size_t numNodesJustQueued = 0;
442 for (
auto& playbackNode : preparedNode->playbackNodes)
446 jassert (! playbackNode->hasBeenQueued);
447 playbackNode->hasBeenQueued =
true;
448 preparedNode->nodesReadyToBeProcessed.push (&playbackNode->node);
449 ++numNodesJustQueued;
455 numNodesQueued += numNodesJustQueued;
456 threadPool->signalAll();
459Node* MultiThreadedNodePlayer::updateProcessQueueForNode (Node& node)
461 auto playbackNode =
static_cast<PlaybackNode*
> (node.internal);
463 #if RETURN_MID_NODES_OPTIMISATION
464 Node* nodeToReturn =
nullptr;
467 for (
auto output : playbackNode->outputs)
469 auto outputPlaybackNode =
static_cast<PlaybackNode*
> (output->internal);
474 jassert (outputPlaybackNode->node.isReadyToProcess());
475 jassert (! outputPlaybackNode->hasBeenQueued);
476 outputPlaybackNode->hasBeenQueued =
true;
478 #if RETURN_MID_NODES_OPTIMISATION
481 if (nodeToReturn ==
nullptr)
483 nodeToReturn = &outputPlaybackNode->node;
487 preparedNode->nodesReadyToBeProcessed.push (&outputPlaybackNode->node);
492 if (playbackNode->outputs.size() == 1
493 || output == playbackNode->outputs.back())
494 return &outputPlaybackNode->node;
496 preparedNode->nodesReadyToBeProcessed.push (&outputPlaybackNode->node);
502 #if RETURN_MID_NODES_OPTIMISATION
510bool MultiThreadedNodePlayer::processNextFreeNode()
512 Node* nodeToProcess =
nullptr;
517 if (! preparedNode->nodesReadyToBeProcessed.pop (nodeToProcess))
522 assert (nodeToProcess !=
nullptr);
523 processNode (*nodeToProcess);
528void MultiThreadedNodePlayer::processNode (Node& node)
530 auto* nodeToProcess = &node;
537 jassert (!
static_cast<PlaybackNode*
> (nodeToProcess->internal)->hasBeenDequeued);
538 static_cast<PlaybackNode*
> (nodeToProcess->internal)->hasBeenDequeued =
true;
542 nodeToProcess->process (numSamplesToProcess, referenceSampleRange);
543 nodeToProcess = updateProcessQueueForNode (*nodeToProcess);
bool process()
Process the next chain of Nodes.
bool shouldExit() const
Returns true if all the threads should exit.
bool shouldWait()
Returns true if there are no free Nodes to be processed and the calling thread should wait until ther...
void signalShouldExit()
Signals the pool that all the threads should exit.
void resetExitSignal()
Signals the pool that all the threads should continue to run and not exit.
Plays back a node with multiple threads.
double getSampleRate() const
Returns the current sample rate.
int process(const Node::ProcessContext &)
Process a block of the Node.
void clearNode()
Clears the current Node.
void setNode(std::unique_ptr< Node >)
Sets the Node to process.
Node * getNode()
Returns the current Node.
MultiThreadedNodePlayer()
Creates an empty MultiThreadedNodePlayer.
~MultiThreadedNodePlayer()
Destructor.
void prepareToPlay(double sampleRateToUse, int blockSizeToUse)
Prepares the current Node to be played.
void setNumThreads(size_t)
Sets the number of threads to use for rendering.
Struct to describe a single iteration of a process call.
T emplace_back(T... args)
Holds a graph in an order ready for processing and a sorted map for quick lookups.