Source code for i2c_motion

import time,logging,numpy as np,struct,sys
import random
#from transforms3d.quaternions import *
#from transforms3d.euler import *

logger = logging.getLogger(__name__)

[docs]class IMUSimple:
[docs] def __init__(self,bus,mpuaddr=0x69,akaddr=None,orient=[[1,0,0],[0,1,0],[0,0,1]]): """ IMUSimple IMU usage demo .. code-block:: python bus = i2c.I2C_IPGPIO(2,3,15000) imu = i2c_motion.IMUSimple(bus,0x69,0x0c) imu.init() print(imu.pose) Default orientation: * x+ east * y+ north * z+ upward * IMU chip pin 1 in 4th quadrant * IMU chip top side upwards provide other orientations in the following format: .. code-block:: python imu = IMUSimple(bus,0x69, orient = [[1,0,0], # new x+ in old coordinate system [0,1,0], # new y+ in old coordinate system [0,0,1]]) # new z+ in old coordinate system """ self.mpu = MPU9250(bus,mpuaddr) if akaddr!=None: # Enable aux i2c - AK8963 self.mpu.setWord('BYPASS_EN',0x1) self.ak = AK8963(bus,akaddr) else: self.ak=None self.rotmax = self.calcRotationMatrix(orient)
[docs] def calcRotationMatrix(self,dst): #src = np.asarray([[1,0,0],[0,1,0],[0,0,1]]) dst = np.asarray(dst)/np.abs(np.linalg.det(dst)) if np.dot(dst[0],dst[1])!=0 or np.dot(dst[0],dst[2])!=0 or np.dot(dst[1],dst[2])!=0: msg = 'Invalid parameter!' logger.error(msg) raise ValueError(msg) # src * R = dst # if src is an I matrix, then R == dst return dst
[docs] def init(self): self.mpu.init(accel=True,gyro=False) #self.mpu.init(lowpower=True) if self.ak!=None: self.ak.init()
@property def accel(self): acc = np.asarray(self.mpu.accel) return np.dot(acc, self.rotmax) @property def gyro(self): return self.mpu.gyro @property def mag(self): return self.ak.mag @property def pose(self): from numpy.linalg import norm acc = np.asarray(self.accel) a0 = [0,0,norm(acc)] na = np.cross(a0,acc) ta = np.arccos(np.dot(a0,acc)/norm(acc)/norm(a0)) ph = np.arctan2(acc[0],acc[1]) return ta*180/np.pi,ph*180/np.pi
[docs]class MPU9250: """ 9-axis MotionTracking device that combines a 3-axis gyroscope, 3-axis accelerometer, 3-axis magnetometer and a Digital Motion Processor (DMP) all in a small 3x3x1mm package available as a pin-compatible upgrade from the MPU-6515 """ DICT = dict() # All register names use lower case letter # All fields use upper case letter DICT[0x00] = { 'XG_ST_DATA' : 0b11111111 << 0,} DICT[0x01] = { 'YG_ST_DATA' : 0b11111111 << 0,} DICT[0x02] = { 'ZG_ST_DATA' : 0b11111111 << 0,} DICT[0x0d] = { 'XA_ST_DATA' : 0b11111111 << 0,} DICT[0x0e] = { 'YA_ST_DATA' : 0b11111111 << 0,} DICT[0x0f] = { 'ZA_ST_DATA' : 0b11111111 << 0,} DICT[0x13] = { 'X_OFFS_USR_H' : 0b11111111 << 0,} DICT[0x14] = { 'X_OFFS_USR_L' : 0b11111111 << 0,} DICT[0x15] = { 'Y_OFFS_USR_H' : 0b11111111 << 0,} DICT[0x16] = { 'Y_OFFS_USR_L' : 0b11111111 << 0,} DICT[0x17] = { 'Z_OFFS_USR_H' : 0b11111111 << 0,} DICT[0x18] = { 'Z_OFFS_USR_L' : 0b11111111 << 0,} DICT[0x19] = { 'SMPLRT_DIV' : 0b11111111 << 0,} DICT[0x1a] = { 'config' : 0b11111111 << 0, 'FIFO_MODE' : 0b1 << 6, 'EXT_SYNC_SET' : 0b111 << 3, 'DLPF_CFG' : 0b111 << 0,} DICT[0x1b] = { 'gyro_config' : 0b11111111 << 0, 'XGYRO_CTEN' : 0b1 << 7, 'YGYRO_CTEN' : 0b1 << 6, 'ZGYRO_CTEN' : 0b1 << 5, 'GYRO_FS_SEL' : 0b11 << 3, 'FCHOICE_B' : 0b11 << 0,} DICT[0x1c] = { 'accel_config' : 0xff << 0, 'AX_ST_EN' : 0b1 << 7, 'AY_ST_EN' : 0b1 << 6, 'AZ_ST_EN' : 0b1 << 5, 'ACCEL_FS_SEL' : 0b11 << 3,} DICT[0x1d] = { 'accel_config2' : 0xff << 0, 'ACCEL_FCHOICE_B' : 0b11 << 2, 'A_DLPF_CFG' : 0b11 << 0, } DICT[0x1e] = { 'LPOSC_CLKSEL' : 0b1111 << 0,} DICT[0x1f] = { 'WOM_THRESHOLD' : 0b11111111 << 0,} DICT[0x23] = { 'fifo_en' : 0b11111111 << 0, 'TEMP_FIFO_EN' : 0b1 << 7, 'GYRO_XOUT' : 0b1 << 6, 'GYRO_YOUT' : 0b1 << 5, 'GYRO_ZOUT' : 0b1 << 4, 'ACCEL' : 0b1 << 3, 'SLV2' : 0b1 << 2, 'SLV1' : 0b1 << 1, 'SLV0' : 0b1 << 0,} DICT[0x24] = { 'i2c_mst_ctrl' : 0b11111111 << 0, 'MULT_MST_EN' : 0b1 << 7, 'WAIT_FOR_ES' : 0b1 << 6, 'SLV_3_FIFO_EN' : 0b1 << 5, 'I2C_MST_P_NSR' : 0b1 << 4, 'I2C_MST_CLK' : 0b1111 << 0, } DICT[0x25] = { 'I2C_SLV0_RNW' : 0b1 << 7, 'I2C_ID_0' : 0b1111111 << 0, } DICT[0x26] = { 'I2C_SLV0_REG' : 0b11111111 << 0,} DICT[0x27] = { 'I2C_SLV0_EN' : 0b1 << 7, 'I2C_SLV0_BYTE_SW' : 0b1 << 6, 'I2C_SLV0_REG_DIS' : 0b1 << 5, 'I2C_SLV0_GRP' : 0b1 << 4, 'I2C_SLV0_LENG' : 0b1111 << 0, } DICT[0x28] = { 'I2C_SLV1_RNW' : 0b1 << 7, 'I2C_ID_1' : 0b1111111 << 0, } DICT[0x29] = { 'I2C_SLV1_REG' : 0b11111111 << 0,} DICT[0x2a] = { 'I2C_SLV1_EN' : 0b1 << 7, 'I2C_SLV1_BYTE_SW' : 0b1 << 6, 'I2C_SLV1_REG_DIS' : 0b1 << 5, 'I2C_SLV1_GRP' : 0b1 << 4, 'I2C_SLV1_LENG' : 0b1111 << 0, } DICT[0x2b] = { 'I2C_SLV2_RNW' : 0b1 << 7, 'I2C_ID_2' : 0b1111111 << 0, } DICT[0x2c] = { 'I2C_SLV2_REG' : 0b11111111 << 0,} DICT[0x2d] = { 'I2C_SLV2_EN' : 0b1 << 7, 'I2C_SLV2_BYTE_SW' : 0b1 << 6, 'I2C_SLV2_REG_DIS' : 0b1 << 5, 'I2C_SLV2_GRP' : 0b1 << 4, 'I2C_SLV2_LENG' : 0b1111 << 0, } DICT[0x2e] = { 'I2C_SLV3_RNW' : 0b1 << 7, 'I2C_ID_3' : 0b1111111 << 0, } DICT[0x2f] = { 'I2C_SLV3_REG' : 0b11111111 << 0,} DICT[0x30] = { 'I2C_SLV3_EN' : 0b1 << 7, 'I2C_SLV3_BYTE_SW' : 0b1 << 6, 'I2C_SLV3_REG_DIS' : 0b1 << 5, 'I2C_SLV3_GRP' : 0b1 << 4, 'I2C_SLV3_LENG' : 0b1111 << 0, } DICT[0x31] = { 'I2C_SLV4_RNW' : 0b1 << 7, 'I2C_ID_4' : 0b1111111 << 0, } DICT[0x32] = { 'I2C_SLV3_REG' : 0b11111111 << 0,} DICT[0x33] = { 'I2C_SLV4_DO' : 0b11111111 << 0,} DICT[0x34] = { 'I2C_SLV4_EN' : 0b1 << 7, 'SLV4_DONE_INT_EN' : 0b1 << 6, 'I2C_SLV4_REG_DIS' : 0b1 << 5, 'I2C_MST_DLY' : 0b1111 << 0, } DICT[0x35] = { 'I2C_SLV4_DI' : 0b11111111 << 0,} DICT[0x36] = { 'PASS_THROUGH' : 0b1 << 7, 'I2C_SLV4_DONE' : 0b1 << 6, 'I2C_LOST_ARB' : 0b1 << 5, 'I2C_SLV4_NACK' : 0b1 << 4, 'I2C_SLV3_NACK' : 0b1 << 3, 'I2C_SLV2_NACK' : 0b1 << 2, 'I2C_SLV1_NACK' : 0b1 << 1, 'I2C_SLV0_NACK' : 0b1 << 0,} DICT[0x37] = { 'int_pin_cfg' : 0xff << 0, 'ACTL' : 0b1 << 7, 'OPEN' : 0b1 << 6, 'LATCH_INT_EN' : 0b1 << 5, 'INT_ANYRD_2CLEAR' : 0b1 << 4, 'ACTL_FSYNC' : 0b1 << 3, 'FSYNC_INT_MODE_EN' : 0b1 << 2, 'BYPASS_EN' : 0b1 << 1, } DICT[0x38] = { 'int_enable' : 0xff << 0, 'WOM_EN' : 0b1 << 6, 'FIFO_OFLOW_EN' : 0b1 << 4, 'FSYNC_INT_EN' : 0b1 << 3, 'RAW_RDY_EN' : 0b1 << 0, } DICT[0x3a] = { 'INT_STATUS' : 0b11111111 << 0, 'WOM_INT' : 0b1 << 6, 'FIFO_OFLOW_INT' : 0b1 << 4, 'FSYNC_INT' : 0b1 << 3, 'RAW_DATA_RDY_INT' : 0b1 << 0, } DICT[0x3b] = { 'ACCEL_XOUT_H' : 0b11111111 << 0,} DICT[0x3c] = { 'ACCEL_XOUT_L' : 0b11111111 << 0,} DICT[0x3d] = { 'ACCEL_YOUT_H' : 0b11111111 << 0,} DICT[0x3e] = { 'ACCEL_YOUT_L' : 0b11111111 << 0,} DICT[0x3f] = { 'ACCEL_ZOUT_H' : 0b11111111 << 0,} DICT[0x40] = { 'ACCEL_ZOUT_L' : 0b11111111 << 0,} DICT[0x41] = { 'TEMP_OUT_H' : 0b11111111 << 0,} DICT[0x42] = { 'TEMP_OUT_L' : 0b11111111 << 0,} DICT[0x43] = { 'GYRO_XOUT_H' : 0b11111111 << 0,} DICT[0x44] = { 'GYRO_XOUT_L' : 0b11111111 << 0,} DICT[0x45] = { 'GYRO_YOUT_H' : 0b11111111 << 0,} DICT[0x46] = { 'GYRO_YOUT_L' : 0b11111111 << 0,} DICT[0x47] = { 'GYRO_ZOUT_H' : 0b11111111 << 0,} DICT[0x48] = { 'GYRO_ZOUT_L' : 0b11111111 << 0,} DICT[0x49] = { 'EXT_SENS_DATA_00' : 0b11111111 << 0,} DICT[0x4a] = { 'EXT_SENS_DATA_01' : 0b11111111 << 0,} DICT[0x4b] = { 'EXT_SENS_DATA_02' : 0b11111111 << 0,} DICT[0x4c] = { 'EXT_SENS_DATA_03' : 0b11111111 << 0,} DICT[0x4d] = { 'EXT_SENS_DATA_04' : 0b11111111 << 0,} DICT[0x4e] = { 'EXT_SENS_DATA_05' : 0b11111111 << 0,} DICT[0x4f] = { 'EXT_SENS_DATA_06' : 0b11111111 << 0,} DICT[0x50] = { 'EXT_SENS_DATA_07' : 0b11111111 << 0,} DICT[0x51] = { 'EXT_SENS_DATA_08' : 0b11111111 << 0,} DICT[0x52] = { 'EXT_SENS_DATA_09' : 0b11111111 << 0,} DICT[0x53] = { 'EXT_SENS_DATA_10' : 0b11111111 << 0,} DICT[0x54] = { 'EXT_SENS_DATA_11' : 0b11111111 << 0,} DICT[0x55] = { 'EXT_SENS_DATA_12' : 0b11111111 << 0,} DICT[0x56] = { 'EXT_SENS_DATA_13' : 0b11111111 << 0,} DICT[0x57] = { 'EXT_SENS_DATA_14' : 0b11111111 << 0,} DICT[0x58] = { 'EXT_SENS_DATA_15' : 0b11111111 << 0,} DICT[0x59] = { 'EXT_SENS_DATA_16' : 0b11111111 << 0,} DICT[0x5a] = { 'EXT_SENS_DATA_17' : 0b11111111 << 0,} DICT[0x5b] = { 'EXT_SENS_DATA_18' : 0b11111111 << 0,} DICT[0x5c] = { 'EXT_SENS_DATA_19' : 0b11111111 << 0,} DICT[0x5d] = { 'EXT_SENS_DATA_20' : 0b11111111 << 0,} DICT[0x5e] = { 'EXT_SENS_DATA_21' : 0b11111111 << 0,} DICT[0x5f] = { 'EXT_SENS_DATA_22' : 0b11111111 << 0,} DICT[0x60] = { 'EXT_SENS_DATA_23' : 0b11111111 << 0,} DICT[0x63] = { 'I2C_SLV0_DO' : 0b11111111 << 0,} DICT[0x64] = { 'I2C_SLV1_DO' : 0b11111111 << 0,} DICT[0x65] = { 'I2C_SLV2_DO' : 0b11111111 << 0,} DICT[0x66] = { 'I2C_SLV3_DO' : 0b11111111 << 0,} DICT[0x67] = { 'DELAY_ES_SHADOW' : 0b1 << 7, 'I2C_SLV4_DLY_EN' : 0b1 << 4, 'I2C_SLV3_DLY_EN' : 0b1 << 3, 'I2C_SLV2_DLY_EN' : 0b1 << 2, 'I2C_SLV1_DLY_EN' : 0b1 << 1, 'I2C_SLV0_DLY_EN' : 0b1 << 0,} DICT[0x68] = { 'GYRO_RST' : 0b1 << 2, 'ACCEL_RST' : 0b1 << 1, 'TEMP_RST' : 0b1 << 0, } DICT[0x69] = { 'ACCEL_INTEL_EN' : 0b1 << 7, 'ACCEL_INTEL_MODE' : 0b1 << 6, } DICT[0x6a] = { 'user_ctrl' : 0b11111111 << 0, 'FIFO_EN' : 0b1 << 6, 'I2C_MST_EN' : 0b1 << 5, 'I2C_IF_DIS' : 0b1 << 4, 'FIFO_RST' : 0b1 << 2, 'I2C_MST_RST' : 0b1 << 1, 'SIG_COND_RST' : 0b1 << 0,} DICT[0x6b] = { 'pwr_mgmt_1' : 0b11111111 << 0, 'H_RESET' : 0b1 << 7, 'SLEEP' : 0b1 << 6, 'CYCLE' : 0b1 << 5, 'GYRO_STANDBY' : 0b1 << 4, 'PD_PTAT' : 0b1 << 3, 'CLKSEL' : 0b111 << 0,} DICT[0x6c] = { 'pwr_mgmt_2' : 0b11111111 << 0, 'DIS_XA' : 0b1 << 5, 'DIS_YA' : 0b1 << 4, 'DIS_ZA' : 0b1 << 3, 'DIS_XG' : 0b1 << 2, 'DIS_YG' : 0b1 << 1, 'DIS_ZG' : 0b1 << 0,} DICT[0x72] = { 'FIFO_CNT_H' : 0b11111 << 0,} DICT[0x73] = { 'FIFO_CNT_L' : 0b11111111 << 0,} DICT[0x74] = { 'fifo_r_w' : 0b11111111 << 0, 'D' : 0b11111111 << 0,} DICT[0x75] = { 'WHOAMI' : 0b11111111 << 0,} DICT[0x77] = { 'XA_OFFS_H' : 0b11111111 << 0,} DICT[0x78] = { 'XA_OFFS_L' : 0b1111111 << 1,} DICT[0x7a] = { 'YA_OFFS_H' : 0b11111111 << 0,} DICT[0x7b] = { 'YA_OFFS_L' : 0b1111111 << 1,} DICT[0x7d] = { 'ZA_OFFS_H' : 0b11111111 << 0,} DICT[0x7e] = { 'ZA_OFFS_L' : 0b1111111 << 1,} WHOAMI=0x71 _gyro_scale = None _accel_scale = None
[docs] def __init__(self, itf, addr=0x68): self.itf = itf self.addr = addr # This is an effective way of detecting connectivity of I2C bus if self.whoami() is not self.WHOAMI: msg = "MPU9250 at address {} is not ready!".format(addr) logger.error(msg) raise IOError(msg)
[docs] def init(self,gyro=True,accel=True,lowpower=False): """ Initialise MPU9250 """ self.reset() self.setWord('pwr_mgmt_1',0x00) # Wait MEMS oscilator to stablize time.sleep(0.1) # low power mode is problematic at the moment if lowpower==True: # Enter into accelerometer only low power mode rid,mask=self._getMask(self.DICT,'CLKSEL') val = self._set(0x0, 1, mask) rid,mask=self._getMask(self.DICT,'CYCLE') val = self._set(val, 1, mask) rid,mask=self._getMask(self.DICT,'SLEEP') val = self._set(val, 0, mask) # PD_PTAT is, I believe, the legendary 'TEMP_DIS' bit # in MPU-9250 Register Map datasheet rid,mask=self._getMask(self.DICT,'PD_PTAT') val = self._set(val, 1, mask) self.write(rid,val) # Set Accelerometer sample rate at low power mode # Lposc_clksel Output Frequency (Hz) # 0 0.24 # 1 0.49 # 2 0.98 # 3 1.95 # 4 3.91 # 5 7.81 # 6 15.63 # 7 31.25 # 8 62.50 # 9 125 # 10 250 # 11 500 # 12-15 RESERVED rid,mask=self._getMask(self.DICT,'LPOSC_CLKSEL') val = self._set(val, 8, mask) self.write(rid,val) # Enable accel and disable gyro accel=True gyro=False else: self.setWord('pwr_mgmt_1',0x0) # Enable accel and gyro (PWR_MGMT_2) rid,mask=self._getMask(self.DICT,'DIS_XA') val = self._set(0x0,not accel,mask) rid,mask=self._getMask(self.DICT,'DIS_YA') val = self._set(val,not accel,mask) rid,mask=self._getMask(self.DICT,'DIS_ZA') val = self._set(val,not accel,mask) rid,mask=self._getMask(self.DICT,'DIS_XG') val = self._set(val,not gyro,mask) rid,mask=self._getMask(self.DICT,'DIS_YG') val = self._set(val,not gyro,mask) rid,mask=self._getMask(self.DICT,'DIS_ZG') val = self._set(val,not gyro,mask) self.write(rid,val) self.setWord('config',0x0) self.setWord('SMPLRT_DIV',0x0) self.setWord('gyro_config',0X0) self.setWord('accel_config',0X0) # Must set ACCEL_FCHOICE to 0 if in low-power mode (accel_config2) rid,mask=self._getMask(self.DICT,'ACCEL_FCHOICE_B') val = self._set(0x0,lowpower,mask) self.write(rid,val) # Disable interrupt self.setWord('int_enable',0x0) self.gyro_scale=0 self.accel_scale=0
[docs] def powerOff(self): self.setWord('pwr_mgmt_2',0xff)
[docs] def whoami(self): rid, mask = self._getMask(self.DICT, 'WHOAMI') return self.read(rid)
[docs] def read(self,reg,length=1): return self.itf.read(self.addr,reg,length)
[docs] def write(self,reg,data): self.itf.write(self.addr,reg,data)
[docs] def getWord(self,name): rid, mask = self._getMask(self.DICT, name) return self._get(self.read(rid),mask)
[docs] def setWord(self,name,value): rid, mask = self._getMask(self.DICT, name) if mask == 0xff: data = self._set(0x0,value,mask) self.write(rid,data) else: data = self.read(rid) data = self._set(data,value,mask) self.write(rid,data)
def _set(self, d1, d2, mask=None): # Update some bits of d1 with d2, while keep other bits unchanged if mask: d1 = d1 & ~mask d2 = d2 * (mask & -mask) return d1 | d2 def _get(self, data, mask): data = data & mask return data / (mask & -mask) def _getMask(self, dicts, name): for rid in dicts: if name in dicts[rid]: return rid, dicts[rid][name] return None,None
[docs] def reset(self): self.setWord('H_RESET',0x1) time.sleep(0.1)
[docs] def setAccelSamplingRate(self,conf): """ Accelerometer sampling rate parameter conf is in range(9) +-----+----------------+------------+---------+ | num | bandwidth (Hz) | delay (ms) | fs (Hz) | +-----+----------------+------------+---------+ | 0 | 1130 | 0.75 | 4000 | +-----+----------------+------------+---------+ | 1 | 460 | 1.94 | 1000 | +-----+----------------+------------+---------+ | 2 | 184 | 5.8 | 1000 | +-----+----------------+------------+---------+ | 3 | 92 | 7.8 | 1000 | +-----+----------------+------------+---------+ | 4 | 41 | 11.8 | 1000 | +-----+----------------+------------+---------+ | 5 | 20 | 19.8 | 1000 | +-----+----------------+------------+---------+ | 6 | 10 | 35.7 | 1000 | +-----+----------------+------------+---------+ | 7 | 5 | 66.96 | 1000 | +-----+----------------+------------+---------+ | 8 | 460 | 1.94 | 1000 | +-----+----------------+------------+---------+ """ vals=[ [1130, 0.75, 4000], [460, 1.94, 1000], [184, 5.8, 1000], [92, 7.8, 1000], [41, 11.8, 1000], [20, 19.8, 1000], [10, 35.7, 1000], [5, 66.96, 1000], [460, 1.94, 1000],] param=[ [0,0], [1,0], [1,1], [1,2], [1,3], [1,4], [1,5], [1,6], [1,7],] if conf not in range(9): raise ValueError("Invalid parameter") config = param[conf] r = vals[conf] self.setWord('ACCEL_FCHOICE', config[0]) self.setWord('A_DLPFCFG', config[1]) logger.info("MPU9250 Accel sampling bandwidth={} Hz, delay={} ms, fs={} Hz".format(r[0],r[1],r[2]))
[docs] def setGyroSamplingRate(self,conf): """ Gyroscope sampling rate parameter conf is in range(10) +-----+----------------+------------+---------+ | num | bandwidth (Hz) | delay (ms) | fs (Hz) | +-----+----------------+------------+---------+ | 0 | 8800 | 0.064 | 32000 | +-----+----------------+------------+---------+ | 1 | 3600 | 0.11 | 32000 | +-----+----------------+------------+---------+ | 2 | 250 | 0.97 | 8000 | +-----+----------------+------------+---------+ | 3 | 184 | 2.9 | 1000 | +-----+----------------+------------+---------+ | 4 | 92 | 3.9 | 1000 | +-----+----------------+------------+---------+ | 5 | 41 | 5.9 | 1000 | +-----+----------------+------------+---------+ | 6 | 20 | 9.9 | 1000 | +-----+----------------+------------+---------+ | 7 | 10 | 17.85 | 1000 | +-----+----------------+------------+---------+ | 8 | 5 | 33.48 | 1000 | +-----+----------------+------------+---------+ | 9 | 3600 | 0.17 | 8000 | +-----+----------------+------------+---------+ """ vals=[ [8800, 0.064, 32000], [3600, 0.11, 32000], [250, 0.97, 8000 ], [184, 2.9, 1000 ], [92, 3.9, 1000 ], [41, 5.9, 1000 ], [20, 9.9, 1000 ], [10, 17.85, 1000 ], [5, 33.48, 1000 ], [3600, 0.17, 8000 ],] param=[ [0,0], [1,0], [3,0], [3,1], [3,2], [3,3], [3,4], [3,5], [3,6], [3,7],] if conf not in range(10): raise ValueError("Invalid parameter") config = param[conf] r = vals[conf] self.setWord('FCHOICE_B', config[0] ^ 0b11) self.setWord('DLPF_CFG', config[1]) logger.info("MPU9250 Gyro sampling bandwidth={} Hz, delay={} ms, fs={} Hz".format(r[0],r[1],r[2]))
[docs] def setFIFO(self,accel=False,temp=False,gyro=False,slv0=False,slv1=False,slv2=False,slv3=False): rid, mask = self._getMask(self.DICT, 'fifo_en') val = self._set(0x0, gyro, self.DICT[rid]['GYRO_XOUT']) val = self._set(val, gyro, self.DICT[rid]['GYRO_YOUT']) val = self._set(val, gyro, self.DICT[rid]['GYRO_ZOUT']) val = self._set(val, accel, self.DICT[rid]['ACCEL']) val = self._set(val, temp, self.DICT[rid]['TEMP_FIFO_EN']) val = self._set(val, slv0, self.DICT[rid]['SLV0']) val = self._set(val, slv1, self.DICT[rid]['SLV1']) val = self._set(val, slv2, self.DICT[rid]['SLV2']) self.write(rid,val)
#self.setWord('SLV_3_FIFO_EN',slv3)
[docs] def getRegister(self,rid=None): if rid==None: return dict([(regId,self.getRegister(regId)) for regId in self.DICT]) elif rid in self.DICT: rval = self.read(rid) return {name: self._get(rval,mask) for name, mask in self.DICT[rid].items()} else: raise ValueError("Invalid parameter")
@property def accel_scale(self): if self._accel_scale not in range(4): self._accel_scale = self.getWord('ACCEL_FS_SEL') return self._accel_scale @accel_scale.setter def accel_scale(self,scale): if scale not in range(4): raise ValueError("Invalid parameter") self.ACCEL_SCALE = scale self.setWord('ACCEL_FS_SEL', scale) @property def gyro_scale(self): if self._gyro_scale not in range(4): self._gyro_scale = self.getWord('GYRO_FS_SEL') return self._gyro_scale @gyro_scale.setter def accel_scale(self,scale): if scale not in range(4): raise ValueError("Invalid parameter") self.GYRO_SCALE = scale self.setWord('GYRO_FS_SEL', scale) @property def accel(self): rid, mask = self._getMask(self.DICT, 'ACCEL_XOUT_H') data = np.asarray(self.read(rid,6)) data = (data[0::2] << 8) | data[1::2] data = signed(data,16) return self.interpretAccel(data) @property def gyro(self): rid, mask = self._getMask(self.DICT, 'GYRO_XOUT_H') data = np.asarray(self.read(rid,6)) data = (data[0::2] << 8) | data[1::2] data = signed(data,16) return self.interpretGyro(data)
[docs] def interpretAccel(self,data,scale=None): ACCEL_FULL_SCALE = [2,4,8,16] def _setScale(_d,_c): return _c*_d if scale==None: scale = ACCEL_FULL_SCALE[self.accel_scale] coe = (scale*2.0)/(1<<16) if isinstance(data,np.ndarray) or isinstance(data,list): v_setScale = np.vectorize(_setScale) return v_setScale(data,coe) else: return coe*data
[docs] def interpretGyro(self,data,scale=None): GYRO_FULL_SCALE = [250, 500, 1000, 2000] def _setScale(_d,_c): return _c*_d if scale==None: scale = GYRO_FULL_SCALE[self.gyro_scale] coe = (scale*2.0)/(1<<16) if isinstance(data,np.ndarray) or isinstance(data,list): v_setScale = np.vectorize(_setScale) return v_setScale(data,coe) else: return coe*data
[docs] def readFIFOCount(self): """ Read the number of data available in the FIFO """ rid, mask = self._getMask(self.DICT, 'FIFO_CNT_H') data = self.read(rid,2) data = (self._get(data[0],mask) << 8) | data[1] return data
[docs] def readFIFO(self,types,length=None,filename=None,raw=False,wait=0.001): """ read FIFO and sort data into categories according to given types .. code-block:: python readFIFO({'accel':True,'gyro':True}) readFIFO({'accel':True,'gyro':True},length=1200) readFIFO({'accel':True,'slv0':8},filename='/tmp/data.txt') """ bitwidth=16 n = self.sortFIFOData(types) if filename == None: data = self._readFIFO2MEM(length*n,wait=wait) data = self.sortFIFOData(types,data) if 'gyro' in data: data['gyro'] = signed(data['gyro'],bitwidth) if 'accel' in data: data['accel'] = signed(data['accel'],bitwidth) if 'gyro' in data and not raw: data['gyro'] = self.interpretGyro(data['gyro']) if 'accel' in data and not raw: data['accel'] = self.interpretAccel(data['accel']) return data else: interm = '/tmp/'+str(random.randint(0,sys.maxint)) with open(interm,'r+') as fi, open(filename,'w') as fo: self._readFIFO2File(fi,length*n,wait) fi.seek(0)
[docs] def sortFIFOData(self,types,data=None): """ Sort a serial of data into categories according to given types .. code-block:: python sortFIFOData({'accel':True,'gyro':True},[0,0,16383,0,0,0]) """ TYPES = ['accel','temp','gyro','slv0','slv1','slv2','slv3'] if not isinstance(types,dict): raise ValueError("Invalid parameter") if any(key not in TYPES for key in types): raise ValueError("Invalid parameter") if any(not isinstance(value,int) for value in types.values()): raise ValueError("Invalid parameter") if any(value >= 16 or value < 1 for value in types.values()): raise ValueError("Invalid parameter") # each n = 0 if 'accel' in types: n += 6 # 3 directions, 2 bytes per direction if 'temp' in types: n += 2 # 2 bytes if 'gyro' in types: n += 6 # 6 bytes if 'slv0' in types: n += types['slv0'] if 'slv1' in types: n += types['slv1'] if 'slv2' in types: n += types['slv2'] if 'slv3' in types: n += types['slv3'] if data==None: return n else: data = np.asarray(data).reshape(-1,n) # Sort out sensor readings results = dict() if 'accel' in types: results['accel'] = (data[:,0:6:2] << 8) | data[:,1:6:2] data = data[:,6:] if 'temp' in types: results['temp'] = (data[:,0] << 8) | data[:,1] data = data[:,2:] if 'gyro' in types: results['gyro'] = (data[:,0:6:2] << 8) | data[:,1:6:2] data = data[:,6:] if 'slv0' in types: results['slv0'] = data[:,0:types['slv0']] data = data[:,types['slv0']:] if 'slv1' in types: results['slv1'] = data[:,0:types['slv1']] data = data[:,types['slv1']:] if 'slv2' in types: results['slv2'] = data[:,0:types['slv2']] data = data[:,types['slv2']:] if 'slv3' in types: results['slv3'] = data[:,0:types['slv3']] data = data[:,types['slv3']:] return results
def _readFIFO2File(self,handle,length=None,wait=0.001,verbose=True): """ Redirect FIFO into a file """ rid, mask = self._getMask(self.DICT, 'fifo_r_w') if length == None: cnts = self.readFIFOCount() if cnts > 0: data = self.read(rid, cnts) handle.write('\n'.join([str(num) for num in data])) else: total = length percent=range(0,101,10) while length > 0: if verbose and (total-length)*100./total > percent[0]: print str(percent[0])+'%...', sys.stdout.flush() percent = percent[1:] cnts = self.readFIFOCount() if length<=cnts: data = self.read(rid, length) handle.write('\n'.join([str(num) for num in data])) length -= length elif cnts>0: data = self.read(rid, cnts) handle.write('\n'.join([str(num) for num in data])) length -= cnts time.sleep(wait) else: time.sleep(wait) if verbose: print(str(percent[0])+'%') def _readFIFO2MEM(self,length=None,wait=0.001,verbose=True): """ read FIFO into memory """ rid, mask = self._getMask(self.DICT, 'fifo_r_w') data = [] if length == None: cnts = self.readFIFOCount() if cnts > 0: data = self.read(rid, cnts) else: total = length percent=range(10,101,10) while length > 0: if verbose and (total-length)*1./total > percent[0]: print(str(percent[0])+'%...',) sys.stdout.flush() percent = percent[1:] cnts = self.readFIFOCount() if length<=cnts: data += self.read(rid, length) length -= length return data elif cnts > 0: data += self.read(rid, cnts) length -= cnts time.sleep(wait) else: time.sleep(wait) if verbose: print(str(percent[0])+'%') return data #def setAccelBias(self,bias): @property def accel_offs(self): data = np.zeros(6) rid,mask=self._getMask(self.DICT,'XA_OFFS_H') data[0]=self._get(self.read(rid),mask) rid,mask=self._getMask(self.DICT,'XA_OFFS_L') data[1]=self._get(self.read(rid),mask) rid,mask=self._getMask(self.DICT,'YA_OFFS_H') data[2]=self._get(self.read(rid),mask) rid,mask=self._getMask(self.DICT,'YA_OFFS_L') data[3]=self._get(self.read(rid),mask) rid,mask=self._getMask(self.DICT,'ZA_OFFS_H') data[4]=self._get(self.read(rid),mask) rid,mask=self._getMask(self.DICT,'ZA_OFFS_L') data[5]=self._get(self.read(rid),mask) data = (data[0::2] << 7) | data[1::2] data = signed(data,15) data = [struct.unpack('>i',struct.pack('>I',num))[0] for num in data] return data @property def gyro_offs(self): rid,mask=self._getMask(self.DICT,'X_OFFS_USR_H') data = np.asarray(self.read(rid,6)) data = (data[0::2] << 8) | data[1::2] data = [struct.unpack('>h',struct.pack('>H',num))[0] for num in data] return data @gyro_offs.setter def gyro_offs(self,offs): if not isinstance(offs,list) and not isinstance(offs,np.ndarray): raise ValueError("Invalid parameter") if len(offs) != 3: raise ValueError("Invalid parameter") FS_SEL=self.getWord('GYRO_FS_SEL') offs = [int(off * 2.**FS_SEL / -4) for off in offs] offs = [struct.unpack('>H',struct.pack('>h',num))[0] for num in offs] goffs = np.zeros(6,dtype=np.int8) goffs[0::2] = [off >> 8 for off in offs] goffs[1::2] = [off & 0xff for off in offs] rid,mask=self._getMask(self.DICT,'X_OFFS_USR_H') for i in range(len(goffs)): self.write(rid+i,goffs[i]) # def selftest(self, gyroscale=250,accelscale=2): # # self.setWord('SMPLRT_DIV', 0) # self.setWord('config', 2) # self.setWord('gyro_config', 0) # self.setWord('accel_config2', 2) # self.setWord('accel_config', 0) # # self.gyro_scale=gyroscale # GFS = self.GYRO_FULL_SCALE.index(gyroscale) # self.setAccelScale(accelscale) # AFS = self.ACCEL_FULL_SCALE.index(accelscale) # # acc = np.zeros(3) # gyr = np.zeros(3) # for i in range(200): # acc += self.readAccel(True) # gyr += self.readGyro(True) # acc /= 200. # gyr /= 200. # # self.setWord('XGYRO_CTEN', 0x1) # self.setWord('YGYRO_CTEN', 0x1) # self.setWord('ZGYRO_CTEN', 0x1) # self.setWord('AX_ST_EN', 0x1) # self.setWord('AY_ST_EN', 0x1) # self.setWord('AZ_ST_EN', 0x1) # # time.sleep(1) # # accST = np.zeros(3) # gyrST = np.zeros(3) # for i in range(200): # accST += self.readAccel(True) # gyrST += self.readGyro(True) # accST /= 200. # gyrST /= 200. # # self.setWord('XGYRO_CTEN', 0x0) # self.setWord('YGYRO_CTEN', 0x0) # self.setWord('ZGYRO_CTEN', 0x0) # self.setWord('AX_ST_EN', 0x0) # self.setWord('AY_ST_EN', 0x0) # self.setWord('AZ_ST_EN', 0x0) # # rid,mask=self._getMask(self.DICT,'XG_ST_DATA') # stFactory = np.asarray(self.read(rid,6)) # factoryTrim[0:3] = ((2620/1)<<GFS)*np.power(1.01,stFactory[0:3]-1.0) # factoryTrim[3:6] = ((2620/1)<<AFS)*np.power(1.01,stFactory[3:6]-1.0) # # gyrerr = ((gyrST - gyr)/factoryTrim[0:3])*100.0-100.0 # accerr = ((accST - acc)/factoryTrim[3:6])*100.0-100.0 # # return {'GYRO':gyrerr,'ACCEL':accerr}
[docs] def calibrate(self,WRITE_OFFSET_REG=True): """ Not working """ self.init(gyro=True,accel=True) # Actual sampling rate = 1000 / (1+9) = 100 Hz self.setGyroSamplingRate(5) self.setAccelSamplingRate(4) self.setWord('SMPLRT_DIV',0x9) self.gyro_scale=0 self.accel_scale=0 self.setWord('FIFO_RST',0x1) self.setWord('FIFO_EN',0x1) logger.info('Calibrate Gyro bias') if WRITE_OFFSET_REG: rid,mask=self._getMask(self.DICT,'X_OFFS_USR_H') self.write(rid,[0]*6) self.setFIFO(gyro=True) data = self.readFIFO({'gyro':True},raw=True,length=5*100) self.setFIFO() if WRITE_OFFSET_REG: self.setGyroOffs(np.average(data['gyro'],axis=0)) logger.info('Calibrate Accel bias and triad mismatches') length=100*5 fn = '/tmp/imu_' + time.strftime('%Y%m%d%H%M%S', time.gmtime()) fn = fn + '_ga_' + str(length) + '.txt' self.setFIFO(gyro=True,accel=True) data = self.readFIFO({'gyro':True,'accel':True}, filename=fn,length=length) self.setFIFO()
[docs]class AK8963: DICT = dict() DICT[0x00] = { 'wia':0xff<<0,} DICT[0x01] = { 'info':0xff<<0,} DICT[0x02] = { 'st1':0xff<<0, 'DOR' : 0b1 << 1, 'DRDY' : 0b1 << 0, } DICT[0x03] = { 'hxl':0xff<<0,} DICT[0x04] = { 'hxh':0xff<<0,} DICT[0x05] = { 'hyl':0xff<<0,} DICT[0x06] = { 'hyh':0xff<<0,} DICT[0x07] = { 'hzl':0xff<<0,} DICT[0x08] = { 'hzh':0xff<<0,} DICT[0x09] = { 'st2':0xff<<0, 'BITM' : 0b1 << 4, 'HOFL' : 0b1 << 3, } DICT[0x0a] = { 'cntl1':0xff<<0, 'BIT' : 0b1 << 4, 'MODE' : 0b1111 << 0, } DICT[0x0b] = { 'cntl2':0xff<<0, 'SRST' : 0b1 << 0,} DICT[0x0c] = { 'astc':0xff<<0, 'SELF' : 0b1 << 6,} DICT[0x0d] = { 'ts1':0xff<<0,} DICT[0x0e] = { 'ts2':0xff<<0,} DICT[0x0f] = { 'i2cdis':0xff<<0,} DICT[0x10] = { 'asax':0xff<<0,} DICT[0x11] = { 'asay':0xff<<0,} DICT[0x12] = { 'asaz':0xff<<0,} WHOAMI=0x48 adj=np.asarray([0,0,0])
[docs] def __init__(self, itf, addr=0x0c): """ AK8963 magnetometer If integrated with mpu9250, make an instance of mpu9250 and enable aux i2c before using AK8963 """ self.itf = itf self.addr = addr if self.whoami() is not self.WHOAMI: logger.error("AK893 at address {} is not ready!".format(addr))
[docs] def init(self): """ Initialise AKB8963 """ self.reset() self.adj = self.getAdjustment() if self.selftest(): raise Exception("AK8963 selftest failed!") time.sleep(0.01) self.setWord('MODE',0b0000) time.sleep(0.01) time.sleep(0.01) self.setWord('BIT',0b1) time.sleep(0.01)
[docs] def getAdjustment(self): self.setWord('MODE',0b1111) x=(self.getWord('asax') - 128.0)/256.0 + 1.0 y=(self.getWord('asay') - 128.0)/256.0 + 1.0 z=(self.getWord('asaz') - 128.0)/256.0 + 1.0 self.setWord('MODE',0b0000) time.sleep(0.01) return np.asarray([x,y,z])
[docs] def selftest(self): bitm = self.getWord('BITM') if bitm: bitwidth = 16 else: bitwidth = 14 self.setWord('MODE',0b0000) self.setWord('SELF',0b1) self.setWord('MODE',0b1000) while self.getWord('DRDY') != 1: pass rid, mask = self._getMask(self.DICT,'hxl') data = np.asarray(self.read(rid,6)) data = (data[1::2] << 8) | data[0::2] self.setWord('SELF',0b0) self.setWord('MODE',0b0000) data = signed(data,bitwidth) data = np.multiply(data,self.adj) err = 0 if bitwidth == 14: if data[0] < -50 or data[0] > 50: logger.error("X direction selftest out of range!") err += 1 if data[1] < -50 or data[1] > 50: logger.error("X direction selftest out of range!") err += 1 if data[2] < -800 or data[2] > 200: logger.error("X direction selftest out of range!") err += 1 else: if data[0] < -200 or data[0] > 200: logger.error("X direction selftest out of range!") err += 1 if data[1] < -200 or data[1] > 200: logger.error("X direction selftest out of range!") err += 1 if data[2] < -3200 or data[2] > 800: logger.error("X direction selftest out of range!") err += 1 return err
[docs] def whoami(self): rid, mask = self._getMask(self.DICT, 'wia') return self.read(rid)
[docs] def reset(self): self.setWord('cntl2',0x1) time.sleep(0.1)
[docs] def read(self,reg,length=1): return self.itf.read(self.addr,reg,length)
[docs] def write(self,reg,data): self.itf.write(self.addr,reg,data)
[docs] def getWord(self,name): rid, mask = self._getMask(self.DICT, name) return self._get(self.read(rid),mask)
[docs] def setWord(self,name,value): rid, mask = self._getMask(self.DICT, name) if mask == 0xff: data = self._set(0x0,value,mask) self.write(rid,data) else: data = self.read(rid) data = self._set(data,value,mask) self.write(rid,data)
def _set(self, d1, d2, mask=None): # Update some bits of d1 with d2, while keep other bits unchanged if mask: d1 = d1 & ~mask d2 = d2 * (mask & -mask) return d1 | d2 def _get(self, data, mask): data = data & mask return data / (mask & -mask) def _getMask(self, dicts, name): for rid in dicts: if name in dicts[rid]: return rid, dicts[rid][name] return None,None @property def mag(self): bitm = self.getWord('BITM') if bitm: bitwidth = 16 else: bitwidth = 14 self.setWord('MODE',0b0001) while not self.getWord('DRDY'): time.sleep(0.01) rid, mask = self._getMask(self.DICT,'hxl') data = np.asarray(self.read(rid,6)) data = (data[1::2] << 8) | data[0::2] if self.getWord('HOFL'): logger.warning('AK8963 overflown!') data = signed(data,bitwidth) if bitwidth==14: return data * 0.6 else: return data * 0.15
[docs]def signed(data,bitwidth): """ Convert an unsigned data into a 32-bit signed data .. code-block:: python signed(0xfff,12) # it's -1 signed(0xff,12) # it's 255 """ if isinstance(data,int): data &= (0b1<<bitwidth)-1 if data & (0b1<<(bitwidth-1)) != 0: data ^= (0xffffffff^(0b1<<bitwidth)-1) data = struct.unpack('>i',struct.pack('>I',data)) return data[0] elif isinstance(data,list) or isinstance(data,np.ndarray): vsigned = np.vectorize(signed) data = vsigned(data,bitwidth) return data else: raise ValueError("Invalid parameter")