Qt 6.x
The Qt SDK
Loading...
Searching...
No Matches
qthreadpool.cpp
Go to the documentation of this file.
1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qthreadpool.h"
5#include "qthreadpool_p.h"
6#include "qdeadlinetimer.h"
7#include "qcoreapplication.h"
8
9#include <algorithm>
10#include <memory>
11
13
14using namespace Qt::StringLiterals;
15
16/*
17 QThread wrapper, provides synchronization against a ThreadPool
18*/
20{
22public:
24 void run() override;
26
30};
31
32/*
33 QThreadPool private class.
34*/
35
36
41 :manager(manager), runnable(nullptr)
42{
44}
45
46/*
47 \internal
48*/
50{
51 QMutexLocker locker(&manager->mutex);
52 for(;;) {
54 runnable = nullptr;
55
56 do {
57 if (r) {
58 // If autoDelete() is false, r might already be deleted after run(), so check status now.
59 const bool del = r->autoDelete();
60
61 // run the task
62 locker.unlock();
63#ifndef QT_NO_EXCEPTIONS
64 try {
65#endif
66 r->run();
67#ifndef QT_NO_EXCEPTIONS
68 } catch (...) {
69 qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n"
70 "This is not supported, exceptions thrown in worker threads must be\n"
71 "caught before control returns to Qt Concurrent.");
73 throw;
74 }
75#endif
76
77 if (del)
78 delete r;
79 locker.relock();
80 }
81
82 // if too many threads are active, stop working in this one
84 break;
85
86 // all work is done, time to wait for more
87 if (manager->queue.isEmpty())
88 break;
89
91 r = page->pop();
92
93 if (page->isFinished()) {
95 delete page;
96 }
97 } while (true);
98
99 // this thread is about to be deleted, do not wait or expire
100 if (!manager->allThreads.contains(this)) {
102 return;
103 }
104
105 // if too many threads are active, expire this thread
109 return;
110 }
113 // wait for work, exiting after the expiry timeout is reached
115 // this thread is about to be deleted, do not work or expire
116 if (!manager->allThreads.contains(this)) {
118 return;
119 }
120 if (manager->waitingThreads.removeOne(this)) {
122 return;
123 }
125 }
126}
127
129{
130 if (--manager->activeThreads == 0)
132}
133
134
135/*
136 \internal
137*/
139{ }
140
142{
143 Q_ASSERT(task != nullptr);
144 if (allThreads.isEmpty()) {
145 // always create at least one thread
147 return true;
148 }
149
150 // can't do anything if we're over the limit
152 return false;
153
154 if (!waitingThreads.isEmpty()) {
155 // recycle an available thread
157 waitingThreads.takeFirst()->runnableReady.wakeOne();
158 return true;
159 }
160
161 if (!expiredThreads.isEmpty()) {
162 // restart an expired thread
164 Q_ASSERT(thread->runnable == nullptr);
165
167
168 thread->runnable = task;
169
170 // Ensure that the thread has actually finished, otherwise the following
171 // start() has no effect.
172 thread->wait();
173 Q_ASSERT(thread->isFinished());
174 thread->start(threadPriority);
175 return true;
176 }
177
178 // start a new thread
180 return true;
181}
182
183inline bool comparePriority(int priority, const QueuePage *p)
184{
185 return p->priority() < priority;
186}
187
188void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
189{
190 Q_ASSERT(runnable != nullptr);
191 for (QueuePage *page : std::as_const(queue)) {
192 if (page->priority() == priority && !page->isFull()) {
193 page->push(runnable);
194 return;
195 }
196 }
197 auto it = std::upper_bound(queue.constBegin(), queue.constEnd(), priority, comparePriority);
198 queue.insert(std::distance(queue.constBegin(), it), new QueuePage(runnable, priority));
199}
200
202{
203 return (allThreads.size()
207}
208
210{
211 // try to push tasks on the queue to any available threads
212 while (!queue.isEmpty()) {
214 if (!tryStart(page->first()))
215 break;
216
217 page->pop();
218
219 if (page->isFinished()) {
221 delete page;
222 }
223 }
224}
225
227{
228 const int activeThreadCount = this->activeThreadCount();
229 return activeThreadCount >= maxThreadCount() && (activeThreadCount - reservedThreads) >= 1;
230}
231
233{
234 const int activeThreadCount = this->activeThreadCount();
235 return activeThreadCount > maxThreadCount() && (activeThreadCount - reservedThreads) > 1;
236}
237
242{
243 Q_ASSERT(runnable != nullptr);
244 auto thread = std::make_unique<QThreadPoolThread>(this);
245 if (objectName.isEmpty())
246 objectName = u"Thread (pooled)"_s;
247 thread->setObjectName(objectName);
248 Q_ASSERT(!allThreads.contains(thread.get())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
249 allThreads.insert(thread.get());
251
252 thread->runnable = runnable;
253 thread.release()->start(threadPriority);
254}
255
264{
265 // move the contents of the set out so that we can iterate without the lock
266 auto allThreadsCopy = std::exchange(allThreads, {});
269
270 mutex.unlock();
271
272 for (QThreadPoolThread *thread : std::as_const(allThreadsCopy)) {
273 if (thread->isRunning()) {
274 thread->runnableReady.wakeAll();
275 thread->wait();
276 }
277 delete thread;
278 }
279
280 mutex.lock();
281}
282
289{
290 while (!(queue.isEmpty() && activeThreads == 0) && !timer.hasExpired())
292
293 return queue.isEmpty() && activeThreads == 0;
294}
295
297{
298 QMutexLocker locker(&mutex);
299 QDeadlineTimer timer(msecs);
300 if (!waitForDone(timer))
301 return false;
302 reset();
303 // New jobs might have started during reset, but return anyway
304 // as the active thread and task count did reach 0 once, and
305 // race conditions are outside our scope.
306 return true;
307}
308
310{
311 QMutexLocker locker(&mutex);
312 while (!queue.isEmpty()) {
313 auto *page = queue.takeLast();
314 while (!page->isFinished()) {
315 QRunnable *r = page->pop();
316 if (r && r->autoDelete()) {
317 locker.unlock();
318 delete r;
319 locker.relock();
320 }
321 }
322 delete page;
323 }
324}
325
344{
345 Q_D(QThreadPool);
346
347 if (runnable == nullptr)
348 return false;
349
350 QMutexLocker locker(&d->mutex);
351 for (QueuePage *page : std::as_const(d->queue)) {
352 if (page->tryTake(runnable)) {
353 if (page->isFinished()) {
354 d->queue.removeOne(page);
355 delete page;
356 }
357 return true;
358 }
359 }
360
361 return false;
362}
363
371{
372 Q_Q(QThreadPool);
373 if (!q->tryTake(runnable))
374 return;
375 // If autoDelete() is false, runnable might already be deleted after run(), so check status now.
376 const bool del = runnable->autoDelete();
377
378 runnable->run();
379
380 if (del)
381 delete runnable;
382}
383
442{
443 Q_D(QThreadPool);
444 connect(this, &QObject::objectNameChanged, this, [d](const QString &newName) {
445 // We keep a copy of the name under our own lock, so we can access it thread-safely.
446 QMutexLocker locker(&d->mutex);
447 d->objectName = newName;
448 });
449}
450
456{
457 Q_D(QThreadPool);
458 waitForDone();
459 Q_ASSERT(d->queue.isEmpty());
460 Q_ASSERT(d->allThreads.isEmpty());
461}
462
467{
468 Q_CONSTINIT static QPointer<QThreadPool> theInstance;
469 Q_CONSTINIT static QBasicMutex theMutex;
470
471 const QMutexLocker locker(&theMutex);
472 if (theInstance.isNull() && !QCoreApplication::closingDown())
473 theInstance = new QThreadPool();
474 return theInstance;
475}
476
482{
483 Q_CONSTINIT static QPointer<QThreadPool> guiInstance;
484 Q_CONSTINIT static QBasicMutex theMutex;
485
486 const QMutexLocker locker(&theMutex);
487 if (guiInstance.isNull() && !QCoreApplication::closingDown())
488 guiInstance = new QThreadPool();
489 return guiInstance;
490}
491
507void QThreadPool::start(QRunnable *runnable, int priority)
508{
509 if (!runnable)
510 return;
511
512 Q_D(QThreadPool);
513 QMutexLocker locker(&d->mutex);
514
515 if (!d->tryStart(runnable))
516 d->enqueueTask(runnable, priority);
517}
518
553{
554 if (!runnable)
555 return false;
556
557 Q_D(QThreadPool);
558 QMutexLocker locker(&d->mutex);
559 if (d->tryStart(runnable))
560 return true;
561
562 return false;
563}
564
598{
599 Q_D(const QThreadPool);
600 QMutexLocker locker(&d->mutex);
601 return d->expiryTimeout;
602}
603
604void QThreadPool::setExpiryTimeout(int expiryTimeout)
605{
606 Q_D(QThreadPool);
607 QMutexLocker locker(&d->mutex);
608 if (d->expiryTimeout == expiryTimeout)
609 return;
610 d->expiryTimeout = expiryTimeout;
611}
612
626{
627 Q_D(const QThreadPool);
628 QMutexLocker locker(&d->mutex);
629 return d->requestedMaxThreadCount;
630}
631
632void QThreadPool::setMaxThreadCount(int maxThreadCount)
633{
634 Q_D(QThreadPool);
635 QMutexLocker locker(&d->mutex);
636
637 if (maxThreadCount == d->requestedMaxThreadCount)
638 return;
639
640 d->requestedMaxThreadCount = maxThreadCount;
641 d->tryToStartMoreThreads();
642}
643
655{
656 Q_D(const QThreadPool);
657 QMutexLocker locker(&d->mutex);
658 return d->activeThreadCount();
659}
660
677{
678 Q_D(QThreadPool);
679 QMutexLocker locker(&d->mutex);
680 ++d->reservedThreads;
681}
682
696{
697 Q_D(QThreadPool);
698 QMutexLocker locker(&d->mutex);
699 d->stackSize = stackSize;
700}
701
703{
704 Q_D(const QThreadPool);
705 QMutexLocker locker(&d->mutex);
706 return d->stackSize;
707}
708
724{
725 Q_D(QThreadPool);
726 QMutexLocker locker(&d->mutex);
727 d->threadPriority = priority;
728}
729
731{
732 Q_D(const QThreadPool);
733 QMutexLocker locker(&d->mutex);
734 return d->threadPriority;
735}
736
750{
751 Q_D(QThreadPool);
752 QMutexLocker locker(&d->mutex);
753 --d->reservedThreads;
754 d->tryToStartMoreThreads();
755}
756
777{
778 if (!runnable)
779 return releaseThread();
780
781 Q_D(QThreadPool);
782 QMutexLocker locker(&d->mutex);
783 Q_ASSERT(d->reservedThreads > 0);
784 --d->reservedThreads;
785
786 if (!d->tryStart(runnable)) {
787 // This can only happen if we reserved max threads,
788 // and something took the one minimum thread.
789 d->enqueueTask(runnable, INT_MAX);
790 }
791}
792
815{
816 Q_D(QThreadPool);
817 return d->waitForDone(msecs);
818}
819
830{
831 Q_D(QThreadPool);
832 d->clear();
833}
834
840bool QThreadPool::contains(const QThread *thread) const
841{
842 Q_D(const QThreadPool);
843 const QThreadPoolThread *poolThread = qobject_cast<const QThreadPoolThread *>(thread);
844 if (!poolThread)
845 return false;
846 QMutexLocker locker(&d->mutex);
847 return d->allThreads.contains(const_cast<QThreadPoolThread *>(poolThread));
848}
849
851
852#include "moc_qthreadpool.cpp"
853#include "qthreadpool.moc"
QByteArray first(qsizetype n) const
Definition qbytearray.h:159
static bool closingDown()
Returns true if the application objects are being destroyed; otherwise returns false.
\inmodule QtCore
qsizetype size() const noexcept
Definition qlist.h:386
void removeFirst() noexcept
Definition qlist.h:800
bool isEmpty() const noexcept
Definition qlist.h:390
T & first()
Definition qlist.h:628
iterator insert(qsizetype i, parameter_type t)
Definition qlist.h:471
bool removeOne(const AT &t)
Definition qlist.h:581
value_type takeFirst()
Definition qlist.h:549
const_iterator constBegin() const noexcept
Definition qlist.h:615
value_type takeLast()
Definition qlist.h:550
const_iterator constEnd() const noexcept
Definition qlist.h:616
void clear()
Definition qlist.h:417
\inmodule QtCore
Definition qmutex.h:317
void unlock() noexcept
Unlocks this mutex locker.
Definition qmutex.h:323
void relock() noexcept
Relocks an unlocked mutex locker.
Definition qmutex.h:324
Mutex * mutex() const noexcept
Returns the mutex on which the QMutexLocker is operating.
Definition qmutex.h:325
\inmodule QtCore
Definition qmutex.h:285
void unlock() noexcept
Unlocks the mutex.
Definition qmutex.h:293
void lock() noexcept
Locks the mutex.
Definition qmutex.h:290
\inmodule QtCore
Definition qobject.h:90
static QMetaObject::Connection connect(const QObject *sender, const char *signal, const QObject *receiver, const char *member, Qt::ConnectionType=Qt::AutoConnection)
\threadsafe
Definition qobject.cpp:2823
QThread * thread() const
Returns the thread in which the object lives.
Definition qobject.cpp:1561
void objectNameChanged(const QString &objectName, QPrivateSignal)
This signal is emitted after the object's name has been changed.
\inmodule QtCore
Definition qpointer.h:18
void enqueue(const T &t)
Adds value t to the tail of the queue.
Definition qqueue.h:18
T dequeue()
Removes the head item in the queue and returns it.
Definition qqueue.h:19
\inmodule QtCore
Definition qrunnable.h:18
bool autoDelete() const
Returns true if auto-deletion is enabled; false otherwise.
Definition qrunnable.h:37
virtual void run()=0
Implement this pure virtual function in your subclass.
qsizetype size() const
Definition qset.h:50
bool isEmpty() const
Definition qset.h:52
bool contains(const T &value) const
Definition qset.h:71
iterator insert(const T &value)
Definition qset.h:155
\macro QT_RESTRICTED_CAST_FROM_ASCII
Definition qstring.h:127
bool isEmpty() const
Returns true if the string has no characters; otherwise returns false.
Definition qstring.h:1083
bool areAllThreadsActive() const
bool waitForDone(int msecs)
bool tryStart(QRunnable *task)
void enqueueTask(QRunnable *task, int priority=0)
QList< QueuePage * > queue
QThread::Priority threadPriority
QWaitCondition noActiveThreads
QQueue< QThreadPoolThread * > expiredThreads
int activeThreadCount() const
int maxThreadCount() const
QSet< QThreadPoolThread * > allThreads
void startThread(QRunnable *runnable=nullptr)
void stealAndRunRunnable(QRunnable *runnable)
QQueue< QThreadPoolThread * > waitingThreads
static QThreadPool * qtGuiInstance()
Returns the QThreadPool instance for Qt Gui.
bool tooManyThreadsActive() const
QThreadPoolThread(QThreadPoolPrivate *manager)
void run() override
QRunnable * runnable
QThreadPoolPrivate * manager
void registerThreadInactive()
QWaitCondition runnableReady
\inmodule QtCore
Definition qthreadpool.h:20
void reserveThread()
Reserves one thread, disregarding activeThreadCount() and maxThreadCount().
void start(QRunnable *runnable, int priority=0)
Reserves a thread and uses it to run runnable, unless this thread will make the current thread count ...
static QThreadPool * globalInstance()
Returns the global QThreadPool instance.
void setExpiryTimeout(int expiryTimeout)
int activeThreadCount
the number of active threads in the thread pool.
Definition qthreadpool.h:25
void setMaxThreadCount(int maxThreadCount)
void startOnReservedThread(QRunnable *runnable)
Releases a thread previously reserved with reserveThread() and uses it to run runnable.
bool tryTake(QRunnable *runnable)
void setThreadPriority(QThread::Priority priority)
QThreadPool(QObject *parent=nullptr)
Constructs a thread pool with the given parent.
int maxThreadCount
the maximum number of threads used by the thread pool.
Definition qthreadpool.h:24
uint stackSize
the stack size for the thread pool worker threads.
Definition qthreadpool.h:26
void setStackSize(uint stackSize)
~QThreadPool()
Destroys the QThreadPool.
bool contains(const QThread *thread) const
bool tryStart(QRunnable *runnable)
Attempts to reserve a thread to run runnable.
void releaseThread()
Releases a thread previously reserved by a call to reserveThread().
int expiryTimeout
the thread expiry timeout value in milliseconds.
Definition qthreadpool.h:23
QThread::Priority threadPriority
the thread priority for new worker threads.
Definition qthreadpool.h:27
bool waitForDone(int msecs=-1)
Waits up to msecs milliseconds for all threads to exit and removes all threads from the thread pool.
void start(Priority=InheritPriority)
Definition qthread.cpp:923
bool isFinished() const
Definition qthread.cpp:981
bool wait(QDeadlineTimer deadline=QDeadlineTimer(QDeadlineTimer::Forever))
Definition qthread.cpp:950
void setStackSize(uint stackSize)
Definition qthread.cpp:1046
bool wait(QMutex *, QDeadlineTimer=QDeadlineTimer(QDeadlineTimer::Forever))
QSet< QString >::iterator it
Combined button and popup list for selecting options.
#define qWarning
Definition qlogging.h:162
GLboolean r
[2]
GLdouble GLdouble GLdouble GLdouble q
Definition qopenglext.h:259
GLfloat GLfloat p
[1]
#define Q_ASSERT(cond)
Definition qrandom.cpp:47
bool comparePriority(int priority, const QueuePage *p)
#define Q_OBJECT
unsigned int uint
Definition qtypes.h:29
QObject::connect nullptr
QByteArray page
[45]
QTimer * timer
[3]
QNetworkAccessManager manager
IUIAutomationTreeWalker __RPC__deref_out_opt IUIAutomationElement ** parent