Example on a complex non-linear case study on how to use Unscented Kalman filter¶

This example shows how to use an Unscented Kalman filter in Python.

This example is a complex non-linear case study.

The state vector and measurements are explained in the C++ example.


""" @example ukf-nonlinear-complex-example.py
 Example of a complex non-linear use-case of the Unscented Kalman Filter (UKF).
  The system we are interested in is a 4-wheel robot, moving at a low velocity.
  As such, it can be modeled unp.sing a bicycle model.

  The state vector of the UKF is:
  \f[
  \begin{array}{lcl}
        \textbf{x}[0] &=& x \\
        \textbf{x}[1] &=& y \\
        \textbf{x}[2] &=& \theta
   \end{array}
   \f]
   where \f$ \theta \f$ is the heading of the robot.

   The measurement \f$ \textbf{z} \f$ corresponds to the distance and relative orientation of the
   robot with different landmarks. Be \f$ p_x^i \f$ and \f$ p_y^i \f$ the position of the \f$ i^{th} \f$ landmark
   along the x and y axis, the measurement vector can be written as:
   \f[
   \begin{array}{lcl}
        \textbf{z}[2i] &=& \sqrt{(p_x^i - x)^2 + (p_y^i - y)^2} \\
        \textbf{z}[2i+1] &=& \tan^{-1}{\frac{p_y^i - y}{p_x^i - x}} - \theta
   \end{array}
   \f]

  Some noise is added to the measurement vector to simulate measurements which are
  not perfect.

   The mean of several angles must be computed in the Unscented Transform. The definition we chose to use
   is the following:

   \f$ mean(\boldsymbol{\theta}) = atan2 (\frac{\sum_{i=1}^n sin{\theta_i}}{n}, \frac{\sum_{i=1}^n cos{\theta_i}}{n})  \f$

   As the Unscented Transform uses a weighted mean, the actual implementation of the weighted mean
   of several angles is the following:

   \f$ mean_{weighted}(\boldsymbol{\theta}) = atan2 (\sum_{i=1}^n w_m^i sin{\theta_i}, \sum_{i=1}^n w_m^i cos{\theta_i})  \f$

   where \f$ w_m^i \f$ is the weight associated to the \f$ i^{th} \f$ measurements for the weighted mean.

   Additionnally, the addition and subtraction of angles must be carefully done, as the result
   must stay in the interval \f$[- \pi ; \pi ]\f$ or \f$[0 ; 2 \pi ]\f$ . We decided to use
   the interval \f$[- \pi ; \pi ]\f$ .
"""

from visp.core import ColVector, Matrix, UnscentedKalman, UKSigmaDrawerMerwe, Math
import numpy as np
from typing import List
from math import sqrt

# For the Graphical User Interface
try:
  from visp.gui import Plot
  has_gui = True
except:
  has_gui = False
import numpy as np

def normalize_angle(angle: float) -> float:
  angle_0_to_2pi = Math.modulo(angle, 2. * np.pi)
  angle_MinPi_Pi = angle_0_to_2pi
  if angle_MinPi_Pi > np.pi:
    # Substract 2 PI to be in interval [-Pi; Pi]
    angle_MinPi_Pi = angle_MinPi_Pi - 2. * np.pi
  return angle_MinPi_Pi

def measurement_mean(measurements: List[ColVector], wm: List[float]) -> ColVector:
  """
  Compute the weighted mean of measurement vectors.

  :param measurements: The measurement vectors, such as v[0] = dist_0 ; v[1] = bearing_0;
    v[2] = dist_1 ; v[3] = bearing_1 ...
  :param wm: The associated weights.

  :return ColVector: The weighted mean.
  """
  nb_points = len(measurements)
  size_measurement = measurements[0].size()
  nb_landmarks = size_measurement // 2
  mean = np.zeros(size_measurement)
  sum_cos = np.zeros(nb_landmarks)
  sum_sin = np.zeros(nb_landmarks)

  for i in range(nb_points):
    for j in range(nb_landmarks):
      mean[2*j] += wm[i] * measurements[i][2*j]
      sum_cos[j] += np.cos(measurements[i][(2*j)+1]) * wm[i]
      sum_sin[j] += np.sin(measurements[i][(2*j)+1]) * wm[i]

  orientations = np.arctan2(sum_sin, sum_cos)
  mean[1::2] = orientations
  return ColVector(mean)

