import logging
import time
from memory import Memory, fp2fixed_int
import bitfield
LOGGER = logging.getLogger(__name__)
[docs]class Register(Memory):
"""
A CASPER register on an FPGA.
"""
[docs] def __init__(self, parent, name, address, device_info=None,
auto_update=False):
"""
:param parent:
:param name:
:param address:
:param device_info:
:param auto_update:
"""
self.auto_update = auto_update
self.parent = parent
self.last_values = {}
Memory.__init__(self, name=name, width_bits=32,
address=address, length_bytes=4)
self.process_info(device_info)
LOGGER.debug('New Register %s' % self)
[docs] @classmethod
def from_device_info(cls, parent, device_name, device_info, memorymap_dict, **kwargs):
"""
Process device info and the memory map to get all necessary info and
return a Register instance.
:param parent: the parent device
:param device_name: the unique device name
:param device_info: information about this device
:param memorymap_dict: a dictionary containing the device memory map
:return: a Register object
"""
address, length_bytes = -1, -1
for mem_name in memorymap_dict.keys():
if mem_name == device_name:
address, length_bytes = (memorymap_dict[mem_name]['address'],
memorymap_dict[mem_name]['bytes'])
break
if address == -1 or length_bytes == -1:
LOGGER.error(memorymap_dict)
print(memorymap_dict)
raise RuntimeError('Could not find address or length for '
'Register %s' % device_name)
return cls(parent, device_name, address=address,
device_info=device_info)
[docs] def info(self):
"""
Return a string with information about this Register instance.
"""
fstring = ''
for field in self._fields.iterkeys():
fstring += field + ', '
if fstring[-2:] == ', ':
fstring = fstring[:-2]
return '%s(%i,[%s])' % (self.name, self.width_bits, fstring)
[docs] def read(self, **kwargs):
"""
Memory.read returns a list for all bitfields, so just put those
values into single values.
"""
memdata = Memory.read(self, **kwargs)
results = memdata['data']
timestamp = memdata['timestamp']
for k, v in results.iteritems():
results[k] = v[0]
self.last_values = results
return {'data': results, 'timestamp': timestamp}
[docs] def read_raw(self, **kwargs):
"""
Read a raw 4-byte value from the host device. Size is 4-bytes.
:param kwargs:
"""
rawdata = self.parent.read(device_name=self.name, size=4, offset=0*4)
return rawdata, time.time()
[docs] def write_raw(self, data, blindwrite=False):
"""
Use the katcp_client_fpga write integer function.
"""
self.parent.write_int(self.name, data, blindwrite=blindwrite)
[docs] def read_uint(self, **kwargs):
return self.parent.read_uint(self.name, **kwargs)
[docs] def write_int(self, uintvalue, blindwrite=False, word_offset=0):
"""
Write an unsigned integer to this device using the fpga client.
"""
self.parent.write_int(device_name=self.name, integer=uintvalue,
blindwrite=blindwrite, word_offset=word_offset)
def _write_common(self, **kwargs):
"""
Form the dictionary of values that must be written
:param kwargs: the field names and values to write
"""
if len(kwargs) == 0:
LOGGER.info('%s: no keyword args given, exiting.' % self.name)
return
_read_necessary = False
new_values = {_field: None for _field in self.field_names()}
for k in kwargs:
if k not in new_values:
raise ValueError('Field {} not found in register {} on host '
'{}'.format(k, self.name, self.parent.host))
if kwargs[k] in ['pulse', 'toggle']:
_read_necessary = True
new_values[k] = kwargs[k]
for _value in new_values.values():
if _value is None:
_read_necessary = True
if _read_necessary:
# LOGGER.debug('A read of register %s is necessary' % self.name)
new_values = self.read()['data']
pulse = {}
for k in kwargs:
if kwargs[k] == 'pulse':
LOGGER.debug('%s: pulsing field %s (%i -> %i)' % (
self.name, k, new_values[k], not new_values[k]))
pulse[k] = new_values[k]
new_values[k] = not new_values[k]
elif kwargs[k] == 'toggle':
LOGGER.debug('%s: toggling field %s (%i -> %i)' % (
self.name, k, new_values[k], not new_values[k]))
new_values[k] = not new_values[k]
else:
new_values[k] = kwargs[k]
LOGGER.debug('%s: writing %.5f to field %s' % (
self.name, new_values[k], k))
# pack the values into a 32-bit integer
fixed_int = 0
for _f in self._fields.values():
_intval = fp2fixed_int(new_values[_f.name], _f.width_bits,
_f.binary_pt, _f.numtype == 1)
fixed_int |= (_intval << _f.offset)
# double-check the integer value is not too large
if fixed_int > (2**32)-1:
LOGGER.error('%s: problem writing to register %s:' %
(self.parent.host, self.name))
for _f in self._fields.values():
_intval = fp2fixed_int(new_values[_f.name], _f.width_bits,
_f.binary_pt, _f.numtype == 1)
LOGGER.error('%s:%s:%s:%i(%sfix%i.%i) = %.8e -> %i' % (
self.parent.host, self.name, _f.name, _f.offset,
'u' if _f.numtype != 1 else '',
_f.width_bits, _f.binary_pt,
new_values[_f.name], _intval
))
LOGGER.error('%s:%s - gave int value of %i' %
(self.parent.host, self.name, fixed_int))
return fixed_int, pulse
[docs] def blindwrite(self, **kwargs):
"""
As write, but without checking the result
"""
fint, pulse = self._write_common(**kwargs)
self.write_raw(fint, blindwrite=True)
if len(pulse.keys()) > 0:
self.blindwrite(**pulse)
[docs] def write(self, **kwargs):
"""
Write fields in a register, using keyword arguments for fields
:param kwargs:
"""
fint, pulse = self._write_common(**kwargs)
self.write_raw(fint)
if len(pulse.keys()) > 0:
self.write(**pulse)
[docs] def write_single(self, value):
"""
Write single value.
:param value:
"""
if len(self.field_names()) != 1:
raise ValueError('Register has more than one field, cannot '
'use the assignment shortcut write method.')
fwritedict = {self.field_names()[0]: value}
self.write(**fwritedict)
# TODO
# class FieldsHolder(object):
# def __init__(self, parentreg):
# self._reglkjsdfoi = parentreg
[docs] def process_info(self, info):
"""
Set this Register's extra information.
"""
if (info is None) or (info == {}):
return
self.block_info = info
self.fields_clear()
# current and current-but-one have names field
if 'names' in self.block_info.keys():
self._process_info_current()
elif 'numios' in self.block_info.keys():
# aborted tabbed one
self._process_info_tabbed()
elif 'name' in self.block_info.keys():
# oldest
LOGGER.error('Old registers are deprecated!')
self.field_add(bitfield.Field('reg', 0, 32, 0, 0))
else:
LOGGER.error('That is a seriously old register - please swap it '
'out!')
LOGGER.error(self)
LOGGER.error(self.block_info)
self.field_add(bitfield.Field('reg', 0, 32, 0, 0))
# raise RuntimeError('Unknown Register type.')
# TODO
# # add the fields as shortcut readable and writeable
# self.fields = Register.FieldsHolder(self)
# for fld in self._fields:
# setattr(
# self.fields, fld, property(
# lambda fldhldr:
# fldhldr._reglkjsdfoi.read()['data'][fld],
# lambda fldhldr, value:
# fldhldr._reglkjsdfoi.write(**{fld: value})
# )
# )
def _process_info_current(self):
# current one
clean_fields = bitfield.clean_fields
# a single value may have been used for width, type or binary point
fields = {'names': clean_fields(self.name, 'register',
self.block_info['names']),
'widths': clean_fields(self.name, 'register',
self.block_info['bitwidths']),
'types': clean_fields(self.name, 'register',
self.block_info['arith_types']),
'bps': clean_fields(self.name, 'register',
self.block_info['bin_pts'])}
fields['names'].reverse()
fields['widths'].reverse()
fields['types'].reverse()
fields['bps'].reverse()
len_names = len(fields['names'])
for fld in ['widths', 'types', 'bps']:
# convert the number-based fields to integers
for n, val in enumerate(fields[fld]):
try:
intvalue = int(val)
except ValueError:
intvalue = eval(val)
fields[fld][n] = intvalue
# accommodate new snapshots where the fields may have length one
len_fld = len(fields[fld])
if len_fld != len_names:
if len_fld != 1:
raise RuntimeError('%i names, but %i %s?' % (
len_names, len_fld, fld))
fields[fld] = [fields[fld][0]] * len_names
# construct the fields and add them to this BitField
for ctr, name in enumerate(fields['names']):
field = bitfield.Field(name, fields['types'][ctr],
fields['widths'][ctr],
fields['bps'][ctr], -1)
self.field_add(field, auto_offset=True)
def _process_info_tabbed(self):
LOGGER.warn('Tabbed registers are deprecated!')
numios = int(self.block_info['numios'])
for ctr in range(numios, 0, -1):
if self.block_info['arith_type%i' % ctr] == 'Boolean':
atype = 2
elif self.block_info['arith_type%i' % ctr] == 'Unsigned':
atype = 0
else:
atype = 1
field = bitfield.Field(self.block_info['name%i' % ctr], atype,
int(self.block_info['bitwidth%i' % ctr]),
int(self.block_info['bin_pt%i' % ctr]),
-1)
self.field_add(field, auto_offset=True)
# end