1 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
3 // Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
22 // File : SALOME_ParallelComponent_i.cxx
23 // Author : André RIBES, EDF
24 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
26 #include "SALOME_ParallelComponent_i.hxx"
27 #include "SALOME_ParallelContainer_i.hxx"
35 #include "utilities.h"
39 #include <sys/resource.h>
42 #include <sys/timeb.h>
46 #include <paco_dummy.h>
// File-scope state used by the component implementation.
// _Sleeping is only declared here (extern); its definition is not in this file view.
50 extern bool _Sleeping ;
// Component instance whose CPU time is being sampled: CpuUsed_impl() stores `this`
// here before signalling the service thread, and the handler calls SetCurCpu() on it.
51 static Engines_Parallel_Component_i * theEngines_Component ;
// Definitions of the class-level flags together with their initial values.
53 bool Engines_Parallel_Component_i::_isMultiStudy = true;
54 bool Engines_Parallel_Component_i::_isMultiInstance = false;
56 //=============================================================================
58 * Default constructor, not for use
60 //=============================================================================
// Minimal constructor: forwards orb and ior to the three parallel base classes
// and emits an informational trace. The file's header comment marks it as
// "not for use".
62 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior) :
63 InterfaceParallel_impl(orb,ior), Engines::Component_serv(orb,ior), Engines::Parallel_Component_serv(orb,ior)
66 INFOS("Default Constructor...");
69 //=============================================================================
71 * Standard Constructor for generic Component, used in derived class
72 * Connection to Registry and Notification
73 * \param orb Object Request broker given by Container
74 * \param poa Portable Object Adapter from Container (normally root_poa)
75 * \param contId container CORBA id inside the server
76 * \param instanceName unique instance name for this object (see Container_i)
77 * \param interfaceName component class name
78 * \param notif use of notification
80 //=============================================================================
// Standard constructor used by derived component classes.
// Visible work: initialises the parallel base classes with (orb, ior), records
// the instance and interface names, registers the container IOR with the
// registry, creates the notification supplier and sets up the deployment mutex.
// NOTE(review): part of the parameter list (the `notif` argument) and several
// member initialisers are not visible in this view.
82 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior,
83 PortableServer::POA_ptr poa,
84 PortableServer::ObjectId * contId,
85 const char *instanceName,
86 const char *interfaceName,
88 InterfaceParallel_impl(orb,ior),
89 Engines::Component_serv(orb,ior),
90 Engines::Parallel_Component_serv(orb,ior),
91 _instanceName(instanceName),
92 _interfaceName(interfaceName),
93 _myConnexionToRegistry(0),
101 MESSAGE("Component constructor with instanceName "<< _instanceName);
102 //SCRUTE(pd_refCount);
// Keep our own references to the ORB and the POA.
103 _orb = CORBA::ORB::_duplicate(orb);
104 _poa = PortableServer::POA::_duplicate(poa);
// Stringify the container reference and hand it to the registry connection.
// Both temporaries are _var types, so they are released on scope exit.
106 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
107 const CORBA::String_var the_ior = _orb->object_to_string(o);
108 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
109 _instanceName.c_str());
111 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
// Mutex taken by setInputFileToService()/setOutputFileToService() while a
// parallel Salome_file is being deployed.
113 deploy_mutex = new pthread_mutex_t();
114 pthread_mutex_init(deploy_mutex, NULL);
116 //SCRUTE(pd_refCount);
119 //=============================================================================
121 * Destructor: call Container for decrement of instances count.
122 * When instances count falls to 0, the container tries to remove the
123 * component library (dlclose)
125 //=============================================================================
// Destructor: tells the container that one instance of this interface is gone
// and destroys the deployment mutex.
// NOTE(review): deploy_mutex is allocated with `new` in the constructor; no
// matching `delete` is visible in this view - confirm the storage is released.
127 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
129 MESSAGE("Component destructor");
130 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
131 pthread_mutex_destroy(deploy_mutex);
138 //=============================================================================
140 * CORBA method: return name of the instance, unique in this Container
142 //=============================================================================
// Returns a freshly duplicated CORBA string holding the instance name.
// The duplicate comes from CORBA::string_dup, so the caller owns it.
144 char* Engines_Parallel_Component_i::instanceName()
146 return CORBA::string_dup(_instanceName.c_str()) ;
149 //=============================================================================
151 * CORBA method: return name of the component class
153 //=============================================================================
// Returns a freshly duplicated CORBA string holding the component class name.
// The duplicate comes from CORBA::string_dup, so the caller owns it.
155 char* Engines_Parallel_Component_i::interfaceName()
157 return CORBA::string_dup(_interfaceName.c_str()) ;
160 //=============================================================================
162 * CORBA method: Get study Id
163 * \return -1: not initialised (Internal Error)
164 * 0: multistudy component instance
165 * >0: study id associated to this instance
167 //=============================================================================
// Study-id accessor (see the header comment above for the documented values).
// NOTE(review): the body is not visible in this view.
169 CORBA::Long Engines_Parallel_Component_i::getStudyId()
174 //=============================================================================
176 * CORBA method: Test if instance is alive and responds
178 //=============================================================================
// Liveness probe; the only visible statement is a disabled trace.
180 void Engines_Parallel_Component_i::ping()
182 // MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
185 //=============================================================================
187 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
188 * respond any more to CORBA calls), the connection to Registry is removed
189 * (Registry informed of deactivation), internal server reference counter on
190 * the derived servant class is decremented, to allow destruction of the class
191 * (delete) by POA, when there are no more references.
192 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
194 //=============================================================================
// Tears the instance down: deletes the notification supplier and the registry
// connection, deactivates the CORBA object in the POA, releases the POA
// reference and drops one reference on the servant (_thisObj).
// NOTE(review): lines are missing from this view between the statements below;
// _notifSupplier is not visibly reset after delete - confirm against the full file.
196 void Engines_Parallel_Component_i::destroy()
198 MESSAGE("Engines_Parallel_Component_i::destroy()");
199 //SCRUTE(pd_refCount);
201 delete _notifSupplier;
204 delete _myConnexionToRegistry;
205 _myConnexionToRegistry = 0 ;
// After this call the object no longer receives CORBA requests.
206 _poa->deactivate_object(*_id) ;
207 CORBA::release(_poa) ;
209 //SCRUTE(pd_refCount);
210 _thisObj->_remove_ref();
211 //SCRUTE(pd_refCount);
212 MESSAGE("Engines_Parallel_Component_i::destroyed") ;
215 //=============================================================================
217 * CORBA method: return CORBA reference of the Container
220 //=============================================================================
// Returns a CORBA reference to the container hosting this component.
// The caller owns the returned Engines::Container_ptr.
222 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
224 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
// id_to_reference() hands back a reference we own and _narrow() returns its
// own duplicate, so the intermediate must be released: hold it in an
// Object_var (as the constructor does) instead of a raw Object_ptr.
225 CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
226 return Engines::Container::_narrow(o);
229 //=============================================================================
232 * Gives a sequence of (key=string,value=any) to the component.
233 * Base class component stores the sequence in a map.
234 * The map is cleared before.
235 * This map is for use by derived classes.
236 * \param dico sequence of (key=string,value=any)
238 //=============================================================================
// Copies every (key, value) pair of `dico` into the _fieldsDict map; an
// existing entry with the same key is overwritten.
// NOTE(review): the header comment says the map is cleared first; that
// statement is not visible in this view.
240 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
243 for (CORBA::ULong i=0; i<dico.length(); i++)
245 std::string cle(dico[i].key);
246 _fieldsDict[cle] = dico[i].value;
250 //=============================================================================
253 * returns a previously stored map (key=string,value=any) as a sequence.
254 * (see setProperties)
256 //=============================================================================
// Builds a new Engines::FieldsDict sized to _fieldsDict, copies each map entry
// into it and returns it through _retn(), i.e. ownership goes to the caller.
// NOTE(review): the declaration of the index `i` is not visible in this view.
258 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
260 Engines::FieldsDict_var copie = new Engines::FieldsDict;
261 copie->length(_fieldsDict.size());
262 map<std::string,CORBA::Any>::iterator it;
// Walk the map and the output sequence in lock-step.
264 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
266 std::string cle((*it).first);
267 copie[i].key = CORBA::string_dup(cle.c_str());
268 copie[i].value = _fieldsDict[cle];
270 return copie._retn();
273 //=============================================================================
275 * CORBA method: used by Supervision to give names to this instance
277 //=============================================================================
// Stores the graph and node names given by the caller in _graphName and
// _nodeName, then traces both values.
279 void Engines_Parallel_Component_i::Names( const char * graphName ,
280 const char * nodeName )
282 _graphName = graphName ;
283 _nodeName = nodeName ;
284 INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
285 << _nodeName << "' )");
288 //=============================================================================
290 * CORBA method: used in Supervision
292 //=============================================================================
// Cancels the recorded service thread, provided one is recorded and it is not
// the calling thread. Killer() is called with signal 0, which selects its
// pthread_cancel path, and the stored thread id is then reset.
// NOTE(review): two variants follow - one treating _ThreadId as a pthread_t
// value, one as a pointer to a struct with a `.p` member. They are presumably
// alternative preprocessor branches whose directives are not visible here.
294 bool Engines_Parallel_Component_i::Kill_impl()
296 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
297 // << " pid " << getpid() << " instanceName "
298 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
299 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
300 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
303 bool RetVal = false ;
// Variant 1: _ThreadId is a pthread_t value.
305 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
307 RetVal = Killer( _ThreadId , 0 ) ;
308 _ThreadId = (pthread_t ) -1 ;
// Variant 2: _ThreadId is a pointer to a pthread_t structure.
312 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
314 RetVal = Killer( *_ThreadId , 0 ) ;
315 _ThreadId = (pthread_t* ) 0 ;
322 //=============================================================================
324 * CORBA method: used in Supervision
326 //=============================================================================
// Same visible logic as Kill_impl(): traces the request, cancels the recorded
// service thread through Killer(..., 0) unless it is the caller, and resets
// the stored thread id.
// NOTE(review): the two _ThreadId variants are presumably alternative
// preprocessor branches whose directives are not visible here.
328 bool Engines_Parallel_Component_i::Stop_impl()
330 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
331 << " pid " << getpid() << " instanceName "
332 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
333 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
334 << dec << " _ThreadId " << _ThreadId );
337 bool RetVal = false ;
// Variant 1: _ThreadId is a pthread_t value.
339 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
341 RetVal = Killer( _ThreadId , 0 ) ;
342 _ThreadId = (pthread_t ) -1 ;
// Variant 2: _ThreadId is a pointer to a pthread_t structure.
345 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
347 RetVal = Killer( *_ThreadId , 0 ) ;
348 _ThreadId = (pthread_t* ) 0 ;
354 //=============================================================================
356 * CORBA method: used in Supervision
358 //=============================================================================
// Traces the request and, when a service thread other than the caller is
// recorded, sends it SIGINT through Killer().
// NOTE(review): the statements between the guard and the Killer() calls are
// not visible in this view, so any extra conditions cannot be confirmed.
360 bool Engines_Parallel_Component_i::Suspend_impl()
362 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
363 << " pid " << getpid() << " instanceName "
364 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
365 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
366 << dec << " _ThreadId " << _ThreadId );
368 bool RetVal = false ;
// Two variants of the same guard (pthread_t value / pointer to pthread_t).
370 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
372 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
// Two variants of the same signal delivery.
382 RetVal = Killer( _ThreadId ,SIGINT ) ;
384 RetVal = Killer( *_ThreadId ,SIGINT ) ;
386 //if ( RetVal ) _Sleeping = true;
393 //=============================================================================
395 * CORBA method: used in Supervision
397 //=============================================================================
// Traces the request and tests whether a service thread other than the caller
// is recorded.
// NOTE(review): the guarded statements and the return are not visible in this
// view, so what a resume actually does cannot be documented from here.
399 bool Engines_Parallel_Component_i::Resume_impl()
401 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
402 << " pid " << getpid() << " instanceName "
403 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
404 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
405 << dec << " _ThreadId " << _ThreadId );
406 bool RetVal = false ;
// Two variants of the same guard (pthread_t value / pointer to pthread_t).
408 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
410 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
426 //=============================================================================
430 //=============================================================================
// Reports the CPU time consumed by the current service.
// When the caller is not the service thread, `this` is published through
// theEngines_Component and the service thread is signalled with SIGUSR1; the
// value is then read back from _ThreadCpuUsed. When the caller is the service
// thread itself, CpuUsed() is called directly.
// NOTE(review): the declaration of `cpu`, the branch structure and the return
// statement are not visible in this view.
432 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
435 if ( _ThreadId || _Executed )
// Two variants of the same test (pthread_t value / pointer to pthread_t).
440 if ( pthread_self() != _ThreadId )
442 if ( pthread_self().p != _ThreadId->p )
450 // Get Cpu in the appropriate thread with that object !...
451 theEngines_Component = this ;
453 Killer( _ThreadId ,SIGUSR1 ) ;
// Fixed: this variant used the non-existent name SIGUSR11; it has to send the
// same signal as the variant on the line above.
455 Killer( *_ThreadId ,SIGUSR1 ) ;
458 cpu = _ThreadCpuUsed ;
462 _ThreadCpuUsed = CpuUsed() ;
463 cpu = _ThreadCpuUsed ;
464 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
465 // << _serviceName << " " << cpu << endl ;
470 cpu = _ThreadCpuUsed ;
471 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
472 // << _serviceName << " " << cpu<< endl ;
477 // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
478 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
484 //=============================================================================
486 * C++ method: return Container Servant
488 //=============================================================================
// Looks up the container servant in the POA by its object id and downcasts it
// to Engines_Parallel_Container_i. dynamic_cast yields a null pointer if the
// servant is of another type.
490 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
492 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
495 //=============================================================================
497 * C++ method: set study Id
498 * \param studyId 0 if instance is not associated to a study,
499 * >0 otherwise (== study id)
500 * \return true if the set of study Id is OK
501 * must be set once by Container, at instance creation,
502 * and cannot be changed after.
504 //=============================================================================
// Sets the study id of this instance. `studyId` must be >= 0 (asserted).
// Visible logic: one path handles a not-yet-initialised _studyId (< 0); asking
// again for the value already stored yields true.
// NOTE(review): the assignment in the first branch and the return statement
// are not visible in this view.
506 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
508 ASSERT( studyId >= 0);
509 CORBA::Boolean ret = false;
510 if (_studyId < 0) // --- not yet initialized
516 if ( _studyId == studyId) ret = true;
520 //=============================================================================
522 * C++ method: return CORBA instance id, the id is set in derived class
523 * constructor, when instance is activated.
525 //=============================================================================
// Accessor for the CORBA object id of this instance.
// NOTE(review): only a disabled trace is visible; the return statement is not.
527 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
529 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
533 //=============================================================================
535 * C++ method: used by derived classes for supervision
537 //=============================================================================
// Called at the start of a service: records the calling thread as the service
// thread, samples the starting CPU time, stores the service name, makes the
// thread asynchronously cancellable, and exports every string-typed entry of
// _fieldsDict to the process environment.
// NOTE(review): several lines (preprocessor branches, declarations of `value`
// and `s`) are not visible in this view.
539 void Engines_Parallel_Component_i::beginService(const char *serviceName)
541 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
542 << endl << "Component instance : " << _instanceName << endl << endl);
// Variant 1: _ThreadId is a pthread_t value.
544 _ThreadId = pthread_self() ;
// Variant 2: _ThreadId is a heap-allocated pthread_t structure.
546 _ThreadId = new pthread_t;
547 _ThreadId->p = pthread_self().p ;
548 _ThreadId->x = pthread_self().x ;
551 _StartUsed = CpuUsed_impl() ;
554 _serviceName = serviceName ;
// Allow Kill_impl()/Stop_impl() to cancel this thread at any point.
555 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
557 perror("pthread_setcanceltype ") ;
560 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
562 perror("pthread_setcancelstate ") ;
565 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
566 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
567 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
569 // --- for supervisor : all strings given with setProperties
570 // are set in environment
571 bool overwrite = true;
572 map<std::string,CORBA::Any>::iterator it;
573 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
575 std::string cle((*it).first);
// Only values whose CORBA type code is a string are exported.
576 if ((*it).second.type()->kind() == CORBA::tk_string)
579 (*it).second >>= value;
580 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
582 //int ret = setenv(cle.c_str(), value, overwrite);
583 setenv(cle.c_str(), value, overwrite);
585 //CCRT porting : setenv not defined in stdlib.h
589 // char* cast because 1st arg of linux putenv function
590 // is not a const char* !
591 //int ret=putenv((char *)s.c_str());
// NOTE(review): putenv() keeps the pointer it is given rather than copying the
// string. If `s` is a local std::string (its declaration is not visible here)
// the environment entry dangles once `s` is destroyed - confirm.
592 putenv((char *)s.c_str());
593 //End of CCRT porting
595 MESSAGE("--- setenv: "<<cle<<" = "<< value);
600 //=============================================================================
602 * C++ method: used by derived classes for supervision
604 //=============================================================================
// Called at the end of a service: stores the CPU time reported by
// CpuUsed_impl() in _ThreadCpuUsed and traces it together with _StartUsed.
606 void Engines_Parallel_Component_i::endService(const char *serviceName)
608 _ThreadCpuUsed = CpuUsed_impl() ;
609 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
610 << endl << " Component instance : " << _instanceName << " StartUsed "
611 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
615 //=============================================================================
617 * C++ method: -- CHECK IF USED --
619 //=============================================================================
// Returns a freshly duplicated CORBA string holding the graph name set by
// Names(). The duplicate comes from CORBA::string_dup, so the caller owns it.
621 char* Engines_Parallel_Component_i::graphName()
623 return CORBA::string_dup( _graphName.c_str() ) ;
626 //=============================================================================
628 * C++ method: -- CHECK IF USED --
630 //=============================================================================
// Returns a freshly duplicated CORBA string holding the node name set by
// Names(). The duplicate comes from CORBA::string_dup, so the caller owns it.
632 char* Engines_Parallel_Component_i::nodeName()
634 return CORBA::string_dup( _nodeName.c_str() ) ;
637 //=============================================================================
639 * C++ method: used in Supervision (see kill_impl)
641 //=============================================================================
// Acts on the thread `ThreadId`: cancels it with pthread_cancel(), or delivers
// `signum` with pthread_kill(). Callers in this file pass 0 to request
// cancellation and a real signal number (SIGINT, SIGUSR1) otherwise.
// NOTE(review): the statements selecting between the two paths and setting the
// return value are not visible in this view.
643 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
653 if ( pthread_cancel( ThreadId ) )
655 perror("Killer pthread_cancel error") ;
660 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
661 << " pthread_canceled") ;
// Fixed: pthread_kill() returns 0 on success and a positive error number on
// failure; it never returns -1, so the former `== -1` test could not detect an
// error. Any non-zero result is a failure, as in the pthread_cancel test above.
666 if ( pthread_kill( ThreadId , signum ) != 0 )
668 perror("Killer pthread_kill error") ;
673 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
674 << " pthread_killed(" << signum << ")") ;
681 //=============================================================================
685 //=============================================================================
689 theEngines_Component->SetCurCpu() ;
692 //=============================================================================
696 //=============================================================================
// Refreshes _ThreadCpuUsed with the current value of CpuUsed(). It is invoked
// on the instance published in theEngines_Component, so the sample is taken by
// whichever thread runs that call.
698 void Engines_Parallel_Component_i::SetCurCpu()
700 _ThreadCpuUsed = CpuUsed() ;
701 // MESSAGE(pthread_self() <<
702 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
705 //=============================================================================
709 //=============================================================================
// Computes the user CPU time, in whole seconds, consumed since _StartUsed.
// Uses getrusage(RUSAGE_SELF), i.e. process-wide user time, and only does so
// when a service thread is recorded or a service has already executed.
// NOTE(review): the declaration of `cpu`, the else branches and the return
// statement are not visible in this view.
711 long Engines_Parallel_Component_i::CpuUsed()
715 struct rusage usage ;
716 if ( _ThreadId || _Executed )
// getrusage() reports failure by returning -1.
718 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
720 perror("Engines_Parallel_Component_i::CpuUsed") ;
723 cpu = usage.ru_utime.tv_sec - _StartUsed ;
724 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
725 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
726 // << " = " << cpu << endl ;
730 // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
731 // << _ThreadId << " " << _serviceName<< " _StartUsed "
732 // << _StartUsed << endl ;
735 // NOT implemented yet
742 //=============================================================================
744 * C++ method: Send message to event channel
746 //=============================================================================
748 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
751 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
754 //=============================================================================
756 * C++ method: return standard library name built on component name
758 //=============================================================================
// Builds the dynamic library name for `componentName` (per the header comment).
// NOTE(review): the body is not visible in this view.
760 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
768 //=============================================================================
770 * C++ method: DumpPython default implementation
772 //=============================================================================
// Default DumpPython: returns a TMPFile containing the fixed one-line script
// "def RebuildData(theStudy): pass" (including its terminating NUL) and sets
// isValidScript to true. theStudy and isPublished are not used.
774 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
775 CORBA::Boolean isPublished,
776 CORBA::Boolean& isValidScript)
778 const char* aScript = "def RebuildData(theStudy): pass";
// Copy the script into a heap buffer that the sequence will own.
779 char* aBuffer = new char[strlen(aScript)+1];
780 strcpy(aBuffer, aScript);
781 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
782 int aBufferSize = strlen(aBuffer)+1;
// The last constructor argument (1) is the sequence's release flag.
783 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
784 isValidScript = true;
785 return aStreamFile._retn();
// Declares (and, on first use, deploys) the parallel Salome_file named
// `Salome_file_name` as an input of service `service_name`, and returns a
// reference to its proxy.
// Visible steps: create the per-service maps if needed; under deploy_mutex,
// if the file is unknown, rank 0 creates the proxy object and publishes its
// reference, the other ranks wait for it; every node then creates and deploys
// its own servant; finally the proxy IOR is turned into a Salome_file reference.
// NOTE(review): closing braces / else branches are not visible in this view.
789 Engines::Salome_file_ptr
790 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
791 const char* Salome_file_name)
793 // Try to find the service, if it doesn't exist, we add it.
794 _Service_file_map_it = _Input_Service_file_map.find(service_name);
795 if (_Service_file_map_it == _Input_Service_file_map.end()) {
796 _t_Salome_file_map * _map = new _t_Salome_file_map();
797 _Input_Service_file_map[service_name] = _map;
798 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
799 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
800 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
801 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
803 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
804 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
805 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
807 pthread_mutex_lock(deploy_mutex);
808 std::string proxy_ior;
810 // Try to find the Salome_file ...
811 _Salome_file_map_it = _map->find(Salome_file_name);
812 if (_Salome_file_map_it == _map->end()) {
814 // We create a new PaCO++ object.
815 // He has the same configuration than
818 // Firstly, we have to create the proxy object
819 // of the Salome_file and transmit his
820 // reference to the other nodes.
821 if (getMyRank() == 0) {
822 Engines::Parallel_Salome_file_proxy_impl * proxy =
823 new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
824 PaCO_operation * proxy_global_ptr = proxy->getContext("global_paco_context");
825 // We initialize the object with the context of the Parallel component
826 PaCO_operation * compo_global_ptr = getContext("global_paco_context");
827 //compo_global_ptr->init_context(proxy_global_ptr);
828 proxy_global_ptr->init_context(compo_global_ptr);
830 paco_fabrique_manager* pfm = paco_getFabriqueManager();
831 pfm->register_com("dummy", new paco_dummy_fabrique());
832 proxy_global_ptr->setComFab(NULL);
833 proxy_global_ptr->setLibCom("dummy",NULL);
835 proxy_global_ptr->setTypeClient(true);
836 PaCO::PacoTopology_t client_topo;
837 client_topo.total = 1;
838 proxy_global_ptr->setClientTopo(client_topo);
839 PaCO::PacoTopology_t serveur_topo;
840 serveur_topo.total = getTotalNode();
841 proxy->setTopo(serveur_topo);
843 // We register the CORBA object into the POA
// NOTE(review): the reference returned by _this() is never released here -
// confirm whether it should be held in a _var as well.
844 CORBA::Object_ptr proxy_ref = proxy->_this();
846 // We send the reference to all the nodes...
// Fixed: string_to_object() returns a reference owned by the caller; it is now
// held in an Object_var so it is released instead of leaked.
847 CORBA::Object_var comp_proxy = _orb->string_to_object(_ior.c_str());
848 Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
849 component_proxy->send_parallel_proxy_object(proxy_ref);
851 // Adding proxy into the map
852 (*_proxy_map)[Salome_file_name] = proxy;
855 this->wait_parallel_object_proxy();
858 proxy_ior = this->get_parallel_proxy_object();
859 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
861 // We register each node of the parallel Salome_file object
// Nodes deploy one after the other: each loop turn ends with a barrier and
// only the node whose rank equals i creates its servant.
863 for (int i = 0; i < getTotalNode(); i++) {
864 if (i == getMyRank()) {
865 Parallel_Salome_file_i * servant =
866 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
867 PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
869 // We initialize the object with the context of the Parallel component
870 PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
871 // compo_global_ptr->init_context(servant_global_ptr);
872 servant_global_ptr->init_context(compo_global_ptr);
874 // We register the CORBA object into the POA
875 servant->POA_PaCO::InterfaceParallel::_this();
877 // Register the servant
878 servant->deploy(getMyRank());
880 // Adding servant to the map
881 (*_map)[Salome_file_name] = servant;
884 PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
885 compo_global_ptr->my_com->paco_barrier();
888 // Parallel_Salome_file is created and deployed
892 pthread_mutex_unlock(deploy_mutex);
893 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// Fixed: _narrow() returns its own duplicate, so the intermediate reference
// from string_to_object() must be released; Object_var does that.
894 CORBA::Object_var proxy_ref = _orb->string_to_object(proxy_ior.c_str());
895 return Engines::Salome_file::_narrow(proxy_ref);
// Declares (and, on first use, deploys) the parallel Salome_file named
// `Salome_file_name` as an output of service `service_name`, and returns a
// reference to its proxy. Same sequence as setInputFileToService(), working on
// the *_Output_* maps.
// NOTE(review): closing braces / else branches are not visible in this view.
898 Engines::Salome_file_ptr
899 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
900 const char* Salome_file_name)
902 // Try to find the service, if it doesn't exist, we add it.
903 _Service_file_map_it = _Output_Service_file_map.find(service_name);
904 if (_Service_file_map_it == _Output_Service_file_map.end()) {
905 _t_Salome_file_map * _map = new _t_Salome_file_map();
906 _Output_Service_file_map[service_name] = _map;
907 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
908 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
909 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
910 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
912 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
913 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
914 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
916 pthread_mutex_lock(deploy_mutex);
917 std::string proxy_ior;
919 // Try to find the Salome_file ...
920 _Salome_file_map_it = _map->find(Salome_file_name);
921 if (_Salome_file_map_it == _map->end()) {
923 // We create a new PaCO++ object.
924 // He has the same configuration than
927 // Firstly, we have to create the proxy object
928 // of the Salome_file and transmit his
929 // reference to the other nodes.
930 if (getMyRank() == 0) {
931 Engines::Parallel_Salome_file_proxy_impl * proxy =
932 new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb));
933 PaCO_operation * proxy_global_ptr = proxy->getContext("global_paco_context");
934 // We initialize the object with the context of the Parallel component
935 PaCO_operation * compo_global_ptr = getContext("global_paco_context");
936 //compo_global_ptr->init_context(proxy_global_ptr);
937 proxy_global_ptr->init_context(compo_global_ptr);
939 paco_fabrique_manager* pfm = paco_getFabriqueManager();
940 pfm->register_com("dummy", new paco_dummy_fabrique());
941 proxy_global_ptr->setComFab(NULL);
942 proxy_global_ptr->setLibCom("dummy",NULL);
944 proxy_global_ptr->setTypeClient(true);
945 PaCO::PacoTopology_t client_topo;
946 client_topo.total = 1;
947 proxy_global_ptr->setClientTopo(client_topo);
948 PaCO::PacoTopology_t serveur_topo;
949 serveur_topo.total = getTotalNode();
950 proxy->setTopo(serveur_topo);
952 // We register the CORBA object into the POA
// NOTE(review): the reference returned by _this() is never released here -
// confirm whether it should be held in a _var as well.
953 CORBA::Object_ptr proxy_ref = proxy->_this();
955 // We send the reference to all the nodes...
// Fixed: string_to_object() returns a reference owned by the caller; it is now
// held in an Object_var so it is released instead of leaked.
956 CORBA::Object_var comp_proxy = _orb->string_to_object(_ior.c_str());
957 Engines::Parallel_Component_var component_proxy = Engines::Parallel_Component::_narrow(comp_proxy);
958 component_proxy->send_parallel_proxy_object(proxy_ref);
960 // Adding proxy into the map
961 (*_proxy_map)[Salome_file_name] = proxy;
964 this->wait_parallel_object_proxy();
967 proxy_ior = this->get_parallel_proxy_object();
968 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
970 // We register each node of the parallel Salome_file object
// Nodes deploy one after the other: each loop turn ends with a barrier and
// only the node whose rank equals i creates its servant.
972 for (int i = 0; i < getTotalNode(); i++) {
973 if (i == getMyRank()) {
974 Parallel_Salome_file_i * servant =
975 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), proxy_ior.c_str());
976 PaCO_operation * servant_global_ptr = servant->getContext("global_paco_context");
978 // We initialize the object with the context of the Parallel component
979 PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
980 // compo_global_ptr->init_context(servant_global_ptr);
981 servant_global_ptr->init_context(compo_global_ptr);
983 // We register the CORBA object into the POA
984 servant->POA_PaCO::InterfaceParallel::_this();
986 // Register the servant
987 servant->deploy(getMyRank());
989 // Adding servant to the map
990 (*_map)[Salome_file_name] = servant;
993 PaCO_operation * compo_global_ptr = this->getContext("global_paco_context");
994 compo_global_ptr->my_com->paco_barrier();
997 // Parallel_Salome_file is created and deployed
1001 pthread_mutex_unlock(deploy_mutex);
1002 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// Fixed: _narrow() returns its own duplicate, so the intermediate reference
// from string_to_object() must be released; Object_var does that.
1003 CORBA::Object_var proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1004 return Engines::Salome_file::_narrow(proxy_ref);
// Returns the proxy reference of the input Salome_file `Salome_file_name`
// registered for service `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// input files or when the named file is unknown for that service.
1007 Engines::Salome_file_ptr
1008 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1009 const char* Salome_file_name)
1011 // Try to find the service, if it doesn't exist, we throw an exception.
1012 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1013 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
1014 SALOME::ExceptionStruct es;
1015 es.type = SALOME::INTERNAL_ERROR;
1016 es.text = "service doesn't have salome files";
1017 throw SALOME::SALOME_Exception(es);
1019 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1021 // Try to find the Salome_file ...
1022 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1023 if (_Proxy_Salome_file_map_it == _map->end()) {
1024 SALOME::ExceptionStruct es;
1025 es.type = SALOME::INTERNAL_ERROR;
1026 es.text = "service doesn't have this Salome_file";
1027 throw SALOME::SALOME_Exception(es);
1030 // Client get the proxy object
1031 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1032 return Sfile->_this();
// Returns the proxy reference of the output Salome_file `Salome_file_name`
// registered for service `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// output files or when the named file is unknown for that service.
1035 Engines::Salome_file_ptr
1036 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1037 const char* Salome_file_name)
1039 // Try to find the service, if it doesn't exist, we throw an exception.
1040 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1041 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1042 SALOME::ExceptionStruct es;
1043 es.type = SALOME::INTERNAL_ERROR;
1044 es.text = "service doesn't have salome files";
1045 throw SALOME::SALOME_Exception(es);
1047 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1049 // Try to find the Salome_file ...
1050 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1051 if (_Proxy_Salome_file_map_it == _map->end()) {
1052 SALOME::ExceptionStruct es;
1053 es.type = SALOME::INTERNAL_ERROR;
1054 es.text = "service doesn't have this Salome_file";
1055 throw SALOME::SALOME_Exception(es);
1058 // Client get the proxy object
1059 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1060 return Sfile->_this();
// For every input Salome_file proxy registered for `service_name`, calls
// configureSalome_file(service_name, port name, proxy). Does nothing when the
// service has no registered input files.
// NOTE(review): the return-type line and any statements after the
// configureSalome_file() call are not visible in this view.
1065 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1067 // Try to find the service, if it doesn't exist, nothing to do.
1068 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1069 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1070 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1071 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1072 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1074 for(;begin!=end;begin++) {
1075 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1076 std::string file_port_name = begin->first;
1077 configureSalome_file(service_name, file_port_name, file);
// For every output Salome_file proxy registered for `service_name`, calls
// configureSalome_file(service_name, port name, proxy). Does nothing when the
// service has no registered output files.
// NOTE(review): the return-type line and any statements after the
// configureSalome_file() call are not visible in this view.
1084 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1086 // Try to find the service, if it doesn't exist, nothing to do.
1087 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1088 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1089 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1090 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1091 _t_Proxy_Salome_file_map::iterator end = _map->end();
1093 for(;begin!=end;begin++) {
1094 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1095 std::string file_port_name = begin->first;
1096 configureSalome_file(service_name, file_port_name, file);
1103 //=============================================================================
1105 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1107 //=============================================================================
// Stores the stringified reference (IOR) of `proxy_ref` in _proxy; it is
// called on the component proxy by rank 0 in set{Input,Output}FileToService().
// NOTE(review): object_to_string() returns a newly allocated string. The type
// of _proxy is not visible here; if it is a raw char*, a previous value is
// overwritten without being freed - confirm.
1109 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1110 _proxy = _orb->object_to_string(proxy_ref);
1113 //=============================================================================
1115 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1117 //=============================================================================
// Polls get_parallel_proxy_object() until it returns a non-NULL value, i.e.
// until send_parallel_proxy_object() has delivered the proxy reference.
// NOTE(review): the loop body between the two calls (any sleep/yield) is not
// visible in this view.
1119 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1120 char * proxy = NULL;
1121 proxy = get_parallel_proxy_object();
1122 while(proxy == NULL)
1125 proxy = get_parallel_proxy_object();
1129 //=============================================================================
1131 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1133 //=============================================================================
// Accessor for the proxy reference string; callers in this file compare its
// result with NULL and assign it to a std::string.
// NOTE(review): the return type and body are not visible in this view.
1135 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1140 //=============================================================================
1142 * C++ method: used to configure the Salome_file into the runtime.
1143 * \param service_name name of the service that use this Salome_file
1144 * \param file_port_name name of the Salome_file
1145 * \param file Parallel Salome_file C++ object
1147 //=============================================================================
// Hook called by check{Input,Output}FilesToService() for each registered
// Salome_file; the base implementation is documented as doing nothing.
1149 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1150 std::string file_port_name,
1151 Engines::Parallel_Salome_file_proxy_impl * file)
1153 // By default this method does nothing