def measurement_residual(meas: ColVector, to_subtract: ColVector) -> ColVector:
  """
  Compute the subtraction between two vectors expressed in the measurement space,
  such as v[0] = dist_0 ; v[1] = bearing_0; v[2] = dist_1 ; v[3] = bearing_1 ...

  :param meas: Measurement to which we must subtract something.
  :param toSubtract: The something we must subtract.

  :return ColVector: \b meas - \b toSubtract .
  """
  res = meas.numpy() - to_subtract.numpy()
  res[1::2] = [normalize_angle(angle) for angle in res[1::2]]
  return ColVector(res)

def state_add_vectors(a: ColVector, b: ColVector) -> ColVector:
  """
  Compute the addition between two vectors expressed in the state space,
  such as v[0] = x ; v[1] = y; v[2] = heading .

  :param a: The first state vector to which another state vector must be added.
  :param b: The other state vector that must be added to a.

  :return ColVector: The sum a + b.
  """
  add = a + b
  return ColVector([add[0], add[1], normalize_angle(add[2])] )


def state_mean_vectors(states: List[ColVector], wm: List[float]) -> ColVector:
  """
  Compute the weighted mean of state vectors.

  :param states: The state vectors.
  :param wm: The associated weights.
  :return ColVector: The weighted mean.
 """
  mean = np.zeros(3)
  wm_np = np.array(wm)
  weighted_x = wm_np * np.array([state[0] for state in states])
  weighted_y = wm_np * np.array([state[1] for state in states])
  mean[0] = np.array(weighted_x).sum()
  mean[1] = np.array(weighted_y).sum()
  sumCos = (wm_np * np.array([np.cos(state[2]) for state in states])).sum()
  sumSin = (wm_np * np.array([np.sin(state[2]) for state in states])).sum()
  mean[2] = np.arctan2(sumSin, sumCos)
  return ColVector(mean)

def state_residual_vectors(a, b) -> ColVector:
  """
  Compute the subtraction between two vectors expressed in the state space,
  such as v[0] = x ; v[1] = y; v[2] = heading .

  :param a: The first state vector to which another state vector must be subtracted.
  :param b: The other state vector that must be subtracted to a.

  :return ColVector: The subtraction a - b.
  """
  res = a - b
  return ColVector([res[0], res[1], normalize_angle(res[2])])

def fx(x: ColVector, dt: float) -> ColVector:
  """
  As the state model {x, y, \f$ \theta \f$} does not contain any velocity
  information, it does not evolve without commands.

  :param x: The internal state of the UKF.
  :param dt: The sampling time: how far in the future are we projecting x.

  :return ColVector: The updated internal state, projected in time, also known as the prior.
  """
  return x

def generate_turn_commands(v: float, angleStart: float, angleStop: float, nbSteps: int) -> List[ColVector]:
  """
  Compute the commands realising a turn at constant linear velocity.

  :param v: Constant linear velocity.
  :param angleStart: Starting angle (in degrees).
  :param angleStop: Stop angle (in degrees).
  :param nbSteps: Number of steps to perform the turn.
  :return List[ColVector]: The corresponding list of commands.
  """
  cmds = []
  dTheta = Math.rad(angleStop - angleStart) / float(nbSteps - 1)
  for i in range(nbSteps):
    theta = Math.rad(angleStart) + dTheta * float(i)
    cmd = ColVector([v, theta])
    cmds.append(cmd)
  return cmds

