#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# mesycontrol - Remote control for mesytec devices.
# Copyright (C) 2015-2016 mesytec GmbH & Co. KG <info@mesytec.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = 'Florian Lüke'
__email__ = 'f.lueke@mesytec.com'
from qt import QtCore
from qt import pyqtSignal
from functools import partial
import collections
import weakref
import sys
import util
from future import Future
QProcess = QtCore.QProcess
# Range of TCP ports considered when picking a local listen port for a server.
BASE_PORT = 23000 #: The default port to listen on
MAX_PORT = 65535 #: The maximum port number.
# The mesycontrol_server exit codes, mapped to their symbolic names.
# Codes missing from this table are reported as "exit_unknown_error" by
# get_exit_code_string().
EXIT_CODES = {
0: "exit_success",
1: "exit_options_error",
2: "exit_address_in_use",
3: "exit_address_not_available",
4: "exit_permission_denied",
5: "exit_bad_listen_address",
127: "exit_unknown_error"
}
# How long to wait (in milliseconds) after process startup before checking if
# the process is still running. The server might exit right away if its
# listening port is in use.
# NOTE(review): ServerProcess configures its timer from its own
# startup_delay_ms class attribute (200), not from this constant - confirm
# which value is intended.
STARTUP_DELAY_MS = 500
class ServerError(Exception):
    """Base class for all errors reported by the server process handling.

    ServerProcess.start() stores instances of this class (or of a subclass)
    as the exception of the Future it returns, e.g. when the server binary
    cannot be found or the process fails to start.
    """
class ServerIsStarting(ServerError):
    """The server process is currently starting up.

    NOTE(review): not raised anywhere in this module; presumably reserved
    for callers that need to distinguish the starting state - confirm.
    """
class ServerIsRunning(ServerError):
    """The requested operation needs a stopped server process.

    ServerProcess.start() reports this error when the underlying QProcess
    is in any state other than NotRunning.
    """
class ServerIsStopping(ServerError):
    """The server process is currently shutting down.

    NOTE(review): not raised anywhere in this module; presumably reserved
    for callers that need to distinguish the stopping state - confirm.
    """
class ServerIsStopped(ServerError):
    """The requested operation needs a running server process.

    ServerProcess.stop() reports this error when the underlying QProcess is
    already in the NotRunning state.
    """
def get_exit_code_string(exit_code):
    """Return the symbolic name of a mesycontrol_server exit code.

    :param exit_code: numeric exit code of the server process
    :returns: the matching entry of EXIT_CODES, or "exit_unknown_error" if
              the code is not listed there
    """
    return EXIT_CODES.get(exit_code, "exit_unknown_error")
