#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# mesycontrol - Remote control for mesytec devices.
# Copyright (C) 2015-2016 mesytec GmbH & Co. KG <info@mesytec.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = 'Florian Lüke'
__email__ = 'f.lueke@mesytec.com'
from qt import QtCore
import functools
[docs]class DuplicateParameter(RuntimeError):
pass
[docs]class Unit(object):
def __init__(self, label, factor=1.0, offset=0.0, name=None):
if label is None and name is None:
raise RuntimeError("Neither `label' nor `name' given.")
self.label = str(label) if label is not None else None
self.name = str(name) if name is not None else self.label
self.factor = float(factor) #: Factor used for value<->unit conversion.
self.offset = float(offset) #: Offset used for value<->unit conversion.
if self.factor == 0.0:
raise RuntimeError("invalid unit conversion factor of 0.0 given")
[docs] def unit_value(self, raw_value):
return raw_value / self.factor + self.offset
[docs] def raw_value(self, unit_value):
return round((unit_value - self.offset) * self.factor)
[docs] def get_factor(self):
return self._factor
[docs] def set_factor(self, factor):
self._factor = float(factor)
[docs] def get_offset(self):
return self._offset
[docs] def set_offset(self, offset):
self._offset = float(offset)
factor = property(get_factor, set_factor)
offset = property(get_offset, set_offset)
[docs] @staticmethod
def fromDict(d):
label = d.get('label', None)
name = d.get('name', None)
ret = Unit(name=name, label=label)
if 'factor' in d: ret.factor = d['factor']
if 'offset' in d: ret.offset = d['offset']
return ret
raw_unit = Unit(label=None, name='raw', factor=1.0, offset=0.0)
[docs]class Range(object):
def __init__(self, min_value, max_value):
self.min_value = min_value
self.max_value = max_value
[docs] def in_range(self, value):
return self.min_value <= value and value <= self.max_value
[docs] def limit_to(self, value):
if value < self.min_value:
return self.min_value
if value > self.max_value:
return self.max_value
return value
[docs] def to_tuple(self):
return (self.min_value, self.max_value)
def __getitem__(self, key):
if key == 0:
return self.min_value
if key == 1:
return self.max_value
raise KeyError(key)
[docs]@functools.total_ordering
class ParameterProfile(object):
def __init__(self, address):
self.address = int(address) #: Numeric address of the parameter.
self._name = None #: Human readable name.
self.index = None #: Index number of this parameter if it is part of a sequence
#: of parameters. E.g.: for MHV4 channels 1 through 4 index numbers
#: would be 0 through 3.
self.poll = False #: True if this parameter should be polled repeatedly.
self.read_only = False #: True if this parameter is read only. Its
# value will not be stored in the configuration.
self.critical = False #: True if this parameter affects a critical
# device setting (e.g. MHV4 channel enable).
# Only meaningful for writeable parameters.
self.safe_value = 0 #: Optional safe value if this parameter is critical.
self.do_not_store = False #: True if this parameters value should not be stored
# in or loaded from the configuration (e.g. MSCF-16
# copy function).
self.range = None #: Range instance limiting this parameters values.
self.units = list() #: Optional list of Unit definitions for this parameter.
#: Used to convert between raw and human readable parameter
#: values.
self._default = None #: Default value of the parameter. If not set the minimum of the
# parameters range will be used. If no range is set defaults to 0.
self.units.append(raw_unit)
[docs] def should_be_stored(self):
return not self.read_only and not self.do_not_store
[docs] def is_named(self):
return self._name is not None and len(self._name)
[docs] def get_name(self):
if not self.is_named():
return str()
return self._name
[docs] def set_name(self, name):
self._name = str(name) if name is not None else None
[docs] def get_unit(self, label_or_name):
return filter(lambda u: u.label == label_or_name or u.name == label_or_name, self.units)[0]
[docs] def has_index(self):
return self.index is not None
[docs] def set_default(self, value):
self._default = int(value)
[docs] def get_default(self):
if self._default is not None:
return self._default
if self.range is not None:
return self.range.min_value
return 0
def __eq__(self, o):
if isinstance(o, int):
return self.address == o
return self.address == o.address
def __lt__(self, o):
if isinstance(o, int):
return self.address < o
return self.address < o.address
def __str__(self):
if self.is_named():
return "ParameterProfile(address=%d, name=%s)" % (self.address, self.name)
return "ParameterProfile(address=%d)" % self.address
name = property(get_name, set_name)
default = property(get_default, set_default)
[docs] @staticmethod
def fromDict(d):
ret = ParameterProfile(d['address'])
#if 'name' in d: ret.name = d['name']
#if 'index' in d: ret.index = int(d['index'])
#if 'poll' in d: ret.poll = bool(d['poll'])
#if 'read_only' in d: ret.read_only = bool(d['read_only'])
#if 'critical' in d: ret.critical = bool(d['critical'])
#if 'safe_value' in d: ret.safe_value = int(d['safe_value'])
#if 'do_not_store' in d: ret.do_not_store = bool(d['do_not_store'])
attributes = (a for a in d.keys() if a not in ('address', 'range', 'units'))
for attr in attributes:
setattr(ret, attr, d[attr])
if 'range' in d: ret.range = Range(d['range'][0], d['range'][1])
if 'units' in d:
for unit_def in d['units']:
ret.units.append(Unit.fromDict(unit_def))
return ret;
# TODO: add get_poll_items() and use it for polling instead of get_volatile_addresses().
# difference: get_poll_items() returns a combination of single parameters and
# parameter ranges whereas get_volatile_addresses() returns a list of parameter
# addresses
[docs]class DeviceProfile(object):
def __init__(self, idc):
self.idc = int(idc) #: Device Identifier Code
self.name = None #: Device name (e.g. MHV4). Should be unique.
self.parameters = list()
self.address_parameter_map = dict()
self.name_parameter_map = dict()
self._extensions = dict()
def __str__(self):
return "DeviceProfile(name=%s, idc=%d)" % (self.name, self.idc)
[docs] def add_parameter(self, param):
if param.address in self.address_parameter_map:
raise DuplicateParameter("Address %d already present in DeviceProfile %s" %
(param.address, self))
self.parameters.append(param)
self.address_parameter_map[param.address] = param
self.name_parameter_map[param.name] = param
[docs] def get_parameter_by_address(self, address):
return self.address_parameter_map.get(address, None)
[docs] def get_parameter_by_name(self, name):
return self.name_parameter_map.get(name, None)
[docs] def has_parameter(self, param):
return (param in self.address_parameter_map or
param in self.name_parameter_map)
def __getitem__(self, key):
if isinstance(key, (str, unicode, QtCore.QString)):
return self.get_parameter_by_name(str(key))
try:
return self.get_parameter_by_address(int(key))
except ValueError:
raise KeyError(key)
[docs] def get_parameters(self):
return list(self.parameters)
[docs] def get_critical_parameters(self):
return filter(lambda p: p.critical, self.parameters)
[docs] def get_non_critical_parameters(self):
return filter(lambda p: not p.critical, self.parameters)
[docs] def get_config_parameters(self):
predicate = lambda p: not p.read_only and not p.do_not_store
return filter(predicate, self.parameters)
[docs] def get_non_critical_config_parameters(self):
predicate = lambda p: not p.critical and not p.read_only and not p.do_not_store
return filter(predicate, self.parameters)
[docs] def get_static_addresses(self):
return map(lambda p: p.address, filter(lambda p: not p.poll, self.parameters))
[docs] def get_volatile_addresses(self):
return map(lambda p: p.address, filter(lambda p: p.poll, self.parameters))
[docs] def set_extension(self, ext):
#d = dict()
#d['name'] = name = ext['name']
#d['default'] = ext['value']
#if 'limits' in ext:
# d['limits'] = ext['limits']
#if 'values' in ext:
# d['values'] = ext['values']
#self._extensions[name] = d
# FIXME: change extension mechanism to handle extension meta data
# => stored extension values in config are different from extension
# definitions in the device profile.
self._extensions[ext['name']] = ext
[docs] def get_extension(self, name):
return self._extensions[name]
[docs] def get_extensions(self):
return dict(self._extensions)
[docs] def get_parameter_names(self):
return dict((pp.address, pp.name)
for pp in filter(lambda pp: pp.is_named(), self.parameters))
[docs]def from_dict(d):
ret = DeviceProfile(d['idc'])
ret.name = str(d.get('name', None))
for pd in d.get('parameters', list()):
ret.add_parameter(ParameterProfile.fromDict(pd))
for ext in d.get('extensions', list()):
ret.set_extension(ext)
return ret
[docs]def make_generic_profile(device_idc):
ret = DeviceProfile(device_idc)
ret.name = "GenericDevice(idc=%d)" % device_idc
for i in range(256):
ret.add_parameter(ParameterProfile(i))
return ret