Example on a non-linear case study on how to use Unscented Kalman filterΒΆ
This example shows how to use an Unscented Kalman filter in Python.
This example is a non-linear case study.
The state vector and measurements are explained in the C++ example.
""" @example ukf-nonlinear-example.py
Example of a simple non-linear use-case of the Unscented Kalman Filter (UKF).
The system we are interested in is an aircraft flying in the sky and
observed by a radar station.
We consider the plan perpendicular to the ground and passing by both the radar
station and the aircraft. The x-axis corresponds to the position on the ground
and the y-axis to the altitude.
The state vector of the UKF is:
\f[
\begin{array}{lcl}
\textbf{x}[0] &=& x \\
\textbf{x}[1] &=& \dot{x} \\
\textbf{x}[1] &=& y \\
\textbf{x}[2] &=& \dot{y}
\end{array}
\f]
The measurement \f$ \textbf{z} \f$ corresponds to the distance and angle between the ground and the aircraft
observed by the radar station. Be \f$ p_x \f$ and \f$ p_y \f$ the position of the radar station
along the x and y axis, the measurement vector can be written as:
\f[
\begin{array}{lcl}
\textbf{z}[0] &=& \sqrt{(p_x^i - x)^2 + (p_y^i - y)^2} \\
\textbf{z}[1] &=& \tan^{-1}{\frac{y - p_y}{x - p_x}}
\end{array}
\f]
Some noise is added to the measurement vector to simulate a sensor which is
not perfect.
"""
from visp.core import ColVector, Matrix, UnscentedKalman, UKSigmaDrawerMerwe, Math
import numpy as np
from typing import List
# For the Graphical User Interface
try:
from visp.gui import Plot
has_gui = True
except:
has_gui = False
import numpy as np
def normalize_angle(angle: float) -> float:
angle_0_to_2pi = Math.modulo(angle, 2. * np.pi)
angle_MinPi_Pi = angle_0_to_2pi
if angle_MinPi_Pi > np.pi:
# Substract 2 PI to be in interval [-Pi; Pi]
angle_MinPi_Pi = angle_MinPi_Pi - 2. * np.pi
return angle_MinPi_Pi
def measurement_mean(measurements: List[ColVector], wm: List[float]) -> ColVector:
"""
Compute the weighted mean of measurement vectors.
:param measurements: The measurement vectors, such as measurements[i][0] = range and
measurements[i][1] = elevation_angle.
:param wm: The associated weights.
:return vpColVector: The weighted mean of the measurement vectors.
"""
wm_np = np.array(wm)
ranges = np.array([meas[0] for meas in measurements])
elev_angles = np.array([meas[1] for meas in measurements])
meanRange = (wm_np * ranges).sum()
sum_cos = (wm_np * np.cos(elev_angles)).sum()
sum_sin = (wm_np * np.sin(elev_angles)).sum()
mean_angle = np.arctan2(sum_sin, sum_cos)
return ColVector([meanRange, mean_angle])
def measurementResidual(meas: ColVector, to_subtract: ColVector) -> ColVector:
"""
Compute the subtraction between two vectors expressed in the measurement space,
such as v[0] = range ; v[1] = elevation_angle
:param meas: Measurement to which we must subtract something.
:param toSubtract: The something we must subtract.
:return vpColVector: \b meas - \b toSubtract .
"""
res_temp = meas - to_subtract
return ColVector([res_temp[0], normalize_angle(res_temp[1])])
def state_add_vectors(a, b) -> ColVector:
"""
Method that permits to add two state vectors.
:param a: The first state vector to which another state vector must be added.
:param b: The other state vector that must be added to a.
:return ColVector: The sum a + b.
"""
return a + b
def state_residual_vectors(a, b) -> ColVector:
"""
Method that permits to subtract a state vector to another.
:param a: The first state vector to which another state vector must be subtracted.
:param b: The other state vector that must be subtracted to a.
:return ColVector: The subtraction a - b.
"""
return a - b
def fx(x: ColVector, dt: float) -> ColVector:
"""
Process function that projects in time the internal state of the UKF.
:param x: The internal state of the UKF.
:param dt: The sampling time: how far in the future are we projecting x.
:return ColVector: The updated internal state, projected in time, also known as the prior.
"""
return ColVector([
x[0] + dt * x[1],
x[1],
x[2] + dt * x[3],
x[3]
])
class vpRadarStation:
"""
Class that permits to convert the position of the aircraft into
range and elevation angle measurements.
"""
def __init__(self, x: float, y: float, range_std: float, elev_angle_std: float):
"""
Construct a new vpRadarStation
:param x: The position on the ground of the radar.
:param y: The altitude of the radar.
:param range_std: The standard deviation of the range measurements.
:param elev_angle_std: The standard deviation of the elevation angle measurements.
"""
self._x = x
self._y = y
self._stdevRange = range_std
self._stdevElevAngle = elev_angle_std
def state_to_measurement(self, x: ColVector) -> ColVector:
"""
Measurement function that expresses the internal state of the UKF in the measurement space.
:param x: The internal state of the UKF.
:return ColVector: The internal state, expressed in the measurement space.
"""
dx = x[0] - self._x
dy = x[2] - self._y
range = np.sqrt(dx * dx + dy * dy)
elev_angle = np.arctan2(dy, dx)
return ColVector([range, elev_angle])
def measure_gt(self, pos: ColVector) -> ColVector:
"""
Perfect measurement of the range and elevation angle that
correspond to pos.
:param pos: The actual position of the aircraft (pos[0]: projection of the position
on the ground, pos[1]: altitude).
:return ColVector: [0] the range [1] the elevation angle.
"""
dx = pos[0] - self._x
dy = pos[1] - self._y
range = np.sqrt(dx * dx + dy * dy)
elev_angle = np.arctan2(dy, dx)
return ColVector([range, elev_angle])
def measure_with_noise(self, pos: ColVector) -> ColVector:
"""
Noisy measurement of the range and elevation angle that
correspond to pos.
:param pos: The actual position of the aircraft (pos[0]: projection of the position
on the ground, pos[1]: altitude).
:return vpColVector: [0] the range [1] the elevation angle.
"""
measurements_GT = self.measure_gt(pos)
measurements_noisy = ColVector([measurements_GT[0] + np.random.normal(0., self._stdevRange), measurements_GT[1] + np.random.normal(0., self._stdevElevAngle)])
return measurements_noisy
class vpACSimulator:
"""
Class to simulate a flying aircraft.
"""
def __init__(self, X0: ColVector, vel: ColVector, vel_std: float):
"""
Construct a new vpACSimulator object.
:param X0: Initial position of the aircraft.
:param vel: Velocity of the aircraft.
:param vel_std: Standard deviation of the variation of the velocity.
"""
self._pos = X0 # Position of the simulated aircraft
self._vel = vel # Velocity of the simulated aircraft
np.random.seed(4224)
self._stdevVel = vel_std # Standard deviation of the random generator for slight variations of the velocity of the aircraft
def update(self, dt: float) -> ColVector:
"""
Compute the new position of the aircraft after dt seconds have passed
since the last update.
:param dt: Period since the last update.
:return ColVector: The new position of the aircraft.
"""
dx_temp = self._vel * dt
dx = ColVector([dx_temp[0] + np.random.normal(0., self._stdevVel) * dt, dx_temp[1] + np.random.normal(0., self._stdevVel) * dt])
self._pos += dx
return self._pos
def generate_Q_matrix(dt: float) -> Matrix:
"""
Method that generates the process covariance matrix for a process for which the
state vector can be written as (x, dx/dt)^T
:param dt: The sampling period.
:return Matrix: The corresponding process covariance matrix.
"""
return Matrix(
[[dt**3/3, dt**2/2, 0, 0],
[dt**2/2, dt, 0, 0],
[0, 0, dt**3/3, dt**2/2],
[0, 0, dt**2/2, dt]])
def generate_P0_matrix() -> Matrix:
"""
Method that generates the intial guess of the state covariance matrix.
@return Matrix The corresponding state covariance matrix.
"""
return Matrix(
[[300*300, 0, 0, 0],
[0, 30*30, 0, 0],
[0, 0, 150*150, 0],
[0, 0, 0, 30*30]])
if __name__ == '__main__':
dt = 3. # The sampling period
gt_X_init = -500. # Ground truth initial position along the X-axis, in meters
gt_Y_init = 1000. # Ground truth initial position along the Y-axis, in meters
gt_vX_init = 100. # The velocity along the x-axis
gt_vY_init = 5. # The velocity along the y-axis
proc_var = 0.1 # The variance of the process function
sigma_range = 5 # Standard deviation of the range measurement: 5m
sigma_elev_angle = Math.rad(0.5) # Standard deviation of the elevation angle measurent: 0.5deg
stdev_aircraft_velocity = 0.2; # Standard deviation of the velocity of the simulated aircraft,
# to make it deviate a bit from the constant velocity model
# The object that draws the sigma points used by the UKF
drawer = UKSigmaDrawerMerwe(n=4, alpha=0.3, beta=2, kappa=-1, resFunc=state_residual_vectors, addFunc=state_add_vectors)
# The object that performs radar measurements
radar = vpRadarStation(0., 0., sigma_range, sigma_elev_angle)
P0 = generate_P0_matrix() # The initial estimate of the state covariance matrix
R = Matrix([[sigma_range * sigma_range, 0], [0, sigma_elev_angle * sigma_elev_angle]]) # The measurement covariance matrix
Q = generate_Q_matrix(dt) * proc_var # The process covariance matrix
ukf = UnscentedKalman(Q, R, drawer, fx, radar.state_to_measurement) # The Unscented Kalman Filter instance
# Initializing the state vector and state covariance matrix estimates
ukf.init(ColVector([0.9 * gt_X_init, 0.9 * gt_vX_init, 0.9 * gt_Y_init, 0.9 * gt_vY_init]), P0)
ukf.setMeasurementMeanFunction(measurement_mean)
ukf.setMeasurementResidualFunction(measurementResidual)
# Initializing the Graphical User Interface if the needed libraries are available
if has_gui:
num_plots = 4
plot = Plot(num_plots)
plot_titles = [
'Position along X-axis', 'Velocity along X-axis',
'Position along Y-axis', 'Velocity along Y-axis'
]
plot_y_units = [
'Position (m)', 'Velocity (m/s)',
'Position (m)', 'Velocity (m/s)'
]
plot_legends = ['GT', 'Filtered']
# Initializing the subplots
for plot_index in range(num_plots):
plot.initGraph(plot_index, len(plot_legends))
plot.setTitle(plot_index, plot_titles[plot_index])
plot.setUnitY(plot_index, plot_y_units[plot_index])
plot.setUnitX(plot_index, 'Time (s)')
for legend_index in range(len(plot_legends)):
plot.setLegend(plot_index, legend_index, plot_legends[legend_index])
ac_pos = ColVector([gt_X_init, gt_Y_init]) # Ground truth position
ac_vel = ColVector([gt_vX_init, gt_vY_init]) # Ground truth velocity
ac = vpACSimulator(ac_pos, ac_vel, stdev_aircraft_velocity)
gt_X_prev = ColVector([ac_pos[0], ac_pos[1]]) # Previous ground truth position
for i in range(500):
# Creating noisy measurements
gt_X = ac.update(dt)
gt_V = (gt_X - gt_X_prev) / dt
z = radar.measure_with_noise(gt_X)
# Filtering using the UKF
ukf.filter(z, dt)
# Getting the filtered state vector
Xest = ukf.getXest()
# Update the GUI if available
if has_gui:
plot.plot(0, 0, i, gt_X[0])
plot.plot(0, 1, i, Xest[0])
plot.plot(1, 0, i, gt_V[0])
plot.plot(1, 1, i, Xest[1])
plot.plot(2, 0, i, gt_X[1])
plot.plot(2, 1, i, Xest[2])
plot.plot(3, 0, i, gt_V[1])
plot.plot(3, 1, i, Xest[3])
# Updating last measurement for future computation of the noisy velocity
gt_X_prev = ColVector([gt_X[0], gt_X[1]])
print('Finished')
input('Press enter to quit')