JUCE-7.0.12-0-g4f43011b96 JUCE-7.0.12-0-g4f43011b96
JUCE — C++ application framework with suport for VST, VST3, LV2 audio plug-ins

« « « Anklang Documentation
Loading...
Searching...
No Matches
juce_ThreadPool.cpp
Go to the documentation of this file.
1 /*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2022 - Raw Material Software Limited
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
27{
28 ThreadPoolThread (ThreadPool& p, const Options& options)
29 : Thread { options.threadName, options.threadStackSizeBytes },
30 pool { p }
31 {
32 }
33
34 void run() override
35 {
36 while (! threadShouldExit())
37 {
38 if (! pool.runNextJob (*this))
39 wait (500);
40 }
41 }
42
43 std::atomic<ThreadPoolJob*> currentJob { nullptr };
44
45 ThreadPool& pool;
46
48};
49
50//==============================================================================
51ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
52{
53}
54
56{
57 // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
58 // to remove it first!
59 jassert (pool == nullptr || ! pool->contains (this));
60}
61
63{
64 return jobName;
65}
66
67void ThreadPoolJob::setJobName (const String& newName)
68{
69 jobName = newName;
70}
71
73{
74 shouldStop = true;
75 listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
76}
77
79{
80 listeners.add (listener);
81}
82
84{
85 listeners.remove (listener);
86}
87
89{
90 if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
91 return t->currentJob.load();
92
93 return nullptr;
94}
95
96//==============================================================================
98{
99 // not much point having a pool without any threads!
100 jassert (options.numberOfThreads > 0);
101
102 for (int i = jmax (1, options.numberOfThreads); --i >= 0;)
103 threads.add (new ThreadPoolThread (*this, options));
104
105 for (auto* t : threads)
106 t->startThread (options.desiredThreadPriority);
107}
108
109ThreadPool::ThreadPool (int numberOfThreads,
110 size_t threadStackSizeBytes,
111 Thread::Priority desiredThreadPriority)
112 : ThreadPool { Options{}.withNumberOfThreads (numberOfThreads)
113 .withThreadStackSizeBytes (threadStackSizeBytes)
114 .withDesiredThreadPriority (desiredThreadPriority) }
115{
116}
117
119{
120 removeAllJobs (true, 5000);
121 stopThreads();
122}
123
124void ThreadPool::stopThreads()
125{
126 for (auto* t : threads)
127 t->signalThreadShouldExit();
128
129 for (auto* t : threads)
130 t->stopThread (500);
131}
132
134{
135 jassert (job != nullptr);
136 jassert (job->pool == nullptr);
137
138 if (job->pool == nullptr)
139 {
140 job->pool = this;
141 job->shouldStop = false;
142 job->isActive = false;
143 job->shouldBeDeleted = deleteJobWhenFinished;
144
145 {
146 const ScopedLock sl (lock);
147 jobs.add (job);
148 }
149
150 for (auto* t : threads)
151 t->notify();
152 }
153}
154
156{
157 struct LambdaJobWrapper final : public ThreadPoolJob
158 {
159 LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
160 JobStatus runJob() override { return job(); }
161
163 };
164
165 addJob (new LambdaJobWrapper (jobToRun), true);
166}
167
169{
170 struct LambdaJobWrapper final : public ThreadPoolJob
171 {
172 LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (std::move (j)) {}
173 JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
174
175 std::function<void()> job;
176 };
177
178 addJob (new LambdaJobWrapper (std::move (jobToRun)), true);
179}
180
182{
183 const ScopedLock sl (lock);
184 return jobs.size();
185}
186
188{
189 return threads.size();
190}
191
192ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
193{
194 const ScopedLock sl (lock);
195 return jobs [index];
196}
197
198bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
199{
200 const ScopedLock sl (lock);
201 return jobs.contains (const_cast<ThreadPoolJob*> (job));
202}
203
204bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
205{
206 const ScopedLock sl (lock);
207 return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
208}
209
211{
212 const ScopedLock sl (lock);
213
214 auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
215
216 if (index > 0 && ! job->isActive)
217 jobs.move (index, 0);
218}
219
220bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
221{
222 if (job != nullptr)
223 {
224 auto start = Time::getMillisecondCounter();
225
226 while (contains (job))
227 {
228 if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
229 return false;
230
231 jobFinishedSignal.wait (2);
232 }
233 }
234
235 return true;
236}
237
239{
240 bool dontWait = true;
242
243 if (job != nullptr)
244 {
245 const ScopedLock sl (lock);
246
247 if (jobs.contains (job))
248 {
249 if (job->isActive)
250 {
252 job->signalJobShouldExit();
253
254 dontWait = false;
255 }
256 else
257 {
258 jobs.removeFirstMatchingValue (job);
259 addToDeleteList (deletionList, job);
260 }
261 }
262 }
263
264 return dontWait || waitForJobToFinish (job, timeOutMs);
265}
266
269{
271
272 {
274
275 {
276 const ScopedLock sl (lock);
277
278 for (int i = jobs.size(); --i >= 0;)
279 {
280 auto* job = jobs.getUnchecked (i);
281
282 if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
283 {
284 if (job->isActive)
285 {
286 jobsToWaitFor.add (job);
287
289 job->signalJobShouldExit();
290 }
291 else
292 {
293 jobs.remove (i);
294 addToDeleteList (deletionList, job);
295 }
296 }
297 }
298 }
299 }
300
301 auto start = Time::getMillisecondCounter();
302
303 for (;;)
304 {
305 for (int i = jobsToWaitFor.size(); --i >= 0;)
306 {
307 auto* job = jobsToWaitFor.getUnchecked (i);
308
309 if (! isJobRunning (job))
310 jobsToWaitFor.remove (i);
311 }
312
313 if (jobsToWaitFor.size() == 0)
314 break;
315
316 if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
317 return false;
318
319 jobFinishedSignal.wait (20);
320 }
321
322 return true;
323}
324
326{
327 StringArray s;
328 const ScopedLock sl (lock);
329
330 for (auto* job : jobs)
331 if (job->isActive || ! onlyReturnActiveJobs)
332 s.add (job->getJobName());
333
334 return s;
335}
336
337ThreadPoolJob* ThreadPool::pickNextJobToRun()
338{
340
341 {
342 const ScopedLock sl (lock);
343
344 for (int i = 0; i < jobs.size(); ++i)
345 {
346 if (auto* job = jobs[i])
347 {
348 if (! job->isActive)
349 {
350 if (job->shouldStop)
351 {
352 jobs.remove (i);
353 addToDeleteList (deletionList, job);
354 --i;
355 continue;
356 }
357
358 job->isActive = true;
359 return job;
360 }
361 }
362 }
363 }
364
365 return nullptr;
366}
367
368bool ThreadPool::runNextJob (ThreadPoolThread& thread)
369{
370 if (auto* job = pickNextJobToRun())
371 {
372 auto result = ThreadPoolJob::jobHasFinished;
373 thread.currentJob = job;
374
375 try
376 {
377 result = job->runJob();
378 }
379 catch (...)
380 {
381 jassertfalse; // Your runJob() method mustn't throw any exceptions!
382 }
383
384 thread.currentJob = nullptr;
385
387
388 {
389 const ScopedLock sl (lock);
390
391 if (jobs.contains (job))
392 {
393 job->isActive = false;
394
395 if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
396 {
397 jobs.removeFirstMatchingValue (job);
398 addToDeleteList (deletionList, job);
399
400 jobFinishedSignal.signal();
401 }
402 else
403 {
404 // move the job to the end of the queue if it wants another go
405 jobs.move (jobs.indexOf (job), -1);
406 }
407 }
408 }
409
410 return true;
411 }
412
413 return false;
414}
415
416void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
417{
418 job->shouldStop = true;
419 job->pool = nullptr;
420
421 if (job->shouldBeDeleted)
422 deletionList.add (job);
423}
424
425} // namespace juce
Holds a resizable array of primitive or copy-by-value objects.
Definition juce_Array.h:56
Automatically locks and unlocks a mutex object.
An array designed for holding objects.
A special array for holding a list of strings.
void add(String stringToAdd)
Appends a string at the end of the array.
The JUCE String class!
Definition juce_String.h:53
A task that is executed by a ThreadPool object.
void signalJobShouldExit()
Calling this will cause the shouldExit() method to return true, and the job should (if it's been impl...
JobStatus
These are the values that can be returned by the runJob() method.
@ jobHasFinished
indicates that the job has finished and can be removed from the pool.
@ jobNeedsRunningAgain
indicates that the job would like to be called again when a thread is free.
void setJobName(const String &newName)
Changes the job's name.
String getJobName() const
Returns the name of this job.
void addListener(Thread::Listener *)
Add a listener to this thread job which will receive a callback when signalJobShouldExit was called o...
virtual ~ThreadPoolJob()
Destructor.
static ThreadPoolJob * getCurrentThreadPoolJob()
If the calling thread is being invoked inside a runJob() method, this will return the ThreadPoolJob t...
void removeListener(Thread::Listener *)
Removes a listener added with addListener.
ThreadPoolJob(const String &name)
Creates a thread pool job object.
A callback class used when you need to select which ThreadPoolJob objects are suitable for some kind ...
A set of threads that will run a list of jobs.
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
If the given job is in the queue, this will move it to the front so that it is the next one to be exe...
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Adds a job to the queue.
int getNumThreads() const noexcept
Returns the number of threads assigned to this thread pool.
ThreadPoolJob * getJob(int index) const noexcept
Returns one of the jobs in the queue.
int getNumJobs() const noexcept
Returns the number of jobs currently running or queued.
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Tries to remove all jobs from the pool.
~ThreadPool()
Destructor.
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Returns a list of the names of all the jobs currently running or queued.
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently being run by a thread.
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Tries to remove a job from the pool.
bool contains(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently queued or running.
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Waits until a job has finished running and has been removed from the pool.
ThreadPool()
Creates a thread pool based using the default arguments provided by ThreadPoolOptions.
Used to receive callbacks for thread exit calls.
Encapsulates a thread.
Definition juce_Thread.h:43
static Thread *JUCE_CALLTYPE getCurrentThread()
Finds the thread object that is currently running.
bool wait(double timeOutMilliseconds) const
Suspends the execution of this thread until either the specified timeout period has elapsed,...
bool threadShouldExit() const
Checks whether the thread has been told to stop running.
Priority
The different runtime priorities of non-realtime threads.
Definition juce_Thread.h:54
static uint32 getMillisecondCounter() noexcept
Returns the number of millisecs since a fixed event (usually system startup).
void signal() const
Wakes up any threads that are currently waiting on this object.
bool wait(double timeOutMilliseconds=-1.0) const
Suspends the calling thread until the event has been signalled.
#define jassert(expression)
Platform-independent assertion macro.
#define JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(className)
This is a shorthand way of writing both a JUCE_DECLARE_NON_COPYABLE and JUCE_LEAK_DETECTOR macro for ...
#define jassertfalse
This will always cause an assertion failure.
JUCE Namespace.
CriticalSection::ScopedLockType ScopedLock
Automatically locks and unlocks a CriticalSection object.
constexpr Type jmax(Type a, Type b)
Returns the larger of two values.
Type unalignedPointerCast(void *ptr) noexcept
Casts a pointer to another type via void*, which suppresses the cast-align warning which sometimes ar...
Definition juce_Memory.h:88
unsigned int uint32
A platform-independent 32-bit unsigned integer type.
A set of threads that will run a list of jobs.
void run() override
Must be implemented to perform the thread's actual code.