Salome HOME
465168d076944f07e7505b78591e0970f5aaecb5
[modules/adao.git] / src / daComposant / daAlgorithms / Atoms / ecwukf.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2024 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 __doc__ = """
24     Unscented Kalman Filter
25 """
26 __author__ = "Jean-Philippe ARGAUD"
27
28 import numpy, scipy, copy
29 from daCore.NumericObjects import GenerateWeightsAndSigmaPoints
30 from daCore.PlatformInfo import PlatformInfo, vfloat
31 mpr = PlatformInfo().MachinePrecision()
32
33 # ==============================================================================
34 def ecwukf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="UKF"):
35     """
36     Unscented Kalman Filter
37     """
38     if selfA._parameters["EstimationOf"] == "Parameters":
39         selfA._parameters["StoreInternalVariables"] = True
40     #
41     wsp = GenerateWeightsAndSigmaPoints(
42         Nn       = Xb.size,
43         EO       = selfA._parameters["EstimationOf"],
44         VariantM = VariantM,
45         Alpha    = selfA._parameters["Alpha"],
46         Beta     = selfA._parameters["Beta"],
47         Kappa    = selfA._parameters["Kappa"],
48     )
49     Wm, Wc, SC = wsp.get()
50     #
51     # Durée d'observation et tailles
52     if hasattr(Y, "stepnumber"):
53         duration = Y.stepnumber()
54         __p = numpy.cumprod(Y.shape())[-1]
55     else:
56         duration = 2
57         __p = numpy.size(Y)
58     #
59     # Précalcul des inversions de B et R
60     if selfA._parameters["StoreInternalVariables"] \
61             or selfA._toStore("CostFunctionJ") \
62             or selfA._toStore("CostFunctionJb") \
63             or selfA._toStore("CostFunctionJo") \
64             or selfA._toStore("CurrentOptimum") \
65             or selfA._toStore("APosterioriCovariance"):
66         BI = B.getI()
67         RI = R.getI()
68     #
69     __n = Xb.size
70     nbPreviousSteps  = len(selfA.StoredVariables["Analysis"])
71     #
72     if len(selfA.StoredVariables["Analysis"]) == 0 or not selfA._parameters["nextStep"]:
73         Xn = Xb
74         if hasattr(B, "asfullmatrix"):
75             Pn = B.asfullmatrix(__n)
76         else:
77             Pn = B
78         selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
79         selfA.StoredVariables["Analysis"].store( Xb )
80         if selfA._toStore("APosterioriCovariance"):
81             selfA.StoredVariables["APosterioriCovariance"].store( Pn )
82     elif selfA._parameters["nextStep"]:
83         Xn = selfA._getInternalState("Xn")
84         Pn = selfA._getInternalState("Pn")
85     #
86     if selfA._parameters["EstimationOf"] == "Parameters":
87         XaMin            = Xn
88         previousJMinimum = numpy.finfo(float).max
89     #
90     for step in range(duration - 1):
91         #
92         if U is not None:
93             if hasattr(U, "store") and len(U) > 1:
94                 Un = numpy.ravel( U[step] ).reshape((-1, 1))
95             elif hasattr(U, "store") and len(U) == 1:
96                 Un = numpy.ravel( U[0] ).reshape((-1, 1))
97             else:
98                 Un = numpy.ravel( U ).reshape((-1, 1))
99         else:
100             Un = None
101         #
102         Hm = HO["Direct"].appliedControledFormTo
103         if selfA._parameters["EstimationOf"] == "State":
104             Mm = EM["Direct"].appliedControledFormTo
105         if CM is not None and "Tangent" in CM and U is not None:
106             Cm = CM["Tangent"].asMatrix(Xn)
107         else:
108             Cm = None
109         #
110         Pndemi = numpy.real(scipy.linalg.sqrtm(Pn))
111         Xnmu = Xn + Pndemi @ SC
112         nbSpts = SC.shape[1]
113         #
114         if selfA._parameters["EstimationOf"] == "State":
115             XEnnmu = Mm( [(Xnmu[:, point].reshape((-1, 1)), Un) for point in range(nbSpts)],
116                          argsAsSerie = True,
117                          returnSerieAsArrayMatrix = True )
118             if Cm is not None and Un is not None:  # Attention : si Cm est aussi dans M, doublon !
119                 Cm = Cm.reshape(__n, Un.size)  # ADAO & check shape
120                 XEnnmu = XEnnmu + Cm @ Un
121         elif selfA._parameters["EstimationOf"] == "Parameters":
122             # --- > Par principe, M = Id, Q = 0
123             XEnnmu = numpy.array( Xnmu )
124         #
125         Xhmn = ( XEnnmu * Wm ).sum(axis=1)
126         #
127         if selfA._parameters["EstimationOf"] == "State":
128             Pmn = copy.copy(Q)
129         elif selfA._parameters["EstimationOf"] == "Parameters":
130             Pmn = 0.
131         for point in range(nbSpts):
132             dXEnnmuXhmn = XEnnmu[:, point].flat - Xhmn
133             Pmn += Wc[point] * numpy.outer(dXEnnmuXhmn, dXEnnmuXhmn)
134         #
135         Pmndemi = numpy.real(scipy.linalg.sqrtm(Pmn))
136         Xnnmu = Xhmn.reshape((-1, 1)) + Pmndemi @ SC
137         #
138         Ynnmu = Hm( [(Xnnmu[:, point], None) for point in range(nbSpts)],
139                     argsAsSerie = True,
140                     returnSerieAsArrayMatrix = True )
141         #
142         Yhmn = ( Ynnmu * Wm ).sum(axis=1)
143         #
144         Pyyn = copy.copy(R)
145         Pxyn = 0.
146         for point in range(nbSpts):
147             dYnnmuYhmn = Ynnmu[:, point].flat - Yhmn
148             dXnnmuXhmn = Xnnmu[:, point].flat - Xhmn
149             Pyyn += Wc[point] * numpy.outer(dYnnmuYhmn, dYnnmuYhmn)
150             Pxyn += Wc[point] * numpy.outer(dXnnmuXhmn, dYnnmuYhmn)
151         #
152         if hasattr(Y, "store"):
153             Ynpu = numpy.ravel( Y[step + 1] ).reshape((__p, 1))
154         else:
155             Ynpu = numpy.ravel( Y ).reshape((__p, 1))
156         _Innovation  = Ynpu - Yhmn.reshape((-1, 1))
157         if selfA._parameters["EstimationOf"] == "Parameters":
158             if Cm is not None and Un is not None:  # Attention : si Cm est aussi dans H, doublon !
159                 _Innovation = _Innovation - Cm @ Un
160         #
161         Kn = Pxyn @ scipy.linalg.inv(Pyyn)
162         Xn = Xhmn.reshape((-1, 1)) + Kn @ _Innovation
163         Pn = Pmn - Kn @ (Pyyn @ Kn.T)
164         #
165         Xa = Xn  # Pointeurs
166         # --------------------------
167         selfA._setInternalState("Xn", Xn)
168         selfA._setInternalState("Pn", Pn)
169         # --------------------------
170         #
171         selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
172         # ---> avec analysis
173         selfA.StoredVariables["Analysis"].store( Xa )
174         if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
175             selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( Hm((Xa, None)) )
176         if selfA._toStore("InnovationAtCurrentAnalysis"):
177             selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
178         # ---> avec current state
179         if selfA._parameters["StoreInternalVariables"] \
180                 or selfA._toStore("CurrentState"):
181             selfA.StoredVariables["CurrentState"].store( Xn )
182         if selfA._toStore("ForecastState"):
183             selfA.StoredVariables["ForecastState"].store( Xhmn )
184         if selfA._toStore("ForecastCovariance"):
185             selfA.StoredVariables["ForecastCovariance"].store( Pmn )
186         if selfA._toStore("BMA"):
187             selfA.StoredVariables["BMA"].store( Xhmn - Xa )
188         if selfA._toStore("InnovationAtCurrentState"):
189             selfA.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
190         if selfA._toStore("SimulatedObservationAtCurrentState") \
191                 or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
192             selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Yhmn )
193         # ---> autres
194         if selfA._parameters["StoreInternalVariables"] \
195                 or selfA._toStore("CostFunctionJ") \
196                 or selfA._toStore("CostFunctionJb") \
197                 or selfA._toStore("CostFunctionJo") \
198                 or selfA._toStore("CurrentOptimum") \
199                 or selfA._toStore("APosterioriCovariance"):
200             Jb  = vfloat( 0.5 * (Xa - Xb).T * (BI * (Xa - Xb)) )
201             Jo  = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
202             J   = Jb + Jo
203             selfA.StoredVariables["CostFunctionJb"].store( Jb )
204             selfA.StoredVariables["CostFunctionJo"].store( Jo )
205             selfA.StoredVariables["CostFunctionJ" ].store( J )
206             #
207             if selfA._toStore("IndexOfOptimum") \
208                     or selfA._toStore("CurrentOptimum") \
209                     or selfA._toStore("CostFunctionJAtCurrentOptimum") \
210                     or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
211                     or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
212                     or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
213                 IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
214             if selfA._toStore("IndexOfOptimum"):
215                 selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
216             if selfA._toStore("CurrentOptimum"):
217                 selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
218             if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
219                 selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )  # noqa: E501
220             if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
221                 selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )  # noqa: E501
222             if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
223                 selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )  # noqa: E501
224             if selfA._toStore("CostFunctionJAtCurrentOptimum"):
225                 selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )  # noqa: E501
226         if selfA._toStore("APosterioriCovariance"):
227             selfA.StoredVariables["APosterioriCovariance"].store( Pn )
228         if selfA._parameters["EstimationOf"] == "Parameters" \
229                 and J < previousJMinimum:
230             previousJMinimum    = J
231             XaMin               = Xa
232             if selfA._toStore("APosterioriCovariance"):
233                 covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
234     #
235     # Stockage final supplémentaire de l'optimum en estimation de paramètres
236     # ----------------------------------------------------------------------
237     if selfA._parameters["EstimationOf"] == "Parameters":
238         selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
239         selfA.StoredVariables["Analysis"].store( XaMin )
240         if selfA._toStore("APosterioriCovariance"):
241             selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
242         if selfA._toStore("BMA"):
243             selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
244     #
245     return 0
246
247 # ==============================================================================
248 if __name__ == "__main__":
249     print('\n AUTODIAGNOSTIC\n')