import numpy as np
from filterpy.kalman import EnsembleKalmanFilter
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from scipy.integrate import solve_ivp
import time
[docs]def plot_1_fig(lorenz1,lorenz2,ind,tab_temps,tab_cov,diff,tab_x_y_z,labelx,labely):
"""
Parameters:
lorenz1: observations,an array of vectors where each vector is of size 3, representing [x,y,z]
lorenz2: model,an array of vectors where each vector is of size 3, representing [x,y,z]
ind:a real representing the index we want to plot (x, y or z)
tab_temps:real type array representing time
tab_cov:an array of vectors where each vector is of size 3, representing variances
(diag of the cov matrix associated to the state obtained through data assimilation)
diff:represent the distance to create our standart deviation variance
tab_x_y_z:represente an array of vectors where each vector is of size 3,array after data assimilation,
representing [x,y,z]
labelx:legend of the x-curve
labely:legend of the y-curve
Returns:
return nothing but plot 3 curves as a function of ind(either x,y or z) as a function of time
"""
fig=plt.figure(figsize=(18,5))
ax1 = fig.add_subplot(1,3,1)
ax1.plot(lorenz1[3],lorenz1[ind],label="observation (0.01)")
ax1.plot(lorenz2[3],lorenz2[ind],label="Model (0.1)")
ax1.plot(tab_temps,tab_x_y_z[:,ind],color='blue',label="apres l'assimilation de donnée")
plt.fill_between(tab_temps,tab_x_y_z[:,ind]+(tab_cov[:,ind]+diff),tab_x_y_z[:,ind]-(tab_cov[:,ind]+diff), color='blue',alpha=0.2)
plt.xlabel(labelx)
plt.ylabel(labely)
ax1.legend()
[docs]def plot(lorenz1,lorenz2,tab_temps,tab_x_y_z):
"""
Parameters:
lorenz1: observations,an array of vectors where each vector is of size 3, representing [x,y,z]
lorenz2: model,an array of vectors where each vector is of size 3, representing [x,y,z]
tab_temps:real type array representing time
tab_x_y_z:represente an array of vectors where each vector is of size 3,array after data assimilation,
representing [x,y,z]
Returns:
return nothing but makes three different plot, in each one we will have
3 curves (observations,model and stade after data assimilation) as a function of time
"""
fig=plt.figure(figsize=(18,6))
ax1 = fig.add_subplot(1,3,1)
ax1.plot(lorenz1[3],lorenz1[0],label="observation (0.01)")
ax1.plot(lorenz2[3],lorenz2[0],label="Model (0.1)")
ax1.plot(tab_temps,tab_x_y_z[:,0],label="apres l'assimilation de donnée")
plt.xlabel("t")
plt.ylabel("x")
ax1.legend(loc='upper right')
ax2 = fig.add_subplot(1,3,2)
ax2.plot(lorenz1[3],lorenz1[1],label="observation (0.01)")
ax2.plot(lorenz2[3],lorenz2[1],label="Model (0.1)")
ax2.plot(tab_temps,tab_x_y_z[:,1],label="apres l'assimilation de donnée")
plt.xlabel("t")
plt.ylabel("y")
ax1.legend(loc='upper right')
ax3 = fig.add_subplot(1,3,3)
ax3.plot(lorenz1[3],lorenz1[2],label="observation (0.01)")
ax3.plot(lorenz2[3],lorenz2[2],label="Model (0.1)")
ax3.plot(tab_temps,tab_x_y_z[:,2],label="apres l'assimilation de donnée")
plt.xlabel("t")
plt.ylabel("z")
ax1.legend(loc='upper right')
[docs]def f_orcillateur(t,X_n,w):
"""
Parameters:
X_n:un tableau de reel de taille 2
w: a real that represente the first parameter of the hamonic oscillator
"""
(u,v)=X_n
f_1 = v
f_2 =-w**2*u
return np.array([f_1,f_2])
[docs]def RK4_harmonique(w,X0,N,T):
"""
Parameters:
w: a real that represente the first parameter of the hamonic oscillator
X0:a size 2 array with the initial constition, initial point.
N: a real that represents the number of discritisation
T: a real that represents the time interval
Returns:
this fonction return resolution with RK4
X[:,0]: array of x
X[:,1]: array of y
T_tab: array of the time
"""
dt=T/N
X = np.zeros( (N+1, len(X0)) )
T_tab=np.zeros(N+1)
X[0] = X0
t=0. #=t_0
for n in range(1,N+1):
K1=f_orcillateur(t, X[n-1],w)
K2=f_orcillateur(t+dt/2., X[n-1] + 1./2. * K1 * dt,w)
K3=f_orcillateur(t+dt/2., X[n-1] + 1./2. * K2 * dt,w)
K4=f_orcillateur(t+dt, X[n-1]+ K3 * dt,w)
T_tab[n]=t+dt
X[n]=X[n-1]+ dt/6.* (K1+2.*K2+2.*K3+K4)
t+=dt
return X[:,0],X[:,1],T_tab
[docs]def euler_explicit_harmonique(w,X0,N,T):
"""
Parameters:
w: a real that represente the first parameter of the hamonic oscillator
X0:a size 2 array with the initial constition, initial point.
N: a real that represents the number of discritisation
T: a real that represents the time interval
Returns:
this fonction return resolution with euler
X[:,0]: array of x
X[:,1]: array of y
T_tab: array of the time
"""
dt=T/N
X = np.zeros( (N+1, len(X0)) )
T_tab=np.zeros(N+1)
X[0] = X0
t=0.
for n in range(1,N+1):
X[n]=[X[n-1,0]+(dt)*X[n-1,1],X[n-1,1]-w**2*X[n-1,0]*(dt)]
T_tab[n]=t+dt
t+=dt
return X[:,0],X[:,1],T_tab
[docs]def f(t_n,X_n,σ, b, r):
(x,y,z)=X_n
f_1 = σ*(y-x)
f_2 = x*(r-z)-y
f_3 = x*y-b*z
return np.array([f_1,f_2,f_3])
[docs]def RK4_Lorenz(γ,X0,N,T):
"""
Parameters:
γ: an array of real that represente the three parameter of the lorenz system (σ, b, r)
X0:a size 3 array with the initial constition, initial point (x,y,z)
N: a real that represents the number of discritisation
T: a real that represents the time interval
Returns:
this fonction return resolution with RK4
X[:,0]: array of x
X[:,1]: array of y
X[:,2]: array of z
T_tab: array of the time
"""
(σ,b,r)=γ
dt=T/N
X = np.zeros( (N+1, len(X0)) )
T_tab=np.zeros(N+1)
X[0] = X0
T_tab[0]=0
t=0. #=t_0
for n in range(1,N+1):
K1=f(t, X[n-1],σ,b,r)
K2=f(t+dt/2., X[n-1] + 1./2. * K1 * dt,σ,b,r)
K3=f(t+dt/2., X[n-1] + 1./2. * K2 * dt,σ,b,r)
K4=f(t+dt, X[n-1]+ K3 * dt,σ,b,r)
T_tab[n]=t+dt
X[n]=X[n-1]+ dt/6.* (K1+2.*K2+2.*K3+K4)
t+=dt
return X[:,0],X[:,1],X[:,2],T_tab
[docs]def fx(x,t, dt,γ):
def f(t_n,X_n,γ):
(x,y,z)=X_n
f_1 = γ[0]*(y-x)
f_2 = x*(γ[2]-z)-y
f_3 = x*y-γ[1]*z
return np.array([f_1,f_2,f_3])
K1=f(t, x,γ)
K2=f(t+dt/2., x + 1./2. * K1 * dt,γ)
K3=f(t+dt/2., x + 1./2. * K2 * dt,γ)
K4=f(t+dt, x+ K3 * dt,γ)
X_next=x+ dt/6.* (K1+2.*K2+2.*K3+K4)
return X_next
[docs]def assimilation_donnée(x,read_sensor,P,Q,R,T,dimz,dt,N,nb_echantillon,hx,fx,γ):
"""
Parameters:
x:a size 3 array with the initial constition, initial point (x,y,z)
read_sensor: a function that will read the observation every time
P:covariance matrix associated with the forecast state error
Q:covariance matrix associated with the model error
R:covariance matrix associated with the observations error
T:a real that represents the time interval
dimz: a real that represent the dimention of the observations
dt:a real that represent the time for each discretisation
N: a real that represents the index difference between the observations and the model
nb_echantillon: a real that represents the index difference between the observations and the model
hx:Measurement function. Convert state x into a measurement
fx:State transition function
γ: an array of real that represente the three parameter of the lorenz system (σ, b, r)
N: a real that represents the number of discritisation
Returns:
this function do the data assimilation using the ensemble kalman filter
-tab_etat: an array with the state mean after data assimilation
-tab_temps: an array with the time
-tab_cov: an array with the diagonal of the state covarience matrix pour each time
"""
f = EnsembleKalmanFilter (x=x, P=P, dim_z=dimz, dt=dt, N=nb_echantillon,hx=hx, fx=lambda x,dt:fx(x,t,dt,γ))
f.R = R # matrice de cov associer a la mesure
f.Q =Q #bruit blanc centree en 0
t=0
index=N
tab_etat=[]
tab_temps=[]
tab_cov=[]
tab_etat.append(f.x)
tab_temps.append(t)
tab_cov.append(f.P_post.diagonal())
while (t<T-dt):#
z = read_sensor(index)
f.predict()
f.update(z)
diag_cov=f.P_post.diagonal()
tab_cov.append(diag_cov)
index+=N
t=t+dt
tab_etat.append(f.x)
tab_temps.append(t)
return(np.array(tab_etat),np.array(tab_temps),np.array(tab_cov))