SALOME platform Git repositories — modules/adao.git — blob: src/daComposant/daAlgorithms/Atoms/ecw2ukf.py
(gitweb page header; last commit message: "Documentation corrections and update")
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2008-2024 EDF R&D
4 #
5 # This library is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU Lesser General Public
7 # License as published by the Free Software Foundation; either
8 # version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 # Lesser General Public License for more details.
14 #
15 # You should have received a copy of the GNU Lesser General Public
16 # License along with this library; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18 #
19 # See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
20 #
21 # Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
22
23 __doc__ = """
24     Constrained Unscented Kalman Filter
25 """
26 __author__ = "Jean-Philippe ARGAUD"
27
28 import numpy, scipy, copy
29 from daCore.NumericObjects import GenerateWeightsAndSigmaPoints
30 from daCore.PlatformInfo import PlatformInfo, vfloat
31 from daCore.NumericObjects import ApplyBounds, ForceNumericBounds
32 mpr = PlatformInfo().MachinePrecision()
33
34 # ==============================================================================
def ecw2ukf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="UKF"):
    """
    Constrained Unscented Kalman Filter (CUKF).

    Sequential assimilation over the observation window: at each step the
    current estimate is propagated through unscented sigma points, compared
    to the observation, and corrected with the unscented Kalman gain.
    Bounds, when provided with ConstrainedBy == "EstimateProjection", are
    enforced by projecting the sigma points and the estimates onto the
    admissible domain.

    Arguments (ADAO calling convention):
        selfA    : calling Algorithm object (holds _parameters and
                   StoredVariables, used for all storage and options)
        Xb       : background state (initial estimate)
        Y        : observation(s); a time series when it has "store"
        U        : control(s) or None; may be a time series
        HO/EM/CM : observation / evolution / control operator dictionaries
        R/B/Q    : observation / background / evolution error covariances
        VariantM : sigma-point scheme name forwarded to
                   GenerateWeightsAndSigmaPoints (default "UKF")

    Returns 0; all results are stored through selfA.StoredVariables.
    """
    if selfA._parameters["EstimationOf"] == "Parameters":
        # Parameter estimation needs the internal variables: the cost J is
        # used below to track the optimal parameter set across steps.
        selfA._parameters["StoreInternalVariables"] = True
    selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
    #
    # Unscented transform: mean weights Wm, covariance weights Wc, and the
    # matrix SC of unit sigma-point directions (one column per point).
    wsp = GenerateWeightsAndSigmaPoints(
        Nn       = Xb.size,
        EO       = selfA._parameters["EstimationOf"],
        VariantM = VariantM,
        Alpha    = selfA._parameters["Alpha"],
        Beta     = selfA._parameters["Beta"],
        Kappa    = selfA._parameters["Kappa"],
    )
    Wm, Wc, SC = wsp.get()
    #
    # Observation window duration and observation size
    if hasattr(Y, "stepnumber"):
        duration = Y.stepnumber()
        __p = numpy.cumprod(Y.shape())[-1]
    else:
        duration = 2  # a single observation yields exactly one analysis step
        __p = numpy.size(Y)
    #
    # Precompute the inverses of B and R (only needed for cost functions)
    if selfA._parameters["StoreInternalVariables"] \
            or selfA._toStore("CostFunctionJ") \
            or selfA._toStore("CostFunctionJb") \
            or selfA._toStore("CostFunctionJo") \
            or selfA._toStore("CurrentOptimum") \
            or selfA._toStore("APosterioriCovariance"):
        BI = B.getI()
        RI = R.getI()
    #
    __n = Xb.size
    nbPreviousSteps  = len(selfA.StoredVariables["Analysis"])
    #
    # Initialisation: a fresh run starts from the background (Xb, B); a
    # continued run ("nextStep") restarts from the saved internal state.
    if len(selfA.StoredVariables["Analysis"]) == 0 or not selfA._parameters["nextStep"]:
        Xn = Xb
        if hasattr(B, "asfullmatrix"):
            Pn = B.asfullmatrix(__n)
        else:
            Pn = B
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        selfA.StoredVariables["Analysis"].store( Xb )
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( Pn )
    elif selfA._parameters["nextStep"]:
        Xn = selfA._getInternalState("Xn")
        Pn = selfA._getInternalState("Pn")
    #
    if selfA._parameters["EstimationOf"] == "Parameters":
        # Track the parameter set with minimal cost over the whole window
        XaMin            = Xn
        previousJMinimum = numpy.finfo(float).max
    #
    for step in range(duration - 1):
        #
        # Current control Un: time series, single stored value, constant,
        # or absent.
        if U is not None:
            if hasattr(U, "store") and len(U) > 1:
                Un = numpy.ravel( U[step] ).reshape((-1, 1))
            elif hasattr(U, "store") and len(U) == 1:
                Un = numpy.ravel( U[0] ).reshape((-1, 1))
            else:
                Un = numpy.ravel( U ).reshape((-1, 1))
        else:
            Un = None
        #
        # Tangent of the control operator at the current state, if any
        if CM is not None and "Tangent" in CM and U is not None:
            Cm = CM["Tangent"].asMatrix(Xn)
        else:
            Cm = None
        #
        # Sigma points around the current analysis: Xn + sqrt(Pn) @ SC
        # (real part guards against tiny imaginary residue from sqrtm).
        Pndemi = numpy.real(scipy.linalg.sqrtm(Pn))
        Xnmu = Xn + Pndemi @ SC
        nbSpts = SC.shape[1]
        #
        # Constrain each sigma point by projection onto the bounds
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            for point in range(nbSpts):
                Xnmu[:, point] = ApplyBounds( Xnmu[:, point], selfA._parameters["Bounds"] )
        #
        # Forecast: propagate each sigma point through the evolution model
        if selfA._parameters["EstimationOf"] == "State":
            Mm = EM["Direct"].appliedControledFormTo
            XEnnmu = Mm( [(Xnmu[:, point].reshape((-1, 1)), Un) for point in range(nbSpts)],
                         argsAsSerie = True,
                         returnSerieAsArrayMatrix = True )
            if Cm is not None and Un is not None:  # Warning: if Cm is also inside M, the control is applied twice!
                Cm = Cm.reshape(__n, Un.size)  # ADAO & check shape
                XEnnmu = XEnnmu + Cm @ Un
        elif selfA._parameters["EstimationOf"] == "Parameters":
            # ---> By principle, M = Id, Q = 0
            XEnnmu = numpy.array( Xnmu )
        #
        # Forecast mean: weighted average of the propagated sigma points
        Xhmn = ( XEnnmu * Wm ).sum(axis=1)
        #
        # Constrain the forecast mean
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            Xhmn = ApplyBounds( Xhmn, selfA._parameters["Bounds"] )
        #
        # Forecast covariance: Pmn = Q + sum_i Wc[i] * d_i d_i^T
        if selfA._parameters["EstimationOf"] == "State":
            Pmn = copy.copy(Q)
        elif selfA._parameters["EstimationOf"] == "Parameters":
            Pmn = 0.
        for point in range(nbSpts):
            dXEnnmuXhmn = XEnnmu[:, point].flat - Xhmn
            Pmn += Wc[point] * numpy.outer(dXEnnmuXhmn, dXEnnmuXhmn)
        #
        # Redraw sigma points around the forecast mean; for bounded
        # parameter estimation the spread is rescaled by the Reconditioner.
        if selfA._parameters["EstimationOf"] == "Parameters" and selfA._parameters["Bounds"] is not None:
            Pmndemi = selfA._parameters["Reconditioner"] * numpy.real(scipy.linalg.sqrtm(Pmn))
        else:
            Pmndemi = numpy.real(scipy.linalg.sqrtm(Pmn))
        #
        Xnnmu = Xhmn.reshape((-1, 1)) + Pmndemi @ SC
        #
        # Constrain the redrawn sigma points
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            for point in range(nbSpts):
                Xnnmu[:, point] = ApplyBounds( Xnnmu[:, point], selfA._parameters["Bounds"] )
        #
        # Observe each redrawn sigma point
        Hm = HO["Direct"].appliedControledFormTo
        Ynnmu = Hm( [(Xnnmu[:, point], None) for point in range(nbSpts)],
                    argsAsSerie = True,
                    returnSerieAsArrayMatrix = True )
        #
        # Predicted observation mean
        Yhmn = ( Ynnmu * Wm ).sum(axis=1)
        #
        # Innovation covariance Pyyn = R + ..., and state/observation
        # cross-covariance Pxyn.
        Pyyn = copy.copy(R)
        Pxyn = 0.
        for point in range(nbSpts):
            dYnnmuYhmn = Ynnmu[:, point].flat - Yhmn
            dXnnmuXhmn = Xnnmu[:, point].flat - Xhmn
            Pyyn += Wc[point] * numpy.outer(dYnnmuYhmn, dYnnmuYhmn)
            Pxyn += Wc[point] * numpy.outer(dXnnmuXhmn, dYnnmuYhmn)
        #
        # Observation at the next time and resulting innovation
        if hasattr(Y, "store"):
            Ynpu = numpy.ravel( Y[step + 1] ).reshape((__p, 1))
        else:
            Ynpu = numpy.ravel( Y ).reshape((__p, 1))
        _Innovation  = Ynpu - Yhmn.reshape((-1, 1))
        if selfA._parameters["EstimationOf"] == "Parameters":
            if Cm is not None and Un is not None:  # Warning: if Cm is also inside H, the control is applied twice!
                _Innovation = _Innovation - Cm @ Un
        #
        # Unscented Kalman gain, analysis update and a posteriori covariance
        Kn = Pxyn @ scipy.linalg.inv(Pyyn)
        Xn = Xhmn.reshape((-1, 1)) + Kn @ _Innovation
        Pn = Pmn - Kn @ (Pyyn @ Kn.T)
        #
        # Constrain the analysis itself
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            Xn = ApplyBounds( Xn, selfA._parameters["Bounds"] )
        #
        Xa = Xn  # Pointers (Xa aliases Xn, no copy)
        # --------------------------
        # Save the internal state so a later call with "nextStep" resumes here
        selfA._setInternalState("Xn", Xn)
        selfA._setInternalState("Pn", Pn)
        # --------------------------
        #
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        # ---> with analysis
        selfA.StoredVariables["Analysis"].store( Xa )
        if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
            selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( Hm((Xa, None)) )
        if selfA._toStore("InnovationAtCurrentAnalysis"):
            selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
        # ---> with current state
        if selfA._parameters["StoreInternalVariables"] \
                or selfA._toStore("CurrentState"):
            selfA.StoredVariables["CurrentState"].store( Xn )
        if selfA._toStore("ForecastState"):
            selfA.StoredVariables["ForecastState"].store( Xhmn )
        if selfA._toStore("ForecastCovariance"):
            selfA.StoredVariables["ForecastCovariance"].store( Pmn )
        if selfA._toStore("BMA"):
            # NOTE(review): Xhmn is 1-D while Xa is a column vector, so this
            # difference broadcasts to an (n, n) array — confirm intended.
            selfA.StoredVariables["BMA"].store( Xhmn - Xa )
        if selfA._toStore("InnovationAtCurrentState"):
            selfA.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
        if selfA._toStore("SimulatedObservationAtCurrentState") \
                or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
            selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Yhmn )
        # ---> others
        if selfA._parameters["StoreInternalVariables"] \
                or selfA._toStore("CostFunctionJ") \
                or selfA._toStore("CostFunctionJb") \
                or selfA._toStore("CostFunctionJo") \
                or selfA._toStore("CurrentOptimum") \
                or selfA._toStore("APosterioriCovariance"):
            # 3D-Var-style cost at the analysis: J = Jb + Jo
            Jb  = vfloat( 0.5 * (Xa - Xb).T * (BI * (Xa - Xb)) )
            Jo  = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
            J   = Jb + Jo
            selfA.StoredVariables["CostFunctionJb"].store( Jb )
            selfA.StoredVariables["CostFunctionJo"].store( Jo )
            selfA.StoredVariables["CostFunctionJ" ].store( J )
            #
            if selfA._toStore("IndexOfOptimum") \
                    or selfA._toStore("CurrentOptimum") \
                    or selfA._toStore("CostFunctionJAtCurrentOptimum") \
                    or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
                    or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
                    or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
                # Index of the minimal J among the steps of THIS run only
                IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if selfA._toStore("IndexOfOptimum"):
                selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if selfA._toStore("CurrentOptimum"):
                selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
            if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
                selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )  # noqa: E501
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( Pn )
        # Keep the best (minimal-J) parameter set; J is always defined here
        # because "Parameters" forces StoreInternalVariables above.
        if selfA._parameters["EstimationOf"] == "Parameters" \
                and J < previousJMinimum:
            previousJMinimum    = J
            XaMin               = Xa
            if selfA._toStore("APosterioriCovariance"):
                covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
    #
    # Final extra storage of the optimum in parameter estimation
    # ----------------------------------------------------------------------
    if selfA._parameters["EstimationOf"] == "Parameters":
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        selfA.StoredVariables["Analysis"].store( XaMin )
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
        if selfA._toStore("BMA"):
            selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
    #
    return 0
265
266 # ==============================================================================
# Self-test entry point: executing the module directly only announces
# itself; the algorithm is meant to be driven through ADAO.
if __name__ == "__main__":
    banner = "\n AUTODIAGNOSTIC\n"
    print(banner)