1 // Copyright (C) 2007-2013 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 // File : SALOME_ParallelComponent_i.cxx
25 // Author : André RIBES, EDF
26 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
42 #include <sys/resource.h>
45 #include <sys/timeb.h>
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
53 extern bool _Sleeping ;
54 static Engines_Parallel_Component_i * theEngines_Component ;
56 bool Engines_Parallel_Component_i::_isMultiStudy = true;
57 bool Engines_Parallel_Component_i::_isMultiInstance = false;
59 //=============================================================================
61 * Standard Constructor for generic Component, used in derived class
62 * Connection to Registry and Notification
63 * \param orb Object Request broker given by Container
64 * \parap poa Portable Object Adapter from Container (normally root_poa)
65 * \param contId container CORBA id inside the server
66 * \param instanceName unique instance name for this object (see Container_i)
67 * \param interfaceName component class name
68 * \param notif use of notification
70 //=============================================================================
72 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
73 PortableServer::POA_ptr poa,
74 PortableServer::ObjectId * contId,
75 const char *instanceName,
76 const char *interfaceName,
79 InterfaceParallel_impl(orb,ior,rank),
80 Engines::EngineComponent_serv(orb,ior,rank),
81 Engines::EngineComponent_base_serv(orb,ior,rank),
82 Engines::Parallel_Component_serv(orb,ior,rank),
83 Engines::Parallel_Component_base_serv(orb,ior,rank),
84 _instanceName(instanceName),
85 _interfaceName(interfaceName),
87 _myConnexionToRegistry(0),
95 _CanceledThread(false)
97 MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
98 //SCRUTE(pd_refCount);
99 _orb = CORBA::ORB::_duplicate(orb);
100 _poa = PortableServer::POA::_duplicate(poa);
102 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
106 CORBA::String_var the_ior = _orb->object_to_string(o);
107 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
108 _instanceName.c_str());
110 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
112 deploy_mutex = new pthread_mutex_t();
113 pthread_mutex_init(deploy_mutex, NULL);
115 //SCRUTE(pd_refCount);
118 //=============================================================================
120 * Destructor: call Container for decrement of instances count.
121 * When instances count falls to 0, the container tries to remove the
122 * component library (dlclose)
124 //=============================================================================
126 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
128 MESSAGE("Parallel Component destructor");
129 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
130 if(_myConnexionToRegistry)delete _myConnexionToRegistry;
131 if(_notifSupplier)delete _notifSupplier;
135 pthread_mutex_destroy(deploy_mutex);
141 //=============================================================================
143 * CORBA method: return name of the instance, unique in this Container
145 //=============================================================================
147 char* Engines_Parallel_Component_i::instanceName()
149 return CORBA::string_dup(_instanceName.c_str()) ;
152 //=============================================================================
154 * CORBA method: return name of the component class
156 //=============================================================================
158 char* Engines_Parallel_Component_i::interfaceName()
160 return CORBA::string_dup(_interfaceName.c_str()) ;
163 //=============================================================================
165 * CORBA method: Get study Id
166 * \return -1: not initialised (Internal Error)
167 * 0: multistudy component instance
168 * >0: study id associated to this instance
170 //=============================================================================
172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
177 //=============================================================================
179 * CORBA method: Test if instance is alive and responds
181 //=============================================================================
183 void Engines_Parallel_Component_i::ping()
186 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
189 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
190 << pthread_self().p );
194 //=============================================================================
196 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
197 * respond any more to CORBA calls), the connection to Regsitry is removed
198 * (Registry informed of deactivation), internal server reference counter on
199 * the derived servant class is decremented, to allow destruction of the class
200 * (delete) by POA, when there are no more references.
201 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
203 //=============================================================================
205 void Engines_Parallel_Component_i::destroy()
207 MESSAGE("Engines_Parallel_Component_i::destroy()");
208 MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
216 //=============================================================================
218 * CORBA method: return CORBA reference of the Container
221 //=============================================================================
223 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
225 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
226 CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
227 return Engines::Container::_narrow(o);
230 //=============================================================================
233 * Gives a sequence of (key=string,value=any) to the component.
234 * Base class component stores the sequence in a map.
235 * The map is cleared before.
236 * This map is for use by derived classes.
237 * \param dico sequence of (key=string,value=any)
239 //=============================================================================
241 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
244 for (CORBA::ULong i=0; i<dico.length(); i++)
246 std::string cle(dico[i].key);
247 _fieldsDict[cle] = dico[i].value;
251 //=============================================================================
254 * returns a previously stored map (key=string,value=any) as a sequence.
255 * (see setProperties)
257 //=============================================================================
259 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
261 Engines::FieldsDict_var copie = new Engines::FieldsDict;
262 copie->length(_fieldsDict.size());
263 std::map<std::string,CORBA::Any>::iterator it;
265 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
267 std::string cle((*it).first);
268 copie[i].key = CORBA::string_dup(cle.c_str());
269 copie[i].value = _fieldsDict[cle];
271 return copie._retn();
274 //=============================================================================
276 * CORBA method: used by Supervision to give names to this instance
278 //=============================================================================
280 void Engines_Parallel_Component_i::Names( const char * graphName ,
281 const char * nodeName )
283 _graphName = graphName;
284 _nodeName = nodeName;
285 MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
286 << _nodeName << "' )");
289 //=============================================================================
291 * CORBA method: used in Supervision
293 //=============================================================================
295 bool Engines_Parallel_Component_i::Kill_impl()
297 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
298 // << " pid " << getpid() << " instanceName "
299 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
300 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
301 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
304 bool RetVal = false ;
306 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
308 RetVal = Killer( _ThreadId , SIGUSR2 ) ;
309 _ThreadId = (pthread_t ) -1 ;
313 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
315 RetVal = Killer( *_ThreadId , 0 ) ;
316 _ThreadId = (pthread_t* ) 0 ;
323 //=============================================================================
325 * CORBA method: used in Supervision
327 //=============================================================================
329 bool Engines_Parallel_Component_i::Stop_impl()
332 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
333 << " pid " << getpid() << " instanceName "
334 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
335 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
336 << dec << " _ThreadId " << _ThreadId );
338 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
339 << " pid " << _getpid() << " instanceName "
340 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
341 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
342 << dec << " _ThreadId " << _ThreadId );
346 bool RetVal = false ;
348 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
350 RetVal = Killer( _ThreadId , 0 ) ;
351 _ThreadId = (pthread_t ) -1 ;
354 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
356 RetVal = Killer( *_ThreadId , 0 ) ;
357 _ThreadId = (pthread_t* ) 0 ;
363 //=============================================================================
365 * CORBA method: used in Supervision
367 //=============================================================================
369 bool Engines_Parallel_Component_i::Suspend_impl()
372 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
373 << " pid " << getpid() << " instanceName "
374 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
375 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
376 << dec << " _ThreadId " << _ThreadId );
378 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
379 << " pid " << _getpid() << " instanceName "
380 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
381 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
382 << dec << " _ThreadId " << _ThreadId );
385 bool RetVal = false ;
387 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
389 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
399 RetVal = Killer( _ThreadId ,SIGINT ) ;
401 RetVal = Killer( *_ThreadId ,SIGINT ) ;
403 //if ( RetVal ) _Sleeping = true;
410 //=============================================================================
412 * CORBA method: used in Supervision
414 //=============================================================================
416 bool Engines_Parallel_Component_i::Resume_impl()
419 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
420 << " pid " << getpid() << " instanceName "
421 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
422 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
423 << dec << " _ThreadId " << _ThreadId );
425 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
426 << " pid " << _getpid() << " instanceName "
427 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
428 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
429 << dec << " _ThreadId " << _ThreadId );
431 bool RetVal = false ;
433 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
435 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
451 //=============================================================================
455 //=============================================================================
457 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
460 if ( _ThreadId || _Executed )
465 if ( pthread_self() != _ThreadId )
467 if ( pthread_self().p != _ThreadId->p )
475 // Get Cpu in the appropriate thread with that object !...
476 theEngines_Component = this ;
478 Killer( _ThreadId ,SIGUSR1 ) ;
480 Killer( *_ThreadId ,SIGUSR11 ) ;
483 cpu = _ThreadCpuUsed ;
487 _ThreadCpuUsed = CpuUsed() ;
488 cpu = _ThreadCpuUsed ;
489 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
490 // << _serviceName << " " << cpu << std::endl ;
495 cpu = _ThreadCpuUsed ;
496 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
497 // << _serviceName << " " << cpu<< std::endl ;
502 // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
503 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
509 //=============================================================================
511 * C++ method: return Container Servant
513 //=============================================================================
515 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
517 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
520 //=============================================================================
522 * C++ method: set study Id
523 * \param studyId 0 if instance is not associated to a study,
524 * >0 otherwise (== study id)
525 * \return true if the set of study Id is OK
526 * must be set once by Container, at instance creation,
527 * and cannot be changed after.
529 //=============================================================================
531 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
533 ASSERT( studyId >= 0);
534 CORBA::Boolean ret = false;
535 if (_studyId < 0) // --- not yet initialized
541 if ( _studyId == studyId) ret = true;
545 //=============================================================================
547 * C++ method: return CORBA instance id, the id is set in derived class
548 * constructor, when instance is activated.
550 //=============================================================================
552 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
554 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
558 //=============================================================================
560 * C++ method: used by derived classes for supervision
562 //=============================================================================
564 void Engines_Parallel_Component_i::beginService(const char *serviceName)
567 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
568 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
570 MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
571 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
574 _ThreadId = pthread_self() ;
576 _ThreadId = new pthread_t;
577 _ThreadId->p = pthread_self().p ;
578 _ThreadId->x = pthread_self().x ;
581 _StartUsed = CpuUsed_impl() ;
584 _serviceName = serviceName ;
585 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
587 perror("pthread_setcanceltype ") ;
590 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
592 perror("pthread_setcancelstate ") ;
595 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
596 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
597 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
599 // --- for supervisor : all strings given with setProperties
600 // are set in environment
601 bool overwrite = true;
602 std::map<std::string,CORBA::Any>::iterator it;
603 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
605 std::string cle((*it).first);
606 if ((*it).second.type()->kind() == CORBA::tk_string)
609 (*it).second >>= value;
610 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
612 //int ret = setenv(cle.c_str(), value, overwrite);
613 setenv(cle.c_str(), value, overwrite);
615 //CCRT porting : setenv not defined in stdlib.h
619 // char* cast because 1st arg of linux putenv function
620 // is not a const char* !
621 //int ret=putenv((char *)s.c_str());
622 putenv((char *)s.c_str());
623 //End of CCRT porting
625 MESSAGE("--- setenv: "<<cle<<" = "<< value);
630 //=============================================================================
632 * C++ method: used by derived classes for supervision
634 //=============================================================================
636 void Engines_Parallel_Component_i::endService(const char *serviceName)
638 if ( !_CanceledThread )
639 _ThreadCpuUsed = CpuUsed_impl() ;
642 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
643 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
644 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
646 MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
647 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
648 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
653 //=============================================================================
655 * C++ method: -- CHECK IF USED --
657 //=============================================================================
659 char* Engines_Parallel_Component_i::graphName()
661 return CORBA::string_dup( _graphName.c_str() ) ;
664 //=============================================================================
666 * C++ method: -- CHECK IF USED --
668 //=============================================================================
670 char* Engines_Parallel_Component_i::nodeName()
672 return CORBA::string_dup( _nodeName.c_str() ) ;
675 //=============================================================================
677 * C++ method: used in Supervision (see kill_impl)
679 //=============================================================================
681 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
691 if ( pthread_cancel( ThreadId ) )
693 perror("Killer pthread_cancel error") ;
699 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
700 << " pthread_canceled") ;
702 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
703 << " pthread_canceled") ;
709 if ( pthread_kill( ThreadId , signum ) == -1 )
711 perror("Killer pthread_kill error") ;
717 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
718 << " pthread_killed(" << signum << ")") ;
720 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
721 << " pthread_killed(" << signum << ")") ;
729 //=============================================================================
733 //=============================================================================
737 if (theEngines_Component)
738 theEngines_Component->SetCurCpu();
741 //=============================================================================
745 //=============================================================================
747 void Engines_Parallel_Component_i::SetCurCpu()
749 _ThreadCpuUsed = CpuUsed() ;
750 // MESSAGE(pthread_self() <<
751 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
754 //=============================================================================
758 //=============================================================================
760 long Engines_Parallel_Component_i::CpuUsed()
764 struct rusage usage ;
765 if ( _ThreadId || _Executed )
767 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
769 perror("Engines_Parallel_Component_i::CpuUsed") ;
772 cpu = usage.ru_utime.tv_sec - _StartUsed ;
773 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
774 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
775 // << " = " << cpu << std::endl ;
779 // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
780 // << _ThreadId << " " << _serviceName<< " _StartUsed "
781 // << _StartUsed << std::endl ;
784 // NOT implementet yet
791 void CallCancelThread()
793 if ( theEngines_Component )
794 theEngines_Component->CancelThread() ;
797 //=============================================================================
801 //=============================================================================
803 void Engines_Parallel_Component_i::CancelThread()
805 _CanceledThread = true;
808 //=============================================================================
810 * C++ method: Send message to event channel
812 //=============================================================================
814 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
817 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
820 //=============================================================================
822 * C++ method: return standard library name built on component name
824 //=============================================================================
826 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
829 std::string ret="lib";
833 std::string ret=componentName;
839 //=============================================================================
841 * C++ method: DumpPython default implementation
843 //=============================================================================
845 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
846 CORBA::Boolean isPublished,
847 CORBA::Boolean isMultiFile,
848 CORBA::Boolean& isValidScript)
850 const char* aScript = isMultiFile ? "def RebuildData(theStudy): pass" : "";
851 char* aBuffer = new char[strlen(aScript)+1];
852 strcpy(aBuffer, aScript);
853 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
854 int aBufferSize = strlen(aBuffer)+1;
855 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
856 isValidScript = true;
857 return aStreamFile._retn();
861 Engines::Salome_file_ptr
862 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
863 const char* Salome_file_name)
865 // Try to find the service, if it doesn't exist, we add it.
866 _Service_file_map_it = _Input_Service_file_map.find(service_name);
867 if (_Service_file_map_it == _Input_Service_file_map.end()) {
868 _t_Salome_file_map * _map = new _t_Salome_file_map();
869 _Input_Service_file_map[service_name] = _map;
870 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
871 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
872 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
873 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
875 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
876 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
877 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
879 pthread_mutex_lock(deploy_mutex);
880 std::string proxy_ior;
882 // Try to find the Salome_file ...
883 _Salome_file_map_it = _map->find(Salome_file_name);
884 if (_Salome_file_map_it == _map->end()) {
886 // We create a new PaCO++ object.
887 // He has the same configuration than
890 // Firstly, we have to create the proxy object
891 // of the Salome_file and transmit his
892 // reference to the other nodes.
893 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
894 if (getMyRank() == 0) {
895 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
896 new paco_omni_fabrique());
897 proxy->copyGlobalContext(this);
898 PaCO::PacoTopology_t serveur_topo;
899 serveur_topo.total = getTotalNode();
900 proxy->setTopology(serveur_topo);
902 // We register the CORBA objet into the POA
903 CORBA::Object_ptr proxy_ref = proxy->_this();
905 // We send the reference to all the nodes...
906 Engines::Parallel_Component_var component_proxy =
907 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
908 component_proxy->send_parallel_proxy_object(proxy_ref);
910 // Adding proxy into the map
911 (*_proxy_map)[Salome_file_name] = proxy;
914 this->wait_parallel_object_proxy();
917 proxy_ior = this->get_parallel_proxy_object();
918 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
920 // We register each node of the parallel Salome_file object
922 for (int i = 0; i < getTotalNode(); i++) {
923 if (i == getMyRank()) {
924 Parallel_Salome_file_i * servant =
925 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
928 servant->copyGlobalContext(this);
930 // We register the CORBA objet into the POA
931 servant->POA_PaCO::InterfaceParallel::_this();
933 // Register the servant
936 // Adding servant to the map
937 (*_map)[Salome_file_name] = servant;
940 _my_com->paco_barrier();
941 // start parallel object
942 if (getMyRank() == 0) {
944 _my_com->paco_barrier();
947 _my_com->paco_barrier();
949 // Parallel_Salome_file is created and deployed
954 pthread_mutex_unlock(deploy_mutex);
955 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
956 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
957 return Engines::Salome_file::_narrow(proxy_ref);
960 Engines::Salome_file_ptr
961 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
962 const char* Salome_file_name)
964 // Try to find the service, if it doesn't exist, we add it.
965 _Service_file_map_it = _Output_Service_file_map.find(service_name);
966 if (_Service_file_map_it == _Output_Service_file_map.end()) {
967 _t_Salome_file_map * _map = new _t_Salome_file_map();
968 _Output_Service_file_map[service_name] = _map;
969 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
970 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
971 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
972 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
974 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
975 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
976 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
978 pthread_mutex_lock(deploy_mutex);
979 std::string proxy_ior;
981 // Try to find the Salome_file ...
982 _Salome_file_map_it = _map->find(Salome_file_name);
983 Engines::Parallel_Salome_file_proxy_impl * proxy;
984 if (_Salome_file_map_it == _map->end()) {
986 // We create a new PaCO++ object.
987 // He has the same configuration than
990 // Firstly, we have to create the proxy object
991 // of the Salome_file and transmit his
992 // reference to the other nodes.
993 if (getMyRank() == 0) {
994 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
995 new paco_omni_fabrique());
996 proxy->copyGlobalContext(this);
997 PaCO::PacoTopology_t serveur_topo;
998 serveur_topo.total = getTotalNode();
999 proxy->setTopology(serveur_topo);
1001 // We register the CORBA objet into the POA
1002 CORBA::Object_ptr proxy_ref = proxy->_this();
1004 // We send the reference to all the nodes...
1005 Engines::Parallel_Component_var component_proxy =
1006 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
1007 component_proxy->send_parallel_proxy_object(proxy_ref);
1009 // Adding proxy into the map
1010 (*_proxy_map)[Salome_file_name] = proxy;
1013 this->wait_parallel_object_proxy();
1016 proxy_ior = this->get_parallel_proxy_object();
1017 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
1019 // We register each node of the parallel Salome_file object
1021 for (int i = 0; i < getTotalNode(); i++) {
1022 if (i == getMyRank()) {
1023 Parallel_Salome_file_i * servant =
1024 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
1027 servant->copyGlobalContext(this);
1029 // We register the CORBA objet into the POA
1030 servant->POA_PaCO::InterfaceParallel::_this();
1032 // Register the servant
1035 // Adding servant to the map
1036 (*_map)[Salome_file_name] = servant;
1039 _my_com->paco_barrier();
1040 // start parallel object
1041 if (getMyRank() == 0) {
1043 _my_com->paco_barrier();
1046 _my_com->paco_barrier();
1049 // Parallel_Salome_file is created and deployed
1053 pthread_mutex_unlock(deploy_mutex);
1054 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
1055 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1056 return Engines::Salome_file::_narrow(proxy_ref);
1059 Engines::Salome_file_ptr
1060 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1061 const char* Salome_file_name)
1063 // Try to find the service, if it doesn't exist, we throw an exception.
1064 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1065 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
1066 SALOME::ExceptionStruct es;
1067 es.type = SALOME::INTERNAL_ERROR;
1068 es.text = "service doesn't have salome files";
1069 throw SALOME::SALOME_Exception(es);
1071 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1073 // Try to find the Salome_file ...
1074 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1075 if (_Proxy_Salome_file_map_it == _map->end()) {
1076 SALOME::ExceptionStruct es;
1077 es.type = SALOME::INTERNAL_ERROR;
1078 es.text = "service doesn't have this Salome_file";
1079 throw SALOME::SALOME_Exception(es);
1082 // Client get the proxy object
1083 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1084 return Sfile->_this();
1087 Engines::Salome_file_ptr
1088 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1089 const char* Salome_file_name)
1091 // Try to find the service, if it doesn't exist, we throw an exception.
1092 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1093 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1094 SALOME::ExceptionStruct es;
1095 es.type = SALOME::INTERNAL_ERROR;
1096 es.text = "service doesn't have salome files";
1097 throw SALOME::SALOME_Exception(es);
1099 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1101 // Try to find the Salome_file ...
1102 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1103 if (_Proxy_Salome_file_map_it == _map->end()) {
1104 SALOME::ExceptionStruct es;
1105 es.type = SALOME::INTERNAL_ERROR;
1106 es.text = "service doesn't have this Salome_file";
1107 throw SALOME::SALOME_Exception(es);
1110 // Client get the proxy object
1111 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1112 return Sfile->_this();
1117 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1119 // Try to find the service, if it doesn't exist, nothing to do.
1120 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1121 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1122 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1123 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1124 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1126 for(;begin!=end;begin++) {
1127 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1128 std::string file_port_name = begin->first;
1129 configureSalome_file(service_name, file_port_name, file);
1136 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1138 // Try to find the service, if it doesn't exist, nothing to do.
1139 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1140 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1141 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1142 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1143 _t_Proxy_Salome_file_map::iterator end = _map->end();
1145 for(;begin!=end;begin++) {
1146 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1147 std::string file_port_name = begin->first;
1148 configureSalome_file(service_name, file_port_name, file);
1155 //=============================================================================
1157 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1159 //=============================================================================
1161 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1162 _proxy = _orb->object_to_string(proxy_ref);
1165 //=============================================================================
1167 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1169 //=============================================================================
1171 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1172 char * proxy = NULL;
1173 proxy = get_parallel_proxy_object();
1174 while(proxy == NULL)
1177 proxy = get_parallel_proxy_object();
1181 //=============================================================================
1183 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1185 //=============================================================================
1187 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1192 //=============================================================================
1194 * C++ method: used to configure the Salome_file into the runtime.
1195 * \param service_name name of the service that use this Salome_file
1196 * \param file_port_name name of the Salome_file
1197 * \param file Parallel Salome_file C++ object
1199 //=============================================================================
1201 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1202 std::string file_port_name,
1203 Engines::Parallel_Salome_file_proxy_impl * file)
1205 // By default this method does nothing