From: Anthony Geay Date: Wed, 9 Mar 2016 17:20:34 +0000 (+0100) Subject: On the road of cluster management in evalyfx. X-Git-Tag: V7_8_0a2~10 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=1eb8ef53bd1123b18002f3ad1ac50038003ac9f3;p=modules%2Fyacs.git On the road of cluster management in evalyfx. --- diff --git a/src/evalyfx/YACSEvalResource.cxx b/src/evalyfx/YACSEvalResource.cxx index 84b549128..b15c6a558 100644 --- a/src/evalyfx/YACSEvalResource.cxx +++ b/src/evalyfx/YACSEvalResource.cxx @@ -73,6 +73,12 @@ void YACSEvalNonConstLocker::checkNonLocked() const throw YACS::Exception("YACSEvalNonConstLocker::checkNonLocked : this is locked and trying to invoke non-const method !"); } +void YACSEvalNonConstLocker::checkLocked() const +{ + if(!_isLocked) + throw YACS::Exception("YACSEvalNonConstLocker::checkLocked : this is NOT locked whereas it should be !"); +} + YACSEvalVirtualYACSContainer::YACSEvalVirtualYACSContainer():_gf(0),_cont(0) { } @@ -173,11 +179,14 @@ std::vector YACSEvalVirtualYACSContainer::listOfPropertyKeys() cons std::string YACSEvalVirtualYACSContainer::getValueOfKey(const char *key) const { + std::string skey(key); + if(skey==HOSTNAME_KEY) + return _chosenHost; std::map::const_iterator it; - it=_overloadedPropertyMap.find(key); + it=_overloadedPropertyMap.find(skey); if(it!=_overloadedPropertyMap.end()) return (*it).second; - it=_propertyMap.find(key); + it=_propertyMap.find(skey); if(it!=_propertyMap.end()) return (*it).second; return std::string(); @@ -186,25 +195,25 @@ std::string YACSEvalVirtualYACSContainer::getValueOfKey(const char *key) const void YACSEvalVirtualYACSContainer::setProperty(const std::string& key, const std::string &value) { checkNonLocked(); - _overloadedPropertyMap[key]=value; + if(key==HOSTNAME_KEY) + setWantedMachine(value); + else + _overloadedPropertyMap[key]=value; } -void YACSEvalVirtualYACSContainer::apply() +void YACSEvalVirtualYACSContainer::apply(bool interactiveStatus) { YACS::ENGINE::SalomeContainer *cont0(dynamic_cast(_cont)); YACS::ENGINE::SalomeHPContainer *cont1(dynamic_cast(_cont)); - if(cont0) - { - cont0->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY)); - return ; - } - else if(cont1) - { - cont1->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY)); - return ; - } - else + if(!cont0 && !cont1) throw YACS::Exception("YACSEvalVirtualYACSContainer::apply : unrecognized container !"); + if(interactiveStatus) + _cont->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY)); + else + {// in cluster mode no hostname and policy set to cycl by default + _cont->setProperty(HOSTNAME_KEY,std::string()); + _cont->setProperty("policy","cycl"); + } } std::string YACSEvalVirtualYACSContainer::getName() const @@ -279,10 +288,10 @@ YACSEvalVirtualYACSContainer *YACSEvalResource::at(std::size_t i) const return const_cast(&_containers[i]); } -void YACSEvalResource::apply() +void YACSEvalResource::apply(bool interactiveStatus) { for(std::vector< YACSEvalVirtualYACSContainer >::iterator it=_containers.begin();it!=_containers.end();it++) - (*it).apply(); + (*it).apply(interactiveStatus); } void YACSEvalResource::fitWithCurrentCatalogAbs() @@ -318,6 +327,22 @@ YACSEvalResource::YACSEvalResource(YACSEvalListOfResources *gf, const std::vecto _containers[i].set(this,conts[i]); } +void YACSEvalParamsForCluster::setExclusiveness(bool newStatus) +{ + if(newStatus) + throw YACS::Exception("YACSEvalParamsForCluster::setExclusiveness : exclusive mode set to true is not implemented yet !"); +} + +void YACSEvalParamsForCluster::checkConsistency() const +{ + if(_remoteWorkingDir.empty()) + throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : remote work dir is not set !"); + if(_wcKey.empty()) + throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : WC key is not set !"); + if(_nbOfProcs==0) + throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : nb procs must be != 0 !"); +} + YACSEvalListOfResources::YACSEvalListOfResources(int maxLevOfPara, ResourcesManager_cpp *rm, const YACS::ENGINE::DeploymentTree& dt):_maxLevOfPara(maxLevOfPara),_rm(rm),_dt(new YACS::ENGINE::DeploymentTree(dt)) { std::vector conts(_dt->getAllContainers()); @@ -405,17 +430,29 @@ bool YACSEvalListOfResources::isInteractive() const unsigned int YACSEvalListOfResources::getNumberOfProcsDeclared() const { - std::vector chosen(getAllChosenMachines()); - unsigned int ret(0); - for(std::vector::const_iterator it=chosen.begin();it!=chosen.end();it++) - ret+=getNumberOfProcOfResource(*it); - return ret; + if(isInteractive()) + { + std::vector chosen(getAllChosenMachines()); + unsigned int ret(0); + for(std::vector::const_iterator it=chosen.begin();it!=chosen.end();it++) + ret+=getNumberOfProcOfResource(*it); + return ret; + } + else + return _paramsInCaseOfCluster.getNbProcs(); +} + +void YACSEvalListOfResources::checkOKForRun() const +{ + if(!isInteractive()) + _paramsInCaseOfCluster.checkConsistency(); } void YACSEvalListOfResources::apply() { + bool interactiveSt(isInteractive()); for(std::vector::iterator it=_resources.begin();it!=_resources.end();it++) - (*it)->apply(); + (*it)->apply(interactiveSt); } YACSEvalListOfResources::~YACSEvalListOfResources() @@ -567,23 +604,59 @@ void YACSEvalListOfResources::notifyWantedMachine(YACSEvalVirtualYACSContainer * throw YACS::Exception("YACSEvalListOfResources::notifyWantedMachine : internal error !"); const ParserResourcesType& oldPRT((*itOld).second); const ParserResourcesType& newPRT((*itNew).second); - if(oldPRT.ClusterInternalProtocol==newPRT.ClusterInternalProtocol) + bool oldISt(oldPRT.Batch==none),newISt(newPRT.Batch==none);//interactive status + if(oldISt==newISt) return ; // the batch/interactive mode has changed -> try to change for all. std::queue sts; try { - for(std::vector::const_iterator it=_resources.begin();it!=_resources.end();it++) - { - std::size_t sz((*it)->size()); - for(std::size_t i=0;iat(i)); - if(cont==sender) - continue; - sts.push(cont->findDefault(newPRT.ClusterInternalProtocol==sh)); - } - } + if(newISt) + {// switching from interactive to batch -> In batch every YACSEvalVirtualYACSContainer instances in this must lie on newMachine. + for(std::vector::const_iterator it=_resources.begin();it!=_resources.end();it++) + { + std::vector fms((*it)->getAllFittingMachines()); + std::vector::iterator it0(std::find(fms.begin(),fms.end(),newMachine)); + if(it0==fms.end()) + { + std::ostringstream oss; oss << "In the context of switch to batch the requested cluster machine \"" << newMachine << "\" is not compatible for following list of containers : " << std::endl; + std::size_t sz((*it)->size()); + for(std::size_t i=0;iat(i)); + if(cont) + oss << " \"" << cont->getName() << "\", "; + } + throw YACS::Exception(oss.str()); + } + std::size_t sz((*it)->size()); + for(std::size_t i=0;isize()); + for(std::size_t i=0;iat(i)); + if(cont==sender) + continue; + sts.push(newMachine); + } + } + } + } + else + { + for(std::vector::const_iterator it=_resources.begin();it!=_resources.end();it++) + { + std::size_t sz((*it)->size()); + for(std::size_t i=0;iat(i)); + if(cont==sender) + continue; + sts.push(cont->findDefault(false)); + } + } + } } catch(YACS::Exception& e) { @@ -611,7 +684,7 @@ bool YACSEvalListOfResources::hasRightInteractiveStatus(const std::string& machi if(it==zeList.end()) throw YACS::Exception("YACSEvalListOfResources::hasRightInteractiveStatus : internal error !"); const ParserResourcesType& elt((*it).second); - bool myStatus(elt.ClusterInternalProtocol==sh); + bool myStatus(elt.Batch==none); return myStatus==isInteractive; } diff --git a/src/evalyfx/YACSEvalResource.hxx b/src/evalyfx/YACSEvalResource.hxx index 292d53468..75c1fb515 100644 --- a/src/evalyfx/YACSEvalResource.hxx +++ b/src/evalyfx/YACSEvalResource.hxx @@ -48,6 +48,7 @@ public: YACSEVALYFX_EXPORT void lock() { _isLocked=true; } YACSEVALYFX_EXPORT bool isLocked() const { return _isLocked; } YACSEVALYFX_EXPORT void checkNonLocked() const; + YACSEVALYFX_EXPORT void checkLocked() const; private: bool _isLocked; }; @@ -67,7 +68,7 @@ public: YACSEVALYFX_EXPORT void setProperty(const std::string& key, const std::string &value); YACSEVALYFX_EXPORT void resetOverloadedProps() { checkNonLocked(); _overloadedPropertyMap.clear(); } YACSEVALYFX_EXPORT std::string getName() const; - void apply(); + void apply(bool interactiveStatus); public: YACSEvalVirtualYACSContainer(); void set(YACSEvalResource *gf, YACS::ENGINE::Container *cont); @@ -114,7 +115,7 @@ public: YACSEVALYFX_EXPORT void setWantedMachine(const std::string& machine); YACSEVALYFX_EXPORT std::size_t size() const { return _containers.size(); } YACSEVALYFX_EXPORT YACSEvalVirtualYACSContainer *at(std::size_t i) const; - YACSEVALYFX_EXPORT void apply(); + YACSEVALYFX_EXPORT void apply(bool interactiveStatus); public: void fitWithCurrentCatalogAbs(); void aggregate(ParserResourcesType& entry) const; @@ -130,6 +131,26 @@ protected: class ResourcesManager_cpp; +class YACSEvalParamsForCluster +{ +public: + YACSEvalParamsForCluster():_exclusiveness(false),_nbOfProcs(1) { } + bool getExclusiveness() const { return _exclusiveness; } + void setExclusiveness(bool newStatus); + std::string getRemoteWorkingDir() const { return _remoteWorkingDir; } + void setRemoteWorkingDir(const std::string& remoteWorkingDir) { _remoteWorkingDir=remoteWorkingDir; } + std::string getWCKey() const { return _wcKey; } + void setWCKey(const std::string& wcKey) { _wcKey=wcKey; } + unsigned int getNbProcs() const { return _nbOfProcs; } + void setNbProcs(unsigned int nbProcs) { _nbOfProcs=nbProcs; } + void checkConsistency() const; +private: + bool _exclusiveness; + std::string _remoteWorkingDir; + std::string _wcKey; + unsigned int _nbOfProcs; +}; + class YACSEvalListOfResources : public YACSEvalNonConstLocker { public: @@ -141,6 +162,8 @@ public: YACSEVALYFX_EXPORT YACSEvalResource *at(std::size_t i) const; YACSEVALYFX_EXPORT bool isInteractive() const; YACSEVALYFX_EXPORT unsigned int getNumberOfProcsDeclared() const; + YACSEVALYFX_EXPORT void checkOKForRun() const; + YACSEVALYFX_EXPORT YACSEvalParamsForCluster& getAddParamsForCluster() { return _paramsInCaseOfCluster; } void apply(); YACSEVALYFX_EXPORT ~YACSEvalListOfResources(); public: @@ -158,6 +181,7 @@ private: int _maxLevOfPara; std::vector _resources; YACS::ENGINE::DeploymentTree *_dt; + YACSEvalParamsForCluster _paramsInCaseOfCluster; }; #endif diff --git a/src/evalyfx/YACSEvalYFX.cxx b/src/evalyfx/YACSEvalYFX.cxx index 537c7303c..423c46f4d 100644 --- a/src/evalyfx/YACSEvalYFX.cxx +++ b/src/evalyfx/YACSEvalYFX.cxx @@ -37,15 +37,6 @@ #include -class MyAutoThreadSaver -{ -public: - MyAutoThreadSaver():_save(PyEval_SaveThread()) { } - ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); } -private: - PyThreadState *_save; -}; - YACSEvalYFX *YACSEvalYFX::BuildFromFile(const std::string& xmlOfScheme) { YACS::ENGINE::RuntimeSALOME::setRuntime(); @@ -97,30 +88,20 @@ YACSEvalListOfResources *YACSEvalYFX::giveResources() bool YACSEvalYFX::run(YACSEvalSession *session, int& nbOfBranches) { - _pattern->assignRandomVarsInputs(); - YACSEvalListOfResources *rss(giveResources()); - if(!rss->isInteractive()) - throw YACS::Exception("YACSEvalYFX::run : not implemented yet for non interactive !"); - YACSEvalSession *mySession(session); - YACS::AutoCppPtr loc; if(!session) { throw YACS::Exception("YACSEvalYFX::run : input session in null !"); - /*loc=new YACSEvalSession; - mySession=loc;*/ } + session->launch(); + // + YACSEvalListOfResources *rss(giveResources()); + rss->checkOKForRun(); + _pattern->assignRandomVarsInputs(); + //if(!rss->isInteractive()) + // throw YACS::Exception("YACSEvalYFX::run : not implemented yet for non interactive !"); rss->apply(); nbOfBranches=_pattern->assignNbOfBranches(); - mySession->launch(); - YACS::ENGINE::Executor exe; - exe.setKeepGoingProperty(!_params.getStopASAPAfterErrorStatus()); - // - _pattern->emitStart(); - { - MyAutoThreadSaver locker; - exe.RunW(getUndergroundGeneratedGraph()); - } - return getUndergroundGeneratedGraph()->getState()==YACS::DONE; + return _pattern->go(_params.getStopASAPAfterErrorStatus()); } void YACSEvalYFX::registerObserver(YACSEvalObserver *observer) diff --git a/src/evalyfx/YACSEvalYFXPattern.cxx b/src/evalyfx/YACSEvalYFXPattern.cxx index 5472bb427..d35fb8083 100644 --- a/src/evalyfx/YACSEvalYFXPattern.cxx +++ b/src/evalyfx/YACSEvalYFXPattern.cxx @@ -38,6 +38,8 @@ #include "PythonNode.hxx" #include "InlineNode.hxx" #include "ServiceNode.hxx" +#include "PyStdout.hxx" +#include "AutoGIL.hxx" #include "ResourcesManager.hxx" @@ -47,6 +49,9 @@ #include #include +//// +#include + const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX"; const char YACSEvalYFXPattern::ST_OK[]="ALL_OK"; @@ -61,6 +66,15 @@ const char YACSEvalYFXRunOnlyPattern::FIRST_FE_SUBNODE_NAME[]="Bloc"; const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__"; +class MyAutoThreadSaver +{ +public: + MyAutoThreadSaver():_save(PyEval_SaveThread()) { } + ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); } +private: + PyThreadState *_save; +}; + std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const { std::size_t sz(_inputs.size()); @@ -648,6 +662,32 @@ void YACSEvalYFXRunOnlyPattern::emitStart() const obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed()); } +bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP) const +{ + emitStart(); + if(getResourcesInternal()->isInteractive()) + { + YACS::ENGINE::Executor exe; + exe.setKeepGoingProperty(!stopASAP); + { + MyAutoThreadSaver locker; + exe.RunW(getUndergroundGeneratedGraph()); + } + return getUndergroundGeneratedGraph()->getState()==YACS::DONE; + } + else + { + char EFXGenFileName[]="EFXGenFileName"; + char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))"; + // + YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent)); + YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); + std::string fn(PyString_AsString(val)); + getUndergroundGeneratedGraph()->saveSchema(fn); + return false; + } +} + bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode) { std::list nodes(scheme->getChildren()); diff --git a/src/evalyfx/YACSEvalYFXPattern.hxx b/src/evalyfx/YACSEvalYFXPattern.hxx index b5abcb7b5..bcfe0e6d5 100644 --- a/src/evalyfx/YACSEvalYFXPattern.hxx +++ b/src/evalyfx/YACSEvalYFXPattern.hxx @@ -79,7 +79,7 @@ public: virtual std::string getStatusOfRunStr() const = 0; virtual std::vector getResults() const = 0; virtual std::vector getResultsInCaseOfFailure(std::vector& passedIds) const = 0; - virtual void emitStart() const = 0; + virtual bool go(bool stopASAP) const = 0; public: static const char DFT_PROC_NAME[]; protected: @@ -129,7 +129,7 @@ public: std::string getStatusOfRunStr() const; std::vector getResults() const; std::vector getResultsInCaseOfFailure(std::vector& passedIds) const; - void emitStart() const; + bool go(bool stopASAP) const; // YACS::ENGINE::ForEachLoop *getUndergroundForEach() const { return _FEInGeneratedGraph; } static bool IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode); @@ -137,6 +137,7 @@ public: static const char FIRST_FE_SUBNODE_NAME[]; static const char GATHER_NODE_NAME[]; private: + void emitStart() const; void buildInputPorts(); void buildOutputPorts(); YACS::ENGINE::ForEachLoop *findTopForEach() const; diff --git a/src/evalyfx_swig/evalyfx.i b/src/evalyfx_swig/evalyfx.i index 2f39874c2..aceff91ae 100644 --- a/src/evalyfx_swig/evalyfx.i +++ b/src/evalyfx_swig/evalyfx.i @@ -337,6 +337,22 @@ private: YACSEvalResource(); }; +class YACSEvalParamsForCluster +{ +public: + bool getExclusiveness() const; + void setExclusiveness(bool newStatus); + std::string getRemoteWorkingDir(); + void setRemoteWorkingDir(const std::string& remoteWorkingDir); + std::string getWCKey() const; + void setWCKey(const std::string& wcKey); + unsigned int getNbProcs() const; + void setNbProcs(unsigned int nbProcs); + void checkConsistency() const; +private: + YACSEvalParamsForCluster(); +}; + class YACSEvalListOfResources { public: @@ -347,6 +363,8 @@ public: bool isInteractive() const; YACSEvalResource *at(std::size_t i) const; unsigned int getNumberOfProcsDeclared() const; + void checkOKForRun() const; + YACSEvalParamsForCluster& getAddParamsForCluster(); %extend { std::size_t __len__() const diff --git a/src/runtime/AutoGIL.hxx b/src/runtime/AutoGIL.hxx index 640d59070..70b220b68 100644 --- a/src/runtime/AutoGIL.hxx +++ b/src/runtime/AutoGIL.hxx @@ -35,6 +35,24 @@ namespace YACS private: PyGILState_STATE _gstate; }; + + class AutoPyRef + { + public: + AutoPyRef(PyObject *pyobj=0):_pyobj(pyobj) { } + ~AutoPyRef() { release(); } + AutoPyRef(const AutoPyRef& other):_pyobj(other._pyobj) { if(_pyobj) Py_XINCREF(_pyobj); } + AutoPyRef& operator=(const AutoPyRef& other) { if(_pyobj==other._pyobj) return *this; release(); _pyobj=other._pyobj; Py_XINCREF(_pyobj); return *this; } + operator PyObject *() { return _pyobj; } + void set(PyObject *pyobj) { if(pyobj==_pyobj) return ; release(); _pyobj=pyobj; } + PyObject *get() { return _pyobj; } + bool isNull() const { return _pyobj==0; } + PyObject *retn() { if(_pyobj) Py_XINCREF(_pyobj); return _pyobj; } + private: + void release() { if(_pyobj) Py_XDECREF(_pyobj); _pyobj=0; } + private: + PyObject *_pyobj; + }; } } diff --git a/src/runtime/PyStdout.cxx b/src/runtime/PyStdout.cxx index 83980a20f..cc939672c 100644 --- a/src/runtime/PyStdout.cxx +++ b/src/runtime/PyStdout.cxx @@ -18,8 +18,18 @@ // #include "PyStdout.hxx" +#include "Exception.hxx" +#include "AutoGIL.hxx" + #include + #include +#include + +#ifdef WIN32 +#include +#define getpid _getpid +#endif namespace YACS { @@ -127,6 +137,56 @@ PyObject * newPyStdOut( std::string& out ) return (PyObject*)self; } +PyObject *evalPy(const std::string& funcName, const std::string& strToEval) +{ + std::ostringstream oss0; oss0 << "def " << funcName << "():\n"; + std::string::size_type i0(0); + while(i0((PyObject *)code),context,context)); + PyObject *ret(PyDict_GetItemString(context,funcName.c_str())); //borrowed ref + if(!ret) + throw YACS::Exception("evalPy : Error on returned func !"); + Py_XINCREF(ret); + return ret; +} + +PyObject *evalFuncPyWithNoParams(PyObject *func) +{ + if(!func) + throw YACS::Exception("evalFuncPyWithNoParams : input func is NULL !"); + AutoPyRef args(PyTuple_New(0)); + AutoPyRef ret(PyObject_CallObject(func,args)); + if(ret.isNull()) + throw YACS::Exception("evalFuncPyWithNoParams : ret is null !"); + return ret.retn(); +} + } } diff --git a/src/runtime/PyStdout.hxx b/src/runtime/PyStdout.hxx index af204cea6..8899c0e6c 100644 --- a/src/runtime/PyStdout.hxx +++ b/src/runtime/PyStdout.hxx @@ -29,7 +29,9 @@ namespace YACS { namespace ENGINE { - YACSRUNTIMESALOME_EXPORT PyObject * newPyStdOut( std::string& out ); + YACSRUNTIMESALOME_EXPORT PyObject *newPyStdOut( std::string& out ); + YACSRUNTIMESALOME_EXPORT PyObject *evalPy(const std::string& funcName, const std::string& strToEval); + YACSRUNTIMESALOME_EXPORT PyObject *evalFuncPyWithNoParams(PyObject *func); } }