#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# mesycontrol - Remote control for mesytec devices.
# Copyright (C) 2015-2016 mesytec GmbH & Co. KG <info@mesytec.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__author__ = 'Florian Lüke'
__email__ = 'f.lueke@mesytec.com'
from functools import partial
import collections
import itertools
from .. qt import pyqtSignal
from .. qt import Qt
from .. qt import QtCore
from .. qt import QtGui
from .. future import future_progress_dialog
from .. future import set_exception_on
from .. future import set_result_on
from .. import future
from .. import parameter_binding as pb
from .. import util
from .. specialized_device import DeviceBase
from .. specialized_device import DeviceWidgetBase
from .. util import hline
from .. util import make_spinbox
from .. util import make_title_label
from .. util import ReadOnlyCheckBox
import mscf16_profile
NUM_CHANNELS = mscf16_profile.NUM_CHANNELS
NUM_GROUPS = mscf16_profile.NUM_GROUPS
GAIN_FACTOR = mscf16_profile.GAIN_FACTOR
GAIN_JUMPER_LIMITS_V = mscf16_profile.GAIN_JUMPER_LIMITS_V
GAIN_JUMPER_LIMITS_C = mscf16_profile.GAIN_JUMPER_LIMITS_C
AUTO_PZ_ALL = NUM_CHANNELS + 1
SHAPING_TIMES_US = mscf16_profile.SHAPING_TIMES_US
CHANNEL_MODE_COMMON = 0
CHANNEL_MODE_INDIVIDUAL = 1
cg_helper = util.ChannelGroupHelper(NUM_CHANNELS, NUM_GROUPS)
[docs]class ModuleInfo(object):
"""Holds information about an MSCF16 that can't be detected via
software."""
def __init__(self, name='F', shaping_time=1, input_type='V',
input_connector='L', discriminator='CFD', cfd_delay=30):
self.name = name
self.shaping_time = shaping_time
self.input_type = input_type
self.input_connector = input_connector
self.discriminator = discriminator
self.cfd_delay = cfd_delay
[docs]class HardwareInfo(object):
"""Decodes the `hardware_info' register of newer MSCF16s."""
LN_TYPE = 1 << 0
HW_GE_V4 = 1 << 1
INTEGRATING = 1 << 2
SUMDIS = 1 << 6
def __init__(self, hw_info=None):
self.info = hw_info
[docs] def is_ln_type(self):
"""True if LN (low noise) version"""
return self.is_valid() and self.info & HardwareInfo.LN_TYPE
[docs] def is_hw_version_ge_4(self):
"""True if hardware version >= 4"""
return self.is_valid() and self.info & HardwareInfo.HW_GE_V4
[docs] def is_integrating(self):
"""True if this is a charge integrating MSCF16 (PMT variant)"""
return self.is_valid() and self.info & HardwareInfo.INTEGRATING
[docs] def has_sumdis(self):
return self.is_valid() and self.info & HardwareInfo.SUMDIS
def __and__(self, other):
return self.info & other
[docs] def is_valid(self):
return self.info is not None
[docs]class CopyFunction(object):
panel2rc = 1
rc2panel = 2
common2single = 3
Version = collections.namedtuple('Version', 'major minor')
[docs]def get_config_parameters(app_device):
# Start out with the default parameters defined in the profile. Then try to
# read the version register and remove parameters accordingly. If the
# hardware_info register is available use that information to keep/remove
# additional parameters.
# If no hardware is present the profile default parameters are returned.
# FIXME: it might be better to return the minimal profile if no version
# info can be read. At a later point when hardware is connected and version
# info is available the list of config parameters has to be updated.
profile = app_device.profile
params = profile.get_config_parameters()
if not app_device.has_hw:
return future.Future().set_result(params)
device = MSCF16(app_device, util.HARDWARE, util.HARDWARE)
ret = future.Future()
def version_done(f):
try:
version = f.result()
except Exception as e:
device.log.warning("could not read MSCF-16 version: %s", e)
ret.set_result(params)
return
def maybe_remove(min_version, *param_names):
if version < min_version:
device.log.info("version %s < %s -> removing %s",
version, min_version, param_names)
for n in param_names:
params.remove(profile[n])
maybe_remove((4, 0), 'blr_threshold', 'blr_enable', 'coincidence_time',
'shaper_offset', 'threshold_offset')
maybe_remove((5, 3), 'sumdis_threshold', 'ecl_delay_enable')
maybe_remove((5, 4), 'tf_int_time')
if version >= (5, 3):
def hw_info_done(f):
try:
hw_info = f.result()
if not hw_info.has_sumdis():
device.log.info("sumdis_threshold not available according to hw_info register")
params.remove(profile['sumdis_threshold'])
except Exception as e:
device.log.warning("could not read MSCF-16 hardware info register: %s", e)
ret.set_result(params)
device.get_hardware_info().add_done_callback(hw_info_done)
else:
ret.set_result(params)
device.get_version().add_done_callback(version_done)
return ret
[docs]def decode_version(val):
return Version(*divmod(int(val), 16))
[docs]def decode_fpga_version(val):
return Version(*divmod(int(val), 256))
[docs]def decode_cpu_software_version(val):
return Version(*divmod(int(val), 256))
[docs]def decode_hardware_info(val):
return HardwareInfo(int(val))
# ========== Device ==========
[docs]class MSCF16(DeviceBase):
gain_jumper_changed = pyqtSignal(int, float) # group, value
auto_pz_channel_changed = pyqtSignal(int)
def __init__(self, app_device, read_mode, write_mode, parent=None):
super(MSCF16, self).__init__(app_device, read_mode, write_mode, parent)
self.log = util.make_logging_source_adapter(__name__, self)
self._auto_pz_channel = 0
self._on_hardware_set(app_device, None, self.hw)
self.extension_changed.connect(self._on_extension_changed)
# ===== version registers =====
[docs] def get_version(self):
"""Reads the 'version' register and returns a Future whose result is a
namedtuple of the form (major, minor)."""
if not self.has_hw:
return future.Future().set_exception(
pb.ParameterUnavailable("hardware not present"))
ret = future.Future()
@set_result_on(ret)
def done(f):
return decode_version(int(f.result()))
self.get_hw_parameter('version').add_done_callback(done)
return ret
[docs] def has_detailed_versions(self):
ret = future.Future()
@set_result_on(ret)
def done(f):
version = f.result()
return version >= Version(5, 3)
self.get_version().add_done_callback(done)
return ret
def _get_detailed_version_parameter(self, param_name, decode_fun):
ret = future.Future()
@set_result_on(ret)
def get_param_done(f):
return decode_fun(int(f.result()))
@set_exception_on(ret)
def has_versions_done(f):
if f.result():
self.get_hw_parameter(param_name).add_done_callback(get_param_done)
else:
raise RuntimeError("Register '%s' (%d) not supported. Requires version >= 5.3" %
(param_name, self.profile[param_name].address))
self.has_detailed_versions().add_done_callback(has_versions_done)
return ret
[docs] def get_fpga_version(self):
return self._get_detailed_version_parameter(
'fpga_version', decode_fpga_version)
[docs] def get_cpu_software_version(self):
return self._get_detailed_version_parameter(
'cpu_software_version', decode_cpu_software_version)
[docs] def get_hardware_info(self):
return self._get_detailed_version_parameter(
'hardware_info', decode_hardware_info)
# FIXME: simplify this. DRY! This also does throw if hardware is not present.
# The version conditions appear twice: once in get_config_parameters() and again here!
# Also no user info string can be generated easily, e.g. "Requires version >= 5.3".
[docs] def has_ecl_enable(self):
ret = future.Future()
@future.set_result_on(ret)
def done(f):
return f.result() >= (5, 3)
self.get_version().add_done_callback(done)
return ret
[docs] def has_tf_int_time(self):
ret = future.Future()
@future.set_result_on(ret)
def done(f):
return f.result() >= (5, 4)
self.get_version().add_done_callback(done)
return ret
[docs] def is_charge_integrating(self):
ret = future.Future()
@future.set_result_on(ret)
def hw_info_done(f):
try:
return f.result().is_integrating()
except Exception:
return self.get_extension('input_type') == 'C'
self.get_hardware_info().add_done_callback(hw_info_done)
return ret
[docs] def has_sumdis_threshold(self):
ret = future.Future()
@future.set_result_on(ret)
def hw_info_done(f):
try:
return f.result().has_sumdis()
except Exception:
return False
self.get_hardware_info().add_done_callback(hw_info_done)
return ret
# ===== gain =====
# FIXME: update gain labels on hw_info availablity change
[docs] def get_total_gain(self, group):
ret = future.Future()
f_gain_group = self.get_parameter('gain_group%d' % group)
f_integrating = self.is_charge_integrating()
@future.set_result_on(ret)
def done(_):
gain = GAIN_FACTOR ** int(f_gain_group.result())
jumper = self.get_gain_jumper(group)
if f_integrating.result():
# Gain adjustable from 100 pC to 20 nC
return gain / jumper # FIXME: units
else:
return gain * jumper # this is unit-less
future.all_done(f_gain_group, f_integrating).add_done_callback(done)
return ret
[docs] def get_gain_jumper(self, group):
return self.get_extension('gain_jumpers')[group]
[docs] def set_gain_jumper(self, group, jumper_value):
jumpers = self.get_extension('gain_jumpers')
if jumpers[group] != jumper_value:
jumpers[group] = jumper_value
self.set_extension('gain_jumpers', jumpers)
[docs] def apply_common_gain(self):
return self._apply_common_to_single(
'gain_common', 'gain_group%d', NUM_GROUPS)
# ===== shaping time =====
[docs] def get_effective_shaping_time(self, group):
ret = future.Future()
@set_result_on(ret)
def done(f):
return SHAPING_TIMES_US[self.get_extension('shaping_time')][int(f.result())]
self.get_parameter('shaping_time_group%d' % group).add_done_callback(done)
return ret
[docs] def apply_common_sht(self):
return self._apply_common_to_single(
'shaping_time_common', 'shaping_time_group%d', NUM_GROUPS)
# ===== pz - pole zero =====
[docs] def apply_common_pz(self):
return self._apply_common_to_single(
'pz_value_common', 'pz_value_channel%d', NUM_CHANNELS)
[docs] def set_auto_pz(self, channel):
return self.set_hw_parameter('auto_pz', channel)
[docs] def get_auto_pz(self):
return self.get_hw_parameter('auto_pz')
# ===== threshold =====
[docs] def apply_common_threshold(self):
return self._apply_common_to_single(
'threshold_common', 'threshold_channel%d', NUM_CHANNELS)
# ===== copy function =====
# ===== helpers =====
def _apply_common_to_single(self, common_param_name, single_param_name_fmt, n_single_params):
ret = future.Future()
@set_result_on(ret)
def all_applied(f):
return all(g.result() for g in f.result())
@set_exception_on(ret)
def apply_to_single(f):
futures = list()
for i in range(n_single_params):
futures.append(self.set_parameter(single_param_name_fmt % i, int(f.result())))
f_all = future.all_done(*futures).add_done_callback(all_applied)
future.progress_forwarder(f_all, ret)
self.get_parameter(common_param_name).add_done_callback(apply_to_single)
return ret
def _on_hardware_set(self, app_device, old, new):
# Overrides DeviceBase._on_hardware_set which is connected by DeviceBase.
super(MSCF16, self)._on_hardware_set(app_device, old, new)
if old is not None:
old.parameter_changed.disconnect(self._on_hw_parameter_changed)
try:
old.remove_polling_subscriber(self)
except KeyError:
pass
if new is not None:
new.parameter_changed.connect(self._on_hw_parameter_changed)
def _on_hw_parameter_changed(self, address, value):
if address == self.profile['auto_pz'].address:
# Refresh the channels PZ value once auto pz is done.
# auto_pz = 0 means auto pz is not currently running
# 0 < auto_pz <= NUM_CHANNELS means auto pz is running for that channel
# self._auto_pz_channel is the last channel that auto pz was running for
if 0 < self._auto_pz_channel <= NUM_CHANNELS:
self.read_hw_parameter('pz_value_channel%d' % (self._auto_pz_channel-1))
self._auto_pz_channel = value
self.auto_pz_channel_changed.emit(value)
def _on_extension_changed(self, name, value):
if name == 'gain_jumpers':
for g, v in enumerate(value):
self.gain_jumper_changed.emit(g, v)
[docs] def ensure_individual_channel_mode(self):
"""If any of cfg and hw are in common mode change them to individual mode"""
ret = future.Future()
def done(f_list):
try:
do_set = any(int(f.result()) == CHANNEL_MODE_COMMON for f in f_list.result())
if do_set:
self.log.warning("%s: Detected common channel mode. Setting individual mode.",
self.get_display_string())
@future.set_result_on(ret)
def set_done(f_set):
return bool(f_set.result())
# This takes the write mode into account. No extra checks need to be done.
self.set_parameter('single_channel_mode', CHANNEL_MODE_INDIVIDUAL
).add_done_callback(set_done)
else:
ret.set_result(True)
except Exception as e:
self.log.warning("%s: %s", self.get_display_string(), str(e))
ret.set_exception(e)
f_list = list()
if self.write_mode & util.CONFIG and self.has_cfg:
f_list.append(self.get_cfg_parameter('single_channel_mode'))
if self.write_mode & util.HARDWARE and self.has_hw:
f_list.append(self.get_hw_parameter('single_channel_mode'))
future.all_done(*f_list).add_done_callback(done)
return ret
# ========== GUI ==========
dynamic_label_style = "QLabel { background-color: lightgrey; }"
[docs]class GainPage(QtGui.QGroupBox):
def __init__(self, device, display_mode, write_mode, parent=None):
super(GainPage, self).__init__("Gain", parent)
self.log = util.make_logging_source_adapter(__name__, self)
self.device = device
device.gain_jumper_changed.connect(self._on_device_gain_jumper_changed)
device.extension_changed.connect(self._on_device_extension_changed)
self.gain_inputs = list()
self.gain_labels = list()
self.bindings = list()
layout = QtGui.QGridLayout(self)
layout.setContentsMargins(2, 2, 2, 2)
layout.addWidget(QtGui.QLabel("Common"), 0, 0, 1, 1, Qt.AlignRight)
self.gain_common = util.DelayedSpinBox()
b = pb.factory.make_binding(
device=device,
profile=device.profile['gain_common'],
display_mode=display_mode,
write_mode=write_mode,
target=self.gain_common)
self.bindings.append(b)
common_layout, self.apply_common_gain_button = util.make_apply_common_button_layout(
self.gain_common, "Apply to groups", self._apply_common_gain)
layout.addLayout(common_layout, 0, 1)
layout.addWidget(make_title_label("Group"), 1, 0, 1, 1, Qt.AlignRight)
layout.addWidget(make_title_label("RC Gain"), 1, 1, 1, 1, Qt.AlignCenter)
layout.addWidget(make_title_label("Total"), 1, 2, 1, 1, Qt.AlignCenter)
offset = layout.rowCount()
for i in range(NUM_GROUPS):
group_range = cg_helper.group_channel_range(i)
descr_label = QtGui.QLabel("%d-%d" % (group_range[0], group_range[-1]))
gain_spin = util.DelayedSpinBox()
gain_label = QtGui.QLabel("N/A")
gain_label.setStyleSheet(dynamic_label_style)
self.gain_inputs.append(gain_spin)
self.gain_labels.append(gain_label)
b = pb.factory.make_binding(
device=device,
profile=device.profile['gain_group%d' % i],
display_mode=display_mode,
write_mode=write_mode,
target=gain_spin)
self.bindings.append(b)
b.add_update_callback(self._update_gain_label_cb, group=i)
layout.addWidget(descr_label, i+offset, 0, 1, 1, Qt.AlignRight)
layout.addWidget(gain_spin, i+offset, 1)
layout.addWidget(gain_label, i+offset, 2, 1, 1, Qt.AlignCenter)
[docs] def handle_hardware_connected_changed(self, connected):
if self.device.read_mode & util.HARDWARE:
self.apply_common_gain_button.setEnabled(connected)
@future_progress_dialog()
def _apply_common_gain(self):
return self.device.apply_common_gain()
def _on_device_gain_jumper_changed(self, group, value):
self._update_gain_label(group)
def _on_device_extension_changed(self, name, value):
if name == 'input_type':
for i in range(NUM_GROUPS):
self._update_gain_label(i)
def _update_gain_label_cb(self, f, group):
self._update_gain_label(group)
def _update_gain_label(self, group):
f_gain = self.device.get_total_gain(group)
f_integrating = self.device.is_charge_integrating()
def done(_):
try:
if f_integrating.result():
tmpl = "%.2f nC"
else:
tmpl = "%.2f"
self.gain_labels[group].setText(tmpl % f_gain.result())
self.gain_labels[group].setToolTip(str())
except Exception as e:
self.gain_labels[group].setText("N/A")
self.gain_labels[group].setToolTip(str(e))
future.all_done(f_gain, f_integrating).add_done_callback(done)
[docs]class AutoPZSpin(QtGui.QStackedWidget):
def __init__(self, parent=None):
super(AutoPZSpin, self).__init__(parent)
self.spin = util.DelayedSpinBox()
self.progress = QtGui.QProgressBar()
self.progress.setTextVisible(False)
self.progress.setMinimum(0)
self.progress.setMaximum(0)
# Ignore the size hint of the progressbar so that only the size of the
# spinbox is taken into account when calculating the final widget size.
self.progress.setSizePolicy(QtGui.QSizePolicy.Ignored, QtGui.QSizePolicy.Ignored)
self.addWidget(self.spin)
self.addWidget(self.progress)
[docs] def showSpin(self):
self.setCurrentIndex(0)
[docs] def showProgress(self):
self.setCurrentIndex(1)
[docs]class ShapingPage(QtGui.QGroupBox):
auto_pz_button_size = QtCore.QSize(24, 24)
def __init__(self, device, display_mode, write_mode, parent=None):
super(ShapingPage, self).__init__("Shaping", parent)
self.log = util.make_logging_source_adapter(__name__, self)
self.device = device
self.device.auto_pz_channel_changed.connect(self._on_device_auto_pz_channel_changed)
self.device.hardware_set.connect(self._on_hardware_set)
self.device.extension_changed.connect(self._on_device_extension_changed)
self.stop_icon = QtGui.QIcon(':/stop.png')
self.sht_inputs = list()
self.sht_labels = list()
self.pz_inputs = list()
self.pz_buttons = list()
self.pz_stacks = list()
self.bindings = list()
# Columns: group_num, shaping time input, shaping time display, chan_num, pz input, auto pz button
self.spin_sht_common = util.DelayedSpinBox()
b = pb.factory.make_binding(
device=device,
profile=device.profile['shaping_time_common'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_sht_common)
self.bindings.append(b)
self.spin_pz_common = util.DelayedSpinBox()
b = pb.factory.make_binding(
device=device,
profile=device.profile['pz_value_common'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_pz_common)
self.bindings.append(b)
sht_common_layout, self.sht_common_button = util.make_apply_common_button_layout(
self.spin_sht_common, "Apply to groups", self._apply_common_sht)
pz_common_layout, self.pz_common_button = util.make_apply_common_button_layout(
self.spin_pz_common, "Apply to channels", self._apply_common_pz)
self.pb_auto_pz_all = QtGui.QPushButton("A")
self.pb_auto_pz_all.setToolTip("Start auto PZ for all channels")
self.pb_auto_pz_all.setStatusTip(self.pb_auto_pz_all.toolTip())
self.pb_auto_pz_all.setMaximumSize(ShapingPage.auto_pz_button_size)
self.pb_auto_pz_all.clicked.connect(partial(self.device.set_auto_pz, channel=AUTO_PZ_ALL))
layout = QtGui.QGridLayout(self)
layout.setContentsMargins(2, 2, 2, 2)
layout.addWidget(QtGui.QLabel("Common"), 0, 0, 1, 1, Qt.AlignRight)
layout.addLayout(sht_common_layout, 0, 1)
layout.addWidget(QtGui.QLabel("Common"), 0, 3, 1, 1, Qt.AlignRight)
layout.addLayout(pz_common_layout, 0, 4)
layout.addWidget(self.pb_auto_pz_all, 0, 5)
layout.addWidget(make_title_label("Group"), 1, 0, 1, 1, Qt.AlignRight)
layout.addWidget(make_title_label("Shaping time"), 1, 1, 1, 2, Qt.AlignLeft)
layout.addWidget(make_title_label("Chan"), 1, 3, 1, 1, Qt.AlignRight)
layout.addWidget(make_title_label("PZ"), 1, 4)
for chan in range(NUM_CHANNELS):
group = int(chan / NUM_GROUPS)
group_range = cg_helper.group_channel_range(group)
row = layout.rowCount()
if chan % NUM_GROUPS == 0:
descr_label = QtGui.QLabel("%d-%d" % (group_range[0], group_range[-1]))
spin_sht = util.DelayedSpinBox()
label_sht = QtGui.QLabel("N/A")
label_sht.setStyleSheet(dynamic_label_style)
layout.addWidget(descr_label, row, 0, 1, 1, Qt.AlignRight)
layout.addWidget(spin_sht, row, 1)
layout.addWidget(label_sht, row, 2)
self.sht_inputs.append(spin_sht)
self.sht_labels.append(label_sht)
b = pb.factory.make_binding(
device=device,
profile=device.profile['shaping_time_group%d' % group],
display_mode=display_mode,
write_mode=write_mode,
target=spin_sht)
self.bindings.append(b)
b.add_update_callback(self._update_sht_label_cb, group=group)
label_chan = QtGui.QLabel("%d" % chan)
spin_pz = AutoPZSpin()
self.pz_inputs.append(spin_pz.spin)
self.pz_stacks.append(spin_pz)
b = pb.factory.make_binding(
device=device,
profile=device.profile['pz_value_channel%d' % chan],
display_mode=display_mode,
write_mode=write_mode,
target=spin_pz.spin)
self.bindings.append(b)
button_pz = QtGui.QPushButton("A")
button_pz.setToolTip("Start auto PZ for channel %d" % chan)
button_pz.setStatusTip(button_pz.toolTip())
button_pz.setMaximumSize(ShapingPage.auto_pz_button_size)
button_pz.clicked.connect(partial(self._auto_pz_button_clicked, channel=chan))
self.pz_buttons.append(button_pz)
layout.addWidget(label_chan, row, 3, 1, 1, Qt.AlignRight)
layout.addWidget(spin_pz, row, 4)
layout.addWidget(button_pz, row, 5)
layout.addWidget(hline(), layout.rowCount(), 0, 1, 6)
self.spin_shaper_offset = util.DelayedSpinBox()
self.label_shaper_offset = QtGui.QLabel()
self.label_shaper_offset.setStyleSheet(dynamic_label_style)
self.spin_blr_threshold = util.DelayedSpinBox()
self.check_blr_enable = QtGui.QCheckBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['shaper_offset'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_shaper_offset
).add_update_callback(self._update_shaper_offset_label_cb))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['blr_threshold'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_blr_threshold))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['blr_enable'],
display_mode=display_mode,
write_mode=write_mode,
target=self.check_blr_enable))
row = layout.rowCount()
layout.addWidget(QtGui.QLabel("Sh. offset"), row, 0)
layout.addWidget(self.spin_shaper_offset, row, 1)
layout.addWidget(self.label_shaper_offset, row, 2)
row += 1
layout.addWidget(QtGui.QLabel("BLR thresh."), row, 0)
layout.addWidget(self.spin_blr_threshold, row, 1)
row += 1
layout.addWidget(QtGui.QLabel("BLR enable"), row, 0)
layout.addWidget(self.check_blr_enable, row, 1)
self._on_hardware_set(device, None, device.hw)
[docs] def handle_hardware_connected_changed(self, connected):
if self.device.read_mode & util.HARDWARE:
self.sht_common_button.setEnabled(connected)
self.pz_common_button.setEnabled(connected)
def _on_hardware_set(self, device, old, new):
hw_is_ok = new is not None and new.idc == idc
self.pb_auto_pz_all.setEnabled(hw_is_ok)
for b in self.pz_buttons:
b.setEnabled(hw_is_ok)
@future_progress_dialog()
def _apply_common_sht(self):
return self.device.apply_common_sht()
@future_progress_dialog()
def _apply_common_pz(self):
return self.device.apply_common_pz()
def _update_sht_label_cb(self, ignored_future, group):
self._update_sht_label(group)
def _update_sht_label(self, group):
def done(f):
try:
text = "%.2f µs" % f.result()
label = self.sht_labels[group]
label.setText(QtCore.QString.fromUtf8(text))
except Exception:
self.sht_labels[group].setText("N/A")
self.device.get_effective_shaping_time(group).add_done_callback(done)
def _on_device_auto_pz_channel_changed(self, value):
for i, pz_stack in enumerate(self.pz_stacks):
try:
button = self.pz_buttons[i]
except IndexError:
continue
if value == 0 or i != value-1:
pz_stack.showSpin()
button.setText("A")
button.setIcon(QtGui.QIcon())
button.setToolTip("Start auto PZ for channel %d" % i)
elif i == value-1:
pz_stack.showProgress()
button.setText("")
button.setIcon(self.stop_icon)
button.setToolTip("Stop auto PZ")
def _auto_pz_button_clicked(self, channel):
def done(f):
auto_pz = int(f.result())
# Turn auto pz off if the button of the active channel was clicked.
# Otherwise turn it on for that channel.
self.device.set_auto_pz(0 if auto_pz == channel+1 else channel+1)
self.device.get_auto_pz().add_done_callback(done)
def _update_shaper_offset_label_cb(self, f):
value = int(f.result()) - 100
self.label_shaper_offset.setText("%d" % value)
def _on_device_extension_changed(self, name, value):
if name == 'shaping_time':
for i in range(NUM_GROUPS):
self._update_sht_label(i)
[docs]class TimingPage(QtGui.QGroupBox):
def __init__(self, device, display_mode, write_mode, parent=None):
super(TimingPage, self).__init__("Timing", parent)
self.device = device
self.threshold_inputs = list()
self.threshold_labels = list()
self.bindings = list()
self.threshold_common = make_spinbox(limits=device.profile['threshold_common'].range.to_tuple())
self.threshold_common = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['threshold_common'],
display_mode=display_mode,
write_mode=write_mode,
target=self.threshold_common))
threshold_common_layout, self.threshold_common_button = util.make_apply_common_button_layout(
self.threshold_common, "Apply to channels", self._apply_common_threshold)
layout = QtGui.QGridLayout(self)
layout.setContentsMargins(2, 2, 2, 2)
layout.addWidget(QtGui.QLabel("Common"), 0, 0, 1, 1, Qt.AlignRight)
layout.addLayout(threshold_common_layout, 0, 1)
layout.addWidget(make_title_label("Chan"), 1, 0, 1, 1, Qt.AlignRight)
layout.addWidget(make_title_label("Threshold"), 1, 1)
for chan in range(NUM_CHANNELS):
offset = 2
descr_label = QtGui.QLabel("%d" % chan)
spin_threshold = util.DelayedSpinBox()
label_threshold = QtGui.QLabel()
label_threshold.setStyleSheet(dynamic_label_style)
layout.addWidget(descr_label, chan+offset, 0, 1, 1, Qt.AlignRight)
layout.addWidget(spin_threshold, chan+offset, 1)
layout.addWidget(label_threshold, chan+offset, 2)
self.threshold_inputs.append(spin_threshold)
self.threshold_labels.append(label_threshold)
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['threshold_channel%d' % chan],
display_mode=display_mode,
write_mode=write_mode,
target=spin_threshold))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['threshold_channel%d' % chan],
display_mode=display_mode,
write_mode=write_mode,
target=label_threshold,
unit_name='percent'))
layout.addWidget(hline(), layout.rowCount(), 0, 1, 3)
self.spin_threshold_offset = util.DelayedSpinBox()
self.label_threshold_offset = QtGui.QLabel()
self.label_threshold_offset.setStyleSheet(dynamic_label_style)
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['threshold_offset'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_threshold_offset
).add_update_callback(self._threshold_offset_cb))
self.check_ecl_delay = QtGui.QCheckBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['ecl_delay_enable'],
display_mode=display_mode,
write_mode=write_mode,
target=self.check_ecl_delay
).add_update_callback(self._ecl_delay_enable_cb))
self.spin_tf_int_time = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['tf_int_time'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_tf_int_time
).add_update_callback(self._tf_int_time_cb))
row = layout.rowCount()
layout.addWidget(QtGui.QLabel("Thr. offset"), row, 0)
layout.addWidget(self.spin_threshold_offset, row, 1)
layout.addWidget(self.label_threshold_offset, row, 2)
row += 1
layout.addWidget(QtGui.QLabel("ECL delay"), row, 0)
layout.addWidget(self.check_ecl_delay, row, 1)
row += 1
layout.addWidget(QtGui.QLabel("TF int. time"), row, 0)
layout.addWidget(self.spin_tf_int_time, row, 1)
@future_progress_dialog()
def _apply_common_threshold(self):
return self.device.apply_common_threshold()
def _ecl_delay_enable_cb(self, param_future):
def done(f):
try:
self.check_ecl_delay.setEnabled(f.result())
if not f.result():
self.check_ecl_delay.setToolTip("N/A")
except Exception:
pass
self.device.has_ecl_enable().add_done_callback(done)
def _tf_int_time_cb(self, param_future):
def done(f):
try:
self.spin_tf_int_time.setEnabled(f.result())
if not f.result():
self.spin_tf_int_time.setToolTip("N/A")
except Exception:
pass
self.device.has_tf_int_time().add_done_callback(done)
def _threshold_offset_cb(self, f):
value = int(f.result()) - 100
self.label_threshold_offset.setText("%d" % value)
[docs] def handle_hardware_connected_changed(self, connected):
if self.device.read_mode & util.HARDWARE:
self.threshold_common_button.setEnabled(connected)
[docs]class ChannelModeBinding(pb.AbstractParameterBinding):
def __init__(self, **kwargs):
super(ChannelModeBinding, self).__init__(**kwargs)
self.target[0].toggled.connect(self._mode_single_toggled)
self.target[1].toggled.connect(self._mode_common_toggled)
def _update(self, rf):
try:
rb = self.target[0] if int(rf.result()) else self.target[1]
with util.block_signals(rb):
rb.setChecked(True)
except Exception:
pass
for rb in self.target:
rb.setToolTip(self._get_tooltip(rf))
rb.setStatusTip(rb.toolTip())
def _mode_single_toggled(self, checked):
if checked:
self._write_value(1)
def _mode_common_toggled(self, checked):
if checked:
self._write_value(0)
[docs]class MiscPage(QtGui.QWidget):
def __init__(self, device, display_mode, write_mode, parent=None):
super(MiscPage, self).__init__(parent)
self.log = util.make_logging_source_adapter(__name__, self)
self.device = device
self.bindings = list()
self.device.hardware_set.connect(self._on_hardware_set)
layout = QtGui.QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
# Coincidence/Trigger
trigger_box = QtGui.QGroupBox("Coincidence/Trigger")
trigger_layout = QtGui.QGridLayout(trigger_box)
trigger_layout.setContentsMargins(2, 2, 2, 2)
self.spin_coincidence_time = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['coincidence_time'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_coincidence_time))
self.label_coincidence_time = QtGui.QLabel()
self.label_coincidence_time.setStyleSheet(dynamic_label_style)
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['coincidence_time'],
display_mode=display_mode,
write_mode=write_mode,
target=self.label_coincidence_time,
unit_name='nanoseconds'))
self.spin_multiplicity_high = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['multiplicity_hi'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_multiplicity_high))
self.spin_multiplicity_low = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['multiplicity_lo'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_multiplicity_low))
row = 0
trigger_layout.addWidget(QtGui.QLabel("Coinc. time"), row, 0)
trigger_layout.addWidget(self.spin_coincidence_time, row, 1)
trigger_layout.addWidget(self.label_coincidence_time, row, 2)
row += 1
trigger_layout.addWidget(QtGui.QLabel("Mult. low"), row, 0)
trigger_layout.addWidget(self.spin_multiplicity_low, row, 1)
row += 1
trigger_layout.addWidget(QtGui.QLabel("Mult. high"), row, 0)
trigger_layout.addWidget(self.spin_multiplicity_high, row, 1)
# Monitor
monitor_box = QtGui.QGroupBox("Monitor Channel")
monitor_layout = QtGui.QGridLayout(monitor_box)
monitor_layout.setContentsMargins(2, 2, 2, 2)
self.combo_monitor = QtGui.QComboBox()
self.combo_monitor.addItem("Off")
self.combo_monitor.addItems(["Channel %d" % i for i in range(NUM_CHANNELS)])
self.combo_monitor.setMaxVisibleItems(NUM_CHANNELS+1)
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['monitor_channel'],
display_mode=display_mode,
write_mode=write_mode,
target=self.combo_monitor))
monitor_layout.addWidget(self.combo_monitor, 0, 0)
# Channel mode
self.rb_mode_single = QtGui.QRadioButton("Individual")
self.rb_mode_common = QtGui.QRadioButton("Common")
self.rb_mode_common.setEnabled(False)
self.bindings.append(ChannelModeBinding(
device=device,
profile=device.profile['single_channel_mode'],
display_mode=display_mode,
write_mode=write_mode,
target=(self.rb_mode_single, self.rb_mode_common)))
mode_box = QtGui.QGroupBox("Channel Mode")
mode_layout = QtGui.QGridLayout(mode_box)
mode_layout.setContentsMargins(2, 2, 2, 2)
mode_layout.addWidget(self.rb_mode_single, 0, 0)
mode_layout.addWidget(self.rb_mode_common, 0, 1)
# Copy Functions
copy_box = QtGui.QGroupBox("Copy")
copy_layout = QtGui.QVBoxLayout(copy_box)
copy_layout.setContentsMargins(2, 2, 2, 2)
self.pb_copy_panel2rc = QtGui.QPushButton("Panel -> RC", clicked=self._copy_panel2rc)
self.copy_panel2rc_progress = QtGui.QProgressBar()
self.copy_panel2rc_stack = QtGui.QStackedWidget()
self.copy_panel2rc_stack.addWidget(self.pb_copy_panel2rc)
self.copy_panel2rc_stack.addWidget(self.copy_panel2rc_progress)
self.pb_copy_rc2panel = QtGui.QPushButton("RC -> Panel", clicked=self._copy_rc2panel)
self.pb_copy_common2single = QtGui.QPushButton("Common -> Single", clicked=self._copy_common2single)
self.copy_common2single_progress = QtGui.QProgressBar()
self.copy_common2single_stack = QtGui.QStackedWidget()
self.copy_common2single_stack.addWidget(self.pb_copy_common2single)
self.copy_common2single_stack.addWidget(self.copy_common2single_progress)
copy_layout.addWidget(self.copy_panel2rc_stack)
copy_layout.addWidget(self.pb_copy_rc2panel)
copy_layout.addWidget(self.copy_common2single_stack)
# Version display
version_box = QtGui.QGroupBox("Version")
version_layout = QtGui.QFormLayout(version_box)
version_layout.setContentsMargins(2, 2, 2, 2)
self.version_labels = dict()
for k, l in (
("version", "Version"),
("fpga_version", "FPGA"),
("cpu_software_version", "CPU")):
self.version_labels[k] = label = QtGui.QLabel()
label.setStyleSheet(dynamic_label_style)
version_layout.addRow(l+":", label)
self.hardware_info_widget = HardwareInfoWidget()
version_layout.addRow("Hardware Info:", self.hardware_info_widget)
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['version'],
display_mode=util.HARDWARE,
fixed_modes=True,
).add_update_callback(
self._version_label_cb,
label=self.version_labels['version'],
getter=self.device.get_version))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['fpga_version'],
display_mode=util.HARDWARE,
fixed_modes=True,
).add_update_callback(
self._version_label_cb,
label=self.version_labels['fpga_version'],
getter=self.device.get_fpga_version))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['cpu_software_version'],
display_mode=util.HARDWARE,
fixed_modes=True,
).add_update_callback(
self._version_label_cb,
label=self.version_labels['cpu_software_version'],
getter=self.device.get_cpu_software_version))
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['hardware_info'],
display_mode=util.HARDWARE,
fixed_modes=True,
).add_update_callback(
self._hardware_info_cb))
# More
self.spin_sumdis_threshold = util.DelayedSpinBox()
self.bindings.append(pb.factory.make_binding(
device=device,
profile=device.profile['sumdis_threshold'],
display_mode=display_mode,
write_mode=write_mode,
target=self.spin_sumdis_threshold
).add_update_callback(self._sumdis_threshold_cb))
more_box = QtGui.QGroupBox("More")
more_layout = QtGui.QFormLayout(more_box)
more_layout.setContentsMargins(2, 2, 2, 2)
more_layout.addRow("Sumdis Threshold", self.spin_sumdis_threshold)
layout.addWidget(trigger_box)
layout.addWidget(monitor_box)
layout.addWidget(mode_box)
layout.addWidget(copy_box)
layout.addWidget(version_box)
layout.addWidget(more_box)
self._on_hardware_set(device, None, device.hw)
def _on_hardware_set(self, device, old, new):
hw_is_ok = (new is not None
and not new.address_conflict
and not device.idc_conflict)
self.pb_copy_panel2rc.setEnabled(hw_is_ok)
self.pb_copy_rc2panel.setEnabled(hw_is_ok)
self.pb_copy_common2single.setEnabled(hw_is_ok)
signals = ['connected', 'disconnected', 'address_conflict_changed']
for sig in signals:
if old is not None:
try:
getattr(old, sig).disconnect(self._hardware_state_changed)
except TypeError:
pass
if new is not None:
getattr(new, sig).connect(self._hardware_state_changed)
for binding in self.bindings:
binding.populate()
def _hardware_state_changed(self):
hw = self.device.hw
en = (hw is not None
and hw.is_connected()
and not hw.address_conflict)
for b in (self.pb_copy_panel2rc, self.pb_copy_rc2panel,
self.pb_copy_common2single):
b.setEnabled(en)
def _copy_panel2rc(self):
def progress(f):
self.copy_panel2rc_progress.setValue(f.progress())
def done(f):
self.copy_panel2rc_stack.setCurrentIndex(0)
copy_future = self.device.perform_copy_function(CopyFunction.panel2rc)
copy_future.add_progress_callback(progress)
copy_future.add_done_callback(done)
self.copy_panel2rc_progress.setMaximum(copy_future.progress_max())
self.copy_panel2rc_progress.setValue(0)
self.copy_panel2rc_stack.setCurrentIndex(1)
def _copy_rc2panel(self):
self.device.perform_copy_function(CopyFunction.rc2panel)
def _copy_common2single(self):
def progress(f):
self.copy_common2single_progress.setValue(f.progress())
def done(f):
self.copy_common2single_stack.setCurrentIndex(0)
copy_future = self.device.perform_copy_function(CopyFunction.common2single)
copy_future.add_progress_callback(progress)
copy_future.add_done_callback(done)
self.copy_common2single_progress.setMaximum(copy_future.progress_max())
self.copy_common2single_progress.setValue(0)
self.copy_common2single_stack.setCurrentIndex(1)
def _version_label_cb(self, read_mem_future, label, getter):
def done(getter_future):
try:
version = getter_future.result()
label.setText("%d.%d" % (version.major, version.minor))
label.setToolTip(str())
except Exception as e:
label.setText("N/A")
label.setToolTip(str(e))
label.setStatusTip(label.toolTip())
getter().add_done_callback(done)
def _hardware_info_cb(self, read_mem_future):
def done(getter_future):
try:
hw_info = getter_future.result()
except Exception:
hw_info = HardwareInfo()
self.hardware_info_widget.set_hardware_info(hw_info)
self.device.get_hardware_info().add_done_callback(done)
def _sumdis_threshold_cb(self, read_sumdis_future):
def done(f):
try:
self.spin_sumdis_threshold.setEnabled(f.result())
if not f.result():
self.spin_sumdis_threshold.setToolTip("N/A")
except Exception:
pass
self.device.has_sumdis_threshold().add_done_callback(done)
# ========== Module ==========
idc = 20
device_class = MSCF16
device_ui_class = MSCF16Widget
profile_dict = mscf16_profile.profile_dict
if __name__ == "__main__":
import mock
import signal
import sys
import mscf16_profile
QtGui.QApplication.setDesktopSettingsAware(False)
app = QtGui.QApplication(sys.argv)
app.setStyle(QtGui.QStyleFactory.create("Plastique"))
def signal_handler(signum, frame):
app.quit()
signal.signal(signal.SIGINT, signal_handler)
timer = QtCore.QTimer()
timer.timeout.connect(lambda: None)
timer.start(500)
device = mock.Mock()
device.profile = mscf16_profile.get_device_profile()
device.parameter_changed = mock.MagicMock()
device.get_total_gain = mock.MagicMock(return_value=2)
device.get_gain_jumper = mock.MagicMock(return_value=30)
w = MSCF16Widget(device)
w.show()
ret = app.exec_()
print "device:", device.mock_calls
sys.exit(ret)