# Local listen ports and server startup:
# TODO: move the code that restarts the server process with a different local
# port into ServerProcess itself and make it transparent to the user of the
# class, meaning the user of the class should not see started(), stopped(),
# error() or finished() signals while the ServerProcess is still trying to
# find a working listen port.
# Recording of used ports is wonky at best: it does not take the listen address
# into account at all. If it did, addresses like 0.0.0.0 would need special
# treatment, as they won't work if e.g. the port is in use by a process
# listening only on 127.0.0.1. 0.0.0.0 needs the port on all interfaces...
# Just scrap the code recording used ports and keep starting the server until
# a free base port for the address is found or we run out of ports.
class ServerProcess(QtCore.QObject):
    """Controls a single mesycontrol_server child process through a QProcess.

    The options given to the constructor are stored as plain attributes and
    are turned into command line arguments each time start() is called. The
    merged stdout/stderr output of the child is collected in
    ``output_buffer`` and re-emitted through the ``output`` signal.
    """

    #: Emitted after start() once the startup delay has expired and the
    #: process is still running.
    started = pyqtSignal()
    #: NOTE(review): declared but never emitted by this class.
    stopped = pyqtSignal()
    #: process_error, error_string, exit_code, exit_code_string
    error = pyqtSignal(QProcess.ProcessError, str, int, str)
    #: exit_status, exit_code, exit_code_string
    finished = pyqtSignal(QProcess.ExitStatus, int, str)
    #: Data read from the merged output channels of the process.
    output = pyqtSignal(str)

    #: Exit code -> symbolic name. Shares the module level table instead of
    #: duplicating it so both lookups can never disagree.
    exit_codes = EXIT_CODES

    #: Time in milliseconds to wait after the process has started before
    #: start() checks whether it is still running.
    startup_delay_ms = 200

    def __init__(self, binary='mesycontrol_server', listen_address='127.0.0.1', listen_port=BASE_PORT,
            serial_port=None, baud_rate=0, tcp_host=None, tcp_port=4001, verbosity=0,
            output_buffer_maxlen=10000, parent=None):
        """Store the server options and set up the (not yet started) process.

        :param binary: name of the server executable; resolved with
                       util.which() in start()
        :param listen_address: value passed as --listen-address
        :param listen_port: value passed as --listen-port
        :param serial_port: value passed as --mrc-serial-port; takes
                            precedence over tcp_host if both are given
        :param baud_rate: value passed as --mrc-baud-rate (serial mode only)
        :param tcp_host: value passed as --mrc-host
        :param tcp_port: value passed as --mrc-port (TCP mode only)
        :param verbosity: > 0 adds that many 'v' flags, < 0 that many 'q'
                          flags, 0 adds none
        :param output_buffer_maxlen: maximum number of output chunks kept in
                                     output_buffer
        :param parent: parent QObject
        """
        super(ServerProcess, self).__init__(parent)
        self.log = util.make_logging_source_adapter(__name__, self)

        self.binary = binary
        self.listen_address = listen_address
        self.listen_port = listen_port
        self.serial_port = serial_port
        self.baud_rate = baud_rate
        self.tcp_host = tcp_host
        self.tcp_port = tcp_port
        self.verbosity = verbosity

        self.process = QProcess()
        # stderr is merged into stdout so that _output() sees everything.
        self.process.setProcessChannelMode(QProcess.MergedChannels)
        self.process.error.connect(self._error)
        self.process.finished.connect(self._finished)
        self.process.readyReadStandardOutput.connect(self._output)

        self._startup_delay_timer = QtCore.QTimer()
        self._startup_delay_timer.setSingleShot(True)
        self._startup_delay_timer.setInterval(ServerProcess.startup_delay_ms)

        # Bounded buffer: the oldest chunks are dropped once maxlen is reached.
        self.output_buffer = collections.deque(maxlen=output_buffer_maxlen)

    def start(self):
        """Start the server process.

        Startup procedure:
          - start the process and wait for it to emit started() or error()
          - on error: set the future's exception to a ServerError
          - on started: start a timer waiting for startup_delay_ms
          - on timeout: if the process is still running set the result to
            True, otherwise set the exception to a ServerError

        :returns: a Future. Errors detected synchronously (process already
                  running, binary not found, invalid options) are stored as
                  the future's exception instead of being raised.
        """
        ret = Future()

        try:
            if self.process.state() != QProcess.NotRunning:
                raise ServerIsRunning()

            args = self._prepare_args()
            program = util.which(self.binary)

            if program is None:
                raise ServerError("Could not find server binary '%s'" % self.binary)

            cmd_line = "%s %s" % (program, " ".join(args))

            def dc():
                # Remove the temporary startup handlers again.
                self.process.started.disconnect(on_started)
                self.process.error.disconnect(on_error)
                self.process.finished.disconnect(on_finished)

            def on_started():
                self.log.debug("[pid=%s] Started %s", self.process.pid(), cmd_line)
                dc()
                self._startup_delay_timer.timeout.connect(on_startup_delay_expired)
                self._startup_delay_timer.start()

            def on_error(error):
                # Fixed: the format string has two placeholders, so cmd_line
                # has to be passed as well.
                self.log.debug("Error starting %s: %d", cmd_line, error)
                dc()
                ret.set_exception(ServerError(error))

            def on_finished(exit_code, exit_status):
                # Fixed: the format string has three placeholders, so
                # cmd_line has to be passed as well.
                self.log.debug("%s finished: code=%d, status=%d",
                        cmd_line, exit_code, exit_status)

            def on_startup_delay_expired():
                self._startup_delay_timer.timeout.disconnect(on_startup_delay_expired)

                if self.process.state() == QProcess.Running:
                    self.started.emit()
                    self.log.debug("[pid=%s] Startup delay expired", self.process.pid())
                    ret.set_progress_text("Started %s" % cmd_line)
                    ret.set_result(True)
                elif self.process.error() != QProcess.UnknownError:
                    # The process reported an error of its own.
                    self.log.warning("Setting exception with errorString: %s",
                            self.process.errorString())
                    ret.set_exception(ServerError(self.process.errorString()))
                else:
                    # The process exited during the startup delay; report
                    # its exit code.
                    self.log.warning("Setting exception with exit_code: %s", self.exit_code())
                    ret.set_exception(ServerError(self.exit_code()))

            self.process.started.connect(on_started)
            self.process.error.connect(on_error)
            self.process.finished.connect(on_finished)

            self.log.debug("Starting %s", cmd_line)
            self.process.start(program, args, QtCore.QIODevice.ReadOnly)
            ret.set_progress_text("Starting %s" % cmd_line)

        except Exception as e:
            self.log.exception("ServerProcess")
            ret.set_exception(e)

        return ret

    def stop(self, kill=False):
        """Stop the server process.

        The process is asked to terminate unless ``kill`` is true or the
        platform name starts with 'win32', in which case it is killed.

        :param kill: kill the process instead of terminating it
        :returns: a Future whose result is set to True once the process has
                  finished. If the process is not running the future's
                  exception is set to ServerIsStopped.
        """
        ret = Future()

        if self.process.state() == QProcess.NotRunning:
            ret.set_exception(ServerIsStopped())
            return ret

        def on_finished(code, status):
            self.log.debug("Process finished with code=%d (%s)",
                    code, ServerProcess.exit_code_string(code))
            self.process.finished.disconnect(on_finished)
            ret.set_result(True)

        self.process.finished.connect(on_finished)

        if not kill and not sys.platform.startswith('win32'):
            self.process.terminate()
        else:
            self.process.kill()

        return ret

    def kill(self):
        """Kill the server process; shortcut for stop(True)."""
        return self.stop(True)

    def is_starting(self):
        """Return True if the underlying QProcess is in the Starting state."""
        return self.process.state() == QProcess.Starting

    def is_running(self):
        """Return True if the underlying QProcess is in the Running state."""
        return self.process.state() == QProcess.Running

    def exit_code(self):
        """Return the tuple (exit_code, exit_code_string) of the process."""
        code = self.process.exitCode()
        return (code, ServerProcess.exit_code_string(code))

    @staticmethod
    def exit_code_string(code):
        """Return the symbolic name for ``code`` or "exit_unknown_error"."""
        return ServerProcess.exit_codes.get(code, "exit_unknown_error")

    def _prepare_args(self):
        """Build the command line argument list from the current options.

        :raises RuntimeError: if neither serial_port nor tcp_host is set
        """
        args = list()

        if self.verbosity != 0:
            # verbose/quiet flags (-vvv / -qqq)
            verb_flag = 'v' if self.verbosity > 0 else 'q'
            verb_args = '-' + verb_flag * abs(self.verbosity)
            args.append(verb_args)

        args.extend(['--listen-address', self.listen_address])
        args.extend(['--listen-port', str(self.listen_port)])

        if self.serial_port is not None:
            args.extend(['--mrc-serial-port', self.serial_port])
            args.extend(['--mrc-baud-rate', str(self.baud_rate)])
        elif self.tcp_host is not None:
            args.extend(['--mrc-host', self.tcp_host])
            args.extend(['--mrc-port', str(self.tcp_port)])
        else:
            raise RuntimeError("Neither serial_port nor tcp_host given.")

        return args

    def _error(self, error):
        """Log a QProcess error and forward it through the error signal."""
        exit_code = self.process.exitCode()
        error_string = self.process.errorString()
        code_string = ServerProcess.exit_code_string(exit_code)

        self.log.error("error=%d (%s), exit_code=%d (%s)",
                error, error_string, exit_code, code_string)

        self.error.emit(error, error_string, exit_code, code_string)

    def _finished(self, code, status):
        """Log process termination and forward it through the finished signal.

        Note the argument order: QProcess delivers (code, status) while the
        finished signal of this class carries (status, code, code_string).
        """
        code_string = ServerProcess.exit_code_string(code)
        self.log.debug("Finished: status=%d, code=%d, str=%s", status, code, code_string)
        self.finished.emit(status, code, code_string)

    def _output(self):
        """Read all pending process output, buffer it and emit it."""
        data = str(self.process.readAllStandardOutput())
        self.output_buffer.append(data)
        self.output.emit(data)
