from __future__ import print_function
import logging
import time
from memory import Memory
import bitfield
from register import Register
LOGGER = logging.getLogger(__name__)
class Snap(Memory):
    """
    Snap blocks are triggered/controlled blocks of RAM on FPGAs.

    A Snap wraps the ``<name>_bram`` memory together with a set of
    companion registers (``_ctrl``, ``_status``, ``_trig_offset``, ``_val``
    and ``_tr_en_cnt``) that are looked up on the parent device.
    """

    def __init__(self, parent, name, width_bits, address, length_bytes,
                 device_info=None):
        """
        Create the snap block and its default single ``data`` field.

        :param parent: the device that owns this snap block; its
            ``memory_devices`` mapping and ``read`` method are used later
        :param name: the name of this snap block
        :param width_bits: the width of one snap word, in bits
        :param address: the address passed on to the Memory base class
        :param length_bytes: the length of the snap memory, in bytes
        :param device_info: optional dictionary of block information
        """
        super(Snap, self).__init__(name=name, width_bits=width_bits,
                                   address=address, length_bytes=length_bytes)
        self.parent = parent
        self.block_info = device_info
        # until told otherwise, the whole word is one unsigned 'data' field
        self.field_add(bitfield.Field(name='data', numtype=0,
                                      width_bits=self.width_bits,
                                      binary_pt=0, lsb_offset=0))
        # 'register' entries are filled in by _link_control_registers()
        self.control_registers = {
            'control': {'register': None, 'name': self.name + '_ctrl'},
            'status': {'register': None, 'name': self.name + '_status'},
            'trig_offset': {'register': None,
                            'name': self.name + '_trig_offset'},
            'extra_value': {'register': None, 'name': self.name + '_val'},
            'tr_en_cnt': {'register': None, 'name': self.name + '_tr_en_cnt'}}
        # lazy formatting: __str__ is only evaluated if DEBUG is enabled
        LOGGER.debug('New Snap %s', self)

    @classmethod
    def from_device_info(cls, parent, device_name, device_info,
                         memorymap_dict, **kwargs):
        """
        Process device info and the memory map to get all necessary
        info and return a Snap instance.

        :param parent: the Casperfpga on which this snap is found
        :param device_name: the unique device name
        :param device_info: information about this device
        :param memorymap_dict: a dictionary containing the device memory map
        :return: a Snap object
        :raises RuntimeError: if the length computed from device_info does
            not match the length given by the memory map
        """
        address, length_bytes = -1, -1
        bram_name = device_name + '_bram'
        if bram_name in memorymap_dict:
            address = memorymap_dict[bram_name]['address']
            length_bytes = memorymap_dict[bram_name]['bytes']
        word_bits = int(device_info['data_width'])
        # floor division keeps the byte count an int on Python 3 as well
        num_bytes = pow(2, int(device_info['nsamples'])) * (word_bits // 8)
        if length_bytes == -1:
            # no memory map entry: fall back on the length from device_info
            length_bytes = num_bytes
        if length_bytes != num_bytes:
            raise RuntimeError('%s has mask length_bytes %d bytes, '
                               'but mem map length_bytes %d bytes' %
                               (device_name, num_bytes, length_bytes))
        return cls(parent, device_name, width_bits=word_bits, address=address,
                   length_bytes=length_bytes, device_info=device_info)

    def post_create_update(self, raw_system_info):
        """
        Update the device with information not available at creation.

        :param raw_system_info: dictionary of device information
        """
        # is this snap block inside a bitsnap block?
        for dev_name, dev_info in raw_system_info.items():
            if dev_name == '':
                continue
            if dev_info['tag'] != 'casper:bitsnap':
                continue
            if self.name == dev_name + '_ss':
                self.update_from_bitsnap(dev_info)
                break
        # find control registers for this snap block
        self._link_control_registers(raw_system_info)

    def update_from_bitsnap(self, info):
        """
        Update this device with information from a bitsnap container.

        Replaces the existing fields with those described by the
        ``io_names``, ``io_widths``, ``io_types`` and ``io_bps`` entries.

        :type self: Snap
        :param info: device information dictionary containing Simulink block
            information
        :raises ValueError: if the bitsnap width or length does not match
            this snap block
        :raises RuntimeError: if a per-field list has a length that is
            neither one nor the number of names
        """
        clean_fields = bitfield.clean_fields
        self.block_info = info
        if self.width_bits != int(info['snap_data_width']):
            raise ValueError('Snap and matched bitsnap widths do not match.')
        # floor division keeps the byte count an int on Python 3 as well
        samples_bytes = (pow(2, int(info['snap_nsamples'])) *
                         (self.width_bits // 8))
        if self.length_bytes != samples_bytes:
            raise ValueError('Snap and matched bitsnap lengths do not match.')
        fields = {}
        for fld, info_key in (('names', 'io_names'), ('widths', 'io_widths'),
                              ('types', 'io_types'), ('bps', 'io_bps')):
            fields[fld] = clean_fields(self.name, 'snapshot', info[info_key])
            fields[fld].reverse()
        self.fields_clear()
        len_names = len(fields['names'])
        for fld in ['widths', 'types', 'bps']:
            # convert the number-based fields to integers
            for n, val in enumerate(fields[fld]):
                try:
                    intvalue = int(val)
                except ValueError:
                    # NOTE(review): eval() executes arbitrary expressions
                    # taken from the block info - only safe while that
                    # information comes from a trusted design file.
                    intvalue = eval(val)
                fields[fld][n] = intvalue
            # accommodate new snapshots where the fields may have length one
            len_fld = len(fields[fld])
            if len_fld != len_names:
                if len_fld != 1:
                    raise RuntimeError('%i names, but %i %s?' % (
                        len_names, len_fld, fld))
                fields[fld] = [fields[fld][0]] * len_names
        # construct the fields and add them to this BitField
        for ctr, name in enumerate(fields['names']):
            field = bitfield.Field(name=name,
                                   numtype=fields['types'][ctr],
                                   width_bits=fields['widths'][ctr],
                                   binary_pt=fields['bps'][ctr],
                                   lsb_offset=-1)
            self.field_add(field, auto_offset=True)

    def _link_control_registers(self, raw_device_info):
        """
        Link available registers to this snapshot block's control registers.

        :param raw_device_info: Information about the device in raw form
        :raises RuntimeError: if the control or status register is missing,
            or if an extra value is enabled but its register is missing
        """
        for controlreg in self.control_registers.values():
            # only the lookup may legitimately fail; a missing register is
            # simply left as None
            try:
                reg = self.parent.memory_devices[controlreg['name']]
            except KeyError:
                continue
            assert isinstance(reg, Register)
            controlreg['register'] = reg
        # set up the control register fields
        if (self.control_registers['control']['register'] is None) or \
                (self.control_registers['status']['register'] is None):
            raise RuntimeError('Critical control registers for snap %s '
                               'missing.' % self.name)
        # a 'value' key, when present, overrides 'snap_value'
        if 'value' in self.block_info:
            self.block_info['snap_value'] = self.block_info['value']
        if self.block_info['snap_value'] == 'on':
            if self.control_registers['extra_value']['register'] is None:
                raise RuntimeError('snap %s extra value register specified, '
                                   'but not found. Problem.' % self.name)
            extra_reg = self.control_registers['extra_value']
            extra_info = raw_device_info[extra_reg['name']]
            extra_info['mode'] = 'fields of arbitrary size'
            if 'extra_names' in self.block_info:
                extra_info['names'] = self.block_info['extra_names']
                extra_info['bitwidths'] = self.block_info['extra_widths']
                extra_info['arith_types'] = self.block_info['extra_types']
                extra_info['bin_pts'] = self.block_info['extra_bps']
            else:
                # no field description given: one 32-bit field called 'reg'
                extra_info['names'] = '[reg]'
                extra_info['bitwidths'] = '[32]'
                extra_info['arith_types'] = '[0]'
                extra_info['bin_pts'] = '[0]'
            extra_reg['register'].process_info(extra_info)

    def arm(self, man_trig=False, man_valid=False, offset=-1,
            circular_capture=False):
        """
        Arm the snapshot block.

        Writes the control word twice: first with bit 0 clear, then with
        bit 0 set (presumably the 0-to-1 transition arms the hardware).

        :param man_trig: sets bit 1 of the control word
        :type man_trig: bool
        :param man_valid: sets bit 2 of the control word
        :type man_valid: bool
        :param offset: written to the trig_offset register when >= 0
        :type offset: int
        :param circular_capture: sets bit 3 of the control word
        :type circular_capture: bool
        """
        ctrl_reg = self.control_registers['control']['register']
        if offset >= 0:
            self.control_registers['trig_offset']['register'].write_int(offset)
        options = (man_trig << 1) + (man_valid << 2) + (circular_capture << 3)
        ctrl_reg.write_int(0 + options)
        ctrl_reg.write_int(1 + options)

    def print_snap(self, limit_lines=-1, **kwargs):
        """
        Read and print a snap block.

        :param limit_lines: limit the number of lines to print; values of
            zero or less print everything
        :param offset: trigger offset
        :param man_valid: force valid to be true
        :param man_trig: force a trigger now
        :param circular_capture: enable circular capture
        :param timeout: time out after this many seconds
        :param read_nowait: do not wait for the snap to finish reading
        """
        snapdata = self.read(**kwargs)
        data = snapdata['data']
        keys = sorted(data.keys())
        # dict views cannot be indexed on Python 3, so take the row count
        # from the first sorted key; an empty dict prints no rows at all
        num_rows = len(data[keys[0]]) if keys else 0
        if limit_lines > 0:
            # print at most limit_lines rows
            num_rows = min(num_rows, limit_lines)
        for ctr in range(num_rows):
            print('%5i ' % ctr, end='')
            for key in keys:
                print('{}({})\t'.format(key, data[key][ctr]), end='')
            print('')
        print('Capture offset: %i' % snapdata['offset'])

    def read(self, **kwargs):
        """
        Override Memory.read to handle the extra value register.

        :param offset: trigger offset
        :param man_valid: force valid to be true
        :param man_trig: force a trigger now
        :param circular_capture: enable circular capture
        :param timeout: time out after this many seconds
        :param read_nowait: do not wait for the snap to finish reading
        :return: a dictionary with keys 'data', 'offset', 'timestamp' and
            'extra_value'
        """
        rawdata, rawtime = self.read_raw(**kwargs)
        processed = self._process_data(rawdata['data'])
        return {'data': processed, 'offset': rawdata.get('offset', 0),
                'timestamp': rawtime,
                'extra_value': rawdata['extra_value']}

    def read_raw(self, **kwargs):
        """
        Read snap data from the memory device.

        Accepted keyword arguments are man_trig, man_valid, timeout,
        offset, read_nowait, circular_capture and arm.

        :return: a tuple of (dictionary with keys 'extra_value', 'data',
            'length' and 'offset', time at which the data was read)
        :raises RuntimeError: on an unknown kwarg, on arming options given
            together with arm=False, if the capture did not complete, or if
            the captured length differs from the snap length
        """
        snapsetup = {
            'man_trig': False,
            'man_valid': False,
            'timeout': -1,
            'offset': -1,
            'read_nowait': False,
            'circular_capture': False,
            'arm': True
        }
        for kkey in kwargs:
            if kkey not in snapsetup:
                raise RuntimeError('Invalid kwarg for '
                                   'snap read_raw(): %s' % kkey)
        snapsetup.update(kwargs)
        arm_options = ('man_trig', 'man_valid', 'offset', 'circular_capture')
        if snapsetup['arm']:
            self.arm(man_trig=snapsetup['man_trig'],
                     man_valid=snapsetup['man_valid'],
                     offset=snapsetup['offset'],
                     circular_capture=snapsetup['circular_capture'])
        elif any(req in kwargs for req in arm_options):
            raise RuntimeError('Additional kwargs to snapshot read_raw() '
                               'will have no effect if arm=False '
                               'is specified.')
        status_reg = self.control_registers['status']['register']
        done = snapsetup['read_nowait']
        start_time = time.time()
        # TODO - what would a sensible option be to check addr?
        # the default of zero is probably not right
        addr = 0
        # poll the status register until bit 31 clears or the timeout
        # expires; a negative timeout means wait forever
        while (not done) and \
                ((time.time() - start_time) < snapsetup['timeout'] or
                 (snapsetup['timeout'] < 0)):
            addr = status_reg.read_uint()
            done = not bool(addr & 0x80000000)
        if snapsetup['read_nowait']:
            addr = self.length_bytes
        bram_dmp = {'extra_value': None, 'data': [],
                    'length': addr & 0x7fffffff, 'offset': 0}
        # read the status a second time to check the address has settled
        status_val = status_reg.read_uint()
        now_status = bool(status_val & 0x80000000)
        now_addr = status_val & 0x7fffffff
        if (not snapsetup['read_nowait']) and \
                ((bram_dmp['length'] != now_addr) or
                 (bram_dmp['length'] == 0) or now_status):
            # if address is still changing, then the snap block didn't
            # finish capturing.
            error_info = 'timeout %2.2f seconds. Addr at stop time: %i. ' \
                         'Now: Still running :%s, addr: %i.' % (
                             snapsetup['timeout'], bram_dmp['length'],
                             'yes' if now_status else 'no', now_addr)
            if bram_dmp['length'] != now_addr:
                raise RuntimeError('Snap %s error: Address still changing '
                                   'after %s' % (self.name, error_info))
            elif bram_dmp['length'] == 0:
                raise RuntimeError('Snap %s error: Returned 0 bytes after '
                                   '%s' % (self.name, error_info))
            else:
                raise RuntimeError('Snap %s error: %s' % (
                    self.name, error_info))
        if snapsetup['circular_capture']:
            val = self.control_registers['tr_en_cnt']['register'].read_uint()
            bram_dmp['offset'] = val - bram_dmp['length']
        else:
            bram_dmp['offset'] = 0
        if bram_dmp['length'] == 0:
            bram_dmp['data'] = []
            datatime = -1
        else:
            bram_dmp['data'] = self.parent.read(self.name + '_bram',
                                                bram_dmp['length'])
            datatime = time.time()
        bram_dmp['offset'] += snapsetup['offset']
        if bram_dmp['offset'] < 0:
            bram_dmp['offset'] = 0
        if bram_dmp['length'] != self.length_bytes:
            # both figures are in bytes, matching the comparison above
            raise RuntimeError('%s.read_uint() - expected %i bytes, got %i' % (
                self.name, self.length_bytes, bram_dmp['length']))
        # read the extra value
        ev_reg = self.control_registers['extra_value']['register']
        if ev_reg is not None:
            bram_dmp['extra_value'] = ev_reg.read()
        return bram_dmp, datatime

    def __str__(self):
        return '%s: %s' % (self.name, self.block_info)

    def __repr__(self):
        return '%s:%s' % (self.__class__.__name__, self.name)

    @staticmethod
    def packetise_snapdata(data, eof_key='eof', packet_length=-1, dv_key=None):
        """
        Use the given EOF key to packetise a dictionary of snap data

        Samples after the last EOF do not form a complete packet and are
        not returned.

        :param data: a dictionary containing snap block data
        :param eof_key: the key used to identify the packet boundaries - the
            eof comes on the LAST VALID word in a packet
        :param packet_length: check the length of the packets against
            this as they are created (in 64-bit words)
        :param dv_key: the key used to identify which data samples are valid
        :return: a list of packets
        :raises PacketLengthError: a locally-defined Exception subclass,
            raised when a packet's length differs from packet_length
        """
        class PacketLengthError(Exception):
            pass

        current_packet = {}
        packets = []
        for ctr in range(len(data[eof_key])):
            # skip samples that are flagged as not valid
            if (dv_key is not None) and (data[dv_key][ctr] == 0):
                continue
            for key in data:
                current_packet.setdefault(key, []).append(data[key][ctr])
            if not current_packet[eof_key][-1]:
                continue
            # EOF seen: check the packet and start a new one
            if packet_length != -1:
                if len(current_packet[eof_key]) != packet_length:
                    raise PacketLengthError(
                        'Expected {}, got {} at location {}.'.format(
                            packet_length, len(current_packet[eof_key]),
                            ctr))
            packets.append(current_packet)
            current_packet = {}
        return packets