Source code for tengbe

import logging
import struct

from memory import Memory
from network import Mac, IpAddress
from gbe import Gbe
import numpy as np
from pkg_resources import resource_filename

LOGGER = logging.getLogger(__name__)

STRUCT_CTYPES = {1: 'B', 2: 'H', 4: 'L', 8: 'Q'}
STRUCT_CTYPES_TO_B = {'B': 1, 'H': 2, 'L': 4, 'Q': 8}

TENGBE_UNIFIED_MMAP_TXT = 'tengbe_mmap.txt'
TENGBE_MMAP_LEGACY_TXT  = 'tengbe_mmap_legacy.txt'

[docs]def read_memory_map_definition(filename): """ Read memory map definition from text file. Returns a python dictionary: {REGISTER_NAME1: {'offset': offset, 'size': size, 'rwflag': rwflag}, REGISTER_NAME2: {'offset': offset, 'size': size, 'rwflag': rwflag} ...} Notes: Used by TenGbe.configure_core() to write to mmap. """ mmap_arr = np.genfromtxt(filename, dtype='str', skip_header=1) mmap_keys = list(mmap_arr[:, 0]) mmap_offsets = [int(x, 0) for x in mmap_arr[:, 1]] mmap_size = [int(x, 0) for x in mmap_arr[:, 2]] mmap_rw = list(mmap_arr[:, 3]) mmap = {} for ii, k in enumerate(mmap_keys): mmap[k] = {'offset': mmap_offsets[ii], 'size': mmap_size[ii], 'rwflag': mmap_rw[ii]} return mmap
[docs]class TenGbe(Memory, Gbe): """ To do with the CASPER ten GBE yellow block implemented on FPGAs, and interfaced-to via KATCP memory reads/writes. """
[docs] def __init__(self, parent, name, address, length_bytes, device_info=None): """ :param parent: Parent object who owns this TenGbe instance :param name: Unique name of the instance :param address: :param length_bytes: :param device_info: Information about this device """ Memory.__init__(self, name, 32, address, length_bytes) Gbe.__init__(self, parent, name, address, length_bytes, device_info) self.memmap_compliant = self._check_memmap_compliance() if self.memmap_compliant: self.memmap = read_memory_map_definition(TENGBE_UNIFIED_MMAP_TXT) else: self.memmap = read_memory_map_definition(TENGBE_MMAP_LEGACY_TXT)
@property def mac(self): return self.get_gbe_core_details()['mac'] @property def ip_address(self): return self.get_gbe_core_details()['ip'] @property def port(self): return self.get_gbe_core_details()['fabric_port'] def _check_memmap_compliance(self): """ Look at the first word of the core's memory map and try to figure out if it compliant with the harmonized ethernet map. This isn't flawless, but unless the user sets a very weird MAC address for their core (which is what the old core's map stored in register 0, it should be OK). """ x = self.parent.read(self.name, 4) cpu_tx_en, cpu_rx_en, rev, core_type = struct.unpack('4B', x) if (cpu_tx_en > 1) or (cpu_rx_en > 1) or (core_type != 2): return False else: return True
[docs] def post_create_update(self, raw_device_info): """ Update the device with information not available at creation. :param raw_device_info: info about this block that may be useful """ super(TenGbe, self).post_create_update(raw_device_info) self.snaps = {'tx': None, 'rx': None} for snapshot in self.parent.snapshots: if snapshot.name.find(self.name + '_') == 0: name = snapshot.name.replace(self.name + '_', '') if name == 'txs_ss': self.snaps['tx'] = snapshot.name elif name == 'rxs_ss': self.snaps['rx'] = snapshot.name else: errmsg = '%s: incorrect snap %s under tengbe ' \ 'block' % (self.fullname, snapshot.name) LOGGER.error(errmsg) raise RuntimeError(errmsg)
[docs] def read_txsnap(self): """ Read the TX snapshot embedded in this TenGBE yellow block """ return self.snaps['tx'].read(timeout=10)['data']
[docs] def read_rxsnap(self): """ Read the RX snapshot embedded in this TenGBE yellow block """ return self.snaps['rx'].read(timeout=10)['data']
def _memmap_write(self, register, value): """ Write to memory map :param register: register to write to. Register must be in memmap. :param value: Value to write. """ offset = self.memmap[register]['offset'] bytesize = self.memmap[register]['size'] rw_addr = offset - offset % 4 # Round in case of non 32-bit writes ctype = STRUCT_CTYPES.get(bytesize, 'L') # Treat as 32-bit if longer than 8B if self.memmap[register]['rwflag'] == 'r': raise RuntimeError("Warning: %s is read-only!" % register) if bytesize in (1, 2): n_elem = int(4 / bytesize) pcode = '>%i%s' % (n_elem, ctype) current_value = self.parent.read(self.name, size=4, offset=rw_addr) new_arr = list(struct.unpack(pcode, current_value)) new_arr[offset % n_elem] = value packed = struct.pack(pcode, *new_arr) elif bytesize in (4, 8): if isinstance(value, str): packed = value else: packed = struct.pack('>%s' % ctype, value) else: raise RuntimeError("Can only write 1,2,4,8 Byte registers with this function.") self.parent.blindwrite(self.name, packed, offset=rw_addr) def _memmap_read(self, register): """ Read from memory map :param register: register to read from. Must be in memmap. """ offset = self.memmap[register]['offset'] bytesize = self.memmap[register]['size'] ctype = STRUCT_CTYPES.get(bytesize) # Treat as 32-bit if longer than 8B if bytesize in (4, 8): value = self.parent.read(self.name, size=bytesize, offset=offset) value = struct.unpack('>%s' % ctype, value)[0] elif bytesize in (1, 2): if bytesize == 2 and offset % 4 not in (0, 2): raise RuntimeError("Attempted to read 16-bits from 32-bit word with %iB offset. " "Not supported." % (offset%4)) read_addr = offset - offset % 4 value = self.parent.read(self.name, size=4, offset=read_addr) valuearr = struct.unpack('>%i%s' % (int(4 / bytesize), ctype), value) value = valuearr[int((offset % 4) / bytesize)] else: raise RuntimeError("Cannot read %s of size %i: only 1,2,4,8 B supported" % (register, bytesize)) return value def _memmap_read_array(self, register, ctype='L'): """ Read array chunk from mem-map """ offset = self.memmap[register]['offset'] bytesize = self.memmap[register]['size'] if isinstance(ctype, str): wsize = STRUCT_CTYPES_TO_B[ctype] elif isinstance(ctype, int): wsize = ctype ctype = STRUCT_CTYPES[ctype] else: raise RuntimeError('Unknown ctype: %s' % ctype) value = self.parent.read(self.name, size=bytesize, offset=offset) value = struct.unpack('>%i%s' % (int(bytesize / wsize), ctype), value) return value def _memmap_write_array(self, register, value, ctype='L'): offset = self.memmap[register]['offset'] bytesize = self.memmap[register]['size'] if isinstance(ctype, str): wsize = STRUCT_CTYPES_TO_B[ctype] elif isinstance(ctype, int): wsize = ctype ctype = STRUCT_CTYPES[ctype] else: raise RuntimeError('Unknown ctype: %s' % ctype) n_elem = int(bytesize / wsize) # Differs to n_elem in small bytesize code! if len(value) != n_elem: raise RuntimeError("Register is %i 32-bit words long, but array is " "of length %i. Make sure these match." % (len(value), n_elem)) if isinstance(value, str): packed = value else: packed = struct.pack('>%i%s' % (n_elem, ctype), *value) self.parent.blindwrite(self.name, packed, offset=offset)
[docs] def configure_core(self): """ Setup the interface by writing to the fabric directly, bypassing tap. :param self: :return: """ gateway = 1 if self.gateway is None else self.gateway.ip_int self._memmap_write('MAC_ADDR', self.mac.mac_int) self._memmap_write('IP_ADDR', self.ip_address.ip_int) self._memmap_write('NETMASK', self.subnet_mask.ip_int) self._memmap_write('GW_ADDR', gateway) self._memmap_write('PORT', self.port) self.fabric_enable() self.fabric_soft_reset_toggle()
[docs] def dhcp_start(self): """ Configure this interface, then start a DHCP client on ALL interfaces. """ #if self.mac is None: # TODO get MAC from EEPROM serial number and assign here #self.mac = '0' reply, _ = self.parent.transport.katcprequest( name='tap-start', request_timeout=5, require_ok=True, request_args=(self.name, self.name, '0.0.0.0', str(self.port), str(self.mac), )) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure starting tap driver.' % self.name) reply, _ = self.parent.transport.katcprequest( name='tap-arp-config', request_timeout=1, require_ok=True, request_args=(self.name, 'mode', '0')) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure disabling ARP.' % self.name) reply, _ = self.parent.transport.katcprequest( name='tap-dhcp', request_timeout=30, require_ok=True, request_args=(self.name, )) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure starting DHCP client.' % self.name) reply, _ = self.parent.transport.katcprequest( name='tap-arp-config', request_timeout=1, require_ok=True, request_args=(self.name, 'mode', '-1')) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure re-enabling ARP.' % self.name) # it looks like the command completed without error, so # update the basic core details self.get_gbe_core_details()
[docs] def tap_start(self, restart=False): """ Program a 10GbE device and start the TAP driver. :param restart: stop before starting """ if len(self.name) > 8: raise NameError('%s: tap device identifier must be shorter than 9 ' 'characters..' % self.fullname) if restart: self.tap_stop() if self.tap_running(): LOGGER.info('%s: tap already running.' % self.fullname) return LOGGER.info('%s: starting tap driver.' % self.fullname) reply, _ = self.parent.transport.katcprequest( name='tap-start', request_timeout=-1, require_ok=True, request_args=(self.name, self.name, str(self.ip_address), str(self.port), str(self.mac), )) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure starting tap driver.' % self.fullname)
[docs] def tap_stop(self): """ Stop a TAP driver. """ if not self.tap_running(): return LOGGER.info('%s: stopping tap driver.' % self.fullname) reply, _ = self.parent.transport.katcprequest( name='tap-stop', request_timeout=-1, require_ok=True, request_args=(self.name, )) if reply.arguments[0] != 'ok': raise RuntimeError('%s: failure stopping tap ' 'device.' % self.fullname)
[docs] def tap_info(self): """ Get info on the tap instance running on this interface. """ uninforms = [] def handle_inform(msg): uninforms.append(msg) self.parent.unhandled_inform_handler = handle_inform _, informs = self.parent.transport.katcprequest( name='tap-info', request_timeout=-1, require_ok=False, request_args=(self.name, )) self.parent.unhandled_inform_handler = None # process the tap-info if len(informs) == 1: return {'name': informs[0].arguments[0], 'ip': informs[0].arguments[1]} elif len(informs) == 0: return {'name': '', 'ip': ''} else: raise RuntimeError('%s: invalid return from tap-info?' % self.fullname)
# TODO - this request should return okay if the tap isn't # running - it shouldn't fail # if reply.arguments[0] != 'ok': # log_runtime_error(LOGGER, 'Failure getting tap info for ' # 'device %s." % str(self))
[docs] def tap_running(self): """ Determine if an instance if tap is already running on for this ten GBE interface. """ tapinfo = self.tap_info() if tapinfo['name'] == '': return False return True
[docs] def tap_arp_reload(self): """ Have the tap driver reload its ARP table right now. """ reply, _ = self.parent.transport.katcprequest( name="tap-arp-reload", request_timeout=-1, require_ok=True, request_args=(self.name, )) if reply.arguments[0] != 'ok': raise RuntimeError('Failure requesting ARP reload for tap ' 'device %s.' % str(self))
[docs] def multicast_receive(self, ip_str, group_size): """ Send a request to KATCP to have this tap instance send a multicast group join request. :param ip_str: A dotted decimal string representation of the base mcast IP address. :param group_size: An integer for how many mcast addresses from base to respond to. """ # mask = 255*(2 ** 24) + 255*(2 ** 16) + 255*(2 ** 8) + (255-group_size) # self.parent.write_int(self.name, str2ip(ip_str), offset=12) # self.parent.write_int(self.name, mask, offset=13) # mcast_group_string = ip_str + '+' + str(group_size) mcast_group_string = ip_str reply, _ = self.parent.transport.katcprequest( 'tap-multicast-add', -1, True, request_args=(self.name, 'recv', mcast_group_string, )) if reply.arguments[0] == 'ok': if mcast_group_string not in self.multicast_subscriptions: self.multicast_subscriptions.append(mcast_group_string) return else: raise RuntimeError('%s: failed adding multicast receive %s to ' 'tap device.' % (self.fullname, mcast_group_string))
[docs] def multicast_remove(self, ip_str): """ Send a request to be removed from a multicast group. :param ip_str: A dotted decimal string representation of the base mcast IP address. """ try: reply, _ = self.parent.transport.katcprequest( 'tap-multicast-remove', -1, True, request_args=(self.name, IpAddress.str2ip(ip_str), )) except: raise RuntimeError('%s: tap-multicast-remove does not seem to ' 'be supported on %s' % (self.fullname, self.parent.host)) if reply.arguments[0] == 'ok': if ip_str not in self.multicast_subscriptions: LOGGER.warning( '%s: That is odd, %s removed from mcast subscriptions, but ' 'it was not in its list of sbscribed addresses.' % ( self.fullname, ip_str)) self.multicast_subscriptions.remove(ip_str) return else: raise RuntimeError('%s: failed removing multicast address %s ' 'from tap device' % (self.fullname, IpAddress.str2ip(ip_str)))
def _fabric_enable_disable(self, target_val): """ :param target_val: """ self._memmap_write('ENABLE', target_val)
[docs] def fabric_enable(self): """ Enable the core fabric """ self._fabric_enable_disable(1)
[docs] def fabric_disable(self): """ Enable the core fabric """ self._fabric_enable_disable(0)
[docs] def fabric_soft_reset_toggle(self): """ Toggle the fabric soft reset """ self._memmap_write('SOFT_RST', 0) self._memmap_write('SOFT_RST', 1) self._memmap_write('SOFT_RST', 0) FLAGS_OFFSET = self.memmap['FLAGS']['offset'] FLAGS_SIZE = self.memmap['FLAGS']['size']
[docs] def get_gbe_core_details(self, read_arp=False, read_cpu=False, read_multicast=False): """ Get 10GbE core details. :param read_arp (bool): Get ARP table details (default False) :param read_cpu (bool): Get CPU details (default False) :param read_multicast (bool): Get multicast address table (default False) :returns: dictionary of core details (IP address, subnet mask, MAC address, port, etc). """ IP_ADDR = self._memmap_read('IP_ADDR') IP_PREFIX = '.'.join(IpAddress(IP_ADDR).ip_str.split('.')[:3]) returnval = { 'ip_prefix': IP_PREFIX, 'ip': IpAddress(IP_ADDR), 'subnet_mask': IpAddress(self._memmap_read('NETMASK')), 'mac': Mac(self._memmap_read('MAC_ADDR')), 'gateway_ip': IpAddress(self._memmap_read('GW_ADDR')), 'fabric_port': self._memmap_read('PORT'), 'fabric_en': self._memmap_read('ENABLE'), 'multicast': {'base_ip': IpAddress(self._memmap_read('MC_IP')), 'ip_mask': IpAddress(self._memmap_read('MC_MASK')), 'rx_ips': []} } if not self.memmap_compliant: data = self.parent.read(self.name, 16384) data = list(struct.unpack('>16384B', data)) returnval_legacy_dict = { 'xaui_lane_sync': [bool(data[0x27] & 4), bool(data[0x27] & 8), bool(data[0x27] & 16), bool(data[0x27] & 32)], 'xaui_status': [data[0x24], data[0x25], data[0x26], data[0x27]], 'xaui_chan_bond': bool(data[0x27] & 64), 'xaui_phy': {'rx_eq_mix': data[0x28], 'rx_eq_pol': data[0x29], 'tx_preemph': data[0x2a], 'tx_swing': data[0x2b]}, } returnval.update(returnval_legacy_dict) if read_multicast: possible_addresses = [int(returnval['multicast']['base_ip'])] mask_int = int(returnval['multicast']['ip_mask']) for ctr in range(32): mask_bit = (mask_int >> ctr) & 1 if not mask_bit: new_ips = [] for ip in possible_addresses: new_ips.append(ip & (~(1 << ctr))) new_ips.append(new_ips[-1] | (1 << ctr)) possible_addresses.extend(new_ips) tmp = list(set(possible_addresses)) for ip in tmp: returnval['multicast']['rx_ips'].append(IpAddress(ip)) if read_arp: returnval['arp'] = self.get_arp_details() if read_cpu: returnval.update(self.get_cpu_details()) self.core_details = returnval return returnval
[docs] def get_arp_details(self, N=256): """ Get ARP details from this interface. """ arp_table = self._memmap_read_array('ARP_CACHE', ctype='Q') return map(Mac, arp_table[:N])
[docs] def get_cpu_details(self, port_dump=None): """ Read details of the CPU buffers. :param port_dump: """ #TODO Not memmap compliant if port_dump is None: port_dump = self.parent.read(self.name, 16384) port_dump = list(struct.unpack('>16384B', port_dump)) returnval = {'cpu_tx': {}} for ctr in range(4096 / 8): tmp = [] for ctr2 in range(8): tmp.append(port_dump[4096 + (8 * ctr) + ctr2]) returnval['cpu_tx'][ctr*8] = tmp returnval['cpu_rx_buf_unack_data'] = port_dump[6 * 4 + 3] returnval['cpu_rx'] = {} for ctr in range(port_dump[6 * 4 + 3] + 8): tmp = [] for ctr2 in range(8): tmp.append(port_dump[8192 + (8 * ctr) + ctr2]) returnval['cpu_rx'][ctr * 8] = tmp return returnval
[docs] def set_arp_table(self, macs): """Set the ARP table with a list of MAC addresses. The list, `macs`, is passed such that the zeroth element is the MAC address of the device with IP XXX.XXX.XXX.0, and element N is the MAC address of the device with IP XXX.XXX.XXX.N""" arp_addr = self.memmap['ARP_CACHE']['offset'] macs = list(macs) if isinstance(macs[0], Mac): macs = [m.mac_int for m in macs] macs_packed = struct.pack('>%dQ' % (len(macs)), *macs) self.parent.write(self.name, macs_packed, offset=arp_addr)
# end