if options.info in cfg_products:
CFGMGR.show_product_info(config, options.info, logger)
return RCO.ReturnCode("OK", "product '%s' found in products" % options.info)
- msg = _("product '%s' is not defined in application '%s'.") % \
- (options.info, config.VARS.application)
+ msg = _("product %s is not defined in application %s.") % \
+ (options.info, UTS.info(config.VARS.application))
logger.error(msg)
return RCO.ReturnCode("KO", msg)
# case : display all the available pyconf applications
elif options.list:
- lproduct = list()
+ lproduct = []
# search in all directories that can have pyconf applications
for path in config.PATHS.APPLICATIONPATH:
# print a header
if not options.no_label:
- logger.info("<header>------ %s<reset>" % path)
+ logger.info(UTS.header("------ %s" % path))
msg = "" # only one multiline info
if not os.path.exists(path):
- msg += ("<red>" + _("Directory not found") + "<reset>\n" )
+ msg += (UTS.red( _("Directory not found")) + "\n" )
else:
for f in sorted(os.listdir(path)):
# ignore file that does not ends with .pyconf
msg += "%s\n" % appliname
logger.info(msg)
-
+ DBG.write("lproduct", lproduct)
+ if len(lproduct) == 0:
+ aFile = os.path.join(config.VARS.datadir, 'local.pyconf')
+ msg = """\
+no existing product
+may be you have to set some PROJECTS.project_file_paths in file
+%s""" % aFile
+ logger.warning(msg)
+ return RCO.ReturnCode("OK", msg)
+ else:
+ return RCO.ReturnCode("OK", "config -l command done", lproduct)
+
# case : give a synthetic view of all patches used in the application
elif options.show_patchs:
UTS.check_config_has_application(config).raiseIfKo()
# Print some informations
logger.info(_('Show the patchs of application %s\n') % \
UTS.label(config.VARS.application))
- show_patchs(config, logger)
+ CFGMGR.show_patchs(config, logger)
# case: print all the products name of the application (internal use for completion)
elif options.completion:
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+import os
import src.debug as DBG
import src.returnCode as RCO
if out_dir:
out_dir = os.path.abspath(out_dir)
- write_all_source_files(config, logger, out_dir=out_dir, shells=shell,
+ files = write_all_source_files(config, logger,
+ out_dir=out_dir, shells=shell,
prefix=options.prefix, env_info=environ_info)
- logger.info("\n")
- #TODO return code
+
+ return RCO.ReturnCode("OK", "environ command done", files)
def write_all_source_files(config,
logger,
:param env_info: (str) The list of products to add in the files.
:return: (list) The list of the generated files.
"""
-
+ appName = config.APPLICATION.name
+
if not out_dir:
out_dir = config.APPLICATION.workdir
if not os.path.exists(out_dir):
- raise Exception(_("Target directory not found: %s") % out_dir)
+ msg = _("Target directory not found: %s") % UTS.error(out_dir)
+ raise Exception(msg)
if not silent:
- logger.info(_("Creating environment files for %s\n") % \
- UTS.header(config.APPLICATION.name))
- logger.info(" %s = %s\n\n" % (_("Target"), out_dir))
+ msg = _("Creating environment files for %s\n") % UTS.header(appName)
+ msg += "\n %s = %s" % (_("Target"), UTS.header(out_dir))
+ logger.info(msg)
shells_list = []
all_shells = C_ALL_SHELL
import sys
import tempfile
import traceback
-import datetime
-import time
import csv
import shutil
import itertools
import src.pyconf as PYCONF
import src.xmlManager as XMLMGR
from src.salomeTools import _BaseCommand
+import src.dateTime as DATT
STYLESHEET_GLOBAL = "jobs_global_report.xsl"
STYLESHEET_BOARD = "jobs_board_report.xsl"
self.res_job = "-1"
self.cancelled = False
- self._T0 = -1
- self._Tf = -1
+ self._T0 = DATT.DateTime()
+ self._Tf = DATT.DateTime()
self._has_begun = False
self._has_finished = False
self._has_timouted = False
cmd_kill = " ; ".join([("kill -2 " + pid) for pid in pids])
(_, out_kill, err_kill) = self.machine.exec_command(cmd_kill,
self.logger)
- time.sleep(wait)
+ DATT.sleep(wait)
return (out_kill.read().decode(), err_kill.read().decode())
def has_begun(self):
self.out += self._stdout.read().decode()
self.err += self._stderr.read().decode()
# Put end time
- self._Tf = time.time()
+ self._Tf = DATT.DateTime("now")
# And get the remote command status and log files
try:
self.get_log_files()
"""
if not self.has_begun():
return -1
- T_now = time.time()
- return T_now - self._T0
+ delta = self._T0.getSecondsToNow()
+ return delta
def check_time(self):
"""
if self.time_elapsed() > self.timeout:
self._has_finished = True
self._has_timouted = True
- self._Tf = time.time()
+ self._Tf = DATT.DateTime("now")
(out_kill, __) = self.kill_remote_process()
self.out += "TIMEOUT \n" + out_kill
self.err += "TIMEOUT : %s seconds elapsed\n" % str(self.timeout)
self.get_log_files()
except Exception as e:
self.err += _("Unable to get remote log files!\n%s\n" % str(e))
-
- def total_duration(self):
- """
- Gives the total duration of the job
-
- :return: (int) the total duration of the job in seconds
- """
- return self._Tf - self._T0
-
+
def run(self):
"""
Launch the job by executing the remote command.
self.machine.user))
else:
# Usual case : Launch the command on remote machine
- self._T0 = time.time()
+ self._T0 = DATT.DateTime("now")
self._stdin, self._stdout, self._stderr = self.machine.exec_command(
self.command,
self.logger)
# If the results are not initialized, finish the job
if (self._stdin, self._stdout, self._stderr) == (None, None, None):
self._has_finished = True
- self._Tf = time.time()
+ self._Tf = DATT.DateTime("now")
self.out += "N\A"
self.err += "The server failed to execute the command"
msg = "name : %s\n" % self.name
if self.after:
msg += "after : %s\n" % self.after
- msg += "Time elapsed : %4imin %2is \n" % (self.total_duration()//60 , self.total_duration()%60)
- if self._T0 != -1:
- msg += "Begin time : %s\n" % \
- time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._T0))
- if self._Tf != -1:
- msg += "End time : %s\n\n" % \
- time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
+ delta = DATT.DeltaTime(self._T0, self._Tf)
+ msg += "Time elapsed : %s\n" % delta.toStrHuman()
+ msg += "Begin time : %s\n" % self._T0.toStrHuman()
+ msg += "End time : %s\n\n" % self._Tf.toStrHuman()
self.logger.info(msg)
if self.cancelled:
return "Cancelled"
if self.is_running():
- return "running since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._T0))
+ return "running since " + self._T0.toStrHuman()
if self.has_finished():
if self.is_timeout():
- return "Timeout since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
- return "Finished since " + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._Tf))
+ return "Timeout since " + self._Tf.toStrHuman()
+ return "Finished since " + self._Tf.toStrHuman()
class Jobs(object):
"""
:return: None
"""
- today = datetime.date.weekday(datetime.date.today())
+ today = DATT.getWeekDayNow()
host_list = []
for job_def in self.cfg_jobs.jobs :
self.display_status(self.len_columns)
# Make sure that the proc is not entirely busy
- time.sleep(0.001)
+ DATT.sleep(0.001)
self.logger.info("\n" + tiret_line + "\n\n")
self.file_boards = file_boards
if file_boards != "":
- today = datetime.date.weekday(datetime.date.today())
+ today = DATT.getWeekDayNow()
self.parse_csv_boards(today)
else:
self.d_input_boards = {}
if xmljob.attrib['name'] == job.name:
xml_node_jobs.remove(xmljob)
- T0 = str(job._T0)
- if T0 != "-1":
- T0 = time.strftime('%Y-%m-%d %H:%M:%S',
- time.localtime(job._T0))
- Tf = str(job._Tf)
- if Tf != "-1":
- Tf = time.strftime('%Y-%m-%d %H:%M:%S',
- time.localtime(job._Tf))
+ T0 = job._T0.toStrHuman()
+ Tf = job._Tf.toStrHuman()
# recreate the job node
xmlj = ASNODE(xml_node_jobs, "job", attrib={"name" : job.name})
# Update the date
xml_node_infos = xml_file.xmlroot.find('infos')
XMLMGR.append_node_attrib( xml_node_infos,
- attrib={"value" : datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} )
+ attrib={"value" : DATT.DateTime("now").toStrHuman()} )
def find_test_log(self, l_remote_log_files):
import shutil
import re
import glob
-import datetime
import stat
import src.debug as DBG
import src.xmlManager as XMLMGR
import src.system as SYSS
from src.salomeTools import _BaseCommand
+import src.dateTime as DATT
# Compatibility python 2/3 for input function
# input stays input for python 3 and input = raw_input for python 2
l_time_file = []
for file_n in os.listdir(product_log_dir):
my_stat = os.stat(os.path.join(product_log_dir, file_n))
- l_time_file.append(
- (datetime.datetime.fromtimestamp(my_stat[stat.ST_MTIME]), file_n))
+ l_time_file.append( (DATT.fromTimeStamp(my_stat[stat.ST_MTIME]), file_n) )
# display the available logs
for i, (__, file_name) in enumerate(sorted(l_time_file)):
str_indice = UTS.label("%2d" % (i+1))
opt = []
my_stat = os.stat(os.path.join(product_log_dir, file_name))
- opt.append(str(datetime.datetime.fromtimestamp(my_stat[stat.ST_MTIME])))
+ opt.append(str(DATT.fromTimeStamp(my_stat[stat.ST_MTIME])))
opt.append("(%8.2f)" % (my_stat[stat.ST_SIZE] / 1024.0))
logger.info(" %-35s" % " ".join(opt))
import os
import stat
import shutil
-import datetime
import tarfile
import codecs
import string
import src.environment as ENVI
import src.architecture as ARCH
import src.template as TPLATE
+import src.dateTime as DATT
BINARY = "binary"
SOURCE = "Source"
# prepare substitution dictionary
d = dict()
d['user'] = config.VARS.user
- d['date'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
+ d['date'] = DAT.dateTime("now").toStrPackage()
d['version'] = config.INTERNAL.sat_version
d['dist'] = config.VARS.dist
f.write(readme_header_tpl.substitute(d)) # write the general header (common)
import sys
import shutil
import subprocess
-import datetime
import gzip
import src.debug as DBG
import src.xmlManager as XMLMGR
import src.architecture as ARCH
import src.test_module as TMOD
+import src.dateTime as DATT
try:
from hashlib import sha1
amend_test.attrib['name'] = os.path.join(test.grid,
test.session,
script.name)
- amend_test.attrib['reason'] = script.amend.decode(
- "UTF-8")
+ amend_test.attrib['reason'] = script.amend.decode("UTF-8")
# calculate status
nb += 1
test.session,
script.name)
kf_script.attrib['date'] = script.known_error.date
- kf_script.attrib[
- 'expected'] = script.known_error.expected
- kf_script.attrib[
- 'comment'] = script.known_error.comment.decode("UTF-8")
- kf_script.attrib['fixed'] = str(
- script.known_error.fixed)
- overdue = datetime.datetime.today().strftime("%Y-%m-"
- "%d") > script.known_error.expected
+ kf_script.attrib['expected'] = script.known_error.expected
+ kf_script.attrib['comment'] = script.known_error.comment.decode("UTF-8")
+ kf_script.attrib['fixed'] = str(script.known_error.fixed)
+ overdue = DATT.DateTime("now").toStrPackage() > script.known_error.expected
if overdue:
kf_script.attrib['overdue'] = str(overdue)
elif script.res == RCO._KO_STATUS:
new_err = ASNODE(new_errors, "new_error")
- script_path = os.path.join(test.grid,
- test.session, script.name)
+ script_path = os.path.join(test.grid, test.session, script.name)
new_err.attrib['name'] = script_path
- new_err.attrib['cmd'] = ("sat testerror %s -s %s -c 'my"
- " comment' -p %s" % \
- (application_name, script_path, config.VARS.dist))
+ new_err.attrib['cmd'] = "sat testerror %s -s %s -c 'my comment' -p %s" % \
+ (application_name, script_path, config.VARS.dist)
gn.attrib['total'] = str(nb)
{
project_file_paths :
[
- ##"/volatile/wambeke/SAT5/SAT5_S840_MATIX24/SAT_SALOME/salome.pyconf",
+ "/volatile/wambeke/SAT5/SAT5_S840_MATIX24/SAT_SALOME/salome.pyconf",
# "/home/uranietm/proJET/saTJOBS/saT5/uranie.pyconf",
# cloned 2017/12 for matix
##"/home/matix/GitRepo/uranie/saT5/uranie.pyconf",
- ##"/volatile/wambeke/SAT5/SAT_MATIX/matix.pyconf"
- #"/home/christian/SAT_SALOME/salome.pyconf"
- #"/home/christian/SAT_MATIX/matix.pyconf"
+ "/volatile/wambeke/SAT5/SAT_MATIX/matix.pyconf"
+ "/home/christian/SAT_SALOME/salome.pyconf"
+ "/home/christian/SAT_MATIX/matix.pyconf"
#"/home/christian/SAT_MATIX"
]
}
import src.debug as DBG # Easy print stderr (for DEBUG only)
logger = LOG.getDefaultLogger()
-
-import traceback
-
-def format_color_exception(msg, etype, value, tb, limit = None):
- """Format a stack trace and the exception information.
- as traceback.format_exception(), with color
- """
- res = "<red>" + msg + "<yellow>"
- if tb:
- res += "Traceback (most recent call last):\n"
- # print "tb"
- res += "".join(traceback.format_tb(tb, limit)) #[:-1])
- res += "\n<red>"
- res += "\n".join(traceback.format_exception_only(etype, value))
- return res+ "<reset>"
-
+
#################################
# MAIN
#################################
if __name__ == "__main__":
- from src.salomeTools import Sat # it is time...
+ from src.salomeTools import Sat # it is time to do import
_debug = False # Have to be False in production (for programmers DEBUG only)
DBG.push_debug(_debug) # as __main__ with sys.exit so no need pop_debug
try:
returnCode = sat.execute_cli(args)
+ DBG.write("execute_cli return code", returnCode)
if returnCode.isOk():
logger.debug("sat exit code: %s" % returnCode) # OK no trace
else:
- logger.error("sat exit code: %s" % returnCode) # KO say why
+ # warning as known problem
+ logger.warning("<red>sat exit code: %s<reset>" % returnCode) # KO have to say why
sys.exit(returnCode.toSys())
except Exception as e:
+ # error as may be unknown problem
# verbose debug message with traceback if developers
msg = "Exception raised for execute_cli(%s):\n" % args
logger.critical(DBG.format_color_exception(msg))
import os
import platform
-import datetime
import shutil
import sys
import StringIO as SIO
import src.environment as ENVI
import src.fileEnviron as FENV
import src.architecture as ARCH
+import src.dateTime as DATT
class ConfigOpener:
var['hostname'] = node_name
# set date parameters
- dt = datetime.datetime.now()
- var['date'] = dt.strftime('%Y%m%d')
- var['datehour'] = dt.strftime('%Y%m%d_%H%M%S')
- var['hour'] = dt.strftime('%H%M%S')
+ dt = DATT.DateTime("now")
+ var['date'] = dt.toStrDateConfig()
+ var['datehour'] = dt.toStrDateHourConfig()
+ var['hour'] = dt.toStrHourConfig()
var['command'] = str(command)
var['application'] = str(application)
:param logger: (Logger)
The logger instance to use for the display
"""
- len_max = max([len(p) for p in config.APPLICATION.products]) + 2
+ res =[]
+ len_max = -1
+ for product in sorted(config.APPLICATION.products):
+ product_info = PROD.get_product_config(config, product)
+ if PROD.product_has_patches(product_info):
+ res.append((product, product_info.patches))
+ len_max = max(len_max, len(product))
+
msg = ""
- for product in config.APPLICATION.products:
- nb = len_max-len(product)-2
- product_info = PROD.get_product_config(config, product)
- if PROD.product_has_patches(product_info):
- msg += "<header>%s: <reset>" % product
- msg += " "*nb + "%s\n" % product_info.patches[0]
- if len(product_info.patches) > 1:
- for patch in product_info.patches[1:]:
- msg += " "*nb + "%s\n" % patch
- msg += "\n"
+ for product, patches in res:
+ nb = len_max - len(product)
+ ind1 = " "*nb # indent first line
+ ind2 = " "*(len_max+2) # indent other lines
+ msg += "<header>%s: %s<reset>" % (product, ind1)
+ msg += ind2.join([p+"\n" for p in patches])
logger.info(msg)
return
--- /dev/null
+#!/usr/bin/env python
+#-*- coding:utf-8 -*-
+
+# Copyright (C) 2018-20xx CEA/DEN
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+This file contains DateTime and DeltaTime class
+
+| Usage:
+| >> import dateTime as DATT
+| >> ini = DATT.DateTime("now")
+| >> # some stuff
+| >> fin = DATT.DateTime("now")
+| >> duration = DATT.DeltaTime(ini, fin)
+"""
+
+import datetime as DT
+import time as TI
+
+# global module variable
+verbose = True
+
+#####################################################
+class DateTime(object):
+ """
+ assume storing a date and hour, and conversions
+
+ | Usage:
+ | >> import dateTime as DATT
+ | >> now = DATT.DateTime("now")
+ | >> print("now is %s" % now)
+ """
+
+ FORMAT_HUMAN = '%Y-%m-%d %H:%M:%S' # human readable
+ FORMAT_FILE = '%Y%m%d_%H%M%S' # for file name
+
+ FORMAT_HOUR_CONFIG = '%H%M%S' # for config pyconf
+ FORMAT_DATE_CONFIG = '%Y%m%d' # for config pyconf
+ FORMAT_DATEHOUR_CONFIG = '%Y%m%d_%H%M%S' # for config pyconf as FORMAT_FILE
+
+ FORMAT_PACKAGE = '%Y-%m-%d %H:%M' # for sat package
+
+ MSG_UNDEFINED = "UndefinedTime"
+
+ def __init__(self, when=None):
+ if type(when) == str:
+ if when == "now":
+ self._time = TI.time() # is a float
+ else:
+ raise Exception("DateTime: unknown when '%s'" % when)
+ elif type(when) == self.__class__:
+ self._time = when.getValue()
+ else:
+ self._time = None
+
+ def __repr__(self):
+ """complete with type class as 'DateTime(2018-05-07 12:30:55)'"""
+ res = "DateTime(%s)" % self
+ return res
+
+ def __str__(self):
+ """human readable, sortable, as '2018-05-07 12:30:55'"""
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_HUMAN, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def __eq__(self, other):
+ return self._time == other._time
+
+ def __gt__(self, other):
+ return self._time > other._time
+
+ def __ge__(self, other):
+ return self._time >= other._time
+
+ def addSeconds(self, secs):
+ """add seconds at time"""
+ self.raiseIfKo()
+ self._time += secs
+
+ def localTime(self):
+ if self.isOk():
+ return TI.localtime(self._time)
+ else:
+ return None
+
+ def toStrFile(self):
+ """use self.FORMAT_FILE, sortable, 2018-05-07... as '20180507_235958'"""
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_FILE, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrHuman(self):
+ """use self.FORMAT_HUMAN"""
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_HUMAN, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrHourConfig(self):
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_HOUR_CONFIG, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrDateConfig(self):
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_DATE_CONFIG, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrDateHourConfig(self):
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_DATEHOUR_CONFIG, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrPackage(self):
+ if self.isOk():
+ res = TI.strftime(self.FORMAT_PACKAGE, self.localTime())
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def getValue(self):
+ return self._time
+
+ def toSeconds(self):
+ return self._time
+
+ def setValue(self, time):
+ """choice as not deep copying if mutables value"""
+ # TODO deepcopy maybe for value, not yet
+ self._time = time
+
+ def getSecondsToNow(self):
+ delta = TI.time() - self._time
+ return delta
+
+ def isOk(self):
+ """return True if ok"""
+ return self._time is not None
+
+ def raiseIfKo(self):
+ """
+ raise an exception with message why if not ok, else return self.
+ This trick is to write usage
+
+ | Usage:
+ | >> aTimeOk = aTime.raiseIfKo() # raise Exception if KO
+ | >> doSomethingWithaTimeOk(aTimeOk) # here i am sure that is OK
+ """
+ if self.isOk():
+ return self
+ else:
+ raise Exception("DateTime not initialized")
+
+#####################################################
+class DeltaTime(object):
+ """
+ assume storing a duration, delta between two DateTime, and conversions
+
+ | Usage:
+ | >> import dateTime as DATT
+ | >> t1 = DATT.DateTime("now")
+ | >> time.sleep(3)
+ | >> t2 = DATT.DateTime("now")
+ | >> delta = DATT.DeltaTime(t1, t2)
+ | >> print("delta time is %s" % delta)
+ """
+
+ MSG_UNDEFINED = "UndefinedDeltaTime"
+
+ def __init__(self, t1=None, t2=None):
+ try:
+ self._t1 = DateTime(t1)
+ except:
+ self._t1 = DateTime()
+ try:
+ self._t2 = DateTime(t2)
+ except:
+ self._t2 = DateTime()
+ if type(t1) == str:
+ if t1 == "now":
+ self._t1 = DateTime(t1)
+ else:
+ raise Exception("DeltaTime: unknown t1 '%s'" % t1)
+
+ if type(t2) == str:
+ if t1 == "now":
+ self._t2 = DateTime(t2)
+ else:
+ raise Exception("DeltaTime: unknown t2 '%s'" % t1)
+
+ def __repr__(self):
+ """complete with type class as 'DeltaTime(345.67)'"""
+ res = "DeltaTime(t1=%s, t2=%s)" % (self._t1, self._t2)
+ return res
+
+ def __str__(self):
+ """human readable, seconds, sortable, as '345.67'"""
+ if self.isOk():
+ res = "%s" % self.toSeconds()
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toSeconds(self):
+ if self.isOk():
+ res = self._t2.toSeconds() - self._t1.toSeconds()
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toMinutes(self):
+ if self.isOk():
+ res = (self._t2.toSeconds() - self._t1.toSeconds()) / 60
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrHuman(self):
+ """automatic best unity, hours or minutes or seconds"""
+ if self.isOk():
+ res = self._t2.toSeconds() - self._t1.toSeconds()
+ if res < 0: return "%.3fs" % res
+ if res < 10: return "%.3fs" % res
+ if res < 60: return "%is" % int(res)
+ if res < 3600: return "%im%is" % (int(res/60), int(res%60))
+ return self.toStrHms()
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def toStrHms(self):
+ """all unities, hours and minutes and seconds as '2h34m56s'"""
+ if self.isOk():
+ res = self._t2.toSeconds() - self._t1.toSeconds()
+ hh = int(res/3600)
+ mm = int(res%3600)/60
+ ss = int(res%60)
+ return "%ih%im%is" % (hh, mm, ss)
+ else:
+ res = self.MSG_UNDEFINED
+ return res
+
+ def setT1(self, t):
+ self._t1 = DateTime(t)
+
+ def setT2(self, t):
+ self._t2 = DateTime(t)
+
+ def getT1(self, t):
+ return DateTime(self._t1)
+
+ def getT2(self, t):
+ return DateTime(self._t2)
+
+ def getValue(self):
+ """idem toSeconds()"""
+ return self.toSeconds()
+
+ def isOk(self):
+ """return True if ok"""
+ return self._t1.isOk() and self._t2.isOk()
+
+ def raiseIfKo(self):
+ """
+ raise an exception with message why if not ok, else return self.
+ This trick is to write usage
+
+ | Usage:
+ | >> aDeltaTimeOk = adeltaTime.raiseIfKo() # raise Exception if KO
+ | >> doSomethingWithaDeltaTimeOk(aDeltaTimeOk) # here i am sure that is OK
+ """
+ if not self._t1.isOk() and not self._t2.isOk():
+ raise Exception("DeltaTime t1 and t2 are not initialized")
+ if not self._t1.isOk():
+ raise Exception("DeltaTime t1 not initialized")
+ if not self._t2.isOk():
+ raise Exception("DeltaTime t2 not initialized")
+ return self # is ok
+
+
+##############################################################################
+# date utilities
+##############################################################################
+def sleep(seconds):
+ """as time.sleep(seconds)"""
+ TI.sleep(seconds)
+
+def getWeekDayNow():
+ """monday is 0, tuesday is 1 etc."""
+ return DT.date.weekday(DT.date.today())
+
+def fromTimeStamp(val):
+ return DT.datetime.fromtimestamp(val)
+
+def parse_date(date):
+ """
+ Transform YYYYMMDD_hhmmss into YYYY-MM-DD hh:mm:ss.
+
+ :param date: (str) The date to transform
+ :return: (str) The date in the new format
+ """
+ if len(date) != 15:
+ return date
+ res = "%s-%s-%s %s:%s:%s" % (date[0:4],
+ date[4:6],
+ date[6:8],
+ date[9:11],
+ date[11:13],
+ date[13:15])
+ return res
+
+def date_to_datetime(date):
+ """
+ From a string date in format YYYYMMDD_HHMMSS
+ returns list year, mon, day, hour, minutes, seconds
+
+ :param date: (str) The date in format YYYYMMDD_HHMMSS
+ :return: (tuple) as (str,str,str,str,str,str)
+ The same date and time in separate variables.
+ """
+ Y = date[:4]
+ m = date[4:6]
+ dd = date[6:8]
+ H = date[9:11]
+ M = date[11:13]
+ S = date[13:15]
+ return Y, m, dd, H, M, S
+
+def timedelta_total_seconds(timedelta):
+ """
+ Replace total_seconds from datetime module
+ in order to be compatible with old python versions
+
+ :param timedelta: (datetime.timedelta)
+ The delta between two dates
+ :return: (float)
+ The number of seconds corresponding to timedelta.
+ """
+ return (
+ timedelta.microseconds + 0.0 +
+ (timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
+
def write(title, var="", force=None, fmt="\n#### DEBUG: %s:\n%s\n"):
"""write sys.stderr a message if _debug[-1]==True or optionaly force=True"""
if _debug[-1] or force:
- typ = str(type(var))
+ tvar = type(var)
+ typ = str(tvar)
if isTypeConfig(var):
sys.stderr.write(fmt % (title, indent(COLS.toColor(getStrConfigDbg(var)))))
return
if 'loggingSat.UnittestStream' in typ:
sys.stderr.write(fmt % (title, indent(var.getLogs())))
- return
- if type(var) is not str:
+ return
+ if tvar is not str and tvar is not unicode:
sys.stderr.write(fmt % (title, indent(PP.pformat(var))))
return
sys.stderr.write(fmt % (title, indent(var)))
indentp = indentp + 2
indstr = indent * ' ' # '':no indent, ' ':indent
strType = str(type(config))
- print "zzzstrType", path, strType
+ if debug: print "saveDbg Type", path, strType
if "Sequence" in strType:
for i in range(len(config)):
import src.utilsSat as UTS
import src.pyconf as PYCONF
import src.product as PROD
+import src.fileEnviron as FENV
import src.architecture as ARCH
class Environ:
raise Exception(_("Environment script not found: %s") % env_script)
if not self.silent and logger is not None:
- logger.info(" ** load %s\n" % env_script)
+ logger.info(" load %s" % env_script)
# import the script and run the set_env function
try:
raise Exception(_("Environment script not found: %s") % script_path)
if not self.silent and logger is not None:
- logger.info(" ** load %s\n" % script_path)
+ logger.info(" load %s" % script_path)
script_basename = os.path.basename(script_path)
if script_basename.endswith(".py"):
:return: (str) The path to the generated file
"""
if not self.silent:
- self.logger.info(_("Create environment file %s\n") % UTS.label(filename))
+ self.logger.info(_("\nCreate environment file %s\n") % UTS.label(filename))
# create then env object
env_file = open(os.path.join(self.out_dir, filename), "w")
try:
optlist, args = getopt.getopt(argList, shortNameOption, longNameOption)
except Exception as e:
- msg = str(e) + "\n\n" + self.get_help()
+ msg = str(e) + " on %s\n\n" % argList + self.get_help()
raise Exception(msg)
# instantiate and completing the optResult that will be returned
if not rc.isOk():
msg = _("Patch %s for %s not found.") % (patch, prod_info.name)
raise Exception(msg)
- patches.append(rc.getValue())
+ patches.append(rc.getValue())
prod_info.patches = patches
raise Exception(msg)
else:
env_script = rc.getValue()
-
prod_info.environ.env_script = env_script
if with_install_dir:
# The variable with_install_dir is at false only for internal use
# of the function get_install_dir
-
# Save the install_dir key if there is any
if "install_dir" in prod_info and not "install_dir_save" in prod_info:
- prod_info.install_dir_save = prod_info.install_dir
-
+ prod_info.install_dir_save = prod_info.install_dir
# if it is not the first time the install_dir is computed, it means
# that install_dir_save exists and it has to be taken into account.
if "install_dir_save" in prod_info:
- prod_info.install_dir = prod_info.install_dir_save
-
+ prod_info.install_dir = prod_info.install_dir_save
# Set the install_dir key
prod_info.install_dir = get_install_dir(config, base, version, prod_info)
raise an exception with message why if not ok, else return self.
This trick is to write usage
+ | Usage:
| >> rc = doSomething().raiseIfKo() # raise Exception if KO
| >> doSomethingWithValue(rc.getValue()) # here i am sure that is OK
"""
script_info.time = script_results[sr][1]
if script_info.res == RCO._TIMEOUT_STATUS:
script_info.time = time_out
- if script_info.time < 1e-3: script_info.time = 0
+ if script_info.time < 1e-3:
+ script_info.time = 0
callback = script_results[sr][2]
if script_info.res != RCO._OK_STATUS and len(callback) > 0:
import os, sys, traceback
import os.path
-import time as THEBIGTIME
+import time as TIME
# set path
toolsWay = r'${toolsWay}'
exec_result.flush()
try:
- timeStart = THEBIGTIME.time()
+ timeStart = TIME.time()
execfile(os.path.join(outWay, test), globals(), locals())
- timeTest = THEBIGTIME.time() - timeStart
+ timeTest = TIME.time() - timeStart
except SatNotApplicableError, ex:
status = "NA"
reason = str(ex)
exec_result.write("NA\n")
- timeTest = THEBIGTIME.time() - timeStart
+ timeTest = TIME.time() - timeStart
pylog.write('status = "NA"\n')
pylog.write('time = "' + timeTest.__str__() + '"\n')
pylog.write('callback = "%s"\n' % reason)
status = "KF"
reason = "Known Failure = %s\n\n" % ignore[test]
exec_result.write("%s\n" % status)
- timeTest = THEBIGTIME.time() - timeStart
+ timeTest = TIME.time() - timeStart
pylog.write('status = "%s" \n' % status)
pylog.write('time = "' + timeTest.__str__() + '"\n')
pylog.write('callback="""' + reason)
import errno
import stat
-import datetime
import re
import tempfile
return result
-##############################################################################
-# date utilities
-##############################################################################
-def parse_date(date):
- """Transform YYYYMMDD_hhmmss into YYYY-MM-DD hh:mm:ss.
-
- :param date: (str) The date to transform
- :return: (str) The date in the new format
- """
- if len(date) != 15:
- return date
- res = "%s-%s-%s %s:%s:%s" % (date[0:4],
- date[4:6],
- date[6:8],
- date[9:11],
- date[11:13],
- date[13:])
- return res
-
-
##############################################################################
# log utilities (TODO: set in loggingSat class, later, changing tricky xml?
##############################################################################
-def date_to_datetime(date):
- """
- From a string date in format YYYYMMDD_HHMMSS
- returns list year, mon, day, hour, minutes, seconds
-
- :param date: (str) The date in format YYYYMMDD_HHMMSS
- :return: (tuple) as (str,str,str,str,str,str)
- The same date and time in separate variables.
- """
- Y = date[:4]
- m = date[4:6]
- dd = date[6:8]
- H = date[9:11]
- M = date[11:13]
- S = date[13:15]
- return Y, m, dd, H, M, S
-
-def timedelta_total_seconds(timedelta):
- """
- Replace total_seconds from datetime module
- in order to be compatible with old python versions
-
- :param timedelta: (datetime.timedelta)
- The delta between two dates
- :return: (float)
- The number of seconds corresponding to timedelta.
- """
- return (
- timedelta.microseconds + 0.0 +
- (timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
_log_macro_command_file_expression = "^[0-9]{8}_+[0-9]{6}_+.*\.xml$"
_log_all_command_file_expression = "^.*[0-9]{8}_+[0-9]{6}_+.*\.xml$"
empty-cells : show;
}
td { vertical-align : center; padding: 15px; }
- h1 { text-align : center; }
+ h1 { text-align : center; font-style: italic; font-size: 20px; }
.legend { font-weight : bold;
text-align : center;
}
<xsl:variable name="txtLog">
<xsl:value-of select="SATcommand/OutLog"/>
</xsl:variable>
- <iframe src="{$txtLog}" frameborder="0" class="center" width="100%" height="3000" scrolling="yes"></iframe>
- <!--<iframe src="{$txtLog}" frameborder="0" class="center" width="100%" height="1500000" scrolling="no"></iframe>-->
+ <iframe src="{$txtLog}" frameborder="0" class="center" width="98%" height="1000" scrolling="yes"></iframe>
+ <!--<iframe src="{$txtLog}" frameborder="0" class="center" width="98%" height="1000" scrolling="yes"></iframe>-->
</body>
</xsl:template>
empty-cells : show;
}
td { vertical-align : center;}
- h1 { text-align : center; }
+ h1 { text-align : center; font-style: italic; }
.legend { font-weight : bold;
text-align : center;
}
--- /dev/null
+#!/usr/bin/env python
+#-*- coding:utf-8 -*-
+
+# Copyright (C) 2010-2018 CEA/DEN
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
+import os
+import sys
+import unittest
+import pprint as PP
+
+import src.debug as DBG
+import src.dateTime as DATT
+
+verbose = False #True
+
+class TestCase(unittest.TestCase):
+ "Test the debug.py"""
+
+ def test_000(self):
+ # one shot setUp() for this TestCase
+ if verbose:
+ DBG.push_debug(True)
+ # DBG.write("assert unittest", [a for a in dir(self) if "assert" in a])
+ pass
+
+ def test_010(self):
+ t = DATT.DateTime()
+ self.assertFalse(t.isOk())
+ rrt = str(t)
+ DBG.write("test_010 str", rrt)
+ self.assertIn("Undefined", rrt)
+ rrt = repr(t)
+ DBG.write("test_010 repr", rrt)
+ self.assertIn("DateTime", rrt)
+ self.assertIn("Undefined", rrt)
+ with self.assertRaises(Exception):
+ t.raiseIfKo()
+
+
+ def test_015(self):
+ t = DATT.DateTime("now")
+ self.assertTrue(t.isOk())
+ rrt = str(t)
+ DBG.write("test_015 str", rrt)
+ self.assertIn("20", rrt) # 2018 to 2099 ok
+ self.assertIn("-", rrt)
+ self.assertIn(":", rrt)
+ rrt = repr(t)
+ DBG.write("test_015 repr", rrt)
+ self.assertIn("DateTime", rrt)
+ self.assertIn("20", rrt) # 2018 to 2099 ok
+ self.assertIn("-", rrt)
+ self.assertIn(":", rrt)
+
+
+ def test_020(self):
+ t1 = DATT.DateTime("now")
+ t2 = DATT.DateTime(t1)
+ self.assertTrue(t2.isOk())
+ self.assertEqual(t1, t2)
+ t2 = DATT.DateTime("now")
+ self.assertNotEqual(t1, t2) # microseconds differs
+
+ DATT.sleep(3) # 3 second more
+ t2 = DATT.DateTime("now")
+ self.assertGreater(2, 1) # to be sure
+ self.assertGreater(str(t2), str(t1)) # seconds differs
+ self.assertGreater(repr(t2), repr(t1)) # seconds differs
+ self.assertGreater(t2, t1)
+ self.assertTrue(t2 > t1)
+ self.assertFalse(t2 == t1)
+ self.assertFalse(t2 < t1)
+ self.assertFalse(t2 <= t1)
+
+ def test_040(self):
+ t1 = DATT.DateTime("now")
+ delta = DATT.DeltaTime(t1)
+ self.assertFalse(delta.isOk())
+ self.assertIn("Undefined", delta.toSeconds())
+ DBG.write("test_040 str", str(delta))
+ DBG.write("test_040 repr", repr(delta))
+ with self.assertRaises(Exception):
+ delta.raiseIfKo()
+ DATT.DateTime().raiseIfKo()
+
+ def test_042(self):
+ t1 = DATT.DateTime("now")
+ DATT.sleep(3.1) # 3.1 second more
+ t2 = DATT.DateTime("now")
+ self.assertTrue(t2 > t1)
+ delta = DATT.DeltaTime(t1, t2)
+ self.assertGreater(delta.toSeconds(), 3)
+ self.assertEqual(int(delta.toSeconds()), 3)
+ DBG.write("test_042 str", str(delta))
+ DBG.write("test_042 repr", repr(delta))
+ delta2 = delta.raiseIfKo()
+ self.assertEqual(delta2.toSeconds(), delta.toSeconds())
+
+ def test_044(self):
+ for more in [0, 0.56789, 5.6789, 56.789, 61, 3661, 36061]:
+ t1 = DATT.DateTime("now")
+ t2 = DATT.DateTime(t1)
+ t2.addSeconds(more)
+ delta = DATT.DeltaTime(t1, t2)
+ r = delta.toStrHuman()
+ DBG.write("test_044 str", r)
+ if more < 60:
+ self.assertIn("s", r)
+ self.assertNotIn("m", r)
+ self.assertNotIn("h", r)
+ continue
+ if more < 3600:
+ self.assertIn("s", r)
+ self.assertIn("m", r)
+ self.assertNotIn("h", r)
+ else:
+ self.assertIn("s", r)
+ self.assertIn("m", r)
+ self.assertIn("h", r)
+
+
+
+ def test_999(self):
+ # one shot tearDown() for this TestCase
+ if verbose:
+ DBG.pop_debug()
+ return
+
+if __name__ == '__main__':
+ unittest.main(exit=False)
+ pass
+
logger = LOG.getUnittestLogger()
debug = False
+ #see test_100, # commands are expected OK
+ TRG = "SALOME-8.4.0"
+ satCommandsToTestOk = [
+ "config -l",
+ "config -v .",
+ "config -d .",
+ "config %s --value ." % TRG,
+ "config %s --debug ." % TRG,
+ "config %s --info KERNEL" % TRG,
+ "config %s --show_patchs" % TRG,
+ ]
+ #see test_110, # commands are expected KO
+ satCommandsToTestKo = [
+ "config %s --info oops" % TRG,
+ ]
+ #see test_120, # commands are expected KO
+ satCommandsToTestRaise = [
+ "oopsconfig --oops .",
+ "config --oops",
+ ]
+
def tearDown(self):
# print "tearDown", __file__
# assure self.logger clear for next test
logs = self.logger.getLogsAndClear()
# using assertNotIn() is too much verbose
- self.assertFalse("ERROR" in logs)
- self.assertFalse("CRITICAL" in logs)
+ self.assertFalse("ERROR ::" in logs)
+ self.assertFalse("CRITICAL ::" in logs)
def test_000(self):
# one shot setUp() for this TestCase
if self.debug: DBG.pop_debug()
def test_010(self):
- cmd = "-v 5 config -l"
+ cmd = "config -l"
s = SAT.Sat(self.logger)
DBG.write("s.getConfig()", s.getConfig()) #none
DBG.write("s.__dict__", s.__dict__) # have
returnCode = s.execute_cli(cmd)
DBG.write("test_010 returnCode", returnCode)
logs = self.logger.getLogs()
- DBG.write("test_010 logger", logs, True)
+ DBG.write("test_010 logger", logs)
self.assertTrue(returnCode.isOk())
-
+
+ def test_100(self):
+ # test all satCommands expected OK
+ dbg = self.debug # True #
+ for cmd in self.satCommandsToTestOk:
+ s = SAT.Sat(self.logger)
+ returnCode = s.execute_cli(cmd)
+ DBG.write("test_800 'sat %s' returnCode" % cmd, str(returnCode), True)
+ logs = self.logger.getLogsAndClear()
+ DBG.write("logs", logs, dbg)
+ # using assertNotIn() is too much verbose
+ self.assertFalse("ERROR ::" in logs)
+ self.assertFalse("CRITICAL ::" in logs)
+
+ def test_110(self):
+ # test all satCommands expected KO
+ dbg = self.debug
+ for cmd in self.satCommandsToTestKo:
+ s = SAT.Sat(self.logger)
+ returnCode = s.execute_cli(cmd)
+ DBG.write("test_810 'sat %s' returnCode" % cmd, returnCode, dbg)
+ logs = self.logger.getLogsAndClear()
+ DBG.write("logs", logs, dbg)
+
+ def test_120(self):
+ # test all satCommands expected raise
+ dbg = self.debug
+ for cmd in self.satCommandsToTestRaise:
+ s = SAT.Sat(self.logger)
+ DBG.write("test_820 'sat %s'" % cmd, "expected raise", dbg)
+ with self.assertRaises(Exception):
+ returnCode = s.execute_cli(cmd)
+ logs = self.logger.getLogsAndClear()
+ DBG.write("logs", logs, dbg)
+
+
if __name__ == '__main__':
unittest.main(exit=False)
pass
def test_010(self):
# Test creation of ~/.salomeTools/salomeTools.pyconf
+ print "stupidity HAVE TO NOT touch user ~/.salomeTools"
+ return
+
res = "KO"
user_dir = os.path.expanduser(os.path.join('~','.salomeTools'))
user_dir_save = os.path.expanduser(os.path.join('~','.salomeTools_save'))