#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# mesycontrol - Remote control for mesytec devices.
# Copyright (C) 2015-2016 mesytec GmbH & Co. KG <info@mesytec.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = 'Florian Lüke'
__email__ = 'f.lueke@mesytec.com'
from qt import QtCore
from qt import QtGui
from qt import pyqtProperty
from qt import pyqtSignal
from functools import wraps
import traceback
import sys
import util
[docs]class IncompleteFuture(RuntimeError):
pass
[docs]class FutureIsDone(RuntimeError):
pass
[docs]class FutureIsRunning(RuntimeError):
pass
[docs]class CancelledError(RuntimeError):
pass
[docs]class Future(object):
def __init__(self):
self._done = False
self._result = None
self._exception = None
self._exception_observed = False
self._running = False
self._cancelled = False
self._callbacks = list()
self._progress_callbacks = list()
self._progress_min = 0
self._progress_max = 100
self._progress = 0
self._progress_text = str()
self.log = util.make_logging_source_adapter(__name__, self)
def __del__(self):
if self._exception is not None and not self._exception_observed:
self.log.error("Unobserved exception in Future: %s %s",
type(self._exception), self._exception)
# ===== Client functionality =====
[docs] def done(self):
return self._done
[docs] def result(self):
if not self.done():
raise IncompleteFuture(self)
if self.cancelled():
raise CancelledError()
if self._exception is not None:
self._exception_observed = True
raise self._exception
return self._result
[docs] def exception(self):
if not self.done():
raise IncompleteFuture(self)
if self.cancelled():
raise CancelledError()
self._exception_observed = True
return self._exception
[docs] def cancel(self):
if self.done() or self.running():
return False
self.log.info("%s: canceled", self)
self._cancelled = True
self._set_done()
return True
[docs] def cancelled(self):
return self._cancelled
[docs] def running(self):
return self._running
[docs] def progress(self):
return self._progress
[docs] def progress_range(self):
return (self._progress_min, self._progress_max)
[docs] def progress_min(self):
return self._progress_min
[docs] def progress_max(self):
return self._progress_max
[docs] def add_done_callback(self, fn, unique=True):
assert fn is not None
if self.done():
self._exec_callback(fn)
elif not unique or (fn not in self._callbacks):
self._callbacks.append(fn)
return self
[docs] def add_progress_callback(self, fn, unique=True):
assert fn is not None
if self.done():
self._exec_callback(fn)
elif not unique or (fn not in self._progress_callbacks):
self._progress_callbacks.append(fn)
return self
# ===== Executor functionality =====
[docs] def set_result(self, result):
if self.done():
raise FutureIsDone(self)
self._result = result
self._set_done()
return self
[docs] def set_exception(self, exception):
if self.done():
raise FutureIsDone(self)
exception.traceback_lines = traceback.format_exception(*sys.exc_info())
self._exception = exception
self._set_done()
return self
[docs] def set_running_or_notify_cancel(self):
"""If the method returns False then the Future was cancelled. Otherwise
the Future will be put in running state and True is returned."""
if self.running():
raise FutureIsRunning()
if self._result is not None or self._exception is not None:
raise FutureIsDone()
if self.cancelled():
return False
self._running = True
return True
[docs] def set_progress(self, progress):
self._progress = progress
for cb in self._progress_callbacks:
self._exec_callback(cb)
[docs] def set_progress_range(self, min_or_tuple, max_or_none=None):
self._progress_min = min_or_tuple[0] if max_or_none is None else min_or_tuple
self._progress_max = min_or_tuple[1] if max_or_none is None else max_or_none
[docs] def progress_text(self):
return self._progress_text
[docs] def set_progress_text(self, txt):
self._progress_text = txt
for cb in self._progress_callbacks:
self._exec_callback(cb)
def _set_done(self):
if self._done:
raise FutureIsDone()
self._done = True
self._running = False
if self.cancelled():
self.log.debug("%s done: canceled", self)
elif self.exception() is not None:
self.log.debug("%s done: %s", self, self.exception())
else:
self.log.debug("%s done: %s", self, self.result())
for cb in self._callbacks:
self._exec_callback(cb)
self._callbacks = list()
self._progress_callbacks = list()
def _exec_callback(self, cb):
try:
cb(self)
except Exception:
self.log.exception("Callback %s raised", cb)
[docs]def all_done(*futures):
"""Returns a future that completes once all of the given futures complete.
The returned futures result will be a list of futures in order of
completion.
"""
ret = Future()
ret.set_progress_range(0, len(futures))
done_futures = list()
def on_future_done(f):
done_futures.append(f)
ret.set_progress(len(done_futures))
if len(done_futures) == len(futures):
ret.set_result(done_futures)
for f in futures:
f.add_done_callback(on_future_done)
if len(futures) == 0:
ret.set_result(list())
return ret
[docs]def progress_forwarder(source, dest):
def callback(f):
dest.set_progress_range(source.progress_range())
dest.set_progress(source.progress())
dest.set_progress_text(source.progress_text())
source.add_progress_callback(callback)
[docs]class FutureObserver(QtCore.QObject):
"""Qt wrapper around a Future object using Qt signals to notify about state
changes."""
done = pyqtSignal()
cancelled = pyqtSignal()
progress_range_changed = pyqtSignal(int, int)
progress_changed = pyqtSignal(int)
progress_text_changed = pyqtSignal(str)
def __init__(self, the_future=None, parent=None):
super(FutureObserver, self).__init__(parent)
self.log = util.make_logging_source_adapter(__name__, self)
self.future = the_future
[docs] def get_future(self):
return self._future
[docs] def set_future(self, the_future):
self._future = the_future
if self.future is not None:
self._progress_range = self.future.progress_range()
self._progress = self.future.progress()
self._progress_text = self.future.progress_text()
self.future.add_done_callback(self._future_done)
self.future.add_progress_callback(self._future_progress)
else:
self._progress_range = (0, 0)
self._progress = 0
self._progress_text = str()
self.progress_range_changed.emit(*self._progress_range)
self.progress_changed.emit(self._progress)
self.progress_text_changed.emit(self._progress_text)
def _future_done(self, f):
self.log.debug("Future %s is done", f)
if f.cancelled():
self.cancelled.emit()
self.done.emit()
def _future_progress(self, f):
self.log.debug("Future %s progress changed r=%s p=%s t=%s",
f, f.progress_range(), f.progress(), f.progress_text())
if self._progress_range != f.progress_range():
self._progress_range = f.progress_range()
self.progress_range_changed.emit(*self._progress_range)
if self._progress != f.progress():
self._progress = f.progress()
self.progress_changed.emit(self._progress)
if self._progress_text != f.progress_text():
self._progress_text = f.progress_text()
self.progress_text_changed.emit(self._progress_text)
future = pyqtProperty(object, get_future, set_future)
[docs]def set_result_on(result_future):
def deco(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
result_future.set_result(f(*args, **kwargs))
except Exception as e:
result_future.set_exception(e)
return wrapper
return deco
[docs]def set_exception_on(result_future):
def deco(f):
@wraps(f)
def wrapper(*args, **kwargs):
try:
f(*args, **kwargs)
except Exception as e:
result_future.set_exception(e)
return wrapper
return deco
[docs]def future_progress_dialog(cancelable=True):
def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
f = func(*args, **kwargs)
if f.done():
return
fo = FutureObserver(the_future=f)
pd = QtGui.QProgressDialog()
if not cancelable:
pd.setCancelButton(None)
fo.progress_range_changed.connect(pd.setRange)
fo.progress_changed.connect(pd.setValue)
fo.progress_text_changed.connect(pd.setLabelText)
fo.done.connect(pd.close)
pd.exec_()
return wrapper
return deco
if __name__ == "__main__":
ret = Future()
@set_result_on(ret)
def my_func():
return 42
my_func()
print ret.result()
# ==================
ret = Future()
@set_result_on(ret)
def my_func():
raise ValueError(42)
my_func()
try:
print ret.result()
except Exception as e:
print type(e), e
# ==================
ret = Future()
@set_exception_on(ret)
def my_func():
return 42
my_func()
try:
print ret.result()
except Exception as e:
print type(e), e
# ==================
ret = Future()
@set_exception_on(ret)
def my_func():
raise ValueError(42)
my_func()
try:
print ret.result()
except Exception as e:
print type(e), e