def generate_commands() -> List[ColVector]:
  """
  Generate the list of commands for the simulation.

  :return List[ColVector]: The list of commands to use in the simulation
  """
  cmds = []
  # Starting by a straight line acceleration
  nbSteps = 30
  dv = (1.1 - 0.001) / float(nbSteps - 1)
  for i in range(nbSteps):
    cmd = ColVector([0.001 + float(i) * dv, 0.])
    cmds.append(cmd)

  # Left turn
  lastLinearVelocity = cmds[len(cmds) -1][0]
  leftTurnCmds = generate_turn_commands(lastLinearVelocity, 0, 2, 15)
  cmds.extend(leftTurnCmds)
  for i in range(100):
    cmds.append(cmds[len(cmds) -1])

  # Right turn
  lastLinearVelocity = cmds[len(cmds) -1][0]
  rightTurnCmds = generate_turn_commands(lastLinearVelocity, 2, -2, 15);
  cmds.extend(rightTurnCmds)
  for i in range(200):
    cmds.append(cmds[len(cmds) -1])

  # Left  turn again
  lastLinearVelocity = cmds[len(cmds) -1][0]
  leftTurnCmds = generate_turn_commands(lastLinearVelocity, -2, 0, 15)
  cmds.extend(leftTurnCmds)
  for i in range(150):
    cmds.append(cmds[len(cmds) -1])

  lastLinearVelocity = cmds[len(cmds) -1][0]
  leftTurnCmds = generate_turn_commands(lastLinearVelocity, 0, 1, 25)
  cmds.extend(leftTurnCmds)
  for i in range(150):
    cmds.append(cmds[len(cmds) -1])

  return cmds

class vpBicycleModel:
  """
  Class that approximates a 4-wheel robot unp.sing a bicycle model.
  """
  def __init__(self, w: float):
    """
    Construct a new vpBicycleModel object.

    :param w:The length of the wheelbase.
    """
    self._w = w # The length of the wheelbase.

  def compute_motion(self, u: ColVector, x: ColVector, dt: float) -> ColVector:
    """
    Models the effect of the command on the state model.

    :param u: The commands. u[0] = velocity ; u[1] = steeringAngle .
    :param x: The state model. x[0] = x ; x[1] = y ; x[2] = heading
    :param dt: The period.
    :return ColVector: The state model after applying the command.
    """
    heading = x[2]
    vel = u[0]
    steeringAngle = u[1]
    distance = vel * dt

    if (abs(steeringAngle) > 0.001):
      # The robot is turning
      beta = (distance / self._w) * np.tan(steeringAngle)
      radius = self._w / np.tan(steeringAngle)
      sinh = np.sin(heading)
      sinhb = np.sin(heading + beta)
      cosh = np.cos(heading)
      coshb = np.cos(heading + beta)
      motion = ColVector([
        -radius * sinh + radius * sinhb,
        radius * cosh - radius * coshb,
        beta
      ])
      return motion
    else:
      # The robot is moving in straight line
      motion = ColVector([
        distance * np.cos(heading),
        distance * np.sin(heading),
        0.
      ])
      return motion

  def move(self, u: ColVector, x: ColVector, dt: float) -> ColVector:
    """
    Models the effect of the command on the state model.

    :param u: The commands. u[0] = velocity ; u[1] = steeringAngle .
    :param x: The state model. x[0] = x ; x[1] = y ; x[2] = heading
    :param dt: The period.
    :return ColVector: The state model after applying the command.
    """
    motion = self.compute_motion(u, x, dt)
    newX = x + motion
    normalizedAngle = normalize_angle(newX[2])
    return ColVector([newX[0], newX[1], normalizedAngle])

