#
# Précalcul des inversions de B et R
# ----------------------------------
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- else:
- raise ValueError("Background error covariance matrix has to be properly defined!")
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- else:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
+ BI = B.getI()
+ RI = R.getI()
#
# Définition de la fonction-coût
# ------------------------------
_ee[i] = 1.
_HtEE = numpy.dot(HtM,_ee)
_HtEE = numpy.asmatrix(numpy.ravel( _HtEE )).T
- HessienneI.append( numpy.ravel( numpy.dot(BI,_ee) + numpy.dot(HaM,numpy.dot(RI,_HtEE)) ) )
+ HessienneI.append( numpy.ravel( BI*_ee + HaM * (RI * _HtEE) ) )
HessienneI = numpy.matrix( HessienneI )
A = HessienneI.I
if min(A.shape) != max(A.shape):
if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["OMB"].store( numpy.ravel(d) )
if "SigmaObs2" in self._parameters["StoreSupplementaryCalculations"]:
- if R is not None:
- TraceR = R.trace()
- elif self._parameters["R_scalar"] is not None:
- TraceR = float(self._parameters["R_scalar"]*Y.size)
+ TraceR = R.trace(Y.size)
self.StoredVariables["SigmaObs2"].store( float( (d.T * (numpy.asmatrix(numpy.ravel(Y)).T-numpy.asmatrix(numpy.ravel(Hm(Xa))).T)) ) / TraceR )
if "MahalanobisConsistency" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["MahalanobisConsistency"].store( float( 2.*MinJ/d.size ) )
#
# Précalcul des inversions de B et R
# ----------------------------------
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- B = self._parameters["B_scalar"]
- else:
- raise ValueError("Background error covariance matrix has to be properly defined!")
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- else:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
+ BI = B.getI()
+ RI = R.getI()
#
# Calcul de l'innovation
# ----------------------
# Calcul de la matrice de gain et de l'analyse
# --------------------------------------------
if Y.size <= Xb.size:
- if self._parameters["R_scalar"] is not None:
- R = self._parameters["R_scalar"] * numpy.eye(Y.size, dtype=numpy.float)
if Y.size > 100: # len(R)
- _A = Hm * B * Ha + R
+ _A = R + Hm * B * Ha
_u = numpy.linalg.solve( _A , d )
Xa = Xb + B * Ha * _u
else:
- K = B * Ha * (Hm * B * Ha + R).I
+ K = B * Ha * (R + Hm * B * Ha).I
Xa = Xb + K*d
else:
if Y.size > 100: # len(R)
- _A = Ha * RI * Hm + BI
+ _A = BI + Ha * RI * Hm
_u = numpy.linalg.solve( _A , Ha * RI * d )
Xa = Xb + _u
else:
- K = (Ha * RI * Hm + BI).I * Ha * RI
+ K = (BI + Ha * RI * Hm).I * Ha * RI
Xa = Xb + K*d
self.StoredVariables["Analysis"].store( Xa.A1 )
#
if "OMB" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["OMB"].store( numpy.ravel(d) )
if "SigmaObs2" in self._parameters["StoreSupplementaryCalculations"]:
- if R is not None:
- TraceR = R.trace()
- elif self._parameters["R_scalar"] is not None:
- TraceR = float(self._parameters["R_scalar"]*Y.size)
+ TraceR = R.trace(Y.size)
self.StoredVariables["SigmaObs2"].store( float( (d.T * (numpy.asmatrix(numpy.ravel(oma)).T)) ) / TraceR )
if "SigmaBck2" in self._parameters["StoreSupplementaryCalculations"]:
self.StoredVariables["SigmaBck2"].store( float( (d.T * Hm * (Xa - Xb))/(Hm * B * Hm.T).trace() ) )
#
# Précalcul des inversions de B et R
# ----------------------------------
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- B = self._parameters["B_scalar"]
- else:
- raise ValueError("Background error covariance matrix has to be properly defined!")
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- else:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
+ BI = B.getI()
+ RI = R.getI()
#
- # Nombre d'ensemble pour l'ébauche
+ # Nombre d'ensemble pour l'ébauche
# --------------------------------
nb_ens = Xb.stepnumber()
#
# Construction de l'ensemble des observations, par génération a partir
# de la diagonale de R
# --------------------------------------------------------------------
- DiagonaleR = numpy.diag(R)
+ DiagonaleR = R.diag(Y.size)
EnsembleY = numpy.zeros([Y.size,nb_ens])
- for npar in range(DiagonaleR.size) :
+ for npar in range(DiagonaleR.size):
bruit = numpy.random.normal(0,DiagonaleR[npar],nb_ens)
EnsembleY[npar,:] = Y[npar] + bruit
EnsembleY = numpy.matrix(EnsembleY)
# Calcul de la matrice de gain dans l'espace le plus petit et de l'analyse
# ------------------------------------------------------------------------
if Y.size <= Xb[0].size:
- if self._parameters["R_scalar"] is not None:
- R = self._parameters["R_scalar"] * numpy.eye(Y.size, dtype=numpy.float)
- K = B * Ha * (Hm * B * Ha + R).I
+ K = B * Ha * (R + Hm * B * Ha).I
else:
- K = (Ha * RI * Hm + BI).I * Ha * RI
+ K = (BI + Ha * RI * Hm).I * Ha * RI
#
# Calcul du BLUE pour chaque membre de l'ensemble
# -----------------------------------------------
# Précalcul des inversions de B et R
# ----------------------------------
if self._parameters["StoreInternalVariables"]:
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
+ BI = B.getI()
+ RI = R.getI()
#
# Initialisation
# --------------
if Cm is not None and Un is not None: # Attention : si Cm est aussi dans M, doublon !
Cm = Cm.reshape(Xn.size,Un.size) # ADAO & check shape
Xn_predicted = Xn_predicted + Cm * Un
- Pn_predicted = Mt * Pn * Ma + Q
+ Pn_predicted = Q + Mt * Pn * Ma
elif self._parameters["EstimationOf"] == "Parameters":
# --- > Par principe, M = Id, Q = 0
Xn_predicted = Xn
if Cm is not None and Un is not None: # Attention : si Cm est aussi dans H, doublon !
d = d - Cm * Un
#
- K = Pn_predicted * Ha * (Ht * Pn_predicted * Ha + R).I
+ K = Pn_predicted * Ha * (R + Ht * Pn_predicted * Ha).I
Xn = Xn_predicted + K * d
Pn = Pn_predicted - K * Ht * Pn_predicted
#
# Précalcul des inversions de B et R
# ----------------------------------
if self._parameters["StoreInternalVariables"]:
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
+ BI = B.getI()
+ RI = R.getI()
#
# Initialisation
# --------------
if self._parameters["EstimationOf"] == "State":
Xn_predicted = Mt * Xn
if Cm is not None and Un is not None: # Attention : si Cm est aussi dans M, doublon !
+ Cm = Cm.reshape(Xn.size,Un.size) # ADAO & check shape
Xn_predicted = Xn_predicted + Cm * Un
- Pn_predicted = Mt * Pn * Ma + Q
+ Pn_predicted = Q + Mt * Pn * Ma
elif self._parameters["EstimationOf"] == "Parameters":
# --- > Par principe, M = Id, Q = 0
Xn_predicted = Xn
if Cm is not None and Un is not None: # Attention : si Cm est aussi dans H, doublon !
d = d - Cm * Un
#
- K = Pn_predicted * Ha * (Ht * Pn_predicted * Ha + R).I
+ K = Pn_predicted * Ha * (R + Ht * Pn_predicted * Ha).I
Xn = Xn_predicted + K * d
Pn = Pn_predicted - K * Ht * Pn_predicted
#
Ha = HO["Adjoint"].asMatrix(None)
Ha = Ha.reshape(-1,Y.size) # ADAO & check shape
#
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- else:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
+ RI = R.getI()
#
# Calcul de la matrice de gain et de l'analyse
# --------------------------------------------
#
# Précalcul des inversions de B et R
# ----------------------------------
- # if B is not None:
- # BI = B.I
- # elif self._parameters["B_scalar"] is not None:
- # BI = 1.0 / self._parameters["B_scalar"]
- # else:
- # raise ValueError("Background error covariance matrix has to be properly defined!")
- #
- if R is not None:
- RI = R.I
- if self._parameters["Minimizer"] == "LM":
- RdemiI = numpy.linalg.cholesky(R).I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- if self._parameters["Minimizer"] == "LM":
- import math
- RdemiI = 1.0 / math.sqrt( self._parameters["R_scalar"] )
- else:
- raise ValueError("Observation error covariance matrix has to be properly defined!")
+ RI = R.getI()
+ if self._parameters["Minimizer"] == "LM":
+ RdemiI = R.choleskyI()
#
# Définition de la fonction-coût
# ------------------------------
raise ValueError("Error in Minimizer name: %s"%self._parameters["Minimizer"])
#
IndexMin = numpy.argmin( self.StoredVariables["CostFunctionJ"][nbPreviousSteps:] ) + nbPreviousSteps
- MinJ = self.StoredVariables["CostFunctionJ"][IndexMin]
+ MinJ = self.StoredVariables["CostFunctionJ"][IndexMin]
#
# Correction pour pallier a un bug de TNC sur le retour du Minimum
# ----------------------------------------------------------------
#
# Précalcul des inversions de B et R
# ----------------------------------
- if B is not None:
- BI = B.I
- elif self._parameters["B_scalar"] is not None:
- BI = 1.0 / self._parameters["B_scalar"]
- else:
- BI = None
- #
- if R is not None:
- RI = R.I
- elif self._parameters["R_scalar"] is not None:
- RI = 1.0 / self._parameters["R_scalar"]
- else:
- RI = None
+ BI = B.getI()
+ RI = R.getI()
#
# Définition de la fonction-coût
# ------------------------------
import Logging ; Logging.Logging() # A importer en premier
import scipy.optimize
import Persistence
-from BasicObjects import Operator
+from BasicObjects import Operator, Covariance
from PlatformInfo import uniq
# ==============================================================================
self.__StoredDiagnostics = {}
self.__StoredInputs = {}
#
- self.__B_scalar = None
- self.__R_scalar = None
- self.__Q_scalar = None
- self.__Parameters["B_scalar"] = None
- self.__Parameters["R_scalar"] = None
- self.__Parameters["Q_scalar"] = None
- #
# Variables temporaires
self.__algorithm = {}
self.__algorithmFile = None
- toBeStored : booléen indiquant si la donnée d'entrée est sauvée pour
être rendue disponible au même titre que les variables de calcul
"""
- if asEyeByScalar is not None:
- self.__B_scalar = float(asEyeByScalar)
- self.__B = None
- elif asEyeByVector is not None:
- self.__B_scalar = None
- self.__B = numpy.matrix( numpy.diagflat( asEyeByVector ), numpy.float )
- elif asCovariance is not None:
- self.__B_scalar = None
- self.__B = numpy.matrix( asCovariance, numpy.float )
- else:
- raise ValueError("Background error covariance matrix has to be specified either as a matrix, a vector for its diagonal or a scalar multiplying an identity matrix")
- #
- self.__Parameters["B_scalar"] = self.__B_scalar
+ self.__B = Covariance(
+ name = "BackgroundError",
+ asCovariance = asCovariance,
+ asEyeByScalar = asEyeByScalar,
+ asEyeByVector = asEyeByVector,
+ )
if toBeStored:
self.__StoredInputs["BackgroundError"] = self.__B
return 0
- toBeStored : booléen indiquant si la donnée d'entrée est sauvée pour
être rendue disponible au même titre que les variables de calcul
"""
- if asEyeByScalar is not None:
- self.__R_scalar = float(asEyeByScalar)
- self.__R = None
- elif asEyeByVector is not None:
- self.__R_scalar = None
- self.__R = numpy.matrix( numpy.diagflat( asEyeByVector ), numpy.float )
- elif asCovariance is not None:
- self.__R_scalar = None
- self.__R = numpy.matrix( asCovariance, numpy.float )
- else:
- raise ValueError("Observation error covariance matrix has to be specified either as a matrix, a vector for its diagonal or a scalar multiplying an identity matrix")
- #
- self.__Parameters["R_scalar"] = self.__R_scalar
+ self.__R = Covariance(
+ name = "ObservationError",
+ asCovariance = asCovariance,
+ asEyeByScalar = asEyeByScalar,
+ asEyeByVector = asEyeByVector,
+ )
if toBeStored:
self.__StoredInputs["ObservationError"] = self.__R
return 0
- toBeStored : booléen indiquant si la donnée d'entrée est sauvée pour
être rendue disponible au même titre que les variables de calcul
"""
- if asEyeByScalar is not None:
- self.__Q_scalar = float(asEyeByScalar)
- self.__Q = None
- elif asEyeByVector is not None:
- self.__Q_scalar = None
- self.__Q = numpy.matrix( numpy.diagflat( asEyeByVector ), numpy.float )
- elif asCovariance is not None:
- self.__Q_scalar = None
- self.__Q = numpy.matrix( asCovariance, numpy.float )
- else:
- raise ValueError("Evolution error covariance matrix has to be specified either as a matrix, a vector for its diagonal or a scalar multiplying an identity matrix")
- #
- self.__Parameters["Q_scalar"] = self.__Q_scalar
+ self.__Q = Covariance(
+ name = "EvolutionError",
+ asCovariance = asCovariance,
+ asEyeByScalar = asEyeByScalar,
+ asEyeByVector = asEyeByVector,
+ )
if toBeStored:
self.__StoredInputs["EvolutionError"] = self.__Q
return 0
des matrices s'il y en a.
"""
if self.__Xb is None: __Xb_shape = (0,)
+ elif hasattr(self.__Xb,"size"): __Xb_shape = (self.__Xb.size,)
elif hasattr(self.__Xb,"shape"):
if type(self.__Xb.shape) is tuple: __Xb_shape = self.__Xb.shape
else: __Xb_shape = self.__Xb.shape()
else: raise TypeError("Xb has no attribute of shape: problem !")
#
if self.__Y is None: __Y_shape = (0,)
+ elif hasattr(self.__Y,"size"): __Y_shape = (self.__Y.size,)
elif hasattr(self.__Y,"shape"):
if type(self.__Y.shape) is tuple: __Y_shape = self.__Y.shape
else: __Y_shape = self.__Y.shape()
else: raise TypeError("Y has no attribute of shape: problem !")
#
if self.__U is None: __U_shape = (0,)
+ elif hasattr(self.__U,"size"): __U_shape = (self.__U.size,)
elif hasattr(self.__U,"shape"):
if type(self.__U.shape) is tuple: __U_shape = self.__U.shape
else: __U_shape = self.__U.shape()
else: raise TypeError("U has no attribute of shape: problem !")
#
- if self.__B is None and self.__B_scalar is None:
- __B_shape = (0,0)
- elif self.__B is None and self.__B_scalar is not None:
- __B_shape = (max(__Xb_shape),max(__Xb_shape))
+ if self.__B is None: __B_shape = (0,0)
elif hasattr(self.__B,"shape"):
if type(self.__B.shape) is tuple: __B_shape = self.__B.shape
else: __B_shape = self.__B.shape()
else: raise TypeError("B has no attribute of shape: problem !")
#
- if self.__R is None and self.__R_scalar is None:
- __R_shape = (0,0)
- elif self.__R is None and self.__R_scalar is not None:
- __R_shape = (max(__Y_shape),max(__Y_shape))
+ if self.__R is None: __R_shape = (0,0)
elif hasattr(self.__R,"shape"):
if type(self.__R.shape) is tuple: __R_shape = self.__R.shape
else: __R_shape = self.__R.shape()
if self.__Matrix is not None:
return self.__Matrix
elif ValueForMethodForm is not "UnknownVoidValue": # Ne pas utiliser "None"
- return self.__Method( (ValueForMethodForm, None) )
+ return numpy.matrix( self.__Method( (ValueForMethodForm, None) ) )
else:
raise ValueError("Matrix form of the operator defined as a function/method requires to give an operating point.")
"""
raise NotImplementedError("Diagnostic activation method has not been implemented!")
# ==============================================================================
class Covariance:
    """
    General-purpose covariance matrix interface.

    The covariance is held in one of three internal forms:
    - a full matrix (given as ``asCovariance``),
    - a diagonal variance vector (given as ``asEyeByVector``),
    - a single variance scalar multiplying the identity (``asEyeByScalar``).
    Every operation (inverse, transpose, Cholesky factor, trace, diagonal,
    algebra with vectors/matrices...) is performed in the cheapest of the
    three representations.
    """
    def __init__(self,
            name          = "GenericCovariance",
            asCovariance  = None,
            asEyeByScalar = None,
            asEyeByVector = None,
            ):
        """
        Define a covariance matrix:
        - asCovariance  : input data as a matrix compatible with the
          numpy.matrix constructor
        - asEyeByScalar : input data as a single variance scalar, implicitly
          multiplying an identity correlation matrix (no explicit matrix has
          to be given)
        - asEyeByVector : input data as a single variance vector, implicitly
          put on the diagonal of a correlation matrix (no explicit matrix has
          to be given)
        """
        self.__name = str(name)
        #
        self.__B         = None
        self.__is_scalar = False
        self.__is_vector = False
        self.__is_matrix = False
        if asEyeByScalar is not None:
            self.__is_scalar = True
            self.__B         = float(asEyeByScalar)
            # Conventional (0,0): the real size is unknown for a scalar form
            self.shape       = (0,0)
            self.size        = 0
        elif asEyeByVector is not None:
            self.__is_vector = True
            self.__B         = numpy.array( numpy.ravel( asEyeByVector ), float )
            self.shape       = (self.__B.size,self.__B.size)
            self.size        = self.__B.size**2
        elif asCovariance is not None:
            self.__is_matrix = True
            self.__B         = numpy.matrix( asCovariance, float )
            self.shape       = self.__B.shape
            self.size        = self.__B.size
        else:
            pass
            # raise ValueError("The %s covariance matrix has to be specified either as a matrix, a vector for its diagonal or a scalar multiplying an identity matrix."%self.__name)
        #
        self.__validate()

    def __validate(self):
        """Check squareness and positive-definiteness of the stored data."""
        if self.ismatrix() and min(self.shape) != max(self.shape):
            raise ValueError("The given matrix for %s is not a square one, its shape is %s. Please check your matrix input."%(self.__name,self.shape))
        if self.isscalar() and self.__B <= 0:
            # Fixed: the message formatted "self.__B_scalar", an attribute
            # that does not exist, so an AttributeError was raised instead
            # of the intended ValueError.
            raise ValueError("The %s covariance matrix is not positive-definite. Please check your scalar input %s."%(self.__name,self.__B))
        if self.isvector() and (self.__B <= 0).any():
            raise ValueError("The %s covariance matrix is not positive-definite. Please check your vector input."%(self.__name,))
        if self.ismatrix() and logging.getLogger().level < logging.WARNING: # Verification only performed in debug mode
            try:
                numpy.linalg.cholesky( self.__B )
            except numpy.linalg.LinAlgError: # narrowed from a bare "except:"
                raise ValueError("The %s covariance matrix is not symmetric positive-definite. Please check your matrix input."%(self.__name,))

    def isscalar(self):
        """True if the covariance is stored as a scalar times identity."""
        return self.__is_scalar

    def isvector(self):
        """True if the covariance is stored as a diagonal variance vector."""
        return self.__is_vector

    def ismatrix(self):
        """True if the covariance is stored as a full matrix."""
        return self.__is_matrix

    def getI(self):
        """Return the inverse covariance, in the same representation."""
        if self.ismatrix():
            return Covariance(self.__name+"I", asCovariance  = self.__B.I )
        elif self.isvector():
            return Covariance(self.__name+"I", asEyeByVector = 1. / self.__B )
        elif self.isscalar():
            return Covariance(self.__name+"I", asEyeByScalar = 1. / self.__B )
        else:
            return None

    def getT(self):
        """Return the transposed covariance, in the same representation."""
        if self.ismatrix():
            return Covariance(self.__name+"T", asCovariance  = self.__B.T )
        elif self.isvector():
            return Covariance(self.__name+"T", asEyeByVector = self.__B )
        elif self.isscalar():
            return Covariance(self.__name+"T", asEyeByScalar = self.__B )
        else:
            return None # explicit fallback, consistent with getI

    def cholesky(self):
        """Return the lower Cholesky factor, in the same representation."""
        if self.ismatrix():
            return Covariance(self.__name+"C", asCovariance  = numpy.linalg.cholesky(self.__B) )
        elif self.isvector():
            return Covariance(self.__name+"C", asEyeByVector = numpy.sqrt( self.__B ) )
        elif self.isscalar():
            return Covariance(self.__name+"C", asEyeByScalar = numpy.sqrt( self.__B ) )
        else:
            return None # explicit fallback, consistent with getI

    def choleskyI(self):
        """Return the inverse of the Cholesky factor, same representation."""
        if self.ismatrix():
            return Covariance(self.__name+"H", asCovariance  = numpy.linalg.cholesky(self.__B).I )
        elif self.isvector():
            return Covariance(self.__name+"H", asEyeByVector = 1.0 / numpy.sqrt( self.__B ) )
        elif self.isscalar():
            return Covariance(self.__name+"H", asEyeByScalar = 1.0 / numpy.sqrt( self.__B ) )
        else:
            return None # explicit fallback, consistent with getI

    def diag(self, msize=None):
        """
        Return the diagonal as a 1D array. For the scalar form, msize (the
        matrix dimension) is mandatory since the actual size is unknown.
        """
        if self.ismatrix():
            return numpy.diag(self.__B)
        elif self.isvector():
            return self.__B
        elif self.isscalar():
            if msize is None:
                raise ValueError("the size of the %s covariance matrix has to be given in case of definition as a scalar over the diagonal."%(self.__name,))
            else:
                return self.__B * numpy.ones(int(msize))

    def trace(self, msize=None):
        """
        Return the trace. For the scalar form, msize (the matrix dimension)
        is mandatory since the actual size is unknown.
        """
        if self.ismatrix():
            return numpy.trace(self.__B)
        elif self.isvector():
            return float(numpy.sum(self.__B))
        elif self.isscalar():
            if msize is None:
                raise ValueError("the size of the %s covariance matrix has to be given in case of definition as a scalar over the diagonal."%(self.__name,))
            else:
                return self.__B * int(msize)

    def __repr__(self):
        return repr(self.__B)

    def __str__(self):
        return str(self.__B)

    def __add__(self, other):
        """Return self + other as a numpy.matrix; "other" is never modified."""
        if self.ismatrix():
            return self.__B + numpy.asmatrix(other)
        elif self.isvector() or self.isscalar():
            # Work on a private copy: numpy.asarray may alias "other", and the
            # in-place diagonal update below would then corrupt the caller's
            # operand (previous behavior).
            _A = numpy.array(other, dtype=float)
            _A.reshape(_A.size)[::_A.shape[1]+1] += self.__B
            return numpy.asmatrix(_A)

    def __radd__(self, other):
        raise NotImplementedError("%s covariance matrix __radd__ method not available for %s type!"%(self.__name,type(other)))

    def __sub__(self, other):
        """Return self - other as a numpy.matrix; "other" is never modified."""
        if self.ismatrix():
            return self.__B - numpy.asmatrix(other)
        elif self.isvector() or self.isscalar():
            # Fixed: the off-diagonal entries must be negated too. The
            # previous code only replaced the diagonal with B - diag(other),
            # leaving other's off-diagonal terms with the wrong sign.
            _A = - numpy.array(other, dtype=float)
            _A.reshape(_A.size)[::_A.shape[1]+1] += self.__B
            return numpy.asmatrix(_A)

    def __rsub__(self, other):
        raise NotImplementedError("%s covariance matrix __rsub__ method not available for %s type!"%(self.__name,type(other)))

    def __neg__(self):
        return - self.__B

    def __mul__(self, other):
        """Right product self * other for matrix, ndarray, list or tuple operands."""
        if self.ismatrix() and isinstance(other,numpy.matrix):
            return self.__B * other
        elif self.ismatrix() and (isinstance(other,numpy.ndarray) \
                               or isinstance(other,list) \
                               or isinstance(other,tuple)):
            if numpy.ravel(other).size == self.shape[1]: # Vecteur
                return self.__B * numpy.asmatrix(numpy.ravel(other)).T
            elif numpy.asmatrix(other).shape[0] == self.shape[1]: # Matrice
                return self.__B * numpy.asmatrix(other)
            else:
                raise ValueError("operands could not be broadcast together with shapes %s %s in %s matrix"%(self.shape,numpy.asmatrix(other).shape,self.__name))
        elif self.isvector() and (isinstance(other,numpy.matrix) \
                               or isinstance(other,numpy.ndarray) \
                               or isinstance(other,list) \
                               or isinstance(other,tuple)):
            if numpy.ravel(other).size == self.shape[1]: # Vecteur
                return numpy.asmatrix(self.__B * numpy.ravel(other)).T
            elif numpy.asmatrix(other).shape[0] == self.shape[1]: # Matrice
                return numpy.asmatrix((self.__B * (numpy.asarray(other).transpose())).transpose())
            else:
                raise ValueError("operands could not be broadcast together with shapes %s %s in %s matrix"%(self.shape,numpy.ravel(other).shape,self.__name))
        elif self.isscalar() and isinstance(other,numpy.matrix):
            return self.__B * other
        elif self.isscalar() and (isinstance(other,numpy.ndarray) \
                               or isinstance(other,list) \
                               or isinstance(other,tuple)):
            if len(numpy.asarray(other).shape) == 1 or numpy.asarray(other).shape[1] == 1 or numpy.asarray(other).shape[0] == 1:
                return self.__B * numpy.asmatrix(numpy.ravel(other)).T
            else:
                return self.__B * numpy.asmatrix(other)
        else:
            raise NotImplementedError("%s covariance matrix __mul__ method not available for %s type!"%(self.__name,type(other)))

    def __rmul__(self, other):
        """Left product other * self for numpy.matrix operands."""
        if self.ismatrix() and isinstance(other,numpy.matrix):
            return other * self.__B
        elif self.isvector() and isinstance(other,numpy.matrix):
            if numpy.ravel(other).size == self.shape[0]: # Vecteur
                return numpy.asmatrix(numpy.ravel(other) * self.__B)
            elif numpy.asmatrix(other).shape[1] == self.shape[0]: # Matrice
                return numpy.asmatrix(numpy.array(other) * self.__B)
            else:
                raise ValueError("operands could not be broadcast together with shapes %s %s in %s matrix"%(self.shape,numpy.ravel(other).shape,self.__name))
        elif self.isscalar() and isinstance(other,numpy.matrix):
            return other * self.__B
        else:
            raise NotImplementedError("%s covariance matrix __rmul__ method not available for %s type!"%(self.__name,type(other)))

    def __len__(self):
        """Number of rows (0 for the scalar form, whose size is unknown)."""
        return self.shape[0]
+
# ==============================================================================
if __name__ == "__main__":
print '\n AUTODIAGNOSTIC \n'