From ec220941bf562a1331ece120de335f94db9b3d11 Mon Sep 17 00:00:00 2001 From: Jean-Philippe ARGAUD Date: Sat, 14 Jul 2018 22:08:41 +0200 Subject: [PATCH] Improvements in file detection for user helper function --- src/daComposant/daCore/Interfaces.py | 191 ++++++++++++++++++--------- 1 file changed, 128 insertions(+), 63 deletions(-) diff --git a/src/daComposant/daCore/Interfaces.py b/src/daComposant/daCore/Interfaces.py index 969686a..76aea88 100644 --- a/src/daComposant/daCore/Interfaces.py +++ b/src/daComposant/daCore/Interfaces.py @@ -31,6 +31,7 @@ import sys import logging import copy import numpy +import mimetypes from daCore import Persistence from daCore import PlatformInfo from daCore import Templates @@ -533,6 +534,91 @@ class ImportFromScript(object): "Renvoie le script complet" return self.__filestring +# ============================================================================== +class ImportDetector(object): + """ + Détection des caractéristiques de fichiers ou objets en entrée + """ + __slots__ = ( + "__url", "__usr", "__root", "__end") + def __enter__(self): return self + def __exit__(self, exc_type, exc_val, exc_tb): return False + # + def __init__(self, __url, UserMime=""): + if __url is None: + raise ValueError("The name or url of the file object has to be specified.") + if __url is bytes: + self.__url = __url.decode() + else: + self.__url = str(__url) + if UserMime is bytes: + self.__usr = UserMime.decode().lower() + else: + self.__usr = str(UserMime).lower() + (self.__root, self.__end) = os.path.splitext(self.__url) + # + mimetypes.add_type('application/numpy.npy', '.npy') + mimetypes.add_type('application/numpy.npz', '.npz') + # + # File related f + # ------------------ + def is_local_file(self): + if os.path.isfile(os.path.realpath(self.__url)): + return True + else: + return False + def is_not_local_file(self): + if not os.path.isfile(os.path.realpath(self.__url)): + return True + else: + return False + def raise_error_if_not_local_file(self): + if not os.path.isfile(os.path.realpath(self.__url)): + raise ValueError("The name or the url of the file object doesn't seem to exist. The given name is:\n \"%s\""%str(self.__url)) + else: + return False + # Directory related tests + # ----------------------- + def is_local_dir(self): + if os.path.isdir(self.__url): + return True + else: + return False + def is_not_local_dir(self): + if not os.path.isdir(self.__url): + return True + else: + return False + def raise_error_if_not_local_dir(self): + if not os.path.isdir(self.__url): + raise ValueError("The name or the url of the directory object doesn't seem to exist. The given name is:\n \"%s\""%str(self.__url)) + else: + return False + # Mime related functions + # ------------------------ + def get_standard_mime(self): + (__mtype, __encoding) = mimetypes.guess_type(self.__url, strict=False) + return __mtype + def get_user_mime(self): + __fake = "fake."+self.__usr.lower() + (__mtype, __encoding) = mimetypes.guess_type(__fake, strict=False) + return __mtype + def get_comprehensive_mime(self): + if self.get_standard_mime() is not None: + return self.get_standard_mime() + elif self.get_user_mime() is not None: + return self.get_user_mime() + else: + return None + # Name related functions + # ---------------------- + def get_user_name(self): + return self.__url + def get_absolute_name(self): + return os.path.abspath(os.path.realpath(self.__url)) + def get_extension(self): + return self.__end + # ============================================================================== class ImportFromFile(object): """ @@ -548,8 +634,9 @@ class ImportFromFile(object): les noms des variables de colonnes. Les commentaires commencent par un "#". """ __slots__ = ( - "_filename", "_varsline", "_format", "_delimiter", "_skiprows", - "__colnames", "__colindex", "__filestring", "__header", "__allowvoid") + "_filename", "_colnames", "_colindex", "_varsline", "_format", + "_delimiter", "_skiprows", "__url", "__filestring", "__header", + "__allowvoid") def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): return False # @@ -566,62 +653,40 @@ class ImportFromFile(object): - AllowVoidNameList : permet, si la liste de noms est vide, de prendre par défaut toutes les colonnes """ - if Filename is None: - raise ValueError("The name of the file, containing the variables to be read, has to be specified.") - if not os.path.isfile(Filename): - raise ValueError("The file, containing the variables to be read, doesn't seem to exist. The given file name is:\n \"%s\""%str(Filename)) - self._filename = os.path.abspath(Filename) + self.__url = ImportDetector( Filename, Format) + self.__url.raise_error_if_not_local_file() + self._filename = self.__url.get_absolute_name() # - self.__header, self._varsline, self._skiprows = self.__getentete(self._filename) + self._format = self.__url.get_comprehensive_mime() # - self._delimiter = None - self.__filestring = "".join(self.__header) - if Format.upper() == "GUESS": - if self._filename.split(".")[-1].lower() == "npy": - self._format = "NPY" - elif self._filename.split(".")[-1].lower() == "npz": - self._format = "NPZ" - elif self.__filestring.count(",") > 1 and self._filename.split(".")[-1].lower() == "csv": - self._format = "CSV" - self._delimiter = "," - elif self.__filestring.count(";") > 1 and self._filename.split(".")[-1].lower() == "csv": - self._format = "CSV" - self._delimiter = ";" - elif self.__filestring.count("\t") > 1 and self._filename.split(".")[-1].lower() == "tsv": - self._format = "TSV" - self._delimiter = "\t" - elif self.__filestring.count(" ") > 1 and self._filename.split(".")[-1].lower() == "txt": - self._format = "TXT" - else: - raise ValueError("Can not guess the file format, please specify the good one") - elif Format.upper() == "CSV" and self._delimiter is None: - if self.__filestring.count(",") > 1 and self._filename.split(".")[-1].lower() == "csv": - self._format = "CSV" + self.__header, self._varsline, self._skiprows = self.__getentete() + # + if self._format == "text/csv" or Format.upper() == "CSV": + self.__filestring = "".join(self.__header) + if self.__filestring.count(",") > 1: self._delimiter = "," - elif self.__filestring.count(";") > 1 and self._filename.split(".")[-1].lower() == "csv": - self._format = "CSV" + elif self.__filestring.count(";") > 1: self._delimiter = ";" - elif Format.upper() == "TSV" and self._delimiter is None: - self._format = "TSV" + elif self._format == "text/tab-separated-values" or Format.upper() == "TSV": self._delimiter = "\t" else: - self._format = str(Format).upper() + self._delimiter = None # - if ColNames is not None: self.__colnames = tuple(ColNames) - else: self.__colnames = None + if ColNames is not None: self._colnames = tuple(ColNames) + else: self._colnames = None # - if ColIndex is not None: self.__colindex = str(ColIndex) - else: self.__colindex = None + if ColIndex is not None: self._colindex = str(ColIndex) + else: self._colindex = None # self.__allowvoid = bool(AllowVoidNameList) - def __getentete(self, __filename, __nblines = 3): + def __getentete(self, __nblines = 3): "Lit l'entête du fichier pour trouver la définition des variables" __header, __varsline, __skiprows = [], "", 1 - if __filename.split(".")[-1].lower() in ("npy", "npz"): + if self._format in ("application/numpy.npy", "application/numpy.npz"): pass else: - with open(__filename,'r') as fid: + with open(self._filename,'r') as fid: __line = fid.readline().strip() while "#" in __line or len(__line) < 1: __header.append(__line) @@ -667,18 +732,18 @@ class ImportFromFile(object): def getvalue(self, ColNames=None, ColIndex=None ): "Renvoie la ou les variables demandees par la liste de leurs noms" # Uniquement si mise à jour - if ColNames is not None: self.__colnames = tuple(ColNames) - if ColIndex is not None: self.__colindex = str(ColIndex) + if ColNames is not None: self._colnames = tuple(ColNames) + if ColIndex is not None: self._colindex = str(ColIndex) # __index = None - if self._format == "NPY": + if self._format == "application/numpy.npy": __columns = numpy.load(self._filename) - elif self._format == "NPZ": + elif self._format == "application/numpy.npz": __columns = None with numpy.load(self._filename) as __allcolumns: - if self.__colnames is None: - self.__colnames = __allcolumns.files - for nom in self.__colnames: + if self._colnames is None: + self._colnames = __allcolumns.files + for nom in self._colnames: if nom in __allcolumns.files: if __columns is not None: # Attention : toutes les variables doivent avoir la même taille @@ -686,27 +751,27 @@ class ImportFromFile(object): else: # Première colonne __columns = numpy.reshape(__allcolumns[nom], (1,-1)) - if self.__colindex is not None and self.__colindex in __allcolumns.files: - __index = numpy.array(numpy.reshape(__allcolumns[self.__colindex], (1,-1)), dtype=bytes) - elif self._format == "TXT": - __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex) + if self._colindex is not None and self._colindex in __allcolumns.files: + __index = numpy.array(numpy.reshape(__allcolumns[self._colindex], (1,-1)), dtype=bytes) + elif self._format == "text/plain": + __usecols, __useindex = self.__getindices(self._colnames, self._colindex) __columns = numpy.loadtxt(self._filename, usecols = __usecols, skiprows=self._skiprows) if __useindex is not None: __index = numpy.loadtxt(self._filename, dtype = bytes, usecols = (__useindex,), skiprows=self._skiprows) # - elif self._format == "CSV": - __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex, self._delimiter) + elif self._format == "text/csv": + __usecols, __useindex = self.__getindices(self._colnames, self._colindex, self._delimiter) __columns = numpy.loadtxt(self._filename, usecols = __usecols, delimiter = self._delimiter, skiprows=self._skiprows) if __useindex is not None: __index = numpy.loadtxt(self._filename, dtype = bytes, usecols = (__useindex,), delimiter = self._delimiter, skiprows=self._skiprows) # - elif self._format == "TSV": - __usecols, __useindex = self.__getindices(self.__colnames, self.__colindex, self._delimiter) + elif self._format == "text/tab-separated-values": + __usecols, __useindex = self.__getindices(self._colnames, self._colindex, self._delimiter) __columns = numpy.loadtxt(self._filename, usecols = __usecols, delimiter = self._delimiter, skiprows=self._skiprows) if __useindex is not None: __index = numpy.loadtxt(self._filename, dtype = bytes, usecols = (__useindex,), delimiter = self._delimiter, skiprows=self._skiprows) else: - raise ValueError("Unkown file format %s"%self._format) + raise ValueError("Unkown file format \"%s\""%self._format) # def toString(value): try: @@ -716,7 +781,7 @@ class ImportFromFile(object): if __index is not None: __index = tuple([toString(v) for v in __index]) # - return (self.__colnames, __columns, self.__colindex, __index) + return (self._colnames, __columns, self._colindex, __index) def getstring(self): "Renvoie le fichier complet" @@ -739,7 +804,7 @@ class ImportScalarLinesFromFile(ImportFromFile): # def __init__(self, Filename=None, ColNames=None, ColIndex=None, Format="Guess"): ImportFromFile.__init__(self, Filename, ColNames, ColIndex, Format) - if self._format not in ["TXT", "CSV", "TSV"]: + if self._format not in ["text/plain", "text/csv", "text/tab-separated-values"]: raise ValueError("Unkown file format \"%s\""%self._format) # def getvalue(self, VarNames = None, HeaderNames=()): @@ -779,9 +844,9 @@ class ImportScalarLinesFromFile(ImportFromFile): else: raise ValueError("Can not find names of columns for initial values. Wrong first line is:\n \"%s\""%__firstline) # - if self._format == "TXT": + if self._format == "text/plain": __content = numpy.loadtxt(self._filename, dtype = __dtypes, usecols = __usecols, skiprows = self._skiprows, converters = __converters) - elif self._format in ["CSV", "TSV"]: + elif self._format in ["text/csv", "text/tab-separated-values"]: __content = numpy.loadtxt(self._filename, dtype = __dtypes, usecols = __usecols, skiprows = self._skiprows, converters = __converters, delimiter = self._delimiter) else: raise ValueError("Unkown file format \"%s\""%self._format) -- 2.39.2