Source code for lorenz.parareal.utils

import os
import numpy as np
import pandas as panda
import matplotlib.pyplot as plt

[docs]def delete_old_files_lorenz(): """Delete old csv files created for the lorenz system. """ if(os.path.isfile("data_parareal/solx.csv")): os.remove("data_parareal/solx.csv") if(os.path.isfile("data_parareal/soly.csv")): os.remove("data_parareal/soly.csv") if(os.path.isfile("data_parareal/solz.csv")): os.remove("data_parareal/solz.csv") if(os.path.isfile("data_parareal/sol_rk4.csv")): os.remove("data_parareal/sol_rk4.csv") if(os.path.isfile("data_parareal/init_pt_x.csv")): os.remove("data_parareal/init_pt_x.csv") if(os.path.isfile("data_parareal/init_pt_y.csv")): os.remove("data_parareal/init_pt_y.csv") if(os.path.isfile("data_parareal/init_pt_z.csv")): os.remove("data_parareal/init_pt_z.csv") print("Old files deleted")
[docs]def delete_olf_files_oscillator(): """Delete old csv files created for the oscillator. """ if(os.path.isfile("data_parareal/solx_oscillateur.csv")): os.remove("data_parareal/solx_oscillateur.csv") if(os.path.isfile("data_parareal/sol_exacte_oscillateur.csv")): os.remove("data_parareal/sol_exacte_oscillateur.csv") if(os.path.isfile("data_parareal/init_pt_oscillateur.csv")): os.remove("data_parareal/init_pt_oscillateur.csv") print("Old files deleted")
[docs]def compute_time(t0,T,dt_G,P): """Compute time used to the system resolution. Args: t0 (float): Starting time. T (float): Finish time. dt_G (float): Coarse time step. P (int): Number of interval (= number of processes). Returns: list: Times used to the system resolution. """ # time between t_j and t_{j+1} dt_P = (T-t0)/P # nb points nb_pts = dt_P//dt_G # t_j exact times_exact = [t0] for j in range(1,P+1): times_exact.append(times_exact[-1] + dt_P) times_exact[-1]=T # t_j approach (to be a multiple of dt_G) times = [t0] for i in range(1,P+1): exact=times_exact[i-1]+dt_P approche=times[i-1]+dt_G*nb_pts if(exact-approche<=abs(exact-(approche+dt_G))): #lower rounding times.append(times[i-1] + dt_G*nb_pts) else: #upper rounding times.append(times[i-1] + dt_G*(nb_pts+1)) times[-1]=T return times
[docs]def sol_converge(x0_k,x0_knext,eps=1e-15): """Check if initial points converge. Args: x0_k (numpy.ndarray): Initial points at the previous time. x0_knext (numpy.ndarray): Initial points at the current time. eps (float, optional): Error tolerance. Defaults to 1e-15. Returns: bool: Boolean to true if the initial points converge. """ return (np.max(np.abs(x0_knext-x0_k))/np.max(np.abs(x0_knext))) < eps
[docs]def csv_files_lorenz(t0,T,dt_F,times,nb_tj,nb_iter,solution,init_pts, reshape_size,fct,fct_res,gamma=None): """Create and write in the csv files for the Lorenz system. Args: t0 (float): Starting time. T (float): Finish time. dt_F (float): Time step. times (list): Times used to the system resolution. nb_tj (numpy.ndarray): Number of value of the solution for each process. nb_iter (int): Number of iteration (until the solution converge). solution (numpy.ndarray): Solution for each iteration. init_pts (numpy.ndarray): Init points for each iteration. reshape_size (int): Dimension of the system. fct (function): Function which represent the ODE. fct_res (function): Function which represent the integrator. gamma (tuple, optional): System parameters. Defaults to None. """ # time between t0 and T for the fine integrator t = np.arange(t0,T+1e-6,dt_F) # init init_pts_x = panda.DataFrame(times[:-1],columns=['t'],dtype=np.float64) init_pts_x.insert(len(init_pts_x.columns),'nb_pts',nb_tj) init_pts_y = panda.DataFrame(times[:-1],columns=['t'],dtype=np.float64) init_pts_y.insert(len(init_pts_y.columns),'nb_pts',nb_tj) init_pts_z = panda.DataFrame(times[:-1],columns=['t'],dtype=np.float64) init_pts_z.insert(len(init_pts_z.columns),'nb_pts',nb_tj) solutions_x = panda.DataFrame(t,columns=['t'],dtype=np.float64) solutions_y = panda.DataFrame(t,columns=['t'],dtype=np.float64) solutions_z = panda.DataFrame(t,columns=['t'],dtype=np.float64) # write for k in range(nb_iter): X0_k = init_pts[k,:] init_pts_x.insert(len(init_pts_x.columns),'k='+str(k),X0_k[:,0]) init_pts_y.insert(len(init_pts_y.columns),'k='+str(k),X0_k[:,1]) init_pts_z.insert(len(init_pts_z.columns),'k='+str(k),X0_k[:,2]) sol_k = solution[k,:] sol_k = sol_k.reshape((-1,reshape_size)) solutions_x.insert(len(solutions_x.columns),'k='+str(k),sol_k[:,0]) solutions_y.insert(len(solutions_y.columns),'k='+str(k),sol_k[:,1]) solutions_z.insert(len(solutions_z.columns),'k='+str(k),sol_k[:,2]) # to convert dataframe to csv files init_pts_x.to_csv('data_parareal/init_pt_x.csv') init_pts_y.to_csv('data_parareal/init_pt_y.csv') init_pts_z.to_csv('data_parareal/init_pt_z.csv') solutions_x.to_csv('data_parareal/solx.csv') solutions_y.to_csv('data_parareal/soly.csv') solutions_z.to_csv('data_parareal/solz.csv') # with RK4 sol = fct_res(init_pts[0,:][0],dt_F,t0,T,fct,gamma) sol_rk4 = panda.DataFrame(t,columns=['t'],dtype=np.float64) sol_rk4.insert(len(sol_rk4.columns),'x',sol[:,0]) sol_rk4.insert(len(sol_rk4.columns),'y',sol[:,1]) sol_rk4.insert(len(sol_rk4.columns),'z',sol[:,2]) sol_rk4.to_csv('data_parareal/sol_rk4.csv') print("Fichiers csv créés")
[docs]def csv_files_oscillator(t0,T,dt_F,times,nb_tj,nb_iter,solution,init_pts, reshape_size,fct,fct_res,gamma=None): """Create and write in the csv files for the Lorenz system. Args: t0 (float): Starting time. T (float): Finish time. dt_F (float): Time step. times (list): Times used to the system resolution. nb_tj (numpy.ndarray): Number of value of the solution for each process. nb_iter (int): Number of iteration (until the solution converge). solution (numpy.ndarray): Solution for each iteration. init_pts (numpy.ndarray): Init points for each iteration. reshape_size (int): Dimension of the system. fct (function): Function which represent the ODE. fct_res (function): Function which represent the integrator. gamma (tuple, optional): System parameters. Defaults to None. """ # time between t0 and T for the fine integrator ## t = np.arange(t0,T+1e-6,dt_F) # init init_pts_x = panda.DataFrame(times[:-1],columns=['t'],dtype=np.float64) init_pts_x.insert(len(init_pts_x.columns),'nb_pts',nb_tj) solutions_x = panda.DataFrame(t,columns=['t'],dtype=np.float64) # write for k in range(nb_iter): X0_k = init_pts[k,:] init_pts_x.insert(len(init_pts_x.columns),'k='+str(k),X0_k[:,0]) sol_k = solution[k,:] sol_k = sol_k.reshape((-1,reshape_size)) solutions_x.insert(len(solutions_x.columns),'k='+str(k),sol_k[:,0]) # to convert dataframe to csv files init_pts_x.to_csv('data_parareal/init_pt_oscillator.csv') solutions_x.to_csv('data_parareal/solx_oscillator.csv') # with exact solution sol = fct_res(t,gamma) sol_exacte = panda.DataFrame(t,columns=['t'],dtype=np.float64) sol_exacte.insert(len(sol_exacte.columns),'x',sol) sol_exacte.to_csv('data_parareal/sol_exacte_oscillator.csv') print("Fichiers csv créés")
[docs]def RK4(X0,dt,t0,T,fct,gamma=None): """Runge Kutta order 4 method. Args: X0 (list): Initial point of the system. dt (float): Time step. t0 (float): Starting time. T (float): Finish time. fct (function): Function which represent the ODE. gamma (tuple, optional): System parameters. Defaults to None. Returns: numpy.ndarray: Solution calculated by the method. """ X=np.array([X0]) t=t0 #=t_0 while((t+dt)<=T or (np.isclose(t+dt,T))): K1=fct(t, X[-1],gamma) K2=fct(t+dt/2., X[-1] + 1./2. * K1 * dt,gamma) K3=fct(t+dt/2., X[-1] + 1./2. * K2 * dt,gamma) K4=fct(t+dt, X[-1]+ K3 * dt,gamma) X=np.append(X,[X[-1]+ dt/6.* (K1+2.*K2+2.*K3+K4)],axis=0) t+=dt return X
[docs]def erreur(solx,solx_exacte): """Compute the error between two solutions. Args: solx (numpy.ndarray): Calculated solution. solx_exacte (numpy.ndarray): Exact solution. Returns: float: The error between the exact and calculated solutions. """ return np.max(np.abs(solx-solx_exacte))
[docs]def E_j_k(j,k,nb_pts,solx,solx_exacte,x0): """Compute the value of the convergence property for j and k. Args: j (int): Processus (between 0 and P-1). k (int): Iteration (between 0 and nb_iter-1) nb_pts (list): Number of value of the solution for each process. solx (numpy.ndarray): Calculated solution for each iteration. solx_exacte (numpy.ndarray): Exact solution. x0 (numpy.ndarray): Initial point for each iteration. Returns: tuple: Two terms of the convergence property. """ suite_nb = [-1] for i in range(len(nb_pts)): suite_nb.append(suite_nb[-1]+nb_pts[i]) suite_nb[0] = 0 suite_nb[-1] += 1 nb1=suite_nb[j] nb2=suite_nb[j+1] sol_k = solx[:,k] sol_k_j = sol_k[nb1:nb2] sol_ex_k = solx_exacte[nb1:nb2] err = erreur(sol_k_j,sol_ex_k) diff = np.abs(x0[j,k]-sol_ex_k[0]) return (diff,err)
[docs]def cvg(t_ex,solx_exacte,t,solx,nb_iter,times,nb_pts,x0): """Calculate for each process and each iteration the value of the convergence property. Plot the value maximum on process for each iteration and the coarse time step power k (the iteration). Args: t_ex (list): Discretisation time between t0 and T for the exact solution. Can be different of the one used for the method (if the time step is different). solx_exacte (numpy.ndarray): Exact solution. t (list): Discretisation time between t0 and T for the calculated solution. solx (numpy.ndarray): Calculated solution for each iteration. nb_iter (int): Number of iteration (until the solution converge). times (list): Times used to the system resolution. nb_pts (list): Number of value of the solution for each process. x0 (numpy.ndarray): Initial point for each iteration. """ nb_proc = len(nb_pts) dt_G=0.01 diff = [] err = [] for k in range(nb_iter): diff_k = [] err_k = [] for j in range(nb_proc): diff_,err_=E_j_k(j,k,nb_pts,solx,solx_exacte,x0) diff_k.append(diff_) err_k.append(err_) diff.append(diff_k) err.append(err_k) diff = np.array(diff) err = np.array(err) delta_t = [] for k in range(nb_iter): delta_t.append(dt_G**k) tab_k=np.arange(0,np.shape(diff)[0],1) plt.semilogy(tab_k,np.max(diff+err,axis=1),label="E_j_k") plt.semilogy(tab_k,delta_t,label="delta_t^k") plt.xticks(tab_k,tab_k) plt.title("Convergence based on iterations") plt.legend() plt.show()