class ServerProcessPool(QtCore.QObject):
    """Keeps track of running ServerProcesses and the listen ports they use."""

    def __init__(self, parent=None):
        """Create an empty pool.

        :param parent: parent QObject
        """
        super(ServerProcessPool, self).__init__(parent)
        self.log = util.make_logging_source_adapter(__name__, self)
        # listen port -> ServerProcess; values are held weakly.
        self._procs_by_port = weakref.WeakValueDictionary()
        # Ports for which a server exited with 'exit_address_in_use'.
        self._unavailable_ports = set()

    def create_process(self, options=None, parent=None):
        """Create a ServerProcess listening on a port chosen by the pool.

        :param options: optional mapping of attribute name -> value applied
                        to the new process with setattr(). A 'listen_port'
                        entry is overridden by the port the pool assigns.
        :param parent: parent QObject of the new process
        :returns: the new, not yet started ServerProcess
        :raises RuntimeError: if no listen port is available
        """
        proc = ServerProcess(parent=parent)

        # 'options' defaults to None instead of a shared mutable dict.
        for attr, value in (options or {}).items():
            setattr(proc, attr, value)

        proc.listen_port = self._get_free_port()
        self._procs_by_port[proc.listen_port] = proc

        # NOTE(review): the partial holds a strong reference to proc; if the
        # Qt binding keeps connected callables alive this may prevent the
        # weak dictionary entry from ever expiring - confirm.
        proc.finished.connect(partial(self._on_process_finished, process=proc))

        return proc

    def _get_free_port(self):
        """Return the lowest port in [BASE_PORT, MAX_PORT] not known to be taken.

        :raises RuntimeError: if every port in the range is taken
        """
        in_use = set(self._procs_by_port.keys())
        self.log.debug("in_use=%s, unavail=%s", in_use, self._unavailable_ports)
        in_use |= self._unavailable_ports

        # MAX_PORT itself is a valid port, so the scan is inclusive. A plain
        # counter is used as it behaves the same on Python 2 and 3.
        port = BASE_PORT
        while port <= MAX_PORT:
            if port not in in_use:
                return port
            port += 1

        raise RuntimeError("No listen ports available")

    def _on_process_finished(self, exit_status, exit_code, exit_code_string, process):
        """Restart ``process`` on another port if its port was already in use.

        Connected to ServerProcess.finished with ``process`` bound to the
        emitting instance. Other exit codes are only logged.
        """
        self.log.debug("_on_process_finished: exit_code=%d (%s), exit_status=%d",
                exit_code, exit_code_string, exit_status)

        if exit_code_string != 'exit_address_in_use':
            return

        self.log.warning('listen_port %d in use by an external process', process.listen_port)
        self._unavailable_ports.add(process.listen_port)

        # pop() instead of del: the entry may be missing if listen_port was
        # changed from the outside after the process had been registered.
        self._procs_by_port.pop(process.listen_port, None)

        process.listen_port = self._get_free_port()
        self._procs_by_port[process.listen_port] = process
        process.start()
