X-Git-Url: http://git.salome-platform.org/gitweb/?a=blobdiff_plain;f=src%2Fevalyfx%2FYACSEvalYFXPattern.cxx;h=bd15bee1ee404afb7c7b2ac2c384d0e6d615e755;hb=1c08ba9d8254258e0bed89a2cb2d9cf3cb4a434c;hp=d35fb80834b0ac08c05dbcb21e454bb8fbdcd611;hpb=1eb8ef53bd1123b18002f3ad1ac50038003ac9f3;p=modules%2Fyacs.git diff --git a/src/evalyfx/YACSEvalYFXPattern.cxx b/src/evalyfx/YACSEvalYFXPattern.cxx index d35fb8083..bd15bee1e 100644 --- a/src/evalyfx/YACSEvalYFXPattern.cxx +++ b/src/evalyfx/YACSEvalYFXPattern.cxx @@ -1,4 +1,4 @@ -// Copyright (C) 2012-2016 CEA/DEN, EDF R&D +// Copyright (C) 2012-2022 CEA/DEN, EDF R&D // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public @@ -21,8 +21,11 @@ #include "YACSEvalYFXPattern.hxx" #include "YACSEvalResource.hxx" #include "YACSEvalSeqAny.hxx" +#include "YACSEvalSession.hxx" #include "YACSEvalObserver.hxx" +#include "YACSEvalSessionInternal.hxx" #include "YACSEvalAutoPtr.hxx" +#include "YACSEvalExecParams.hxx" #include "ElementaryNode.hxx" #include "RuntimeSALOME.hxx" @@ -32,6 +35,7 @@ #include "LinkInfo.hxx" #include "TypeCode.hxx" #include "Proc.hxx" +#include "Dispatcher.hxx" #include "PythonPorts.hxx" #include "ForEachLoop.hxx" @@ -39,7 +43,7 @@ #include "InlineNode.hxx" #include "ServiceNode.hxx" #include "PyStdout.hxx" -#include "AutoGIL.hxx" +#include "PythonCppUtils.hxx" #include "ResourcesManager.hxx" @@ -52,8 +56,6 @@ //// #include -const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX"; - const char YACSEvalYFXPattern::ST_OK[]="ALL_OK"; const char YACSEvalYFXPattern::ST_FAILED[]="SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_FAILED_DETERMINISTICALLY"; @@ -62,16 +64,27 @@ const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000; -const char YACSEvalYFXRunOnlyPattern::FIRST_FE_SUBNODE_NAME[]="Bloc"; +const char YACSEvalYFXGraphGen::DFT_PROC_NAME[]="YFX"; + +const char YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME[]="Bloc"; -const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__"; +const char YACSEvalYFXGraphGen::GATHER_NODE_NAME[]="__gather__"; + +const char YACSEvalYFXGraphGen::HIDDEN_INDEX_VAR[]="___idx___"; class MyAutoThreadSaver { public: - MyAutoThreadSaver():_save(PyEval_SaveThread()) { } - ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); } + MyAutoThreadSaver(bool isToSave):_isToSave(isToSave),_save(0) + { + if(_isToSave) + { + _save = PyEval_SaveThread(); + } + } + ~MyAutoThreadSaver() { if(_isToSave) PyEval_RestoreThread(_save); } private: + bool _isToSave; PyThreadState *_save; }; @@ -191,7 +204,7 @@ YACSEvalYFXPattern::YACSEvalYFXPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *sc { } -YACS::ENGINE::TypeCode *YACSEvalYFXPattern::createSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType) +YACS::ENGINE::TypeCode *YACSEvalYFXPattern::CreateSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType) { std::ostringstream oss; oss << "list[" << zeType << "]"; YACS::ENGINE::TypeCode *tc(scheme->getTypeCode(zeType)); @@ -245,7 +258,7 @@ YACSEvalSeqAny *YACSEvalYFXPattern::BuildValueInPort(YACS::ENGINE::InputPyPort * for(std::size_t i=0;i elt(new YACSEvalSeqAnyInt(eltCpp)); return elt.dettach(); @@ -294,26 +307,34 @@ class YACSEvalYFXRunOnlyPatternInternalObserver : public YACS::ENGINE::Observer { public: YACSEvalYFXRunOnlyPatternInternalObserver(YACSEvalYFXRunOnlyPattern *boss):_boss(boss) { if(!_boss) throw YACS::Exception("YACSEvalYFXRunOnlyPatternInternalObserver constructor : null boss not supported :)"); } - void notifyObserver(YACS::ENGINE::Node *object, const std::string& event); + void notifyObserver2(YACS::ENGINE::Node *object, const std::string& event, void *something); private: YACSEvalYFXRunOnlyPattern *_boss; }; -void YACSEvalYFXRunOnlyPatternInternalObserver::notifyObserver(YACS::ENGINE::Node *object, const std::string& event) +void YACSEvalYFXRunOnlyPatternInternalObserver::notifyObserver2(YACS::ENGINE::Node *object, const std::string& event, void *something) { - YACS::ENGINE::ForEachLoop *object2(dynamic_cast(object)); - if(!object2) - return ; + YACS::ENGINE::ForEachLoop *object2(_boss->getUndergroundForEach()); YACSEvalObserver *obs(_boss->getObserver()); if(!obs) return ; - if(event=="progress") - obs->notifyNewNumberOfPassedItems(_boss->getBoss(),object2->getCurrentIndex()); + if(event=="progress_ok" && object2==object) + { + int *casted(reinterpret_cast(something)); + obs->notifySampleOK(_boss->getBoss(),*casted); + return ; + } + if(event=="progress_ko" && object2==object) + { + int *casted(reinterpret_cast(something)); + obs->notifySampleKO(_boss->getBoss(),*casted); + return ; + } } ///////////////////// -YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_runNode(runNode),_generatedGraph(0),_FEInGeneratedGraph(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this)) +YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_lockedStatus(false),_runNode(runNode),_gen(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this)) { if(!_runNode) throw YACS::Exception("YACSEvalYFXRunOnlyPattern : internal run node must be not null !"); @@ -324,141 +345,45 @@ YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::EN YACSEvalYFXRunOnlyPattern::~YACSEvalYFXRunOnlyPattern() { delete _obs; + delete _gen; } void YACSEvalYFXRunOnlyPattern::setOutPortsOfInterestForEvaluation(const std::vector& outputsOfInterest) { checkNonLocked(); _outputsOfInterest=outputsOfInterest; + _lockedStatus=true; } void YACSEvalYFXRunOnlyPattern::resetOutputsOfInterest() { checkLocked(); _outputsOfInterest.clear(); + _lockedStatus=false; } void YACSEvalYFXRunOnlyPattern::generateGraph() { - static const char LISTPYOBJ_STR[]="list[pyobj]"; - if(_outputsOfInterest.empty()) - return ; - YACS::ENGINE::RuntimeSALOME::setRuntime(); - YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime()); - _generatedGraph=r->createProc(DFT_PROC_NAME); - YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list())); - std::ostringstream oss; oss << "Loop_" << _runNode->getName(); - _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double"); - _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int"); - // - YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__")); - _generatedGraph->edAddChild(n0); - YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC)); - YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC)); - std::ostringstream var0; - for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++) - { - if((*it).isRandomVar()) - { - var0 << (*it).getName() << ","; - YACS::ENGINE::TypeCode *tc(createSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData())); - YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc)); - YACS::ENGINE::InputPyPort *inpc(dynamic_cast(inp)); - if(!inpc) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !"); - (*it).setUndergroundPortToBeSet(inpc); - } - } - std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n"; - n0->setScript(n0Script.str()); - // - YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC)); - _FEInGeneratedGraph=n1; - _generatedGraph->edAddChild(n1); - _generatedGraph->edAddCFLink(n0,n1); - _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort()); - YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME)); - _generatedGraph->edAddChild(n2); - _generatedGraph->edAddCFLink(n1,n2); - // - YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME)); - n1->edAddChild(n10); - YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__")); - YACS::ENGINE::Node *n101(_runNode->cloneWithoutCompAndContDeepCpy(0,true)); - n10->edAddChild(n100); - n10->edAddChild(n101); - YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC)); - n10->edAddCFLink(n100,n101); - n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn); - std::ostringstream var1; - for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++) - { - if((*it).isRandomVar()) - { - var1 << (*it).getName() << ","; - YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData()))); - std::string tmpPortName(_runNode->getInPortName((*it).getUndergroundPtr())); - YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName)); - n10->edAddDFLink(myOut,myIn); - } - } - std::ostringstream n100Script; n100Script << var1.str() << "=i0\n"; - n100->setScript(n100Script.str()); - for(std::vector< YACSEvalOutputPort * >::const_iterator it=_outputsOfInterest.begin();it!=_outputsOfInterest.end();it++) - { - YACS::ENGINE::TypeCode *tc(createSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData())); - YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc)); - std::string tmpPortName(_runNode->getOutPortName((*it)->getUndergroundPtr())); - YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName)); - _generatedGraph->edAddDFLink(myOut,myIn); - } - _generatedGraph->updateContainersAndComponents(); + delete _gen; + if(getResourcesInternal()->isInteractive()) + _gen=new YACSEvalYFXGraphGenInteractive(this); + else + _gen=new YACSEvalYFXGraphGenCluster(this); + _gen->generateGraph(); } void YACSEvalYFXRunOnlyPattern::resetGeneratedGraph() { - delete _generatedGraph; - _generatedGraph=0; - resetResources(); + if(_gen) + _gen->resetGeneratedGraph(); } int YACSEvalYFXRunOnlyPattern::assignNbOfBranches() { checkAlreadyComputedResources(); - if(!_generatedGraph) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : the generated graph has not been created !"); - std::list nodes(_generatedGraph->getChildren()); - YACS::ENGINE::ForEachLoop *zeMainNode(0); - for(std::list::const_iterator it=nodes.begin();it!=nodes.end();it++) - { - YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast(*it)); - if(isZeMainNode) - { - if(!zeMainNode) - zeMainNode=isZeMainNode; - else - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 1 !"); - } - } - if(!zeMainNode) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 2 !"); - unsigned int nbProcsDeclared(getResourcesInternal()->getNumberOfProcsDeclared()); - nbProcsDeclared=std::max(nbProcsDeclared,4u); - int nbOfBranch=1; - if(getParallelizeStatus()) - { - nbOfBranch=(nbProcsDeclared/getResourcesInternal()->getMaxLevelOfParallelism()); - nbOfBranch=std::max(nbOfBranch,1); - } - YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort()); - YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast(zeInputToSet)); - if(!zeInputToSetC) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 3 !"); - YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch)); - zeInputToSetC->put(a); - zeInputToSetC->exSaveInit(); - a->decrRef(); - return nbOfBranch; + if(!_gen) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : generator is NULL ! Please invoke generateGraph before !"); + return _gen->assignNbOfBranches(); } void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs() @@ -478,7 +403,7 @@ void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs() bool YACSEvalYFXRunOnlyPattern::isLocked() const { - return _generatedGraph!=0; + return _lockedStatus; } YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources() @@ -496,22 +421,27 @@ YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources() YACS::ENGINE::Proc *YACSEvalYFXRunOnlyPattern::getUndergroundGeneratedGraph() const { - return _generatedGraph; + return getGenerator()->getUndergroundGeneratedGraph(); } std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const { + std::string generatorErrors = getGenerator()->getErrors(); + if(generatorErrors.size() > 0) + { + return generatorErrors; + } std::string st(getStatusOfRunStr());//test if a run has occurred. if(st==ST_OK) throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !"); // All the problem can only comes from foreach -> scan it - YACS::ENGINE::ForEachLoop *fe(findTopForEach()); + YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach()); YACS::ENGINE::NodeStateNameMap nsm; unsigned nbB(fe->getNumberOfBranchesCreatedDyn()); std::ostringstream oss; for(unsigned j=0;jgetChildByNameExec(FIRST_FE_SUBNODE_NAME,j)); + YACS::ENGINE::Node *nn(fe->getChildByNameExec(YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME,j)); YACS::ENGINE::Bloc *nnc(dynamic_cast(nn)); if(!nnc) throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !"); @@ -526,14 +456,6 @@ std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const oss << "NODE = " << nnc->getChildName(*it1) << std::endl; oss << "STATUS = " << nsm[st0] << std::endl; oss << "BRANCH ID = " << j << std::endl; - std::list inps((*it1)->getSetOfInputPort()); - for(std::list::const_iterator it2=inps.begin();it2!=inps.end();it2++) - { - std::string d((*it2)->getHumanRepr()); - if(d.size()>10000) - d=d.substr(0,MAX_LGTH_OF_INP_DUMP); - oss << "INPUT \"" << (*it2)->getName() << "\" = " << d << std::endl; - } oss << "DETAILS = " << std::endl; oss << (*it1)->getErrorDetails(); } @@ -544,7 +466,7 @@ std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const { - YACS::StatesForNode st(_generatedGraph->getState()); + YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState()); switch(st) { case YACS::READY: @@ -579,26 +501,7 @@ std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const std::vector YACSEvalYFXRunOnlyPattern::getResults() const { - if(_generatedGraph->getState()!=YACS::DONE) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !"); - std::vector ret(_outputsOfInterest.size()); - YACS::ENGINE::Node *node(_generatedGraph->getChildByName(GATHER_NODE_NAME)); - YACS::ENGINE::PythonNode *nodeC(dynamic_cast(node)); - if(!nodeC) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !"); - std::size_t ii(0); - for(std::vector< YACSEvalOutputPort * >::const_iterator it=_outputsOfInterest.begin();it!=_outputsOfInterest.end();it++,ii++) - { - YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName())); - YACS::ENGINE::InputPyPort *inputC(dynamic_cast(input)); - if(!inputC) - { - std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\""; - throw YACS::Exception(oss.str()); - } - ret[ii]=BuildValueInPort(inputC); - } - return ret; + return _gen->getResults(); } /*! @@ -607,7 +510,7 @@ std::vector YACSEvalYFXRunOnlyPattern::getResults() const */ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure(std::vector& passedIds) const { - YACS::StatesForNode st(_generatedGraph->getState()); + YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState()); if(st==YACS::DONE) { passedIds.clear(); @@ -624,8 +527,8 @@ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailu return ret; } getStatusOfRunStr();// To check that the status is recognized. - std::list lns(_generatedGraph->edGetDirectDescendants()); - YACS::ENGINE::ForEachLoop *fe(findTopForEach()); + std::list lns(getUndergroundGeneratedGraph()->edGetDirectDescendants()); + YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach()); // YACS::ENGINE::Executor exe; std::vector outputs; @@ -637,7 +540,7 @@ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailu { YACS::ENGINE::OutputPort *p((*it1)->getUndergroundPtr()); std::string st(_runNode->getOutPortName(p)); - std::ostringstream oss; oss << FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st; + std::ostringstream oss; oss << YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st; st=oss.str(); YACS::ENGINE::ForEachLoop::InterceptorizeNameOfPort(st); std::vector::iterator it2(std::find(nameOfOutputs.begin(),nameOfOutputs.end(),st)); @@ -659,33 +562,24 @@ void YACSEvalYFXRunOnlyPattern::emitStart() const YACSEvalObserver *obs(getObserver()); if(!obs) return ; - obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed()); + obs->startComputation(getBoss()); } -bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP) const +bool YACSEvalYFXRunOnlyPattern::go(const YACSEvalExecParams& params, YACSEvalSession *session) const { emitStart(); - if(getResourcesInternal()->isInteractive()) - { - YACS::ENGINE::Executor exe; - exe.setKeepGoingProperty(!stopASAP); - { - MyAutoThreadSaver locker; - exe.RunW(getUndergroundGeneratedGraph()); - } - return getUndergroundGeneratedGraph()->getState()==YACS::DONE; - } - else - { - char EFXGenFileName[]="EFXGenFileName"; - char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))"; - // - YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent)); - YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); - std::string fn(PyString_AsString(val)); - getUndergroundGeneratedGraph()->saveSchema(fn); - return false; - } + YACS::ENGINE::Dispatcher *disp(YACS::ENGINE::Dispatcher::getDispatcher()); + disp->addObserver(_obs,getUndergroundForEach(),"progress_ok"); + disp->addObserver(_obs,getUndergroundForEach(),"progress_ko"); + bool ret(getGenerator()->go(params,session)); + disp->removeObserver(_obs,getUndergroundForEach(),"progress_ok"); + disp->removeObserver(_obs,getUndergroundForEach(),"progress_ko"); + return ret; +} + +YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::getUndergroundForEach() const +{ + return getGenerator()->getUndergroundForEach(); } bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode) @@ -768,17 +662,413 @@ void YACSEvalYFXRunOnlyPattern::buildOutputPorts() } } -YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::findTopForEach() const +YACSEvalYFXGraphGen *YACSEvalYFXRunOnlyPattern::getGenerator() const +{ + if(!_gen) + throw YACS::Exception("getGenerator : generator is NULL !"); + return _gen; +} + +///////////////////// + +YACSEvalYFXGraphGen::YACSEvalYFXGraphGen(YACSEvalYFXRunOnlyPattern *boss):_boss(boss),_generatedGraph(0),_FEInGeneratedGraph(0) +{ + if(!_boss) + throw YACS::Exception("YACSEvalYFXGraphGen constructor : boss is NULL !"); +} + +YACSEvalYFXGraphGen::~YACSEvalYFXGraphGen() +{ + //delete _generatedGraph;// -> TODO : AGY why ? +} + +void YACSEvalYFXGraphGen::resetGeneratedGraph() +{ + delete _generatedGraph; + _generatedGraph=0; _FEInGeneratedGraph=0; +} + +void YACSEvalYFXGraphGen::generateGraphCommon(CustomPatcher& patcher) +{ + if(_generatedGraph) + { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; } + static const char LISTPYOBJ_STR[]="list[pyobj]"; + if(getBoss()->getOutputsOfInterest().empty()) + return ; + YACS::ENGINE::RuntimeSALOME::setRuntime(); + YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime()); + _generatedGraph=r->createProc(DFT_PROC_NAME); + YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list())); + std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName(); + _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double"); + YACS::ENGINE::TypeCode *tcInt(_generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int")); + // + YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__")); + _generatedGraph->edAddChild(n0); + YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC)); + YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC)); + std::ostringstream var0; + const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs()); + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var0 << (*it).getName() << ","; + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData())); + YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc)); + YACS::ENGINE::InputPyPort *inpc(dynamic_cast(inp)); + if(!inpc) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !"); + (*it).setUndergroundPortToBeSet(inpc); + } + } + std::ostringstream n0Script; n0Script << "sender=[tuple([__p9Sq]+list(__p9Sw)) for __p9Sq,__p9Sw in enumerate(zip(" << var0.str() << "))]\n"; + n0->setScript(n0Script.str()); + // + YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC)); + _FEInGeneratedGraph=n1; + _generatedGraph->edAddChild(n1); + _generatedGraph->edAddCFLink(n0,n1); + _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort()); + YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME)); + _generatedGraph->edAddChild(n2); + _generatedGraph->edAddCFLink(n1,n2); + // + YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME)); + n1->edAddChild(n10); + YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__")); + YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode()); + YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true)); + n10->edAddChild(n100); + n10->edAddChild(n101); + YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC)); + n10->edAddCFLink(n100,n101); + n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn); + std::ostringstream var1; + YACS::ENGINE::OutputPort *n100_output(n100->edAddOutputPort(HIDDEN_INDEX_VAR,tcInt)); + var1 << HIDDEN_INDEX_VAR << ","; + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var1 << (*it).getName() << ","; + YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData()))); + std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr())); + YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName)); + n10->edAddDFLink(myOut,myIn); + } + } + std::ostringstream n100Script; n100Script << var1.str() << "=i0\n"; + n100->setScript(n100Script.str()); + const std::vector& outputsOfInt(getBoss()->getOutputsOfInterest()); + { + std::list n101_constit(n101->getRecursiveConstituents()); + for(std::list::const_iterator it=n101_constit.begin();it!=n101_constit.end();it++) + { + YACS::ENGINE::InlineNode *eltc(dynamic_cast(*it)); + if(eltc) + { + YACS::ENGINE::InputPort *n101_input(eltc->edAddInputPort(HIDDEN_INDEX_VAR,tcInt)); + _generatedGraph->edAddDFLink(n100_output,n101_input); + } + } + } + for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++) + { + patcher.addOutputVar((*it)->getName()); + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData())); + YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc)); + std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr())); + YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName)); + _generatedGraph->edAddDFLink(myOut,myIn); + } + patcher.assignOutput(n2); + _generatedGraph->updateContainersAndComponents(); +} + +bool YACSEvalYFXGraphGen::isLocked() const +{ + return _generatedGraph!=0; +} + +int YACSEvalYFXGraphGen::assignNbOfBranches() +{ + if(!_generatedGraph) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !"); + std::list nodes(_generatedGraph->getChildren()); + YACS::ENGINE::ForEachLoop *zeMainNode(0); + for(std::list::const_iterator it=nodes.begin();it!=nodes.end();it++) + { + YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast(*it)); + if(isZeMainNode) + { + if(!zeMainNode) + zeMainNode=isZeMainNode; + else + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !"); + } + } + if(!zeMainNode) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !"); + unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared()); + nbProcsDeclared=std::max(nbProcsDeclared,4u); + int nbOfBranch=1; + if(getBoss()->getParallelizeStatus()) + { + nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism()); + nbOfBranch=std::max(nbOfBranch,1); + } + YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort()); + YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast(zeInputToSet)); + if(!zeInputToSetC) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !"); + YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch)); + zeInputToSetC->put(a); + zeInputToSetC->exSaveInit(); + a->decrRef(); + return nbOfBranch; +} + +void YACSEvalYFXGraphGenInteractive::generateGraph() +{ + class LocalPatcher : public YACSEvalYFXGraphGen::CustomPatcher + { + public: + void addOutputVar(const std::string& name) { } + void assignOutput(YACS::ENGINE::InlineNode *node) { } + }; + LocalPatcher lp; + this->generateGraphCommon(lp); +} + +bool YACSEvalYFXGraphGenInteractive::go(const YACSEvalExecParams& params, YACSEvalSession *session) const +{ + YACS::ENGINE::Executor exe; + exe.setKeepGoingProperty(!params.getStopASAPAfterErrorStatus()); + { + MyAutoThreadSaver locker(!session->isAlreadyPyThreadSaved()); + exe.RunW(getUndergroundGeneratedGraph()); + } + return getUndergroundGeneratedGraph()->getState()==YACS::DONE; +} + +std::string YACSEvalYFXGraphGenInteractive::getErrors()const +{ + return ""; +} + +std::vector YACSEvalYFXGraphGenInteractive::getResults() const +{ + if(getUndergroundGeneratedGraph()->getState()!=YACS::DONE) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !"); + const std::vector& outputsOfInt(getBoss()->getOutputsOfInterest()); + std::vector ret(outputsOfInt.size()); + YACS::ENGINE::Node *node(getUndergroundGeneratedGraph()->getChildByName(YACSEvalYFXGraphGen::GATHER_NODE_NAME)); + YACS::ENGINE::PythonNode *nodeC(dynamic_cast(node)); + if(!nodeC) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !"); + std::size_t ii(0); + for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++,ii++) + { + YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName())); + YACS::ENGINE::InputPyPort *inputC(dynamic_cast(input)); + if(!inputC) + { + std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\""; + throw YACS::Exception(oss.str()); + } + ret[ii]=YACSEvalYFXPattern::BuildValueInPort(inputC); + } + return ret; +} + +//////////////////// + +void YACSEvalYFXGraphGenCluster::generateGraph() { - std::list lns(_generatedGraph->edGetDirectDescendants()); - YACS::ENGINE::ForEachLoop *fe(0); - for(std::list::const_iterator it=lns.begin();it!=lns.end();it++) + AutoGIL agil; + // + const char EFXGenFileName[]="EFXGenFileName"; + const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%m%y\"),n.strftime(\"%H%M%S\")))"; + const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%m%y\"),n.strftime(\"%H%M%S\"))"; + // + AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent)); + AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); + if (PyUnicode_Check(val)) + _locSchemaFile = PyUnicode_AsUTF8(val); + else + throw YACS::Exception("YACSEvalYFXGraphGenCluster::generateGraph: python call error. "); + + func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2); + val=YACS::ENGINE::evalFuncPyWithNoParams(func); + if (PyUnicode_Check(val)) + _jobName = PyUnicode_AsUTF8(val); + else + throw YACS::Exception("YACSEvalYFXGraphGenCluster::generateGraph: python call error. "); + + class ClusterPatcher : public YACSEvalYFXGraphGen::CustomPatcher + { + public: + ClusterPatcher(const std::string& jobName):_jobName(jobName) { n2Script << "zeRes=["; } + void addOutputVar(const std::string& name) { n2Script<< name << ", "; } + void assignOutput(YACS::ENGINE::InlineNode *node) { + n2Script << "]\nwith open(\"" << _jobName << "\",\"w\") as f:" << std::endl; + n2Script << " f.write(repr(zeRes))" << std::endl; + node->setScript(n2Script.str()); + } + private: + std::ostringstream n2Script; + std::string _jobName; + }; + ClusterPatcher cp(_jobName); + // + this->generateGraphCommon(cp); +} + +bool YACSEvalYFXGraphGenCluster::go(const YACSEvalExecParams& params, YACSEvalSession *session) const +{ + AutoGIL agil; + _errors = ""; + getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile); + YACSEvalListOfResources *rss(getBoss()->getResourcesInternal()); + const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster()); + std::vector machines(rss->getAllChosenMachines()); + if(machines.size()!=1) + throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !"); + Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS()); + Engines::ResourceParameters rr; + rr.name=CORBA::string_dup(machines[0].c_str()); + rr.hostname=CORBA::string_dup(""); + rr.can_launch_batch_jobs=true; + rr.can_run_containers=true; + rr.OS=CORBA::string_dup("Linux"); + rr.componentList.length(0); + rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important + rr.mem_mb=0; // use default value + rr.cpu_clock=0; // use default value + rr.nb_node=0;// the number of nodes is not explicitly set + rr.nb_proc_per_node=1;// useless only nb_proc used. + rr.policy=CORBA::string_dup("cycl"); + rr.resList.length(0); + Engines::JobParameters jp; + jp.job_name=CORBA::string_dup(_jobName.c_str()); + jp.job_type=CORBA::string_dup("yacs_file"); + jp.job_file=CORBA::string_dup(_locSchemaFile.c_str()); + jp.env_file=CORBA::string_dup(""); + jp.in_files.length(cli.getInFiles().size()); + std::list::const_iterator it; + int i; + for (it = cli.getInFiles().begin(), i=0 ; + it != cli.getInFiles().end(); + it++, i++) + { + jp.in_files[i] = CORBA::string_dup((*it).c_str()); + } + jp.out_files.length(1); + jp.out_files[0]=CORBA::string_dup(_jobName.c_str()); + jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str()); + jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); + jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); + jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str()); + jp.resource_required=rr; + jp.queue=CORBA::string_dup(""); + jp.exclusive=false; + jp.mem_per_cpu=rr.mem_mb; + jp.wckey=CORBA::string_dup(cli.getWCKey().c_str()); + jp.extra_params=CORBA::string_dup(""); + jp.specific_parameters.length(0); + jp.launcher_file=CORBA::string_dup(""); + jp.launcher_args=CORBA::string_dup(""); + _jobid=sl->createJob(jp); + try + { + sl->launchJob(_jobid); + } + catch (const SALOME::SALOME_Exception & ex) + { + _errors = ex.details.text.in(); + return false; + } + catch (const CORBA::SystemException& ex) + { + _errors = "Receive CORBA SystemException."; + return false; + } + + bool ret(false); + while(true) + { + PyRun_SimpleString("import time ; time.sleep(10)"); + char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED" + std::string sstate(state); + CORBA::string_free(state); + if(sstate=="FINISHED" || sstate=="FAILED") + { + ret=sstate=="FINISHED"; + break; + } + } + sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str()); + // + try { - fe=dynamic_cast(*it); - if(fe) - break; + std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl; + oss << "if not os.path.exists(p):\n return None\n"; + oss << "f=open(p,\"r\")" << std::endl; + oss << "return eval(f.read())"; + std::string zeInput(oss.str()); + AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput)); + AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); + if(!PyList_Check(val)) + throw YACS::Exception("Fetched file does not contain a list !"); + Py_ssize_t sz(PyList_Size(val)); + _res.resize(sz); + for(Py_ssize_t i=0;i& res0(_res[i]); + PyObject *elt0(PyList_GetItem(val,i)); + if(!PyList_Check(elt0)) + throw YACS::Exception("Fetched file does contain a list of list !"); + Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0); + for(Py_ssize_t j=0;j YACSEvalYFXGraphGenCluster::getResults() const +{ + std::size_t sz(_res.size()); + std::vector ret(sz); + for(std::size_t i=0;i elt(new YACSEvalSeqAnyDouble(_res[i])); + ret[i]=elt.dettach(); + } + return ret; }