#
# Author: Jean-Philippe Argaud, jean-philippe.argaud@edf.fr, EDF R&D
-import logging
+import copy, logging, itertools
+import numpy
from daCore import BasicObjects
-import numpy, copy, itertools
+from daCore.PlatformInfo import PlatformInfo
+mfp = PlatformInfo().MaximumPrecision()
# ==============================================================================
class ElementaryAlgorithm(BasicObjects.Algorithm):
#
Hm = HO["Direct"].appliedTo
#
- Xn = copy.copy( Xb )
+ X0 = numpy.ravel( Xb )
+ Y0 = numpy.ravel( Y )
#
# ---------------------------
if len(self._parameters["SampleAsnUplet"]) > 0:
sampleList = self._parameters["SampleAsnUplet"]
for i,Xx in enumerate(sampleList):
- if numpy.ravel(Xx).size != Xn.size:
- raise ValueError("The size %i of the %ith state X in the sample and %i of the checking point Xb are different, they have to be identical."%(numpy.ravel(Xx).size,i+1,Xn.size))
+ if numpy.ravel(Xx).size != X0.size:
+ raise ValueError("The size %i of the %ith state X in the sample and %i of the checking point Xb are different, they have to be identical."%(numpy.ravel(Xx).size,i+1,X0.size))
elif len(self._parameters["SampleAsExplicitHyperCube"]) > 0:
sampleList = itertools.product(*list(self._parameters["SampleAsExplicitHyperCube"]))
elif len(self._parameters["SampleAsMinMaxStepHyperCube"]) > 0:
coordinatesList.append(distribution(*dim[1], size=max(1,int(dim[2]))))
sampleList = itertools.product(*coordinatesList)
else:
- sampleList = iter([Xn,])
+ sampleList = iter([X0,])
# ----------
BI = B.getI()
RI = R.getI()
_HX = numpy.nan
Jb, Jo, J = numpy.nan, numpy.nan, numpy.nan
else:
- _X = numpy.asmatrix(numpy.ravel( x )).T
- _HX = numpy.asmatrix(numpy.ravel( HmX )).T
+ _X = numpy.ravel( x )
+ _HX = numpy.ravel( HmX )
if QualityMeasure in ["AugmentedWeightedLeastSquares","AWLS","AugmentedPonderatedLeastSquares","APLS","DA"]:
if BI is None or RI is None:
raise ValueError("Background and Observation error covariance matrix has to be properly defined!")
- Jb = 0.5 * (_X - Xb).T * BI * (_X - Xb)
- Jo = 0.5 * (Y - _HX).T * RI * (Y - _HX)
+ Jb = float( 0.5 * (_X - X0).T * (BI * (_X - X0)) )
+ Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
elif QualityMeasure in ["WeightedLeastSquares","WLS","PonderatedLeastSquares","PLS"]:
if RI is None:
raise ValueError("Observation error covariance matrix has to be properly defined!")
Jb = 0.
- Jo = 0.5 * (Y - _HX).T * RI * (Y - _HX)
+ Jo = float( 0.5 * (Y0 - _HX).T * (RI * (Y0 - _HX)) )
elif QualityMeasure in ["LeastSquares","LS","L2"]:
Jb = 0.
- Jo = 0.5 * (Y - _HX).T * (Y - _HX)
+ Jo = float( 0.5 * (Y0 - _HX).T @ (Y0 - _HX) )
elif QualityMeasure in ["AbsoluteValue","L1"]:
Jb = 0.
- Jo = numpy.sum( numpy.abs(Y - _HX) )
+ Jo = float( numpy.sum( numpy.abs(Y0 - _HX), dtype=mfp ) )
elif QualityMeasure in ["MaximumError","ME"]:
Jb = 0.
- Jo = numpy.max( numpy.abs(Y - _HX) )
+ Jo = numpy.max( numpy.abs(Y0 - _HX) )
#
- J = float( Jb ) + float( Jo )
+ J = Jb + Jo
if self._toStore("CurrentState"):
self.StoredVariables["CurrentState"].store( _X )
if self._toStore("InnovationAtCurrentState"):
- self.StoredVariables["InnovationAtCurrentState"].store( Y - _HX )
+ self.StoredVariables["InnovationAtCurrentState"].store( Y0 - _HX )
if self._toStore("SimulatedObservationAtCurrentState"):
self.StoredVariables["SimulatedObservationAtCurrentState"].store( _HX )
self.StoredVariables["CostFunctionJb"].store( Jb )
for i,Xx in enumerate(sampleList):
if self._parameters["SetDebug"]:
print("===> Launching evaluation for state %i"%i)
- __Xn = numpy.asmatrix(numpy.ravel( Xx )).T
try:
- Yn = Hm( __Xn )
+ Yy = Hm( numpy.ravel( Xx ) )
except:
- Yn = numpy.nan
+ Yy = numpy.nan
#
- J, Jb, Jo = CostFunction(__Xn,Yn,self._parameters["QualityCriterion"])
+ J, Jb, Jo = CostFunction( Xx, Yy, self._parameters["QualityCriterion"])
# ----------
#
if self._parameters["SetDebug"]: