throw YACS::Exception("YACSEvalNonConstLocker::checkNonLocked : this is locked and trying to invoke non-const method !");
}
+void YACSEvalNonConstLocker::checkLocked() const
+{
+ if(!_isLocked)
+ throw YACS::Exception("YACSEvalNonConstLocker::checkLocked : this is NOT locked whereas it should be !");
+}
+
YACSEvalVirtualYACSContainer::YACSEvalVirtualYACSContainer():_gf(0),_cont(0)
{
}
std::string YACSEvalVirtualYACSContainer::getValueOfKey(const char *key) const
{
+ std::string skey(key);
+ if(skey==HOSTNAME_KEY)
+ return _chosenHost;
std::map<std::string,std::string>::const_iterator it;
- it=_overloadedPropertyMap.find(key);
+ it=_overloadedPropertyMap.find(skey);
if(it!=_overloadedPropertyMap.end())
return (*it).second;
- it=_propertyMap.find(key);
+ it=_propertyMap.find(skey);
if(it!=_propertyMap.end())
return (*it).second;
return std::string();
void YACSEvalVirtualYACSContainer::setProperty(const std::string& key, const std::string &value)
{
checkNonLocked();
- _overloadedPropertyMap[key]=value;
+ if(key==HOSTNAME_KEY)
+ setWantedMachine(value);
+ else
+ _overloadedPropertyMap[key]=value;
}
-void YACSEvalVirtualYACSContainer::apply()
+void YACSEvalVirtualYACSContainer::apply(bool interactiveStatus)
{
YACS::ENGINE::SalomeContainer *cont0(dynamic_cast<YACS::ENGINE::SalomeContainer *>(_cont));
YACS::ENGINE::SalomeHPContainer *cont1(dynamic_cast<YACS::ENGINE::SalomeHPContainer *>(_cont));
- if(cont0)
- {
- cont0->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY));
- return ;
- }
- else if(cont1)
- {
- cont1->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY));
- return ;
- }
- else
+ if(!cont0 && !cont1)
throw YACS::Exception("YACSEvalVirtualYACSContainer::apply : unrecognized container !");
+ if(interactiveStatus)
+ _cont->setProperty(HOSTNAME_KEY,getValueOfKey(HOSTNAME_KEY));
+ else
+ {// in cluster mode no hostname and policy set to cycl by default
+ _cont->setProperty(HOSTNAME_KEY,std::string());
+ _cont->setProperty("policy","cycl");
+ }
}
std::string YACSEvalVirtualYACSContainer::getName() const
return const_cast<YACSEvalVirtualYACSContainer *>(&_containers[i]);
}
-void YACSEvalResource::apply()
+void YACSEvalResource::apply(bool interactiveStatus)
{
for(std::vector< YACSEvalVirtualYACSContainer >::iterator it=_containers.begin();it!=_containers.end();it++)
- (*it).apply();
+ (*it).apply(interactiveStatus);
}
void YACSEvalResource::fitWithCurrentCatalogAbs()
_containers[i].set(this,conts[i]);
}
+void YACSEvalParamsForCluster::setExclusiveness(bool newStatus)
+{
+ if(newStatus)
+ throw YACS::Exception("YACSEvalParamsForCluster::setExclusiveness : exclusive mode set to true is not implemented yet !");
+}
+
+void YACSEvalParamsForCluster::checkConsistency() const
+{
+ if(_remoteWorkingDir.empty())
+ throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : remote work dir is not set !");
+ if(_wcKey.empty())
+ throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : WC key is not set !");
+ if(_nbOfProcs==0)
+ throw YACS::Exception("YACSEvalParamsForCluster::checkConsistency : nb procs must be != 0 !");
+}
+
YACSEvalListOfResources::YACSEvalListOfResources(int maxLevOfPara, ResourcesManager_cpp *rm, const YACS::ENGINE::DeploymentTree& dt):_maxLevOfPara(maxLevOfPara),_rm(rm),_dt(new YACS::ENGINE::DeploymentTree(dt))
{
std::vector<YACS::ENGINE::Container *> conts(_dt->getAllContainers());
unsigned int YACSEvalListOfResources::getNumberOfProcsDeclared() const
{
- std::vector<std::string> chosen(getAllChosenMachines());
- unsigned int ret(0);
- for(std::vector<std::string>::const_iterator it=chosen.begin();it!=chosen.end();it++)
- ret+=getNumberOfProcOfResource(*it);
- return ret;
+ if(isInteractive())
+ {
+ std::vector<std::string> chosen(getAllChosenMachines());
+ unsigned int ret(0);
+ for(std::vector<std::string>::const_iterator it=chosen.begin();it!=chosen.end();it++)
+ ret+=getNumberOfProcOfResource(*it);
+ return ret;
+ }
+ else
+ return _paramsInCaseOfCluster.getNbProcs();
+}
+
+void YACSEvalListOfResources::checkOKForRun() const
+{
+ if(!isInteractive())
+ _paramsInCaseOfCluster.checkConsistency();
}
void YACSEvalListOfResources::apply()
{
+ bool interactiveSt(isInteractive());
for(std::vector<YACSEvalResource *>::iterator it=_resources.begin();it!=_resources.end();it++)
- (*it)->apply();
+ (*it)->apply(interactiveSt);
}
YACSEvalListOfResources::~YACSEvalListOfResources()
throw YACS::Exception("YACSEvalListOfResources::notifyWantedMachine : internal error !");
const ParserResourcesType& oldPRT((*itOld).second);
const ParserResourcesType& newPRT((*itNew).second);
- if(oldPRT.ClusterInternalProtocol==newPRT.ClusterInternalProtocol)
+ bool oldISt(oldPRT.Batch==none),newISt(newPRT.Batch==none);//interactive status
+ if(oldISt==newISt)
return ;
// the batch/interactive mode has changed -> try to change for all.
std::queue<std::string> sts;
try
{
- for(std::vector<YACSEvalResource *>::const_iterator it=_resources.begin();it!=_resources.end();it++)
- {
- std::size_t sz((*it)->size());
- for(std::size_t i=0;i<sz;i++)
- {
- YACSEvalVirtualYACSContainer *cont((*it)->at(i));
- if(cont==sender)
- continue;
- sts.push(cont->findDefault(newPRT.ClusterInternalProtocol==sh));
- }
- }
+ if(newISt)
+ {// switching from interactive to batch -> In batch every YACSEvalVirtualYACSContainer instances in this must lie on newMachine.
+ for(std::vector<YACSEvalResource *>::const_iterator it=_resources.begin();it!=_resources.end();it++)
+ {
+ std::vector<std::string> fms((*it)->getAllFittingMachines());
+ std::vector<std::string>::iterator it0(std::find(fms.begin(),fms.end(),newMachine));
+ if(it0==fms.end())
+ {
+ std::ostringstream oss; oss << "In the context of switch to batch the requested cluster machine \"" << newMachine << "\" is not compatible for following list of containers : " << std::endl;
+ std::size_t sz((*it)->size());
+ for(std::size_t i=0;i<sz;i++)
+ {
+ YACSEvalVirtualYACSContainer *cont((*it)->at(i));
+ if(cont)
+ oss << " \"" << cont->getName() << "\", ";
+ }
+ throw YACS::Exception(oss.str());
+ }
+ std::size_t sz((*it)->size());
+ for(std::size_t i=0;i<sz;i++)
+ {
+ std::size_t sz((*it)->size());
+ for(std::size_t i=0;i<sz;i++)
+ {
+ YACSEvalVirtualYACSContainer *cont((*it)->at(i));
+ if(cont==sender)
+ continue;
+ sts.push(newMachine);
+ }
+ }
+ }
+ }
+ else
+ {
+ for(std::vector<YACSEvalResource *>::const_iterator it=_resources.begin();it!=_resources.end();it++)
+ {
+ std::size_t sz((*it)->size());
+ for(std::size_t i=0;i<sz;i++)
+ {
+ YACSEvalVirtualYACSContainer *cont((*it)->at(i));
+ if(cont==sender)
+ continue;
+ sts.push(cont->findDefault(false));
+ }
+ }
+ }
}
catch(YACS::Exception& e)
{
if(it==zeList.end())
throw YACS::Exception("YACSEvalListOfResources::hasRightInteractiveStatus : internal error !");
const ParserResourcesType& elt((*it).second);
- bool myStatus(elt.ClusterInternalProtocol==sh);
+ bool myStatus(elt.Batch==none);
return myStatus==isInteractive;
}
YACSEVALYFX_EXPORT void lock() { _isLocked=true; }
YACSEVALYFX_EXPORT bool isLocked() const { return _isLocked; }
YACSEVALYFX_EXPORT void checkNonLocked() const;
+ YACSEVALYFX_EXPORT void checkLocked() const;
private:
bool _isLocked;
};
YACSEVALYFX_EXPORT void setProperty(const std::string& key, const std::string &value);
YACSEVALYFX_EXPORT void resetOverloadedProps() { checkNonLocked(); _overloadedPropertyMap.clear(); }
YACSEVALYFX_EXPORT std::string getName() const;
- void apply();
+ void apply(bool interactiveStatus);
public:
YACSEvalVirtualYACSContainer();
void set(YACSEvalResource *gf, YACS::ENGINE::Container *cont);
YACSEVALYFX_EXPORT void setWantedMachine(const std::string& machine);
YACSEVALYFX_EXPORT std::size_t size() const { return _containers.size(); }
YACSEVALYFX_EXPORT YACSEvalVirtualYACSContainer *at(std::size_t i) const;
- YACSEVALYFX_EXPORT void apply();
+ YACSEVALYFX_EXPORT void apply(bool interactiveStatus);
public:
void fitWithCurrentCatalogAbs();
void aggregate(ParserResourcesType& entry) const;
class ResourcesManager_cpp;
+class YACSEvalParamsForCluster
+{
+public:
+ YACSEvalParamsForCluster():_exclusiveness(false),_nbOfProcs(1) { }
+ bool getExclusiveness() const { return _exclusiveness; }
+ void setExclusiveness(bool newStatus);
+ std::string getRemoteWorkingDir() const { return _remoteWorkingDir; }
+ void setRemoteWorkingDir(const std::string& remoteWorkingDir) { _remoteWorkingDir=remoteWorkingDir; }
+ std::string getWCKey() const { return _wcKey; }
+ void setWCKey(const std::string& wcKey) { _wcKey=wcKey; }
+ unsigned int getNbProcs() const { return _nbOfProcs; }
+ void setNbProcs(unsigned int nbProcs) { _nbOfProcs=nbProcs; }
+ void checkConsistency() const;
+private:
+ bool _exclusiveness;
+ std::string _remoteWorkingDir;
+ std::string _wcKey;
+ unsigned int _nbOfProcs;
+};
+
class YACSEvalListOfResources : public YACSEvalNonConstLocker
{
public:
YACSEVALYFX_EXPORT YACSEvalResource *at(std::size_t i) const;
YACSEVALYFX_EXPORT bool isInteractive() const;
YACSEVALYFX_EXPORT unsigned int getNumberOfProcsDeclared() const;
+ YACSEVALYFX_EXPORT void checkOKForRun() const;
+ YACSEVALYFX_EXPORT YACSEvalParamsForCluster& getAddParamsForCluster() { return _paramsInCaseOfCluster; }
void apply();
YACSEVALYFX_EXPORT ~YACSEvalListOfResources();
public:
int _maxLevOfPara;
std::vector<YACSEvalResource *> _resources;
YACS::ENGINE::DeploymentTree *_dt;
+ YACSEvalParamsForCluster _paramsInCaseOfCluster;
};
#endif
#include <Python.h>
-class MyAutoThreadSaver
-{
-public:
- MyAutoThreadSaver():_save(PyEval_SaveThread()) { }
- ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); }
-private:
- PyThreadState *_save;
-};
-
YACSEvalYFX *YACSEvalYFX::BuildFromFile(const std::string& xmlOfScheme)
{
YACS::ENGINE::RuntimeSALOME::setRuntime();
bool YACSEvalYFX::run(YACSEvalSession *session, int& nbOfBranches)
{
- _pattern->assignRandomVarsInputs();
- YACSEvalListOfResources *rss(giveResources());
- if(!rss->isInteractive())
- throw YACS::Exception("YACSEvalYFX::run : not implemented yet for non interactive !");
- YACSEvalSession *mySession(session);
- YACS::AutoCppPtr<YACSEvalSession> loc;
if(!session)
{
throw YACS::Exception("YACSEvalYFX::run : input session in null !");
- /*loc=new YACSEvalSession;
- mySession=loc;*/
}
+ session->launch();
+ //
+ YACSEvalListOfResources *rss(giveResources());
+ rss->checkOKForRun();
+ _pattern->assignRandomVarsInputs();
+ //if(!rss->isInteractive())
+ // throw YACS::Exception("YACSEvalYFX::run : not implemented yet for non interactive !");
rss->apply();
nbOfBranches=_pattern->assignNbOfBranches();
- mySession->launch();
- YACS::ENGINE::Executor exe;
- exe.setKeepGoingProperty(!_params.getStopASAPAfterErrorStatus());
- //
- _pattern->emitStart();
- {
- MyAutoThreadSaver locker;
- exe.RunW(getUndergroundGeneratedGraph());
- }
- return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
+ return _pattern->go(_params.getStopASAPAfterErrorStatus());
}
void YACSEvalYFX::registerObserver(YACSEvalObserver *observer)
#include "PythonNode.hxx"
#include "InlineNode.hxx"
#include "ServiceNode.hxx"
+#include "PyStdout.hxx"
+#include "AutoGIL.hxx"
#include "ResourcesManager.hxx"
#include <sstream>
#include <iterator>
+////
+#include <stdlib.h>
+
const char YACSEvalYFXPattern::DFT_PROC_NAME[]="YFX";
const char YACSEvalYFXPattern::ST_OK[]="ALL_OK";
const char YACSEvalYFXRunOnlyPattern::GATHER_NODE_NAME[]="__gather__";
+class MyAutoThreadSaver
+{
+public:
+ MyAutoThreadSaver():_save(PyEval_SaveThread()) { }
+ ~MyAutoThreadSaver() { PyEval_RestoreThread(_save); }
+private:
+ PyThreadState *_save;
+};
+
std::vector< YACSEvalInputPort *> YACSEvalYFXPattern::getFreeInputPorts() const
{
std::size_t sz(_inputs.size());
obs->notifyNumberOfSamplesToEval(getBoss(),_FEInGeneratedGraph->getNbOfElementsToBeProcessed());
}
+bool YACSEvalYFXRunOnlyPattern::go(bool stopASAP) const
+{
+ emitStart();
+ if(getResourcesInternal()->isInteractive())
+ {
+ YACS::ENGINE::Executor exe;
+ exe.setKeepGoingProperty(!stopASAP);
+ {
+ MyAutoThreadSaver locker;
+ exe.RunW(getUndergroundGeneratedGraph());
+ }
+ return getUndergroundGeneratedGraph()->getState()==YACS::DONE;
+ }
+ else
+ {
+ char EFXGenFileName[]="EFXGenFileName";
+ char EFXGenContent[]="import getpass,datetime,os\nn=datetime.datetime.now()\nreturn os.path.join(os.path.sep,\"tmp\",\"EvalYFX_%s_%s_%s.xml\"%(getpass.getuser(),n.strftime(\"%d%b%y\"),n.strftime(\"%H%M%S\")))";
+ //
+ YACS::ENGINE::AutoPyRef func(YACS::ENGINE::evalPy(EFXGenFileName,EFXGenContent));
+ YACS::ENGINE::AutoPyRef val(YACS::ENGINE::evalFuncPyWithNoParams(func));
+ std::string fn(PyString_AsString(val));
+ getUndergroundGeneratedGraph()->saveSchema(fn);
+ return false;
+ }
+}
+
bool YACSEvalYFXRunOnlyPattern::IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode)
{
std::list<YACS::ENGINE::Node *> nodes(scheme->getChildren());
virtual std::string getStatusOfRunStr() const = 0;
virtual std::vector<YACSEvalSeqAny *> getResults() const = 0;
virtual std::vector<YACSEvalSeqAny *> getResultsInCaseOfFailure(std::vector<unsigned int>& passedIds) const = 0;
- virtual void emitStart() const = 0;
+ virtual bool go(bool stopASAP) const = 0;
public:
static const char DFT_PROC_NAME[];
protected:
std::string getStatusOfRunStr() const;
std::vector<YACSEvalSeqAny *> getResults() const;
std::vector<YACSEvalSeqAny *> getResultsInCaseOfFailure(std::vector<unsigned int>& passedIds) const;
- void emitStart() const;
+ bool go(bool stopASAP) const;
//
YACS::ENGINE::ForEachLoop *getUndergroundForEach() const { return _FEInGeneratedGraph; }
static bool IsMatching(YACS::ENGINE::Proc *scheme, YACS::ENGINE::ComposedNode *& runNode);
static const char FIRST_FE_SUBNODE_NAME[];
static const char GATHER_NODE_NAME[];
private:
+ void emitStart() const;
void buildInputPorts();
void buildOutputPorts();
YACS::ENGINE::ForEachLoop *findTopForEach() const;
YACSEvalResource();
};
+class YACSEvalParamsForCluster
+{
+public:
+ bool getExclusiveness() const;
+ void setExclusiveness(bool newStatus);
+ std::string getRemoteWorkingDir();
+ void setRemoteWorkingDir(const std::string& remoteWorkingDir);
+ std::string getWCKey() const;
+ void setWCKey(const std::string& wcKey);
+ unsigned int getNbProcs() const;
+ void setNbProcs(unsigned int nbProcs);
+ void checkConsistency() const;
+private:
+ YACSEvalParamsForCluster();
+};
+
class YACSEvalListOfResources
{
public:
bool isInteractive() const;
YACSEvalResource *at(std::size_t i) const;
unsigned int getNumberOfProcsDeclared() const;
+ void checkOKForRun() const;
+ YACSEvalParamsForCluster& getAddParamsForCluster();
%extend
{
std::size_t __len__() const
private:
PyGILState_STATE _gstate;
};
+
+ class AutoPyRef
+ {
+ public:
+ AutoPyRef(PyObject *pyobj=0):_pyobj(pyobj) { }
+ ~AutoPyRef() { release(); }
+ AutoPyRef(const AutoPyRef& other):_pyobj(other._pyobj) { if(_pyobj) Py_XINCREF(_pyobj); }
+ AutoPyRef& operator=(const AutoPyRef& other) { if(_pyobj==other._pyobj) return *this; release(); _pyobj=other._pyobj; Py_XINCREF(_pyobj); return *this; }
+ operator PyObject *() { return _pyobj; }
+ void set(PyObject *pyobj) { if(pyobj==_pyobj) return ; release(); _pyobj=pyobj; }
+ PyObject *get() { return _pyobj; }
+ bool isNull() const { return _pyobj==0; }
+ PyObject *retn() { if(_pyobj) Py_XINCREF(_pyobj); return _pyobj; }
+ private:
+ void release() { if(_pyobj) Py_XDECREF(_pyobj); _pyobj=0; }
+ private:
+ PyObject *_pyobj;
+ };
}
}
//
#include "PyStdout.hxx"
+#include "Exception.hxx"
+#include "AutoGIL.hxx"
+
#include <structmember.h>
+
#include <string>
+#include <sstream>
+
+#ifdef WIN32
+#include <process.h>
+#define getpid _getpid
+#endif
namespace YACS
{
return (PyObject*)self;
}
+PyObject *evalPy(const std::string& funcName, const std::string& strToEval)
+{
+ std::ostringstream oss0; oss0 << "def " << funcName << "():\n";
+ std::string::size_type i0(0);
+ while(i0<strToEval.length() && i0!=std::string::npos)
+ {
+ std::string::size_type i2(strToEval.find('\n',i0));
+ std::string::size_type lgth(i2!=std::string::npos?i2-i0:std::string::npos);
+ std::string part(strToEval.substr(i0,lgth));
+ if(!part.empty())
+ oss0 << " " << part << "\n";
+ i0=i2!=std::string::npos?i2+1:std::string::npos;
+ }
+ std::string zeCodeStr(oss0.str());
+ std::ostringstream stream;
+ stream << "/tmp/PythonNode_";
+ stream << getpid();
+ AutoPyRef context(PyDict_New());
+ PyDict_SetItemString( context, "__builtins__", PyEval_GetBuiltins() );
+ AutoPyRef code(Py_CompileString(zeCodeStr.c_str(), "kkkk", Py_file_input));
+ if(code.isNull())
+ {
+ std::string errorDetails;
+ PyObject *new_stderr(newPyStdOut(errorDetails));
+ PySys_SetObject((char*)"stderr", new_stderr);
+ PyErr_Print();
+ PySys_SetObject((char*)"stderr", PySys_GetObject((char*)"__stderr__"));
+ Py_DECREF(new_stderr);
+ std::ostringstream oss; oss << "evalPy failed : " << errorDetails;
+ throw Exception(oss.str());
+ }
+ AutoPyRef res(PyEval_EvalCode(reinterpret_cast<PyCodeObject *>((PyObject *)code),context,context));
+ PyObject *ret(PyDict_GetItemString(context,funcName.c_str())); //borrowed ref
+ if(!ret)
+ throw YACS::Exception("evalPy : Error on returned func !");
+ Py_XINCREF(ret);
+ return ret;
+}
+
+PyObject *evalFuncPyWithNoParams(PyObject *func)
+{
+ if(!func)
+ throw YACS::Exception("evalFuncPyWithNoParams : input func is NULL !");
+ AutoPyRef args(PyTuple_New(0));
+ AutoPyRef ret(PyObject_CallObject(func,args));
+ if(ret.isNull())
+ throw YACS::Exception("evalFuncPyWithNoParams : ret is null !");
+ return ret.retn();
+}
+
}
}
{
namespace ENGINE
{
- YACSRUNTIMESALOME_EXPORT PyObject * newPyStdOut( std::string& out );
+ YACSRUNTIMESALOME_EXPORT PyObject *newPyStdOut( std::string& out );
+ YACSRUNTIMESALOME_EXPORT PyObject *evalPy(const std::string& funcName, const std::string& strToEval);
+ YACSRUNTIMESALOME_EXPORT PyObject *evalFuncPyWithNoParams(PyObject *func);
}
}