# Module level pool instance, created when the module is imported.
pool = ServerProcessPool()

if __name__ == "__main__":
    # Manual test: start several servers with different listen settings and
    # print the result of each start() future.
    import logging
    import signal

    logging.basicConfig(level=logging.DEBUG,
            format='[%(asctime)-15s] [%(name)s.%(levelname)s] %(message)s')

    app = QtCore.QCoreApplication([])

    def signal_handler(signum, frame):
        # Leave the Qt event loop on Ctrl-C.
        app.quit()

    signal.signal(signal.SIGINT, signal_handler)

    # sp1 and sp2 use the same default listen address and port.
    sp1 = ServerProcess(serial_port='/dev/ttyUSB0')
    sp2 = ServerProcess(serial_port='/dev/ttyUSB0')
    sp3 = ServerProcess(serial_port='/dev/ttyUSB0', listen_address="foobar")
    sp4 = ServerProcess(serial_port='/dev/ttyUSB0', listen_address="1.2.3.4")

    def sp_start_done(f):
        # Formats to the same text as the Python 2 print statement did, but
        # also parses on Python 3.
        # NOTE(review): f.result() presumably raises for a failed start -
        # confirm against the Future implementation.
        print("start() done: %s" % (f.result(),))

    sp1.start().add_done_callback(sp_start_done)
    sp2.start().add_done_callback(sp_start_done)
    sp3.start().add_done_callback(sp_start_done)
    sp4.start().add_done_callback(sp_start_done)

    sys.exit(app.exec_())