13 #include <emmintrin.h>
16#define RETURN_MID_NODES_OPTIMISATION 1
18namespace tracktion {
inline namespace graph
27 : audioWorkgroup (
std::move (audioWorkgroup_))
29 threadPool = poolCreator (*
this);
34 if (numThreadsToUse > 0)
40 if (newNumThreads == numThreadsToUse)
44 numThreadsToUse = newNumThreads;
56 postNewGraph (
prepareToPlay (std::move (newNode), lastGraphPosted,
57 sampleRateToUse, blockSizeToUse,
63 if (sampleRateToUse == sampleRate && blockSizeToUse == blockSize)
72 if (
auto pn = scopedAccess.get())
73 currentGraph = std::move (pn->graph);
80 sampleRateToUse, blockSizeToUse,
87 const auto preparedNode = scopedAccess.get();
89 if (preparedNode ==
nullptr)
92 if (! preparedNode->graph)
95 if (! preparedNode->graph->rootNode)
99 numSamplesToProcess = pc.numSamples;
100 referenceSampleRange = pc.referenceSampleRange;
103 for (
auto node : preparedNode->graph->orderedNodes)
104 node->prepareForNextBlock (referenceSampleRange);
107 preparedNode->graph->rootNode->retain();
111 for (
auto node : preparedNode->graph->orderedNodes)
112 node->process (numSamplesToProcess, referenceSampleRange);
117 jassert (preparedNode->playbackNodes.size() == preparedNode->graph->orderedNodes.size());
118 resetProcessQueue (*preparedNode);
123 if (preparedNode->graph->rootNode->hasProcessed())
126 if (! processNextFreeNode (*preparedNode))
127 threadPool->waitForFinalNode();
133 auto output = preparedNode->graph->rootNode->getProcessedOutput();
134 auto numAudioChannels =
std::min (output.audio.getNumChannels(), pc.buffers.audio.getNumChannels());
136 if (numAudioChannels > 0)
137 add (pc.buffers.audio.getFirstChannels (numAudioChannels),
138 output.audio.getFirstChannels (numAudioChannels));
140 pc.buffers.midi.mergeFrom (output.midi);
144 preparedNode->graph->rootNode->release();
155 lastGraphPosted =
nullptr;
156 lastAudioBufferPoolPosted =
nullptr;
157 preparedNodeObject.
clear();
165 if (useMemoryPool.
exchange (usePool) != usePool)
169void LockFreeMultiThreadedNodePlayer::enableNodeMemorySharing (
bool shouldBeEnabled)
171 if (
std::exchange (nodeMemorySharingEnabled, shouldBeEnabled) != shouldBeEnabled)
179 double sampleRateToUse,
int blockSizeToUse,
180 bool useCurrentAudioBufferPool)
187 if (! useCurrentAudioBufferPool)
188 return node_player_utils::prepareToPlay (std::move (node), oldGraph, sampleRateToUse, blockSizeToUse,
nullptr,
nullptr, nodeMemorySharingEnabled);
190 return node_player_utils::prepareToPlay (std::move (node), oldGraph, sampleRateToUse, blockSizeToUse,
193 auto data = lastAudioBufferPoolPosted->
allocate (s);
194 return { data.getView().getFirstChannels (s.numChannels).getStart (s.numFrames), std::move (data) };
198 lastAudioBufferPoolPosted->
release (std::move (b.data));
200 nodeMemorySharingEnabled);
204void LockFreeMultiThreadedNodePlayer::clearThreads()
206 threadPool->clearThreads();
209void LockFreeMultiThreadedNodePlayer::createThreads()
211 threadPool->createThreads (numThreadsToUse.
load(), audioWorkgroup);
214inline void LockFreeMultiThreadedNodePlayer::pause()
220 __asm__ __volatile__ (
"yield");
221 __asm__ __volatile__ (
"yield");
234 std::stable_sort (newGraph->orderedNodes.begin(), newGraph->orderedNodes.end(),
235 [] (
auto n1,
auto n2)
237 return n1->isReadyToProcess() && ! n2->isReadyToProcess();
240 rootNode = newGraph->rootNode.
get();
242 PreparedNode newPreparedNode;
243 newPreparedNode.graph = std::move (newGraph);
245 buildNodesOutputLists (newPreparedNode);
249 const size_t poolCapacity = newPreparedNode.graph->orderedNodes.size();
252 node_player_utils::reserveAudioBufferPool (newPreparedNode.graph->rootNode.get(),
253 newPreparedNode.graph->orderedNodes,
254 *newPreparedNode.audioBufferPool,
255 numThreadsToUse, blockSize);
258 lastGraphPosted = newPreparedNode.graph.get();
259 lastAudioBufferPoolPosted = newPreparedNode.audioBufferPool.get();
264void LockFreeMultiThreadedNodePlayer::buildNodesOutputLists (PreparedNode& preparedNode)
266 preparedNode.playbackNodes.clear();
267 preparedNode.playbackNodes.reserve (preparedNode.graph->orderedNodes.size());
269 for (
auto n : preparedNode.graph->orderedNodes)
272 for (
auto& pn : preparedNode.playbackNodes)
276 jassert (
std::count (preparedNode.graph->orderedNodes.begin(), preparedNode.graph->orderedNodes.end(), n) == 1);
279 n->internal = preparedNode.playbackNodes.back().get();
280 n->numOutputNodes = 0;
284 for (
auto node : preparedNode.graph->orderedNodes)
286 for (
auto inputNode : node->getDirectInputNodes())
289 jassert (
std::find (preparedNode.graph->orderedNodes.begin(), preparedNode.graph->orderedNodes.end(), inputNode) != preparedNode.graph->orderedNodes.end());
290 static_cast<PlaybackNode*
> (inputNode->internal)->outputs.push_back (node);
291 ++inputNode->numOutputNodes;
296void LockFreeMultiThreadedNodePlayer::resetProcessQueue (PreparedNode& preparedNode)
303 if (! preparedNode.nodesReadyToBeProcessed->try_dequeue (temp))
311 for (
auto& playbackNode : preparedNode.playbackNodes)
313 jassert (playbackNode->hasBeenQueued);
314 playbackNode->hasBeenQueued =
false;
319 if (playbackNode->node.isReadyToProcess())
320 jassert (playbackNode->numInputsToBeProcessed == 0);
322 if (playbackNode->numInputsToBeProcessed == 0)
323 jassert (playbackNode->node.isReadyToProcess());
328 for (
auto& playbackNode : preparedNode.playbackNodes)
330 playbackNode->hasBeenDequeued =
false;
331 jassert (! playbackNode->hasBeenQueued);
332 jassert (playbackNode->numInputsToBeProcessed == playbackNode->numInputs);
337 size_t numNodesJustQueued = 0;
340 for (
auto& playbackNode : preparedNode.playbackNodes)
344 jassert (! playbackNode->hasBeenQueued);
345 playbackNode->hasBeenQueued =
true;
346 preparedNode.nodesReadyToBeProcessed->try_enqueue (&playbackNode->node);
347 ++numNodesJustQueued;
353 numNodesQueued += numNodesJustQueued;
355 threadPool->setCurrentNode (&preparedNode);
357 if (
int numThreadsToSignal = (
int) numNodesQueued.
load(); numThreadsToSignal > 1)
358 threadPool->signal (numThreadsToSignal);
361Node* LockFreeMultiThreadedNodePlayer::updateProcessQueueForNode (PreparedNode& preparedNode, Node& node)
363 auto playbackNode =
static_cast<PlaybackNode*
> (node.internal);
365 #if RETURN_MID_NODES_OPTIMISATION
366 Node* nodeToReturn =
nullptr;
369 for (
auto output : playbackNode->outputs)
371 auto outputPlaybackNode =
static_cast<PlaybackNode*
> (output->internal);
376 jassert (outputPlaybackNode->node.isReadyToProcess());
377 jassert (! outputPlaybackNode->hasBeenQueued);
378 outputPlaybackNode->hasBeenQueued =
true;
380 #if RETURN_MID_NODES_OPTIMISATION
383 if (nodeToReturn ==
nullptr)
385 nodeToReturn = &outputPlaybackNode->node;
389 preparedNode.nodesReadyToBeProcessed->try_enqueue (&outputPlaybackNode->node);
394 if (playbackNode->outputs.size() == 1
395 || output == playbackNode->outputs.back())
396 return &outputPlaybackNode->node;
398 preparedNode.nodesReadyToBeProcessed->try_enqueue (&outputPlaybackNode->node);
404 #if RETURN_MID_NODES_OPTIMISATION
412bool LockFreeMultiThreadedNodePlayer::processNextFreeNode (PreparedNode& preparedNode)
414 Node* nodeToProcess =
nullptr;
419 if (! preparedNode.nodesReadyToBeProcessed->try_dequeue (nodeToProcess))
424 assert (nodeToProcess !=
nullptr);
425 processNode (preparedNode, *nodeToProcess);
430void LockFreeMultiThreadedNodePlayer::processNode (PreparedNode& preparedNode, Node& node)
432 auto* nodeToProcess = &node;
439 jassert (!
static_cast<PlaybackNode*
> (nodeToProcess->internal)->hasBeenDequeued);
440 static_cast<PlaybackNode*
> (nodeToProcess->internal)->hasBeenDequeued =
true;
444 nodeToProcess->process (numSamplesToProcess, referenceSampleRange);
445 nodeToProcess = updateProcessQueueForNode (preparedNode, *nodeToProcess);
choc::buffer::ChannelArrayBuffer< float > allocate(choc::buffer::Size)
Returns an allocated buffer for a given size from the pool.
bool release(choc::buffer::ChannelArrayBuffer< float > &&)
Releases an allocated buffer back to the pool.
void clearNode()
Clears the current Node.
void setNode(std::unique_ptr< Node >)
Sets the Node to process.
void enablePooledMemoryAllocations(bool)
Enables or disables the use on an AudioBufferPool to reduce memory consumption.
~LockFreeMultiThreadedNodePlayer()
Destructor.
double getSampleRate() const
Returns the current sample rate.
int process(const Node::ProcessContext &)
Process a block of the Node.
void setNumThreads(size_t)
Sets the number of threads to use for rendering.
void prepareToPlay(double sampleRateToUse, int blockSizeToUse)
Prepares the current Node to be played.
LockFreeMultiThreadedNodePlayer()
Creates an empty LockFreeMultiThreadedNodePlayer.
void clear()
Clears the object and any pending object by assigining them defaultly constructed objects.
ScopedRealTimeAccess getScopedAccess()
Creates a ScopedRealTimeAccess for this LockFreeObject.
void pushNonRealTime(ObjectType &&newObj)
Pushes a new object to be picked up on the real time thread.
Struct to describe a single iteration of a process call.
LockFreeMultiThreadedNodePlayer::ThreadPoolCreator getPoolCreatorFunction(ThreadPoolStrategy poolType)
Returns a function to create a ThreadPool for the given stategy.
Holds a view over some data and optionally some storage for that data.
Holds a graph in an order ready for processing and a sorted map for quick lookups.