#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-import numpy, logging, itertools
+import numpy, logging
from daCore import BasicObjects, NumericObjects
+from daAlgorithms.Atoms import eosg
from daCore.PlatformInfo import PlatformInfo
mfp = PlatformInfo().MaximumPrecision()
"CostFunctionJb",
"CostFunctionJo",
"CurrentState",
+ "EnsembleOfSimulations",
+ "EnsembleOfStates",
"InnovationAtCurrentState",
"SimulatedObservationAtCurrentState",
]
def run(self, Xb=None, Y=None, U=None, HO=None, EM=None, CM=None, R=None, B=None, Q=None, Parameters=None):
self._pre_run(Parameters, Xb, Y, U, HO, EM, CM, R, B, Q)
#
- Hm = HO["Direct"].appliedTo
- #
- X0 = numpy.ravel( Xb )
- Y0 = numpy.ravel( Y )
- #
- # ---------------------------
- sampleList = NumericObjects.BuildComplexSampleList(
- self._parameters["SampleAsnUplet"],
- self._parameters["SampleAsExplicitHyperCube"],
- self._parameters["SampleAsMinMaxStepHyperCube"],
- self._parameters["SampleAsIndependantRandomVariables"],
- X0,
- )
- # ----------
+ if hasattr(Y,"store"):
+ Yb = numpy.asarray( Y[-1] ).reshape((-1,1)) # Y en Vector ou VectorSerie
+ else:
+ Yb = numpy.asarray( Y ).reshape((-1,1)) # Y en Vector ou VectorSerie
BI = B.getI()
RI = R.getI()
def CostFunction(x, HmX, QualityMeasure="AugmentedWeightedLeastSquares"):
_HX = numpy.nan
Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
else:
- _X = numpy.ravel( x )
- _HX = numpy.ravel( HmX )
+ _X = numpy.asarray( x ).reshape((-1,1))
+ _HX = numpy.asarray( HmX ).reshape((-1,1))
+ _Innovation = Yb - _HX
+ assert Yb.size == _HX.size
+ assert Yb.size == _Innovation.size
if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
if BI is None or RI is None:
raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
- Jb = float( 0.5 * (_X - X0).T * (BI * (_X - X0)) )
- Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
+ Jb = float( 0.5 * (_X - Xb).T * (BI * (_X - Xb)) )
+ Jo = float( 0.5 * _Innovation.T * (RI * _Innovation) )
elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
if RI is None:
raise ValueError("Observation error covariance matrix has to be properly defined!")
Jb = 0.
- Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
+ Jo = float( 0.5 * _Innovation.T * (RI * _Innovation) )
elif QualityMeasure in ["LeastSquares","LS","L2"]:
Jb = 0.
- Jo = float( 0.5 * (Y0 - _HX).T @ (Y0 - _HX) )
+ Jo = float( 0.5 * _Innovation.T @ _Innovation )
elif QualityMeasure in ["AbsoluteValue","L1"]:
Jb = 0.
- Jo = float( numpy.sum( numpy.abs(Y0 - _HX), dtype=mfp ) )
+ Jo = float( numpy.sum( numpy.abs(_Innovation), dtype=mfp ) )
elif QualityMeasure in ["MaximumError","ME", "Linf"]:
Jb = 0.
- Jo = numpy.max( numpy.abs(Y0 - _HX) )
+ Jo = numpy.max( numpy.abs(_Innovation) )
#
J = Jb + Jo
if self._toStore("CurrentState"):
self.StoredVariables["CurrentState"].store( _X )
if self._toStore("InnovationAtCurrentState"):
- self.StoredVariables["InnovationAtCurrentState"].store( Y0 - _HX )
+ self.StoredVariables["InnovationAtCurrentState"].store( _Innovation )
if self._toStore("SimulatedObservationAtCurrentState"):
self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
self.StoredVariables["CostFunctionJb"].store( Jb )
self.StoredVariables["CostFunctionJo"].store( Jo )
self.StoredVariables["CostFunctionJ" ].store( J )
return J, Jb, Jo
- # ----------
- if self._parameters["SetDebug"]:
- CUR_LEVEL = logging.getLogger().getEffectiveLevel()
- logging.getLogger().setLevel(logging.DEBUG)
- print("===> Beginning of evaluation, activating debug\n")
- print(" %s\n"%("-"*75,))
#
# ----------
- for i,Xx in enumerate(sampleList):
- if self._parameters["SetDebug"]:
- print("===> Launching evaluation for state %i"%i)
- try:
- Yy = Hm( numpy.ravel( Xx ) )
- except:
- Yy = numpy.nan
- #
- J, Jb, Jo = CostFunction( Xx, Yy, self._parameters["QualityCriterion"])
- # ----------
+ EOX, EOS = eosg.eosg(self, Xb, HO, True)
#
- if self._parameters["SetDebug"]:
- print("\n %s\n"%("-"*75,))
- print("===> End evaluation, deactivating debug if necessary\n")
- logging.getLogger().setLevel(CUR_LEVEL)
+ for i in range(EOS.shape[1]):
+ J, Jb, Jo = CostFunction( EOX[:,i], EOS[:,i], self._parameters["QualityCriterion"])
+ # ----------
#
self._post_run(HO)
return 0