Source code for transport_dummy

import logging

import skarab_definitions as sd
from transport import Transport
from network import IpAddress

LOGGER = logging.getLogger(__name__)


[docs]class NamedFifo(object):
[docs] def __init__(self, maxlen=None): self.names = [] self.values = [] self.maxlen = maxlen
[docs] def push(self, name, value): self.names.append(name) self.values.append(value) if self.maxlen is not None: if len(self) > self.maxlen: self.names.pop(0) self.values.pop(0)
[docs] def pop(self, name=None): pop = self.names.index(name) if name is not None else 0 self.names.pop(pop) rv = self.values.pop(pop) return rv
def __len__(self): return len(self.names)
[docs]class DummyTransport(Transport): """ A dummy transport for testing """
[docs] def __init__(self, **kwargs): """ Make a Dummy Transport :param host: IP Address should be 127.0.0.1 for a Dummy """ Transport.__init__(self, **kwargs) self._devices = NamedFifo(100) self._devices_wishbone = NamedFifo(100) LOGGER.info('%s: port(%s) created and connected.' % ( self.host, sd.ETHERNET_CONTROL_PORT_ADDRESS))
[docs] def connect(self, timeout=None): """ :param timeout: """ return
[docs] def is_running(self): """ Is the FPGA programmed and running? :return: True or False """ return True
[docs] def is_connected(self): """ """ return True
[docs] def test_connection(self): """ Write to and read from the scratchpad to test the connection to the FPGA """ return self.is_connected()
[docs] def ping(self): """ Use the 'watchdog' request to ping the FPGA host. :return: True or False """ return True
[docs] def disconnect(self): """ :return: """ pass
[docs] def read(self, device_name, size, offset=0): """ :param device_name: :param size: :param offset: """ try: return self._devices.pop(device_name) except ValueError: pass return '\x00' * size
[docs] def blindwrite(self, device_name, data, offset=0): """ :param device_name: :param data: :param offset: """ self._devices.push(device_name, data) return
[docs] def listdev(self): """ Get a list of the memory bus items in this design. :return: a list of memory devices """ return self.memory_devices.keys()
[docs] def deprogram(self): """ Deprogram the FPGA connected by this transport """ raise NotImplementedError
[docs] def set_igmp_version(self, version): """ :param version: """ pass
[docs] def upload_to_ram_and_program(self, filename, port=-1, timeout=10, wait_complete=True, skip_verification=False): """ Upload an FPG file to RAM and then program the FPGA. :param filename: the file to upload :param port: the port to use on the rx end, -1 means a random port :param timeout: how long to wait, seconds :param wait_complete: wait for the transaction to complete, return after upload if False :param skip_verification: don't verify the image after uploading it """ self.bitstream = filename return True
[docs] def upload_to_flash(self, binary_file, port=-1, force_upload=False, timeout=30, wait_complete=True): """ Upload the provided binary file to the flash filesystem. :param binary_file: filename of the binary file to upload :param port: host-side port, -1 means a random port will be used :param force_upload: upload the binary even if it already exists on the host :param timeout: upload timeout, in seconds :param wait_complete: wait for the upload to complete, or just kick it off """ return True
[docs] def get_system_information_from_transport(self): """ """ return self.bitstream, None
[docs] def post_get_system_information(self): """ Cleanup run after get_system_information """ pass
[docs] def read_wishbone(self, wb_address): """ Used to perform low level wishbone read from a Wishbone slave. :param wb_address: address of the wishbone slave to read from :return: Read Data or None """ try: return self._devices_wishbone.pop(wb_address) except ValueError: pass return 0
[docs] def write_wishbone(self, wb_address, data): """ Used to perform low level wishbone write to a wishbone slave. Gives low level direct access to wishbone bus. :param wb_address: address of the wishbone slave to write to :param data: data to write :return: response object """ self._devices_wishbone.push(wb_address, data) return None
[docs] @staticmethod def multicast_receive(gbename, ip, mask): """ :param gbename: :param ip: :param mask: """ resp_ip = IpAddress(ip) resp_mask = IpAddress(mask) LOGGER.debug('%s: multicast configured: addr(%s) mask(%s)' % ( gbename, resp_ip.ip_str, resp_mask.ip_str))
# end