import logging
from network import Mac, IpAddress
from utils import check_changing_status, CheckCounter
class Gbe(object):
    """
    A (multi)gigabit network interface on a device.

    This is a base class: the hardware-facing operations (snapshots, fabric
    control, multicast, core details) raise NotImplementedError here and are
    expected to be provided by child classes.
    """

    def __init__(self, parent, name, address, length_bytes, device_info=None):
        """
        Create the interface object. Instances are normally built via
        from_device_info, usually from child classes.

        :param parent: the parent device; its ``host`` attribute is used to
            build ``fullname``
        :param name: the unique name of this interface on the parent
        :param address: address of this device in the parent's memory map
        :param length_bytes: length of this device in the memory map, in bytes
        :param device_info: optional dictionary of information about this
            device, see process_device_info
        """
        self.parent = parent
        self.name = name
        self.address = address
        self.length_bytes = length_bytes
        self.fullname = self.parent.host + ':' + self.name
        self.block_info = device_info
        self.process_device_info(device_info)
        self.core_details = None
        self.snaps = {'tx': None, 'rx': None}
        self.registers = {'tx': [], 'rx': []}
        self.multicast_subscriptions = []
        # TODO
        # if self.parent.is_connected():
        #     self._check()

    @property
    def mac(self):
        """
        The MAC address of this interface; always None in this base class.
        """
        return None

    @property
    def ip_address(self):
        """
        The IP address of this interface; always None in this base class.
        """
        return None

    @property
    def port(self):
        """
        The port of this interface; always None in this base class.
        """
        return None

    @classmethod
    def from_device_info(cls, parent, device_name, device_info,
                         memorymap_dict, **kwargs):
        """
        Process device info and the memory map to get all necessary info
        and return a Gbe instance.

        :param parent: the parent device, normally an FPGA instance
        :param device_name: the unique device name
        :param device_info: information about this device
        :param memorymap_dict: a dictionary containing the device memory map
        :param kwargs: accepted for interface compatibility, not used here
        :return: a Gbe object
        :raises RuntimeError: if the device is not in the memory map
        """
        address, length_bytes = -1, -1
        # Direct lookup instead of scanning every key for an equal name.
        entry = memorymap_dict.get(device_name)
        if entry is not None:
            address = entry['address']
            length_bytes = entry['bytes']
        if address == -1 or length_bytes == -1:
            raise RuntimeError('Could not find address or length '
                               'for Gbe device %s' % device_name)
        return cls(parent, device_name, address, length_bytes, device_info)

    def __repr__(self):
        return '%s:%s' % (self.__class__.__name__, self.name)

    def __str__(self):
        """
        String representation of this GbE interface.
        """
        return '%s: MAC(%s) IP(%s) Port(%s)' % (
            self.name, str(self.mac), str(self.ip_address), str(self.port))

    def process_device_info(self, device_info):
        """
        Validate the device info and normalise its address fields in place.

        An IP given as an expression such as
        ``10*(2^24) + 0*(2^16) + 0*(2^8) + 1*(2^0)`` is rewritten as a dotted
        string, and a MAC given as ``hex2dec('020000000001')`` is rewritten
        as a colon-separated string. The results are stored back into
        device_info.

        :param device_info: None (nothing is done), or a dictionary
            including:

            * fab_ip: IP Address
            * fab_mac: Mac Address
            * fab_udp: Port number
        :raises ValueError: if the mac, ip or port is None
        """
        if device_info is None:
            return
        fabric_ip = device_info['fab_ip']
        fabric_mac = device_info['fab_mac']
        port = device_info['fab_udp']
        # Validate before touching the values, so that a missing field is
        # reported as the intended ValueError rather than as an
        # AttributeError from a string method called on None.
        if fabric_mac is None or fabric_ip is None or port is None:
            raise ValueError('%s: 10Gbe interface must '
                             'have mac, ip and port.' % self.fullname)
        if '(2^24) + ' in fabric_ip:
            device_info['fab_ip'] = (fabric_ip.replace('*(2^24) + ', '.')
                                     .replace('*(2^16) + ', '.')
                                     .replace('*(2^8) + ', '.')
                                     .replace('*(2^0)', ''))
        if 'hex2dec' in fabric_mac:
            fabric_mac = fabric_mac.replace('hex2dec(\'', '')
            fabric_mac = fabric_mac.replace('\')', '')
            # Five two-character groups, then whatever remains.
            groups = [fabric_mac[idx:idx + 2] for idx in range(0, 10, 2)]
            groups.append(fabric_mac[10:])
            device_info['fab_mac'] = ':'.join(groups)

    def setup(self, mac, ipaddress, port, gateway=None, subnet_mask=None):
        """
        Set up the MAC, IP and port for this interface.

        No longer supported: always raises NotImplementedError.

        :param mac: String or Integer input, MAC address
            (e.g. '02:00:00:00:00:01')
        :param ipaddress: String or Integer input, IP address (eg '10.0.0.1')
        :param port: String or Integer input
        :param gateway: unused
        :param subnet_mask: unused
        """
        raise NotImplementedError('This is no longer required as the mac, '
                                  'ip_address and port are no longer stored '
                                  'as attributes. These values are retrieved '
                                  'from the processing node when required.')

    def post_create_update(self, raw_device_info):
        """
        Update the device with information not available at creation.

        Rebuilds ``self.registers`` from the parent's registers whose names
        start with ``<name>_``: ``tx*`` and ``rx*`` names are recorded as
        counters, names starting ``txs_``/``rxs_`` are skipped silently and
        anything else under this block is logged as odd.

        :param raw_device_info: info about this block that may be useful
            (not used here)
        """
        self.registers = {'tx': [], 'rx': []}
        prefix = self.name + '_'
        for register in self.parent.registers:
            if not register.name.startswith(prefix):
                continue
            # Strip only the leading prefix; str.replace would also remove
            # any later occurrence of it inside the register name.
            name = register.name[len(prefix):]
            if name.startswith('tx') and 'txs_' not in name:
                self.registers['tx'].append(register.name)
            elif name.startswith('rx') and 'rxs_' not in name:
                self.registers['rx'].append(register.name)
            elif not name.startswith(('txs_', 'rxs_')):
                self.parent.logger.warning(
                    '%s: odd register name %s under Gbe '
                    'block', self.fullname, register.name)

    def _check(self):
        """
        Does this device exist on the parent and it is accessible?
        """
        self.parent.read(self.name, 1)

    def read_txsnap(self):
        """
        Read the TX snapshot embedded in this GbE yellow block
        """
        raise NotImplementedError

    def read_rxsnap(self):
        """
        Read the RX snapshot embedded in this GbE yellow block
        """
        raise NotImplementedError

    def _read_registers(self, directions):
        """
        Read the counter registers recorded for the given directions.

        :param directions: iterable of keys of ``self.registers``
        :return: dictionary of register name to the value read
        """
        devices = self.parent.memory_devices
        results = {}
        for direction in directions:
            for reg in self.registers[direction]:
                results[reg] = devices[reg].read()['data']['reg']
        return results

    def read_rx_counters(self):
        """
        Read all RX counters embedded in this TenGBE yellow block

        :return: dictionary of register name to counter value
        """
        return self._read_registers(['rx'])

    def read_tx_counters(self):
        """
        Read all TX counters embedded in this TenGBE yellow block

        :return: dictionary of register name to counter value
        """
        return self._read_registers(['tx'])

    def read_counters(self):
        """
        Read all the counters embedded in this TenGBE yellow block

        :return: dictionary of register name to counter value
        """
        return self._read_registers(['tx', 'rx'])

    def _counters_okay(self, fields, read_function, wait_time, checks):
        """
        Run check_changing_status over the given counters and log failures.

        :param fields: list of CheckCounter objects to check
        :param read_function: callable returning the current counter values
        :param wait_time: seconds to wait between checks
        :param checks: times to run check
        :return: True/False
        :raises RuntimeError: if fewer than two checks are requested
        """
        if checks < 2:
            raise RuntimeError('Cannot check less often than twice?')
        result, message = check_changing_status(
            fields, read_function, wait_time, checks)
        if not result:
            self.parent.logger.error('%s: %s', self.fullname, message)
            return False
        return True

    def rx_okay(self, wait_time=0.2, checks=10):
        """
        Is this gbe core receiving okay?
        i.e. _rxctr incrementing and _rxerrctr not incrementing

        :param wait_time: seconds to wait between checks
        :param checks: times to run check
        :return: True/False
        :raises RuntimeError: if checks is less than two
        """
        if checks < 2:
            raise RuntimeError('Cannot check less often than twice?')
        fields = [
            CheckCounter(self.name + '_rxctr', True, True),
            CheckCounter(self.name + '_rxfullctr', False),
            CheckCounter(self.name + '_rxofctr', False),
            CheckCounter(self.name + '_rxerrctr', False),
            CheckCounter(self.name + '_rxbadctr', False),
            CheckCounter(self.name + '_rxvldctr'),
        ]
        return self._counters_okay(
            fields, self.read_rx_counters, wait_time, checks)

    def tx_okay(self, wait_time=0.2, checks=10):
        """
        Is this gbe core transmitting okay?
        i.e. _txctr incrementing and _txerrctr not incrementing

        :param wait_time: seconds to wait between checks
        :param checks: times to run check
        :return: True/False
        :raises RuntimeError: if checks is less than two
        """
        if checks < 2:
            raise RuntimeError('Cannot check less often than twice?')
        # A list, as in rx_okay: a set literal here would lose the ordering
        # and require the counters to be hashable.
        fields = [
            CheckCounter(self.name + '_txctr', True, True),
            CheckCounter(self.name + '_txfullctr', False),
            CheckCounter(self.name + '_txofctr', False),
            CheckCounter(self.name + '_txerrctr', False),
            CheckCounter(self.name + '_txvldctr'),
        ]
        return self._counters_okay(
            fields, self.read_tx_counters, wait_time, checks)

    def fabric_enable(self):
        """
        Enable the core fabric
        """
        raise NotImplementedError

    def fabric_disable(self):
        """
        Disable the core fabric
        """
        raise NotImplementedError

    def multicast_receive(self, ip_str, group_size):
        """
        Send a multicast group join request.

        :param ip_str: A dotted decimal string representation of the base
            mcast IP address.
        :param group_size: An integer for how many mcast addresses from
            base to respond to.
        """
        raise NotImplementedError

    def multicast_remove(self, ip_str):
        """
        Send a request to be removed from a multicast group.

        :param ip_str: A dotted decimal string representation of the base
            mcast IP address.
        """
        raise NotImplementedError

    def get_gbe_core_details(self, read_arp=False, read_cpu=False):
        """
        Read the core details. The print_* methods call this and then read
        ``self.core_details``, so implementations are expected to store the
        details there.

        :param read_arp: also read the ARP table
        :param read_cpu: also read the CPU buffers
        """
        raise NotImplementedError

    def get_arp_details(self, port_dump=None):
        """
        Get ARP details from this interface.

        :param port_dump: A list of raw bytes from interface memory.
        :type port_dump: list
        """
        raise NotImplementedError

    def get_cpu_details(self, port_dump=None):
        """
        Read details of the CPU buffers.

        :param port_dump:
        """
        raise NotImplementedError

    def print_gbe_core_details(self, arp=False, cpu=False, refresh=True):
        """
        Prints 10GbE core details.

        :param arp: include the ARP table
        :type arp: boolean
        :param cpu: include the CPU packet buffers
        :type cpu: boolean
        :param refresh: read the 10gbe details first
        """
        if refresh or (self.core_details is None):
            self.get_gbe_core_details(arp, cpu)
        details = self.core_details
        print('------------------------')
        print('%s configuration:' % self.fullname)
        print('MAC: ', Mac.mac2str(int(details['mac'])))
        print('Gateway: ', details['gateway_ip'])
        print('IP: ', details['ip'])
        # Label and value on one line; a trailing comma inside print()
        # does not suppress the newline.
        print('Fabric port: ', '%5d' % details['fabric_port'])
        # Parenthesised: '%' binds tighter than the conditional expression,
        # which used to drop the prefix in the disabled case.
        print('Fabric interface is currently: %s' %
              ('Enabled' if details['fabric_en'] else 'Disabled'))
        print('XAUI Status: ', details['xaui_status'])
        for ctr in range(0, 4):
            print('\tlane sync %i: %i' % (ctr, details['xaui_lane_sync'][ctr]))
        print('\tChannel bond: %i' % details['xaui_chan_bond'])
        print('XAUI PHY config: ')
        print('\tRX_eq_mix: %2X' % details['xaui_phy']['rx_eq_mix'])
        print('\tRX_eq_pol: %2X' % details['xaui_phy']['rx_eq_pol'])
        print('\tTX_pre-emph: %2X' % details['xaui_phy']['tx_preemph'])
        print('\tTX_diff_ctrl: %2X' % details['xaui_phy']['tx_swing'])
        print('Multicast:')
        for k in details['multicast']:
            print('\t%s: %s' % (k, details['multicast'][k]))
        if arp:
            self.print_arp_details(refresh=refresh, only_hits=True)
        if cpu:
            self.print_cpu_details(refresh=refresh)

    def print_arp_details(self, refresh=False, only_hits=False):
        """
        Print nicely formatted ARP info.

        :param refresh: re-read the core details first; this is forced when
            no ARP table has been read yet
        :param only_hits: skip entries whose six MAC bytes are all 255
        """
        details = self.core_details
        if refresh or details is None or 'arp' not in details:
            self.get_gbe_core_details(read_arp=True)
            # Pick up the refreshed details, not the stale reference.
            details = self.core_details
        print('ARP Table: ')
        for ip_address in range(256):
            entry = details['arp'][ip_address]
            if only_hits and all(entry[idx] == 255 for idx in range(0, 6)):
                continue
            mac_text = ' '.join('%02X' % entry[idx] for idx in range(0, 6))
            print('IP: %s%3d: MAC: %s' % (
                details['ip_prefix'], ip_address, mac_text))

    def print_cpu_details(self, refresh=False):
        """
        Print nicely formatted CPU details info.

        :param refresh: re-read the core details first; this is forced when
            no CPU buffers have been read yet
        """
        details = self.core_details
        if refresh or details is None or 'cpu_rx' not in details:
            self.get_gbe_core_details(read_cpu=True)
            # Pick up the refreshed details, not the stale reference.
            details = self.core_details
        print('CPU TX Interface (at offset 4096 bytes):')
        print('Byte offset: Contents (Hex)')
        for key, value in details['cpu_tx'].items():
            print('%04i: ' % key + ' '.join('%02x' % val for val in value))
        print('------------------------')
        print('CPU RX Interface (at offset 8192bytes):')
        print('CPU packet RX buffer unacknowledged data: %i' %
              details['cpu_rx_buf_unack_data'])
        print('Byte offset: Contents (Hex)')
        for key, value in details['cpu_rx'].items():
            print('%04i: ' % key + ' '.join('%02x' % val for val in value))
        print('------------------------')
# end