tracktion-engine 3.0-10-g034fdde4aa5
Tracktion Engine — High level data model for audio applications

« « « Anklang Documentation
Loading...
Searching...
No Matches
tracktion_LockFreeMultiThreadedNodePlayer.cpp
Go to the documentation of this file.
1 /*
2 ,--. ,--. ,--. ,--.
3 ,-' '-.,--.--.,--,--.,---.| |,-.,-' '-.`--' ,---. ,--,--, Copyright 2024
4 '-. .-'| .--' ,-. | .--'| /'-. .-',--.| .-. || \ Tracktion Software
5 | | | | \ '-' \ `--.| \ \ | | | |' '-' '| || | Corporation
6 `---' `--' `--`--'`---'`--'`--' `---' `--' `---' `--''--' www.tracktion.com
7
8 Tracktion Engine uses a GPL/commercial licence - see LICENCE.md for details.
9*/
10
11#include <thread>
12#if JUCE_INTEL
13 #include <emmintrin.h>
14#endif
15
16#define RETURN_MID_NODES_OPTIMISATION 1
17
18namespace tracktion { inline namespace graph
19{
20
22{
23 threadPool = getPoolCreatorFunction (ThreadPoolStrategy::realTime) (*this);
24}
25
27 : audioWorkgroup (std::move (audioWorkgroup_))
28{
29 threadPool = poolCreator (*this);
30}
31
33{
34 if (numThreadsToUse > 0)
35 clearThreads();
36}
37
39{
40 if (newNumThreads == numThreadsToUse)
41 return;
42
43 clearThreads();
44 numThreadsToUse = newNumThreads;
45 createThreads();
46}
47
49{
50 setNode (std::move (newNode), getSampleRate(), blockSize);
51}
52
53void LockFreeMultiThreadedNodePlayer::setNode (std::unique_ptr<Node> newNode, double sampleRateToUse, int blockSizeToUse)
54{
55 // The prepare and set the new Node, passing in the old graph
56 postNewGraph (prepareToPlay (std::move (newNode), lastGraphPosted,
57 sampleRateToUse, blockSizeToUse,
58 useMemoryPool));
59}
60
61void LockFreeMultiThreadedNodePlayer::prepareToPlay (double sampleRateToUse, int blockSizeToUse)
62{
63 if (sampleRateToUse == sampleRate && blockSizeToUse == blockSize)
64 return;
65
66 std::unique_ptr<NodeGraph> currentGraph;
67
68 // Ensure we've flushed any pending Node to the current prepared Node
69 {
70 const auto scopedAccess = preparedNodeObject.getScopedAccess();
71
72 if (auto pn = scopedAccess.get())
73 currentGraph = std::move (pn->graph);
74 }
75
76 clearNode();
77
78 // Don't pass in the old graph here as we're stealing the root from it
79 postNewGraph (prepareToPlay (currentGraph != nullptr ? std::move (currentGraph->rootNode) : std::unique_ptr<Node>(), nullptr,
80 sampleRateToUse, blockSizeToUse,
81 useMemoryPool));
82}
83
85{
86 const auto scopedAccess = preparedNodeObject.getScopedAccess();
87 const auto preparedNode = scopedAccess.get();
88
89 if (preparedNode == nullptr)
90 return -1;
91
92 if (! preparedNode->graph)
93 return -1;
94
95 if (! preparedNode->graph->rootNode)
96 return -1;
97
98 // Reset the stream range
99 numSamplesToProcess = pc.numSamples;
100 referenceSampleRange = pc.referenceSampleRange;
101
102 // Prepare all the nodes to be played back
103 for (auto node : preparedNode->graph->orderedNodes)
104 node->prepareForNextBlock (referenceSampleRange);
105
106 // We need to retain the root so we can get the output from it
107 preparedNode->graph->rootNode->retain();
108
109 if (numThreadsToUse.load (std::memory_order_acquire) == 0 || preparedNode->graph->orderedNodes.size() == 1)
110 {
111 for (auto node : preparedNode->graph->orderedNodes)
112 node->process (numSamplesToProcess, referenceSampleRange);
113 }
114 else
115 {
116 // Reset the queue to be processed
117 jassert (preparedNode->playbackNodes.size() == preparedNode->graph->orderedNodes.size());
118 resetProcessQueue (*preparedNode);
119
120 // Try to process Nodes until the root is ready
121 for (;;)
122 {
123 if (preparedNode->graph->rootNode->hasProcessed())
124 break;
125
126 if (! processNextFreeNode (*preparedNode))
127 threadPool->waitForFinalNode();
128 }
129 }
130
131 // Add output from graph to buffers
132 {
133 auto output = preparedNode->graph->rootNode->getProcessedOutput();
134 auto numAudioChannels = std::min (output.audio.getNumChannels(), pc.buffers.audio.getNumChannels());
135
136 if (numAudioChannels > 0)
137 add (pc.buffers.audio.getFirstChannels (numAudioChannels),
138 output.audio.getFirstChannels (numAudioChannels));
139
140 pc.buffers.midi.mergeFrom (output.midi);
141 }
142
143 // We need to release the root to match the previous retain
144 preparedNode->graph->rootNode->release();
145
146 return -1;
147}
148
150{
151 // N.B. The threads will be trying to read the preparedNodes so we need to actually stop these first
152 clearThreads();
153
154 rootNode = nullptr;
155 lastGraphPosted = nullptr;
156 lastAudioBufferPoolPosted = nullptr;
157 preparedNodeObject.clear();
158
159 createThreads();
160}
161
162//==============================================================================
164{
165 if (useMemoryPool.exchange (usePool) != usePool)
166 prepareToPlay (sampleRate, blockSize);
167}
168
169void LockFreeMultiThreadedNodePlayer::enableNodeMemorySharing (bool shouldBeEnabled)
170{
171 if (std::exchange (nodeMemorySharingEnabled, shouldBeEnabled) != shouldBeEnabled)
172 prepareToPlay (sampleRate, blockSize);
173}
174
175
176//==============================================================================
177//==============================================================================
179 double sampleRateToUse, int blockSizeToUse,
180 bool useCurrentAudioBufferPool)
181{
182 createThreads();
183
184 sampleRate.store (sampleRateToUse, std::memory_order_release);
185 blockSize.store (blockSizeToUse, std::memory_order_release);;
186
187 if (! useCurrentAudioBufferPool)
188 return node_player_utils::prepareToPlay (std::move (node), oldGraph, sampleRateToUse, blockSizeToUse, nullptr, nullptr, nodeMemorySharingEnabled);
189
190 return node_player_utils::prepareToPlay (std::move (node), oldGraph, sampleRateToUse, blockSizeToUse,
191 [this] (auto s) -> NodeBuffer
192 {
193 auto data = lastAudioBufferPoolPosted->allocate (s);
194 return { data.getView().getFirstChannels (s.numChannels).getStart (s.numFrames), std::move (data) };
195 },
196 [this] (auto b)
197 {
198 lastAudioBufferPoolPosted->release (std::move (b.data));
199 },
200 nodeMemorySharingEnabled);
201}
202
203//==============================================================================
204void LockFreeMultiThreadedNodePlayer::clearThreads()
205{
206 threadPool->clearThreads();
207}
208
209void LockFreeMultiThreadedNodePlayer::createThreads()
210{
211 threadPool->createThreads (numThreadsToUse.load(), audioWorkgroup);
212}
213
214inline void LockFreeMultiThreadedNodePlayer::pause()
215{
216 #if JUCE_INTEL
217 _mm_pause();
218 _mm_pause();
219 #else
220 __asm__ __volatile__ ("yield");
221 __asm__ __volatile__ ("yield");
222 #endif
223}
224
225//==============================================================================
226void LockFreeMultiThreadedNodePlayer::postNewGraph (std::unique_ptr<NodeGraph> newGraph)
227{
228 if (! newGraph)
229 {
230 clearNode();
231 return;
232 }
233
234 std::stable_sort (newGraph->orderedNodes.begin(), newGraph->orderedNodes.end(),
235 [] (auto n1, auto n2)
236 {
237 return n1->isReadyToProcess() && ! n2->isReadyToProcess();
238 });
239
240 rootNode = newGraph->rootNode.get();
241
242 PreparedNode newPreparedNode;
243 newPreparedNode.graph = std::move (newGraph);
244 newPreparedNode.nodesReadyToBeProcessed = std::make_unique<LockFreeFifo<Node*>> ((int) newPreparedNode.graph->orderedNodes.size());
245 buildNodesOutputLists (newPreparedNode);
246
247 if (useMemoryPool)
248 {
249 const size_t poolCapacity = newPreparedNode.graph->orderedNodes.size();
250 newPreparedNode.audioBufferPool = std::make_unique<AudioBufferPool> (poolCapacity);
251
252 node_player_utils::reserveAudioBufferPool (newPreparedNode.graph->rootNode.get(),
253 newPreparedNode.graph->orderedNodes,
254 *newPreparedNode.audioBufferPool,
255 numThreadsToUse, blockSize);
256 }
257
258 lastGraphPosted = newPreparedNode.graph.get();
259 lastAudioBufferPoolPosted = newPreparedNode.audioBufferPool.get();
260 preparedNodeObject.pushNonRealTime (std::move (newPreparedNode));
261}
262
263//==============================================================================
264void LockFreeMultiThreadedNodePlayer::buildNodesOutputLists (PreparedNode& preparedNode)
265{
266 preparedNode.playbackNodes.clear();
267 preparedNode.playbackNodes.reserve (preparedNode.graph->orderedNodes.size());
268
269 for (auto n : preparedNode.graph->orderedNodes)
270 {
271 #if JUCE_DEBUG
272 for (auto& pn : preparedNode.playbackNodes)
273 jassert (&pn->node != n);
274 #endif
275
276 jassert (std::count (preparedNode.graph->orderedNodes.begin(), preparedNode.graph->orderedNodes.end(), n) == 1);
277
278 preparedNode.playbackNodes.push_back (std::make_unique<PlaybackNode> (*n));
279 n->internal = preparedNode.playbackNodes.back().get();
280 n->numOutputNodes = 0;
281 }
282
283 // Iterate all nodes, for each input, add to the current Nodes output list
284 for (auto node : preparedNode.graph->orderedNodes)
285 {
286 for (auto inputNode : node->getDirectInputNodes())
287 {
288 // Check the input is actually still in the graph
289 jassert (std::find (preparedNode.graph->orderedNodes.begin(), preparedNode.graph->orderedNodes.end(), inputNode) != preparedNode.graph->orderedNodes.end());
290 static_cast<PlaybackNode*> (inputNode->internal)->outputs.push_back (node);
291 ++inputNode->numOutputNodes;
292 }
293 }
294}
295
296void LockFreeMultiThreadedNodePlayer::resetProcessQueue (PreparedNode& preparedNode)
297{
298 // Clear the nodesReadyToBeProcessed list
299 for (;;)
300 {
301 Node* temp;
302
303 if (! preparedNode.nodesReadyToBeProcessed->try_dequeue (temp))
304 break;
305 }
306
307 numNodesQueued.store (0, std::memory_order_release);
308
309 // Reset all the counters
310 // And then move any Nodes that are ready to the correct queue
311 for (auto& playbackNode : preparedNode.playbackNodes)
312 {
313 jassert (playbackNode->hasBeenQueued);
314 playbackNode->hasBeenQueued = false;
315 playbackNode->numInputsToBeProcessed.store (playbackNode->numInputs, std::memory_order_release);
316
317 // Check only ready nodes will be queued
318 #if JUCE_DEBUG
319 if (playbackNode->node.isReadyToProcess())
320 jassert (playbackNode->numInputsToBeProcessed == 0);
321
322 if (playbackNode->numInputsToBeProcessed == 0)
323 jassert (playbackNode->node.isReadyToProcess());
324 #endif
325 }
326
327 #if JUCE_DEBUG
328 for (auto& playbackNode : preparedNode.playbackNodes)
329 {
330 playbackNode->hasBeenDequeued = false;
331 jassert (! playbackNode->hasBeenQueued);
332 jassert (playbackNode->numInputsToBeProcessed == playbackNode->numInputs);
333 jassert (playbackNode->numInputsToBeProcessed.load (std::memory_order_acquire) == playbackNode->numInputs);
334 }
335 #endif
336
337 size_t numNodesJustQueued = 0;
338
339 // Make sure the counters are reset for all nodes before queueing any
340 for (auto& playbackNode : preparedNode.playbackNodes)
341 {
342 if (playbackNode->numInputsToBeProcessed.load (std::memory_order_acquire) == 0)
343 {
344 jassert (! playbackNode->hasBeenQueued);
345 playbackNode->hasBeenQueued = true;
346 preparedNode.nodesReadyToBeProcessed->try_enqueue (&playbackNode->node);
347 ++numNodesJustQueued;
348 }
349 }
350
351 // Make sure this is only incremented after all the nodes have been queued
352 // or the threads will start queueing Nodes at the same time
353 numNodesQueued += numNodesJustQueued;
354
355 threadPool->setCurrentNode (&preparedNode);
356
357 if (int numThreadsToSignal = (int) numNodesQueued.load(); numThreadsToSignal > 1)
358 threadPool->signal (numThreadsToSignal);
359}
360
361Node* LockFreeMultiThreadedNodePlayer::updateProcessQueueForNode (PreparedNode& preparedNode, Node& node)
362{
363 auto playbackNode = static_cast<PlaybackNode*> (node.internal);
364
365 #if RETURN_MID_NODES_OPTIMISATION
366 Node* nodeToReturn = nullptr;
367 #endif
368
369 for (auto output : playbackNode->outputs)
370 {
371 auto outputPlaybackNode = static_cast<PlaybackNode*> (output->internal);
372
373 // fetch_sub returns the previous value so it will now be 0
374 if (outputPlaybackNode->numInputsToBeProcessed.fetch_sub (1, std::memory_order_acq_rel) == 1)
375 {
376 jassert (outputPlaybackNode->node.isReadyToProcess());
377 jassert (! outputPlaybackNode->hasBeenQueued);
378 outputPlaybackNode->hasBeenQueued = true;
379
380 #if RETURN_MID_NODES_OPTIMISATION
381 // We can return one Node to be processed on this thread, otherwise we can
382 // queue it for another thread to possibly process
383 if (nodeToReturn == nullptr)
384 {
385 nodeToReturn = &outputPlaybackNode->node;
386 }
387 else
388 {
389 preparedNode.nodesReadyToBeProcessed->try_enqueue (&outputPlaybackNode->node);
390 numNodesQueued.fetch_add (1, std::memory_order_acq_rel);
391 }
392 #else
393 // If there is only one Node or we're at the last Node we can return this to be processed by the same thread
394 if (playbackNode->outputs.size() == 1
395 || output == playbackNode->outputs.back())
396 return &outputPlaybackNode->node;
397
398 preparedNode.nodesReadyToBeProcessed->try_enqueue (&outputPlaybackNode->node);
399 numNodesQueued.fetch_add (1, std::memory_order_acq_rel);
400 #endif
401 }
402 }
403
404 #if RETURN_MID_NODES_OPTIMISATION
405 return nodeToReturn;
406 #else
407 return nullptr;
408 #endif
409}
410
411//==============================================================================
412bool LockFreeMultiThreadedNodePlayer::processNextFreeNode (PreparedNode& preparedNode)
413{
414 Node* nodeToProcess = nullptr;
415
416 if (numNodesQueued.load (std::memory_order_acquire) == 0)
417 return false;
418
419 if (! preparedNode.nodesReadyToBeProcessed->try_dequeue (nodeToProcess))
420 return false;
421
422 numNodesQueued.fetch_sub (1, std::memory_order_acq_rel);
423
424 assert (nodeToProcess != nullptr);
425 processNode (preparedNode, *nodeToProcess);
426
427 return true;
428}
429
430void LockFreeMultiThreadedNodePlayer::processNode (PreparedNode& preparedNode, Node& node)
431{
432 auto* nodeToProcess = &node;
433
434 // Attempt to process serial Node chains on this thread
435 // to reduce context switches and overhead
436 for (;;)
437 {
438 #if JUCE_DEBUG
439 jassert (! static_cast<PlaybackNode*> (nodeToProcess->internal)->hasBeenDequeued);
440 static_cast<PlaybackNode*> (nodeToProcess->internal)->hasBeenDequeued = true;
441 #endif
442
443 // Process Node
444 nodeToProcess->process (numSamplesToProcess, referenceSampleRange);
445 nodeToProcess = updateProcessQueueForNode (preparedNode, *nodeToProcess);
446
447 if (! nodeToProcess)
448 break;
449 }
450}
451
452}}
assert
choc::buffer::ChannelArrayBuffer< float > allocate(choc::buffer::Size)
Returns an allocated buffer for a given size from the pool.
bool release(choc::buffer::ChannelArrayBuffer< float > &&)
Releases an allocated buffer back to the pool.
void setNode(std::unique_ptr< Node >)
Sets the Node to process.
void enablePooledMemoryAllocations(bool)
Enables or disables the use of an AudioBufferPool to reduce memory consumption.
int process(const Node::ProcessContext &)
Process a block of the Node.
void setNumThreads(size_t)
Sets the number of threads to use for rendering.
void prepareToPlay(double sampleRateToUse, int blockSizeToUse)
Prepares the current Node to be played.
LockFreeMultiThreadedNodePlayer()
Creates an empty LockFreeMultiThreadedNodePlayer.
void clear()
Clears the object and any pending object by assigning them default-constructed objects.
ScopedRealTimeAccess getScopedAccess()
Creates a ScopedRealTimeAccess for this LockFreeObject.
void pushNonRealTime(ObjectType &&newObj)
Pushes a new object to be picked up on the real time thread.
Struct to describe a single iteration of a process call.
T count(T... args)
T exchange(T... args)
T fetch_add(T... args)
T fetch_sub(T... args)
T find(T... args)
T get(T... args)
T is_pointer_v
#define jassert(expression)
typedef int
T load(T... args)
T min(T... args)
LockFreeMultiThreadedNodePlayer::ThreadPoolCreator getPoolCreatorFunction(ThreadPoolStrategy poolType)
Returns a function to create a ThreadPool for the given strategy.
T stable_sort(T... args)
T store(T... args)
Holds a view over some data and optionally some storage for that data.
Holds a graph in an order ready for processing and a sorted map for quick lookups.