1 // Copyright (C) 2007-2024 CEA, EDF, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 // File : SALOME_ParallelComponent_i.cxx
25 // Author : André RIBES, EDF
26 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
42 #include <sys/resource.h>
45 #include <sys/timeb.h>
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
// Flag defined in another translation unit; in this file it is only mentioned
// in a commented-out statement of Suspend_impl.
53 extern bool _Sleeping ;
// Component instance targeted by the free helper functions of this file;
// CpuUsed_impl stores `this` here before signalling the service thread.
54 static Engines_Parallel_Component_i * theEngines_Component ;
// Class-wide flag, initialised to false.
56 bool Engines_Parallel_Component_i::_isMultiInstance = false;
58 //=============================================================================
60 * Standard Constructor for generic Component, used in derived class
61 * Connection to Registry and Notification
62 * \param orb Object Request broker given by Container
63 * \param poa Portable Object Adapter from Container (normally root_poa)
64 * \param contId container CORBA id inside the server
65 * \param instanceName unique instance name for this object (see Container_i)
66 * \param interfaceName component class name
67 * \param notif use of notification
69 //=============================================================================
// Constructor for a generic parallel component.
// Forwards (orb, ior, rank) to each PaCO++/CORBA base class, records the
// instance and interface names, then creates the registry connection, the
// notification supplier and the deployment mutex.
// \param orb           ORB supplied by the caller; a duplicate is kept in _orb
// \param ior, rank     passed unchanged to every base-class constructor
// \param poa           POA; a duplicate is kept in _poa
// \param contId        object id used to resolve the container reference
// \param instanceName  stored in _instanceName; also given to the notification supplier
// \param interfaceName stored in _interfaceName
71 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
72 PortableServer::POA_ptr poa,
73 PortableServer::ObjectId * contId,
74 const char *instanceName,
75 const char *interfaceName,
78 InterfaceParallel_impl(orb,ior,rank),
79 Engines::EngineComponent_serv(orb,ior,rank),
80 Engines::EngineComponent_base_serv(orb,ior,rank),
81 Engines::Parallel_Component_serv(orb,ior,rank),
82 Engines::Parallel_Component_base_serv(orb,ior,rank),
83 _instanceName(instanceName),
84 _interfaceName(interfaceName),
86 _myConnexionToRegistry(0),
93 _CanceledThread(false)
95 MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
96 //SCRUTE(pd_refCount);
// Keep our own references to the ORB and the POA.
97 _orb = CORBA::ORB::_duplicate(orb);
98 _poa = PortableServer::POA::_duplicate(poa);
// Resolve the container object from its id and stringify it for the registry.
100 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
104 CORBA::String_var the_ior = _orb->object_to_string(o);
105 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
106 _instanceName.c_str());
// Supplier used by sendMessage() to emit events.
108 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
// Mutex locked around Salome_file deployment in set{Input,Output}FileToService.
110 deploy_mutex = new pthread_mutex_t();
111 pthread_mutex_init(deploy_mutex, NULL);
113 //SCRUTE(pd_refCount);
116 //=============================================================================
118 * Destructor: call Container for decrement of instances count.
119 * When instances count falls to 0, the container tries to remove the
120 * component library (dlclose)
122 //=============================================================================
// Destructor: decrements the container's instance count for this interface,
// deletes the registry connection and the notification supplier when they are
// set, and destroys the deployment mutex.
124 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
126 MESSAGE("Parallel Component destructor");
127 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
128 if(_myConnexionToRegistry)delete _myConnexionToRegistry;
129 if(_notifSupplier)delete _notifSupplier;
// NOTE(review): deploy_mutex is allocated with `new` in the constructor;
// confirm that it is also deleted once destroyed.
133 pthread_mutex_destroy(deploy_mutex);
139 //=============================================================================
141 * CORBA method: return name of the instance, unique in this Container
143 //=============================================================================
// Returns a CORBA::string_dup copy of _instanceName.
145 char* Engines_Parallel_Component_i::instanceName()
147 return CORBA::string_dup(_instanceName.c_str()) ;
150 //=============================================================================
152 * CORBA method: return name of the component class
154 //=============================================================================
// Returns a CORBA::string_dup copy of _interfaceName.
156 char* Engines_Parallel_Component_i::interfaceName()
158 return CORBA::string_dup(_interfaceName.c_str()) ;
161 //=============================================================================
163 * CORBA method: Test if instance is alive and responds
165 //=============================================================================
// Logs the process id and the calling thread id.
// Two MESSAGE forms are visible: one uses getpid(), the other uses _getpid()
// and pthread_self().p.
// NOTE(review): presumably selected by a platform #ifdef not visible here — confirm.
167 void Engines_Parallel_Component_i::ping()
170 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
173 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
174 << pthread_self().p );
178 //=============================================================================
180 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
181 * respond any more to CORBA calls), the connection to Registry is removed
182 * (Registry informed of deactivation), internal server reference counter on
183 * the derived servant class is decremented, to allow destruction of the class
184 * (delete) by POA, when there are no more references.
185 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
187 //=============================================================================
// In the visible lines this only logs: per the second message, the instance
// is deleted when the container is shut down.
189 void Engines_Parallel_Component_i::destroy()
191 MESSAGE("Engines_Parallel_Component_i::destroy()");
192 MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
200 //=============================================================================
202 * CORBA method: return CORBA reference of the Container
205 //=============================================================================
// Resolves _contId through the POA and returns the result narrowed to
// Engines::Container.
207 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
209 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
210 CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
211 return Engines::Container::_narrow(o);
214 //=============================================================================
217 * Gives a sequence of (key=string,value=any) to the component.
218 * Base class component stores the sequence in a map.
219 * The map is cleared before.
220 * This map is for use by derived classes.
221 * \param dico sequence of (key=string,value=any)
223 //=============================================================================
// Copies every (key, value) pair of `dico` into _fieldsDict; a key already
// present has its value overwritten.
// \param dico sequence of (key=string, value=any)
225 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
228 for (CORBA::ULong i=0; i<dico.length(); i++)
230 std::string cle(dico[i].key);
231 _fieldsDict[cle] = dico[i].value;
235 //=============================================================================
238 * returns a previously stored map (key=string,value=any) as a sequence.
239 * (see setProperties)
241 //=============================================================================
// Builds a new Engines::FieldsDict sized to _fieldsDict and fills it with a
// copy of each stored (key, value) pair. Ownership of the sequence is handed
// to the caller through _retn().
243 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
245 Engines::FieldsDict_var copie = new Engines::FieldsDict;
246 copie->length(_fieldsDict.size());
247 std::map<std::string,CORBA::Any>::iterator it;
// `i` is the output index, advanced in step with the map iterator.
249 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
251 std::string cle((*it).first);
252 copie[i].key = CORBA::string_dup(cle.c_str());
253 copie[i].value = _fieldsDict[cle];
255 return copie._retn();
258 //=============================================================================
260 * CORBA method: used by Supervision to give names to this instance
262 //=============================================================================
// Stores the graph and node names in _graphName / _nodeName and logs them.
// \param graphName value copied into _graphName
// \param nodeName  value copied into _nodeName
264 void Engines_Parallel_Component_i::Names( const char * graphName ,
265 const char * nodeName )
267 _graphName = graphName;
268 _nodeName = nodeName;
269 MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
270 << _nodeName << "' )");
273 //=============================================================================
275 * CORBA method: used in Supervision
277 //=============================================================================
// Kills the service thread recorded in _ThreadId, provided one is recorded
// and it is not the calling thread.
// Two variants are visible: one treats _ThreadId as a pthread_t, sends
// SIGUSR2 through Killer() and then sets _ThreadId to -1; the other treats
// _ThreadId as a pointer, calls Killer() with signum 0 and then nulls it.
// NOTE(review): presumably selected by a platform #ifdef not visible here — confirm.
// RetVal holds the outcome of Killer() and starts as false.
279 bool Engines_Parallel_Component_i::Kill_impl()
281 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
282 // << " pid " << getpid() << " instanceName "
283 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
284 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
285 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
288 bool RetVal = false ;
290 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
292 RetVal = Killer( _ThreadId , SIGUSR2 ) ;
293 _ThreadId = (pthread_t ) -1 ;
297 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
299 RetVal = Killer( *_ThreadId , 0 ) ;
300 _ThreadId = (pthread_t* ) 0 ;
307 //=============================================================================
309 * CORBA method: used in Supervision
311 //=============================================================================
// Stops the service thread recorded in _ThreadId, provided one is recorded
// and it is not the calling thread. Both visible variants call Killer() with
// signum 0 and then reset _ThreadId, to -1 or to a null pointer.
// NOTE(review): the duplicated MESSAGE/if pairs are presumably alternative
// platform branches of an #ifdef not visible here — confirm.
313 bool Engines_Parallel_Component_i::Stop_impl()
316 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
317 << " pid " << getpid() << " instanceName "
318 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
319 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
320 << dec << " _ThreadId " << _ThreadId );
322 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
323 << " pid " << _getpid() << " instanceName "
324 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
325 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
326 << dec << " _ThreadId " << _ThreadId );
// Outcome of Killer(); stays false when no foreign service thread is recorded.
330 bool RetVal = false ;
332 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
334 RetVal = Killer( _ThreadId , 0 ) ;
335 _ThreadId = (pthread_t ) -1 ;
338 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
340 RetVal = Killer( *_ThreadId , 0 ) ;
341 _ThreadId = (pthread_t* ) 0 ;
347 //=============================================================================
349 * CORBA method: used in Supervision
351 //=============================================================================
// Suspends the service thread by sending it SIGINT through Killer().
// The visible guards require a recorded _ThreadId that differs from the
// calling thread.
// NOTE(review): the duplicated MESSAGE/if/Killer lines are presumably
// alternative platform branches of an #ifdef not visible here — confirm.
353 bool Engines_Parallel_Component_i::Suspend_impl()
356 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
357 << " pid " << getpid() << " instanceName "
358 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
359 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
360 << dec << " _ThreadId " << _ThreadId );
362 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
363 << " pid " << _getpid() << " instanceName "
364 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
365 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
366 << dec << " _ThreadId " << _ThreadId );
// Outcome of Killer(); starts as false.
369 bool RetVal = false ;
371 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
373 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
383 RetVal = Killer( _ThreadId ,SIGINT ) ;
385 RetVal = Killer( *_ThreadId ,SIGINT ) ;
// Setting the external _Sleeping flag is disabled.
387 //if ( RetVal ) _Sleeping = true;
394 //=============================================================================
396 * CORBA method: used in Supervision
398 //=============================================================================
// Resume entry point used by supervision. The visible lines log the call and
// test whether a service thread other than the caller is recorded; the action
// taken under that test is not visible in this excerpt.
// NOTE(review): the duplicated MESSAGE/if lines are presumably alternative
// platform branches of an #ifdef not visible here — confirm.
400 bool Engines_Parallel_Component_i::Resume_impl()
403 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
404 << " pid " << getpid() << " instanceName "
405 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
406 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
407 << dec << " _ThreadId " << _ThreadId );
409 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
410 << " pid " << _getpid() << " instanceName "
411 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
412 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
413 << dec << " _ThreadId " << _ThreadId );
// Result value; starts as false.
415 bool RetVal = false ;
417 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
419 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
435 //=============================================================================
439 //=============================================================================
// Reports the CPU value for this component's service.
// When a service thread is recorded or the service has executed:
//  - from a different thread, the component registers itself in
//    theEngines_Component, signals the service thread through Killer() and
//    then reads _ThreadCpuUsed;
//  - otherwise _ThreadCpuUsed is refreshed from CpuUsed() directly.
// NOTE(review): which assignments belong to which branch depends on braces
// and #ifdef lines not visible here — confirm.
441 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
444 if ( _ThreadId || _Executed )
449 if ( pthread_self() != _ThreadId )
451 if ( pthread_self().p != _ThreadId->p )
459 // Get Cpu in the appropriate thread with that object !...
460 theEngines_Component = this ;
462 Killer( _ThreadId ,SIGUSR1 ) ;
// NOTE(review): SIGUSR11 differs from the SIGUSR1 used on the line above;
// confirm it is a deliberately defined constant and not a typo.
464 Killer( *_ThreadId ,SIGUSR11 ) ;
467 cpu = _ThreadCpuUsed ;
471 _ThreadCpuUsed = CpuUsed() ;
472 cpu = _ThreadCpuUsed ;
473 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
474 // << _serviceName << " " << cpu << std::endl ;
479 cpu = _ThreadCpuUsed ;
480 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
481 // << _serviceName << " " << cpu<< std::endl ;
486 // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
487 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
493 //=============================================================================
495 * C++ method: return Container Servant
497 //=============================================================================
// Looks up the servant registered under _contId in the POA and dynamic_casts
// it to Engines_Parallel_Container_i; the cast yields a null pointer when the
// servant is of another type.
499 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
501 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
504 //=============================================================================
506 * C++ method: return CORBA instance id, the id is set in derived class
507 * constructor, when instance is activated.
509 //=============================================================================
// Accessor for the CORBA object id of this instance.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns the _id member — confirm.
511 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
513 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
517 //=============================================================================
519 * C++ method: used by derived classes for supervision
521 //=============================================================================
// Called at the start of a service: records the calling thread in _ThreadId,
// samples the starting CPU value, stores the service name, makes the thread
// asynchronously cancellable, and exports every string-valued property of
// _fieldsDict to the process environment.
// \param serviceName name stored in _serviceName and used in the log message
// NOTE(review): the duplicated MESSAGE/_ThreadId/setenv-putenv lines are
// presumably alternative branches of #ifdefs not visible here — confirm.
523 void Engines_Parallel_Component_i::beginService(const char *serviceName)
526 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
527 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
529 MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
530 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
// Remember which thread runs the service so the *_impl methods can signal it.
533 _ThreadId = pthread_self() ;
535 _ThreadId = new pthread_t;
536 _ThreadId->p = pthread_self().p ;
537 _ThreadId->x = pthread_self().x ;
540 _StartUsed = CpuUsed_impl() ;
543 _serviceName = serviceName ;
// Allow pthread_cancel() (see Killer) to interrupt this thread at any point;
// failures are only reported through perror.
544 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
546 perror("pthread_setcanceltype ") ;
549 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
551 perror("pthread_setcancelstate ") ;
554 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
555 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
556 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
558 // --- for supervisor : all strings given with setProperties
559 // are set in environment
560 bool overwrite = true;
561 std::map<std::string,CORBA::Any>::iterator it;
562 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
564 std::string cle((*it).first);
// Only properties whose Any holds a string are exported.
565 if ((*it).second.type()->kind() == CORBA::tk_string)
568 (*it).second >>= value;
569 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
571 //int ret = setenv(cle.c_str(), value, overwrite);
572 setenv(cle.c_str(), value, overwrite);
574 //CCRT porting : setenv not defined in stdlib.h
578 // char* cast because 1st arg of linux putenv function
579 // is not a const char* !
580 //int ret=putenv((char *)s.c_str());
581 putenv((char *)s.c_str());
582 //End of CCRT porting
584 MESSAGE("--- setenv: "<<cle<<" = "<< value);
589 //=============================================================================
591 * C++ method: used by derived classes for supervision
593 //=============================================================================
// Called at the end of a service: unless the thread was flagged as cancelled,
// refreshes _ThreadCpuUsed from CpuUsed_impl(), then logs the end of service.
// \param serviceName name used in the log message
// NOTE(review): the two MESSAGE forms are presumably alternative platform
// branches of an #ifdef not visible here — confirm.
595 void Engines_Parallel_Component_i::endService(const char *serviceName)
597 if ( !_CanceledThread )
598 _ThreadCpuUsed = CpuUsed_impl() ;
601 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
602 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
603 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
605 MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
606 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
607 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
612 //=============================================================================
614 * C++ method: -- CHECK IF USED --
616 //=============================================================================
// Returns a CORBA::string_dup copy of _graphName.
618 char* Engines_Parallel_Component_i::graphName()
620 return CORBA::string_dup( _graphName.c_str() ) ;
623 //=============================================================================
625 * C++ method: -- CHECK IF USED --
627 //=============================================================================
// Returns a CORBA::string_dup copy of _nodeName.
629 char* Engines_Parallel_Component_i::nodeName()
631 return CORBA::string_dup( _nodeName.c_str() ) ;
634 //=============================================================================
636 * C++ method: used in Supervision (see kill_impl)
638 //=============================================================================
// Acts on the thread `ThreadId`, either cancelling it with pthread_cancel()
// or sending it `signum` with pthread_kill(). Failures are reported through
// perror, successes through MESSAGE.
// \param ThreadId target thread
// \param signum   signal passed to pthread_kill()
// NOTE(review): the test choosing between cancel and kill is not visible;
// Stop_impl passes signum 0, so 0 presumably selects cancellation — confirm.
// NOTE(review): pthread_kill() is specified to return an error number rather
// than -1 on failure; confirm that the `== -1` test is intended.
640 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
650 if ( pthread_cancel( ThreadId ) )
652 perror("Killer pthread_cancel error") ;
658 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
659 << " pthread_canceled") ;
661 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
662 << " pthread_canceled") ;
668 if ( pthread_kill( ThreadId , signum ) == -1 )
670 perror("Killer pthread_kill error") ;
676 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
677 << " pthread_killed(" << signum << ")") ;
679 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
680 << " pthread_killed(" << signum << ")") ;
688 //=============================================================================
692 //=============================================================================
696 if (theEngines_Component)
697 theEngines_Component->SetCurCpu();
700 //=============================================================================
704 //=============================================================================
// Refreshes _ThreadCpuUsed with the current value of CpuUsed().
706 void Engines_Parallel_Component_i::SetCurCpu()
708 _ThreadCpuUsed = CpuUsed() ;
709 // MESSAGE(pthread_self() <<
710 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
713 //=============================================================================
717 //=============================================================================
// Computes the user CPU time, in whole seconds, consumed since _StartUsed.
// The value is taken from getrusage(RUSAGE_SELF), i.e. it is measured for the
// whole process, and only when a service thread is recorded or the service
// has executed. A getrusage() failure is reported through perror.
719 long Engines_Parallel_Component_i::CpuUsed()
723 struct rusage usage ;
724 if ( _ThreadId || _Executed )
726 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
728 perror("Engines_Parallel_Component_i::CpuUsed") ;
// Only the seconds part of the user time is used.
731 cpu = usage.ru_utime.tv_sec - _StartUsed ;
732 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
733 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
734 // << " = " << cpu << std::endl ;
738 // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
739 // << _ThreadId << " " << _serviceName<< " _StartUsed "
740 // << _StartUsed << std::endl ;
743 // NOT implemented yet
// Free function: forwards to CancelThread() on the component registered in
// theEngines_Component, if any.
750 void CallCancelThread()
752 if ( theEngines_Component )
753 theEngines_Component->CancelThread() ;
756 //=============================================================================
760 //=============================================================================
// Marks the service thread as cancelled; endService() then skips the CPU
// usage refresh.
762 void Engines_Parallel_Component_i::CancelThread()
764 _CanceledThread = true;
767 //=============================================================================
769 * C++ method: Send message to event channel
771 //=============================================================================
// Sends an event through the notification supplier, tagged with the current
// graph and node names.
// \param event_type event type forwarded to the supplier
// NOTE(review): graphName() and nodeName() return CORBA::string_dup copies;
// confirm that Send() takes ownership of them, otherwise they leak here.
773 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
776 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
779 //=============================================================================
781 * C++ method: return standard library name built on component name
783 //=============================================================================
// Builds the dynamic library name for a component.
// Two starting points are visible: the literal "lib" and the component name.
// NOTE(review): presumably alternative platform branches whose remaining
// concatenation steps are not visible here — confirm.
// \param componentName component name the library name is derived from
785 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
788 std::string ret="lib";
796 std::string ret=componentName;
802 //=============================================================================
804 * C++ method: DumpPython default implementation
806 //=============================================================================
// Default DumpPython: returns a minimal script as an octet sequence.
// The script is "def RebuildData(): pass" in multi-file mode and empty
// otherwise; the terminating NUL is included in the sequence length.
// \param isPublished   not used by this default implementation
// \param isMultiFile   selects which script text is returned
// \param isValidScript output flag, always set to true
// \return new Engines::TMPFile handed to the caller through _retn()
808 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Boolean isPublished,
809 CORBA::Boolean isMultiFile,
810 CORBA::Boolean& isValidScript)
812 const char* aScript = isMultiFile ? "def RebuildData(): pass" : "";
813 char* aBuffer = new char[strlen(aScript)+1];
814 strcpy(aBuffer, aScript);
815 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
816 int aBufferSize = strlen(aBuffer)+1;
// The final argument (1) is the sequence's release flag.
// NOTE(review): the buffer comes from new char[] rather than the sequence's
// allocbuf(); confirm this matches how the ORB frees it.
817 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
818 isValidScript = true;
819 return aStreamFile._retn();
// Declares an input Salome_file for a service, deploying a parallel
// Salome_file object (proxy plus one servant per node) the first time the
// (service, file) pair is seen, and returns a reference to its proxy.
// \param service_name     service the file belongs to
// \param Salome_file_name name of the file within that service
// \return Engines::Salome_file reference narrowed from the stored proxy IOR
// NOTE(review): the per-service map lookups and insertions below happen
// before deploy_mutex is locked — confirm this is intended.
823 Engines::Salome_file_ptr
824 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
825 const char* Salome_file_name)
827 // Try to find the service, if it doesn't exist, we add it.
828 _Service_file_map_it = _Input_Service_file_map.find(service_name);
829 if (_Service_file_map_it == _Input_Service_file_map.end()) {
// First use of this service: create its servant, proxy and proxy-IOR maps.
830 _t_Salome_file_map * _map = new _t_Salome_file_map();
831 _Input_Service_file_map[service_name] = _map;
832 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
833 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
834 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
835 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
837 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
838 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
839 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
841 pthread_mutex_lock(deploy_mutex);
842 std::string proxy_ior;
844 // Try to find the Salome_file ...
845 _Salome_file_map_it = _map->find(Salome_file_name);
846 if (_Salome_file_map_it == _map->end()) {
848 // We create a new PaCO++ object.
849 // He has the same configuration than
852 // Firstly, we have to create the proxy object
853 // of the Salome_file and transmit his
854 // reference to the other nodes.
855 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
// Only rank 0 builds the proxy and sends its reference out.
856 if (getMyRank() == 0) {
857 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
858 new paco_omni_fabrique());
859 proxy->copyGlobalContext(this);
860 PaCO::PacoTopology_t serveur_topo;
861 serveur_topo.total = getTotalNode();
862 proxy->setTopology(serveur_topo);
864 // We register the CORBA object into the POA
// NOTE(review): raw CORBA::Object_ptr with no release visible in this
// excerpt — confirm it is not leaked.
865 CORBA::Object_ptr proxy_ref = proxy->_this();
867 // We send the reference to all the nodes...
868 Engines::Parallel_Component_var component_proxy =
869 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
870 component_proxy->send_parallel_proxy_object(proxy_ref);
872 // Adding proxy into the map
873 (*_proxy_map)[Salome_file_name] = proxy;
// Wait until the proxy reference has been received.
876 this->wait_parallel_object_proxy();
879 proxy_ior = this->get_parallel_proxy_object();
880 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
882 // We register each node of the parallel Salome_file object
884 for (int i = 0; i < getTotalNode(); i++) {
// Each process creates a servant only on the iteration matching its own rank.
885 if (i == getMyRank()) {
886 Parallel_Salome_file_i * servant =
887 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
890 servant->copyGlobalContext(this);
892 // We register the CORBA object into the POA
893 servant->POA_PaCO::InterfaceParallel::_this();
895 // Register the servant
898 // Adding servant to the map
899 (*_map)[Salome_file_name] = servant;
902 _my_com->paco_barrier();
903 // start parallel object
904 if (getMyRank() == 0) {
906 _my_com->paco_barrier();
909 _my_com->paco_barrier();
911 // Parallel_Salome_file is created and deployed
916 pthread_mutex_unlock(deploy_mutex);
// Rebuild an object reference from the stored IOR and narrow it.
917 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// NOTE(review): string_to_object() returns a reference the caller must
// release and _narrow() returns a new one; proxy_ref is a raw _ptr with no
// release visible here — confirm it is not leaked (an Object_var would
// avoid the question).
918 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
919 return Engines::Salome_file::_narrow(proxy_ref);
// Declares an output Salome_file for a service, deploying a parallel
// Salome_file object (proxy plus one servant per node) the first time the
// (service, file) pair is seen, and returns a reference to its proxy.
// Mirrors setInputFileToService but works on the _Output_* maps.
// \param service_name     service the file belongs to
// \param Salome_file_name name of the file within that service
// \return Engines::Salome_file reference narrowed from the stored proxy IOR
// NOTE(review): the per-service map lookups and insertions below happen
// before deploy_mutex is locked — confirm this is intended.
922 Engines::Salome_file_ptr
923 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
924 const char* Salome_file_name)
926 // Try to find the service, if it doesn't exist, we add it.
927 _Service_file_map_it = _Output_Service_file_map.find(service_name);
928 if (_Service_file_map_it == _Output_Service_file_map.end()) {
// First use of this service: create its servant, proxy and proxy-IOR maps.
929 _t_Salome_file_map * _map = new _t_Salome_file_map();
930 _Output_Service_file_map[service_name] = _map;
931 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
932 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
933 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
934 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
936 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
937 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
938 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
940 pthread_mutex_lock(deploy_mutex);
941 std::string proxy_ior;
943 // Try to find the Salome_file ...
944 _Salome_file_map_it = _map->find(Salome_file_name);
// NOTE(review): unlike the input variant, `proxy` is declared here without
// an initialiser; it is only assigned on rank 0.
945 Engines::Parallel_Salome_file_proxy_impl * proxy;
946 if (_Salome_file_map_it == _map->end()) {
948 // We create a new PaCO++ object.
949 // He has the same configuration than
952 // Firstly, we have to create the proxy object
953 // of the Salome_file and transmit his
954 // reference to the other nodes.
// Only rank 0 builds the proxy and sends its reference out.
955 if (getMyRank() == 0) {
956 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
957 new paco_omni_fabrique());
958 proxy->copyGlobalContext(this);
959 PaCO::PacoTopology_t serveur_topo;
960 serveur_topo.total = getTotalNode();
961 proxy->setTopology(serveur_topo);
963 // We register the CORBA object into the POA
// NOTE(review): raw CORBA::Object_ptr with no release visible in this
// excerpt — confirm it is not leaked.
964 CORBA::Object_ptr proxy_ref = proxy->_this();
966 // We send the reference to all the nodes...
967 Engines::Parallel_Component_var component_proxy =
968 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
969 component_proxy->send_parallel_proxy_object(proxy_ref);
971 // Adding proxy into the map
972 (*_proxy_map)[Salome_file_name] = proxy;
// Wait until the proxy reference has been received.
975 this->wait_parallel_object_proxy();
978 proxy_ior = this->get_parallel_proxy_object();
979 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
981 // We register each node of the parallel Salome_file object
983 for (int i = 0; i < getTotalNode(); i++) {
// Each process creates a servant only on the iteration matching its own rank.
984 if (i == getMyRank()) {
985 Parallel_Salome_file_i * servant =
986 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
989 servant->copyGlobalContext(this);
991 // We register the CORBA object into the POA
992 servant->POA_PaCO::InterfaceParallel::_this();
994 // Register the servant
997 // Adding servant to the map
998 (*_map)[Salome_file_name] = servant;
1001 _my_com->paco_barrier();
1002 // start parallel object
1003 if (getMyRank() == 0) {
1005 _my_com->paco_barrier();
1008 _my_com->paco_barrier();
1011 // Parallel_Salome_file is created and deployed
1015 pthread_mutex_unlock(deploy_mutex);
// Rebuild an object reference from the stored IOR and narrow it.
1016 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// NOTE(review): string_to_object() returns a reference the caller must
// release and _narrow() returns a new one; proxy_ref is a raw _ptr with no
// release visible here — confirm it is not leaked.
1017 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1018 return Engines::Salome_file::_narrow(proxy_ref);
// Returns the proxy of an input Salome_file previously declared for a service.
// \param service_name     service the file belongs to
// \param Salome_file_name name of the file within that service
// \return reference obtained from the stored proxy's _this()
// \throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
//         registered files or does not have this file
1021 Engines::Salome_file_ptr
1022 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1023 const char* Salome_file_name)
1025 // Try to find the service, if it doesn't exist, we throw an exception.
1026 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1027 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
1028 SALOME::ExceptionStruct es;
1029 es.type = SALOME::INTERNAL_ERROR;
1030 es.text = "service doesn't have salome files";
1031 throw SALOME::SALOME_Exception(es);
1033 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1035 // Try to find the Salome_file ...
1036 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1037 if (_Proxy_Salome_file_map_it == _map->end()) {
1038 SALOME::ExceptionStruct es;
1039 es.type = SALOME::INTERNAL_ERROR;
1040 es.text = "service doesn't have this Salome_file";
1041 throw SALOME::SALOME_Exception(es);
1044 // Client get the proxy object
1045 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1046 return Sfile->_this();
// Returns the proxy of an output Salome_file previously declared for a service.
// \param service_name     service the file belongs to
// \param Salome_file_name name of the file within that service
// \return reference obtained from the stored proxy's _this()
// \throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
//         registered files or does not have this file
1049 Engines::Salome_file_ptr
1050 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1051 const char* Salome_file_name)
1053 // Try to find the service, if it doesn't exist, we throw an exception.
1054 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1055 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1056 SALOME::ExceptionStruct es;
1057 es.type = SALOME::INTERNAL_ERROR;
1058 es.text = "service doesn't have salome files";
1059 throw SALOME::SALOME_Exception(es);
1061 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1063 // Try to find the Salome_file ...
1064 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1065 if (_Proxy_Salome_file_map_it == _map->end()) {
1066 SALOME::ExceptionStruct es;
1067 es.type = SALOME::INTERNAL_ERROR;
1068 es.text = "service doesn't have this Salome_file";
1069 throw SALOME::SALOME_Exception(es);
1072 // Client get the proxy object
1073 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1074 return Sfile->_this();
// Calls configureSalome_file() for every input Salome_file proxy registered
// for `service_name`; does nothing when the service has no entry.
// \param service_name service whose input files are configured
1079 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1081 // Try to find the service, if it doesn't exist, nothing to do.
1082 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1083 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1084 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1085 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1086 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
// The map key is the file (port) name, the mapped value the proxy object.
1088 for(;begin!=end;begin++) {
1089 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1090 std::string file_port_name = begin->first;
1091 configureSalome_file(service_name, file_port_name, file);
// Calls configureSalome_file() for every output Salome_file proxy registered
// for `service_name`; does nothing when the service has no entry.
// \param service_name service whose output files are configured
1098 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1100 // Try to find the service, if it doesn't exist, nothing to do.
1101 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1102 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1103 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1104 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1105 _t_Proxy_Salome_file_map::iterator end = _map->end();
// The map key is the file (port) name, the mapped value the proxy object.
1107 for(;begin!=end;begin++) {
1108 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1109 std::string file_port_name = begin->first;
1110 configureSalome_file(service_name, file_port_name, file);
1117 //=============================================================================
1119 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1121 //=============================================================================
// Stores the stringified form (IOR) of `proxy_ref` in _proxy.
// \param proxy_ref reference of the Salome_file proxy object
1123 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1124 _proxy = _orb->object_to_string(proxy_ref);
1127 //=============================================================================
1129 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1131 //=============================================================================
// Polls get_parallel_proxy_object() until it returns a non-NULL string.
// NOTE(review): any pause between polls is not visible in this excerpt —
// confirm that the loop does not spin.
1133 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1134 char * proxy = NULL;
1135 proxy = get_parallel_proxy_object();
1136 while(proxy == NULL)
1139 proxy = get_parallel_proxy_object();
1143 //=============================================================================
1145 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1147 //=============================================================================
// Accessor for the stringified proxy reference; callers compare the result
// with NULL and copy it into a std::string.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the _proxy member set by send_parallel_proxy_object — confirm.
1149 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1154 //=============================================================================
1156 * C++ method: used to configure the Salome_file into the runtime.
1157 * \param service_name name of the service that use this Salome_file
1158 * \param file_port_name name of the Salome_file
1159 * \param file Parallel Salome_file C++ object
1161 //=============================================================================
// Hook called by check{Input,Output}FilesToService for each registered
// Salome_file proxy; per the comment below, the default does nothing.
// \param service_name   name of the service that uses the file
// \param file_port_name name of the Salome_file
// \param file           parallel Salome_file proxy object
1163 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1164 std::string file_port_name,
1165 Engines::Parallel_Salome_file_proxy_impl * file)
1167 // By default this method does nothing