1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
23 // File : SALOME_ParallelComponent_i.cxx
24 // Author : André RIBES, EDF
25 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
27 #include "SALOME_ParallelComponent_i.hxx"
28 #include "SALOME_ParallelContainer_i.hxx"
36 #include "utilities.h"
37 #include "Basics_Utils.hxx"
41 #include <sys/resource.h>
44 #include <sys/timeb.h>
48 #include <paco_dummy.h>
49 #include <paco_omni.h>
// Flag defined in another translation unit; it is only referenced from a
// commented-out line in Suspend_impl() in the visible code.
53 extern bool _Sleeping ;
// Component instance targeted by the free-function callbacks of this file.
// It is assigned in CpuUsed_impl() and read by CallCancelThread().
54 static Engines_Parallel_Component_i * theEngines_Component ;
// Definitions of the class-level flags with their initial values.
56 bool Engines_Parallel_Component_i::_isMultiStudy = true;
57 bool Engines_Parallel_Component_i::_isMultiInstance = false;
59 //=============================================================================
61 * Standard Constructor for generic Component, used in derived class
62 * Connection to Registry and Notification
63 * \param orb Object Request broker given by Container
64 * \param poa Portable Object Adapter from Container (normally root_poa)
65 * \param contId container CORBA id inside the server
66 * \param instanceName unique instance name for this object (see Container_i)
67 * \param interfaceName component class name
68 * \param notif use of notification
70 //=============================================================================
// Constructor. Forwards (orb, ior, rank) to every parallel base class, stores
// the instance and interface names, then connects the component to the
// registry and creates its notification supplier and deployment mutex.
// NOTE(review): `notif` is used below but its declaration is not among the
// visible parameter lines — presumably a trailing parameter; confirm in the header.
// NOTE(review): several member initializers are not visible in this listing.
72 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
73 PortableServer::POA_ptr poa,
74 PortableServer::ObjectId * contId,
75 const char *instanceName,
76 const char *interfaceName,
79 InterfaceParallel_impl(orb,ior,rank),
80 Engines::Component_serv(orb,ior,rank),
81 Engines::Component_base_serv(orb,ior,rank),
82 Engines::Parallel_Component_serv(orb,ior,rank),
83 Engines::Parallel_Component_base_serv(orb,ior,rank),
84 _instanceName(instanceName),
85 _interfaceName(interfaceName),
87 _myConnexionToRegistry(0),
95 _CanceledThread(false)
97 MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
98 //SCRUTE(pd_refCount);
// Keep our own references to the ORB and the POA.
99 _orb = CORBA::ORB::_duplicate(orb);
100 _poa = PortableServer::POA::_duplicate(poa);
// Resolve the container object from its id, then stringify it for the registry.
102 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
106 CORBA::String_var the_ior = _orb->object_to_string(o);
// Register this instance with the registry under the container IOR.
107 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
108 _instanceName.c_str());
110 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
// Mutex locked around Salome_file deployment in set{Input,Output}FileToService().
112 deploy_mutex = new pthread_mutex_t();
113 pthread_mutex_init(deploy_mutex, NULL);
115 //SCRUTE(pd_refCount);
118 //=============================================================================
120 * Destructor: call Container for decrement of instances count.
121 * When instances count falls to 0, the container tries to remove the
122 * component library (dlclose)
124 //=============================================================================
// Destructor: decrements the container's instance count for this interface,
// then releases the registry connection, the notification supplier and the
// deployment mutex.
126 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
128 MESSAGE("Parallel Component destructor");
129 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
// `delete` on a null pointer is a no-op, so the guards are only defensive.
130 if(_myConnexionToRegistry)delete _myConnexionToRegistry;
131 if(_notifSupplier)delete _notifSupplier;
// NOTE(review): deploy_mutex is allocated with `new` in the constructor; no
// matching `delete` is visible in these lines — confirm it is freed.
135 pthread_mutex_destroy(deploy_mutex);
141 //=============================================================================
143 * CORBA method: return name of the instance, unique in this Container
145 //=============================================================================
// Returns a CORBA-allocated copy of the instance name (made with
// CORBA::string_dup, so the caller owns the returned string).
147 char* Engines_Parallel_Component_i::instanceName()
149 return CORBA::string_dup(_instanceName.c_str()) ;
152 //=============================================================================
154 * CORBA method: return name of the component class
156 //=============================================================================
// Returns a CORBA-allocated copy of the component class (interface) name
// (made with CORBA::string_dup, so the caller owns the returned string).
158 char* Engines_Parallel_Component_i::interfaceName()
160 return CORBA::string_dup(_interfaceName.c_str()) ;
163 //=============================================================================
165 * CORBA method: Get study Id
166 * \return -1: not initialised (Internal Error)
167 * 0: multistudy component instance
168 * >0: study id associated to this instance
170 //=============================================================================
172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
177 //=============================================================================
179 * CORBA method: Test if instance is alive and responds
181 //=============================================================================
// Liveness check: only logs the process id and the calling thread.
// NOTE(review): two variants of the log statement appear back to back (one
// using getpid(), one using _getpid() and pthread_self().p) — presumably
// alternate platform branches whose preprocessor guards are not visible here.
// The continuation of the first statement is also not visible.
183 void Engines_Parallel_Component_i::ping()
186 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
189 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
190 << pthread_self().p );
194 //=============================================================================
196 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
197 * respond any more to CORBA calls), the connection to Registry is removed
198 * (Registry informed of deactivation), internal server reference counter on
199 * the derived servant class is decremented, to allow destruction of the class
200 * (delete) by POA, when there are no more references.
201 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
203 //=============================================================================
// In the visible lines this method only logs; per its own message, the object
// is deleted later, when the container is shut down.
205 void Engines_Parallel_Component_i::destroy()
207 MESSAGE("Engines_Parallel_Component_i::destroy()");
208 MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
216 //=============================================================================
218 * CORBA method: return CORBA reference of the Container
221 //=============================================================================
// Returns the CORBA reference of the owning container: the stored container
// id is resolved through the POA and narrowed to Engines::Container.
// _narrow yields a new reference, which is handed to the caller.
223 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
225 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
226 CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
227 return Engines::Container::_narrow(o);
230 //=============================================================================
233 * Gives a sequence of (key=string,value=any) to the component.
234 * Base class component stores the sequence in a map.
235 * The map is cleared before.
236 * This map is for use by derived classes.
237 * \param dico sequence of (key=string,value=any)
239 //=============================================================================
// Copies every (key, value) pair of `dico` into the _fieldsDict map, keyed by
// the string form of the key; an existing key is overwritten.
// NOTE(review): the header comment says the map is cleared first; that
// statement is not among the visible lines — confirm.
241 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
244 for (CORBA::ULong i=0; i<dico.length(); i++)
246 std::string cle(dico[i].key);
247 _fieldsDict[cle] = dico[i].value;
251 //=============================================================================
254 * returns a previously stored map (key=string,value=any) as a sequence.
255 * (see setProperties)
257 //=============================================================================
// Builds and returns a new FieldsDict sequence holding one (key, value) entry
// per element of _fieldsDict. Ownership of the sequence passes to the caller
// through _retn().
// NOTE(review): the declaration of the index `i` is not visible in this listing.
259 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
261 Engines::FieldsDict_var copie = new Engines::FieldsDict;
262 copie->length(_fieldsDict.size());
263 map<std::string,CORBA::Any>::iterator it;
// Walk the map and the output sequence in lockstep.
265 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
267 std::string cle((*it).first);
268 copie[i].key = CORBA::string_dup(cle.c_str());
// Looks the value up again by key; (*it).second would refer to the same element.
269 copie[i].value = _fieldsDict[cle];
271 return copie._retn();
274 //=============================================================================
276 * CORBA method: used by Supervision to give names to this instance
278 //=============================================================================
// Stores the graph name and node name given by the caller and logs them.
// \param graphName name stored in _graphName
// \param nodeName name stored in _nodeName
280 void Engines_Parallel_Component_i::Names( const char * graphName ,
281 const char * nodeName )
283 _graphName = graphName;
284 _nodeName = nodeName;
285 MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
286 << _nodeName << "' )");
289 //=============================================================================
291 * CORBA method: used in Supervision
293 //=============================================================================
// Asks Killer() to act on the recorded service thread when the caller is a
// different thread, then resets _ThreadId.
// NOTE(review): two variants of the test follow (one treating _ThreadId as a
// pthread_t value, one as a pointer with a `.p` member) — presumably alternate
// platform branches whose preprocessor guards are not visible here.
// NOTE(review): the first variant passes SIGUSR2 while the second passes 0;
// confirm the difference is intended.
// NOTE(review): the return statement is not visible in this listing.
295 bool Engines_Parallel_Component_i::Kill_impl()
297 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
298 // << " pid " << getpid() << " instanceName "
299 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
300 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
301 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
304 bool RetVal = false ;
306 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
308 RetVal = Killer( _ThreadId , SIGUSR2 ) ;
309 _ThreadId = (pthread_t ) -1 ;
313 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
315 RetVal = Killer( *_ThreadId , 0 ) ;
316 _ThreadId = (pthread_t* ) 0 ;
323 //=============================================================================
325 * CORBA method: used in Supervision
327 //=============================================================================
// Logs the caller and component identity, then calls Killer() with signal
// number 0 on the recorded service thread when the caller is a different
// thread, and resets _ThreadId.
// NOTE(review): the log statement and the test each appear in two variants —
// presumably alternate platform branches whose preprocessor guards are not
// visible here. The return statement is not visible either.
329 bool Engines_Parallel_Component_i::Stop_impl()
332 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
333 << " pid " << getpid() << " instanceName "
334 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
335 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
336 << dec << " _ThreadId " << _ThreadId );
338 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
339 << " pid " << _getpid() << " instanceName "
340 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
341 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
342 << dec << " _ThreadId " << _ThreadId );
346 bool RetVal = false ;
348 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
350 RetVal = Killer( _ThreadId , 0 ) ;
351 _ThreadId = (pthread_t ) -1 ;
354 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
356 RetVal = Killer( *_ThreadId , 0 ) ;
357 _ThreadId = (pthread_t* ) 0 ;
363 //=============================================================================
365 * CORBA method: used in Supervision
367 //=============================================================================
// Logs the caller and component identity, then sends SIGINT through Killer()
// to the recorded service thread when the caller is a different thread.
// NOTE(review): the log statement, the test and the Killer() call each appear
// in two variants — presumably alternate platform branches whose preprocessor
// guards are not visible here. Lines between the tests and the Killer() calls,
// and the return statement, are also not visible.
369 bool Engines_Parallel_Component_i::Suspend_impl()
372 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
373 << " pid " << getpid() << " instanceName "
374 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
375 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
376 << dec << " _ThreadId " << _ThreadId );
378 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
379 << " pid " << _getpid() << " instanceName "
380 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
381 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
382 << dec << " _ThreadId " << _ThreadId );
385 bool RetVal = false ;
387 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
389 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
399 RetVal = Killer( _ThreadId ,SIGINT ) ;
401 RetVal = Killer( *_ThreadId ,SIGINT ) ;
// The global _Sleeping flag is deliberately left untouched here (disabled code).
403 //if ( RetVal ) _Sleeping = true;
410 //=============================================================================
412 * CORBA method: used in Supervision
414 //=============================================================================
// Logs the caller and component identity, then tests whether the caller is a
// thread other than the recorded service thread.
// NOTE(review): the log statement and the test each appear in two variants —
// presumably alternate platform branches whose preprocessor guards are not
// visible here. The action taken inside the test and the return statement are
// not visible in this listing.
416 bool Engines_Parallel_Component_i::Resume_impl()
419 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
420 << " pid " << getpid() << " instanceName "
421 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
422 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
423 << dec << " _ThreadId " << _ThreadId );
425 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
426 << " pid " << _getpid() << " instanceName "
427 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
428 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
429 << dec << " _ThreadId " << _ThreadId );
431 bool RetVal = false ;
433 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
435 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
451 //=============================================================================
455 //=============================================================================
// Reports the CPU figure kept in _ThreadCpuUsed for the service thread.
// When called from another thread, it publishes `this` in
// theEngines_Component and signals the service thread through Killer();
// when called from the service thread itself, it refreshes _ThreadCpuUsed
// from CpuUsed().
// NOTE(review): the declaration of `cpu`, the else branches and the return
// statement are not visible in this listing, and the thread tests / Killer()
// calls appear in two variants — presumably alternate platform branches whose
// preprocessor guards are not visible here.
457 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
460 if ( _ThreadId || _Executed )
465 if ( pthread_self() != _ThreadId )
467 if ( pthread_self().p != _ThreadId->p )
475 // Get Cpu in the appropriate thread with that object !...
476 theEngines_Component = this ;
478 Killer( _ThreadId ,SIGUSR1 ) ;
// NOTE(review): `SIGUSR11` is not a standard signal name — looks like a typo
// for SIGUSR1 in this variant; confirm before building this branch.
480 Killer( *_ThreadId ,SIGUSR11 ) ;
483 cpu = _ThreadCpuUsed ;
487 _ThreadCpuUsed = CpuUsed() ;
488 cpu = _ThreadCpuUsed ;
489 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
490 // << _serviceName << " " << cpu << endl ;
495 cpu = _ThreadCpuUsed ;
496 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
497 // << _serviceName << " " << cpu<< endl ;
502 // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
503 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
509 //=============================================================================
511 * C++ method: return Container Servant
513 //=============================================================================
// Returns the container servant: the stored container id is resolved to a
// servant through the POA and downcast with dynamic_cast, so the result is
// null when the servant is not an Engines_Parallel_Container_i.
515 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
517 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
520 //=============================================================================
522 * C++ method: set study Id
523 * \param studyId 0 if instance is not associated to a study,
524 * >0 otherwise (== study id)
525 * \return true if the set of study Id is OK
526 * must be set once by Container, at instance creation,
527 * and cannot be changed after.
529 //=============================================================================
// Sets the study id of this instance. Asserts studyId >= 0.
// When _studyId is already set, the call succeeds only if the same id is
// given again.
// NOTE(review): the body of the "not yet initialized" branch and the return
// statement are not visible in this listing.
531 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
533 ASSERT( studyId >= 0);
534 CORBA::Boolean ret = false;
535 if (_studyId < 0) // --- not yet initialized
541 if ( _studyId == studyId) ret = true;
545 //=============================================================================
547 * C++ method: return CORBA instance id, the id is set in derived class
548 * constructor, when instance is activated.
550 //=============================================================================
// Accessor for the CORBA object id of this instance.
// NOTE(review): the return statement is not visible in this listing.
552 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
554 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
558 //=============================================================================
560 * C++ method: used by derived classes for supervision
562 //=============================================================================
// Marks the start of a service on the calling thread: records the thread id,
// the starting CPU figure and the service name, makes the thread
// asynchronously cancellable, and exports every string-typed property of
// _fieldsDict to the process environment.
// \param serviceName name of the service being started
// NOTE(review): the log statements, the thread-id recording and the
// setenv/putenv calls appear in two variants — presumably alternate
// platform/compiler branches whose preprocessor guards are not visible here.
564 void Engines_Parallel_Component_i::beginService(const char *serviceName)
567 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
568 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
570 MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
571 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
574 _ThreadId = pthread_self() ;
// Pointer variant: a pthread_t is heap-allocated here.
// NOTE(review): no matching delete is visible in this file — possible leak; confirm.
576 _ThreadId = new pthread_t;
577 _ThreadId->p = pthread_self().p ;
578 _ThreadId->x = pthread_self().x ;
581 _StartUsed = CpuUsed_impl() ;
584 _serviceName = serviceName ;
// Allow Killer() to cancel this thread at any point.
585 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
587 perror("pthread_setcanceltype ") ;
590 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
592 perror("pthread_setcancelstate ") ;
595 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
596 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
597 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
599 // --- for supervisor : all strings given with setProperties
600 // are set in environment
601 bool overwrite = true;
602 map<std::string,CORBA::Any>::iterator it;
603 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
605 std::string cle((*it).first);
// Only properties whose Any holds a string are exported.
606 if ((*it).second.type()->kind() == CORBA::tk_string)
// NOTE(review): the declaration of `value` is not visible in this listing.
609 (*it).second >>= value;
610 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
612 //int ret = setenv(cle.c_str(), value, overwrite);
613 setenv(cle.c_str(), value, overwrite);
615 //CCRT porting : setenv not defined in stdlib.h
619 // char* cast because 1st arg of linux putenv function
620 // is not a const char* !
621 //int ret=putenv((char *)s.c_str());
// NOTE(review): the declaration of `s` is not visible. putenv() keeps the
// pointer it is given, so if `s` is a local std::string the environment entry
// would dangle after this scope — confirm.
622 putenv((char *)s.c_str());
623 //End of CCRT porting
625 MESSAGE("--- setenv: "<<cle<<" = "<< value);
630 //=============================================================================
632 * C++ method: used by derived classes for supervision
634 //=============================================================================
// Marks the end of a service: unless the thread was flagged as cancelled,
// refreshes _ThreadCpuUsed from CpuUsed_impl(), then logs the figures.
// \param serviceName name of the service being ended (used for logging)
// NOTE(review): the log statement appears in two variants — presumably
// alternate platform branches whose preprocessor guards are not visible here.
636 void Engines_Parallel_Component_i::endService(const char *serviceName)
638 if ( !_CanceledThread )
639 _ThreadCpuUsed = CpuUsed_impl() ;
642 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
643 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
644 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
646 MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
647 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
648 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
653 //=============================================================================
655 * C++ method: -- CHECK IF USED --
657 //=============================================================================
// Returns a CORBA-allocated copy of the graph name stored by Names()
// (made with CORBA::string_dup, so the caller owns the returned string).
659 char* Engines_Parallel_Component_i::graphName()
661 return CORBA::string_dup( _graphName.c_str() ) ;
664 //=============================================================================
666 * C++ method: -- CHECK IF USED --
668 //=============================================================================
// Returns a CORBA-allocated copy of the node name stored by Names()
// (made with CORBA::string_dup, so the caller owns the returned string).
670 char* Engines_Parallel_Component_i::nodeName()
672 return CORBA::string_dup( _nodeName.c_str() ) ;
675 //=============================================================================
677 * C++ method: used in Supervision (see kill_impl)
679 //=============================================================================
// Acts on the thread `ThreadId`: one path cancels it with pthread_cancel(),
// the other delivers `signum` with pthread_kill(). Failures are reported with
// perror(), successes are logged.
// NOTE(review): the condition selecting between the two paths is not visible
// in this listing (callers pass signum 0 from Stop_impl — presumably 0 selects
// pthread_cancel; confirm). The return statements are not visible either, and
// each log statement appears in two platform variants without visible guards.
681 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
691 if ( pthread_cancel( ThreadId ) )
693 perror("Killer pthread_cancel error") ;
699 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
700 << " pthread_canceled") ;
702 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
703 << " pthread_canceled") ;
// NOTE(review): pthread_kill() returns an error number on failure rather than
// -1 and does not set errno, so this test and the perror() text may not
// behave as intended — confirm against the POSIX specification.
709 if ( pthread_kill( ThreadId , signum ) == -1 )
711 perror("Killer pthread_kill error") ;
717 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
718 << " pthread_killed(" << signum << ")") ;
720 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
721 << " pthread_killed(" << signum << ")") ;
729 //=============================================================================
733 //=============================================================================
737 if (theEngines_Component)
738 theEngines_Component->SetCurCpu();
741 //=============================================================================
745 //=============================================================================
// Refreshes _ThreadCpuUsed with the current value returned by CpuUsed().
747 void Engines_Parallel_Component_i::SetCurCpu()
749 _ThreadCpuUsed = CpuUsed() ;
750 // MESSAGE(pthread_self() <<
751 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
754 //=============================================================================
758 //=============================================================================
// Computes the user CPU time, in whole seconds, consumed since _StartUsed,
// using getrusage(RUSAGE_SELF). RUSAGE_SELF accounts for the whole process,
// not only the service thread.
// NOTE(review): the declaration of `cpu`, the error/else branches and the
// return statement are not visible in this listing.
760 long Engines_Parallel_Component_i::CpuUsed()
764 struct rusage usage ;
765 if ( _ThreadId || _Executed )
767 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
769 perror("Engines_Parallel_Component_i::CpuUsed") ;
// Only the seconds part of the user time is used (microseconds are dropped).
772 cpu = usage.ru_utime.tv_sec - _StartUsed ;
773 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
774 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
775 // << " = " << cpu << endl ;
779 // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
780 // << _ThreadId << " " << _serviceName<< " _StartUsed "
781 // << _StartUsed << endl ;
784 // Not implemented yet
// Free function: forwards to CancelThread() on the component currently
// published in theEngines_Component, if any.
791 void CallCancelThread()
793 if ( theEngines_Component )
794 theEngines_Component->CancelThread() ;
797 //=============================================================================
801 //=============================================================================
// Flags the service thread as cancelled; endService() then skips the CPU
// usage refresh.
803 void Engines_Parallel_Component_i::CancelThread()
805 _CanceledThread = true;
808 //=============================================================================
810 * C++ method: Send message to event channel
812 //=============================================================================
// Sends an event through the notification supplier, tagged with the current
// graph and node names.
// NOTE(review): the declaration of the `message` parameter is not visible in
// this listing.
// NOTE(review): graphName() and nodeName() return CORBA::string_dup copies;
// whether Send() takes ownership of them is not visible here — possible leak;
// confirm.
814 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
817 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
820 //=============================================================================
822 * C++ method: return standard library name built on component name
824 //=============================================================================
826 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
834 //=============================================================================
836 * C++ method: DumpPython default implementation
838 //=============================================================================
// Default DumpPython implementation: returns a fixed one-line Python script
// as a TMPFile octet sequence and reports it as valid.
// \param theStudy not used by this default implementation
// \param isPublished not used by this default implementation
// \param isValidScript set to true on return
// \return new TMPFile holding the script bytes, terminating NUL included
840 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
841 CORBA::Boolean isPublished,
842 CORBA::Boolean& isValidScript)
844 const char* aScript = "def RebuildData(theStudy): pass";
// Copy the script into a heap buffer that is handed over to the sequence.
845 char* aBuffer = new char[strlen(aScript)+1];
846 strcpy(aBuffer, aScript);
847 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
848 int aBufferSize = strlen(aBuffer)+1;
// The last argument (1) is presumably the "release" flag, making the sequence
// own the buffer.
// NOTE(review): the buffer is allocated with new[]; confirm that this matches
// the deallocation the sequence performs.
849 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
850 isValidScript = true;
851 return aStreamFile._retn();
// Creates (on first request) and returns the parallel Salome_file registered
// as input `Salome_file_name` of service `service_name`.
// Per-service maps are created on demand; deployment of a new file is done
// under deploy_mutex: rank 0 creates the proxy object and broadcasts its
// reference, every node then creates its own servant and the nodes
// synchronise on barriers.
// \return Salome_file reference narrowed from the stored proxy IOR
// NOTE(review): several lines of this function (closing braces, constructor
// arguments, servant registration/start calls) are not visible in this listing.
855 Engines::Salome_file_ptr
856 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
857 const char* Salome_file_name)
859 // Try to find the service, if it doesn't exist, we add it.
860 _Service_file_map_it = _Input_Service_file_map.find(service_name);
861 if (_Service_file_map_it == _Input_Service_file_map.end()) {
// The three per-service maps are heap-allocated and stored as raw pointers.
862 _t_Salome_file_map * _map = new _t_Salome_file_map();
863 _Input_Service_file_map[service_name] = _map;
864 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
865 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
866 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
867 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
869 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
870 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
871 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
// NOTE(review): the service maps above are read and modified before the
// mutex is taken — confirm that concurrent callers cannot race on them.
873 pthread_mutex_lock(deploy_mutex);
874 std::string proxy_ior;
876 // Try to find the Salome_file ...
877 _Salome_file_map_it = _map->find(Salome_file_name);
878 if (_Salome_file_map_it == _map->end()) {
880 // We create a new PaCO++ object.
881 // He has the same configuration than
884 // Firstly, we have to create the proxy object
885 // of the Salome_file and transmit his
886 // reference to the other nodes.
887 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
888 if (getMyRank() == 0) {
889 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
890 new paco_omni_fabrique());
891 proxy->copyGlobalContext(this);
892 PaCO::PacoTopology_t serveur_topo;
893 serveur_topo.total = getTotalNode();
894 proxy->setTopology(serveur_topo);
896 // We register the CORBA object into the POA
897 CORBA::Object_ptr proxy_ref = proxy->_this();
899 // We send the reference to all the nodes...
900 Engines::Parallel_Component_var component_proxy =
901 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
902 component_proxy->send_parallel_proxy_object(proxy_ref);
904 // Adding proxy into the map
905 (*_proxy_map)[Salome_file_name] = proxy;
// Wait until the proxy reference sent by rank 0 has been received.
908 this->wait_parallel_object_proxy();
911 proxy_ior = this->get_parallel_proxy_object();
912 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
914 // We register each node of the parallel Salome_file object
916 for (int i = 0; i < getTotalNode(); i++) {
917 if (i == getMyRank()) {
918 Parallel_Salome_file_i * servant =
919 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
922 servant->copyGlobalContext(this);
924 // We register the CORBA object into the POA
925 servant->POA_PaCO::InterfaceParallel::_this();
927 // Register the servant
930 // Adding servant to the map
931 (*_map)[Salome_file_name] = servant;
934 _my_com->paco_barrier();
935 // start parallel object
936 if (getMyRank() == 0) {
938 _my_com->paco_barrier();
941 _my_com->paco_barrier();
943 // Parallel_Salome_file is created and deployed
// NOTE(review): if anything between lock and unlock throws, the mutex stays
// locked — confirm whether the calls in between can throw.
948 pthread_mutex_unlock(deploy_mutex);
949 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// NOTE(review): string_to_object() returns a new reference that is held in a
// raw Object_ptr, and _narrow() returns another one; no release of proxy_ref
// is visible — possible reference leak (an Object_var would release it); confirm.
950 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
951 return Engines::Salome_file::_narrow(proxy_ref);
// Creates (on first request) and returns the parallel Salome_file registered
// as output `Salome_file_name` of service `service_name`.
// Mirrors setInputFileToService() on the output maps: per-service maps are
// created on demand; deployment of a new file is done under deploy_mutex,
// with rank 0 creating the proxy and every node creating its own servant.
// \return Salome_file reference narrowed from the stored proxy IOR
// NOTE(review): several lines of this function (closing braces, constructor
// arguments, servant registration/start calls) are not visible in this listing.
954 Engines::Salome_file_ptr
955 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
956 const char* Salome_file_name)
958 // Try to find the service, if it doesn't exist, we add it.
959 _Service_file_map_it = _Output_Service_file_map.find(service_name);
960 if (_Service_file_map_it == _Output_Service_file_map.end()) {
// The three per-service maps are heap-allocated and stored as raw pointers.
961 _t_Salome_file_map * _map = new _t_Salome_file_map();
962 _Output_Service_file_map[service_name] = _map;
963 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
964 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
965 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
966 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
968 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
969 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
970 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
// NOTE(review): the service maps above are read and modified before the
// mutex is taken — confirm that concurrent callers cannot race on them.
972 pthread_mutex_lock(deploy_mutex);
973 std::string proxy_ior;
975 // Try to find the Salome_file ...
976 _Salome_file_map_it = _map->find(Salome_file_name);
// NOTE(review): unlike the input variant, `proxy` is left uninitialised here.
977 Engines::Parallel_Salome_file_proxy_impl * proxy;
978 if (_Salome_file_map_it == _map->end()) {
980 // We create a new PaCO++ object.
981 // He has the same configuration than
984 // Firstly, we have to create the proxy object
985 // of the Salome_file and transmit his
986 // reference to the other nodes.
987 if (getMyRank() == 0) {
988 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
989 new paco_omni_fabrique());
990 proxy->copyGlobalContext(this);
991 PaCO::PacoTopology_t serveur_topo;
992 serveur_topo.total = getTotalNode();
993 proxy->setTopology(serveur_topo);
995 // We register the CORBA object into the POA
996 CORBA::Object_ptr proxy_ref = proxy->_this();
998 // We send the reference to all the nodes...
999 Engines::Parallel_Component_var component_proxy =
1000 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
1001 component_proxy->send_parallel_proxy_object(proxy_ref);
1003 // Adding proxy into the map
1004 (*_proxy_map)[Salome_file_name] = proxy;
// Wait until the proxy reference sent by rank 0 has been received.
1007 this->wait_parallel_object_proxy();
1010 proxy_ior = this->get_parallel_proxy_object();
1011 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
1013 // We register each node of the parallel Salome_file object
1015 for (int i = 0; i < getTotalNode(); i++) {
1016 if (i == getMyRank()) {
1017 Parallel_Salome_file_i * servant =
1018 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
1021 servant->copyGlobalContext(this);
1023 // We register the CORBA object into the POA
1024 servant->POA_PaCO::InterfaceParallel::_this();
1026 // Register the servant
1029 // Adding servant to the map
1030 (*_map)[Salome_file_name] = servant;
1033 _my_com->paco_barrier();
1034 // start parallel object
1035 if (getMyRank() == 0) {
1037 _my_com->paco_barrier();
1040 _my_com->paco_barrier();
1043 // Parallel_Salome_file is created and deployed
// NOTE(review): if anything between lock and unlock throws, the mutex stays
// locked — confirm whether the calls in between can throw.
1047 pthread_mutex_unlock(deploy_mutex);
1048 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// NOTE(review): string_to_object() returns a new reference that is held in a
// raw Object_ptr, and _narrow() returns another one; no release of proxy_ref
// is visible — possible reference leak (an Object_var would release it); confirm.
1049 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1050 return Engines::Salome_file::_narrow(proxy_ref);
// Returns the proxy reference of the input Salome_file `Salome_file_name`
// registered for service `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// input files or when the named file is unknown for that service.
1053 Engines::Salome_file_ptr
1054 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1055 const char* Salome_file_name)
1057 // Try to find the service, if it doesn't exist, we throw an exception.
1058 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1059 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
1060 SALOME::ExceptionStruct es;
1061 es.type = SALOME::INTERNAL_ERROR;
1062 es.text = "service doesn't have salome files";
1063 throw SALOME::SALOME_Exception(es);
// The service exists: fetch its proxy map (second lookup of the same key).
1065 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1067 // Try to find the Salome_file ...
1068 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1069 if (_Proxy_Salome_file_map_it == _map->end()) {
1070 SALOME::ExceptionStruct es;
1071 es.type = SALOME::INTERNAL_ERROR;
1072 es.text = "service doesn't have this Salome_file";
1073 throw SALOME::SALOME_Exception(es);
1076 // Client get the proxy object
1077 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1078 return Sfile->_this();
// Returns the proxy reference of the output Salome_file `Salome_file_name`
// registered for service `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// output files or when the named file is unknown for that service.
1081 Engines::Salome_file_ptr
1082 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1083 const char* Salome_file_name)
1085 // Try to find the service, if it doesn't exist, we throw an exception.
1086 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1087 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1088 SALOME::ExceptionStruct es;
1089 es.type = SALOME::INTERNAL_ERROR;
1090 es.text = "service doesn't have salome files";
1091 throw SALOME::SALOME_Exception(es);
// The service exists: fetch its proxy map (second lookup of the same key).
1093 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1095 // Try to find the Salome_file ...
1096 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1097 if (_Proxy_Salome_file_map_it == _map->end()) {
1098 SALOME::ExceptionStruct es;
1099 es.type = SALOME::INTERNAL_ERROR;
1100 es.text = "service doesn't have this Salome_file";
1101 throw SALOME::SALOME_Exception(es);
1104 // Client get the proxy object
1105 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1106 return Sfile->_this();
// Calls configureSalome_file() for every input Salome_file proxy registered
// for `service_name`; does nothing when the service has no input files.
// NOTE(review): the return-type line of this definition is not visible in
// this listing.
1111 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1113 // Try to find the service, if it doesn't exist, nothing to do.
1114 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1115 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1116 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1117 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1118 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
// `begin` doubles as the running iterator over (port name, proxy) pairs.
1120 for(;begin!=end;begin++) {
1121 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1122 std::string file_port_name = begin->first;
1123 configureSalome_file(service_name, file_port_name, file);
// Calls configureSalome_file() for every output Salome_file proxy registered
// for `service_name`; does nothing when the service has no output files.
// NOTE(review): the return-type line of this definition is not visible in
// this listing.
1130 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1132 // Try to find the service, if it doesn't exist, nothing to do.
1133 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1134 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1135 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1136 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1137 _t_Proxy_Salome_file_map::iterator end = _map->end();
// `begin` doubles as the running iterator over (port name, proxy) pairs.
1139 for(;begin!=end;begin++) {
1140 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1141 std::string file_port_name = begin->first;
1142 configureSalome_file(service_name, file_port_name, file);
1149 //=============================================================================
1151 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1153 //=============================================================================
// Stores the stringified form (IOR) of `proxy_ref` in _proxy.
// NOTE(review): object_to_string() returns a newly allocated string; who
// frees a previous value of _proxy is not visible here — confirm.
// NOTE(review): the return-type line of this definition is not visible in
// this listing.
1155 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1156 _proxy = _orb->object_to_string(proxy_ref);
1159 //=============================================================================
1161 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1163 //=============================================================================
// Polls get_parallel_proxy_object() until it returns a non-null string.
// NOTE(review): the loop body lines other than the re-read are not visible
// in this listing (presumably a sleep between polls — confirm).
// NOTE(review): there is no timeout in the visible code, so the loop would
// spin forever if the proxy is never sent.
// NOTE(review): the return-type line of this definition is not visible in
// this listing.
1165 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1166 char * proxy = NULL;
1167 proxy = get_parallel_proxy_object();
1168 while(proxy == NULL)
1171 proxy = get_parallel_proxy_object();
1175 //=============================================================================
1177 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1179 //=============================================================================
1181 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1186 //=============================================================================
1188 * C++ method: used to configure the Salome_file into the runtime.
1189 * \param service_name name of the service that use this Salome_file
1190 * \param file_port_name name of the Salome_file
1191 * \param file Parallel Salome_file C++ object
1193 //=============================================================================
// Hook called by check{Input,Output}FilesToService() for each registered
// Salome_file. The default implementation does nothing.
// \param service_name name of the service that uses the file
// \param file_port_name name of the Salome_file
// \param file proxy object of the parallel Salome_file
// NOTE(review): the return-type line of this definition is not visible in
// this listing.
1195 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1196 std::string file_port_name,
1197 Engines::Parallel_Salome_file_proxy_impl * file)
1199 // By default this method does nothing