class LandmarkMeasurements:
  """
  Class that permits to convert the position + heading of the 4-wheel
  robot into measurements from a landmark.
  """
  def __init__(self, x: float, y: float,  range_std: float, rel_angle_std: float):
    """
    Construct a new LandmarkMeasurements object.

    :param x: The position along the x-axis of the landmark.
    :param y: The position along the y-axis of the landmark.
    :param range_std: The standard deviation of the range measurements.
    :param rel_angle_std: The standard deviation of the relative angle measurements.
    """
    self._x = x # The position along the x-axis of the landmark
    self._y = y # The position along the y-axis of the landmark
    self._range_std = range_std # The standard deviation of the range measurement
    self._rel_angle_std = rel_angle_std # The standard deviation of the relative angle measurement
    np.random.seed(4224)

  def state_to_measurement(self, chi: ColVector) -> ColVector:
    """
    Convert the prior of the UKF into the measurement space.

    :param chi: The prior.
    :return ColVector: The prior expressed in the measurement space.
    """
    dx = self._x - chi[0]
    dy = self._y - chi[1]
    meas = ColVector([sqrt(dx * dx + dy * dy), normalize_angle(np.arctan2(dy, dx) - chi[2])])
    return meas

  def measure_gt(self, pos: ColVector) -> ColVector:
    """
    Perfect measurement of the range and relative orientation of the robot
    located at pos.

    :param pos: The actual position of the robot (pos[0]: x, pos[1]: y, pos[2] = heading.
    :return ColVector: [0] the range [1] the relative orientation of the robot.
    """
    dx = self._x - pos[0]
    dy = self._y - pos[1]
    range = sqrt(dx * dx + dy * dy)
    orientation = normalize_angle(np.arctan2(dy, dx) - pos[2])
    measurements = ColVector([range, orientation])
    return measurements

  def measure_with_noise(self, pos: ColVector) -> ColVector:
    """
    Noisy measurement of the range and relative orientation that
    correspond to pos.

    :param pos: The actual position of the robot (pos[0]: x ; pos[1] = y ; pos[2] = heading).
    :return ColVector: [0] the range [1] the relative orientation.
    """
    measurementsGT = self.measure_gt(pos)
    measurementsNoisy = measurementsGT
    range = measurementsNoisy[0] + np.random.normal(0., self._range_std)
    relAngle = normalize_angle(measurementsNoisy[1] + np.random.normal(0., self._rel_angle_std))
    return ColVector([range, relAngle])


class LandmarksGrid:
  """
  Class that represent a grid of landmarks that measure the distance and
  relative orientation of the 4-wheel robot.
  """
  def __init__(self, landmarks: List[LandmarkMeasurements]):
    """
    Construct a new LandmarksGrid object.

    :param landmarks: The list of landmarks forming the grid.
    """
    self._landmarks = landmarks # The list of landmarks forming the grid.

  def state_to_measurement(self, chi: ColVector) -> ColVector:
    """
    Convert the prior of the UKF into the measurement space.

    :param chi: The prior.
    :return ColVector: The prior expressed in the measurement space.
    """
    nbLandmarks = len(self._landmarks)
    measurements = np.zeros(2*nbLandmarks)
    for i in range (nbLandmarks):
      landmarkMeas = self._landmarks[i].state_to_measurement(chi)
      measurements[2*i] = landmarkMeas[0]
      measurements[(2*i) + 1] = landmarkMeas[1]
    return ColVector(measurements)

  def measure_gt(self, pos: ColVector) -> ColVector:
    """
    Perfect measurement from each landmark of the range and relative orientation of the robot
    located at pos.

    :param pos: The actual position of the robot (pos[0]: x, pos[1]: y, pos[2] = heading.
    :return ColVector: n x ([0] the range [1] the relative orientation of the robot), where
    n is the number of landmarks.
    """
    nbLandmarks = len(self._landmarks)
    measurements = np.zeros(2*nbLandmarks)
    for i in range(nbLandmarks):
      landmarkMeas = self._landmarks[i].measure_gt(pos)
      measurements[2*i] = landmarkMeas[0]
      measurements[(2*i) + 1] = landmarkMeas[1]
    return ColVector(measurements)

  def measure_with_noise(self, pos: ColVector) -> ColVector:
    """
    Noisy measurement from each landmark of the range and relative orientation that
    correspond to pos.

    :param pos: The actual position of the robot (pos[0]: x ; pos[1] = y ; pos[2] = heading).
    :return ColVector: n x ([0] the range [1] the relative orientation of the robot), where
    n is the number of landmarks.
    """
    nbLandmarks = len(self._landmarks)
    measurements = np.zeros(2*nbLandmarks)
    for i in range(nbLandmarks):
      landmarkMeas = self._landmarks[i].measure_with_noise(pos)
      measurements[2*i] = landmarkMeas[0]
      measurements[(2*i) + 1] = landmarkMeas[1]
    return ColVector(measurements)

