1 // Copyright (C) 2012-2016 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 // Author : Anthony Geay (EDF R&D)
21 #include "YACSEvalYFXPattern.hxx"
22 #include "YACSEvalResource.hxx"
23 #include "YACSEvalSeqAny.hxx"
24 #include "YACSEvalSession.hxx"
25 #include "YACSEvalObserver.hxx"
26 #include "YACSEvalSessionInternal.hxx"
27 #include "YACSEvalAutoPtr.hxx"
29 #include "ElementaryNode.hxx"
30 #include "RuntimeSALOME.hxx"
31 #include "Dispatcher.hxx"
32 #include "Executor.hxx"
33 #include "InputPort.hxx"
34 #include "LinkInfo.hxx"
35 #include "TypeCode.hxx"
38 #include "PythonPorts.hxx"
39 #include "ForEachLoop.hxx"
40 #include "PythonNode.hxx"
41 #include "InlineNode.hxx"
42 #include "ServiceNode.hxx"
43 #include "PyStdout.hxx"
44 #include "AutoGIL.hxx"
46 #include "ResourcesManager.hxx"
// Status strings : getStatusOfRunStr() returns one of these three values wrapped in a std::string.
57 const char YACSEvalYFXPattern::ST_OK[]="ALL_OK";
59 const char YACSEvalYFXPattern::ST_FAILED[]="SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_FAILED_DETERMINISTICALLY";
61 const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO_CONCLUDE_ON_THEM";
// Length used by getErrorDetailsInCaseOfFailure() to truncate the human representation of an input port.
63 const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000;
// Name passed to createProc() when the graph is generated.
65 const char YACSEvalYFXGraphGen::DFT_PROC_NAME[]="YFX";
// Name of the Bloc created by generateGraph(); also used to look up each branch of the foreach after a run.
67 const char YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME[]="Bloc";
// Name of the script node chained after the foreach; getResults() retrieves it by this name.
69 const char YACSEvalYFXGraphGen::GATHER_NODE_NAME[]="__gather__";
71 class MyAutoThreadSaver
74 MyAutoThreadSaver(bool isToSave):_isToSave(isToSave),_save(0) { if(_isToSave) _save=PyEval_SaveThread(); }
75 ~MyAutoThreadSaver() { if(_isToSave) PyEval_RestoreThread(_save); }
81 std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const
83 std::size_t sz(_inputs.size());
84 std::vector< YACSEvalInputPort *> ret;
85 std::vector< YACSEvalInputPort >::const_iterator it(_inputs.begin());
86 for(std::size_t i=0;i<sz;i++,it++)
87 ret.push_back(const_cast<YACSEvalInputPort *>(&(*it)));
91 std::vector< YACSEvalOutputPort *> YACSEvalYFXPattern::getFreeOutputPorts() const
93 std::size_t sz(_outputs.size());
94 std::vector< YACSEvalOutputPort *> ret;
95 std::vector< YACSEvalOutputPort >::const_iterator it(_outputs.begin());
96 for(std::size_t i=0;i<sz;i++,it++)
97 ret.push_back(const_cast<YACSEvalOutputPort *>(&(*it)));
101 YACSEvalYFXPattern *YACSEvalYFXPattern::FindPatternFrom(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme)
104 throw YACS::Exception("YACSEvalYFXPattern::FindPatternFrom : input scheme must be not null !");
106 YACS::ENGINE::ComposedNode *zeRunNode(0);
107 bool isMatchingRunOnlyPattern(YACSEvalYFXRunOnlyPattern::IsMatching(scheme,zeRunNode));
108 if(isMatchingRunOnlyPattern)
109 return new YACSEvalYFXRunOnlyPattern(boss,scheme,ownScheme,zeRunNode);
111 throw YACS::Exception("YACSEvalYFXPattern::FindPatternFrom : no pattern found for the input scheme !");
// Predicate consulted by checkAlreadyComputedResources() and checkNonAlreadyComputedResources().
// NOTE(review): the body is not visible in this excerpt - presumably it tells whether _res is set; to be confirmed.
114 bool YACSEvalYFXPattern::isAlreadyComputedResources() const
119 void YACSEvalYFXPattern::checkNonAlreadyComputedResources() const
121 if(isAlreadyComputedResources())
122 throw YACS::Exception("checkNonAlreadyComputedResources : instance of computed resources already computed !");
125 void YACSEvalYFXPattern::checkAlreadyComputedResources() const
127 if(!isAlreadyComputedResources())
128 throw YACS::Exception("checkAlreadyComputedResources : instance of computed resources not already computed !");
131 void YACSEvalYFXPattern::checkLocked() const
134 throw YACS::Exception("YACSEvalYFXPattern::checkLocked : Pattern is not locked !");
137 void YACSEvalYFXPattern::checkNonLocked() const
140 throw YACS::Exception("YACSEvalYFXPattern::checkNonLocked : Pattern is locked !");
// Sanity check of a composed node, called by YACSEvalYFXRunOnlyPattern::IsMatching.
// The statements below describe : a consistency check through LinkInfo, then a scan of all elementary
// constituents raising an exception for a ServiceNode / InlineNode reported as "not correctly defined".
// NOTE(review): the first body line opens a C comment and its terminator is not visible in this excerpt,
// so part or all of these checks may be disabled - to be confirmed on the complete file.
143 void YACSEvalYFXPattern::CheckNodeIsOK(YACS::ENGINE::ComposedNode *node)
145 /*YACS::ENGINE::LinkInfo info(YACS::ENGINE::LinkInfo::WARN_ONLY_DONT_STOP);
148 node->checkConsistency(info);
150 catch(YACS::Exception& e)
153 if(info.getNumberOfErrLinks(YACS::ENGINE::E_ALL)!=0)
154 throw YACS::Exception("YACSEvalYFXPattern::CheckNodeIsOK : found node is not OK !");
155 std::list<YACS::ENGINE::ElementaryNode *> allNodes(node->getRecursiveConstituents());
156 for(std::list<YACS::ENGINE::ElementaryNode *>::const_iterator it=allNodes.begin();it!=allNodes.end();it++)
158 YACS::ENGINE::ServiceNode *node0(dynamic_cast<YACS::ENGINE::ServiceNode *>(*it));
159 YACS::ENGINE::InlineNode *node1(dynamic_cast<YACS::ENGINE::InlineNode *>(*it));
162 YACS::ENGINE::Container *cont(node0->getContainer());
163 YACS::ENGINE::ComponentInstance *comp(node0->getComponent());
166 std::ostringstream oss; oss << "YACSEvalYFXPattern::CheckNodeIsOK : ServiceNode called \"" << node0->getName() << "\" is not correctly defined !";
167 throw YACS::Exception(oss.str());
172 YACS::ENGINE::Container *cont(node1->getContainer());
173 if(!cont && node1->getExecutionMode()==YACS::ENGINE::InlineNode::REMOTE_STR)
175 std::ostringstream oss; oss << "YACSEvalYFXPattern::CheckNodeIsOK : InlineNode called \"" << node1->getName() << "\" is not correctly defined !";
176 throw YACS::Exception(oss.str());
182 void YACSEvalYFXPattern::registerObserver(YACSEvalObserver *observer)
184 if(_observer==observer)
187 _observer->decrRef();
190 _observer->incrRef();
193 YACSEvalYFXPattern::YACSEvalYFXPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme):_boss(boss),_scheme(scheme),_ownScheme(ownScheme),_parallelizeStatus(true),_rm(new ResourcesManager_cpp),_res(0),_observer(0)
197 YACS::ENGINE::TypeCode *YACSEvalYFXPattern::CreateSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType)
199 std::ostringstream oss; oss << "list[" << zeType << "]";
200 YACS::ENGINE::TypeCode *tc(scheme->getTypeCode(zeType));
201 return scheme->createSequenceTc(oss.str(),oss.str(),tc);
// Installs \a res as the computed resources of this pattern.
// Throws (through checkNonAlreadyComputedResources) when resources have already been computed.
// NOTE(review): the statements following the check are not visible in this excerpt.
204 void YACSEvalYFXPattern::setResources(YACSEvalListOfResources *res)
206 checkNonAlreadyComputedResources();
// NOTE(review): body not visible in this excerpt - presumably releases the computed resources so that
// isAlreadyComputedResources() becomes false again; to be confirmed.
212 void YACSEvalYFXPattern::resetResources()
218 YACSEvalSeqAny *YACSEvalYFXPattern::BuildValueInPort(YACS::ENGINE::InputPyPort *port)
221 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : null input port !");
222 PyObject *obj(port->getPyObj());
223 YACS::ENGINE::TypeCode *tc(port->edGetType());
224 YACS::ENGINE::TypeCodeSeq *tcc(dynamic_cast<YACS::ENGINE::TypeCodeSeq *>(tc));
227 std::ostringstream oss; oss << "YACSEvalYFXPattern::GetValueInPort : internal error for tc of input \"" << port->getName() << "\"";
228 throw YACS::Exception(oss.str());
230 const YACS::ENGINE::TypeCode *tcct(tcc->contentType());
231 if(!PyList_Check(obj))
232 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : internal error 2 !");
233 std::size_t sz(PyList_Size(obj));
234 if(tcct->kind()==YACS::ENGINE::Double)
236 std::vector<double> eltCpp(sz);
237 for(std::size_t i=0;i<sz;i++)
239 PyObject *elt(PyList_GetItem(obj,i));
240 eltCpp[i]=PyFloat_AsDouble(elt);
242 YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(eltCpp));
243 return elt.dettach();
245 else if(tcct->kind()==YACS::ENGINE::Int)
247 std::vector<int> eltCpp(sz);
248 for(std::size_t i=0;i<sz;i++)
250 PyObject *elt(PyList_GetItem(obj,i));
251 eltCpp[i]=PyInt_AsLong(elt);
253 YACS::AutoCppPtr<YACSEvalSeqAnyInt> elt(new YACSEvalSeqAnyInt(eltCpp));
254 return elt.dettach();
257 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : not implemented yet for other than Double and Int !");
260 YACSEvalSeqAny *YACSEvalYFXPattern::BuildValueFromEngineFrmt(YACS::ENGINE::SequenceAny *data)
262 unsigned int sz(data->size());
263 std::vector<double> eltCpp(sz);
264 for(unsigned int ii=0;ii<sz;ii++)
266 YACS::ENGINE::AnyPtr elt((*data)[ii]);
267 YACS::ENGINE::Any *eltPtr((YACS::ENGINE::Any *)elt);
268 YACS::ENGINE::AtomAny *eltPtr2(dynamic_cast<YACS::ENGINE::AtomAny *>(eltPtr));
271 std::ostringstream oss; oss << "YACSEvalYFXPattern::BuildValueFromEngineFrmt : error at pos #" << ii << " ! It is not an AtomAny !";
272 throw YACS::Exception(oss.str());
274 eltCpp[ii]=eltPtr2->getDoubleValue();
276 return new YACSEvalSeqAnyDouble(eltCpp);
// NOTE(review): body not visible in this excerpt - presumably releases _scheme when _ownScheme is set; to be confirmed.
279 void YACSEvalYFXPattern::cleanScheme()
// Destructor : gives back the reference taken on the observer by registerObserver().
// NOTE(review): the guard around decrRef and the remaining statements are not visible in this excerpt.
286 YACSEvalYFXPattern::~YACSEvalYFXPattern()
289 _observer->decrRef();
294 /////////////////////
296 class YACSEvalYFXRunOnlyPatternInternalObserver : public YACS::ENGINE::Observer
299 YACSEvalYFXRunOnlyPatternInternalObserver(YACSEvalYFXRunOnlyPattern *boss):_boss(boss) { if(!_boss) throw YACS::Exception("YACSEvalYFXRunOnlyPatternInternalObserver constructor : null boss not supported :)"); }
300 void notifyObserver(YACS::ENGINE::Node *object, const std::string& event);
302 YACSEvalYFXRunOnlyPattern *_boss;
305 void YACSEvalYFXRunOnlyPatternInternalObserver::notifyObserver(YACS::ENGINE::Node *object, const std::string& event)
307 YACS::ENGINE::ForEachLoop *object2(dynamic_cast<YACS::ENGINE::ForEachLoop *>(object));
310 YACSEvalObserver *obs(_boss->getObserver());
313 if(event=="progress")
314 obs->notifyNewNumberOfPassedItems(_boss->getBoss(),object2->getCurrentIndex());
317 /////////////////////
// Constructor : starts unlocked, without generator, and allocates the internal engine observer bound to this.
// Throws YACS::Exception with the message below about a null run node.
// NOTE(review): the guard of the throw and the statements after it are not visible in this excerpt.
319 YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_lockedStatus(false),_runNode(runNode),_gen(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this))
322 throw YACS::Exception("YACSEvalYFXRunOnlyPattern : internal run node must be not null !");
// NOTE(review): body not visible in this excerpt - presumably releases _obs and _gen allocated by this class; to be confirmed.
327 YACSEvalYFXRunOnlyPattern::~YACSEvalYFXRunOnlyPattern()
// Stores a copy of \a outputsOfInterest (pointers are copied, not the pointed ports).
// NOTE(review): statements before and after the assignment are not visible in this excerpt
// (presumably a lock check and an update of _lockedStatus; to be confirmed).
333 void YACSEvalYFXRunOnlyPattern::setOutPortsOfInterestForEvaluation(const std::vector<YACSEvalOutputPort *>& outputsOfInterest)
336 _outputsOfInterest=outputsOfInterest;
// Empties the list of outputs of interest.
// NOTE(review): statements before and after the clear are not visible in this excerpt
// (presumably a lock check and an update of _lockedStatus; to be confirmed).
340 void YACSEvalYFXRunOnlyPattern::resetOutputsOfInterest()
343 _outputsOfInterest.clear();
// Allocates the graph generator and asks it to generate the graph.
// NOTE(review): one statement before the test is not visible in this excerpt (possibly the release of a previous _gen).
347 void YACSEvalYFXRunOnlyPattern::generateGraph()
// interactive resources -> YACSEvalYFXGraphGenInteractive
350 if(getResourcesInternal()->isInteractive())
351 _gen=new YACSEvalYFXGraphGenInteractive(this);
// alternative : YACSEvalYFXGraphGenCluster
353 _gen=new YACSEvalYFXGraphGenCluster(this);
354 _gen->generateGraph();
// Delegates to the generator.
// NOTE(review): a line preceding the call is not visible in this excerpt (possibly a null guard on _gen).
357 void YACSEvalYFXRunOnlyPattern::resetGeneratedGraph()
360 _gen->resetGeneratedGraph();
363 int YACSEvalYFXRunOnlyPattern::assignNbOfBranches()
365 checkAlreadyComputedResources();
367 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : generator is NULL ! Please invoke generateGraph before !");
368 return _gen->assignNbOfBranches();
// For each input declared as random var, initializes its underground port with its sequence and
// checks that all sequences share the same length (exception raised otherwise, see message below).
371 void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs()
// numeric_limits<size_t>::max() is the marker meaning "no length seen yet"
373 std::size_t sz(std::numeric_limits<std::size_t>::max());
374 for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++)
375 if((*it).isRandomVar())
377 std::size_t locSize((*it).initializeUndergroundWithSeq());
// first random var met. NOTE(review): the statements of both branches are not visible in this excerpt.
378 if(sz==std::numeric_limits<std::size_t>::max())
382 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs : length of sequences in random vars must be the same !");
386 bool YACSEvalYFXRunOnlyPattern::isLocked() const
388 return _lockedStatus;
// Returns the resources of this pattern, building them on first call.
// NOTE(review): one statement before the test and one after the allocation of res are not visible in this excerpt.
391 YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources()
394 if(!isAlreadyComputedResources())
// the deployment tree is fetched before the call to removeRecursivelyRedundantCL
396 YACS::ENGINE::DeploymentTree dt(_runNode->getDeploymentTree());
397 _runNode->removeRecursivelyRedundantCL();
// built from the max level of parallelism of the run node, the catalog in appli and the deployment tree
398 YACSEvalListOfResources *res(new YACSEvalListOfResources(_runNode->getMaxLevelOfParallelism(),getCatalogInAppli(),dt));
401 return getResourcesInternal();
404 YACS::ENGINE::Proc *YACSEvalYFXRunOnlyPattern::getUndergroundGeneratedGraph() const
406 return getGenerator()->getUndergroundGeneratedGraph();
// Builds a textual report by scanning every branch created dynamically by the underground foreach.
// For the reported nodes the report contains : name, status, branch id, dump of each input port and error details.
// NOTE(review): several conditions (the guard of the first throw, the state filter on each node, the guard of
// the truncation) and the return statement are not visible in this excerpt.
409 std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const
// getStatusOfRunStr throws if the state of the graph is unexpected
411 std::string st(getStatusOfRunStr());//test if a run has occurred.
413 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !");
414 // All the problem can only comes from foreach -> scan it
415 YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach());
// nsm is used below to translate a node state into its name
416 YACS::ENGINE::NodeStateNameMap nsm;
417 unsigned nbB(fe->getNumberOfBranchesCreatedDyn());
418 std::ostringstream oss;
419 for(unsigned j=0;j<nbB;j++)
// the child of branch #j is fetched by name and must be a Bloc
421 YACS::ENGINE::Node *nn(fe->getChildByNameExec(YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME,j));
422 YACS::ENGINE::Bloc *nnc(dynamic_cast<YACS::ENGINE::Bloc *>(nn));
424 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !");
425 if(nnc->getState()==YACS::DONE)
// elementary nodes of the branch are visited in reverse order
427 std::list< YACS::ENGINE::ElementaryNode *> fec(nnc->getRecursiveConstituents());
428 for(std::list< YACS::ENGINE::ElementaryNode *>::reverse_iterator it1=fec.rbegin();it1!=fec.rend();it1++)
430 YACS::StatesForNode st0((*it1)->getState());
433 oss << "NODE = " << nnc->getChildName(*it1) << std::endl;
434 oss << "STATUS = " << nsm[st0] << std::endl;
435 oss << "BRANCH ID = " << j << std::endl;
436 std::list<YACS::ENGINE::InputPort *> inps((*it1)->getSetOfInputPort());
437 for(std::list<YACS::ENGINE::InputPort *>::const_iterator it2=inps.begin();it2!=inps.end();it2++)
439 std::string d((*it2)->getHumanRepr());
// dump of the input cut to MAX_LGTH_OF_INP_DUMP characters
441 d=d.substr(0,MAX_LGTH_OF_INP_DUMP);
442 oss << "INPUT \"" << (*it2)->getName() << "\" = " << d << std::endl;
444 oss << "DETAILS = " << std::endl;
445 oss << (*it1)->getErrorDetails();
// Translates the state of the generated graph into one of ST_OK, ST_FAILED or ST_ERROR.
// Throws YACS::Exception for states reported as unexpected (a run has probably not been invoked)
// and for states that are not recognized.
// NOTE(review): the switch statement and several case labels are not visible in this excerpt, so the exact
// mapping between states and returned strings can not be fully established from here.
452 std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const
454 YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState());
460 case YACS::TOACTIVATE:
461 case YACS::ACTIVATED:
462 case YACS::SUSPENDED:
465 case YACS::DESACTIVATED:
467 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getStatusOfRunStr : Unexpected state \"" << YACS::ENGINE::Node::getStateName(st) << "\" ! Did you invoke run ?";
468 throw YACS::Exception(oss.str());
470 case YACS::LOADFAILED:
471 case YACS::EXECFAILED:
473 case YACS::INTERNALERR:
474 return std::string(ST_ERROR);
476 return std::string(ST_FAILED);
478 return std::string(ST_OK);
481 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getStatusOfRunStr : unrecognized and managed state \"" << YACS::ENGINE::Node::getStateName(st) << "\" !";
482 throw YACS::Exception(oss.str());
487 std::vector<YACSEvalSeqAny *> YACSEvalYFXRunOnlyPattern::getResults() const
489 return _gen->getResults();
493 * This method works if run succeeded (true return) and also if the graph has failed. Graph failed means soft error of evaluation due to error in evaluation (example 1/0 or a normal throw from one node)
494 * If a more serious error occurred (SIGSEGV of a server or internal error in YACS engine, cluster error, loss of connection...) this method will throw an exception to warn the caller that the results may be
// Returns the results available for the outputs of interest and fills \a passedIds with the ids of the
// corresponding samples. Two paths are visible : one relying on getResults(), the other one asking the
// foreach for the results that passed.
// NOTE(review): the test selecting the path, the content of the loop filling passedIds and the return
// statements are not visible in this excerpt.
496 std::vector<YACSEvalSeqAny *> YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure(std::vector<unsigned int>& passedIds) const
498 YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState());
// path 1 : results taken from getResults(), passedIds sized on the first returned sequence
502 std::vector<YACSEvalSeqAny *> ret(getResults());
506 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error ! The returned vector has a null pointer at pos #0 !");
507 std::size_t sz(ret[0]->size());
508 passedIds.resize(sz);
509 for(std::size_t i=0;i<sz;i++)
// path 2 : partial results asked to the foreach
514 getStatusOfRunStr();// To check that the status is recognized.
515 std::list<YACS::ENGINE::Node *> lns(getUndergroundGeneratedGraph()->edGetDirectDescendants());
516 YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach());
518 YACS::ENGINE::Executor exe;
519 std::vector<YACS::ENGINE::SequenceAny *> outputs;
520 std::vector<std::string> nameOfOutputs;
521 passedIds=fe->getPassedResults(&exe,outputs,nameOfOutputs);//<- the key invokation is here.
522 std::size_t sz(passedIds.size()),ii(0);
523 std::vector<YACSEvalSeqAny *> ret(_outputsOfInterest.size());
524 for(std::vector<YACSEvalOutputPort *>::const_iterator it1=_outputsOfInterest.begin();it1!=_outputsOfInterest.end();it1++,ii++)
// name of the port is rebuilt, transformed by InterceptorizeNameOfPort, then searched in nameOfOutputs
526 YACS::ENGINE::OutputPort *p((*it1)->getUndergroundPtr());
527 std::string st(_runNode->getOutPortName(p));
528 std::ostringstream oss; oss << YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st;
530 YACS::ENGINE::ForEachLoop::InterceptorizeNameOfPort(st);
531 std::vector<std::string>::iterator it2(std::find(nameOfOutputs.begin(),nameOfOutputs.end(),st));
532 if(it2==nameOfOutputs.end())
534 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error 3 ! Unable to locate interceptor with name " << st << " ! Possibilities are : ";
535 std::copy(nameOfOutputs.begin(),nameOfOutputs.end(),std::ostream_iterator<std::string>(oss," "));
537 throw YACS::Exception(oss.str());
// outputs and nameOfOutputs are indexed the same way
539 std::size_t pos(std::distance(nameOfOutputs.begin(),it2));
540 ret[ii]=BuildValueFromEngineFrmt(outputs[pos]);
545 void YACSEvalYFXRunOnlyPattern::emitStart() const
547 YACSEvalObserver *obs(getObserver());
550 obs->notifyNumberOfSamplesToEval(getBoss(),getUndergroundForEach()->getNbOfElementsToBeProcessed());
// Launches the evaluation through the generator and returns what the generator returns.
// Throws (from getGenerator) if there is no generator.
// NOTE(review): one statement before the return is not visible in this excerpt.
553 bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP, YACSEvalSession *session) const
556 return getGenerator()->go(stopASAP,session);
559 YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::getUndergroundForEach() const
561 return getGenerator()->getUndergroundForEach();
// Tells whether \a scheme matches the "run only" pattern; \a runNode is an output parameter.
// Visible logic : the children of \a scheme are scanned to know whether all of them are elementary nodes;
// CheckNodeIsOK is applied either on \a scheme itself or on the first child casted into ComposedNode.
// NOTE(review): the tests linking these steps, the assignments of runNode and the return statements are
// not visible in this excerpt.
564 bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode)
566 std::list<YACS::ENGINE::Node *> nodes(scheme->getChildren());
569 bool areAllElementary(true);
// the loop stops at the first child that is not an ElementaryNode
570 for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end() && areAllElementary;it++)
571 if(!dynamic_cast<YACS::ENGINE::ElementaryNode *>(*it))
572 areAllElementary=false;
576 CheckNodeIsOK(scheme);
582 YACS::ENGINE::ComposedNode *candidate(dynamic_cast<YACS::ENGINE::ComposedNode *>(nodes.front()));
585 CheckNodeIsOK(candidate);
// Fills _inputs from the input ports of the run node that are publishable.
// Throws YACS::Exception on a null port, an empty port name or a name met twice.
// NOTE(review): several lines are not visible in this excerpt, in particular the test done on bls.
589 void YACSEvalYFXRunOnlyPattern::buildInputPorts()
592 std::list< YACS::ENGINE::InputPort *> allInputPorts(_runNode->getSetOfInputPort());
// names already met, used to detect duplicates
593 std::vector<std::string> allNames;
594 for(std::list< YACS::ENGINE::InputPort *>::const_iterator it=allInputPorts.begin();it!=allInputPorts.end();it++)
596 YACS::ENGINE::InputPort *elt(*it);
598 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildInputPorts : presence of null input !");
599 std::set<YACS::ENGINE::OutPort *> bls(elt->edSetOutPort());
602 if(YACSEvalPort::IsInputPortPublishable(elt))
604 std::string inpName(elt->getName());
606 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildInputPorts : an input has empty name ! Should not !");
// NOTE(review): the port is appended before the duplicate check below, so _inputs already contains it when the exception is thrown.
607 _inputs.push_back(YACSEvalInputPort(elt));
608 if(std::find(allNames.begin(),allNames.end(),inpName)!=allNames.end())
610 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::buildInputPorts : input name \"" << inpName << "\" appears more than once !";
611 throw YACS::Exception(oss.str());
613 allNames.push_back(inpName);
619 void YACSEvalYFXRunOnlyPattern::buildOutputPorts()
622 std::list< YACS::ENGINE::OutputPort *> allOutputPorts(_runNode->getSetOfOutputPort());
623 std::vector<std::string> allNames;
624 for(std::list< YACS::ENGINE::OutputPort *>::const_iterator it=allOutputPorts.begin();it!=allOutputPorts.end();it++)
626 YACS::ENGINE::OutputPort *elt(*it);
627 if(YACSEvalPort::IsOutputPortPublishable(elt))
630 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildOutputPorts : presence of null output !");
631 std::string outpName(elt->getName());
633 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildOutputPorts : an output has empty name ! Should not !");
634 if(std::find(allNames.begin(),allNames.end(),outpName)!=allNames.end())
636 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::buildOutputPorts : output name \"" << outpName << "\" appears more than once !";
637 throw YACS::Exception(oss.str());
639 _outputs.push_back(YACSEvalOutputPort(*it));
644 YACSEvalYFXGraphGen *YACSEvalYFXRunOnlyPattern::getGenerator() const
647 throw YACS::Exception("getGenerator : generator is NULL !");
651 /////////////////////
653 YACSEvalYFXGraphGen::YACSEvalYFXGraphGen(YACSEvalYFXRunOnlyPattern *boss):_boss(boss),_generatedGraph(0),_FEInGeneratedGraph(0)
656 throw YACS::Exception("YACSEvalYFXGraphGen constructor : boss is NULL !");
// Destructor. The only visible line is the commented-out deletion below, so, as far as this excerpt
// shows, the generated graph is not released here (resetGeneratedGraph() does delete it).
659 YACSEvalYFXGraphGen::~YACSEvalYFXGraphGen()
661 //delete _generatedGraph;// -> TODO : AGY why ?
664 void YACSEvalYFXGraphGen::resetGeneratedGraph()
666 delete _generatedGraph;
667 _generatedGraph=0; _FEInGeneratedGraph=0;
670 bool YACSEvalYFXGraphGen::isLocked() const
672 return _generatedGraph!=0;
// Computes the number of branches of the foreach of the generated graph and stores it into the
// "number of branches" input port of that foreach.
// Throws YACS::Exception if the graph has not been generated or on the internal errors listed below.
// NOTE(review): the guards of the throws, the declaration of nbOfBranch and the return statement are not
// visible in this excerpt.
675 int YACSEvalYFXGraphGen::assignNbOfBranches()
678 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !");
// search of the ForEachLoop among the direct children of the generated graph
679 std::list<YACS::ENGINE::Node *> nodes(_generatedGraph->getChildren());
680 YACS::ENGINE::ForEachLoop *zeMainNode(0);
681 for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end();it++)
683 YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it));
687 zeMainNode=isZeMainNode;
689 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !");
693 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !");
694 unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared());
// at least 4 procs are considered
695 nbProcsDeclared=std::max(nbProcsDeclared,4u);
697 if(getBoss()->getParallelizeStatus())
// NOTE(review): division by getMaxLevelOfParallelism() - nothing visible here guarantees it is not 0.
699 nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism());
700 nbOfBranch=std::max(nbOfBranch,1);
702 YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort());
703 YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast<YACS::ENGINE::AnyInputPort *>(zeInputToSet));
705 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !");
// the value is put into the port then saved as its init value
706 YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch));
707 zeInputToSetC->put(a);
708 zeInputToSetC->exSaveInit();
// Generates the graph evaluated in interactive mode. Structure built below :
//   __initializer__ (n0) -> ForEachLoop "Loop_<run node name>" (n1) -> __gather__ (n2)
// where the Bloc (n10) holds __dispatch__ (n100) followed by a clone of the run node (n101).
// NOTE(review): some lines are not visible in this excerpt (guards, what is done when there is no output
// of interest, how n10 is attached to n1).
713 void YACSEvalYFXGraphGenInteractive::generateGraph()
// a previously generated graph is dropped
716 { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
717 static const char LISTPYOBJ_STR[]="list[pyobj]";
718 if(getBoss()->getOutputsOfInterest().empty())
720 YACS::ENGINE::RuntimeSALOME::setRuntime();
721 YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
722 _generatedGraph=r->createProc(DFT_PROC_NAME);
723 YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
// oss holds the name given to the foreach
724 std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
725 _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
726 _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
// n0 : script node with one input port per random var and a single output "sender"
728 YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
729 _generatedGraph->edAddChild(n0);
730 YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
731 YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
// var0 accumulates the comma separated names of the random vars
732 std::ostringstream var0;
733 const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
734 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
736 if((*it).isRandomVar())
738 var0 << (*it).getName() << ",";
739 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
740 YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
741 YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
743 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
// the created port is registered on the input as the underground port to be set
744 (*it).setUndergroundPortToBeSet(inpc);
// script of n0 : sender=zip(<random vars>)
747 std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
748 n0->setScript(n0Script.str());
// n1 : the foreach, fed by "sender"
750 YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
751 _FEInGeneratedGraph=n1;
752 _generatedGraph->edAddChild(n1);
753 _generatedGraph->edAddCFLink(n0,n1);
754 _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
// n2 : gather node executed after the foreach
755 YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
756 _generatedGraph->edAddChild(n2);
757 _generatedGraph->edAddCFLink(n1,n2);
// n10 : Bloc containing the dispatch node (n100) and the clone of the run node (n101)
759 YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
761 YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
762 YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
763 YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
764 n10->edAddChild(n100);
765 n10->edAddChild(n101);
766 YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
767 n10->edAddCFLink(n100,n101);
// the sample of the foreach is sent to input "i0" of the dispatch node
768 n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
769 std::ostringstream var1;
770 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
772 if((*it).isRandomVar())
// one output port per random var on the dispatch node, linked to the matching input of the clone
774 var1 << (*it).getName() << ",";
775 YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
776 std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
777 YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
778 n10->edAddDFLink(myOut,myIn);
// script of n100 : <random vars>=i0
781 std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
782 n100->setScript(n100Script.str());
// each output of interest of the clone is linked to an input port of the gather node having a sequence type
783 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
784 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
786 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
787 YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
788 std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
789 YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
790 _generatedGraph->edAddDFLink(myOut,myIn);
792 _generatedGraph->updateContainersAndComponents();
795 bool YACSEvalYFXGraphGenInteractive::go(bool stopASAP, YACSEvalSession *session) const
797 YACS::ENGINE::Executor exe;
798 exe.setKeepGoingProperty(!stopASAP);
800 MyAutoThreadSaver locker(!session->isAttached());
801 exe.RunW(getUndergroundGeneratedGraph());
803 return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
// Returns one YACSEvalSeqAny per output of interest, built from the input ports of the gather node.
// Throws YACS::Exception if the generated graph is not in DONE state or on internal error.
// NOTE(review): guards of the internal errors, declaration of ii and return statement are not visible in this excerpt.
806 std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenInteractive::getResults() const
808 if(getUndergroundGeneratedGraph()->getState()!=YACS::DONE)
809 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !");
810 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
811 std::vector<YACSEvalSeqAny *> ret(outputsOfInt.size());
// the gather node is retrieved by name and is expected to be a PythonNode
812 YACS::ENGINE::Node *node(getUndergroundGeneratedGraph()->getChildByName(YACSEvalYFXGraphGen::GATHER_NODE_NAME));
813 YACS::ENGINE::PythonNode *nodeC(dynamic_cast<YACS::ENGINE::PythonNode *>(node));
815 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !");
817 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++,ii++)
// the input port of the gather node has the name of the output of interest
819 YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName()));
820 YACS::ENGINE::InputPyPort *inputC(dynamic_cast<YACS::ENGINE::InputPyPort *>(input));
823 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\"";
824 throw YACS::Exception(oss.str());
826 ret[ii]=YACSEvalYFXPattern::BuildValueInPort(inputC);
// Generates the graph evaluated in cluster mode. The structure is the same as the one built by
// YACSEvalYFXGraphGenInteractive::generateGraph :
//   __initializer__ (n0) -> ForEachLoop "Loop_<run node name>" (n1) -> __gather__ (n2)
// In addition _locSchemaFile and _jobName are computed with python snippets, and the gather node gets
// a script writing the results into a file named after _jobName.
// NOTE(review): some lines are not visible in this excerpt (guards, what is done when there is no output
// of interest, how n10 is attached to n1, possible GIL handling around the python calls).
833 void YACSEvalYFXGraphGenCluster::generateGraph()
// a previously generated graph is dropped
836 { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
// python snippets : the first one returns a path "EvalYFX_<user>_<date>_<time>.xml" located in the "tmp"
// directory under the root, the second one returns "EvalYFX_<user>_<date>_<time>"
838 const char EFXGenFileName[]="EFXGenFileName";
839 const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
840 const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))";
842 YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
843 YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
// NOTE(review): PyString_AsString belongs to the Python 2 C API
844 _locSchemaFile=PyString_AsString(val);
845 func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
846 val=YACS::ENGINE::evalFuncPyWithNoParams(func);
847 _jobName=PyString_AsString(val);
849 static const char LISTPYOBJ_STR[]="list[pyobj]";
850 if(getBoss()->getOutputsOfInterest().empty())
852 YACS::ENGINE::RuntimeSALOME::setRuntime();
853 YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
854 _generatedGraph=r->createProc(DFT_PROC_NAME);
855 YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
// oss holds the name given to the foreach
856 std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
857 _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
858 _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
// n0 : script node with one input port per random var and a single output "sender"
860 YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
861 _generatedGraph->edAddChild(n0);
862 YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
863 YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
// var0 accumulates the comma separated names of the random vars
864 std::ostringstream var0;
865 const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
866 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
868 if((*it).isRandomVar())
870 var0 << (*it).getName() << ",";
871 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
872 YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
873 YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
875 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
// the created port is registered on the input as the underground port to be set
876 (*it).setUndergroundPortToBeSet(inpc);
// script of n0 : sender=zip(<random vars>)
879 std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
880 n0->setScript(n0Script.str());
// n1 : the foreach, fed by "sender"
882 YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
883 _FEInGeneratedGraph=n1;
884 _generatedGraph->edAddChild(n1);
885 _generatedGraph->edAddCFLink(n0,n1);
886 _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
// n2 : gather node executed after the foreach
887 YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
888 _generatedGraph->edAddChild(n2);
889 _generatedGraph->edAddCFLink(n1,n2);
// n10 : Bloc containing the dispatch node (n100) and the clone of the run node (n101)
891 YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
893 YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
894 YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
895 YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
896 n10->edAddChild(n100);
897 n10->edAddChild(n101);
898 YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
899 n10->edAddCFLink(n100,n101);
// the sample of the foreach is sent to input "i0" of the dispatch node
900 n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
901 std::ostringstream var1;
902 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
904 if((*it).isRandomVar())
// one output port per random var on the dispatch node, linked to the matching input of the clone
906 var1 << (*it).getName() << ",";
907 YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
908 std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
909 YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
910 n10->edAddDFLink(myOut,myIn);
// script of n100 : <random vars>=i0
913 std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
914 n100->setScript(n100Script.str());
915 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
// script of n2 : zeRes=[<outputs of interest>] then dump of str(zeRes) into the file named _jobName
916 std::ostringstream n2Script; n2Script << "zeRes=[";
917 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
919 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
920 YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
921 n2Script << (*it)->getName() << ", ";
922 std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
923 YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
924 _generatedGraph->edAddDFLink(myOut,myIn);
// NOTE(review): the generated script relies on the python builtin "file", which exists in Python 2 only
926 n2Script << "]\nf=file(\"" << _jobName << "\",\"w\") ; f.write(str(zeRes)) ; del f";
927 n2->setScript(n2Script.str());
928 _generatedGraph->updateContainersAndComponents();
// Runs the generated graph as a batch job on a cluster through the SALOME launcher.
//
// Steps performed by the statements below :
//  - the generated schema is saved into _locSchemaFile,
//  - a job of type "yacs_file" pointing to that schema is created and launched on the single chosen machine,
//  - the job state is queried after a 10 s pause ; FINISHED and FAILED are the only states acted upon,
//  - the job results are fetched into the local working directory,
//  - the file named _jobName in that directory is evaluated as a Python list of lists of floats and stored into _res,
//  - the result file and the "logs" directory are removed from the local working directory.
//
// \param stopASAP not referenced by the statements of this method shown here.
// \param session session used to fetch the SalomeLauncher servant from the naming service.
// \throw YACS::Exception if the number of chosen machines is not exactly one.
// NOTE(review): the returned value is presumably \a ret (true only when the job ended in state FINISHED) - confirm against the return statement.
931 bool YACSEvalYFXGraphGenCluster::go(bool stopASAP, YACSEvalSession *session) const
// The schema file written here is the one referenced later by jp.job_file.
933 getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile);
934 YACSEvalListOfResources *rss(getBoss()->getResourcesInternal());
935 const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
936 std::vector<std::string> machines(rss->getAllChosenMachines());
// Batch mode : exactly one machine must have been chosen.
937 if(machines.size()!=1)
938 throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !");
939 Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
// Resource requirements of the job : the single chosen machine, with the number of procs declared in the resources.
940 Engines::ResourceParameters rr;
941 rr.name=CORBA::string_dup(machines[0].c_str());
942 rr.hostname=CORBA::string_dup("");
943 rr.can_launch_batch_jobs=true;
944 rr.can_run_containers=true;
945 rr.OS=CORBA::string_dup("Linux");
946 rr.componentList.length(0);
947 rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
950 rr.nb_node=1;// useless only nb_proc used.
951 rr.nb_proc_per_node=1;// useless only nb_proc used.
952 rr.policy=CORBA::string_dup("cycl");
953 rr.resList.length(0);
// Job description : the saved schema is the job file ; the only declared output file is named after the job.
954 Engines::JobParameters jp;
955 jp.job_name=CORBA::string_dup(_jobName.c_str());
956 jp.job_type=CORBA::string_dup("yacs_file");
957 jp.job_file=CORBA::string_dup(_locSchemaFile.c_str());
958 jp.env_file=CORBA::string_dup("");
959 jp.in_files.length(0);
960 jp.out_files.length(1);
961 jp.out_files[0]=CORBA::string_dup(_jobName.c_str());
962 jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
// The local directory and the result directory are both the local working directory.
963 jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
964 jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
965 jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
966 jp.resource_required=rr;
967 jp.queue=CORBA::string_dup("");
969 jp.mem_per_cpu=rr.mem_mb;
970 jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
971 jp.extra_params=CORBA::string_dup("");
972 jp.specific_parameters.length(0);
973 jp.launcher_file=CORBA::string_dup("");
974 jp.launcher_args=CORBA::string_dup("");
// Job creation and submission ; the job id is kept in _jobid for the following requests.
975 _jobid=sl->createJob(jp);
976 sl->launchJob(_jobid);
// Pause of 10 s done through the embedded Python interpreter, then request of the job state.
980 PyRun_SimpleString("import time ; time.sleep(10)");
981 char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED"
// The CORBA string is copied into a std::string so that it can be released right away.
982 std::string sstate(state);
983 CORBA::string_free(state);
// Terminal states : ret is true only if the job ended in state FINISHED.
984 if(sstate=="FINISHED" || sstate=="FAILED")
986 ret=sstate=="FINISHED";
// Retrieval of the output files of the job into the local working directory.
990 sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str());
// Python snippet reading the result file : None if the file does not exist, else the evaluation of its content.
// The snippet uses "return", so it is presumably wrapped by evalPy into a function named "fetch" - confirm against evalPy.
// NOTE(review): file() is a builtin of Python 2 only ; with a Python 3 interpreter open() would be needed - confirm the targeted Python version.
994 std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl;
995 oss << "if not os.path.exists(p):\n return None\n";
996 oss << "f=file(p,\"r\")" << std::endl;
997 oss << "return eval(f.read())";
998 std::string zeInput(oss.str());
999 YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput));
1000 YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
// A missing result file makes the snippet return None, which is rejected here as "not a list".
1001 if(!PyList_Check(val))
1002 throw YACS::Exception("Fetched file does not contain a list !");
1003 Py_ssize_t sz(PyList_Size(val));
// Conversion of the Python list of lists into _res : _res[i][j] receives item j of sub list i.
1005 for(Py_ssize_t i=0;i<sz;i++)
1007 std::vector<double>& res0(_res[i]);
// PyList_GetItem returns a borrowed reference : no decref needed on elt0 nor on elt1.
1008 PyObject *elt0(PyList_GetItem(val,i));
1009 if(!PyList_Check(elt0))
1010 throw YACS::Exception("Fetched file does contain a list of list !");
1011 Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0);
1012 for(Py_ssize_t j=0;j<sz0;j++)
1014 PyObject *elt1(PyList_GetItem(elt0,j));
// NOTE(review): PyFloat_AsDouble returns -1.0 and sets a Python error on failure ; no check of PyErr_Occurred() is visible here.
1015 res0[j]=PyFloat_AsDouble(elt1);
// Cleanup, through the Python interpreter, of the result file fetched in the local working directory.
1019 std::ostringstream oss1; oss1 << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\") ; os.remove(p)" << std::endl;
1020 std::string s1(oss1.str());
1021 PyRun_SimpleString(s1.c_str());
// Cleanup of the "logs" directory located in the local working directory.
1022 std::ostringstream oss2; oss2 << "import os,shutil" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"logs\") ; shutil.rmtree(p)" << std::endl;
1023 std::string s2(oss2.str());
1024 PyRun_SimpleString(s2.c_str());
// NOTE(review): presumably handles the YACS::Exception instances thrown while parsing the fetched file - confirm against the handler body.
1026 catch(YACS::Exception& e)
1035 std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenCluster::getResults() const
1037 std::size_t sz(_res.size());
1038 std::vector<YACSEvalSeqAny *> ret(sz);
1039 for(std::size_t i=0;i<sz;i++)
1041 YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(_res[i]));
1042 ret[i]=elt.dettach();