Source code for transport

from utils import get_hostname


[docs]class Transport(object): """ The actual network transport of data for a CasperFpga object. """
[docs] def __init__(self, **kwargs): """ Initialise the CasperFpga object :param host: """ self.host, self.bitstream = get_hostname(**kwargs) self.memory_devices = None self.prog_info = {'last_uploaded': '', 'last_programmed': '', 'system_name': ''}
[docs] def connect(self, timeout=None): """ :param timeout: """ pass
[docs] def is_running(self): """ Is the FPGA programmed and running? :return: True or False """ raise NotImplementedError
[docs] def is_connected(self): """ Is the transport layer connected to the platform? :return: True or False """ raise NotImplementedError
[docs] def test_connection(self): """ Write to and read from the scratchpad to test the connection to the FPGA - i.e. Is the casper FPGA connected? :return: Boolean - True/False - Success/Fail """ return self.is_connected()
[docs] def ping(self): """ Use the 'watchdog' request to ping the FPGA host. :return: True or False """ raise NotImplementedError
[docs] def disconnect(self): """ """ pass
[docs] def read(self, device_name, size, offset=0): """ Read `size` bytes from register `device_name`. Start reading from `offset` bytes from `device_name`'s base address. Return the read data as a big-endian binary string. :param device_name: Name of device to be read :type device_name: String :param size: Number of bytes to read :type size: Integer :param offset: Offset from which to begin read, in bytes :type offset: Integer :return: Big-endian binary string """ raise NotImplementedError
[docs] def blindwrite(self, device_name, data, offset=0): """ Write binary data to `device_name`, starting at `offset` bytes from `device_name`'s base address.. :param device_name: Name of device to be read :type device_name: String :param data: Data to write :type data: Big-endian binary string :param offset: Offset from which to begin write, in bytes :type offset: Integer :return: None """ raise NotImplementedError
[docs] def listdev(self): """ Get a list of the memory bus items in this design. :return: a list of memory devices """ return self.memory_devices.keys()
[docs] def deprogram(self): """ Deprogram the FPGA connected by this transport """ raise NotImplementedError
[docs] def set_igmp_version(self, version): """ :param version: """ pass
[docs] def upload_to_ram_and_program(self, filename, port=-1, timeout=10, wait_complete=True, skip_verification=False): """ Upload an FPG file to RAM and then program the FPGA. - Implemented in the child :param filename: the file to upload :param port: the port to use on the rx end, -1 means a random port :param timeout: how long to wait, seconds :param wait_complete: wait for the transaction to complete, return after upload if False :param skip_verification: don't verify the image after uploading it """ raise NotImplementedError
[docs] def upload_to_flash(self, binary_file, port=-1, force_upload=False, timeout=30, wait_complete=True): """ Upload the provided binary file to the flash filesystem. :param binary_file: filename of the binary file to upload :param port: host-side port, -1 means a random port will be used :param force_upload: upload the binary even if it already exists on the host :param timeout: upload timeout, in seconds :param wait_complete: wait for the upload to complete, or just kick it off """ raise NotImplementedError
[docs] def get_system_information_from_transport(self): """ """ return self.bitstream, None
[docs] def post_get_system_information(self): """ Cleanup run after get_system_information """ pass
# end