From 12a8c694ef5f364ea91cbe001740bb858182a2e6 Mon Sep 17 00:00:00 2001 From: ribes Date: Wed, 11 Apr 2007 13:20:37 +0000 Subject: [PATCH] - Adding ParallelContainer implementation --- configure.ac | 1 + idl/Makefile.am | 4 +- src/Container/Makefile.am | 14 +- src/DSC/DSC_Basic/Makefile.am | 11 +- src/Makefile.am | 14 +- src/ParallelContainer/Makefile.am | 117 +++ .../SALOME_ParallelComponent_i.cxx | 772 ++++++++++++++ .../SALOME_ParallelComponent_i.hxx | 153 +++ .../SALOME_ParallelContainerNodeDummy.cxx | 165 +++ .../SALOME_ParallelContainerNodeMpi.cxx | 198 ++++ .../SALOME_ParallelContainerProxyDummy.cxx | 153 +++ .../SALOME_ParallelContainerProxyMpi.cxx | 166 +++ .../SALOME_ParallelContainer_i.cxx | 994 ++++++++++++++++++ .../SALOME_ParallelContainer_i.hxx | 159 +++ 14 files changed, 2903 insertions(+), 18 deletions(-) create mode 100644 src/ParallelContainer/Makefile.am create mode 100644 src/ParallelContainer/SALOME_ParallelComponent_i.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelComponent_i.hxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainerNodeMpi.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainerProxyDummy.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainerProxyMpi.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainer_i.cxx create mode 100644 src/ParallelContainer/SALOME_ParallelContainer_i.hxx diff --git a/configure.ac b/configure.ac index 09a977e8c..9e7b98ba9 100644 --- a/configure.ac +++ b/configure.ac @@ -557,6 +557,7 @@ AC_OUTPUT([ \ ./src/Communication/Makefile \ ./src/Communication_SWIG/Makefile \ ./src/Container/Makefile \ + ./src/ParallelContainer/Makefile \ ./src/DSC/Makefile \ ./src/DSC/DSC_Basic/Makefile \ ./src/DSC/DSC_User/Makefile \ diff --git a/idl/Makefile.am b/idl/Makefile.am index df3667ab6..016a6c9c4 100644 --- a/idl/Makefile.am +++ b/idl/Makefile.am @@ -61,7 +61,8 @@ if MPI_IS_OK endif # all the idl files are needed for make dist -EXTRA_DIST= $(BASEIDL_FILES) $(MPIIDL_FILES) +EXTRA_DIST= $(BASEIDL_FILES) $(MPIIDL_FILES) \ + SALOME_Component.xml DSC_Engines.xml SALOME_Ports.xml # This variable defines the files to be installed salomeidl_DATA = $(IDL_FILES) @@ -215,5 +216,4 @@ mostlyclean-local: -include .depidl CLEANFILES = *PaCO* *.h -EXTRA_DIST = SALOME_Component.xml DSC_Engines.xml SALOME_Ports.xml diff --git a/src/Container/Makefile.am b/src/Container/Makefile.am index 34b2f9e00..6134220a7 100644 --- a/src/Container/Makefile.am +++ b/src/Container/Makefile.am @@ -101,7 +101,8 @@ libSalomeContainer_la_SOURCES=\ Container_i.cxx \ SALOME_FileTransfer_i.cxx \ SALOME_FileRef_i.cxx \ - Container_init_python.cxx + Container_init_python.cxx \ + SALOME_ContainerManager.cxx libSalomeContainer_la_CPPFLAGS =\ $(COMMON_CPPFLAGS) @@ -113,6 +114,10 @@ libSalomeContainer_la_LDFLAGS =\ libSalomeContainer_la_LIBADD =\ $(COMMON_LIBS) +if WITH_PACO_PARALLEL +libSalomeContainer_la_CPPFLAGS += -DWITH_PACO_PARALLEL @PACO_INCLUDES@ +libSalomeContainer_la_LIBADD += @PACO_LIBS@ +endif # # =============================================================== @@ -133,8 +138,7 @@ SALOME_Container_LDADD =\ ../Basics/libSALOMEBasics.la SALOME_ContainerManagerServer_SOURCES =\ - SALOME_ContainerManagerServer.cxx \ - SALOME_ContainerManager.cxx + SALOME_ContainerManagerServer.cxx SALOME_ContainerManagerServer_CPPFLAGS=\ $(COMMON_CPPFLAGS) @@ -144,8 +148,4 @@ SALOME_ContainerManagerServer_LDADD =\ $(COMMON_LIBS) \ ../Basics/libSALOMEBasics.la -if WITH_PACO_PARALLEL -SALOME_ContainerManagerServer_CPPFLAGS += -DWITH_PACO_PARALLEL @PACO_INCLUDES@ -SALOME_ContainerManagerServer_LDADD += @PACO_LIBS@ -endif diff --git a/src/DSC/DSC_Basic/Makefile.am b/src/DSC/DSC_Basic/Makefile.am index 09638e076..3272b3e24 100644 --- a/src/DSC/DSC_Basic/Makefile.am +++ b/src/DSC/DSC_Basic/Makefile.am @@ -20,7 +20,7 @@ # # # File : Makefile.am -# Author : André RIBES (EDF) +# Author : André RIBES (EDF) # Module : KERNEL include $(top_srcdir)/salome_adm/unix/make_common_starter.am @@ -65,7 +65,6 @@ COMMON_LIBS = $(top_builddir)/src/Container/libSalomeContainer.la \ lib_LTLIBRARIES = libSalomeDSCContainer.la libSalomeDSCContainer_la_SOURCES = DSC_i.cxx \ - ConnectionManager_i.cxx \ DSC_interface.cxx \ PortProperties_i.cxx @@ -82,9 +81,11 @@ libSalomeDSCContainer_la_LDFLAGS = -no-undefined -version-info=0:0:0 # bin_PROGRAMS = SALOME_ConnectionManagerServer -SALOME_ConnectionManagerServer_SOURCES = SALOME_ConnectionManagerServer.cxx +SALOME_ConnectionManagerServer_SOURCES = SALOME_ConnectionManagerServer.cxx \ + ConnectionManager_i.cxx SALOME_ConnectionManagerServer_CXXFLAGS = $(COMMON_CPPFLAGS) -SALOME_ConnectionManagerServer_LDADD = libSalomeDSCContainer.la \ - $(COMMON_LIBS) +SALOME_ConnectionManagerServer_LDADD = $(top_builddir)/idl/libSalomeIDLKernel.la \ + $(top_builddir)/src/NamingService/libSalomeNS.la \ + $(top_builddir)/src/SALOMELocalTrace/libSALOMELocalTrace.la diff --git a/src/Makefile.am b/src/Makefile.am index 2b3828839..55eff87da 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -80,10 +80,14 @@ SUBDIR_BATCH= \ Batch \ Batch_SWIG +SUBDIR_PARALLEL= ParallelContainer + + DIST_SUBDIRS = \ $(SUBDIR_BASE) \ $(SUBDIR_CORBA) \ $(SUBDIR_MPI) \ + $(SUBDIR_PARALLEL) \ $(SUBDIR_BATCH) \ $(SUBDIR_CPPUNIT_BASE) \ $(SUBDIR_CPPUNIT_CORBA) \ @@ -101,9 +105,9 @@ if CORBA_GEN SUBDIRS += $(SUBDIR_CORBA) endif -if MPI_IS_OK - SUBDIRS += $(SUBDIR_MPI) -endif +#if MPI_IS_OK +# SUBDIRS += $(SUBDIR_MPI) +#endif if WITH_BATCH SUBDIRS += $(SUBDIR_BATCH) @@ -121,4 +125,6 @@ else $(SUBDIR_CPPUNIT_GENERAL) endif endif - +if WITH_PACO_PARALLEL +SUBDIRS += $(SUBDIR_PARALLEL) +endif diff --git a/src/ParallelContainer/Makefile.am b/src/ParallelContainer/Makefile.am new file mode 100644 index 000000000..0e04b8a16 --- /dev/null +++ b/src/ParallelContainer/Makefile.am @@ -0,0 +1,117 @@ +# Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, +# CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com +# +# +# +# File : Makefile.am +# Author : André RIBES (EDF) +# Module : KERNEL +# + +include $(top_srcdir)/salome_adm/unix/make_common_starter.am + +# +# =============================================================== +# Header to be installed +# =============================================================== +# +# header files +salomeinclude_HEADERS = SALOME_ParallelComponent_i.hxx \ + SALOME_ParallelContainer_i.hxx + +# +# =============================================================== +# Local definitions +# =============================================================== +# + +# This local variable defines the list of CPPFLAGS common to all target in this package. +COMMON_CPPFLAGS= -I$(top_srcdir)/src/Container \ + -I$(top_srcdir)/src/Notification \ + -I$(top_srcdir)/src/SALOMELocalTrace \ + -I$(top_srcdir)/src/Basics \ + -I$(top_srcdir)/src/NamingService \ + -I$(top_srcdir)/src/Registry \ + -I$(top_srcdir)/src/Utils \ + -I$(top_builddir)/salome_adm/unix \ + -I$(top_builddir)/idl \ + -I$(top_srcdir)/src/SALOMETraceCollector \ + @CORBA_CXXFLAGS@ \ + @CORBA_INCLUDES@ \ + @PACO_INCLUDES@ + +# This local variable defines the list of dependant libraries common to all target in this package. +COMMON_LIBS = $(top_builddir)/src/Container/libSalomeContainer.la \ + $(top_builddir)/idl/libSalomeParallelIDLKernel.la \ + @CORBA_LIBS@ \ + @PACO_LIBS@ + +# +# =============================================================== +# Libraries targets +# =============================================================== +# +lib_LTLIBRARIES = libSalomeParallelContainer.la + +libSalomeParallelContainer_la_SOURCES = SALOME_ParallelComponent_i.cxx \ + SALOME_ParallelContainer_i.cxx + +libSalomeParallelContainer_la_CXXFLAGS = $(COMMON_CPPFLAGS) + +libSalomeParallelContainer_la_LIBADD = $(COMMON_LIBS) + +libSalomeParallelContainer_la_LDFLAGS = -no-undefined -version-info=0:0:0 + +# +# =============================================================== +# Executables targets +# =============================================================== +# + +if MPI_IS_OK +bin_mpi_programs = SALOME_ParallelContainerProxyMpi \ + SALOME_ParallelContainerNodeMpi +endif + +bin_PROGRAMS = SALOME_ParallelContainerProxyDummy \ + SALOME_ParallelContainerNodeDummy \ + $(bin_mpi_programs) + +SALOME_ParallelContainerProxyDummy_SOURCES = SALOME_ParallelContainerProxyDummy.cxx +SALOME_ParallelContainerProxyDummy_CXXFLAGS = $(COMMON_CPPFLAGS) +SALOME_ParallelContainerProxyDummy_LDADD = libSalomeParallelContainer.la \ + -L@PACOPATH@/lib -lPaCO_dummy -lPaCO_omnithread + +SALOME_ParallelContainerNodeDummy_SOURCES = SALOME_ParallelContainerNodeDummy.cxx +SALOME_ParallelContainerNodeDummy_CXXFLAGS = $(COMMON_CPPFLAGS) +SALOME_ParallelContainerNodeDummy_LDADD = libSalomeParallelContainer.la \ + -L@PACOPATH@/lib -lPaCO_dummy -lPaCO_omnithread + +SALOME_ParallelContainerProxyMpi_SOURCES = SALOME_ParallelContainerProxyMpi.cxx +SALOME_ParallelContainerProxyMpi_CXXFLAGS = $(COMMON_CPPFLAGS) @MPI_INCLUDES@ +SALOME_ParallelContainerProxyMpi_LDADD = libSalomeParallelContainer.la \ + -L@PACOPATH@/lib -lPaCO_dummy -lPaCO_mpi -lPaCO_omnithread \ + @MPI_LIBS@ + +SALOME_ParallelContainerNodeMpi_SOURCES = SALOME_ParallelContainerNodeMpi.cxx +SALOME_ParallelContainerNodeMpi_CXXFLAGS = $(COMMON_CPPFLAGS) @MPI_INCLUDES@ +SALOME_ParallelContainerNodeMpi_LDADD = libSalomeParallelContainer.la \ + -L@PACOPATH@/lib -lPaCO_dummy -lPaCO_mpi -lPaCO_omnithread \ + @MPI_LIBS@ + diff --git a/src/ParallelContainer/SALOME_ParallelComponent_i.cxx b/src/ParallelContainer/SALOME_ParallelComponent_i.cxx new file mode 100644 index 000000000..93d3428a0 --- /dev/null +++ b/src/ParallelContainer/SALOME_ParallelComponent_i.cxx @@ -0,0 +1,772 @@ +// SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel +// +// Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, +// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org +// +// File : SALOME_ParallelComponent_i.cxx +// Author : André RIBES, EDF +// Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA + +#include "SALOME_ParallelComponent_i.hxx" +#include "SALOME_ParallelContainer_i.hxx" + +#include "OpUtil.hxx" +#include +#ifndef WNT +#include +#endif +#include +#include "utilities.h" + +#ifndef WNT +#include +#include +#include +#else +#include +int SIGUSR11 = 1000; +#endif + + +using namespace std; + +extern bool _Sleeping ; +static Engines_Parallel_Component_i * theEngines_Component ; + +bool Engines_Parallel_Component_i::_isMultiStudy = true; +bool Engines_Parallel_Component_i::_isMultiInstance = false; + +//============================================================================= +/*! + * Default constructor, not for use + */ +//============================================================================= + +Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) : + InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior) +{ + //ASSERT(0); + INFOS("Default Constructor..."); +} + +//============================================================================= +/*! + * Standard Constructor for generic Component, used in derived class + * Connection to Registry and Notification + * \param orb Object Request broker given by Container + * \parap poa Portable Object Adapter from Container (normally root_poa) + * \param contId container CORBA id inside the server + * \param instanceName unique instance name for this object (see Container_i) + * \param interfaceName component class name + * \param notif use of notification + */ +//============================================================================= + +Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, + PortableServer::POA_ptr poa, + PortableServer::ObjectId * contId, + const char *instanceName, + const char *interfaceName, + bool notif) : + InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), + _instanceName(instanceName), + _interfaceName(interfaceName), + _myConnexionToRegistry(0), + _ThreadId(0) , + _ThreadCpuUsed(0) , + _Executed(false) , + _graphName("") , + _nodeName(""), + _studyId(-1) +{ + MESSAGE("Component constructor with instanceName "<< _instanceName); + //SCRUTE(pd_refCount); + _orb = CORBA::ORB::_duplicate(orb); + _poa = PortableServer::POA::_duplicate(poa); + _contId = contId ; + CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior... + const CORBA::String_var the_ior = _orb->object_to_string(o); + _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession", + _instanceName.c_str()); + + _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif); + //SCRUTE(pd_refCount); +} + +//============================================================================= +/*! + * Destructor: call Container for decrement of instances count. + * When instances count falls to 0, the container tries to remove the + * component library (dlclose) + */ +//============================================================================= + +Engines_Parallel_Component_i::~Engines_Parallel_Component_i() +{ + MESSAGE("Component destructor"); + Engines_Parallel_Container_i::decInstanceCnt(_interfaceName); +} + +//============================================================================= +/*! + * CORBA method: return name of the instance, unique in this Container + */ +//============================================================================= + +char* Engines_Parallel_Component_i::instanceName() +{ + return CORBA::string_dup(_instanceName.c_str()) ; +} + +//============================================================================= +/*! + * CORBA method: return name of the component class + */ +//============================================================================= + +char* Engines_Parallel_Component_i::interfaceName() +{ + return CORBA::string_dup(_interfaceName.c_str()) ; +} + +//============================================================================= +/*! + * CORBA method: Get study Id + * \return -1: not initialised (Internal Error) + * 0: multistudy component instance + * >0: study id associated to this instance + */ +//============================================================================= + +CORBA::Long Engines_Parallel_Component_i::getStudyId() +{ + return _studyId; +} + +//============================================================================= +/*! + * CORBA method: Test if instance is alive and responds + */ +//============================================================================= + +void Engines_Parallel_Component_i::ping_c() +{ + // MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self()); +} + +//============================================================================= +/*! + * CORBA method: Deactivate this instance. CORBA object is deactivated (do not + * respond any more to CORBA calls), the connection to Regsitry is removed + * (Registry informed of deactivation), internal server reference counter on + * the derived servant class is decremented, to allow destruction of the class + * (delete) by POA, when there are no more references. + * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) -- + */ +//============================================================================= + +void Engines_Parallel_Component_i::destroy() +{ + MESSAGE("Engines_Parallel_Component_i::destroy()"); + //SCRUTE(pd_refCount); + + delete _notifSupplier; + _notifSupplier = 0; + + delete _myConnexionToRegistry; + _myConnexionToRegistry = 0 ; + _poa->deactivate_object(*_id) ; + CORBA::release(_poa) ; + delete(_id) ; + //SCRUTE(pd_refCount); + _thisObj->_remove_ref(); + //SCRUTE(pd_refCount); + MESSAGE("Engines_Parallel_Component_i::destroyed") ; +} + +//============================================================================= +/*! + * CORBA method: return CORBA reference of the Container + * + */ +//============================================================================= + +Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef() +{ + MESSAGE("Engines_Parallel_Component_i::GetContainerRef"); + CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ; + return Engines::Container::_narrow(o); +} + +//============================================================================= +/*! + * CORBA method: + * Gives a sequence of (key=string,value=any) to the component. + * Base class component stores the sequence in a map. + * The map is cleared before. + * This map is for use by derived classes. + * \param dico sequence of (key=string,value=any) + */ +//============================================================================= + +void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico) +{ + _fieldsDict.clear(); + for (CORBA::ULong i=0; ilength(_fieldsDict.size()); + map::iterator it; + CORBA::ULong i = 0; + for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++) + { + std::string cle((*it).first); + copie[i].key = CORBA::string_dup(cle.c_str()); + copie[i].value = _fieldsDict[cle]; + } + return copie._retn(); +} + +//============================================================================= +/*! + * CORBA method: used by Supervision to give names to this instance + */ +//============================================================================= + +void Engines_Parallel_Component_i::Names( const char * graphName , + const char * nodeName ) +{ + _graphName = graphName ; + _nodeName = nodeName ; + INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '" + << _nodeName << "' )"); +} + +//============================================================================= +/*! + * CORBA method: used in Supervision + */ +//============================================================================= + +bool Engines_Parallel_Component_i::Kill_impl() +{ +// MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self() +// << " pid " << getpid() << " instanceName " +// << _instanceName.c_str() << " interface " << _interfaceName.c_str() +// << " machineName " << GetHostname().c_str()<< " _id " << hex << _id +// << dec << " _ThreadId " << _ThreadId << " this " << hex << this +// << dec ) ; + + bool RetVal = false ; +#ifndef WNT + if ( _ThreadId > 0 && pthread_self() != _ThreadId ) + { + RetVal = Killer( _ThreadId , 0 ) ; + _ThreadId = (pthread_t ) -1 ; + } + +#else + if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p ) + { + RetVal = Killer( *_ThreadId , 0 ) ; + _ThreadId = (pthread_t* ) 0 ; + } + +#endif + return RetVal ; +} + +//============================================================================= +/*! + * CORBA method: used in Supervision + */ +//============================================================================= + +bool Engines_Parallel_Component_i::Stop_impl() +{ + MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self() + << " pid " << getpid() << " instanceName " + << _instanceName.c_str() << " interface " << _interfaceName.c_str() + << " machineName " << GetHostname().c_str()<< " _id " << hex << _id + << dec << " _ThreadId " << _ThreadId ); + + + bool RetVal = false ; +#ifndef WNT + if ( _ThreadId > 0 && pthread_self() != _ThreadId ) + { + RetVal = Killer( _ThreadId , 0 ) ; + _ThreadId = (pthread_t ) -1 ; + } +#else + if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p ) + { + RetVal = Killer( *_ThreadId , 0 ) ; + _ThreadId = (pthread_t* ) 0 ; + } +#endif + return RetVal ; +} + +//============================================================================= +/*! + * CORBA method: used in Supervision + */ +//============================================================================= + +bool Engines_Parallel_Component_i::Suspend_impl() +{ + MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self() + << " pid " << getpid() << " instanceName " + << _instanceName.c_str() << " interface " << _interfaceName.c_str() + << " machineName " << GetHostname().c_str()<< " _id " << hex << _id + << dec << " _ThreadId " << _ThreadId ); + + bool RetVal = false ; +#ifndef WNT + if ( _ThreadId > 0 && pthread_self() != _ThreadId ) +#else + if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p ) +#endif + { + if ( _Sleeping ) + { + return false ; + } + else + { +#ifndef WNT + RetVal = Killer( _ThreadId ,SIGINT ) ; +#else + RetVal = Killer( *_ThreadId ,SIGINT ) ; +#endif + //if ( RetVal ) _Sleeping = true; + + } + } + return RetVal ; +} + +//============================================================================= +/*! + * CORBA method: used in Supervision + */ +//============================================================================= + +bool Engines_Parallel_Component_i::Resume_impl() +{ + MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self() + << " pid " << getpid() << " instanceName " + << _instanceName.c_str() << " interface " << _interfaceName.c_str() + << " machineName " << GetHostname().c_str()<< " _id " << hex << _id + << dec << " _ThreadId " << _ThreadId ); + bool RetVal = false ; +#ifndef WNT + if ( _ThreadId > 0 && pthread_self() != _ThreadId ) +#else + if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p ) +#endif + { + if ( _Sleeping ) + { + _Sleeping = false ; + RetVal = true ; + } + else + { + RetVal = false ; + } + } + return RetVal ; +} + +//============================================================================= +/*! + * CORBA method: + */ +//============================================================================= + +CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl() +{ + long cpu = 0 ; + if ( _ThreadId || _Executed ) + { + if ( _ThreadId > 0 ) + { +#ifndef WNT + if ( pthread_self() != _ThreadId ) +#else + if ( pthread_self().p != _ThreadId->p ) +#endif + { + if ( _Sleeping ) + { + } + else + { + // Get Cpu in the appropriate thread with that object !... + theEngines_Component = this ; +#ifndef WNT + Killer( _ThreadId ,SIGUSR1 ) ; +#else + Killer( *_ThreadId ,SIGUSR11 ) ; +#endif + } + cpu = _ThreadCpuUsed ; + } + else + { + _ThreadCpuUsed = CpuUsed() ; + cpu = _ThreadCpuUsed ; + // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl " + // << _serviceName << " " << cpu << endl ; + } + } + else + { + cpu = _ThreadCpuUsed ; + // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl " + // << _serviceName << " " << cpu<< endl ; + } + } + else + { + // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId " + // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<(_poa->id_to_servant(*_contId)) ; +} + +//============================================================================= +/*! + * C++ method: set study Id + * \param studyId 0 if instance is not associated to a study, + * >0 otherwise (== study id) + * \return true if the set of study Id is OK + * must be set once by Container, at instance creation, + * and cannot be changed after. + */ +//============================================================================= + +CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId) +{ + ASSERT( studyId >= 0); + CORBA::Boolean ret = false; + if (_studyId < 0) // --- not yet initialized + { + _studyId = studyId; + ret = true; + } + else + if ( _studyId == studyId) ret = true; + return ret; +} + +//============================================================================= +/*! + * C++ method: return CORBA instance id, the id is set in derived class + * constructor, when instance is activated. + */ +//============================================================================= + +PortableServer::ObjectId * Engines_Parallel_Component_i::getId() +{ +// MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()"); + return _id ; +} + +//============================================================================= +/*! + * C++ method: used by derived classes for supervision + */ +//============================================================================= + +void Engines_Parallel_Component_i::beginService(const char *serviceName) +{ + MESSAGE(pthread_self() << "Send BeginService notification for " <p = pthread_self().p ; + _ThreadId->x = pthread_self().x ; +#endif + _StartUsed = 0 ; + _StartUsed = CpuUsed_impl() ; + _ThreadCpuUsed = 0 ; + _Executed = true ; + _serviceName = serviceName ; + if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) ) + { + perror("pthread_setcanceltype ") ; + exit(0) ; + } + if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) ) + { + perror("pthread_setcancelstate ") ; + exit(0) ; + } +// MESSAGE(pthread_self() << " Return from BeginService for " << serviceName +// << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed +// << " _graphName " << _graphName << " _nodeName " << _nodeName ); + + // --- for supervisor : all strings given with setProperties + // are set in environment + bool overwrite = true; + map::iterator it; + for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++) + { + std::string cle((*it).first); + if ((*it).second.type()->kind() == CORBA::tk_string) + { + const char* value; + (*it).second >>= value; + // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC. +#if defined __GNUC__ + int ret = setenv(cle.c_str(), value, overwrite); +#else + //CCRT porting : setenv not defined in stdlib.h + std::string s(cle); + s+='='; + s+=value; + // char* cast because 1st arg of linux putenv function + // is not a const char* ! + int ret=putenv((char *)s.c_str()); + //End of CCRT porting +#endif + MESSAGE("--- setenv: "<SetCurCpu() ; +} + +//============================================================================= +/*! + * C++ method: + */ +//============================================================================= + +void Engines_Parallel_Component_i::SetCurCpu() +{ + _ThreadCpuUsed = CpuUsed() ; + // MESSAGE(pthread_self() << + // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ; +} + +//============================================================================= +/*! + * C++ method: + */ +//============================================================================= + +long Engines_Parallel_Component_i::CpuUsed() +{ + long cpu = 0 ; +#ifndef WNT + struct rusage usage ; + if ( _ThreadId || _Executed ) + { + if ( getrusage( RUSAGE_SELF , &usage ) == -1 ) + { + perror("Engines_Parallel_Component_i::CpuUsed") ; + return 0 ; + } + cpu = usage.ru_utime.tv_sec - _StartUsed ; + // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " " + // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed + // << " = " << cpu << endl ; + } + else + { + // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId " + // << _ThreadId << " " << _serviceName<< " _StartUsed " + // << _StartUsed << endl ; + } +#else + // NOT implementet yet +#endif + + + return cpu ; +} + +//============================================================================= +/*! + * C++ method: Send message to event channel + */ +//============================================================================= + +void Engines_Parallel_Component_i::sendMessage(const char *event_type, + const char *message) +{ + _notifSupplier->Send(graphName(), nodeName(), event_type, message); +} + +//============================================================================= +/*! + * C++ method: return standard library name built on component name + */ +//============================================================================= + +string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName) +{ + string ret="lib"; + ret+=componentName; + ret+="Engine.so"; + return ret; +} + +//============================================================================= +/*! + * C++ method: DumpPython default implementation + */ +//============================================================================= + +Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, + CORBA::Boolean isPublished, + CORBA::Boolean& isValidScript) +{ + char* aScript = "def RebuildData(theStudy): pass"; + char* aBuffer = new char[strlen(aScript)+1]; + strcpy(aBuffer, aScript); + CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer; + int aBufferSize = strlen(aBuffer)+1; + Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); + isValidScript = true; + return aStreamFile._retn(); +} diff --git a/src/ParallelContainer/SALOME_ParallelComponent_i.hxx b/src/ParallelContainer/SALOME_ParallelComponent_i.hxx new file mode 100644 index 000000000..1b67853c5 --- /dev/null +++ b/src/ParallelContainer/SALOME_ParallelComponent_i.hxx @@ -0,0 +1,153 @@ +// SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel +// +// Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, +// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org +// +// File : SALOME_ParallelComponent_i.hxx +// Author : André RIBES, EDF +// Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA + +#ifndef _SALOME_PARALLEL_COMPONENT_I_HXX_ +#define _SALOME_PARALLEL_COMPONENT_I_HXX_ + +#include +#include +#include +#ifndef WNT +#include +#endif +#include +#include +#include +#include + +#include "SALOME_ComponentPaCO_Engines_Component_server.h" + +#include "NOTIFICATION.hxx" +#include "RegistryConnexion.hxx" + +class Engines_Parallel_Container_i; + +#if defined CONTAINER_EXPORTS +#if defined WIN32 +#define CONTAINER_EXPORT __declspec( dllexport ) +#else +#define CONTAINER_EXPORT +#endif +#else +#if defined WNT +#define CONTAINER_EXPORT __declspec( dllimport ) +#else +#define CONTAINER_EXPORT +#endif +#endif + +class CONTAINER_EXPORT Engines_Parallel_Component_i: + public virtual Engines::Component_serv, + public virtual PortableServer::RefCountServantBase +{ +public: + Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior); + Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, + PortableServer::POA_ptr poa, + PortableServer::ObjectId * contId, + const char *instanceName, + const char *interfaceName, + bool notif = false); + + virtual ~Engines_Parallel_Component_i(); + + // --- CORBA methods + + char* instanceName(); + char* interfaceName(); + + void ping_c(); + void destroy(); + + CORBA::Long getStudyId(); + Engines::Container_ptr GetContainerRef(); + + void setProperties(const Engines::FieldsDict& dico); + Engines::FieldsDict* getProperties(); + + void Names( const char * graphName , const char * nodeName ) ; + bool Kill_impl(); + bool Stop_impl(); + bool Suspend_impl(); + bool Resume_impl(); + CORBA::Long CpuUsed_impl() ; + + virtual Engines::TMPFile* DumpPython(CORBA::Object_ptr theStudy, + CORBA::Boolean isPublished, + CORBA::Boolean& isValidScript); + + + // --- local C++ methods + + PortableServer::ObjectId * getId(); + Engines_Parallel_Container_i *GetContainerPtr(); + + bool setStudyId(CORBA::Long studyId); + static bool isMultiStudy(); + static bool isMultiInstance(); + static std::string GetDynLibraryName(const char *componentName); + + void beginService(const char *serviceName); + void endService(const char *serviceName); + void sendMessage(const char *event_type, const char *message); + char * graphName() ; + char * nodeName() ; + bool Killer( pthread_t ThreadId , int signum ); + void SetCurCpu() ; + long CpuUsed() ; + +protected: + int _studyId; // -1: not initialised; 0: multiStudy; >0: study + static bool _isMultiStudy; + static bool _isMultiInstance; + + std::string _instanceName ; + std::string _interfaceName ; + + CORBA::ORB_ptr _orb; + PortableServer::POA_ptr _poa; + PortableServer::ObjectId * _id; + PortableServer::ObjectId * _contId; + Engines_Parallel_Component_i * _thisObj ; + RegistryConnexion *_myConnexionToRegistry; + NOTIFICATION_Supplier* _notifSupplier; + std::map_fieldsDict; + + std::string _serviceName ; + std::string _graphName ; + std::string _nodeName ; + +private: +#ifndef WNT + pthread_t _ThreadId ; +#else + pthread_t* _ThreadId ; +#endif + long _StartUsed ; + long _ThreadCpuUsed ; + bool _Executed ; +}; + +#endif diff --git a/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx b/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx new file mode 100644 index 000000000..d400ab47c --- /dev/null +++ b/src/ParallelContainer/SALOME_ParallelContainerNodeDummy.cxx @@ -0,0 +1,165 @@ +// SALOME ParallelContainerNodeDummy : launcher of a PaCO++ object +// +// Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, +// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org +// +// +// +// File : SALOME_ParallelContainerNodeDummy.cxx +// Author : André Ribes, EDF +// Module : SALOME PARALLEL + +#include +#include +#include + +#ifndef WNT +#include +#else +#include +#endif + +#include "SALOME_ParallelContainer_i.hxx" + +#include +#include + +#include "SALOME_NamingService.hxx" + +#include "utilities.h" +#include "Utils_ORB_INIT.hxx" +#include "Utils_SINGLETON.hxx" +#include "SALOMETraceCollector.hxx" +#include "OpUtil.hxx" + +using namespace std; + +#ifdef DEBUG_PARALLEL +#include + +void handler(int t) { + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "SIGSEGV in :" << getpid() << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + while (1) {} +} +#endif + +int main(int argc, char* argv[]) +{ + INFOS("Launching a parallel container node"); + +#ifdef DEBUG_PARALLEL + signal(SIGSEGV, handler); +#endif + + // Initialise the ORB. + ORB_INIT &init = *SINGLETON_::Instance(); + ASSERT(SINGLETON_::IsAlreadyExisting()); + CORBA::ORB_var orb = init(0, 0); + //CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + char * containerName = ""; + if(argc > 1) { + containerName = argv[1]; + } + + char * hostname = ""; + if(argc > 3) { + hostname = argv[3]; + } + + try { + CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); + PortableServer::POAManager_var pman = root_poa->the_POAManager(); + +#ifndef WNT + // add this container to the kill list + char aCommand[100]; + sprintf(aCommand, "addToKillList.py %d SALOME_ParallelContainerNode", getpid()); + system(aCommand); +#endif + + SALOME_NamingService * ns = new SALOME_NamingService(CORBA::ORB::_duplicate(orb)); + // Get the proxy + string proxyNameInNS = ns->BuildContainerNameForNS(containerName, hostname); + obj = ns->Resolve(proxyNameInNS.c_str()); + char * proxy_ior = orb->object_to_string(obj); + + // Creating a node + string name(containerName); + string node_name = name + "Node"; + Engines_Parallel_Container_i * servant = new Engines_Parallel_Container_i(CORBA::ORB::_duplicate(orb), proxy_ior, + root_poa, + (char*) node_name.c_str(), + argc, argv); + // PaCO++ init + paco_fabrique_manager * pfm = paco_getFabriqueManager(); + pfm->register_com("dummy", new paco_dummy_fabrique()); + pfm->register_thread("omni", new paco_omni_fabrique()); + + // Global context + PaCO_operation * global_ptr = servant->getContext("global_paco_context"); + global_ptr->setLibCom("dummy",NULL); + global_ptr->setLibThread("omni"); + + // Activation + PortableServer::ObjectId * _id = root_poa->activate_object(servant); + servant->set_id(_id); + obj = root_poa->id_to_reference(*_id); + + // In the NamingService + string hostname = GetHostname(); + int myid = 0; + char buffer [5]; + snprintf(buffer, 5, "%d", myid); + node_name = node_name + buffer; + string _containerName = ns->BuildContainerNameForNS((char*) node_name.c_str(), + hostname.c_str()); + cerr << "---------" << _containerName << "----------" << endl; + ns->Register(obj, _containerName.c_str()); + pman->activate(); + orb->run(); + } + catch(CORBA::SystemException&) + { + INFOS("Caught CORBA::SystemException."); + } + catch(PortableServer::POA::ServantAlreadyActive&) + { + INFOS("Caught CORBA::ServantAlreadyActiveException"); + } + catch(CORBA::Exception&) + { + INFOS("Caught CORBA::Exception."); + } + catch(std::exception& exc) + { + INFOS("Caught std::exception - "< +#include +#include + +#ifndef WNT +#include +#else +#include +#endif + +#include "SALOME_ParallelContainer_i.hxx" + +// PaCO++ include +#include +#include + +#include + +#include "SALOME_NamingService.hxx" + +#include "utilities.h" +#include "Utils_ORB_INIT.hxx" +#include "Utils_SINGLETON.hxx" +#include "SALOMETraceCollector.hxx" +#include "OpUtil.hxx" + +using namespace std; + +#ifdef DEBUG_PARALLEL +#include + +void handler(int t) { + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "SIGSEGV in :" << getpid() << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + while (1) {} +} +#endif + +int main(int argc, char* argv[]) +{ + INFOS("Launching a parallel Mpi container node"); + +#ifdef DEBUG_PARALLEL + signal(SIGSEGV, handler); +#endif + + // MPI Init + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE ,&provided); + +#ifdef DEBUG_PARALLEL + cerr << "Level MPI_THREAD_SINGLE : " << MPI_THREAD_SINGLE << endl; + cerr << "Level MPI_THREAD_SERIALIZED : " << MPI_THREAD_SERIALIZED << endl; + cerr << "Level MPI_THREAD_FUNNELED : " << MPI_THREAD_FUNNELED << endl; + cerr << "Level MPI_THREAD_MULTIPLE : " << MPI_THREAD_MULTIPLE << endl; + cerr << "Level provided : " << provided << endl; +#endif + // Initialise the ORB. + ORB_INIT &init = *SINGLETON_::Instance(); + ASSERT(SINGLETON_::IsAlreadyExisting()); + CORBA::ORB_var orb = init(0, 0); + //CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + // Code pour choisir le reseau infiniband ..... +/* string hostname_temp = GetHostname(); + hostent * t = gethostbyname(hostname_temp.c_str()); + cerr << " AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA " << t->h_addr << " " << hostname_temp << endl; + cerr << t->h_addr << endl; + in_addr * address=(in_addr * ) t->h_addr; + cerr << inet_ntoa(* address) << endl; + string ip = inet_ntoa(* address); + cerr << " AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA " << endl; + string com = "giop:tcp:" + ip + ":"; + const char* options[][2] = { { "endPoint", com.c_str() }, { 0, 0 } }; + CORBA::ORB_var orb = CORBA::ORB_init(argc, argv, "omniORB4", options); +*/ + char * containerName = ""; + if(argc > 1) { + containerName = argv[1]; + } + + char * hostname = ""; + if(argc > 3) { + hostname = argv[3]; + } + + try { + CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); + PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); + PortableServer::POAManager_var pman = root_poa->the_POAManager(); + +#ifndef WNT + // add this container to the kill list + char aCommand[100]; + sprintf(aCommand, "addToKillList.py %d SALOME_ParallelContainerNodeMpi", getpid()); + system(aCommand); +#endif + + SALOME_NamingService * ns = new SALOME_NamingService(CORBA::ORB::_duplicate(orb)); + // On récupère le proxy + string proxyNameInNS = ns->BuildContainerNameForNS(containerName, hostname); + obj = ns->Resolve(proxyNameInNS.c_str()); + char * proxy_ior = orb->object_to_string(obj); + + // Node creation + string name(containerName); + string node_name = name + "Node"; + Engines_Parallel_Container_i * servant = new Engines_Parallel_Container_i(CORBA::ORB::_duplicate(orb), proxy_ior, + root_poa, + (char*) node_name.c_str(), + argc, argv); + // PaCO++ init + paco_fabrique_manager * pfm = paco_getFabriqueManager(); + pfm->register_com("mpi", new paco_mpi_fabrique()); + pfm->register_thread("omni", new paco_omni_fabrique()); + + // Global context + PaCO_operation * global_ptr = servant->getContext("global_paco_context"); + MPI_Comm group = MPI_COMM_WORLD; + global_ptr->setLibCom("mpi", &group); + global_ptr->setLibThread("omni"); + + // Activation + PortableServer::ObjectId * _id = root_poa->activate_object(servant); + servant->set_id(_id); + obj = root_poa->id_to_reference(*_id); + + // In the NamingService + string hostname = GetHostname(); + + int myid; + MPI_Comm_rank(MPI_COMM_WORLD, &myid); + char buffer [5]; + snprintf(buffer, 5, "%d", myid); + node_name = node_name + buffer; + string _containerName = ns->BuildContainerNameForNS((char*) node_name.c_str(), + hostname.c_str()); + cerr << "---------" << _containerName << "----------" << endl; + ns->Register(obj, _containerName.c_str()); + pman->activate(); + orb->run(); + } + catch(CORBA::SystemException&) + { + INFOS("Caught CORBA::SystemException."); + } + catch(PortableServer::POA::ServantAlreadyActive&) + { + INFOS("Caught CORBA::ServantAlreadyActiveException"); + } + catch(CORBA::Exception&) + { + INFOS("Caught CORBA::Exception."); + } + catch(std::exception& exc) + { + INFOS("Caught std::exception - "< +#include +#include + +#ifndef WNT +#include +#else +#include +#endif + +// PaCO++ include +#include "SALOME_ComponentPaCO_Engines_Container_server.h" +#include +#include + +#include "SALOME_NamingService.hxx" + +#include "utilities.h" +#include "Utils_ORB_INIT.hxx" +#include "Utils_SINGLETON.hxx" +#include "SALOMETraceCollector.hxx" +#include "OpUtil.hxx" + +#ifdef DEBUG_PARALLEL +#include +using namespace std; + +void handler(int t) { + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "SIGSEGV in :" << getpid() << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + while (1) {} +} +#endif + +int main(int argc, char* argv[]) +{ + INFOS("Launching a parallel proxy container"); + +#ifdef DEBUG_PARALLEL + signal(SIGSEGV, handler); +#endif + + // Initialise the ORB. + ORB_INIT &init = *SINGLETON_::Instance(); + ASSERT(SINGLETON_::IsAlreadyExisting()); + CORBA::ORB_var orb = init(0, 0); + //CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + char *containerName = ""; + if(argc > 1) { + containerName = argv[1]; + } + + try { + CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); + ASSERT(!CORBA::is_nil(obj)); + PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); + PortableServer::POAManager_var pman = root_poa->the_POAManager(); + +#ifndef WNT + // add this container to the kill list + char aCommand[100]; + sprintf(aCommand, "addToKillList.py %d SALOME_ParallelContainerProxy", getpid()); + system(aCommand); +#endif + + SALOME_NamingService * ns = new SALOME_NamingService(CORBA::ORB::_duplicate(orb)); + Engines::Container_proxy_impl * proxy = new Engines::Container_proxy_impl(CORBA::ORB::_duplicate(orb)); + + // PaCO++ code + paco_fabrique_manager* pfm = paco_getFabriqueManager(); + // Global context + PaCO_operation * global_ptr = proxy->getContext("global_paco_context"); + pfm->register_com("dummy", new paco_dummy_fabrique()); + global_ptr->setLibCom("dummy", NULL); + pfm->register_thread("omnithread", new paco_omni_fabrique()); + global_ptr->setLibThread("omnithread"); + global_ptr->setTypeClient(true); + // The proxy is a PaCO++ client + PaCO::PacoTopology_t client_topo; + client_topo.total = 1; + global_ptr->setClientTopo(client_topo); + // Topo of the parallel object + PaCO::PacoTopology_t serveur_topo; + serveur_topo.total = 1; + proxy->setTopo(serveur_topo); + + PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); + obj = root_poa->id_to_reference(_id); + + // In the NamingService + string hostname = GetHostname(); + Engines::Container_var pCont = Engines::Container::_narrow(obj); + string _containerName = ns->BuildContainerNameForNS(containerName, + hostname.c_str()); + cerr << "---------" << _containerName << "----------" << endl; + ns->Register(pCont, _containerName.c_str()); + pman->activate(); + orb->run(); + } + catch(CORBA::SystemException&) + { + INFOS("Caught CORBA::SystemException."); + } + catch(PortableServer::POA::ServantAlreadyActive&) + { + INFOS("Caught CORBA::ServantAlreadyActiveException"); + } + catch(CORBA::Exception&) + { + INFOS("Caught CORBA::Exception."); + } + catch(std::exception& exc) + { + INFOS("Caught std::exception - "< +#include +#include + +#ifndef WNT +#include +#else +#include +#endif + +#include "SALOME_ComponentPaCO_Engines_Container_server.h" +#include +#include + +#include + +#include "SALOME_NamingService.hxx" + +#include "utilities.h" +#include "Utils_ORB_INIT.hxx" +#include "Utils_SINGLETON.hxx" +#include "SALOMETraceCollector.hxx" +#include "OpUtil.hxx" +using namespace std; + +#ifdef DEBUG_PARALLEL +#include + +void handler(int t) { + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "SIGSEGV in :" << getpid() << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + cerr << "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" << endl; + while (1) {} +} +#endif + +int main(int argc, char* argv[]) +{ + INFOS("Launching a parallel Mpi proxy container"); + +#ifdef DEBUG_PARALLEL + signal(SIGSEGV, handler); +#endif + + // MPI Init + int provided; + MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED ,&provided); + + // Initialise the ORB. + ORB_INIT &init = *SINGLETON_::Instance(); + ASSERT(SINGLETON_::IsAlreadyExisting()); + CORBA::ORB_var orb = init(0, 0); + //CORBA::ORB_var orb = CORBA::ORB_init(argc, argv); + + char *containerName = ""; + if(argc > 1) { + containerName = argv[1]; + } + + char * nb_nodes = ""; + if(argc > 2) { + nb_nodes = argv[2]; + } + + try { + CORBA::Object_var obj = orb->resolve_initial_references("RootPOA"); + ASSERT(!CORBA::is_nil(obj)); + PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj); + PortableServer::POAManager_var pman = root_poa->the_POAManager(); + +#ifndef WNT + // add this container to the kill list + char aCommand[100]; + sprintf(aCommand, "addToKillList.py %d SALOME_ParallelContainerProxyMpi", getpid()); + system(aCommand); +#endif + + SALOME_NamingService * ns = new SALOME_NamingService(CORBA::ORB::_duplicate(orb)); + Engines::Container_proxy_impl * proxy = new Engines::Container_proxy_impl(CORBA::ORB::_duplicate(orb)); + + // PaCO++ code + paco_fabrique_manager* pfm = paco_getFabriqueManager(); + // Global context + PaCO_operation * global_ptr = proxy->getContext("global_paco_context"); + pfm->register_com("mpi", new paco_mpi_fabrique()); + MPI_Comm group = MPI_COMM_WORLD; + global_ptr->setLibCom("mpi", &group); + pfm->register_thread("omnithread", new paco_omni_fabrique()); + global_ptr->setLibThread("omnithread"); + global_ptr->setTypeClient(true); + PaCO::PacoTopology_t client_topo; + client_topo.total = 1; + global_ptr->setClientTopo(client_topo); + PaCO::PacoTopology_t serveur_topo; + serveur_topo.total = atoi(nb_nodes); + proxy->setTopo(serveur_topo); + + // Activation + PortableServer::ObjectId_var _id = root_poa->activate_object(proxy); + obj = root_poa->id_to_reference(_id); + + // in the NamingService + string hostname = GetHostname(); + Engines::Container_var pCont = Engines::Container::_narrow(obj); + string _containerName = ns->BuildContainerNameForNS(containerName, + hostname.c_str()); + cerr << "---------" << _containerName << "----------" << endl; + ns->Register(pCont, _containerName.c_str()); + pman->activate(); + orb->run(); + } + catch(CORBA::SystemException&) + { + INFOS("Caught CORBA::SystemException."); + } + catch(PortableServer::POA::ServantAlreadyActive&) + { + INFOS("Caught CORBA::ServantAlreadyActiveException"); + } + catch(CORBA::Exception&) + { + INFOS("Caught CORBA::Exception."); + } + catch(std::exception& exc) + { + INFOS("Caught std::exception - "< +#ifndef WNT +#else +#include +#endif +#include "SALOME_ParallelContainer_i.hxx" + +#include "SALOME_Component_i.hxx" + +#include "SALOME_FileRef_i.hxx" +#include "SALOME_FileTransfer_i.hxx" +#include "SALOME_NamingService.hxx" +#include "OpUtil.hxx" + +#include "OpUtil.hxx" +#include +#include +#ifndef WNT +#include +#include +#else +#include "../../adm/win32/SALOME_WNT.hxx" +#include +#include +int SIGUSR1 = 1000; +#endif + +#include "utilities.h" +using namespace std; + +bool _Sleeping = false ; + +// Containers with name FactoryServer are started via rsh in LifeCycleCORBA +// Other Containers are started via start_impl of FactoryServer + +extern "C" {void ActSigIntHandler() ; } +#ifndef WNT +extern "C" {void SigIntHandler(int, siginfo_t *, void *) ; } +#else +extern "C" {void SigIntHandler( int ) ; } +#endif + + +map Engines_Parallel_Container_i::_cntInstances_map; +map Engines_Parallel_Container_i::_library_map; +map Engines_Parallel_Container_i::_toRemove_map; +omni_mutex Engines_Parallel_Container_i::_numInstanceMutex ; + +//============================================================================= +/*! + * Default constructor, not for use + */ +//============================================================================= + +Engines_Parallel_Container_i::Engines_Parallel_Container_i (CORBA::ORB_ptr orb, char * ior) : + InterfaceParallel_impl(orb,ior), Engines::Container_serv(orb,ior), + _numInstance(0) +{ +} + +//============================================================================= +/*! + * Construtor to use + */ +//============================================================================= + +Engines_Parallel_Container_i::Engines_Parallel_Container_i (CORBA::ORB_ptr orb, char * ior, + PortableServer::POA_ptr poa, + char *containerName , + int argc , char* argv[], + bool activAndRegist, + bool isServantAloneInProcess + ) : + InterfaceParallel_impl(orb,ior), Engines::Container_serv(orb,ior), + _numInstance(0),_isServantAloneInProcess(isServantAloneInProcess) +{ + _pid = (long)getpid(); + + if(activAndRegist) + ActSigIntHandler() ; + + _argc = argc ; + _argv = argv ; + + string hostname = GetHostname(); + + _orb = CORBA::ORB::_duplicate(orb) ; + _poa = PortableServer::POA::_duplicate(poa) ; + _NS = new SALOME_NamingService(); + _NS->init_orb( CORBA::ORB::_duplicate(_orb) ); + _containerName = _NS->BuildContainerNameForNS(containerName, hostname.c_str()); + + fileTransfer_i* aFileTransfer = new fileTransfer_i(); + _fileTransfer = Engines::fileTransfer::_narrow(aFileTransfer->_this()); +} + +//============================================================================= +/*! + * Destructor + */ +//============================================================================= + +Engines_Parallel_Container_i::~Engines_Parallel_Container_i() +{ + MESSAGE("Container_i::~Container_i()"); + delete _id; +} + +//============================================================================= +/*! + * CORBA attribute: Container name (see constructor) + */ +//============================================================================= + +char* Engines_Parallel_Container_i::name() +{ + return CORBA::string_dup(_containerName.c_str()) ; +} + +//============================================================================= +/*! + * CORBA method: Get the hostName of the Container (without domain extensions) + */ +//============================================================================= + +char* Engines_Parallel_Container_i::getHostName() +{ + string s = GetHostname(); + MESSAGE("Engines_Parallel_Container_i::getHostName " << s); + return CORBA::string_dup(s.c_str()) ; +} + +//============================================================================= +/*! + * CORBA method: Get the PID (process identification) of the Container + */ +//============================================================================= + +CORBA::Long Engines_Parallel_Container_i::getPID() +{ + return (CORBA::Long)getpid(); +} + +//============================================================================= +/*! + * CORBA method: check if servant is still alive + */ +//============================================================================= + +void Engines_Parallel_Container_i::ping() +{ + MESSAGE("Engines_Parallel_Container_i::ping() pid "<< getpid()); +} + +//============================================================================= +/*! + * CORBA method, oneway: Server shutdown. + * - Container name removed from naming service, + * - servant deactivation, + * - orb shutdown if no other servants in the process + */ +//============================================================================= + +void Engines_Parallel_Container_i::Shutdown() +{ + MESSAGE("Engines_Parallel_Container_i::Shutdown()"); + _NS->Destroy_FullDirectory(_containerName.c_str()); + //_remove_ref(); + //_poa->deactivate_object(*_id); + if(_isServantAloneInProcess) + { + MESSAGE("Effective Shutdown of container Begins..."); + LocalTraceBufferPool* bp1 = LocalTraceBufferPool::instance(); + bp1->deleteInstance(bp1); + _orb->shutdown(0); + } +} + + +//============================================================================= +/*! + * CORBA method: load a new component class (Python or C++ implementation) + * \param componentName like COMPONENT + * try to make a Python import of COMPONENT, + * then a lib open of libCOMPONENTEngine.so + * \return true if dlopen successfull or already done, false otherwise + */ +//============================================================================= + +bool +Engines_Parallel_Container_i::load_component_Library(const char* componentName) +{ + bool ret = false; + string aCompName = componentName; + // --- try dlopen C++ component + +#ifndef WNT + string impl_name = string ("lib") + aCompName + string("Engine.so"); +#else + string impl_name = aCompName + string("Engine.dll"); +#endif + + SCRUTE(impl_name); + + _numInstanceMutex.lock(); // lock to be alone + if (_toRemove_map[impl_name]) _toRemove_map.erase(impl_name); + if (_library_map[impl_name]) + { + MESSAGE("Library " << impl_name << " already loaded"); + _numInstanceMutex.unlock(); + ret = true; + } + + void* handle; +#ifndef WNT + handle = dlopen( impl_name.c_str() , RTLD_LAZY ) ; +#else + handle = dlopen( impl_name.c_str() , 0 ) ; +#endif + if ( handle ) + { + _library_map[impl_name] = handle; + _numInstanceMutex.unlock(); + ret = true; + } + else + { + cerr << "Can't load shared library : " << impl_name << endl; + cerr << "error dlopen: " << dlerror() << endl; + _numInstanceMutex.unlock(); + ret = false; + } + + // To be sure that all the nodes of the component as loaded the library + global_paco_context_ptr->my_com->paco_barrier(); + + return ret; +} + +//============================================================================= +/*! + * CORBA method: Creates a new servant instance of a component. + * The servant registers itself to naming service and Registry. + * \param genericRegisterName Name of the component instance to register + * in Registry & Name Service (without _inst_n suffix) + * \param studyId 0 for multiStudy instance, + * study Id (>0) otherwise + * \return a loaded component + */ +//============================================================================= + +Engines::Component_ptr +Engines_Parallel_Container_i::create_component_instance(const char*genericRegisterName, + CORBA::Long studyId) +{ + cerr << "----------------- create_component_instance node : " << getMyRank() << endl; + + if (studyId < 0) + { + INFOS("studyId must be > 0 for mono study instance, =0 for multiStudy"); + return Engines::Component::_nil() ; + } + + Engines::Component_var iobject = Engines::Component::_nil() ; + + // is it a parallel component ? + bool parallel = false; + string aCompName = genericRegisterName; + int par = aCompName.find("@PARALLEL@"); + if (par>0) { + parallel = true; + aCompName = aCompName.substr(0,par); + } + + //--- try C++ +#ifndef WNT + string impl_name = string ("lib") + aCompName +string("Engine.so"); +#else + string impl_name = aCompName +string("Engine.dll"); +#endif + void* handle = _library_map[impl_name]; + + if ( !handle ) + { + cerr << "shared library " << impl_name <<"must be loaded before instance" << endl;; + return Engines::Component::_nil() ; + } + else + { + if (parallel) { + // Sequential component case + // Component parallel proxy created on node 0 + iobject = createParallelInstance(aCompName, + handle, + studyId); + + } + else { + // Sequential component case + iobject = createInstance(aCompName, + handle, + studyId); + } + + return iobject._retn(); + } +} + +//============================================================================= +/*! + * CORBA method: Finds a servant instance of a component + * \param registeredName Name of the component in Registry or Name Service, + * without instance suffix number + * \param studyId 0 if instance is not associated to a study, + * >0 otherwise (== study id) + * \return the first instance found with same studyId + */ +//============================================================================= + +Engines::Component_ptr Engines_Parallel_Container_i::find_component_instance( const char* registeredName, + CORBA::Long studyId) +{ + Engines::Component_var anEngine = Engines::Component::_nil(); + map::iterator itm =_listInstances_map.begin(); + while (itm != _listInstances_map.end()) + { + string instance = (*itm).first; + SCRUTE(instance); + if (instance.find(registeredName) == 0) + { + anEngine = (*itm).second; + if (studyId == anEngine->getStudyId()) + { + return anEngine._retn(); + } + } + itm++; + } + return anEngine._retn(); +} + +//============================================================================= +/*! + * CORBA method: find or create an instance of the component (servant), + * load a new component class (dynamic library) if required, + * ---- FOR COMPATIBILITY WITH 2.2 ---- + * ---- USE ONLY FOR MULTISTUDY INSTANCES ! -------- + * The servant registers itself to naming service and Registry. + * \param genericRegisterName Name of the component to register + * in Registry & Name Service + * \param componentName Name of the constructed library of the component + * \return a loaded component + */ +//============================================================================= + +Engines::Component_ptr Engines_Parallel_Container_i::load_impl( const char* genericRegisterName, + const char* componentName ) +{ + string impl_name = string ("lib") + genericRegisterName +string("Engine.so"); + Engines::Component_var iobject = Engines::Component::_nil() ; + if (load_component_Library(genericRegisterName)) + iobject = find_or_create_instance(genericRegisterName, impl_name); + return iobject._retn(); +} + + +//============================================================================= +/*! + * CORBA method: Stops the component servant, and deletes all related objects + * \param component_i Component to be removed + */ +//============================================================================= + +void Engines_Parallel_Container_i::remove_impl(Engines::Component_ptr component_i) +{ + ASSERT(! CORBA::is_nil(component_i)); + string instanceName = component_i->instanceName() ; + MESSAGE("unload component " << instanceName); + _listInstances_map.erase(instanceName); + component_i->destroy() ; + _NS->Destroy_Name(instanceName.c_str()); +} + +//============================================================================= +/*! + * CORBA method: Discharges unused libraries from the container. + */ +//============================================================================= + +void Engines_Parallel_Container_i::finalize_removal() +{ + MESSAGE("finalize unload : dlclose"); + _numInstanceMutex.lock(); // lock to be alone + // (see decInstanceCnt, load_component_Library) + map::iterator ith; + for (ith = _toRemove_map.begin(); ith != _toRemove_map.end(); ith++) + { + void *handle = (*ith).second; + string impl_name= (*ith).first; + if (handle) + { + SCRUTE(handle); + SCRUTE(impl_name); + // dlclose(handle); // SALOME unstable after ... + // _library_map.erase(impl_name); + } + } + _toRemove_map.clear(); + _numInstanceMutex.unlock(); +} + +//============================================================================= +/*! + * CORBA method: Kill the container process with exit(0). + * To remove : never returns ! + */ +//============================================================================= + +bool Engines_Parallel_Container_i::Kill_impl() +{ + MESSAGE("Engines_Parallel_Container_i::Kill() pid "<< getpid() << " containerName " + << _containerName.c_str() << " machineName " + << GetHostname().c_str()); + INFOS("==============================================================="); + INFOS("= REMOVE calls to Kill_impl in C++ container ="); + INFOS("==============================================================="); + //exit( 0 ) ; + ASSERT(0); + return false; +} + +//============================================================================= +/*! + * C++ method: Finds an already existing servant instance of a component, or + * create an instance. + * ---- USE ONLY FOR MULTISTUDY INSTANCES ! -------- + * \param genericRegisterName Name of the component instance to register + * in Registry & Name Service, + * (without _inst_n suffix, like "COMPONENT") + * \param componentLibraryName like "libCOMPONENTEngine.so" + * \return a loaded component + * + * example with names: + * aGenRegisterName = COMPONENT (= first argument) + * impl_name = libCOMPONENTEngine.so (= second argument) + * _containerName = /Containers/cli76ce/FactoryServer + * factoryName = COMPONENTEngine_factory + * component_registerBase = /Containers/cli76ce/FactoryServer/COMPONENT + * + * instanceName = COMPONENT_inst_1 + * component_registerName = /Containers/cli76ce/FactoryServer/COMPONENT_inst_1 + */ +//============================================================================= + +Engines::Component_ptr +Engines_Parallel_Container_i::find_or_create_instance(string genericRegisterName, + string componentLibraryName) +{ + string aGenRegisterName = genericRegisterName; + string impl_name = componentLibraryName; + void* handle = _library_map[impl_name]; + if ( !handle ) + { + INFOS("shared library " << impl_name <<"must be loaded before instance"); + return Engines::Component::_nil() ; + } + else + { + // --- find a registered instance in naming service, or create + + string component_registerBase = + _containerName + "/" + aGenRegisterName; + Engines::Component_var iobject = Engines::Component::_nil() ; + try + { + CORBA::Object_var obj = + _NS->ResolveFirst( component_registerBase.c_str()); + if ( CORBA::is_nil( obj ) ) + { + iobject = createInstance(genericRegisterName, + handle, + 0); // force multiStudy instance here ! + } + else + { + iobject = Engines::Component::_narrow( obj ) ; + Engines_Component_i *servant = + dynamic_cast + (_poa->reference_to_servant(iobject)); + ASSERT(servant) + int studyId = servant->getStudyId(); + ASSERT (studyId >= 0); + if (studyId == 0) // multiStudy instance, OK + { + // No ReBind ! + MESSAGE(component_registerBase.c_str()<<" already bound"); + } + else // monoStudy instance: NOK + { + iobject = Engines::Component::_nil(); + INFOS("load_impl & find_component_instance methods " + << "NOT SUITABLE for mono study components"); + } + } + } + catch (...) + { + INFOS( "Container_i::load_impl catched" ) ; + } + return iobject._retn(); + } +} + +//============================================================================= +/*! + * C++ method: create a servant instance of a component. + * \param genericRegisterName Name of the component instance to register + * in Registry & Name Service, + * (without _inst_n suffix, like "COMPONENT") + * \param handle loaded library handle + * \param studyId 0 for multiStudy instance, + * study Id (>0) otherwise + * \return a loaded component + * + * example with names: + * aGenRegisterName = COMPONENT (= first argument) + * _containerName = /Containers/cli76ce/FactoryServer + * factoryName = COMPONENTEngine_factory + * component_registerBase = /Containers/cli76ce/FactoryServer/COMPONENT + * instanceName = COMPONENT_inst_1 + * component_registerName = /Containers/cli76ce/FactoryServer/COMPONENT_inst_1 + */ +//============================================================================= + +Engines::Component_ptr +Engines_Parallel_Container_i::createInstance(string genericRegisterName, + void *handle, + int studyId) +{ + // --- find the factory + + string aGenRegisterName = genericRegisterName; + string factory_name = aGenRegisterName + string("Engine_factory"); + + SCRUTE(factory_name) ; + + typedef PortableServer::ObjectId * (*FACTORY_FUNCTION) + (CORBA::ORB_ptr, + PortableServer::POA_ptr, + PortableServer::ObjectId *, + const char *, + const char *) ; + + FACTORY_FUNCTION Component_factory + = (FACTORY_FUNCTION) dlsym(handle, factory_name.c_str()); + + char *error ; + if ( (error = dlerror() ) != NULL) + { + INFOS("Can't resolve symbol: " + factory_name); + SCRUTE(error); + return Engines::Component::_nil() ; + } + + // --- create instance + Engines::Component_var iobject = Engines::Component::_nil() ; + try + { + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + int numInstance = _numInstance ; + _numInstanceMutex.unlock() ; + + char aNumI[12]; + sprintf( aNumI , "%d" , numInstance ) ; + string instanceName = aGenRegisterName + "_inst_" + aNumI ; + string component_registerName = + _containerName + "/" + instanceName; + + // --- Instanciate required CORBA object + + PortableServer::ObjectId *id ; //not owner, do not delete (nore use var) + id = (Component_factory) ( _orb, _poa, _id, instanceName.c_str(), + aGenRegisterName.c_str() ) ; + + // --- get reference & servant from id + CORBA::Object_var obj = _poa->id_to_reference(*id); + iobject = Engines::Component::_narrow(obj) ; + + Engines_Component_i *servant = + dynamic_cast(_poa->reference_to_servant(iobject)); + ASSERT(servant); + servant->_remove_ref(); // compensate previous id_to_reference + _listInstances_map[instanceName] = iobject; + _cntInstances_map[aGenRegisterName] += 1; + bool ret_studyId = servant->setStudyId(studyId); + ASSERT(ret_studyId); + + // --- register the engine under the name + // containerName(.dir)/instanceName(.object) + _NS->Register(iobject , component_registerName.c_str()) ; + MESSAGE( component_registerName.c_str() << " bound" ) ; + } + catch (...) + { + INFOS( "Container_i::createInstance exception catched" ) ; + } + return iobject._retn(); +} + +Engines::Component_ptr +Engines_Parallel_Container_i::createParallelInstance(string genericRegisterName, + void *handle, + int studyId) +{ + cerr << "----------------- createParallelInstance node : " << getMyRank() << endl; + // --- create instance + Engines::Component_var iobject = Engines::Component::_nil(); + string aGenRegisterName = genericRegisterName; + + ////////////////////////////////////////////////////////////////////////// + // 1: Proxy Step + // Node 0 create the proxy + if (getMyRank() == 0) { + // --- find the factory + string factory_name = aGenRegisterName + string("EngineProxy_factory"); + + typedef PortableServer::ObjectId * (*FACTORY_FUNCTION) + (CORBA::ORB_ptr, + PortableServer::POA_ptr, + PortableServer::ObjectId *, + const char *, + int) ; + + FACTORY_FUNCTION Component_factory + = (FACTORY_FUNCTION) dlsym(handle, factory_name.c_str()); + + char *error ; + if ( (error = dlerror() ) != NULL) { + INFOS("Can't resolve symbol: " + factory_name); + SCRUTE(error); + return Engines::Component::_nil(); + } + try { + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + int numInstance = _numInstance ; + _numInstanceMutex.unlock() ; + + char aNumI[12]; + sprintf( aNumI , "%d" , numInstance ) ; + string instanceName = aGenRegisterName + "_inst_" + aNumI ; + string component_registerName = + _containerName + "/" + instanceName; + + // --- Instanciate required CORBA object + PortableServer::ObjectId *id ; //not owner, do not delete (nore use var) + id = (Component_factory) ( _orb, _poa, _id, instanceName.c_str(), getTotalNode()) ; + + // --- get reference & servant from id + CORBA::Object_var obj = _poa->id_to_reference(*id); + iobject = Engines::Component::_narrow(obj) ; + + _listInstances_map[instanceName] = iobject; + _cntInstances_map[aGenRegisterName] += 1; + + // --- register the engine under the name + // containerName(.dir)/instanceName(.object) + _NS->Register(iobject , component_registerName.c_str()) ; + MESSAGE( component_registerName.c_str() << " bound" ) ; + } + catch (...) + { + INFOS( "Container_i::createParallelInstance exception catched in Proxy creation" ) ; + } + } + else { + // We have to have the same numIntance to be able to get the proxy reference + // in the nameing service. + _numInstanceMutex.lock() ; // lock on the instance number + _numInstance++ ; + int numInstance = _numInstance ; + _numInstanceMutex.unlock() ; + } + cerr << "Node " << getMyRank() << " entering in paco_barrier()" << endl; + global_paco_context_ptr->my_com->paco_barrier(); + cerr << "Node " << getMyRank() << " quitting paco_barrier()" << endl; + + ////////////////////////////////////////////////////////////////////////// + // 2: Nodes Step + + char * proxy_ior; + Engines::Component_PaCO_var iobject2; + + char aNumI[12]; + sprintf( aNumI , "%d" , _numInstance ) ; + string instanceName = aGenRegisterName + "_inst_" + aNumI ; + + string component_registerName = _containerName + "/" + instanceName; + string hostname = GetHostname(); + + CORBA::Object_var temp = _NS->Resolve(component_registerName.c_str()); + Engines::Component_var obj_proxy = Engines::Component::_narrow(temp); + proxy_ior = _orb->object_to_string(obj_proxy); + + // --- find the factory + string factory_name = aGenRegisterName + string("Engine_factory"); + + typedef PortableServer::ObjectId * (*FACTORY_FUNCTION) + (CORBA::ORB_ptr, char * ior, + PortableServer::POA_ptr, + PortableServer::ObjectId *, + const char *, + const char *) ; + + FACTORY_FUNCTION Component_factory + = (FACTORY_FUNCTION) dlsym(handle, factory_name.c_str()); + + char *error ; + if ( (error = dlerror() ) != NULL) + { + INFOS("Can't resolve symbol: " + factory_name); + SCRUTE(error); + return Engines::Component::_nil() ; + } + try + { + char aNumI2[12]; + sprintf(aNumI2 , "%d" , getMyRank()) ; + string instanceName = aGenRegisterName + "_inst_node_" + aNumI2; + string component_registerName = _containerName + aNumI2 + "/" + instanceName; + + // --- Instanciate required CORBA object + + PortableServer::ObjectId *id ; //not owner, do not delete (nore use var) + id = (Component_factory) ( _orb, proxy_ior, _poa, _id, instanceName.c_str(), + aGenRegisterName.c_str() ) ; + + // --- get reference & servant from id + CORBA::Object_var obj = _poa->id_to_reference(*id); + iobject2 = Engines::Component_PaCO::_narrow(obj) ; + + // --- register the engine under the name + _NS->Register(iobject2 , component_registerName.c_str()) ; + MESSAGE( component_registerName.c_str() << " bound" ) ; + } + catch (...) + { + INFOS( "Container_i::createParallelInstance exception catched" ) ; + } + + ////////////////////////////////////////////////////////////////////////// + // 3: Deployment Step + + iobject2->deploy(getMyRank()); + global_paco_context_ptr->my_com->paco_barrier(); + cerr << "--------- createParallelInstance : End Deploy step ----------" << endl; + + // return obj_proxy._retn(); + return iobject._retn(); +} + +//============================================================================= +/*! + * + */ +//============================================================================= + +void Engines_Parallel_Container_i::decInstanceCnt(string genericRegisterName) +{ + string aGenRegisterName =genericRegisterName; + MESSAGE("Engines_Parallel_Container_i::decInstanceCnt " << aGenRegisterName); + ASSERT(_cntInstances_map[aGenRegisterName] > 0); + _numInstanceMutex.lock(); // lock to be alone + // (see finalize_removal, load_component_Library) + _cntInstances_map[aGenRegisterName] -= 1; + SCRUTE(_cntInstances_map[aGenRegisterName]); + if (_cntInstances_map[aGenRegisterName] == 0) + { + string impl_name = + Engines_Component_i::GetDynLibraryName(aGenRegisterName.c_str()); + SCRUTE(impl_name); + void* handle = _library_map[impl_name]; + ASSERT(handle); + _toRemove_map[impl_name] = handle; + } + _numInstanceMutex.unlock(); +} + +//============================================================================= +/*! + * Retrieves only with container naming convention if it is a python container + */ +//============================================================================= + +bool Engines_Parallel_Container_i::isPythonContainer(const char* ContainerName) +{ + bool ret=false; + int len=strlen(ContainerName); + if(len>=2) + if(strcmp(ContainerName+len-2,"Py")==0) + ret=true; + return ret; +} + +//============================================================================= +/*! + * + */ +//============================================================================= + +void ActSigIntHandler() +{ +#ifndef WNT + struct sigaction SigIntAct ; + SigIntAct.sa_sigaction = &SigIntHandler ; + SigIntAct.sa_flags = SA_SIGINFO ; +#endif + + // DEBUG 03.02.2005 : the first parameter of sigaction is not a mask of signals + // (SIGINT | SIGUSR1) : + // it must be only one signal ===> one call for SIGINT + // and an other one for SIGUSR1 +#ifndef WNT + if ( sigaction( SIGINT , &SigIntAct, NULL ) ) { + perror("SALOME_Container main ") ; + exit(0) ; + } + if ( sigaction( SIGUSR1 , &SigIntAct, NULL ) ) { + perror("SALOME_Container main ") ; + exit(0) ; + } + //PAL9042 JR : during the execution of a Signal Handler (and of methods called through Signal Handlers) + // use of streams (and so on) should never be used because : + // streams of C++ are naturally thread-safe and use pthread_mutex_lock ===> + // A stream operation may be interrupted by a signal and if the Handler use stream we + // may have a "Dead-Lock" ===HangUp + //==INFOS is commented + // INFOS(pthread_self() << "SigIntHandler activated") ; +#else + signal( SIGINT, SigIntHandler ); + signal( SIGUSR1, SigIntHandler ); +#endif + +} + +void SetCpuUsed() ; + +#ifndef WNT +void SigIntHandler(int what , siginfo_t * siginfo , + void * toto ) { + //PAL9042 JR : during the execution of a Signal Handler (and of methods called through Signal Handlers) + // use of streams (and so on) should never be used because : + // streams of C++ are naturally thread-safe and use pthread_mutex_lock ===> + // A stream operation may be interrupted by a signal and if the Handler use stream we + // may have a "Dead-Lock" ===HangUp + //==MESSAGE is commented + // MESSAGE(pthread_self() << "SigIntHandler what " << what << endl + // << " si_signo " << siginfo->si_signo << endl + // << " si_code " << siginfo->si_code << endl + // << " si_pid " << siginfo->si_pid) ; + if ( _Sleeping ) { + _Sleeping = false ; + // MESSAGE("SigIntHandler END sleeping.") ; + return ; + } + else { + ActSigIntHandler() ; + if ( siginfo->si_signo == SIGUSR1 ) { + SetCpuUsed() ; + } + else { + _Sleeping = true ; + // MESSAGE("SigIntHandler BEGIN sleeping.") ; + int count = 0 ; + while( _Sleeping ) { + sleep( 1 ) ; + count += 1 ; + } + // MESSAGE("SigIntHandler LEAVE sleeping after " << count << " s.") ; + } + return ; + } +} +#else // Case WNT +void SigIntHandler( int what ) { + MESSAGE( pthread_self() << "SigIntHandler what " << what << endl ); + if ( _Sleeping ) { + _Sleeping = false ; + MESSAGE("SigIntHandler END sleeping.") ; + return ; + } + else { + ActSigIntHandler() ; + if ( what == SIGUSR1 ) { + SetCpuUsed() ; + } + else { + _Sleeping = true ; + MESSAGE("SigIntHandler BEGIN sleeping.") ; + int count = 0 ; + while( _Sleeping ) { + Sleep( 1000 ) ; + count += 1 ; + } + MESSAGE("SigIntHandler LEAVE sleeping after " << count << " s.") ; + } + return ; + } +} +#endif + +//============================================================================= +/*! + * CORBA method: get or create a fileRef object associated to a local file + * (a file on the computer on which runs the container server), which stores + * a list of (machine, localFileName) corresponding to copies already done. + * + * \param origFileName absolute path for a local file to copy on other + * computers + * \return a fileRef object associated to the file. + */ +//============================================================================= + +Engines::fileRef_ptr +Engines_Parallel_Container_i::createFileRef(const char* origFileName) +{ + string origName(origFileName); + Engines::fileRef_var theFileRef = Engines::fileRef::_nil(); + + if (origName[0] != '/') + { + INFOS("path of file to copy must be an absolute path begining with '/'"); + return Engines::fileRef::_nil(); + } + + if (CORBA::is_nil(_fileRef_map[origName])) + { + CORBA::Object_var obj=_poa->id_to_reference(*_id); + Engines::Container_var pCont = Engines::Container::_narrow(obj); + fileRef_i* aFileRef = new fileRef_i(pCont, origFileName); + theFileRef = Engines::fileRef::_narrow(aFileRef->_this()); + _numInstanceMutex.lock() ; // lock to be alone (stl container write) + _fileRef_map[origName] = theFileRef; + _numInstanceMutex.unlock() ; + } + + theFileRef = Engines::fileRef::_duplicate(_fileRef_map[origName]); + ASSERT(! CORBA::is_nil(theFileRef)); + return theFileRef._retn(); +} + +//============================================================================= +/*! + * CORBA method: + * \return a reference to the fileTransfer object + */ +//============================================================================= + +Engines::fileTransfer_ptr +Engines_Parallel_Container_i::getFileTransfer() +{ + Engines::fileTransfer_var aFileTransfer + = Engines::fileTransfer::_duplicate(_fileTransfer); + return aFileTransfer._retn(); +} + + + diff --git a/src/ParallelContainer/SALOME_ParallelContainer_i.hxx b/src/ParallelContainer/SALOME_ParallelContainer_i.hxx new file mode 100644 index 000000000..254d60740 --- /dev/null +++ b/src/ParallelContainer/SALOME_ParallelContainer_i.hxx @@ -0,0 +1,159 @@ +// SALOME_ParallelContainer : implementation of container and engine for Parallel Kernel +// +// Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN, +// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2.1 of the License. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org +// +// File : SALOME_ParallelContainer_i.hxx +// Author : André RIBES, EDF +// Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA + +#ifndef _SALOME_PARALLEL_CONTAINER_I_HXX_ +#define _SALOME_PARALLEL_CONTAINER_I_HXX_ + +#include + +#include "SALOME_ComponentPaCO_Engines_Container_server.h" + +#include +#include +#include +#ifndef WNT +#include +#endif +#include +#include +#include +#include + +class SALOME_NamingService; + + +#if defined CONTAINER_EXPORTS +#if defined WIN32 +#define CONTAINER_EXPORT __declspec( dllexport ) +#else +#define CONTAINER_EXPORT +#endif +#else +#if defined WNT +#define CONTAINER_EXPORT __declspec( dllimport ) +#else +#define CONTAINER_EXPORT +#endif +#endif + +class CONTAINER_EXPORT Engines_Parallel_Container_i: + /* public virtual POA_Engines::Container, */ + public Engines::Container_serv, + public virtual PortableServer::RefCountServantBase +{ +public: + Engines_Parallel_Container_i(CORBA::ORB_ptr orb, char * ior); + Engines_Parallel_Container_i(CORBA::ORB_ptr orb, char * ior, + PortableServer::POA_ptr poa, + char * containerName , + int argc, char* argv[], + bool activAndRegist = true, + bool isServantAloneInProcess = true); + virtual ~Engines_Parallel_Container_i(); + + // --- CORBA methods + + virtual bool load_component_Library(const char* componentName); + + virtual Engines::Component_ptr + create_component_instance( const char* componentName, + CORBA::Long studyId); // 0 for multiStudy + + Engines::Component_ptr + find_component_instance( const char* registeredName, + CORBA::Long studyId); // 0 for multiStudy + + Engines::Component_ptr + load_impl(const char* nameToRegister, + const char* componentName); + + + void remove_impl(Engines::Component_ptr component_i); + void finalize_removal(); + + virtual void ping(); + char* name(); + virtual void Shutdown(); + char* getHostName(); + CORBA::Long getPID(); + //! Kill current container + bool Kill_impl() ; + + // --- local C++ methods + + Engines::Component_ptr + find_or_create_instance( std::string genericRegisterName, + std::string componentLibraryName); + + Engines::Component_ptr + createInstance(std::string genericRegisterName, + void *handle, + int studyId); + + Engines::Component_ptr + createParallelInstance(std::string genericRegisterName, + void *handle, + int studyId); + + static bool isPythonContainer(const char* ContainerName); + static void decInstanceCnt(std::string genericRegisterName); + + // --- needed for parallel components, Numerical Platon + + int getArgc() { return _argc; } + char **getArgv() { return _argv; } + + void set_id(PortableServer::ObjectId * id) { _id = id;} + + Engines::fileRef_ptr createFileRef(const char* origFileName); + Engines::fileTransfer_ptr getFileTransfer(); +protected: + + static std::map _cntInstances_map; + static std::map _library_map; // library names, loaded + static std::map _toRemove_map;// library names to remove + static omni_mutex _numInstanceMutex ; // lib and instance protection + + bool _isSupervContainer; + + SALOME_NamingService *_NS ; + std::string _library_path; + std::string _containerName; + CORBA::ORB_var _orb; + PortableServer::POA_var _poa; + PortableServer::ObjectId * _id ; + int _numInstance ; + std::map _listInstances_map; + std::map _fileRef_map; + Engines::fileTransfer_var _fileTransfer; + + int _argc ; + char** _argv ; + long _pid; + bool _isServantAloneInProcess; +}; + +#endif + -- 2.39.2