]> SALOME platform Git repositories - modules/adao.git/commitdiff
Salome HOME
Adding user helper function ImportFromFile
authorJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Sat, 30 Jun 2018 04:02:09 +0000 (06:02 +0200)
committerJean-Philippe ARGAUD <jean-philippe.argaud@edf.fr>
Sat, 30 Jun 2018 04:02:09 +0000 (06:02 +0200)
src/daComposant/daCore/Interfaces.py

index 589e718f77f07321da620138e6a3e8b47d4b8875..19a42d515e010ee38628ac7e59bdbc6b21e13667 100644 (file)
@@ -533,6 +533,274 @@ class ImportFromScript(object):
         "Renvoie le script complet"
         return self.__filestring
 
+# ==============================================================================
+class ImportFromFile(object):
+    """
+    Obtention de variables disrétisées en 1D, définies par une ou des variables
+    nommées, et sous la forme d'une série de points éventuellement indexés. La
+    lecture d'un fichier au format spécifié (ou intuité) permet de charger ces
+    fonctions depuis :
+        - des fichiers textes en colonnes de type TXT, CSV, TSV...
+        - des fichiers de données binaires NPY, NPZ...
+    La lecture du fichier complet ne se fait que si nécessaire, pour assurer la
+    performance tout en disposant de l'interprétation du contenu. Les fichiers
+    textes doivent présenter en première ligne (hors commentaire ou ligne vide)
+    les noms des variables de colonnes. Les commentaires commencent par un "#".
+    """
+    __slots__ = (
+        "_filename", "_varsline", "_format", "_delimiter", "_skiprows",
+        "__colnames", "__colindex", "__filestring", "__header")
+    def __enter__(self): return self
+    def __exit__(self, exc_type, exc_val, exc_tb): return False
+    #
+    def __init__(self, Filename=None, ColNames=None, ColIndex=None, Format="Guess"):
+        """
+        Verifie l'existence et les informations de définition du fichier. Les
+        noms de colonnes ou de variables sont ignorées si le format ne permet
+        pas de les indiquer.
+        Arguments :
+            - Filename : nom du fichier
+            - ColNames : noms de la ou des colonnes/variables à lire
+            - ColIndex : nom unique de la colonne/variable servant d'index
+            - Format : format du fichier et/ou des données inclues
+        """
+        if Filename is None:
+            raise ValueError("The name of the file, containing the variables to be read, has to be specified.")
+        if not os.path.isfile(Filename):
+            raise ValueError("The file, containing the variables to be read, doesn't seem to exist. Please check the file. The given file name is:\n  \"%s\""%str(Filename))
+        self._filename = os.path.abspath(Filename)
+        #
+        self.__header, self._varsline, self._skiprows = self.__getentete(self._filename)
+        #
+        self._delimiter = None
+        self.__filestring = "".join(self.__header)
+        if Format.upper() == "GUESS":
+            if self._filename.split(".")[-1].lower() == "npy":
+                self._format = "NPY"
+            elif self._filename.split(".")[-1].lower() == "npz":
+                self._format = "NPZ"
+            elif self.__filestring.count(",") > 1  and self._filename.split(".")[-1].lower() == "csv":
+                self._format = "CSV"
+                self._delimiter = ","
+            elif self.__filestring.count(";") > 1  and self._filename.split(".")[-1].lower() == "csv":
+                self._format = "CSV"
+                self._delimiter = ";"
+            elif self.__filestring.count("\t") > 1 and self._filename.split(".")[-1].lower() == "tsv":
+                self._format = "TSV"
+                self._delimiter = "\t"
+            elif self.__filestring.count(" ") > 1  and self._filename.split(".")[-1].lower() == "txt":
+                self._format = "TXT"
+            else:
+                raise ValueError("Can not guess the file format, please specify the good one")
+        elif Format.upper() == "CSV" and self._delimiter is None:
+            if self.__filestring.count(",") > 1 and self._filename.split(".")[-1].lower() == "csv":
+                self._format    = "CSV"
+                self._delimiter = ","
+            elif self.__filestring.count(";") > 1 and self._filename.split(".")[-1].lower() == "csv":
+                self._format    = "CSV"
+                self._delimiter = ";"
+        elif Format.upper() == "TSV" and self._delimiter is None:
+            self._format    = "TSV"
+            self._delimiter = "\t"
+        else:
+            self._format     = str(Format).upper()
+        #
+        if ColNames is not None: self.__colnames = tuple(ColNames)
+        else:                    self.__colnames = None
+        #
+        if ColIndex is not None: self.__colindex = str(ColIndex)
+        else:                    self.__colindex = None
+
+    def __getentete(self, __filename, __nblines = 3):
+        "Lit l'entête du fichier pour trouver la définition des variables"
+        __header, __varsline, __skiprows = [], "", 1
+        if __filename.split(".")[-1].lower() in ("npy", "npz"):
+            pass
+        else:
+            with open(__filename,'r') as fid:
+                __line = fid.readline().strip()
+                while "#" in __line or len(__line) < 1:
+                    __header.append(__line)
+                    __skiprows += 1
+                    __line = fid.readline().strip()
+                __varsline = __line # Première ligne non commentée non vide
+                for i in range(max(0,__nblines)):
+                    __header.append(fid.readline())
+        return (__header, __varsline, __skiprows)
+
+    def __getindices(self, __colnames, __colindex, __delimiter=None ):
+        "Indices de colonnes correspondants à l'index et aux variables"
+        if __delimiter is None:
+            __varserie = self._varsline.strip('#').strip().split()
+        else:
+            __varserie = self._varsline.strip('#').strip().split(str(__delimiter))
+        #
+        if __colnames is not None:
+            __usecols = []
+            __colnames = tuple(__colnames)
+            for v in __colnames:
+                for i, n in enumerate(__varserie):
+                    if v == n: __usecols.append(i)
+            __usecols = tuple(__usecols)
+            if len(__usecols) == 0: __usecols = None
+        else:
+            __usecols = None
+        #
+        if __colindex is not None:
+            __useindex = None
+            __colindex = str(__colindex)
+            for i, n in enumerate(__varserie):
+                if __colindex == n: __useindex = i
+        else:
+            __useindex = None
+        #
+        return (__usecols, __useindex)
+
+    def getvalue(self, ColNames=None, ColIndex=None ):
+        "Renvoie la ou les variables demandees par la liste de leurs noms"
+        # Uniquement si mise à jour
+        if ColNames is not None: self.__colnames = tuple(ColNames)
+        if ColIndex is not None: self.__colindex = str(ColIndex)
+        #
+        __index = None
+        if self._format == "NPY":
+            __columns = numpy.load(self._filename)
+        elif self._format == "NPZ":
+            __columns = None
+            with numpy.load(self._filename) as __allcolumns:
+                if self.__colnames is None:
+                    self.__colnames = __allcolumns.files
+                for nom in self.__colnames:
+                    if nom in __allcolumns.files:
+                        if __columns is not None:
+                            # Attention : toutes les variables doivent avoir la même taille
+                            __columns = numpy.vstack((__columns, numpy.reshape(__allcolumns[nom], (1,-1))))
+                        else:
+                            # Première colonne
+                            __columns = numpy.reshape(__allcolumns[nom], (1,-1))
+                if self.__colindex is not None and self.__colindex in __allcolumns.files:
+                    __index = numpy.reshape(__allcolumns[self.__colindex], (1,-1))
+        elif self._format == "TXT":
+            __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex)
+            __columns = numpy.loadtxt(self._filename, usecols = __usecols, skiprows=self._skiprows)
+            if __useindex is not None:
+                __index = numpy.loadtxt(self._filename, usecols = __useindex, skiprows=self._skiprows)
+        #
+        elif self._format == "CSV":
+            __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex, self._delimiter)
+            __columns = numpy.loadtxt(self._filename, usecols = __usecols, delimiter = self._delimiter, skiprows=self._skiprows)
+            if __useindex is not None:
+                __index = numpy.loadtxt(self._filename, usecols = __useindex, delimiter = self._delimiter, skiprows=self._skiprows)
+        #
+        elif self._format == "TSV":
+            __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex, self._delimiter)
+            __columns = numpy.loadtxt(self._filename, usecols = __usecols, delimiter = self._delimiter, skiprows=self._skiprows)
+            if __useindex is not None:
+                __index = numpy.loadtxt(self._filename, usecols = __useindex, delimiter = self._delimiter, skiprows=self._skiprows)
+        else:
+            raise ValueError("Unkown file format %s"%self._format)
+        #
+        return (self.__colnames, __columns, self.__colindex, __index)
+
+    def getstring(self):
+        "Renvoie le fichier complet"
+        with open(self._filename,'r') as fid:
+            return fid.read()
+
+# ==============================================================================
+class ImportScalarLinesFromFile(ImportFromFile):
+    """
+    Importation de fichier contenant des variables scalaires nommées. Le
+    fichier comporte soit 2, soit 4 colonnes, obligatoirement nommées "Name",
+    "Value", "Minimum", "Maximum" si les noms sont précisés. Sur chaque ligne
+    est indiqué le nom, la valeur, et éventuelement deux bornes min et max (ou
+    None si nécessaire pour une borne).
+
+    Seule la méthode "getvalue" est changée.
+    """
+    def __enter__(self): return self
+    def __exit__(self, exc_type, exc_val, exc_tb): return False
+    #
+    def __init__(self, Filename=None, ColNames=None, ColIndex=None, Format="Guess"):
+        ImportFromFile.__init__(self, Filename, ColNames, ColIndex, Format)
+        if self._format not in ["TXT", "CSV", "TSV"]:
+            raise ValueError("Unkown file format \"%s\""%self._format)
+    #
+    def getvalue(self, VarNames = None, HeaderNames=()):
+        "Renvoie la ou les variables demandees par la liste de leurs noms"
+        if VarNames is not None: __varnames = tuple(VarNames)
+        else:                    __varnames = None
+        #
+        if "Name" in self._varsline and "Value" in self._varsline and "Minimum" in self._varsline and "Maximum" in self._varsline:
+            __ftype = "NamValMinMax"
+            __dtypes   = {'names'  : ('Name', 'Value', 'Minimum', 'Maximum'),
+                          'formats': ('S128', 'g', 'g', 'g')}
+            __usecols  = (0, 1, 2, 3)
+            def __replaceNoneN( s ):
+                if s.strip() == b'None': return numpy.NINF
+                else:                    return s
+            def __replaceNoneP( s ):
+                if s.strip() == b'None': return numpy.PINF
+                else:                    return s
+            __converters = {2: __replaceNoneN, 3: __replaceNoneP}
+        elif "Name" in self._varsline and "Value" in self._varsline and ("Minimum" not in self._varsline or "Maximum" not in self._varsline):
+            __ftype = "NamVal"
+            __dtypes   = {'names'  : ('Name', 'Value'),
+                          'formats': ('S128', 'g')}
+            __converters = None
+            __usecols  = (0, 1)
+        elif len(HeaderNames)>0 and numpy.all([kw in self._varsline for kw in HeaderNames]):
+            __ftype = "NamLotOfVals"
+            __dtypes   = {'names'  : HeaderNames,
+                          'formats': tuple(['S128',]+['g']*(len(HeaderNames)-1))}
+            __usecols  = tuple(range(len(HeaderNames)))
+            def __replaceNone( s ):
+                if s.strip() == b'None': return numpy.NAN
+                else:                    return s
+            __converters = dict()
+            for i in range(1,len(HeaderNames)):
+                __converters[i] = __replaceNone
+        else:
+            raise ValueError("Can not find names of columns for initial values. Wrong first line is:\n            \"%s\""%__firstline)
+        #
+        if self._format == "TXT":
+            __content = numpy.loadtxt(self._filename, dtype = __dtypes, usecols = __usecols, skiprows = self._skiprows, converters = __converters)
+        elif self._format in ["CSV", "TSV"]:
+            __content = numpy.loadtxt(self._filename, dtype = __dtypes, usecols = __usecols, skiprows = self._skiprows, converters = __converters, delimiter = self._delimiter)
+        else:
+            raise ValueError("Unkown file format \"%s\""%self._format)
+        #
+        __names, __background, __bounds = [], [], []
+        for sub in __content:
+            if len(__usecols) == 4:
+                na, va, mi, ma = sub
+                if numpy.isneginf(mi): mi = None # Réattribue les variables None
+                elif numpy.isnan(mi):  mi = None # Réattribue les variables None
+                if numpy.isposinf(ma): ma = None # Réattribue les variables None
+                elif numpy.isnan(ma):  ma = None # Réattribue les variables None
+            elif len(__usecols) == 2 and __ftype == "NamVal":
+                na, va = sub
+                mi, ma = None, None
+            else:
+                nsub = list(sub)
+                na = sub[0]
+                for i, v in enumerate(nsub[1:]):
+                    if numpy.isnan(v): nsub[i+1] = None
+                va = nsub[1:]
+                mi, ma = None, None
+            na = na.decode()
+            if (__varnames is None or na in __varnames) and (na not in __names):
+                # Ne stocke que la premiere occurence d'une variable
+                __names.append(na)
+                __background.append(va)
+                __bounds.append((mi,ma))
+        #
+        __names      = tuple(__names)
+        __background = numpy.array(__background)
+        __bounds     = tuple(__bounds)
+        #
+        return (__names, __background, __bounds)
+
 # ==============================================================================
 if __name__ == "__main__":
     print('\n AUTODIAGNOSTIC \n')