1 // Copyright (C) 2012-2016 CEA/DEN, EDF R&D
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Lesser General Public
5 // License as published by the Free Software Foundation; either
6 // version 2.1 of the License, or (at your option) any later version.
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 // Lesser General Public License for more details.
13 // You should have received a copy of the GNU Lesser General Public
14 // License along with this library; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
19 // Author : Anthony Geay (EDF R&D)
21 #include "YACSEvalYFXPattern.hxx"
22 #include "YACSEvalResource.hxx"
23 #include "YACSEvalSeqAny.hxx"
24 #include "YACSEvalSession.hxx"
25 #include "YACSEvalObserver.hxx"
26 #include "YACSEvalSessionInternal.hxx"
27 #include "YACSEvalAutoPtr.hxx"
29 #include "ElementaryNode.hxx"
30 #include "RuntimeSALOME.hxx"
31 #include "Dispatcher.hxx"
32 #include "Executor.hxx"
33 #include "InputPort.hxx"
34 #include "LinkInfo.hxx"
35 #include "TypeCode.hxx"
37 #include "Dispatcher.hxx"
39 #include "PythonPorts.hxx"
40 #include "ForEachLoop.hxx"
41 #include "PythonNode.hxx"
42 #include "InlineNode.hxx"
43 #include "ServiceNode.hxx"
44 #include "PyStdout.hxx"
45 #include "AutoGIL.hxx"
47 #include "ResourcesManager.hxx"
// Status strings returned by YACSEvalYFXRunOnlyPattern::getStatusOfRunStr.
58 const char YACSEvalYFXPattern::ST_OK[]="ALL_OK";
60 const char YACSEvalYFXPattern::ST_FAILED[]="SOME_SAMPLES_FAILED_AND_ALL_OF_THEM_FAILED_DETERMINISTICALLY";
62 const char YACSEvalYFXPattern::ST_ERROR[]="SOME_SAMPLES_FAILED_BUT_IMPOSSIBLE_TO_CONCLUDE_ON_THEM";
// Length passed to substr() when an input port dump is shortened in getErrorDetailsInCaseOfFailure.
64 const std::size_t YACSEvalYFXPattern::MAX_LGTH_OF_INP_DUMP=10000;
// Name given to the Proc created by the generateGraph implementations.
66 const char YACSEvalYFXGraphGen::DFT_PROC_NAME[]="YFX";
// Name of the Bloc created by generateGraph; also used to look up per-branch children of the foreach.
68 const char YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME[]="Bloc";
// Name of the script node created after the foreach; getResults looks it up by this name.
70 const char YACSEvalYFXGraphGen::GATHER_NODE_NAME[]="__gather__";
// Scoped helper around the CPython thread-state calls: when isToSave is true the
// constructor calls PyEval_SaveThread() and the destructor hands the saved state back
// to PyEval_RestoreThread(); when false, neither call is made.
// NOTE(review): the member declarations (_isToSave, _save) are not visible in this listing.
72 class MyAutoThreadSaver
75 MyAutoThreadSaver(bool isToSave):_isToSave(isToSave),_save(0) { if(_isToSave) _save=PyEval_SaveThread(); }
76 ~MyAutoThreadSaver() { if(_isToSave) PyEval_RestoreThread(_save); }
// Builds a vector of pointers to the elements of _inputs, in storage order.
// The pointers address the elements held by _inputs (constness is cast away), so they
// are non-owning. NOTE(review): the return statement is not visible in this listing;
// presumably `ret` is returned.
82 std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const
84 std::size_t sz(_inputs.size());
85 std::vector< YACSEvalInputPort *> ret;
86 std::vector< YACSEvalInputPort >::const_iterator it(_inputs.begin());
87 for(std::size_t i=0;i<sz;i++,it++)
88 ret.push_back(const_cast<YACSEvalInputPort *>(&(*it)));
// Builds a vector of pointers to the elements of _outputs, in storage order.
// The pointers address the elements held by _outputs (constness is cast away), so they
// are non-owning. NOTE(review): the return statement is not visible in this listing;
// presumably `ret` is returned.
92 std::vector< YACSEvalOutputPort *> YACSEvalYFXPattern::getFreeOutputPorts() const
94 std::size_t sz(_outputs.size());
95 std::vector< YACSEvalOutputPort *> ret;
96 std::vector< YACSEvalOutputPort >::const_iterator it(_outputs.begin());
97 for(std::size_t i=0;i<sz;i++,it++)
98 ret.push_back(const_cast<YACSEvalOutputPort *>(&(*it)));
// Factory: returns a newly allocated pattern object matching the given scheme.
// The only pattern tried here is YACSEvalYFXRunOnlyPattern; when its IsMatching test
// succeeds, a new instance is returned (raw pointer, allocated with new).
// Throws YACS::Exception when no pattern matches.
// NOTE(review): the condition guarding the first throw (null scheme, per its message)
// is not visible in this listing.
102 YACSEvalYFXPattern *YACSEvalYFXPattern::FindPatternFrom(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme)
105 throw YACS::Exception("YACSEvalYFXPattern::FindPatternFrom : input scheme must be not null !");
// zeRunNode is an output argument of IsMatching (passed by reference).
107 YACS::ENGINE::ComposedNode *zeRunNode(0);
108 bool isMatchingRunOnlyPattern(YACSEvalYFXRunOnlyPattern::IsMatching(scheme,zeRunNode));
109 if(isMatchingRunOnlyPattern)
110 return new YACSEvalYFXRunOnlyPattern(boss,scheme,ownScheme,zeRunNode);
112 throw YACS::Exception("YACSEvalYFXPattern::FindPatternFrom : no pattern found for the input scheme !");
// Const predicate whose result is tested by checkNonAlreadyComputedResources and
// checkAlreadyComputedResources. NOTE(review): the body is not visible in this listing.
115 bool YACSEvalYFXPattern::isAlreadyComputedResources() const
// Throws YACS::Exception when isAlreadyComputedResources() returns true.
120 void YACSEvalYFXPattern::checkNonAlreadyComputedResources() const
122 if(isAlreadyComputedResources())
123 throw YACS::Exception("checkNonAlreadyComputedResources : instance of computed resources already computed !");
// Throws YACS::Exception when isAlreadyComputedResources() returns false.
126 void YACSEvalYFXPattern::checkAlreadyComputedResources() const
128 if(!isAlreadyComputedResources())
129 throw YACS::Exception("checkAlreadyComputedResources : instance of computed resources not already computed !");
// Throws YACS::Exception reporting that the pattern is not locked.
// NOTE(review): the guarding condition is not visible in this listing; presumably it
// tests the locked state.
132 void YACSEvalYFXPattern::checkLocked() const
135 throw YACS::Exception("YACSEvalYFXPattern::checkLocked : Pattern is not locked !");
// Throws YACS::Exception reporting that the pattern is locked.
// NOTE(review): the guarding condition is not visible in this listing; presumably it
// tests the locked state.
138 void YACSEvalYFXPattern::checkNonLocked() const
141 throw YACS::Exception("YACSEvalYFXPattern::checkNonLocked : Pattern is locked !");
// Consistency check of a composed node, as written: run checkConsistency with a
// LinkInfo, reject the node if error links are reported, then walk every elementary
// constituent and reject ServiceNodes / remote InlineNodes that are incompletely defined.
// NOTE(review): the first statement of the body starts with a block-comment opener and
// no terminator is visible in this listing, so the statements below may be entirely
// commented out (making this function a no-op) - confirm against the full file.
144 void YACSEvalYFXPattern::CheckNodeIsOK(YACS::ENGINE::ComposedNode *node)
146 /*YACS::ENGINE::LinkInfo info(YACS::ENGINE::LinkInfo::WARN_ONLY_DONT_STOP);
149 node->checkConsistency(info);
151 catch(YACS::Exception& e)
154 if(info.getNumberOfErrLinks(YACS::ENGINE::E_ALL)!=0)
155 throw YACS::Exception("YACSEvalYFXPattern::CheckNodeIsOK : found node is not OK !");
156 std::list<YACS::ENGINE::ElementaryNode *> allNodes(node->getRecursiveConstituents());
157 for(std::list<YACS::ENGINE::ElementaryNode *>::const_iterator it=allNodes.begin();it!=allNodes.end();it++)
159 YACS::ENGINE::ServiceNode *node0(dynamic_cast<YACS::ENGINE::ServiceNode *>(*it));
160 YACS::ENGINE::InlineNode *node1(dynamic_cast<YACS::ENGINE::InlineNode *>(*it));
163 YACS::ENGINE::Container *cont(node0->getContainer());
164 YACS::ENGINE::ComponentInstance *comp(node0->getComponent());
167 std::ostringstream oss; oss << "YACSEvalYFXPattern::CheckNodeIsOK : ServiceNode called \"" << node0->getName() << "\" is not correctly defined !";
168 throw YACS::Exception(oss.str());
173 YACS::ENGINE::Container *cont(node1->getContainer());
174 if(!cont && node1->getExecutionMode()==YACS::ENGINE::InlineNode::REMOTE_STR)
176 std::ostringstream oss; oss << "YACSEvalYFXPattern::CheckNodeIsOK : InlineNode called \"" << node1->getName() << "\" is not correctly defined !";
177 throw YACS::Exception(oss.str());
// Replaces the registered observer, using decrRef()/incrRef() on the old and new one.
// The comparison on the first visible line detects re-registration of the same observer.
// NOTE(review): the statements between the visible lines (early return, null checks,
// the assignment to _observer) are not visible in this listing.
183 void YACSEvalYFXPattern::registerObserver(YACSEvalObserver *observer)
185 if(_observer==observer)
188 _observer->decrRef();
191 _observer->incrRef();
// Constructor: stores boss, scheme and ownScheme, enables the parallelize status,
// allocates a ResourcesManager_cpp for _rm, and starts with no resources (_res) and no
// observer (_observer). NOTE(review): the constructor body is not visible in this listing.
194 YACSEvalYFXPattern::YACSEvalYFXPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme):_boss(boss),_scheme(scheme),_ownScheme(ownScheme),_parallelizeStatus(true),_rm(new ResourcesManager_cpp),_res(0),_observer(0)
// Creates, in `scheme`, a sequence type code whose element type is the type code
// registered under `zeType`. The string "list[<zeType>]" is passed for both string
// arguments of createSequenceTc.
198 YACS::ENGINE::TypeCode *YACSEvalYFXPattern::CreateSeqTypeCodeFrom(YACS::ENGINE::Proc *scheme, const std::string& zeType)
200 std::ostringstream oss; oss << "list[" << zeType << "]";
201 YACS::ENGINE::TypeCode *tc(scheme->getTypeCode(zeType));
202 return scheme->createSequenceTc(oss.str(),oss.str(),tc);
// Installs a resources object; first requires that none has been computed yet
// (checkNonAlreadyComputedResources throws otherwise).
// NOTE(review): the statements following the check are not visible in this listing.
205 void YACSEvalYFXPattern::setResources(YACSEvalListOfResources *res)
207 checkNonAlreadyComputedResources();
// NOTE(review): only the signature is visible in this listing; presumably discards the
// resources installed by setResources - confirm against the full file.
213 void YACSEvalYFXPattern::resetResources()
// Converts the Python list held by an InputPyPort into a newly allocated YACSEvalSeqAny.
// The port type must be a sequence type code; Double content yields a
// YACSEvalSeqAnyDouble and Int content a YACSEvalSeqAnyInt. Any other content kind
// throws YACS::Exception. The caller receives ownership of the returned object
// (it is released from the AutoCppPtr with dettach()).
// NOTE(review): the error messages name "GetValueInPort" although the function is
// BuildValueInPort. The conditions guarding the first two throws are not visible in
// this listing.
219 YACSEvalSeqAny *YACSEvalYFXPattern::BuildValueInPort(YACS::ENGINE::InputPyPort *port)
222 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : null input port !");
223 PyObject *obj(port->getPyObj());
224 YACS::ENGINE::TypeCode *tc(port->edGetType());
225 YACS::ENGINE::TypeCodeSeq *tcc(dynamic_cast<YACS::ENGINE::TypeCodeSeq *>(tc));
228 std::ostringstream oss; oss << "YACSEvalYFXPattern::GetValueInPort : internal error for tc of input \"" << port->getName() << "\"";
229 throw YACS::Exception(oss.str());
231 const YACS::ENGINE::TypeCode *tcct(tcc->contentType());
232 if(!PyList_Check(obj))
233 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : internal error 2 !");
234 std::size_t sz(PyList_Size(obj));
235 if(tcct->kind()==YACS::ENGINE::Double)
237 std::vector<double> eltCpp(sz);
238 for(std::size_t i=0;i<sz;i++)
// PyList_GetItem returns a borrowed reference; no Python error check is visible after
// the conversion below.
240 PyObject *elt(PyList_GetItem(obj,i));
241 eltCpp[i]=PyFloat_AsDouble(elt);
243 YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(eltCpp));
244 return elt.dettach();
246 else if(tcct->kind()==YACS::ENGINE::Int)
248 std::vector<int> eltCpp(sz);
249 for(std::size_t i=0;i<sz;i++)
251 PyObject *elt(PyList_GetItem(obj,i));
// The long returned by PyInt_AsLong is stored into an int element.
252 eltCpp[i]=PyInt_AsLong(elt);
254 YACS::AutoCppPtr<YACSEvalSeqAnyInt> elt(new YACSEvalSeqAnyInt(eltCpp));
255 return elt.dettach();
258 throw YACS::Exception("YACSEvalYFXPattern::GetValueInPort : not implemented yet for other than Double and Int !");
// Converts an engine SequenceAny into a newly allocated YACSEvalSeqAnyDouble.
// Each element is cast to AtomAny and read with getDoubleValue(); the throw reports the
// position of an element that is not an AtomAny.
// NOTE(review): the condition guarding the throw is not visible in this listing;
// presumably it tests eltPtr2 for null.
261 YACSEvalSeqAny *YACSEvalYFXPattern::BuildValueFromEngineFrmt(YACS::ENGINE::SequenceAny *data)
263 unsigned int sz(data->size());
264 std::vector<double> eltCpp(sz);
265 for(unsigned int ii=0;ii<sz;ii++)
267 YACS::ENGINE::AnyPtr elt((*data)[ii]);
268 YACS::ENGINE::Any *eltPtr((YACS::ENGINE::Any *)elt);
269 YACS::ENGINE::AtomAny *eltPtr2(dynamic_cast<YACS::ENGINE::AtomAny *>(eltPtr));
272 std::ostringstream oss; oss << "YACSEvalYFXPattern::BuildValueFromEngineFrmt : error at pos #" << ii << " ! It is not an AtomAny !";
273 throw YACS::Exception(oss.str());
275 eltCpp[ii]=eltPtr2->getDoubleValue();
277 return new YACSEvalSeqAnyDouble(eltCpp);
// NOTE(review): only the signature is visible in this listing; presumably releases
// _scheme depending on _ownScheme - confirm against the full file.
280 void YACSEvalYFXPattern::cleanScheme()
// Destructor: drops the reference held on the registered observer via decrRef().
// NOTE(review): the other statements of the body (including any null check on
// _observer) are not visible in this listing.
287 YACSEvalYFXPattern::~YACSEvalYFXPattern()
290 _observer->decrRef();
295 /////////////////////
// Engine observer that forwards notifications to the pattern that created it.
// The constructor stores the owning pattern in _boss and throws YACS::Exception when it
// is null. notifyObserver2 is defined out of line.
297 class YACSEvalYFXRunOnlyPatternInternalObserver : public YACS::ENGINE::Observer
300 YACSEvalYFXRunOnlyPatternInternalObserver(YACSEvalYFXRunOnlyPattern *boss):_boss(boss) { if(!_boss) throw YACS::Exception("YACSEvalYFXRunOnlyPatternInternalObserver constructor : null boss not supported :)"); }
301 void notifyObserver2(YACS::ENGINE::Node *object, const std::string& event, void *something);
303 YACSEvalYFXRunOnlyPattern *_boss;
// Relays "progress_ok" / "progress_ko" events to the user observer of the owning pattern.
// An event is relayed only when `object` is the pattern's underground ForEachLoop.
// `something` is reinterpreted as a pointer to int and the pointed value is passed to
// notifySampleOK / notifySampleKO together with the pattern's boss.
// NOTE(review): any null check on `obs` is not visible in this listing.
306 void YACSEvalYFXRunOnlyPatternInternalObserver::notifyObserver2(YACS::ENGINE::Node *object, const std::string& event, void *something)
308 YACS::ENGINE::ForEachLoop *object2(_boss->getUndergroundForEach());
309 YACSEvalObserver *obs(_boss->getObserver());
312 if(event=="progress_ok" && object2==object)
314 int *casted(reinterpret_cast<int *>(something));
315 obs->notifySampleOK(_boss->getBoss(),*casted);
318 if(event=="progress_ko" && object2==object)
320 int *casted(reinterpret_cast<int *>(something));
321 obs->notifySampleKO(_boss->getBoss(),*casted);
326 /////////////////////
// Constructor: forwards boss/scheme/ownScheme to the base class, starts unlocked with
// no generator (_gen), stores runNode and allocates the internal observer (_obs).
// Throws YACS::Exception about a null run node.
// NOTE(review): the condition guarding the throw and the rest of the body are not
// visible in this listing.
328 YACSEvalYFXRunOnlyPattern::YACSEvalYFXRunOnlyPattern(YACSEvalYFX *boss, YACS::ENGINE::Proc *scheme, bool ownScheme, YACS::ENGINE::ComposedNode *runNode):YACSEvalYFXPattern(boss,scheme,ownScheme),_lockedStatus(false),_runNode(runNode),_gen(0),_obs(new YACSEvalYFXRunOnlyPatternInternalObserver(this))
331 throw YACS::Exception("YACSEvalYFXRunOnlyPattern : internal run node must be not null !");
// NOTE(review): only the signature is visible in this listing; the constructor
// allocates _obs with new, so confirm that it (and _gen) are released here.
336 YACSEvalYFXRunOnlyPattern::~YACSEvalYFXRunOnlyPattern()
// Stores a copy of the given output-port pointers in _outputsOfInterest.
// NOTE(review): statements before and after the assignment are not visible in this listing.
342 void YACSEvalYFXRunOnlyPattern::setOutPortsOfInterestForEvaluation(const std::vector<YACSEvalOutputPort *>& outputsOfInterest)
345 _outputsOfInterest=outputsOfInterest;
// Empties _outputsOfInterest.
// NOTE(review): statements before and after the clear() are not visible in this listing.
349 void YACSEvalYFXRunOnlyPattern::resetOutputsOfInterest()
352 _outputsOfInterest.clear();
// Allocates the graph generator and asks it to generate the graph.
// An interactive resource set selects YACSEvalYFXGraphGenInteractive; the other visible
// allocation creates a YACSEvalYFXGraphGenCluster (presumably the else branch).
// NOTE(review): statements before the test are not visible in this listing, so it cannot
// be told from here whether a previously allocated _gen is released first.
356 void YACSEvalYFXRunOnlyPattern::generateGraph()
359 if(getResourcesInternal()->isInteractive())
360 _gen=new YACSEvalYFXGraphGenInteractive(this);
362 _gen=new YACSEvalYFXGraphGenCluster(this);
363 _gen->generateGraph();
// Delegates to the generator's resetGeneratedGraph().
// NOTE(review): any null check on _gen is not visible in this listing.
366 void YACSEvalYFXRunOnlyPattern::resetGeneratedGraph()
369 _gen->resetGeneratedGraph();
// Returns the value computed by the generator's assignNbOfBranches().
// Requires resources to be already computed (checkAlreadyComputedResources throws
// otherwise) and throws YACS::Exception about a null generator.
// NOTE(review): the condition guarding that throw is not visible in this listing.
372 int YACSEvalYFXRunOnlyPattern::assignNbOfBranches()
374 checkAlreadyComputedResources();
376 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignNbOfBranches : generator is NULL ! Please invoke generateGraph before !");
377 return _gen->assignNbOfBranches();
// For each input flagged as a random variable, calls initializeUndergroundWithSeq() and
// collects the returned size. The maximum size_t value is used as the "no size seen yet"
// sentinel for sz; the throw reports random variables whose sequences differ in length.
// NOTE(review): the statements recording the first size and comparing later ones are
// not visible in this listing.
380 void YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs()
382 std::size_t sz(std::numeric_limits<std::size_t>::max());
383 for(std::vector< YACSEvalInputPort >::const_iterator it=_inputs.begin();it!=_inputs.end();it++)
384 if((*it).isRandomVar())
386 std::size_t locSize((*it).initializeUndergroundWithSeq());
387 if(sz==std::numeric_limits<std::size_t>::max())
391 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::assignRandomVarsInputs : length of sequences in random vars must be the same !");
// Returns the value of _lockedStatus.
395 bool YACSEvalYFXRunOnlyPattern::isLocked() const
397 return _lockedStatus;
// Returns the resources object of this pattern (getResourcesInternal()).
// When none is computed yet, one is built from the run node: its deployment tree, its
// maximum level of parallelism and the application catalog.
// Note the order: the deployment tree is taken before removeRecursivelyRedundantCL().
// NOTE(review): the statement that installs `res` (presumably setResources) is not
// visible in this listing.
400 YACSEvalListOfResources *YACSEvalYFXRunOnlyPattern::giveResources()
403 if(!isAlreadyComputedResources())
405 YACS::ENGINE::DeploymentTree dt(_runNode->getDeploymentTree());
406 _runNode->removeRecursivelyRedundantCL();
407 YACSEvalListOfResources *res(new YACSEvalListOfResources(_runNode->getMaxLevelOfParallelism(),getCatalogInAppli(),dt));
410 return getResourcesInternal();
// Returns the Proc held by the generator. getGenerator() is used for access, so the
// generator-is-NULL exception of that accessor applies here as well.
413 YACS::ENGINE::Proc *YACSEvalYFXRunOnlyPattern::getUndergroundGeneratedGraph() const
415 return getGenerator()->getUndergroundGeneratedGraph();
// Builds a human-readable report of node failures found in the branches of the foreach.
// For each reported node the text contains: node name, state name, branch id, a dump of
// every input port (shortened with MAX_LGTH_OF_INP_DUMP) and the node error details.
// Throws YACS::Exception when the run completed without problem (per the message) and
// when a branch child is not a Bloc.
// NOTE(review): several conditions (the test on `st`, the test on `nnc`, the state
// filter on st0, the length test before substr) and the final return are not visible
// in this listing.
418 std::string YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure() const
// getStatusOfRunStr throws for states it does not accept, so this call also validates
// that a run took place.
420 std::string st(getStatusOfRunStr());//test if a run has occurred.
422 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : The execution of scheme has been carried out to the end without any problem !");
// All problems can only come from the foreach, so it is the only node scanned.
423 // All the problem can only comes from foreach -> scan it
424 YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach());
425 YACS::ENGINE::NodeStateNameMap nsm;
426 unsigned nbB(fe->getNumberOfBranchesCreatedDyn());
427 std::ostringstream oss;
428 for(unsigned j=0;j<nbB;j++)
430 YACS::ENGINE::Node *nn(fe->getChildByNameExec(YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME,j));
431 YACS::ENGINE::Bloc *nnc(dynamic_cast<YACS::ENGINE::Bloc *>(nn));
433 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getErrorDetailsInCaseOfFailure : internal error 1 ! The direct son of main foreach is expected to be a Bloc !");
434 if(nnc->getState()==YACS::DONE)
436 std::list< YACS::ENGINE::ElementaryNode *> fec(nnc->getRecursiveConstituents());
// Constituents are visited in reverse order.
437 for(std::list< YACS::ENGINE::ElementaryNode *>::reverse_iterator it1=fec.rbegin();it1!=fec.rend();it1++)
439 YACS::StatesForNode st0((*it1)->getState());
442 oss << "NODE = " << nnc->getChildName(*it1) << std::endl;
443 oss << "STATUS = " << nsm[st0] << std::endl;
444 oss << "BRANCH ID = " << j << std::endl;
445 std::list<YACS::ENGINE::InputPort *> inps((*it1)->getSetOfInputPort());
446 for(std::list<YACS::ENGINE::InputPort *>::const_iterator it2=inps.begin();it2!=inps.end();it2++)
448 std::string d((*it2)->getHumanRepr());
450 d=d.substr(0,MAX_LGTH_OF_INP_DUMP);
451 oss << "INPUT \"" << (*it2)->getName() << "\" = " << d << std::endl;
453 oss << "DETAILS = " << std::endl;
454 oss << (*it1)->getErrorDetails();
// Maps the state of the generated graph to one of the status strings ST_OK, ST_FAILED
// or ST_ERROR, and throws YACS::Exception for states that are unexpected or unrecognized.
// NOTE(review): several case labels (and the switch/default lines) are not visible in
// this listing. From the visible labels, TOACTIVATE/ACTIVATED/SUSPENDED/DESACTIVATED
// appear to fall through to the "Unexpected state" throw, and
// LOADFAILED/EXECFAILED/INTERNALERR to ST_ERROR; the labels leading to ST_FAILED and
// ST_OK cannot be seen here - confirm against the full file.
461 std::string YACSEvalYFXRunOnlyPattern::getStatusOfRunStr() const
463 YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState());
469 case YACS::TOACTIVATE:
470 case YACS::ACTIVATED:
471 case YACS::SUSPENDED:
474 case YACS::DESACTIVATED:
476 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getStatusOfRunStr : Unexpected state \"" << YACS::ENGINE::Node::getStateName(st) << "\" ! Did you invoke run ?";
477 throw YACS::Exception(oss.str());
479 case YACS::LOADFAILED:
480 case YACS::EXECFAILED:
482 case YACS::INTERNALERR:
483 return std::string(ST_ERROR);
485 return std::string(ST_FAILED);
487 return std::string(ST_OK);
490 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getStatusOfRunStr : unrecognized and managed state \"" << YACS::ENGINE::Node::getStateName(st) << "\" !";
491 throw YACS::Exception(oss.str());
// Returns the result sequences produced by the generator's getResults().
// NOTE(review): _gen is dereferenced directly here (not through getGenerator()); no
// null check is visible in this listing.
496 std::vector<YACSEvalSeqAny *> YACSEvalYFXRunOnlyPattern::getResults() const
498 return _gen->getResults();
502 * This method works if the run succeeded (true return) and also if the graph has failed. A failed graph means a soft evaluation error (for example 1/0 or a normal throw from one node).
503 * If a more serious error occurred (SIGSEGV of a server, internal error in the YACS engine, cluster error, loss of connection...) this method will throw an exception to warn the caller that the results may be
// Returns one result sequence per output of interest and fills `passedIds` with the ids
// of the samples that passed.
// Two paths are visible: a first one that reuses getResults() and sizes passedIds from
// the first returned sequence, and a second one that asks the foreach for its passed
// results and picks, for every output of interest, the sequence whose interceptor name
// matches.
// Throws YACS::Exception on a null first result, on an unrecognized run status
// (through getStatusOfRunStr) and when an interceptor name cannot be located.
// NOTE(review): the state test selecting the first path, the loop body filling
// passedIds, the statement assigning the composed name to `st`, and both return
// statements are not visible in this listing.
505 std::vector<YACSEvalSeqAny *> YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure(std::vector<unsigned int>& passedIds) const
507 YACS::StatesForNode st(getUndergroundGeneratedGraph()->getState());
511 std::vector<YACSEvalSeqAny *> ret(getResults());
515 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error ! The returned vector has a null pointer at pos #0 !");
516 std::size_t sz(ret[0]->size());
517 passedIds.resize(sz);
518 for(std::size_t i=0;i<sz;i++)
523 getStatusOfRunStr();// To check that the status is recognized.
524 std::list<YACS::ENGINE::Node *> lns(getUndergroundGeneratedGraph()->edGetDirectDescendants());
525 YACS::ENGINE::ForEachLoop *fe(getUndergroundForEach());
527 YACS::ENGINE::Executor exe;
528 std::vector<YACS::ENGINE::SequenceAny *> outputs;
529 std::vector<std::string> nameOfOutputs;
// The key invocation: outputs and nameOfOutputs are filled by getPassedResults.
530 passedIds=fe->getPassedResults(&exe,outputs,nameOfOutputs);//<- the key invokation is here.
531 std::size_t sz(passedIds.size()),ii(0);
532 std::vector<YACSEvalSeqAny *> ret(_outputsOfInterest.size());
533 for(std::vector<YACSEvalOutputPort *>::const_iterator it1=_outputsOfInterest.begin();it1!=_outputsOfInterest.end();it1++,ii++)
535 YACS::ENGINE::OutputPort *p((*it1)->getUndergroundPtr());
536 std::string st(_runNode->getOutPortName(p));
// Composes "<FIRST_FE_SUBNODE_NAME>.<run node name>.<port name>".
537 std::ostringstream oss; oss << YACSEvalYFXGraphGen::FIRST_FE_SUBNODE_NAME << '.' << _runNode->getName() << '.' << st;
539 YACS::ENGINE::ForEachLoop::InterceptorizeNameOfPort(st);
540 std::vector<std::string>::iterator it2(std::find(nameOfOutputs.begin(),nameOfOutputs.end(),st));
541 if(it2==nameOfOutputs.end())
543 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResultsInCaseOfFailure : internal error 3 ! Unable to locate interceptor with name " << st << " ! Possibilities are : ";
544 std::copy(nameOfOutputs.begin(),nameOfOutputs.end(),std::ostream_iterator<std::string>(oss," "));
546 throw YACS::Exception(oss.str());
// The position of the name in nameOfOutputs indexes the matching sequence in outputs.
548 std::size_t pos(std::distance(nameOfOutputs.begin(),it2));
549 ret[ii]=BuildValueFromEngineFrmt(outputs[pos]);
// Notifies the registered observer that the computation starts, passing the boss.
// NOTE(review): any null check on `obs` is not visible in this listing.
554 void YACSEvalYFXRunOnlyPattern::emitStart() const
556 YACSEvalObserver *obs(getObserver());
559 obs->startComputation(getBoss());
// Runs the generated graph through the generator.
// The internal observer is subscribed to the "progress_ok" and "progress_ko" events of
// the underground foreach for the duration of the generator's go() call, then
// unsubscribed.
// NOTE(review): no try/catch is visible around the generator's go() call, so if it
// throws, the removeObserver calls below are not reached. The first statement of the
// body and the return (presumably `ret`) are not visible in this listing.
562 bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP, YACSEvalSession *session) const
565 YACS::ENGINE::Dispatcher *disp(YACS::ENGINE::Dispatcher::getDispatcher());
566 disp->addObserver(_obs,getUndergroundForEach(),"progress_ok");
567 disp->addObserver(_obs,getUndergroundForEach(),"progress_ko");
568 bool ret(getGenerator()->go(stopASAP,session));
569 disp->removeObserver(_obs,getUndergroundForEach(),"progress_ok");
570 disp->removeObserver(_obs,getUndergroundForEach(),"progress_ko");
// Returns the ForEachLoop held by the generator. getGenerator() is used for access, so
// the generator-is-NULL exception of that accessor applies here as well.
574 YACS::ENGINE::ForEachLoop *YACSEvalYFXRunOnlyPattern::getUndergroundForEach() const
576 return getGenerator()->getUndergroundForEach();
// Tests whether `scheme` fits the run-only pattern; `runNode` is an output argument.
// Visible logic: the children of the scheme are scanned to see whether all of them are
// elementary nodes; CheckNodeIsOK is applied either to the scheme itself or to the first
// child cast to ComposedNode.
// NOTE(review): the branch conditions, the assignments to runNode and the return
// statements are not visible in this listing - confirm which path sets runNode to what.
579 bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode)
581 std::list<YACS::ENGINE::Node *> nodes(scheme->getChildren());
584 bool areAllElementary(true);
// The loop stops at the first child that is not an ElementaryNode.
585 for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end() && areAllElementary;it++)
586 if(!dynamic_cast<YACS::ENGINE::ElementaryNode *>(*it))
587 areAllElementary=false;
591 CheckNodeIsOK(scheme);
597 YACS::ENGINE::ComposedNode *candidate(dynamic_cast<YACS::ENGINE::ComposedNode *>(nodes.front()));
600 CheckNodeIsOK(candidate);
// Fills _inputs with a YACSEvalInputPort for every publishable input port of the run node.
// Throws YACS::Exception on a null port, on an empty port name and on a name that
// appears more than once.
// NOTE(review): as visible here, the port is pushed into _inputs before the duplicate-name
// check, so _inputs already contains the duplicate when that exception is thrown.
// The conditions guarding the first two throws, and the use made of `bls`, are not
// visible in this listing.
604 void YACSEvalYFXRunOnlyPattern::buildInputPorts()
607 std::list< YACS::ENGINE::InputPort *> allInputPorts(_runNode->getSetOfInputPort());
608 std::vector<std::string> allNames;
609 for(std::list< YACS::ENGINE::InputPort *>::const_iterator it=allInputPorts.begin();it!=allInputPorts.end();it++)
611 YACS::ENGINE::InputPort *elt(*it);
613 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildInputPorts : presence of null input !");
614 std::set<YACS::ENGINE::OutPort *> bls(elt->edSetOutPort());
617 if(YACSEvalPort::IsInputPortPublishable(elt))
619 std::string inpName(elt->getName());
621 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildInputPorts : an input has empty name ! Should not !");
622 _inputs.push_back(YACSEvalInputPort(elt));
623 if(std::find(allNames.begin(),allNames.end(),inpName)!=allNames.end())
625 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::buildInputPorts : input name \"" << inpName << "\" appears more than once !";
626 throw YACS::Exception(oss.str());
628 allNames.push_back(inpName);
// Fills _outputs with a YACSEvalOutputPort for every publishable output port of the run node.
// Throws YACS::Exception on a null port, on an empty port name and on a name that
// appears more than once.
// NOTE(review): no push into allNames is visible in this listing; if there is none, the
// duplicate-name check can never trigger - confirm against the full file. As visible
// here, IsOutputPortPublishable(elt) is called before the null-port throw. The
// conditions guarding the first two throws are not visible either.
634 void YACSEvalYFXRunOnlyPattern::buildOutputPorts()
637 std::list< YACS::ENGINE::OutputPort *> allOutputPorts(_runNode->getSetOfOutputPort());
638 std::vector<std::string> allNames;
639 for(std::list< YACS::ENGINE::OutputPort *>::const_iterator it=allOutputPorts.begin();it!=allOutputPorts.end();it++)
641 YACS::ENGINE::OutputPort *elt(*it);
642 if(YACSEvalPort::IsOutputPortPublishable(elt))
645 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildOutputPorts : presence of null output !");
646 std::string outpName(elt->getName());
648 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::buildOutputPorts : an output has empty name ! Should not !");
649 if(std::find(allNames.begin(),allNames.end(),outpName)!=allNames.end())
651 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::buildOutputPorts : output name \"" << outpName << "\" appears more than once !";
652 throw YACS::Exception(oss.str());
654 _outputs.push_back(YACSEvalOutputPort(*it));
// Accessor for the graph generator; throws YACS::Exception reporting a NULL generator.
// NOTE(review): the condition guarding the throw and the return statement are not
// visible in this listing; presumably _gen is tested and returned.
659 YACSEvalYFXGraphGen *YACSEvalYFXRunOnlyPattern::getGenerator() const
662 throw YACS::Exception("getGenerator : generator is NULL !");
666 /////////////////////
// Constructor: stores the owning pattern and starts with no generated graph and no
// foreach. Throws YACS::Exception reporting a NULL boss.
// NOTE(review): the condition guarding the throw is not visible in this listing.
668 YACSEvalYFXGraphGen::YACSEvalYFXGraphGen(YACSEvalYFXRunOnlyPattern *boss):_boss(boss),_generatedGraph(0),_FEInGeneratedGraph(0)
671 throw YACS::Exception("YACSEvalYFXGraphGen constructor : boss is NULL !");
// Destructor: the deletion of _generatedGraph is commented out (see the author's TODO
// below), so as far as visible here the generated graph is not released on destruction.
// resetGeneratedGraph() is the visible place where it is deleted.
674 YACSEvalYFXGraphGen::~YACSEvalYFXGraphGen()
676 //delete _generatedGraph;// -> TODO : AGY why ?
// Deletes the generated graph and resets both _generatedGraph and _FEInGeneratedGraph
// to null. After this call isLocked() returns false.
679 void YACSEvalYFXGraphGen::resetGeneratedGraph()
681 delete _generatedGraph;
682 _generatedGraph=0; _FEInGeneratedGraph=0;
// Returns true when a generated graph currently exists (non-null _generatedGraph).
685 bool YACSEvalYFXGraphGen::isLocked() const
687 return _generatedGraph!=0;
// Computes the number of branches of the main foreach and writes it into the foreach's
// "number of branches" input port.
// Steps visible here: locate the ForEachLoop among the children of the generated graph;
// read the number of declared procs and raise it to at least 4; when the parallelize
// status is on, divide it by the maximum level of parallelism and keep at least 1;
// put the value into the port as an AtomAny and call exSaveInit().
// Throws YACS::Exception when the graph has not been generated and on the "internal
// error" conditions reported in the messages.
// NOTE(review): the declaration of nbOfBranch, the guards of the throws and the return
// statement are not visible in this listing. No check that getMaxLevelOfParallelism()
// is non-zero is visible before the division.
690 int YACSEvalYFXGraphGen::assignNbOfBranches()
693 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : the generated graph has not been created !");
694 std::list<YACS::ENGINE::Node *> nodes(_generatedGraph->getChildren());
695 YACS::ENGINE::ForEachLoop *zeMainNode(0);
696 for(std::list<YACS::ENGINE::Node *>::const_iterator it=nodes.begin();it!=nodes.end();it++)
698 YACS::ENGINE::ForEachLoop *isZeMainNode(dynamic_cast<YACS::ENGINE::ForEachLoop *>(*it));
702 zeMainNode=isZeMainNode;
704 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 1 !");
708 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 2 !");
709 unsigned int nbProcsDeclared(getBoss()->getResourcesInternal()->getNumberOfProcsDeclared());
710 nbProcsDeclared=std::max(nbProcsDeclared,4u);
712 if(getBoss()->getParallelizeStatus())
714 nbOfBranch=(nbProcsDeclared/getBoss()->getResourcesInternal()->getMaxLevelOfParallelism());
715 nbOfBranch=std::max(nbOfBranch,1);
717 YACS::ENGINE::InputPort *zeInputToSet(zeMainNode->edGetNbOfBranchesPort());
718 YACS::ENGINE::AnyInputPort *zeInputToSetC(dynamic_cast<YACS::ENGINE::AnyInputPort *>(zeInputToSet));
720 throw YACS::Exception("YACSEvalYFXGraphGen::assignNbOfBranches : internal error 3 !");
721 YACS::ENGINE::Any *a(YACS::ENGINE::AtomAny::New(nbOfBranch));
722 zeInputToSetC->put(a);
723 zeInputToSetC->exSaveInit();
// Builds the Proc executed in interactive mode and stores it in _generatedGraph.
// Layout of the generated graph, as visible here:
//   "__initializer__" script node (n0)  ->  foreach (n1)  ->  gather script node (n2)
// and a Bloc (n10) holding a "__dispatch__" script node (n100) followed by a clone of
// the run node (n101).
// NOTE(review): the guard before the initial delete, the statement executed when there
// are no outputs of interest, and the statement attaching the Bloc n10 to the foreach
// are not visible in this listing.
728 void YACSEvalYFXGraphGenInteractive::generateGraph()
// Drops any previously generated graph.
731 { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
732 static const char LISTPYOBJ_STR[]="list[pyobj]";
733 if(getBoss()->getOutputsOfInterest().empty())
735 YACS::ENGINE::RuntimeSALOME::setRuntime();
736 YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
737 _generatedGraph=r->createProc(DFT_PROC_NAME);
738 YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
// oss holds the name of the foreach: "Loop_" followed by the run node name.
739 std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
740 _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
741 _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
// n0: initializer node with one sequence input per random variable and a "sender" output.
743 YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
744 _generatedGraph->edAddChild(n0);
745 YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
746 YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
747 std::ostringstream var0;
748 const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
749 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
751 if((*it).isRandomVar())
// var0 accumulates the comma-terminated list of random variable names.
753 var0 << (*it).getName() << ",";
754 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
755 YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
756 YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
758 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
759 (*it).setUndergroundPortToBeSet(inpc);
// Script of n0: zips the random variable sequences into "sender".
762 std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
763 n0->setScript(n0Script.str());
// n1: the foreach, fed by "sender"; n2: the gather node, control-linked after n1.
765 YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
766 _FEInGeneratedGraph=n1;
767 _generatedGraph->edAddChild(n1);
768 _generatedGraph->edAddCFLink(n0,n1);
769 _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
770 YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
771 _generatedGraph->edAddChild(n2);
772 _generatedGraph->edAddCFLink(n1,n2);
// n10: Bloc containing the dispatch node n100 and the clone n101 of the run node.
774 YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
776 YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
777 YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
778 YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
779 n10->edAddChild(n100);
780 n10->edAddChild(n101);
781 YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
782 n10->edAddCFLink(n100,n101);
783 n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
784 std::ostringstream var1;
785 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
787 if((*it).isRandomVar())
// One output of the dispatch node per random variable, linked to the matching input of
// the cloned run node (matched through the port name relative to the run node).
789 var1 << (*it).getName() << ",";
790 YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
791 std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
792 YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
793 n10->edAddDFLink(myOut,myIn);
// Script of n100: unpacks the sample "i0" into the random variable names.
796 std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
797 n100->setScript(n100Script.str());
// One sequence input on the gather node per output of interest, fed by the matching
// output of the cloned run node.
798 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
799 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
801 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
802 YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
803 std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
804 YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
805 _generatedGraph->edAddDFLink(myOut,myIn);
807 _generatedGraph->updateContainersAndComponents();
// Executes the generated graph with a local Executor and returns true when the graph
// ends in state YACS::DONE.
// stopASAP is inverted into the executor's keep-going property. While RunW executes, a
// MyAutoThreadSaver is active; it saves the Python thread state only when the session
// reports that this was not already done.
// NOTE(review): the braces delimiting the lifetime of `locker` are not visible in this listing.
810 bool YACSEvalYFXGraphGenInteractive::go(bool stopASAP, YACSEvalSession *session) const
812 YACS::ENGINE::Executor exe;
813 exe.setKeepGoingProperty(!stopASAP);
815 MyAutoThreadSaver locker(!session->isAlreadyPyThreadSaved());
816 exe.RunW(getUndergroundGeneratedGraph());
818 return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
// Returns one newly built YACSEvalSeqAny per output of interest, read from the input
// ports of the gather node of the generated graph (ports are looked up by output name).
// Throws YACS::Exception when the graph is not in state YACS::DONE, when the gather
// node is not a PythonNode (per the "internal error" message) and when an input port
// is not an InputPyPort.
// NOTE(review): the exception messages name "YACSEvalYFXRunOnlyPattern::getResults"
// although this is YACSEvalYFXGraphGenInteractive::getResults. The declaration of `ii`,
// the guards of the last two throws and the return statement are not visible in this
// listing.
821 std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenInteractive::getResults() const
823 if(getUndergroundGeneratedGraph()->getState()!=YACS::DONE)
824 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : the execution did not finished correctly ! getResults should not be called !");
825 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
826 std::vector<YACSEvalSeqAny *> ret(outputsOfInt.size());
827 YACS::ENGINE::Node *node(getUndergroundGeneratedGraph()->getChildByName(YACSEvalYFXGraphGen::GATHER_NODE_NAME));
828 YACS::ENGINE::PythonNode *nodeC(dynamic_cast<YACS::ENGINE::PythonNode *>(node));
830 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::getResults : internal error !");
832 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++,ii++)
834 YACS::ENGINE::InPort *input(nodeC->getInPort((*it)->getName()));
835 YACS::ENGINE::InputPyPort *inputC(dynamic_cast<YACS::ENGINE::InputPyPort *>(input));
838 std::ostringstream oss; oss << "YACSEvalYFXRunOnlyPattern::getResults : internal error for input \"" << (*it)->getName() << "\"";
839 throw YACS::Exception(oss.str());
841 ret[ii]=YACSEvalYFXPattern::BuildValueInPort(inputC);
// Builds the Proc executed in cluster mode and stores it in _generatedGraph.
// Differences with the interactive variant visible in this file: a schema file path
// (_locSchemaFile) and a job name (_jobName) are first computed by evaluating small
// Python functions, and the gather node receives a script that writes the collected
// results to a file named after _jobName.
// Layout of the generated graph, as visible here:
//   "__initializer__" script node (n0)  ->  foreach (n1)  ->  gather script node (n2)
// and a Bloc (n10) holding a "__dispatch__" script node (n100) followed by a clone of
// the run node (n101).
// NOTE(review): the guard before the initial delete, any GIL handling around the Python
// calls, the statement executed when there are no outputs of interest, and the
// statement attaching the Bloc n10 to the foreach are not visible in this listing.
848 void YACSEvalYFXGraphGenCluster::generateGraph()
// Drops any previously generated graph.
851 { delete _generatedGraph; _generatedGraph=0; _FEInGeneratedGraph=0; }
// Python sources: the first returns a path under the "tmp" directory at the filesystem
// root, the second a bare name; both embed the user name and the current date/time.
853 const char EFXGenFileName[]="EFXGenFileName";
854 const char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
855 const char EFXGenContent2[]="import getpass,datetime\nn=datetime.datetime.now()\nreturn \"EvalYFX_%s_%s_%s\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\"))";
857 YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
858 YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
// NOTE(review): PyString_AsString belongs to the Python 2 C API.
859 _locSchemaFile=PyString_AsString(val);
860 func=YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent2);
861 val=YACS::ENGINE::evalFuncPyWithNoParams(func);
862 _jobName=PyString_AsString(val);
864 static const char LISTPYOBJ_STR[]="list[pyobj]";
865 if(getBoss()->getOutputsOfInterest().empty())
867 YACS::ENGINE::RuntimeSALOME::setRuntime();
868 YACS::ENGINE::RuntimeSALOME *r(YACS::ENGINE::getSALOMERuntime());
869 _generatedGraph=r->createProc(DFT_PROC_NAME);
870 YACS::ENGINE::TypeCode *pyobjTC(_generatedGraph->createInterfaceTc("python:obj:1.0","pyobj",std::list<YACS::ENGINE::TypeCodeObjref *>()));
// oss holds the name of the foreach: "Loop_" followed by the run node name.
871 std::ostringstream oss; oss << "Loop_" << getBoss()->getRunNode()->getName();
872 _generatedGraph->createType(YACSEvalAnyDouble::TYPE_REPR,"double");
873 _generatedGraph->createType(YACSEvalAnyInt::TYPE_REPR,"int");
// n0: initializer node with one sequence input per random variable and a "sender" output.
875 YACS::ENGINE::InlineNode *n0(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__initializer__"));
876 _generatedGraph->edAddChild(n0);
877 YACS::ENGINE::TypeCode *listPyobjTC(_generatedGraph->createSequenceTc(LISTPYOBJ_STR,LISTPYOBJ_STR,pyobjTC));
878 YACS::ENGINE::OutputPort *sender(n0->edAddOutputPort("sender",listPyobjTC));
879 std::ostringstream var0;
880 const std::vector< YACSEvalInputPort >& inputs(getBoss()->getInputs());
881 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
883 if((*it).isRandomVar())
// var0 accumulates the comma-terminated list of random variable names.
885 var0 << (*it).getName() << ",";
886 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it).getTypeOfData()));
887 YACS::ENGINE::InputPort *inp(n0->edAddInputPort((*it).getName(),tc));
888 YACS::ENGINE::InputPyPort *inpc(dynamic_cast<YACS::ENGINE::InputPyPort *>(inp));
890 throw YACS::Exception("YACSEvalYFXRunOnlyPattern::generateGraph : internal error 1 !");
891 (*it).setUndergroundPortToBeSet(inpc);
// Script of n0: zips the random variable sequences into "sender".
894 std::ostringstream n0Script; n0Script << "sender=zip(" << var0.str() << ")\n";
895 n0->setScript(n0Script.str());
// n1: the foreach, fed by "sender"; n2: the gather node, control-linked after n1.
897 YACS::ENGINE::ForEachLoop *n1(r->createForEachLoop(oss.str(),pyobjTC));
898 _FEInGeneratedGraph=n1;
899 _generatedGraph->edAddChild(n1);
900 _generatedGraph->edAddCFLink(n0,n1);
901 _generatedGraph->edAddDFLink(sender,n1->edGetSeqOfSamplesPort());
902 YACS::ENGINE::InlineNode *n2(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,GATHER_NODE_NAME));
903 _generatedGraph->edAddChild(n2);
904 _generatedGraph->edAddCFLink(n1,n2);
// n10: Bloc containing the dispatch node n100 and the clone n101 of the run node.
906 YACS::ENGINE::Bloc *n10(r->createBloc(FIRST_FE_SUBNODE_NAME));
908 YACS::ENGINE::InlineNode *n100(r->createScriptNode(YACS::ENGINE::PythonNode::KIND,"__dispatch__"));
909 YACS::ENGINE::ComposedNode *runNode(getBoss()->getRunNode());
910 YACS::ENGINE::Node *n101(runNode->cloneWithoutCompAndContDeepCpy(0,true));
911 n10->edAddChild(n100);
912 n10->edAddChild(n101);
913 YACS::ENGINE::InputPort *dispatchIn(n100->edAddInputPort("i0",pyobjTC));
914 n10->edAddCFLink(n100,n101);
915 n1->edAddDFLink(n1->edGetSamplePort(),dispatchIn);
916 std::ostringstream var1;
917 for(std::vector< YACSEvalInputPort >::const_iterator it=inputs.begin();it!=inputs.end();it++)
919 if((*it).isRandomVar())
// One output of the dispatch node per random variable, linked to the matching input of
// the cloned run node (matched through the port name relative to the run node).
921 var1 << (*it).getName() << ",";
922 YACS::ENGINE::OutputPort *myOut(n100->edAddOutputPort((*it).getName(),_generatedGraph->getTypeCode((*it).getTypeOfData())));
923 std::string tmpPortName(runNode->getInPortName((*it).getUndergroundPtr()));
924 YACS::ENGINE::InputPort *myIn(n101->getInputPort(tmpPortName));
925 n10->edAddDFLink(myOut,myIn);
// Script of n100: unpacks the sample "i0" into the random variable names.
928 std::ostringstream n100Script; n100Script << var1.str() << "=i0\n";
929 n100->setScript(n100Script.str());
// One sequence input on the gather node per output of interest, fed by the matching
// output of the cloned run node; n2Script lists the same names inside "zeRes=[...]".
930 const std::vector<YACSEvalOutputPort *>& outputsOfInt(getBoss()->getOutputsOfInterest());
931 std::ostringstream n2Script; n2Script << "zeRes=[";
932 for(std::vector< YACSEvalOutputPort * >::const_iterator it=outputsOfInt.begin();it!=outputsOfInt.end();it++)
934 YACS::ENGINE::TypeCode *tc(YACSEvalYFXPattern::CreateSeqTypeCodeFrom(_generatedGraph,(*it)->getTypeOfData()));
935 YACS::ENGINE::InputPort *myIn(n2->edAddInputPort((*it)->getName(),tc));
936 n2Script << (*it)->getName() << ", ";
937 std::string tmpPortName(runNode->getOutPortName((*it)->getUndergroundPtr()));
938 YACS::ENGINE::OutputPort *myOut(n101->getOutputPort(tmpPortName));
939 _generatedGraph->edAddDFLink(myOut,myIn);
// The gather script writes str(zeRes) into a file named _jobName.
// NOTE(review): file(...) is a Python 2 builtin.
941 n2Script << "]\nf=file(\"" << _jobName << "\",\"w\") ; f.write(str(zeRes)) ; del f";
942 n2->setScript(n2Script.str());
943 _generatedGraph->updateContainersAndComponents();
// Submits the generated schema as a batch job through the SALOME launcher and
// loads the values written by the job into _res.
//
// Steps visible here: save the generated graph into _locSchemaFile, fill the
// resource request and the job description, create and launch the job, query
// its state, fetch the job results into the local working directory, parse the
// result file through the embedded Python interpreter, then delete the fetched
// file and the "logs" directory.
//
// stopASAP : not referenced by any statement visible in this function.
// session  : used to obtain the SalomeLauncher reference.
// Throws YACS::Exception when the number of chosen machines is not exactly one.
//
// NOTE(review): several lines of this function are not visible in this excerpt
// (braces, the headers of the polling loop and of the try block, the body of
// the catch handler and any return statement). The comments below describe
// only the statements that are shown.
946 bool YACSEvalYFXGraphGenCluster::go(bool stopASAP, YACSEvalSession *session) const
// Persist the generated graph: the file is later given to the launcher as job_file.
948 getUndergroundGeneratedGraph()->saveSchema(_locSchemaFile);
949 YACSEvalListOfResources *rss(getBoss()->getResourcesInternal());
950 const YACSEvalParamsForCluster& cli(rss->getAddParamsForCluster());
951 std::vector<std::string> machines(rss->getAllChosenMachines());
// Exactly one target machine is required; machines[0] is used as resource name below.
952 if(machines.size()!=1)
953 throw YACS::Exception("YACSEvalYFXGraphGenCluster::go : internal error ! In batch mode and not exactly one machine !");
954 Engines::SalomeLauncher_var sl(session->getInternal()->goFetchingSalomeLauncherInNS());
// Resource request attached to the job (copied into jp.resource_required).
955 Engines::ResourceParameters rr;
956 rr.name=CORBA::string_dup(machines[0].c_str());
957 rr.hostname=CORBA::string_dup("");
958 rr.can_launch_batch_jobs=true;
959 rr.can_run_containers=true;
960 rr.OS=CORBA::string_dup("Linux");
961 rr.componentList.length(0);
962 rr.nb_proc=rss->getNumberOfProcsDeclared();// <- important
965 rr.nb_node=1;// useless only nb_proc used.
966 rr.nb_proc_per_node=1;// useless only nb_proc used.
967 rr.policy=CORBA::string_dup("cycl");
968 rr.resList.length(0);
// Job description: the saved schema is submitted as a "yacs_file" job, with no
// input file and a single output file named after _jobName.
969 Engines::JobParameters jp;
970 jp.job_name=CORBA::string_dup(_jobName.c_str());
971 jp.job_type=CORBA::string_dup("yacs_file");
972 jp.job_file=CORBA::string_dup(_locSchemaFile.c_str());
973 jp.env_file=CORBA::string_dup("");
974 jp.in_files.length(0);
975 jp.out_files.length(1);
976 jp.out_files[0]=CORBA::string_dup(_jobName.c_str());
977 jp.work_directory=CORBA::string_dup(cli.getRemoteWorkingDir().c_str());
// local_directory and result_directory are both set to the local working directory.
978 jp.local_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
979 jp.result_directory=CORBA::string_dup(cli.getLocalWorkingDir().c_str());
980 jp.maximum_duration=CORBA::string_dup(cli.getMaxDuration().c_str());
981 jp.resource_required=rr;
982 jp.queue=CORBA::string_dup("");
// NOTE(review): rr.mem_mb is not assigned in any line visible here - confirm it
// is set before being read.
984 jp.mem_per_cpu=rr.mem_mb;
985 jp.wckey=CORBA::string_dup(cli.getWCKey().c_str());
986 jp.extra_params=CORBA::string_dup("");
987 jp.specific_parameters.length(0);
988 jp.launcher_file=CORBA::string_dup("");
989 jp.launcher_args=CORBA::string_dup("");
// Create the job, remember its id, then start it.
990 _jobid=sl->createJob(jp);
991 sl->launchJob(_jobid);
// Sleep 10 seconds through the Python interpreter, then ask for the job state.
// NOTE(review): presumably repeated in a loop until the state is terminal - the
// loop header is not visible here, confirm.
995 PyRun_SimpleString("import time ; time.sleep(10)");
996 char *state(sl->getJobState(_jobid));//"CREATED", "IN_PROCESS", "QUEUED", "RUNNING", "PAUSED", "FINISHED" or "FAILED"
// Copy the state into a std::string so that the CORBA string can be released at once.
997 std::string sstate(state);
998 CORBA::string_free(state);
999 if(sstate=="FINISHED" || sstate=="FAILED")
// ret is true only when the job ended in the "FINISHED" state.
1001 ret=sstate=="FINISHED";
// Bring the job results back into the local working directory.
1005 sl->getJobResults(_jobid,cli.getLocalWorkingDir().c_str());
// Python source that locates <local working dir>/<_jobName>: it yields None when
// the file does not exist, otherwise the evaluation (eval) of the file content.
// NOTE(review): the text contains return statements, so evalPy presumably wraps
// it into a function named "fetch" - confirm against evalPy.
// NOTE(review): the directory and the job name are pasted unescaped into the
// Python source; a quote or a backslash in them would break the snippet.
1009 std::ostringstream oss; oss << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\")" << std::endl;
1010 oss << "if not os.path.exists(p):\n return None\n";
1011 oss << "f=file(p,\"r\")" << std::endl;
1012 oss << "return eval(f.read())";
1013 std::string zeInput(oss.str());
1014 YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy("fetch",zeInput));
1015 YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
// Anything that is not a Python list is rejected (presumably including the None
// produced when the file is missing).
1016 if(!PyList_Check(val))
1017 throw YACS::Exception("Fetched file does not contain a list !");
1018 Py_ssize_t sz(PyList_Size(val));
// One entry of _res per element of the outer list.
// NOTE(review): the statement sizing _res is not visible here - confirm _res
// holds at least sz entries before _res[i] is accessed.
1020 for(Py_ssize_t i=0;i<sz;i++)
1022 std::vector<double>& res0(_res[i]);
// PyList_GetItem returns a borrowed reference: no decref is needed on elt0.
1023 PyObject *elt0(PyList_GetItem(val,i));
// NOTE(review): this is thrown when elt0 is NOT a list, yet the message reads
// "does contain" - the wording looks like it is missing a "not".
1024 if(!PyList_Check(elt0))
1025 throw YACS::Exception("Fetched file does contain a list of list !");
1026 Py_ssize_t sz0(PyList_Size(elt0)); res0.resize(sz0);
1027 for(Py_ssize_t j=0;j<sz0;j++)
1029 PyObject *elt1(PyList_GetItem(elt0,j));
// NOTE(review): the result of PyFloat_AsDouble is stored without checking for a
// conversion failure (the API reports it with -1.0 and a pending Python error).
1030 res0[j]=PyFloat_AsDouble(elt1);
// Cleanup through Python: delete the fetched result file ...
1034 std::ostringstream oss1; oss1 << "import os" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"" << _jobName << "\") ; os.remove(p)" << std::endl;
1035 std::string s1(oss1.str());
1036 PyRun_SimpleString(s1.c_str());
// ... then the "logs" directory located in the local working directory.
1037 std::ostringstream oss2; oss2 << "import os,shutil" << std::endl << "p=os.path.join(\"" << cli.getLocalWorkingDir() << "\",\"logs\") ; shutil.rmtree(p)" << std::endl;
1038 std::string s2(oss2.str());
1039 PyRun_SimpleString(s2.c_str());
// Handler for YACS::Exception; its body is not visible in this excerpt.
1041 catch(YACS::Exception& e)
1050 std::vector<YACSEvalSeqAny *> YACSEvalYFXGraphGenCluster::getResults() const
1052 std::size_t sz(_res.size());
1053 std::vector<YACSEvalSeqAny *> ret(sz);
1054 for(std::size_t i=0;i<sz;i++)
1056 YACS::AutoCppPtr<YACSEvalSeqAnyDouble> elt(new YACSEvalSeqAnyDouble(_res[i]));
1057 ret[i]=elt.dettach();