if __name__ == '__main__':
  dt = 0.1 # Period of 0.1s
  step = 1. # Number of update of the robot position between two UKF filtering
  sigma_range = 0.3 # Standard deviation of the range measurement: 0.3m
  sigma_bearing = Math.rad(0.5) # Standard deviation of the bearing angle: 0.5deg
  wheelbase = 0.5 # Wheelbase of 0.5m
  process_variance = 0.0001
  positions = [ (5, 10) , (10, 5), (15, 15), (20, 5), (0, 30), (50, 30), (40, 10)] # Positions of the landmarks constituting the grid
  landmarks = [LandmarkMeasurements(x, y, sigma_range, sigma_bearing) for x,y in positions] # Vector of landmarks constituting the grid
  nbLandmarks = len(landmarks) # Number of landmarks constituting the grid
  cmds = generate_commands()
  nb_cmds = len(cmds)

  # Creation of the simulated grid of landmarks and robot
  grid = LandmarksGrid(landmarks)
  robot = vpBicycleModel(wheelbase)

  # The object that draws the sigma points used by the UKF
  drawer = UKSigmaDrawerMerwe(n=3, alpha=0.1, beta=2, kappa=0, resFunc=state_residual_vectors, addFunc=state_add_vectors)

  # The matrices require for the construction of the Unscented filter
  P0 = Matrix([[0.1, 0., 0.],
               [0., 0.1, 0.],
               [0., 0., 0.05]]) # The initial estimate of the state covariance matrix
  R1landmark = Matrix([[sigma_range * sigma_range, 0], [0, sigma_bearing*sigma_bearing]]) # The measurement covariance matrix for 1 landmark
  R = Matrix(2*nbLandmarks, 2 * nbLandmarks) # The measurement covariance matrix for the grid of landmarks
  for i in range(nbLandmarks):
    R.insert(R1landmark, 2*i, 2*i)

  Q = Matrix() # The process covariance matrix
  Q.eye(3)
  Q = Q * process_variance
  X0 = ColVector([2., 6., 0.3]) # robot_x, robot_y, robot_orientation(rad)

  # Creation of the Unscented Kalman filter
  ukf = UnscentedKalman(Q, R, drawer, fx, grid.state_to_measurement) # The Unscented Kalman Filter instance

  # Initializing the state vector and state covariance matrix estimates
  ukf.init(X0, P0)
  ukf.setCommandStateFunction(robot.compute_motion)
  ukf.setMeasurementMeanFunction(measurement_mean)
  ukf.setMeasurementResidualFunction(measurement_residual)
  ukf.setStateAddFunction(state_add_vectors)
  ukf.setStateMeanFunction(state_mean_vectors)
  ukf.setStateResidualFunction(state_residual_vectors)

  # Initializing the Graphical User Interface if the needed libraries are available
  if has_gui:
    num_plots = 1
    plot = Plot(num_plots)
    plot.initGraph(0, 2)
    plot.setTitle(0, "Position of the robot")
    plot.setUnitX(0, "Position along x(m)")
    plot.setUnitY(0, "Position along y (m)")
    plot.setLegend(0, 0, "GT")
    plot.setLegend(0, 1, "Filtered")

  robot_pos = X0
  for i in range(nb_cmds):
    robot_pos = robot.move(cmds[i], robot_pos, dt / step)

    if (i % int(step) == 0):
      # Perform the measurement
      z = grid.measure_with_noise(robot_pos)

      # Use the UKF to filter the measurement
      ukf.filter(z, dt, cmds[i])

      if has_gui:
        # Plot the filtered state
        Xest = ukf.getXest()
        plot.plot(0, 1, Xest[0], Xest[1])

    # Update the GUI if available
    if has_gui:
      # Plot the ground truth
      plot.plot(0, 0, robot_pos[0], robot_pos[1])

  print('Finished')
  input('Press enter to quit')