# -*- coding: utf-8 -*-
#
# Copyright (C) 2008-2024 EDF R&D
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
"""
Constrained Unscented Kalman Filter
"""
__author__ = "Jean-Philippe ARGAUD"

import numpy, scipy, copy
from daCore.NumericObjects import GenerateWeightsAndSigmaPoints
from daCore.PlatformInfo import PlatformInfo, vfloat
from daCore.NumericObjects import ApplyBounds, ForceNumericBounds

# Machine precision floor reported by the platform, kept as a module-level
# constant (presumably used by sibling ADAO atoms — not referenced below).
mpr = PlatformInfo().MachinePrecision()
# ==============================================================================
def ecw2ukf(selfA, Xb, Y, U, HO, EM, CM, R, B, Q, VariantM="UKF"):
    """
    Constrained Unscented Kalman Filter (CUKF / 2UKF family).

    Sequential estimator: at each observation step, sigma points drawn
    around the current state are propagated through the evolution operator,
    re-sampled around the forecast mean, observed, and corrected with a
    Kalman-type gain. Bound constraints, when given, are enforced by
    projection when ``ConstrainedBy == "EstimateProjection"``.

    Parameters follow the common ADAO atom signature: ``selfA`` is the
    calling algorithm object (parameters and storage), ``Xb`` the
    background, ``Y`` the observation (possibly a time series), ``U`` the
    control, ``HO``/``EM``/``CM`` the observation/evolution/control
    operators, ``R``/``B``/``Q`` the covariances, and ``VariantM`` selects
    the sigma-point scheme. Returns 0.

    NOTE(review): this block was recovered from a partially garbled
    extraction with dropped lines; the gaps were reconstructed to match the
    visible control flow — confirm against the reference ADAO source.
    """
    if selfA._parameters["EstimationOf"] == "Parameters":
        selfA._parameters["StoreInternalVariables"] = True
    selfA._parameters["Bounds"] = ForceNumericBounds( selfA._parameters["Bounds"] )
    #
    # Unscented weights and sigma points for the chosen variant
    wsp = GenerateWeightsAndSigmaPoints(
        Nn       = Xb.size,
        EO       = selfA._parameters["EstimationOf"],
        VariantM = VariantM,
        Alpha    = selfA._parameters["Alpha"],
        Beta     = selfA._parameters["Beta"],
        Kappa    = selfA._parameters["Kappa"],
    )
    Wm, Wc, SC = wsp.get()
    #
    # Observation window duration and sizes
    if hasattr(Y, "stepnumber"):
        duration = Y.stepnumber()
        __p = numpy.cumprod(Y.shape())[-1]
    else:
        duration = 2
        __p = numpy.array(Y).size
    #
    # Precompute the inverses of B and R (only needed for the cost functions)
    if selfA._parameters["StoreInternalVariables"] \
            or selfA._toStore("CostFunctionJ") \
            or selfA._toStore("CostFunctionJb") \
            or selfA._toStore("CostFunctionJo") \
            or selfA._toStore("CurrentOptimum") \
            or selfA._toStore("APosterioriCovariance"):
        BI = B.getI()
        RI = R.getI()
    #
    __n = Xb.size
    nbPreviousSteps = len(selfA.StoredVariables["Analysis"])
    #
    if len(selfA.StoredVariables["Analysis"]) == 0 or not selfA._parameters["nextStep"]:
        # Cold start: initialize from the background
        Xn = Xb
        if hasattr(B, "asfullmatrix"):
            Pn = B.asfullmatrix(__n)
        else:
            Pn = B
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        selfA.StoredVariables["Analysis"].store( Xb )
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( Pn )
    elif selfA._parameters["nextStep"]:
        # Warm restart from the previously saved internal state
        Xn = selfA._getInternalState("Xn")
        Pn = selfA._getInternalState("Pn")
    #
    if selfA._parameters["EstimationOf"] == "Parameters":
        XaMin            = Xn
        previousJMinimum = numpy.finfo(float).max
    #
    for step in range(duration - 1):
        #
        # Control at the current step, if any
        if U is not None:
            if hasattr(U, "store") and len(U) > 1:
                Un = numpy.ravel( U[step] ).reshape((-1, 1))
            elif hasattr(U, "store") and len(U) == 1:
                Un = numpy.ravel( U[0] ).reshape((-1, 1))
            else:
                Un = numpy.ravel( U ).reshape((-1, 1))
        else:
            Un = None
        #
        Hm = HO["Direct"].appliedControledFormTo
        if selfA._parameters["EstimationOf"] == "State":
            Mm = EM["Direct"].appliedControledFormTo
        if CM is not None and "Tangent" in CM and U is not None:
            Cm = CM["Tangent"].asMatrix(Xn)
        else:
            Cm = None
        #
        # Sigma points around the current state (matrix square root of Pn)
        Pndemi = numpy.real(scipy.linalg.sqrtm(Pn))
        Xnmu = Xn + Pndemi @ SC
        nbSpts = SC.shape[1]
        #
        # Constraint projection of the sigma points
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            for point in range(nbSpts):
                Xnmu[:, point] = ApplyBounds( Xnmu[:, point], selfA._parameters["Bounds"] )
        #
        # Forecast of each sigma point through the evolution operator
        if selfA._parameters["EstimationOf"] == "State":
            XEnnmu = Mm( [(Xnmu[:, point].reshape((-1, 1)), Un) for point in range(nbSpts)],
                         argsAsSerie = True,
                         returnSerieAsArrayMatrix = True )
            if Cm is not None and Un is not None:  # Beware: if Cm is also inside M, it is counted twice!
                Cm = Cm.reshape(__n, Un.size)  # ADAO & check shape
                XEnnmu = XEnnmu + Cm @ Un
        elif selfA._parameters["EstimationOf"] == "Parameters":
            # --- > By principle, M = Id, Q = 0
            XEnnmu = numpy.array( Xnmu )
        #
        # Weighted forecast mean
        Xhmn = ( XEnnmu * Wm ).sum(axis=1)
        #
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            Xhmn = ApplyBounds( Xhmn, selfA._parameters["Bounds"] )
        #
        # Forecast covariance (model noise Q only in state estimation)
        if selfA._parameters["EstimationOf"] == "State":
            Pmn = copy.copy(Q)  # NOTE(review): reconstructed line — verify against reference
        elif selfA._parameters["EstimationOf"] == "Parameters":
            Pmn = 0.            # NOTE(review): reconstructed line — verify against reference
        for point in range(nbSpts):
            dXEnnmuXhmn = XEnnmu[:, point].flat - Xhmn
            Pmn += Wc[point] * numpy.outer(dXEnnmuXhmn, dXEnnmuXhmn)
        #
        # Re-sampling of sigma points around the forecast mean
        if selfA._parameters["EstimationOf"] == "Parameters" and selfA._parameters["Bounds"] is not None:
            Pmndemi = selfA._parameters["Reconditioner"] * numpy.real(scipy.linalg.sqrtm(Pmn))
        else:
            Pmndemi = numpy.real(scipy.linalg.sqrtm(Pmn))
        #
        Xnnmu = Xhmn.reshape((-1, 1)) + Pmndemi @ SC
        #
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            for point in range(nbSpts):
                Xnnmu[:, point] = ApplyBounds( Xnnmu[:, point], selfA._parameters["Bounds"] )
        #
        # Observation of the re-sampled points
        Ynnmu = Hm( [(Xnnmu[:, point], None) for point in range(nbSpts)],
                    argsAsSerie = True,
                    returnSerieAsArrayMatrix = True )
        #
        Yhmn = ( Ynnmu * Wm ).sum(axis=1)
        #
        # Innovation covariance and state/observation cross covariance
        Pyyn = copy.copy(R)  # NOTE(review): reconstructed line — verify against reference
        Pxyn = 0.            # NOTE(review): reconstructed line — verify against reference
        for point in range(nbSpts):
            dYnnmuYhmn = Ynnmu[:, point].flat - Yhmn
            dXnnmuXhmn = Xnnmu[:, point].flat - Xhmn
            Pyyn += Wc[point] * numpy.outer(dYnnmuYhmn, dYnnmuYhmn)
            Pxyn += Wc[point] * numpy.outer(dXnnmuXhmn, dYnnmuYhmn)
        #
        if hasattr(Y, "store"):
            Ynpu = numpy.ravel( Y[step + 1] ).reshape((__p, 1))
        else:
            Ynpu = numpy.ravel( Y ).reshape((__p, 1))
        _Innovation = Ynpu - Yhmn.reshape((-1, 1))
        if selfA._parameters["EstimationOf"] == "Parameters":
            if Cm is not None and Un is not None:  # Beware: if Cm is also inside H, it is counted twice!
                _Innovation = _Innovation - Cm @ Un
        #
        # Kalman gain and analysis update
        Kn = Pxyn @ scipy.linalg.inv(Pyyn)
        Xn = Xhmn.reshape((-1, 1)) + Kn @ _Innovation
        Pn = Pmn - Kn @ (Pyyn @ Kn.T)
        #
        if selfA._parameters["Bounds"] is not None and selfA._parameters["ConstrainedBy"] == "EstimateProjection":
            Xn = ApplyBounds( Xn, selfA._parameters["Bounds"] )
        #
        Xa = Xn  # Pointers
        # --------------------------
        selfA._setInternalState("Xn", Xn)
        selfA._setInternalState("Pn", Pn)
        # --------------------------
        #
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        # ---> with analysis
        selfA.StoredVariables["Analysis"].store( Xa )
        if selfA._toStore("SimulatedObservationAtCurrentAnalysis"):
            selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"].store( Hm((Xa, None)) )
        if selfA._toStore("InnovationAtCurrentAnalysis"):
            selfA.StoredVariables["InnovationAtCurrentAnalysis"].store( _Innovation )
        # ---> with current state
        if selfA._parameters["StoreInternalVariables"] \
                or selfA._toStore("CurrentState"):
            selfA.StoredVariables["CurrentState"].store( Xn )
        if selfA._toStore("ForecastState"):
            selfA.StoredVariables["ForecastState"].store( Xhmn )
        if selfA._toStore("ForecastCovariance"):
            selfA.StoredVariables["ForecastCovariance"].store( Pmn )
        if selfA._toStore("BMA"):
            selfA.StoredVariables["BMA"].store( Xhmn - Xa )
        if selfA._toStore("InnovationAtCurrentState"):
            selfA.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
        if selfA._toStore("SimulatedObservationAtCurrentState") \
                or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
            selfA.StoredVariables["SimulatedObservationAtCurrentState"].store( Yhmn )
        # ---> others
        if selfA._parameters["StoreInternalVariables"] \
                or selfA._toStore("CostFunctionJ") \
                or selfA._toStore("CostFunctionJb") \
                or selfA._toStore("CostFunctionJo") \
                or selfA._toStore("CurrentOptimum") \
                or selfA._toStore("APosterioriCovariance"):
            Jb = vfloat( 0.5 * (Xa - Xb).T * (BI * (Xa - Xb)) )
            Jo = vfloat( 0.5 * _Innovation.T * (RI * _Innovation) )
            J  = Jb + Jo
            selfA.StoredVariables["CostFunctionJb"].store( Jb )
            selfA.StoredVariables["CostFunctionJo"].store( Jo )
            selfA.StoredVariables["CostFunctionJ" ].store( J )
            #
            if selfA._toStore("IndexOfOptimum") \
                    or selfA._toStore("CurrentOptimum") \
                    or selfA._toStore("CostFunctionJAtCurrentOptimum") \
                    or selfA._toStore("CostFunctionJbAtCurrentOptimum") \
                    or selfA._toStore("CostFunctionJoAtCurrentOptimum") \
                    or selfA._toStore("SimulatedObservationAtCurrentOptimum"):
                IndexMin = numpy.argmin( selfA.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
            if selfA._toStore("IndexOfOptimum"):
                selfA.StoredVariables["IndexOfOptimum"].store( IndexMin )
            if selfA._toStore("CurrentOptimum"):
                selfA.StoredVariables["CurrentOptimum"].store( selfA.StoredVariables["Analysis"][IndexMin] )
            if selfA._toStore("SimulatedObservationAtCurrentOptimum"):
                selfA.StoredVariables["SimulatedObservationAtCurrentOptimum"].store( selfA.StoredVariables["SimulatedObservationAtCurrentAnalysis"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJbAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJbAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJb"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJoAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJoAtCurrentOptimum"].store( selfA.StoredVariables["CostFunctionJo"][IndexMin] )  # noqa: E501
            if selfA._toStore("CostFunctionJAtCurrentOptimum"):
                selfA.StoredVariables["CostFunctionJAtCurrentOptimum" ].store( selfA.StoredVariables["CostFunctionJ" ][IndexMin] )  # noqa: E501
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( Pn )
        # Track the optimum over the whole window in parameter estimation
        if selfA._parameters["EstimationOf"] == "Parameters" \
                and J < previousJMinimum:
            previousJMinimum = J
            XaMin            = Xa
            if selfA._toStore("APosterioriCovariance"):
                covarianceXaMin = selfA.StoredVariables["APosterioriCovariance"][-1]
    #
    # Final additional storing of the optimum in parameter estimation
    # ----------------------------------------------------------------------
    if selfA._parameters["EstimationOf"] == "Parameters":
        selfA.StoredVariables["CurrentIterationNumber"].store( len(selfA.StoredVariables["Analysis"]) )
        selfA.StoredVariables["Analysis"].store( XaMin )
        if selfA._toStore("APosterioriCovariance"):
            selfA.StoredVariables["APosterioriCovariance"].store( covarianceXaMin )
        if selfA._toStore("BMA"):
            selfA.StoredVariables["BMA"].store( numpy.ravel(Xb) - numpy.ravel(XaMin) )
    #
    return 0
# ==============================================================================
if __name__ == "__main__":
    # Self-identification banner when the atom is executed directly.
    print('\n AUTODIAGNOSTIC\n')