"""Thread definitions and associated worker classes.
"""
import collections
import time
import uuid
import weakref
from PySide2 import QtCore, QtGui, QtWidgets
from . import workers
from .. import common
[docs]class DataType(object):
"""Used to signify the end of a data type."""
def __repr__(self):
return '<< DataType({}, {}) >>'.format(self.data_type, self.queue)
def __init__(self, q, t):
self.queue = q
self.data_type = t
FileThumbnail = 'FileThumbnail'
FavouriteThumbnail = 'FavouriteThumbnail'
AssetThumbnail = 'AssetThumbnail'
BookmarkThumbnail = 'BookmarkThumbnail'
FileInfo = 'FileInfo'
FileInfo2 = 'FileInfo2'
FileInfo3 = 'FileInfo3'
FavouriteInfo = 'FavouriteInfo'
AssetInfo = 'AssetInfo'
BookmarkInfo = 'BookmarkInfo'
QueuedDatabaseTransaction = 'QueuedDatabaseTransaction'
QueuedSettingsTransaction = 'QueuedSettingsTransaction'
QueuedShotgunQuery = 'QueuedShotgunQuery'
controllers = {}
# Main thread definitions
THREADS = {
BookmarkInfo: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(BookmarkInfo, common.FileItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.BookmarkTab,
'mutex': QtCore.QMutex()
},
BookmarkThumbnail: {
'queue': collections.deque([], 99),
'preload': False,
'data_types': {
common.FileItem: DataType(BookmarkThumbnail, common.FileItem),
},
'worker': workers.ThumbnailWorker,
'role': common.ThumbnailLoaded,
'tab': common.BookmarkTab,
'mutex': QtCore.QMutex()
},
AssetInfo: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(AssetInfo, common.FileItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.AssetTab,
'mutex': QtCore.QMutex()
},
AssetThumbnail: {
'queue': collections.deque([], 99),
'preload': False,
'data_types': {
common.FileItem: DataType(AssetThumbnail, common.FileItem),
},
'worker': workers.ThumbnailWorker,
'role': common.ThumbnailLoaded,
'tab': common.AssetTab,
'mutex': QtCore.QMutex()
},
FileInfo: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(FileInfo, common.FileItem),
common.SequenceItem: DataType(FileInfo, common.SequenceItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.FileTab,
'mutex': QtCore.QMutex()
},
FileInfo2: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(FileInfo, common.FileItem),
common.SequenceItem: DataType(FileInfo, common.SequenceItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.FileTab,
'mutex': QtCore.QMutex()
},
FileInfo3: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(FileInfo, common.FileItem),
common.SequenceItem: DataType(FileInfo, common.SequenceItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.FileTab,
'mutex': QtCore.QMutex()
},
FileThumbnail: {
'queue': collections.deque([], 99),
'preload': False,
'data_types': {
common.FileItem: DataType(FileThumbnail, common.FileItem),
common.SequenceItem: DataType(FileThumbnail, common.SequenceItem),
},
'worker': workers.ThumbnailWorker,
'role': common.ThumbnailLoaded,
'tab': common.FileTab,
'mutex': QtCore.QMutex()
},
FavouriteInfo: {
'queue': collections.deque([], common.max_list_items),
'preload': True,
'data_types': {
common.FileItem: DataType(FavouriteInfo, common.FileItem),
common.SequenceItem: DataType(FavouriteInfo, common.SequenceItem),
},
'worker': workers.InfoWorker,
'role': common.FileInfoLoaded,
'tab': common.FavouriteTab,
'mutex': QtCore.QMutex()
},
FavouriteThumbnail: {
'queue': collections.deque([], 99),
'preload': False,
'data_types': {
common.FileItem: DataType(FavouriteThumbnail, common.FileItem),
common.SequenceItem: DataType(FavouriteThumbnail, common.SequenceItem),
},
'worker': workers.ThumbnailWorker,
'role': common.ThumbnailLoaded,
'tab': common.FavouriteTab,
'mutex': QtCore.QMutex()
},
QueuedDatabaseTransaction: {
'queue': collections.deque([], common.max_list_items),
'preload': False,
'data_types': {},
'worker': workers.TransactionsWorker,
'role': None,
'tab': -1,
'mutex': QtCore.QMutex()
},
QueuedShotgunQuery: {
'queue': collections.deque([], common.max_list_items),
'preload': False,
'data_types': {},
'worker': workers.ShotgunWorker,
'role': None,
'tab': -1,
'mutex': QtCore.QMutex()
},
}
[docs]def queue_database_transaction(*args):
"""A utility method used to execute a delayed database transaction.
"""
if args not in queue(QueuedDatabaseTransaction):
queue(QueuedDatabaseTransaction).append(args)
get_thread(QueuedDatabaseTransaction).startTimer.emit()
def queue_shotgun_query(*args):
if args not in queue(QueuedShotgunQuery):
queue(QueuedShotgunQuery).append(args)
get_thread(QueuedShotgunQuery).startTimer.emit()
[docs]def reset_all_queues():
"""Clear all thread queues.
"""
for k in THREADS:
THREADS[k]['queue'].clear()
[docs]def quit_threads():
"""Terminate all running threads.
"""
for k in THREADS:
thread = get_thread(k)
if thread.isRunning():
THREADS[k]['queue'].clear()
thread.quit()
thread.wait()
n = 0
while any([get_thread(k).isRunning() for k in THREADS]):
if n >= 20:
for thread in THREADS.values():
thread.terminate()
break
n += 1
time.sleep(0.3)
[docs]def get_thread(k):
"""Get a cached thread controller instance.
Args:
k (str): Name of the thread controller to return, e.g.
``threads.QueuedShotgunQuery``.
If the controller does not yet exist we will create and cache it.
All threads are associated with worker, defined by `THREADS`.
"""
if k not in THREADS:
raise KeyError('{} is invalid. Must be one of {}'.format(
k, '\n'.join(THREADS.keys())))
if k in controllers:
return controllers[k]
controllers[k] = BaseThread(THREADS[k]['worker'](k))
return controllers[k]
[docs]def queue(k):
"""Returns a queue associated with a thread."""
if k not in THREADS:
raise KeyError('Wrong key')
return THREADS[k]['queue']
[docs]def add_to_queue(k, ref):
"""Adds a wekref item to the worker's queue.
Args:
ref (weakref.ref): A weak reference to a data segment.
end (bool): Add to the end of the queue instead if `True`.
"""
common.check_type(ref, weakref.ref)
if ref not in queue(k) and ref():
queue(k).append(ref)
[docs]class BaseThread(QtCore.QThread):
"""Base QThread controller.
Attributes:
initWorker (QtCore.Signal): Signal emitted when the thread has spun up.
startTimer (QtCore.Signal): Starts the thread's queue timer.
stopTimer (QtCore.Signal): Stops the thread's queue timer.
"""
initWorker = QtCore.Signal()
startTimer = QtCore.Signal()
stopTimer = QtCore.Signal()
def __init__(self, worker, parent=None):
super().__init__(parent=parent)
self.setObjectName(f'{worker.queue}Thread_{uuid.uuid1().hex}')
self.setTerminationEnabled(True)
self.worker = worker
self._connect_signals()
def _connect_signals(self):
if QtCore.QCoreApplication.instance():
QtCore.QCoreApplication.instance().aboutToQuit.connect(self.quit)
if QtGui.QGuiApplication.instance():
QtGui.QGuiApplication.instance().lastWindowClosed.connect(self.quit)
self.started.connect(self.move_worker_to_thread)
[docs] @common.debug
@QtCore.Slot()
def move_worker_to_thread(self):
"""Slot called when the thread is started.
We'll move the worker to the thread and connect all signals needed to
communicate with the worker.
"""
self.worker.moveToThread(self)
cnx = QtCore.Qt.QueuedConnection
self.initWorker.connect(self.worker.initWorker, cnx)
self.initWorker.emit()
if self.worker.thread() == QtWidgets.QApplication.instance().thread():
s = 'Could not move worker to thread.'
raise RuntimeError(s)