from __future__ import print_function
import threading
import Queue
import time
import logging
LOGGER = logging.getLogger(__name__)
[docs]class CheckCounter(object):
[docs] def __init__(self, name, must_change=True, required=False):
"""
:param name: the name of the counter
:param must_change: this counter should be changing
:param required: is this counter required?
"""
self.name = name
self.must_change = must_change
self.required = required
self.data = None
self.changed = False
[docs]def get_kwarg(field, kwargs, default=None):
try:
return kwargs[field]
except KeyError:
return default
[docs]def get_hostname(**kwargs):
"""
:param kwargs:
"""
host = kwargs['host']
bitstream = get_kwarg('bitstream', kwargs)
if ',' in host:
host, bitstream = host.split(',')
return host, bitstream
[docs]def parse_fpg(filename):
"""
Read the meta information from the FPG file.
:param filename: the name of the fpg file to parse
:return: device info dictionary, memory map info (coreinfo.tab) dictionary
"""
LOGGER.debug('Parsing file %s for system information' % filename)
if filename is not None:
fptr = open(filename, 'r')
firstline = fptr.readline().strip().rstrip('\n')
if firstline != '#!/bin/kcpfpg':
fptr.close()
raise RuntimeError('%s does not look like an fpg file we can '
'parse.' % filename)
else:
raise IOError('No such file %s' % filename)
memorydict = {}
metalist = []
while True:
line = fptr.readline().strip().rstrip('\n')
if line.lstrip().rstrip() == '?quit':
break
elif line.startswith('?meta'):
# some versions of mlib_devel may mistakenly have put spaces
# as delimiters where tabs should have been used. Rectify that
# here.
if line.startswith('?meta '):
LOGGER.warn('An old version of mlib_devel generated %s. Please '
'update. Meta fields are seperated by spaces, '
'should be tabs.' % filename)
line = line.replace(' ', '\t')
# and carry on as usual.
line = line.replace('\_', ' ').replace('?meta', '')
line = line.replace('\n', '').lstrip().rstrip()
#line_split = line.split('\t')
# Rather split on any space
line_split = line.split()
name = line_split[0]
tag = line_split[1]
param = line_split[2]
if len(line_split[3:]) == 1:
value = line_split[3:][0]
else:
value = ' '.join(line_split[3:])
# name, tag, param, value = line.split('\t')
name = name.replace('/', '_')
metalist.append((name, tag, param, value))
elif line.startswith('?register'):
if line.startswith('?register '):
register = line.replace('\_', ' ').replace('?register ', '')
register = register.replace('\n', '').lstrip().rstrip()
name, address, size_bytes = register.split(' ')
elif line.startswith('?register\t'):
register = line.replace('\_', ' ').replace('?register\t', '')
register = register.replace('\n', '').lstrip().rstrip()
name, address, size_bytes = register.split('\t')
else:
raise ValueError('Cannot find ?register entries in '
'correct format.')
address = int(address, 16)
size_bytes = int(size_bytes, 16)
if name in memorydict.keys():
raise RuntimeError('%s: mem device %s already in '
'dictionary' % (filename, name))
memorydict[name] = {'address': address, 'bytes': size_bytes}
fptr.close()
return create_meta_dictionary(metalist), memorydict
[docs]def get_git_info_from_fpg(fpg_file):
"""
Method to get git info from an fpg-file's header
:param fpg_file: filename as string
:return: Dictionary of git_info
- key = git-repo
- value = git-version
"""
git_tag = '77777_git'
git_info_dict = None
# This returns a tuple of dictionaries,
# - device info dict, memory map info (coreinfo.tab) dict
# - Git info is meta-data, and should only ever be in the
# first dictionary of the tuple
fpg_header = parse_fpg(fpg_file)
fpg_metadata = fpg_header[0]
git_info_dict = fpg_metadata.get(git_tag, None)
try:
git_info_dict.pop('tag')
return git_info_dict
except KeyError:
# tag: rcs entry isn't there, no worries
return git_info_dict
[docs]def pull_info_from_fpg(fpg_file, parameter):
"""
Pull available parameters about x-engine or f-engine from .fpg file.
Available options for x-engine: 'x_fpga_clock', 'xeng_outbits',
'xeng_accumulation_len'
Available options for f-engine: 'n_chans', 'quant_format', 'spead_flavour'
:param fpg_file: bit file path
:param parameter: parameter string
:return: pattern value (string)
"""
match = []
fpg_dict = parse_fpg(fpg_file)
if parameter == 'x_fpga_clock':
match = str(int(fpg_dict[0]['XSG_core_config']['clk_rate'])*10**6)
if parameter == 'xeng_outbits':
match = fpg_dict[0]['sys0_vacc']['n_bits']
if parameter == 'xeng_accumulation_len':
match = fpg_dict[0]['sys0_xeng']['acc_len']
if parameter == 'spead_flavour':
match1 = fpg_dict[0]['pack_spead_pack0']['spead_msw']
match2 = fpg_dict[0]['pack_spead_pack0']['spead_lsw']
s = ','
match = s.join([match1, match2])
if parameter == 'quant_format':
match1 = fpg_dict[0]['snap_quant0']['io_widths']
match2 = fpg_dict[0]['snap_quant0']['io_bps']
s = '.'
match = s.join([match1[1], match2[1]])
if parameter == 'n_chans':
pfb_dict = fpg_dict[0]['pfb_fft_wideband_real_fft_biplex_real_4x']
match1 = int(pfb_dict['fftsize'])
match2 = int(pfb_dict['n_inputs'])
match = match2*2**match1
if match is []:
errstr = 'Parameter %s does not match any field in fpg ' \
'file.' % parameter
LOGGER.error(errstr)
raise RuntimeError(errstr)
return match
[docs]def check_changing_status(counters, data_function, wait_time, num_checks):
"""
Check a changing set of status fields.
:param counters: a list of CheckCounters
:param data_function: a function that will return a single value for the
fields from field_dict
:param wait_time: seconds to wait between calls to data_function
:param num_checks: times to run data_function
"""
# fields = [
# CheckCounter('test_required_diff', False, True),
# CheckCounter('test_required_same', True, True),
# CheckCounter('test_diff', False, True),
# CheckCounter('test_same', True, True),
# ]
#
# def get_data():
# res = {}
# res['test_required_diff'] = time.time()
# res['test_required_same'] = 123456
# res['test_diff'] = time.time()
# res['test_same'] = 654321
# return res
if num_checks < 2:
raise ValueError('num_checks of less than two makes no sense')
# check that required data fields are returned by the data function
change_required = {}
d = data_function()
to_remove = []
for checkctr in counters:
if checkctr.name not in d:
if checkctr.required:
return False, 'required field %s not found' % checkctr.name
to_remove.append(checkctr.name)
else:
checkctr.data = d[checkctr.name]
for name in to_remove:
for idx, checkctr in enumerate(counters):
if checkctr.name == name:
counters.pop(idx)
break
time.sleep(wait_time)
for loop in range(num_checks - 1):
dnew = data_function()
for checkctr in counters:
ctrnew = dnew[checkctr.name]
if checkctr.must_change:
# must change
if ctrnew != checkctr.data:
checkctr.changed = True
else:
# must stay the same
if ctrnew != checkctr.data:
return False, '%s changing: %.3f > %.3f' % (
checkctr.name, checkctr.data, ctrnew)
time.sleep(wait_time)
for checkctr in counters:
if checkctr.must_change and (not checkctr.changed):
return False, '%s is not changing: %.3f' % (
checkctr.name, checkctr.data)
return True, ''
[docs]def program_fpgas(fpga_list, progfile, timeout=10):
"""
Program more than one FPGA at the same time.
:param fpga_list: a list of objects for the FPGAs to be programmed
:param progfile: string, the file used to program the FPGAs
:param timeout: how long to wait for a response, in seconds
"""
stime = time.time()
if progfile is None:
for fpga in fpga_list:
try:
tuplen = len(fpga)
fpga[0].bitstream = fpga[1]
except TypeError:
pass
else:
for fpga in fpga_list:
fpga.bitstream = progfile
threaded_fpga_function(fpga_list, 60, 'upload_to_ram_and_program')
LOGGER.info('Programming %d FPGAs took %.3f seconds.' % (
len(fpga_list), time.time() - stime))
[docs]def threaded_create_fpgas_from_hosts(host_list, fpga_class=None,
port=7147, timeout=10,
best_effort=False, **kwargs):
"""
Create KatcpClientFpga objects in many threads, Moar FASTAAA!
:param fpga_class: the class to insantiate, usually CasperFpga
:param host_list: a comma-seperated list of hosts
:param port: the port on which to do network comms
:param timeout: how long to wait, in seconds
:param best_effort: return as many hosts as it was possible to make
"""
if fpga_class is None:
from casperfpga import CasperFpga
fpga_class = CasperFpga
num_hosts = len(host_list)
result_queue = Queue.Queue(maxsize=num_hosts)
thread_list = []
def makehost(hostname):
result_queue.put_nowait(fpga_class(hostname, port, **kwargs))
for host_ in host_list:
thread = threading.Thread(target=makehost, args=(host_,))
thread.daemon = True
thread.start()
thread_list.append(thread)
for thread_ in thread_list:
thread_.join(timeout)
fpgas = [None] * num_hosts
hosts_missing = host_list[:]
while True:
try:
result = result_queue.get_nowait()
host_pos = host_list.index(result.host)
fpgas[host_pos] = result
hosts_missing.pop(hosts_missing.index(result.host))
except Queue.Empty:
break
if hosts_missing:
for host in hosts_missing:
LOGGER.error('Could not create host %s.' % host)
errstr = 'Given %d hosts, only made %d CasperFpgas.' % (
num_hosts, num_hosts-len(hosts_missing))
LOGGER.error(errstr)
if best_effort:
rv = []
for fpga in fpgas:
if fpga is not None:
rv.append(fpga)
return rv
raise RuntimeError(errstr)
return fpgas
def _check_target_func(target_function):
"""
:param target_function:
"""
if isinstance(target_function, basestring):
return target_function, (), {}
try:
len(target_function)
except TypeError:
return target_function, (), {}
if len(target_function) == 3:
return target_function
elif len(target_function) == 1:
target_function = (target_function[0], (), {})
elif len(target_function) == 2:
target_function = (target_function[0], target_function[1], {})
else:
raise RuntimeError('target_function tuple too long? - (name, (), {})')
return target_function
[docs]def threaded_fpga_function(fpga_list, timeout, target_function):
"""
Thread the running of any CasperFpga function on a list of
CasperFpga objects. Much faster.
:param fpga_list: list of KatcpClientFpga objects
:param timeout: how long to wait before timing out
:param target_function: a tuple with three parts:
1. string, the KatcpClientFpga function to
run e.g. 'disconnect' for fpgaobj.disconnect()
2. tuple, the arguments to the function
3. dict, the keyword arguments to the function e.g. (func_name, (1,2,), {'another_arg': 3})
:return: a dictionary of the results, keyed on hostname
"""
target_function = _check_target_func(target_function)
def dofunc(fpga, *args, **kwargs):
try:
rv = getattr(fpga, target_function[0])(*args, **kwargs)
return rv
except AttributeError:
LOGGER.error('FPGA %s has no such function: %s' % (
fpga.host, target_function[0]))
raise
return threaded_fpga_operation(
fpga_list, timeout, (dofunc, target_function[1], target_function[2]))
[docs]def threaded_fpga_operation(fpga_list, timeout, target_function):
"""
Thread any operation against many FPGA objects
:param fpga_list: list of KatcpClientFpga objects
:param timeout: how long to wait before timing out
:param target_function: a tuple with three parts:
1. reference, the function object that must be
run - MUST take FPGA object as first argument
2. tuple, the arguments to the function
3. dict, the keyword arguments to the function e.g. (func_name, (1,2,), {'another_arg': 3})
:return: a dictionary of the results, keyed on hostname
"""
target_function = _check_target_func(target_function)
def jobfunc(resultq, fpga):
rv = target_function[0](fpga, *target_function[1], **target_function[2])
resultq.put_nowait((fpga.host, rv))
num_fpgas = len(fpga_list)
result_queue = Queue.Queue(maxsize=num_fpgas)
thread_list = []
for fpga_ in fpga_list:
thread = threading.Thread(target=jobfunc, args=(result_queue, fpga_))
thread.setDaemon(True)
thread.start()
thread_list.append(thread)
for thread_ in thread_list:
thread_.join(timeout)
if thread_.isAlive():
break
returnval = {}
hosts_missing = [fpga.host for fpga in fpga_list]
while True:
try:
result = result_queue.get_nowait()
returnval[result[0]] = result[1]
hosts_missing.pop(hosts_missing.index(result[0]))
except Queue.Empty:
break
if hosts_missing:
errmsg = 'Ran \'%s\' on hosts. Did not get a response ' \
'from %s.' % (target_function[0].__name__, hosts_missing)
LOGGER.error(errmsg)
return returnval
[docs]def threaded_non_blocking_request(fpga_list, timeout, request, request_args):
"""
Make a non-blocking KatCP request to a list of KatcpClientFpgas, using
the Asynchronous client.
:param fpga_list: list of KatcpClientFpga objects
:param timeout: the request timeout
:param request: the request string
:param request_args: the arguments to the request, as a list
:return: a dictionary, keyed by hostname, of result dictionaries containing
reply and informs
"""
raise DeprecationWarning
num_fpgas = len(fpga_list)
reply_queue = Queue.Queue(maxsize=num_fpgas)
requests = {}
replies = {}
# reply callback
def reply_cb(host, req_id):
LOGGER.debug('Reply(%s) from host(%s)' % (req_id, host))
reply_queue.put_nowait([host, req_id])
# start the requests
LOGGER.debug('Send request(%s) to %i hosts.' % (request, num_fpgas))
lock = threading.Lock()
for fpga_ in fpga_list:
lock.acquire()
req = fpga_.nb_request(request, None, reply_cb, *request_args)
requests[req['host']] = [req['request'], req['id']]
lock.release()
LOGGER.debug('Request \'%s\' id(%s) to host(%s)' % (
req['request'], req['id'], req['host']))
# wait for replies from the requests
timedout = False
done = False
while (not timedout) and (not done):
try:
it = reply_queue.get(block=True, timeout=timeout)
except:
timedout = True
break
replies[it[0]] = it[1]
if len(replies) == num_fpgas:
done = True
if timedout:
LOGGER.error('non_blocking_request timeout after %is.' % timeout)
LOGGER.error(replies)
raise RuntimeError('non_blocking_request timeout after %is.' % timeout)
# process the replies
returnval = {}
for fpga_ in fpga_list:
try:
request_id = replies[fpga_.host]
except KeyError:
LOGGER.error(replies)
raise KeyError(
'Didn\'t get a reply for FPGA \'%s\' so the request \'%s\' '
'probably didn\'t complete.' % (fpga_.host, request))
reply, informs = fpga_.nb_get_request_result(request_id)
frv = {'request': requests[fpga_.host][0],
'reply': reply.arguments[0],
'reply_args': reply.arguments}
informlist = []
for inf in informs:
informlist.append(inf.arguments)
frv['informs'] = informlist
returnval[fpga_.host] = frv
fpga_.nb_pop_request_by_id(request_id)
return returnval
[docs]def hosts_from_dhcp_leases(host_pref=None,
leases_file='/var/lib/misc/dnsmasq.leases'):
"""
Get a list of hosts from a leases file.
:param host_pref: the prefix of the hosts in which we're interested
:param leases_file: the file to read
"""
hosts = []
if host_pref is None:
host_pref = ['roach', 'skarab']
if not isinstance(host_pref, list):
host_pref = [host_pref]
with open(leases_file) as masqfile:
masqlines = masqfile.readlines()
for line in masqlines:
(leasetime, mac, ip, host, mac2) = line.replace('\n', '').split(' ')
for host_prefix in host_pref:
if host.startswith(host_prefix):
hosts.append(host if host != '*' else ip)
break
return hosts, leases_file
[docs]def deprogram_hosts(host_list):
"""
:param host_list:
"""
if len(host_list) == 0:
raise RuntimeError('No good carrying on without hosts.')
fpgas = threaded_create_fpgas_from_hosts(host_list)
running = threaded_fpga_function(fpgas, 10, 'is_running')
deprogrammed = []
to_deprogram = []
already_deprogrammed = []
for fpga in fpgas:
if running[fpga.host]:
deprogrammed.append(fpga.host)
to_deprogram.append(fpga)
else:
already_deprogrammed.append(fpga.host)
running = threaded_fpga_function(to_deprogram, 10, 'deprogram')
if len(deprogrammed) != 0:
print('%s: deprogrammed okay.' % deprogrammed)
if len(already_deprogrammed) != 0:
print('%s: already deprogrammed.' % already_deprogrammed)
threaded_fpga_function(fpgas, 10, 'disconnect')
# end