From: Anthony Geay Date: Fri, 11 Mar 2016 17:08:54 +0000 (+0100) Subject: First use case using cluster OK. X-Git-Tag: V7_8_0a2~8 X-Git-Url: http://git.salome-platform.org/gitweb/?a=commitdiff_plain;h=3ba5c7fe19bf5fdcb2c683f4e59b2aba27415c71;p=modules%2Fyacs.git First use case using cluster OK. --- diff --git a/src/evalyfx/YACSEvalYFX.cxx b/src/evalyfx/YACSEvalYFX.cxx index 17f46ae71..201bdb65c 100644 --- a/src/evalyfx/YACSEvalYFX.cxx +++ b/src/evalyfx/YACSEvalYFX.cxx @@ -64,7 +64,6 @@ void YACSEvalYFX::lockPortsForEvaluation(const std::vector< YACSEvalInputPort * { checkPortsForEvaluation(inputsOfInterest,outputsOfInterest); _pattern->setOutPortsOfInterestForEvaluation(outputsOfInterest); - _pattern->generateGraph(); } void YACSEvalYFX::unlockAll() @@ -88,17 +87,15 @@ YACSEvalListOfResources *YACSEvalYFX::giveResources() bool YACSEvalYFX::run(YACSEvalSession *session, int& nbOfBranches) { + _pattern->generateGraph(); if(!session) { throw YACS::Exception("YACSEvalYFX::run : input session in null !"); } session->launch(); - // YACSEvalListOfResources *rss(giveResources()); rss->checkOKForRun(); _pattern->assignRandomVarsInputs(); - //if(!rss->isInteractive()) - // throw YACS::Exception("YACSEvalYFX::run : not implemented yet for non interactive !"); rss->apply(); nbOfBranches=_pattern->assignNbOfBranches(); return _pattern->go(_params.getStopASAPAfterErrorStatus(),session); diff --git a/src/evalyfx/YACSEvalYFXPattern.cxx b/src/evalyfx/YACSEvalYFXPattern.cxx index 69bd3ccb3..3c00612bf 100644 --- a/src/evalyfx/YACSEvalYFXPattern.cxx +++ b/src/evalyfx/YACSEvalYFXPattern.cxx @@ -54,8 +54,6 @@ //// #include -const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX"; - const char YACSEvalYFXPattern::ST_OK[]="ALL_OK"; const char YACSEvalYFXPattern::ST_FAILED[]="SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_FAILED_DETERMINISTICALLY"; @@ -64,9 +62,11 @@ const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000; -const char YACSEvalYFXRunOnlyPattern::FIRST_FE_SUBNODE_NAME[]="Bloc"; +const char YACSEvalYFXGraphGen::DFT_PROC_NAME[]="YFX"; + +const char YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME[]="Bloc"; -const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__"; +const char YACSEvalYFXGraphGen::GATHER_NODE_NAME[]="__gather__"; class MyAutoThreadSaver { @@ -193,7 +193,7 @@ YACSEvalYFXPattern::YACSEvalYFXPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *sc { } -YACS::ENGINE::TypeCode *YACSEvalYFXPattern::createSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType) +YACS::ENGINE::TypeCode *YACSEvalYFXPattern::CreateSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType) { std::ostringstream oss; oss << "list[" << zeType << "]"; YACS::ENGINE::TypeCode *tc(scheme->getTypeCode(zeType)); @@ -315,7 +315,7 @@ void YACSEvalYFXRunOnlyPatternInternalObserver::notifyObserver(YACS::ENGINE::Nod ///////////////////// -YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_runNode(runNode),_generatedGraph(0),_FEInGeneratedGraph(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this)) +YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_lockedStatus(false),_runNode(runNode),_gen(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this)) { if(!_runNode) throw YACS::Exception("YACSEvalYFXRunOnlyPattern : internal run node must be not null !"); @@ -326,141 +326,45 @@ YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::EN YACSEvalYFXRunOnlyPattern::~YACSEvalYFXRunOnlyPattern() { delete _obs; + delete _gen; } void YACSEvalYFXRunOnlyPattern::setOutPortsOfInterestForEvaluation(const std::vector& outputsOfInterest) { checkNonLocked(); _outputsOfInterest=outputsOfInterest; + _lockedStatus=true; } void YACSEvalYFXRunOnlyPattern::resetOutputsOfInterest() { checkLocked(); _outputsOfInterest.clear(); + _lockedStatus=false; } void YACSEvalYFXRunOnlyPattern::generateGraph() { - static const char LISTPYOBJ_STR[]="list[pyobj]"; - if(_outputsOfInterest.empty()) - return ; - YACS::ENGINE::RuntimeSALOME::setRuntime(); - YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime()); - _generatedGraph=r->createProc(DFT_PROC_NAME); - YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list())); - std::ostringstream oss; oss << "Loop_" << _runNode->getName(); - _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double"); - _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int"); - // - YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__")); - _generatedGraph->edAddChild(n0); - YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC)); - YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC)); - std::ostringstream var0; - for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++) - { - if((*it).isRandomVar()) - { - var0 << (*it).getName() << ","; - YACS::ENGINE::TypeCode *tc(createSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData())); - YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc)); - YACS::ENGINE::InputPyPort *inpc(dynamic_cast(inp)); - if(!inpc) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !"); - (*it).setUndergroundPortToBeSet(inpc); - } - } - std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n"; - n0->setScript(n0Script.str()); - // - YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC)); - _FEInGeneratedGraph=n1; - _generatedGraph->edAddChild(n1); - _generatedGraph->edAddCFLink(n0,n1); - _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort()); - YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME)); - _generatedGraph->edAddChild(n2); - _generatedGraph->edAddCFLink(n1,n2); - // - YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME)); - n1->edAddChild(n10); - YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__")); - YACS::ENGINE::Node *n101(_runNode->cloneWithoutCompAndContDeepCpy(0,true)); - n10->edAddChild(n100); - n10->edAddChild(n101); - YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC)); - n10->edAddCFLink(n100,n101); - n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn); - std::ostringstream var1; - for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++) - { - if((*it).isRandomVar()) - { - var1 << (*it).getName() << ","; - YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData()))); - std::string tmpPortName(_runNode->getInPortName((*it).getUndergroundPtr())); - YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName)); - n10->edAddDFLink(myOut,myIn); - } - } - std::ostringstream n100Script; n100Script << var1.str() << "=i0\n"; - n100->setScript(n100Script.str()); - for(std::vector< YACSEvalOutputPort * >::const_iterator it=_outputsOfInterest.begin();it!=_outputsOfInterest.end();it++) - { - YACS::ENGINE::TypeCode *tc(createSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData())); - YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc)); - std::string tmpPortName(_runNode->getOutPortName((*it)->getUndergroundPtr())); - YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName)); - _generatedGraph->edAddDFLink(myOut,myIn); - } - _generatedGraph->updateContainersAndComponents(); + delete _gen; + if(getResourcesInternal()->isInteractive()) + _gen=new YACSEvalYFXGraphGenInteractive(this); + else + _gen=new YACSEvalYFXGraphGenCluster(this); + _gen->generateGraph(); } void YACSEvalYFXRunOnlyPattern::resetGeneratedGraph() { - delete _generatedGraph; - _generatedGraph=0; - resetResources(); + if(_gen) + _gen->resetGeneratedGraph(); } int YACSEvalYFXRunOnlyPattern::assignNbOfBranches() { checkAlreadyComputedResources(); - if(!_generatedGraph) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : the generated graph has not been created !"); - std::list nodes(_generatedGraph->getChildren()); - YACS::ENGINE::ForEachLoop *zeMainNode(0); - for(std::list::const_iterator it=nodes.begin();it!=nodes.end();it++) - { - YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast(*it)); - if(isZeMainNode) - { - if(!zeMainNode) - zeMainNode=isZeMainNode; - else - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 1 !"); - } - } - if(!zeMainNode) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 2 !"); - unsigned int nbProcsDeclared(getResourcesInternal()->getNumberOfProcsDeclared()); - nbProcsDeclared=std::max(nbProcsDeclared,4u); - int nbOfBranch=1; - if(getParallelizeStatus()) - { - nbOfBranch=(nbProcsDeclared/getResourcesInternal()->getMaxLevelOfParallelism()); - nbOfBranch=std::max(nbOfBranch,1); - } - YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort()); - YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast(zeInputToSet)); - if(!zeInputToSetC) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : internal error 3 !"); - YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch)); - zeInputToSetC->put(a); - zeInputToSetC->exSaveInit(); - a->decrRef(); - return nbOfBranch; + if(!_gen) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : generator is NULL ! Please invoke generateGraph before !"); + return _gen->assignNbOfBranches(); } void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs() @@ -480,7 +384,7 @@ void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs() bool YACSEvalYFXRunOnlyPattern::isLocked() const { - return _generatedGraph!=0; + return _lockedStatus; } YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources() @@ -498,7 +402,7 @@ YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources() YACS::ENGINE::Proc *YACSEvalYFXRunOnlyPattern::getUndergroundGeneratedGraph() const { - return _generatedGraph; + return getGenerator()->getUndergroundGeneratedGraph(); } std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const @@ -507,13 +411,13 @@ std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const if(st==ST_OK) throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !"); // All the problem can only comes from foreach -> scan it - YACS::ENGINE::ForEachLoop *fe(findTopForEach()); + YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach()); YACS::ENGINE::NodeStateNameMap nsm; unsigned nbB(fe->getNumberOfBranchesCreatedDyn()); std::ostringstream oss; for(unsigned j=0;jgetChildByNameExec(FIRST_FE_SUBNODE_NAME,j)); + YACS::ENGINE::Node *nn(fe->getChildByNameExec(YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME,j)); YACS::ENGINE::Bloc *nnc(dynamic_cast(nn)); if(!nnc) throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !"); @@ -546,7 +450,7 @@ std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const { - YACS::StatesForNode st(_generatedGraph->getState()); + YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState()); switch(st) { case YACS::READY: @@ -581,26 +485,7 @@ std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const std::vector YACSEvalYFXRunOnlyPattern::getResults() const { - if(_generatedGraph->getState()!=YACS::DONE) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !"); - std::vector ret(_outputsOfInterest.size()); - YACS::ENGINE::Node *node(_generatedGraph->getChildByName(GATHER_NODE_NAME)); - YACS::ENGINE::PythonNode *nodeC(dynamic_cast(node)); - if(!nodeC) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !"); - std::size_t ii(0); - for(std::vector< YACSEvalOutputPort * >::const_iterator it=_outputsOfInterest.begin();it!=_outputsOfInterest.end();it++,ii++) - { - YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName())); - YACS::ENGINE::InputPyPort *inputC(dynamic_cast(input)); - if(!inputC) - { - std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\""; - throw YACS::Exception(oss.str()); - } - ret[ii]=BuildValueInPort(inputC); - } - return ret; + return _gen->getResults(); } /*! @@ -609,7 +494,7 @@ std::vector YACSEvalYFXRunOnlyPattern::getResults() const */ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure(std::vector& passedIds) const { - YACS::StatesForNode st(_generatedGraph->getState()); + YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState()); if(st==YACS::DONE) { passedIds.clear(); @@ -626,8 +511,8 @@ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailu return ret; } getStatusOfRunStr();// To check that the status is recognized. - std::list lns(_generatedGraph->edGetDirectDescendants()); - YACS::ENGINE::ForEachLoop *fe(findTopForEach()); + std::list lns(getUndergroundGeneratedGraph()->edGetDirectDescendants()); + YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach()); // YACS::ENGINE::Executor exe; std::vector outputs; @@ -639,7 +524,7 @@ std::vector YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailu { YACS::ENGINE::OutputPort *p((*it1)->getUndergroundPtr()); std::string st(_runNode->getOutPortName(p)); - std::ostringstream oss; oss << FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st; + std::ostringstream oss; oss << YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st; st=oss.str(); YACS::ENGINE::ForEachLoop::InterceptorizeNameOfPort(st); std::vector::iterator it2(std::find(nameOfOutputs.begin(),nameOfOutputs.end(),st)); @@ -661,80 +546,18 @@ void YACSEvalYFXRunOnlyPattern::emitStart() const YACSEvalObserver *obs(getObserver()); if(!obs) return ; - obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed()); + obs->notifyNumberOfSamplesToEval(getBoss(),getUndergroundForEach()->getNbOfElementsToBeProcessed()); } bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP, YACSEvalSession *session) const { emitStart(); - if(getResourcesInternal()->isInteractive()) - { - YACS::ENGINE::Executor exe; - exe.setKeepGoingProperty(!stopASAP); - { - MyAutoThreadSaver locker; - exe.RunW(getUndergroundGeneratedGraph()); - } - return getUndergroundGeneratedGraph()->getState()==YACS::DONE; - } - else - { - const char EFXGenFileName[]="EFXGenFileName"; - const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))"; - const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))"; - // - YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent)); - YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); - std::string locSchemaFile(PyString_AsString(val)); - getUndergroundGeneratedGraph()->saveSchema(locSchemaFile); - func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2); - val=YACS::ENGINE::evalFuncPyWithNoParams(func); - std::string jobName(PyString_AsString(val)); - YACSEvalListOfResources *rss(getResourcesInternal()); - const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster()); - std::vector machines(rss->getAllChosenMachines()); - if(machines.size()!=1) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::go : internal error ! In batch mode and not exactly one machine !"); - Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS()); - Engines::ResourceParameters rr; - rr.name=CORBA::string_dup(machines[0].c_str()); - rr.hostname=CORBA::string_dup(""); - rr.can_launch_batch_jobs=true; - rr.can_run_containers=true; - rr.OS=CORBA::string_dup("Linux"); - rr.componentList.length(0); - rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important - rr.mem_mb=1024; - rr.cpu_clock=1000; - rr.nb_node=1;// useless only nb_proc used. - rr.nb_proc_per_node=1;// useless only nb_proc used. - rr.policy=CORBA::string_dup("cycl"); - rr.resList.length(0); - Engines::JobParameters jp; - jp.job_name=CORBA::string_dup(jobName.c_str()); - jp.job_type=CORBA::string_dup("yacs_file"); - jp.job_file=CORBA::string_dup(locSchemaFile.c_str()); - jp.env_file=CORBA::string_dup(""); - jp.in_files.length(); - jp.out_files.length(); - jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str()); - jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); - jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); - jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str()); - jp.resource_required=rr; - jp.queue=CORBA::string_dup(""); - jp.exclusive=false; - jp.mem_per_cpu=rr.mem_mb; - jp.wckey=CORBA::string_dup(cli.getWCKey().c_str()); - jp.extra_params=CORBA::string_dup(""); - jp.specific_parameters.length(0); - jp.launcher_file=CORBA::string_dup(""); - jp.launcher_args=CORBA::string_dup(""); - CORBA::Long jobid(sl->createJob(jp)); - sl->launchJob(jobid); - std::cerr << "*** " << jobName << " -> " << jobid << std::endl; - return false; - } + return getGenerator()->go(stopASAP,session); +} + +YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::getUndergroundForEach() const +{ + return getGenerator()->getUndergroundForEach(); } bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode) @@ -817,17 +640,405 @@ void YACSEvalYFXRunOnlyPattern::buildOutputPorts() } } -YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::findTopForEach() const +YACSEvalYFXGraphGen *YACSEvalYFXRunOnlyPattern::getGenerator() const +{ + if(!_gen) + throw YACS::Exception("getGenerator : generator is NULL !"); + return _gen; +} + +///////////////////// + +YACSEvalYFXGraphGen::YACSEvalYFXGraphGen(YACSEvalYFXRunOnlyPattern *boss):_boss(boss),_generatedGraph(0),_FEInGeneratedGraph(0) +{ + if(!_boss) + throw YACS::Exception("YACSEvalYFXGraphGen constructor : boss is NULL !"); +} + +YACSEvalYFXGraphGen::~YACSEvalYFXGraphGen() { - std::list lns(_generatedGraph->edGetDirectDescendants()); - YACS::ENGINE::ForEachLoop *fe(0); - for(std::list::const_iterator it=lns.begin();it!=lns.end();it++) + //delete _generatedGraph;// -> TODO : AGY why ? +} + +void YACSEvalYFXGraphGen::resetGeneratedGraph() +{ + delete _generatedGraph; + _generatedGraph=0; _FEInGeneratedGraph=0; +} + +bool YACSEvalYFXGraphGen::isLocked() const +{ + return _generatedGraph!=0; +} + +int YACSEvalYFXGraphGen::assignNbOfBranches() +{ + if(!_generatedGraph) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !"); + std::list nodes(_generatedGraph->getChildren()); + YACS::ENGINE::ForEachLoop *zeMainNode(0); + for(std::list::const_iterator it=nodes.begin();it!=nodes.end();it++) { - fe=dynamic_cast(*it); - if(fe) - break; + YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast(*it)); + if(isZeMainNode) + { + if(!zeMainNode) + zeMainNode=isZeMainNode; + else + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !"); + } } - if(!fe) - throw YACS::Exception("YACSEvalYFXRunOnlyPattern::findTopForEach : internal error 2 ! ForEach is not accessible !"); - return fe; + if(!zeMainNode) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !"); + unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared()); + nbProcsDeclared=std::max(nbProcsDeclared,4u); + int nbOfBranch=1; + if(getBoss()->getParallelizeStatus()) + { + nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism()); + nbOfBranch=std::max(nbOfBranch,1); + } + YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort()); + YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast(zeInputToSet)); + if(!zeInputToSetC) + throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !"); + YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch)); + zeInputToSetC->put(a); + zeInputToSetC->exSaveInit(); + a->decrRef(); + return nbOfBranch; +} + +void YACSEvalYFXGraphGenInteractive::generateGraph() +{ + if(_generatedGraph) + { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; } + static const char LISTPYOBJ_STR[]="list[pyobj]"; + if(getBoss()->getOutputsOfInterest().empty()) + return ; + YACS::ENGINE::RuntimeSALOME::setRuntime(); + YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime()); + _generatedGraph=r->createProc(DFT_PROC_NAME); + YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list())); + std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName(); + _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double"); + _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int"); + // + YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__")); + _generatedGraph->edAddChild(n0); + YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC)); + YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC)); + std::ostringstream var0; + const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs()); + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var0 << (*it).getName() << ","; + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData())); + YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc)); + YACS::ENGINE::InputPyPort *inpc(dynamic_cast(inp)); + if(!inpc) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !"); + (*it).setUndergroundPortToBeSet(inpc); + } + } + std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n"; + n0->setScript(n0Script.str()); + // + YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC)); + _FEInGeneratedGraph=n1; + _generatedGraph->edAddChild(n1); + _generatedGraph->edAddCFLink(n0,n1); + _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort()); + YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME)); + _generatedGraph->edAddChild(n2); + _generatedGraph->edAddCFLink(n1,n2); + // + YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME)); + n1->edAddChild(n10); + YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__")); + YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode()); + YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true)); + n10->edAddChild(n100); + n10->edAddChild(n101); + YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC)); + n10->edAddCFLink(n100,n101); + n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn); + std::ostringstream var1; + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var1 << (*it).getName() << ","; + YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData()))); + std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr())); + YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName)); + n10->edAddDFLink(myOut,myIn); + } + } + std::ostringstream n100Script; n100Script << var1.str() << "=i0\n"; + n100->setScript(n100Script.str()); + const std::vector& outputsOfInt(getBoss()->getOutputsOfInterest()); + for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++) + { + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData())); + YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc)); + std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr())); + YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName)); + _generatedGraph->edAddDFLink(myOut,myIn); + } + _generatedGraph->updateContainersAndComponents(); +} + +bool YACSEvalYFXGraphGenInteractive::go(bool stopASAP, YACSEvalSession *session) const +{ + YACS::ENGINE::Executor exe; + exe.setKeepGoingProperty(!stopASAP); + { + MyAutoThreadSaver locker; + exe.RunW(getUndergroundGeneratedGraph()); + } + return getUndergroundGeneratedGraph()->getState()==YACS::DONE; +} + +std::vector YACSEvalYFXGraphGenInteractive::getResults() const +{ + if(getUndergroundGeneratedGraph()->getState()!=YACS::DONE) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !"); + const std::vector& outputsOfInt(getBoss()->getOutputsOfInterest()); + std::vector ret(outputsOfInt.size()); + YACS::ENGINE::Node *node(getUndergroundGeneratedGraph()->getChildByName(YACSEvalYFXGraphGen::GATHER_NODE_NAME)); + YACS::ENGINE::PythonNode *nodeC(dynamic_cast(node)); + if(!nodeC) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !"); + std::size_t ii(0); + for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++,ii++) + { + YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName())); + YACS::ENGINE::InputPyPort *inputC(dynamic_cast(input)); + if(!inputC) + { + std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\""; + throw YACS::Exception(oss.str()); + } + ret[ii]=YACSEvalYFXPattern::BuildValueInPort(inputC); + } + return ret; +} + +//////////////////// + +void YACSEvalYFXGraphGenCluster::generateGraph() +{ + if(_generatedGraph) + { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; } + // + const char EFXGenFileName[]="EFXGenFileName"; + const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))"; + const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))"; + // + YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent)); + YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); + _locSchemaFile=PyString_AsString(val); + func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2); + val=YACS::ENGINE::evalFuncPyWithNoParams(func); + _jobName=PyString_AsString(val); + // + static const char LISTPYOBJ_STR[]="list[pyobj]"; + if(getBoss()->getOutputsOfInterest().empty()) + return ; + YACS::ENGINE::RuntimeSALOME::setRuntime(); + YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime()); + _generatedGraph=r->createProc(DFT_PROC_NAME); + YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list())); + std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName(); + _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double"); + _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int"); + // + YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__")); + _generatedGraph->edAddChild(n0); + YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC)); + YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC)); + std::ostringstream var0; + const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs()); + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var0 << (*it).getName() << ","; + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData())); + YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc)); + YACS::ENGINE::InputPyPort *inpc(dynamic_cast(inp)); + if(!inpc) + throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !"); + (*it).setUndergroundPortToBeSet(inpc); + } + } + std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n"; + n0->setScript(n0Script.str()); + // + YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC)); + _FEInGeneratedGraph=n1; + _generatedGraph->edAddChild(n1); + _generatedGraph->edAddCFLink(n0,n1); + _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort()); + YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME)); + _generatedGraph->edAddChild(n2); + _generatedGraph->edAddCFLink(n1,n2); + // + YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME)); + n1->edAddChild(n10); + YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__")); + YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode()); + YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true)); + n10->edAddChild(n100); + n10->edAddChild(n101); + YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC)); + n10->edAddCFLink(n100,n101); + n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn); + std::ostringstream var1; + for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++) + { + if((*it).isRandomVar()) + { + var1 << (*it).getName() << ","; + YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData()))); + std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr())); + YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName)); + n10->edAddDFLink(myOut,myIn); + } + } + std::ostringstream n100Script; n100Script << var1.str() << "=i0\n"; + n100->setScript(n100Script.str()); + const std::vector& outputsOfInt(getBoss()->getOutputsOfInterest()); + std::ostringstream n2Script; n2Script << "zeRes=["; + for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++) + { + YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData())); + YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc)); + n2Script << (*it)->getName() << ", "; + std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr())); + YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName)); + _generatedGraph->edAddDFLink(myOut,myIn); + } + n2Script << "]\nf=file(\"" << _jobName << "\",\"w\") ; f.write(str(zeRes)) ; del f"; + n2->setScript(n2Script.str()); + _generatedGraph->updateContainersAndComponents(); +} + +bool YACSEvalYFXGraphGenCluster::go(bool stopASAP, YACSEvalSession *session) const +{ + getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile); + YACSEvalListOfResources *rss(getBoss()->getResourcesInternal()); + const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster()); + std::vector machines(rss->getAllChosenMachines()); + if(machines.size()!=1) + throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !"); + Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS()); + Engines::ResourceParameters rr; + rr.name=CORBA::string_dup(machines[0].c_str()); + rr.hostname=CORBA::string_dup(""); + rr.can_launch_batch_jobs=true; + rr.can_run_containers=true; + rr.OS=CORBA::string_dup("Linux"); + rr.componentList.length(0); + rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important + rr.mem_mb=1024; + rr.cpu_clock=1000; + rr.nb_node=1;// useless only nb_proc used. + rr.nb_proc_per_node=1;// useless only nb_proc used. + rr.policy=CORBA::string_dup("cycl"); + rr.resList.length(0); + Engines::JobParameters jp; + jp.job_name=CORBA::string_dup(_jobName.c_str()); + jp.job_type=CORBA::string_dup("yacs_file"); + jp.job_file=CORBA::string_dup(_locSchemaFile.c_str()); + jp.env_file=CORBA::string_dup(""); + jp.in_files.length(0); + jp.out_files.length(1); + jp.out_files[0]=CORBA::string_dup(_jobName.c_str()); + jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str()); + jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); + jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str()); + jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str()); + jp.resource_required=rr; + jp.queue=CORBA::string_dup(""); + jp.exclusive=false; + jp.mem_per_cpu=rr.mem_mb; + jp.wckey=CORBA::string_dup(cli.getWCKey().c_str()); + jp.extra_params=CORBA::string_dup(""); + jp.specific_parameters.length(0); + jp.launcher_file=CORBA::string_dup(""); + jp.launcher_args=CORBA::string_dup(""); + _jobid=sl->createJob(jp); + sl->launchJob(_jobid); + bool ret(false); + while(true) + { + PyRun_SimpleString("import time ; time.sleep(10)"); + char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED" + std::string sstate(state); + CORBA::string_free(state); + if(sstate=="FINISHED" || sstate=="FAILED") + { + ret=sstate=="FINISHED"; + break; + } + } + sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str()); + // + try + { + std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl; + oss << "if not os.path.exists(p):\n return None\n"; + oss << "f=file(p,\"r\")" << std::endl; + oss << "return eval(f.read())"; + std::string zeInput(oss.str()); + YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput)); + YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func)); + if(!PyList_Check(val)) + throw YACS::Exception("Fetched file does not contain a list !"); + Py_ssize_t sz(PyList_Size(val)); + _res.resize(sz); + for(Py_ssize_t i=0;i& res0(_res[i]); + PyObject *elt0(PyList_GetItem(val,i)); + if(!PyList_Check(elt0)) + throw YACS::Exception("Fetched file does contain a list of list !"); + Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0); + for(Py_ssize_t j=0;j YACSEvalYFXGraphGenCluster::getResults() const +{ + std::size_t sz(_res.size()); + std::vector ret(sz); + for(std::size_t i=0;i elt(new YACSEvalSeqAnyDouble(_res[i])); + ret[i]=elt.dettach(); + } + return ret; } diff --git a/src/evalyfx/YACSEvalYFXPattern.hxx b/src/evalyfx/YACSEvalYFXPattern.hxx index a53bf4a05..9a1113a9a 100644 --- a/src/evalyfx/YACSEvalYFXPattern.hxx +++ b/src/evalyfx/YACSEvalYFXPattern.hxx @@ -67,6 +67,7 @@ public: void registerObserver(YACSEvalObserver *observer); YACSEvalObserver *getObserver() const { return _observer; } YACSEvalYFX *getBoss() const { return _boss; } + YACSEvalListOfResources *getResourcesInternal() const { return _res; } virtual void setOutPortsOfInterestForEvaluation(const std::vector& outputs) = 0; virtual void resetOutputsOfInterest() = 0; virtual void generateGraph() = 0; @@ -81,16 +82,15 @@ public: virtual std::vector getResults() const = 0; virtual std::vector getResultsInCaseOfFailure(std::vector& passedIds) const = 0; virtual bool go(bool stopASAP, YACSEvalSession *session) const = 0; -public: - static const char DFT_PROC_NAME[]; +public://for _gen + const std::vector< YACSEvalInputPort >& getInputs() const { return _inputs; } + static YACS::ENGINE::TypeCode *CreateSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType); + static YACSEvalSeqAny *BuildValueInPort(YACS::ENGINE::InputPyPort *port); protected: YACSEvalYFXPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme); - YACS::ENGINE::TypeCode *createSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType); void setResources(YACSEvalListOfResources *res); void resetResources(); - YACSEvalListOfResources *getResourcesInternal() const { return _res; } ResourcesManager_cpp *getCatalogInAppli() const { return _rm; } - static YACSEvalSeqAny *BuildValueInPort(YACS::ENGINE::InputPyPort *port); static YACSEvalSeqAny *BuildValueFromEngineFrmt(YACS::ENGINE::SequenceAny *data); private: void cleanScheme(); @@ -112,6 +112,8 @@ public: static const std::size_t MAX_LGTH_OF_INP_DUMP; }; +class YACSEvalYFXGraphGen; + class YACSEvalYFXRunOnlyPattern : public YACSEvalYFXPattern { public: @@ -132,22 +134,72 @@ public: std::vector getResultsInCaseOfFailure(std::vector& passedIds) const; bool go(bool stopASAP, YACSEvalSession *session) const; // - YACS::ENGINE::ForEachLoop *getUndergroundForEach() const { return _FEInGeneratedGraph; } + YACS::ENGINE::ForEachLoop *getUndergroundForEach() const; static bool IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode); -public: - static const char FIRST_FE_SUBNODE_NAME[]; - static const char GATHER_NODE_NAME[]; private: void emitStart() const; void buildInputPorts(); void buildOutputPorts(); - YACS::ENGINE::ForEachLoop *findTopForEach() const; + YACSEvalYFXGraphGen *getGenerator() const; +public://for _gen + const std::vector& getOutputsOfInterest() const { return _outputsOfInterest; } + YACS::ENGINE::ComposedNode *getRunNode() const { return _runNode; } private: + bool _lockedStatus; YACS::ENGINE::ComposedNode *_runNode; std::vector _outputsOfInterest; + YACSEvalYFXGraphGen *_gen; + YACSEvalYFXRunOnlyPatternInternalObserver *_obs; +}; + +class YACSEvalYFXGraphGen +{ +protected: + YACSEvalYFXGraphGen(YACSEvalYFXRunOnlyPattern *boss); + YACSEvalYFXRunOnlyPattern *getBoss() const { return _boss; } +public: + virtual ~YACSEvalYFXGraphGen(); + virtual void generateGraph() = 0; + virtual bool go(bool stopASAP, YACSEvalSession *session) const = 0; + virtual std::vector getResults() const = 0; + bool isLocked() const; + int assignNbOfBranches(); + void resetGeneratedGraph(); + YACS::ENGINE::Proc *getUndergroundGeneratedGraph() const { return _generatedGraph; } + YACS::ENGINE::ForEachLoop *getUndergroundForEach() const { return _FEInGeneratedGraph; } +private: + YACSEvalYFXRunOnlyPattern *_boss; +protected: YACS::ENGINE::Proc *_generatedGraph; YACS::ENGINE::ForEachLoop *_FEInGeneratedGraph; - YACSEvalYFXRunOnlyPatternInternalObserver *_obs; +public: + static const char DFT_PROC_NAME[]; + static const char FIRST_FE_SUBNODE_NAME[]; + static const char GATHER_NODE_NAME[]; +}; + +class YACSEvalYFXGraphGenInteractive : public YACSEvalYFXGraphGen +{ +public: + YACSEvalYFXGraphGenInteractive(YACSEvalYFXRunOnlyPattern *boss):YACSEvalYFXGraphGen(boss) { } + void generateGraph(); + bool go(bool stopASAP, YACSEvalSession *session) const; + std::vector getResults() const; +}; + +class YACSEvalYFXGraphGenCluster : public YACSEvalYFXGraphGen +{ +public: + YACSEvalYFXGraphGenCluster(YACSEvalYFXRunOnlyPattern *boss):YACSEvalYFXGraphGen(boss) { } + void generateGraph(); + bool go(bool stopASAP, YACSEvalSession *session) const; + std::vector getResults() const; +private: + std::string _locSchemaFile; + std::string _jobName; + mutable int _jobid; + mutable std::string _errors; + mutable std::vector< std::vector > _res; }; #endif diff --git a/src/evalyfx_swig/test1.py b/src/evalyfx_swig/test1.py index 05520f719..6a28ed1f2 100644 --- a/src/evalyfx_swig/test1.py +++ b/src/evalyfx_swig/test1.py @@ -53,6 +53,7 @@ for inp in inps: efx.lockPortsForEvaluation(inps,outps) rss=efx.giveResources() rss[0][0].setWantedMachine("localhost") +assert(rss.isInteractive()) a,b=efx.run(session) ; assert(a) efx.unlockAll() for ii,inp in enumerate(inps): diff --git a/src/evalyfx_swig/test2.py b/src/evalyfx_swig/test2.py index 14048755f..0855c799e 100644 --- a/src/evalyfx_swig/test2.py +++ b/src/evalyfx_swig/test2.py @@ -56,6 +56,7 @@ inps[0].setSequenceOfValuesToEval([1.,2.,3.,4.,5.,6.]) efx.lockPortsForEvaluation(inps,outps) rss=efx.giveResources() rss[0][0].setWantedMachine("localhost") +assert(rss.isInteractive()) a,b=efx.run(session) assert(efx.getStatusOfRunStr()=='SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_FAILED_DETERMINISTICALLY') c,d=efx.getResultsInCaseOfFailure()