"""
SPEAD operations - unpack and use spead data, usually from Snap blocks.
"""
import logging
LOGGER = logging.getLogger(__name__)
[docs]class SpeadPacket(object):
"""
A Spead packet. Headers and data.
"""
[docs] class SpeadPacketError(Exception):
pass
[docs] @staticmethod
def decode_spead_magic_word(word64, required_version=None,
required_flavour=None,
required_numheaders=None):
"""
Decode a 64-bit word as a SPEAD header.
:param word64: A 64-bit word
:param required_version: the specific SPEAD version required, an integer
:param required_flavour: the specific SPEAD flavour required as
a string, e.g. '64,48'
:param required_numheaders: the number of headers (NOT incl. the
magic number) expected, an integer
"""
magic_number = word64 >> 56
spead_version = (word64 >> 48) & 0xff
spead_id_width = (word64 >> 40) & 0xff
spead_addr_width = (word64 >> 32) & 0xff
reserved = (word64 >> 16) & 0xffff
num_headers = word64 & 0xffff
spead_flavour = '%s,%s' % (
(spead_addr_width * 8) + (spead_id_width * 8), spead_addr_width * 8)
if magic_number != 83:
raise SpeadPacket.SpeadPacketError(
'Wrong SPEAD magic number, expected {}, got {}'.format(
83, magic_number))
if reserved != 0:
raise SpeadPacket.SpeadPacketError(
'Wrong SPEAD reserved section, expected {}, got {}'.format(
0, reserved))
if (required_version is not None) and (
spead_version != required_version):
raise SpeadPacket.SpeadPacketError(
'Wrong SPEAD version, expected {}, got {}'.format(
required_version, spead_version))
if (required_flavour is not None) and (
spead_flavour != required_flavour):
raise SpeadPacket.SpeadPacketError(
'Wrong SPEAD flavour, expected {}, got {}'.format(
required_flavour, spead_flavour))
if (required_numheaders is not None) and (
num_headers != required_numheaders):
raise SpeadPacket.SpeadPacketError(
'Wrong num SPEAD hdrs, expected {}, got {}'.format(
required_numheaders, num_headers))
return {'magic_number': magic_number,
'version': spead_version,
'id_bits': spead_id_width * 8,
'address_bits': spead_addr_width * 8,
'reserved': reserved,
'num_headers': num_headers,
'flavour': spead_flavour}
[docs] @staticmethod
def decode_item_pointer(header64, id_bits, address_bits):
"""
Decode a 64-bit header word in the id and data/pointer portions
:param header64: the 64-bit word
:param id_bits: how many bits are used for the ID
:param address_bits: how many bits are used for the data/pointer
:return: a tuple of the ID and data/pointer
"""
hdr_id = header64 >> address_bits
# if the top bit is set, it's immediate addressing so clear the top bit
if hdr_id & pow(2, id_bits - 1):
hdr_id &= pow(2, id_bits - 1) - 1
hdr_data = header64 & (pow(2, address_bits) - 1)
return hdr_id, hdr_data
[docs] def __init__(self, headers=None, data=None):
"""
Create a new SpeadPacket object
"""
self.headers = headers if headers is not None else {}
self.data = data if data is not None else []
[docs] @classmethod
def from_data(cls, data, expected_version=None, expected_flavour=None,
expected_hdrs=None, expected_length=None):
"""
Create a SpeadPacket from a list of 64-bit data words
Assumes the list of data starts at the SPEAD magic word.
"""
(headers, hdr_pkt_len_bytes) = SpeadPacket.decode_headers(
data, expected_version, expected_flavour, expected_hdrs)
main_header = headers[0x0000]
pktdata = [] # this is 64-bit words, which is admittedly a bit arb
pktlen = 0
for ctr in range(main_header['num_headers']+1, len(data)):
pktdata.append(data[ctr])
pktlen += 1
pktlen_bytes = pktlen * 8
if (expected_length is not None) and (pktlen != expected_length):
raise SpeadPacket.SpeadPacketError(
'Packet is not the expected length, given_expected(%i bytes) '
'packet(%i bytes)' % (expected_length * 8, pktlen_bytes))
# the data may be too long here. think 64-bit packets into 256-bit
# interface.
if pktlen_bytes > hdr_pkt_len_bytes:
# too much data in heap, chop it off
hdr_pkt_len_64 = hdr_pkt_len_bytes / 8
pktdata = pktdata[:hdr_pkt_len_64]
LOGGER.warn('Packet seemed to have more data in it than the SPEAD'
'headers describe: pkt(%i bytes) header(%i bytes)' % (
pktlen_bytes, hdr_pkt_len_bytes))
elif pktlen_bytes < hdr_pkt_len_bytes:
raise SpeadPacket.SpeadPacketError(
'Packet contains less data than indicated in the SPEAD '
'header: hdr(%i bytes) packet(%i bytes)\nCheck the magic '
'header, number of headers and headers 2 and 4.' % (
hdr_pkt_len_bytes, pktlen_bytes))
obj = cls(headers, pktdata)
return obj
[docs] def get_strings(self, headers_only=False, hex_nums=False):
"""
Get a list of the string representation of this packet.
"""
rv = ['header 0x0000: version(%i) flavour(%s) num_headers(%i)' % (
self.headers[0]['version'], self.headers[0]['flavour'],
self.headers[0]['num_headers'])]
for hdr_id, hdr_value in self.headers.items():
if hdr_id == 0x0000:
continue
if hex_nums:
rv.append('header 0x%04x: 0x%x' % (hdr_id, hdr_value))
else:
rv.append('header 0x%04x: %i' % (hdr_id, hdr_value))
if headers_only:
return rv
for dataword in self.data:
if hex_nums:
rv.append('0x%016x' % dataword)
else:
rv.append('%i' % dataword)
return rv
[docs] def print_packet(self, headers_only=False, hex_nums=False):
"""
Print a representation of the packet.
"""
for string in self.get_strings(headers_only, hex_nums):
print(string)
[docs]class SpeadProcessor(object):
"""
Set up a SPEAD processor with version, flavour, etc. Then call methods
to process data.
"""
[docs] def __init__(self, version=4, flavour='64,48',
packet_length=None, num_headers=None):
"""
Create a SpeadProcessor
:param version
:param flavour
:param packet_length
:param num_headers
"""
self.packets = []
self.version = version
self.flavour = flavour
self.expected_num_headers = num_headers
self.expected_packet_length = packet_length
[docs] def process_data(self, data_packets):
"""
Create SpeadPacket objects from a list of data packets.
"""
if len(data_packets) == 0:
return
for pkt in data_packets:
try:
pkt_data = pkt['data']
pkt_ip = pkt['ip']
except TypeError:
pkt_data = pkt
pkt_ip = None
except KeyError:
if 'ip' not in pkt:
pkt_ip = None
if 'data' not in pkt:
raise RuntimeError('Could not find data key')
spead_pkt = SpeadPacket.from_data(
pkt_data, self.version, self.flavour,
self.expected_num_headers, self.expected_packet_length)
if pkt_ip is not None:
spead_pkt.ip = pkt_ip
self.packets.append(spead_pkt)
# def process_spead_word(current_spead_info, data, pkt_counter):
#
# if pkt_counter == 1:
# spead_header = decode_spead_magic_word(data)
# if len(current_spead_info) == 0:
# current_spead_info = spead_header
# rv_string = 'spead %s, %d headers to come' % (spead_header['flavour'], spead_header['num_headers'])
# if current_spead_info['num_headers'] != spead_header['num_headers']:
# rv_string += ', ERROR: num spead hdrs changed from %d to %d?!' %\
# (current_spead_info['num_headers'], spead_header['num_headers'])
# return spead_header, rv_string
# elif (pkt_counter > 1) and (pkt_counter <= 1 + current_spead_info['num_headers']):
# hdr_id, hdr_data = decode_item_pointer(data, current_spead_info['id_bits'], current_spead_info['address_bits'])
# if hdr_id == 0x0004:
# # the SPEAD packet length is in BYTES! we're counting 64-bit words, so divide by 8
# current_spead_info['packet_length'] = current_spead_info['num_headers'] + (hdr_data / 8)
# string_data = 'spead hdr 0x%04x: ' % hdr_id + ('%d' % hdr_data if not True else '0x%X' % hdr_data)
# return current_spead_info if hdr_id == 0x0004 else None, string_data
# else:
# # data = '%d, %d, %d, %d' % (data >> 48, (data >> 32) & 0xffff, (data >> 16) & 0xffff, (data >> 0) & 0xffff)
# return None, data
# def gbe_to_spead(gbedata):
# pkt_counter = 1
# _current_packet = {'headers': {}, 'data': []}
# for wordctr in range(0, len(gbedata)):
# if pkt_counter == 1:
# spead_header = decode_spead_magic_word(gbedata[wordctr])
# _current_packet['headers'][0] = spead_header
# elif (pkt_counter > 1) and (pkt_counter <= 1 + _current_packet['headers'][0]['num_headers']):
# hdr_id, hdr_data = decode_item_pointer(gbedata[wordctr],
# _current_packet['headers'][0]['id_bits'],
# _current_packet['headers'][0]['address_bits'])
# # the SPEAD packet length is in BYTES! we're counting 64-bit words, so divide by 8
# if hdr_id == 0x0004:
# _current_packet['headers'][0]['packet_length'] = _current_packet['headers'][0]['num_headers'] + \
# (hdr_data / 8)
# if hdr_id in _current_packet['headers'].keys():
# raise RuntimeError('Header ID 0x%04x already exists in packet!' % hdr_id)
# _current_packet['headers'][hdr_id] = hdr_data
# else:
# _current_packet['data'].append(gbedata[wordctr])
# pkt_counter += 1
# if _current_packet['headers'][0]['packet_length'] + 1 != len(gbedata):
# raise ValueError('SPEAD header packet length %d does not match GBE packet length %d') % \
# (_current_packet['headers'][0]['packet_length'] + 1, len(gbedata))
# return _current_packet
# def decode_spead(spead_data, eof_data=None):
# """
# Given a data list and EOF list from a snapblock, decode SPEAD data and store it in spead packets
# """
# if eof_data is not None:
# if len(spead_data) != len(eof_data):
# raise RuntimeError('Need EOF and data lengths to be the same!')
# first_spead_header = find_spead_header(spead_data, SPEAD_EXPECTED_VERSION, SPEAD_EXPECTED_FLAVOUR)
# if first_spead_header == -1:
# raise RuntimeError('Could not find valid SPEAD header.')
# else:
# first_spead_header = 0
# spead_packets = []
# _current_packet = {'headers': {}, 'data': []}
# pkt_counter = 1
# for wordctr in range(first_spead_header, len(spead_data)):
# if eof_data[wordctr]:
# if pkt_counter != _current_packet['headers'][0]['packet_length'] + 1:
# _current_packet['headers'][0]['length_error'] = True
# spead_packets.append(_current_packet)
# _current_packet = {'headers': {}, 'data': []}
# pkt_counter = 0
# elif pkt_counter == 1:
# spead_header = decode_spead_header(spead_data[wordctr])
# if len(spead_packets) > 0:
# if spead_packets[0]['headers'][0]['num_headers'] != spead_header['num_headers']:
# raise RuntimeError('SPEAD header format changed mid-snapshot?')
# _current_packet['headers'][0] = spead_header
# elif (pkt_counter > 1) and (pkt_counter <= 1 + _current_packet['headers'][0]['num_headers']):
# hdr_id, hdr_data = decode_item_pointer_(_current_packet['headers'][0]['address_bits'],
# _current_packet['headers'][0]['id_bits'],
# spead_data[wordctr])
# # the SPEAD packet length is in BYTES! we're counting 64-bit words, so divide by 8
# if hdr_id == 0x0004:
# _current_packet['headers'][0]['packet_length'] = _current_packet['headers'][0]['num_headers'] + \
# (hdr_data / 8)
# if hdr_id in _current_packet['headers'].keys():
# raise RuntimeError('Header ID 0x%04x already exists in packet!' % hdr_id)
# _current_packet['headers'][hdr_id] = hdr_data
# else:
# _current_packet['data'].append(spead_data[wordctr])
# pkt_counter += 1
# return spead_packets