1 // Copyright (C) 2007-2016 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License, or (at your option) any later version.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
23 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
24 // File : SALOME_ParallelComponent_i.cxx
25 // Author : André RIBES, EDF
26 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
28 #include "SALOME_ParallelComponent_i.hxx"
29 #include "SALOME_ParallelContainer_i.hxx"
37 #include "utilities.h"
38 #include "Basics_Utils.hxx"
42 #include <sys/resource.h>
45 #include <sys/timeb.h>
49 #include <paco_dummy.h>
50 #include <paco_omni.h>
// Flag declared here but defined in another translation unit.
53 extern bool _Sleeping ;
// File-local pointer to the component instance that the free helper functions
// in this file act on; it is assigned in CpuUsed_impl().
54 static Engines_Parallel_Component_i * theEngines_Component ;
// Class-wide flags, both initialised to false.
56 bool Engines_Parallel_Component_i::_isMultiStudy = false;
57 bool Engines_Parallel_Component_i::_isMultiInstance = false;
59 //=============================================================================
61 * Standard Constructor for generic Component, used in derived class
62 * Connection to Registry and Notification
63 * \param orb Object Request broker given by Container
64 * \param poa Portable Object Adapter from Container (normally root_poa)
65 * \param contId container CORBA id inside the server
66 * \param instanceName unique instance name for this object (see Container_i)
67 * \param interfaceName component class name
68 * \param notif use of notification
70 //=============================================================================
// Constructor.
// NOTE(review): part of the parameter list and of the member-initializer list
// is not visible in this listing (e.g. `notif`, used below, is declared on a
// line that is not shown).
72 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
73 PortableServer::POA_ptr poa,
74 PortableServer::ObjectId * contId,
75 const char *instanceName,
76 const char *interfaceName,
// Every base-class sub-object receives the same (orb, ior, rank) triple.
79 InterfaceParallel_impl(orb,ior,rank),
80 Engines::EngineComponent_serv(orb,ior,rank),
81 Engines::EngineComponent_base_serv(orb,ior,rank),
82 Engines::Parallel_Component_serv(orb,ior,rank),
83 Engines::Parallel_Component_base_serv(orb,ior,rank),
84 _instanceName(instanceName),
85 _interfaceName(interfaceName),
87 _myConnexionToRegistry(0),
94 _CanceledThread(false)
96 MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
97 //SCRUTE(pd_refCount);
// Keep our own references to the ORB and the POA.
98 _orb = CORBA::ORB::_duplicate(orb);
99 _poa = PortableServer::POA::_duplicate(poa);
// Resolve the container object from its id, stringify its reference and pass
// that string to the RegistryConnexion created for this instance.
101 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
105 CORBA::String_var the_ior = _orb->object_to_string(o);
106 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
107 _instanceName.c_str());
// Notification supplier later used by sendMessage().
109 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
// Mutex locked around Salome_file deployment in set{Input,Output}FileToService().
111 deploy_mutex = new pthread_mutex_t();
112 pthread_mutex_init(deploy_mutex, NULL);
114 //SCRUTE(pd_refCount);
117 //=============================================================================
119 * Destructor: call Container for decrement of instances count.
120 * When instances count falls to 0, the container tries to remove the
121 * component library (dlclose)
123 //=============================================================================
// Destructor: tells the container that one instance of this interface is gone,
// then releases the registry connexion, the notification supplier and the
// deployment mutex.
125 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
127 MESSAGE("Parallel Component destructor");
128 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
129 if(_myConnexionToRegistry)delete _myConnexionToRegistry;
130 if(_notifSupplier)delete _notifSupplier;
// NOTE(review): deploy_mutex is allocated with `new` in the constructor; no
// matching `delete` is visible in this listing -- confirm it is freed.
134 pthread_mutex_destroy(deploy_mutex);
140 //=============================================================================
142 * CORBA method: return name of the instance, unique in this Container
144 //=============================================================================
// Returns a freshly allocated CORBA string copy of _instanceName.
146 char* Engines_Parallel_Component_i::instanceName()
148 return CORBA::string_dup(_instanceName.c_str()) ;
151 //=============================================================================
153 * CORBA method: return name of the component class
155 //=============================================================================
// Returns a freshly allocated CORBA string copy of _interfaceName.
157 char* Engines_Parallel_Component_i::interfaceName()
159 return CORBA::string_dup(_interfaceName.c_str()) ;
162 //=============================================================================
164 * CORBA method: Test if instance is alive and responds
166 //=============================================================================
// Liveness check: only logs the process id and the calling thread.
// Two variants of the log statement are visible (getpid() vs _getpid() with
// pthread_self().p); presumably they are selected by a platform conditional
// that is not visible in this listing.
168 void Engines_Parallel_Component_i::ping()
171 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
174 MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
175 << pthread_self().p );
179 //=============================================================================
181 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
182 * respond any more to CORBA calls), the connection to Registry is removed
183 * (Registry informed of deactivation), internal server reference counter on
184 * the derived servant class is decremented, to allow destruction of the class
185 * (delete) by POA, when there are no more references.
186 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
188 //=============================================================================
// The visible statements only log; as the second message states, the instance
// is deleted when the container is shut down.
// NOTE(review): further statements may exist on lines not visible here.
190 void Engines_Parallel_Component_i::destroy()
192 MESSAGE("Engines_Parallel_Component_i::destroy()");
193 MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
201 //=============================================================================
203 * CORBA method: return CORBA reference of the Container
206 //=============================================================================
// Looks up the container object through the POA using the stored container id
// (_contId) and returns it narrowed to Engines::Container.
208 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
210 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
211 CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
212 return Engines::Container::_narrow(o);
215 //=============================================================================
218 * Gives a sequence of (key=string,value=any) to the component.
219 * Base class component stores the sequence in a map.
220 * The map is cleared before.
221 * This map is for use by derived classes.
222 * \param dico sequence of (key=string,value=any)
224 //=============================================================================
// Copies every (key, value) entry of `dico` into _fieldsDict, keyed by the
// entry's key converted to std::string; an existing key is overwritten.
// NOTE(review): the clearing step mentioned in the header comment is not
// visible in this listing.
226 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
229 for (CORBA::ULong i=0; i<dico.length(); i++)
231 std::string cle(dico[i].key);
232 _fieldsDict[cle] = dico[i].value;
236 //=============================================================================
239 * returns a previously stored map (key=string,value=any) as a sequence.
240 * (see setProperties)
242 //=============================================================================
// Builds a new FieldsDict with one entry per element of _fieldsDict and hands
// it to the caller through _retn().
244 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
246 Engines::FieldsDict_var copie = new Engines::FieldsDict;
247 copie->length(_fieldsDict.size());
248 std::map<std::string,CORBA::Any>::iterator it;
// `i` is the output index; its declaration is on a line not visible here.
250 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
252 std::string cle((*it).first);
253 copie[i].key = CORBA::string_dup(cle.c_str());
254 copie[i].value = _fieldsDict[cle];
256 return copie._retn();
259 //=============================================================================
261 * CORBA method: used by Supervision to give names to this instance
263 //=============================================================================
// Stores the graph and node names given by the caller and logs them.
265 void Engines_Parallel_Component_i::Names( const char * graphName ,
266 const char * nodeName )
268 _graphName = graphName;
269 _nodeName = nodeName;
270 MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
271 << _nodeName << "' )");
274 //=============================================================================
276 * CORBA method: used in Supervision
278 //=============================================================================
// Asks Killer() to act on the recorded service thread when the caller is a
// different thread, then resets the recorded thread id.
// Two variants are visible: one treats _ThreadId as a pthread_t value
// (Killer with SIGUSR2, reset to -1), the other as a pointer to a struct with
// a `.p` member (Killer with 0, reset to null). Presumably a platform
// conditional, not visible in this listing, selects between them.
280 bool Engines_Parallel_Component_i::Kill_impl()
282 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
283 // << " pid " << getpid() << " instanceName "
284 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
285 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
286 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
// Result of the Killer() call; stays false when nothing is done.
289 bool RetVal = false ;
291 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
293 RetVal = Killer( _ThreadId , SIGUSR2 ) ;
294 _ThreadId = (pthread_t ) -1 ;
298 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
300 RetVal = Killer( *_ThreadId , 0 ) ;
301 _ThreadId = (pthread_t* ) 0 ;
308 //=============================================================================
310 * CORBA method: used in Supervision
312 //=============================================================================
// Logs the call context, then, when a service thread is recorded and the
// caller is a different thread, calls Killer() with signal number 0 and
// resets the recorded thread id.
// Two variants of the log and of the test are visible (pthread_t used as a
// value vs. through a pointer with a `.p` member); presumably a platform
// conditional not visible in this listing selects between them.
314 bool Engines_Parallel_Component_i::Stop_impl()
317 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
318 << " pid " << getpid() << " instanceName "
319 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
320 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
321 << dec << " _ThreadId " << _ThreadId );
323 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
324 << " pid " << _getpid() << " instanceName "
325 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
326 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
327 << dec << " _ThreadId " << _ThreadId );
// Result of the Killer() call; stays false when nothing is done.
331 bool RetVal = false ;
333 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
335 RetVal = Killer( _ThreadId , 0 ) ;
336 _ThreadId = (pthread_t ) -1 ;
339 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
341 RetVal = Killer( *_ThreadId , 0 ) ;
342 _ThreadId = (pthread_t* ) 0 ;
348 //=============================================================================
350 * CORBA method: used in Supervision
352 //=============================================================================
// Logs the call context, then, when a service thread is recorded and the
// caller is a different thread, calls Killer() with SIGINT.
// Two variants of each statement are visible (pthread_t used as a value vs.
// through a pointer); presumably a platform conditional not visible in this
// listing selects between them. Lines between the test and the Killer() calls
// are also not visible.
354 bool Engines_Parallel_Component_i::Suspend_impl()
357 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
358 << " pid " << getpid() << " instanceName "
359 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
360 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
361 << dec << " _ThreadId " << _ThreadId );
363 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
364 << " pid " << _getpid() << " instanceName "
365 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
366 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
367 << dec << " _ThreadId " << _ThreadId );
// Result of the Killer() call; stays false when nothing is done.
370 bool RetVal = false ;
372 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
374 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
384 RetVal = Killer( _ThreadId ,SIGINT ) ;
386 RetVal = Killer( *_ThreadId ,SIGINT ) ;
// Updating the global _Sleeping flag is currently disabled.
388 //if ( RetVal ) _Sleeping = true;
395 //=============================================================================
397 * CORBA method: used in Supervision
399 //=============================================================================
// Logs the call context and tests whether a service thread is recorded and
// differs from the caller.
// NOTE(review): the statements guarded by the test, and the return, are on
// lines not visible in this listing. Two variants of the log and of the test
// are visible (pthread_t used as a value vs. through a pointer); presumably a
// platform conditional selects between them.
401 bool Engines_Parallel_Component_i::Resume_impl()
404 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
405 << " pid " << getpid() << " instanceName "
406 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
407 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
408 << dec << " _ThreadId " << _ThreadId );
410 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
411 << " pid " << _getpid() << " instanceName "
412 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
413 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
414 << dec << " _ThreadId " << _ThreadId );
// Default result when nothing is done.
416 bool RetVal = false ;
418 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
420 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
436 //=============================================================================
440 //=============================================================================
// Returns the CPU time recorded for the service thread of this component.
// When a service thread is recorded (or a service has already executed):
//  - if the caller is a different thread, the component registers itself in
//    theEngines_Component and signals the service thread with SIGUSR1 through
//    Killer(), then reports _ThreadCpuUsed;
//  - otherwise it refreshes _ThreadCpuUsed directly with CpuUsed().
// Two variants of the thread test and of the Killer() call are visible
// (pthread_t used as a value vs. through a pointer); presumably a platform
// conditional not visible in this listing selects between them.
// Fix: the pointer variant passed the undefined identifier SIGUSR11; it now
// uses SIGUSR1 like the value variant.
442 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
445 if ( _ThreadId || _Executed )
450 if ( pthread_self() != _ThreadId )
452 if ( pthread_self().p != _ThreadId->p )
460 // Get Cpu in the appropriate thread with that object !...
461 theEngines_Component = this ;
463 Killer( _ThreadId ,SIGUSR1 ) ;
465 Killer( *_ThreadId ,SIGUSR1 ) ;
468 cpu = _ThreadCpuUsed ;
472 _ThreadCpuUsed = CpuUsed() ;
473 cpu = _ThreadCpuUsed ;
474 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
475 // << _serviceName << " " << cpu << std::endl ;
480 cpu = _ThreadCpuUsed ;
481 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
482 // << _serviceName << " " << cpu<< std::endl ;
487 // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
488 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
494 //=============================================================================
496 * C++ method: return Container Servant
498 //=============================================================================
// Returns the container servant found through the POA for _contId, downcast
// with dynamic_cast (so the result is null if the servant has another type).
500 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
502 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
505 //=============================================================================
507 * C++ method: return CORBA instance id, the id is set in derived class
508 * constructor, when instance is activated.
510 //=============================================================================
// Accessor for the CORBA object id of this instance.
// NOTE(review): the return statement is on a line not visible in this listing.
512 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
514 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
518 //=============================================================================
520 * C++ method: used by derived classes for supervision
522 //=============================================================================
// Called at the start of a service: logs the notification, records the calling
// thread as the service thread, samples the starting CPU value, stores the
// service name, makes the thread asynchronously cancellable, and exports every
// string-valued property of _fieldsDict to the process environment.
// Several statements appear in two variants (pthread_t as a value vs. as a
// struct with .p/.x members, setenv vs. putenv); presumably preprocessor
// conditionals not visible in this listing select between them.
524 void Engines_Parallel_Component_i::beginService(const char *serviceName)
527 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
528 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
530 MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
531 << endl << "Parallel Component instance : " << _instanceName << endl << endl);
// Remember which thread runs the service (used by Kill/Stop/Suspend/CpuUsed).
534 _ThreadId = pthread_self() ;
536 _ThreadId = new pthread_t;
537 _ThreadId->p = pthread_self().p ;
538 _ThreadId->x = pthread_self().x ;
541 _StartUsed = CpuUsed_impl() ;
544 _serviceName = serviceName ;
// A non-zero return from either pthread call is reported with perror().
545 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
547 perror("pthread_setcanceltype ") ;
550 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
552 perror("pthread_setcancelstate ") ;
555 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
556 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
557 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
559 // --- for supervisor : all strings given with setProperties
560 // are set in environment
561 bool overwrite = true;
562 std::map<std::string,CORBA::Any>::iterator it;
563 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
565 std::string cle((*it).first);
// Only properties whose Any holds a string are exported.
566 if ((*it).second.type()->kind() == CORBA::tk_string)
// `value` is declared on a line not visible in this listing.
569 (*it).second >>= value;
570 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
572 //int ret = setenv(cle.c_str(), value, overwrite);
573 setenv(cle.c_str(), value, overwrite);
575 //CCRT porting : setenv not defined in stdlib.h
579 // char* cast because 1st arg of linux putenv function
580 // is not a const char* !
581 //int ret=putenv((char *)s.c_str());
// NOTE(review): `s` is built on a line not visible here; putenv() keeps the
// pointer it is given, so confirm the lifetime of `s` is adequate.
582 putenv((char *)s.c_str());
583 //End of CCRT porting
585 MESSAGE("--- setenv: "<<cle<<" = "<< value);
590 //=============================================================================
592 * C++ method: used by derived classes for supervision
594 //=============================================================================
// Called at the end of a service: unless the thread was flagged as cancelled,
// refreshes _ThreadCpuUsed, then logs the notification with the CPU figures.
// Two variants of the log are visible (pthread_self() vs pthread_self().p);
// presumably a platform conditional not visible here selects between them.
596 void Engines_Parallel_Component_i::endService(const char *serviceName)
598 if ( !_CanceledThread )
599 _ThreadCpuUsed = CpuUsed_impl() ;
602 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
603 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
604 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
606 MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
607 << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
608 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
613 //=============================================================================
615 * C++ method: -- CHECK IF USED --
617 //=============================================================================
// Returns a freshly allocated CORBA string copy of _graphName.
619 char* Engines_Parallel_Component_i::graphName()
621 return CORBA::string_dup( _graphName.c_str() ) ;
624 //=============================================================================
626 * C++ method: -- CHECK IF USED --
628 //=============================================================================
// Returns a freshly allocated CORBA string copy of _nodeName.
630 char* Engines_Parallel_Component_i::nodeName()
632 return CORBA::string_dup( _nodeName.c_str() ) ;
635 //=============================================================================
637 * C++ method: used in Supervision (see kill_impl)
639 //=============================================================================
// Acts on thread `ThreadId`: one path cancels it with pthread_cancel(), the
// other sends it `signum` with pthread_kill(). Failures are reported with
// perror(), successes are logged.
// NOTE(review): the conditions selecting between the two paths, and the
// return statements, are on lines not visible in this listing.
641 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
// pthread_cancel() returns non-zero on failure.
651 if ( pthread_cancel( ThreadId ) )
653 perror("Killer pthread_cancel error") ;
659 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
660 << " pthread_canceled") ;
662 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
663 << " pthread_canceled") ;
// NOTE(review): pthread_kill() returns an error number on failure, not -1 --
// confirm this test can ever be true.
669 if ( pthread_kill( ThreadId , signum ) == -1 )
671 perror("Killer pthread_kill error") ;
677 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
678 << " pthread_killed(" << signum << ")") ;
680 MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
681 << " pthread_killed(" << signum << ")") ;
689 //=============================================================================
693 //=============================================================================
697 if (theEngines_Component)
698 theEngines_Component->SetCurCpu();
701 //=============================================================================
705 //=============================================================================
// Refreshes _ThreadCpuUsed with the current value returned by CpuUsed().
707 void Engines_Parallel_Component_i::SetCurCpu()
709 _ThreadCpuUsed = CpuUsed() ;
710 // MESSAGE(pthread_self() <<
711 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
714 //=============================================================================
718 //=============================================================================
// Computes the user CPU seconds of the process (getrusage with RUSAGE_SELF)
// minus _StartUsed, when a service thread is recorded or a service has
// executed. A getrusage() failure is reported with perror().
// NOTE(review): the declaration of `cpu` and the return statement are on
// lines not visible in this listing.
720 long Engines_Parallel_Component_i::CpuUsed()
724 struct rusage usage ;
725 if ( _ThreadId || _Executed )
727 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
729 perror("Engines_Parallel_Component_i::CpuUsed") ;
// Whole seconds only: the microsecond part of ru_utime is ignored.
732 cpu = usage.ru_utime.tv_sec - _StartUsed ;
733 // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
734 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
735 // << " = " << cpu << std::endl ;
739 // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
740 // << _ThreadId << " " << _serviceName<< " _StartUsed "
741 // << _StartUsed << std::endl ;
744 // NOT implemented yet
// Free function: forwards to CancelThread() on the component registered in
// theEngines_Component, if any.
751 void CallCancelThread()
753 if ( theEngines_Component )
754 theEngines_Component->CancelThread() ;
757 //=============================================================================
761 //=============================================================================
// Flags the service thread as cancelled; endService() then skips the CPU
// usage refresh.
763 void Engines_Parallel_Component_i::CancelThread()
765 _CanceledThread = true;
768 //=============================================================================
770 * C++ method: Send message to event channel
772 //=============================================================================
// Sends (graph name, node name, event_type, message) through the notification
// supplier. The `message` parameter is declared on a line not visible here.
// NOTE(review): graphName() and nodeName() return CORBA::string_dup() copies;
// nothing visible here frees them -- confirm Send() or the caller does.
774 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
777 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
780 //=============================================================================
782 * C++ method: return standard library name built on component name
784 //=============================================================================
// Builds a dynamic-library file name from `componentName`.
// Two variants of the starting value are visible ("lib" vs. the component
// name itself); presumably a platform conditional selects between them.
// NOTE(review): the statements completing the name and the return are on
// lines not visible in this listing.
786 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
789 std::string ret="lib";
793 std::string ret=componentName;
799 //=============================================================================
801 * C++ method: DumpPython default implementation
803 //=============================================================================
// Default DumpPython: returns a TMPFile holding either a stub RebuildData
// definition (multi-file mode) or an empty script, NUL terminator included,
// and reports the script as valid. `theStudy` and `isPublished` are not used.
805 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
806 CORBA::Boolean isPublished,
807 CORBA::Boolean isMultiFile,
808 CORBA::Boolean& isValidScript)
810 const char* aScript = isMultiFile ? "def RebuildData(theStudy): pass" : "";
811 char* aBuffer = new char[strlen(aScript)+1];
812 strcpy(aBuffer, aScript);
813 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
814 int aBufferSize = strlen(aBuffer)+1;
// The final argument (1) is the sequence "release" flag, so the sequence is
// expected to take ownership of aBuffer -- TODO confirm against the ORB in use.
815 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
816 isValidScript = true;
817 return aStreamFile._retn();
// Declares (and deploys on first use) the parallel Salome_file named
// `Salome_file_name` as an input of service `service_name`, and returns a
// Salome_file reference obtained from the proxy's stringified IOR.
// On first use for a service, three per-service maps are created (servants,
// proxies, proxy IORs). On first use for a file, under deploy_mutex, rank 0
// creates the proxy object and sends its reference to the component proxy,
// the proxy IOR is recorded, and the node whose rank matches creates and
// records its servant. paco_barrier() calls synchronise the deployment steps.
// NOTE(review): several statements of the deployment sequence are on lines
// not visible in this listing.
// Fix: the reference returned by string_to_object() is now held in a
// CORBA::Object_var so that it is released on return; it used to be kept in a
// raw Object_ptr and leaked on every call (_narrow() returns its own
// reference).
821 Engines::Salome_file_ptr
822 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
823 const char* Salome_file_name)
825 // Try to find the service, if it doesn't exist, we add it.
826 _Service_file_map_it = _Input_Service_file_map.find(service_name);
827 if (_Service_file_map_it == _Input_Service_file_map.end()) {
828 _t_Salome_file_map * _map = new _t_Salome_file_map();
829 _Input_Service_file_map[service_name] = _map;
830 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
831 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
832 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
833 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
835 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
836 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
837 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
// Deployment of a new Salome_file is serialised with deploy_mutex.
839 pthread_mutex_lock(deploy_mutex);
840 std::string proxy_ior;
842 // Try to find the Salome_file ...
843 _Salome_file_map_it = _map->find(Salome_file_name);
844 if (_Salome_file_map_it == _map->end()) {
846 // We create a new PaCO++ object.
847 // He has the same configuration than
850 // Firstly, we have to create the proxy object
851 // of the Salome_file and transmit his
852 // reference to the other nodes.
853 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
854 if (getMyRank() == 0) {
855 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
856 new paco_omni_fabrique());
857 proxy->copyGlobalContext(this);
858 PaCO::PacoTopology_t serveur_topo;
859 serveur_topo.total = getTotalNode();
860 proxy->setTopology(serveur_topo);
862 // We register the CORBA object into the POA
863 CORBA::Object_ptr proxy_ref = proxy->_this();
865 // We send the reference to all the nodes...
866 Engines::Parallel_Component_var component_proxy =
867 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
868 component_proxy->send_parallel_proxy_object(proxy_ref);
870 // Adding proxy into the map
871 (*_proxy_map)[Salome_file_name] = proxy;
874 this->wait_parallel_object_proxy();
877 proxy_ior = this->get_parallel_proxy_object();
878 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
880 // We register each node of the parallel Salome_file object
882 for (int i = 0; i < getTotalNode(); i++) {
883 if (i == getMyRank()) {
884 Parallel_Salome_file_i * servant =
885 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
888 servant->copyGlobalContext(this);
890 // We register the CORBA object into the POA
891 servant->POA_PaCO::InterfaceParallel::_this();
893 // Register the servant
896 // Adding servant to the map
897 (*_map)[Salome_file_name] = servant;
900 _my_com->paco_barrier();
901 // start parallel object
902 if (getMyRank() == 0) {
904 _my_com->paco_barrier();
907 _my_com->paco_barrier();
909 // Parallel_Salome_file is created and deployed
914 pthread_mutex_unlock(deploy_mutex);
915 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// Object_var releases the intermediate reference once _narrow() has returned.
916 CORBA::Object_var proxy_ref = _orb->string_to_object(proxy_ior.c_str());
917 return Engines::Salome_file::_narrow(proxy_ref);
// Declares (and deploys on first use) the parallel Salome_file named
// `Salome_file_name` as an output of service `service_name`, and returns a
// Salome_file reference obtained from the proxy's stringified IOR.
// Mirrors setInputFileToService() but works on the _Output_* maps: per-service
// maps are created on first use; on first use for a file, under deploy_mutex,
// rank 0 creates the proxy and sends its reference to the component proxy,
// the proxy IOR is recorded, and the node whose rank matches creates and
// records its servant. paco_barrier() calls synchronise the deployment steps.
// NOTE(review): several statements of the deployment sequence are on lines
// not visible in this listing.
// Fixes: (1) the reference returned by string_to_object() is now held in a
// CORBA::Object_var so that it is released on return instead of leaking on
// every call; (2) `proxy` is initialised to NULL, as in the input variant,
// instead of being left indeterminate on nodes whose rank is not 0.
920 Engines::Salome_file_ptr
921 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
922 const char* Salome_file_name)
924 // Try to find the service, if it doesn't exist, we add it.
925 _Service_file_map_it = _Output_Service_file_map.find(service_name);
926 if (_Service_file_map_it == _Output_Service_file_map.end()) {
927 _t_Salome_file_map * _map = new _t_Salome_file_map();
928 _Output_Service_file_map[service_name] = _map;
929 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
930 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
931 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
932 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
934 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
935 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
936 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
// Deployment of a new Salome_file is serialised with deploy_mutex.
938 pthread_mutex_lock(deploy_mutex);
939 std::string proxy_ior;
941 // Try to find the Salome_file ...
942 _Salome_file_map_it = _map->find(Salome_file_name);
943 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
944 if (_Salome_file_map_it == _map->end()) {
946 // We create a new PaCO++ object.
947 // He has the same configuration than
950 // Firstly, we have to create the proxy object
951 // of the Salome_file and transmit his
952 // reference to the other nodes.
953 if (getMyRank() == 0) {
954 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
955 new paco_omni_fabrique());
956 proxy->copyGlobalContext(this);
957 PaCO::PacoTopology_t serveur_topo;
958 serveur_topo.total = getTotalNode();
959 proxy->setTopology(serveur_topo);
961 // We register the CORBA object into the POA
962 CORBA::Object_ptr proxy_ref = proxy->_this();
964 // We send the reference to all the nodes...
965 Engines::Parallel_Component_var component_proxy =
966 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
967 component_proxy->send_parallel_proxy_object(proxy_ref);
969 // Adding proxy into the map
970 (*_proxy_map)[Salome_file_name] = proxy;
973 this->wait_parallel_object_proxy();
976 proxy_ior = this->get_parallel_proxy_object();
977 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
979 // We register each node of the parallel Salome_file object
981 for (int i = 0; i < getTotalNode(); i++) {
982 if (i == getMyRank()) {
983 Parallel_Salome_file_i * servant =
984 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
987 servant->copyGlobalContext(this);
989 // We register the CORBA object into the POA
990 servant->POA_PaCO::InterfaceParallel::_this();
992 // Register the servant
995 // Adding servant to the map
996 (*_map)[Salome_file_name] = servant;
999 _my_com->paco_barrier();
1000 // start parallel object
1001 if (getMyRank() == 0) {
1003 _my_com->paco_barrier();
1006 _my_com->paco_barrier();
1009 // Parallel_Salome_file is created and deployed
1013 pthread_mutex_unlock(deploy_mutex);
1014 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
// Object_var releases the intermediate reference once _narrow() has returned.
1015 CORBA::Object_var proxy_ref = _orb->string_to_object(proxy_ior.c_str());
1016 return Engines::Salome_file::_narrow(proxy_ref);
// Returns the proxy reference of the input Salome_file `Salome_file_name`
// registered for `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// registered files or when the named file is not registered for it.
1019 Engines::Salome_file_ptr
1020 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
1021 const char* Salome_file_name)
1023 // Try to find the service, if it doesn't exist, we throw an exception.
1024 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1025 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
1026 SALOME::ExceptionStruct es;
1027 es.type = SALOME::INTERNAL_ERROR;
1028 es.text = "service doesn't have salome files";
1029 throw SALOME::SALOME_Exception(es);
1031 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1033 // Try to find the Salome_file ...
1034 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1035 if (_Proxy_Salome_file_map_it == _map->end()) {
1036 SALOME::ExceptionStruct es;
1037 es.type = SALOME::INTERNAL_ERROR;
1038 es.text = "service doesn't have this Salome_file";
1039 throw SALOME::SALOME_Exception(es);
1042 // Client get the proxy object
1043 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1044 return Sfile->_this();
// Returns the proxy reference of the output Salome_file `Salome_file_name`
// registered for `service_name`.
// Throws SALOME::SALOME_Exception (INTERNAL_ERROR) when the service has no
// registered files or when the named file is not registered for it.
1047 Engines::Salome_file_ptr
1048 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1049 const char* Salome_file_name)
1051 // Try to find the service, if it doesn't exist, we throw an exception.
1052 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1053 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1054 SALOME::ExceptionStruct es;
1055 es.type = SALOME::INTERNAL_ERROR;
1056 es.text = "service doesn't have salome files";
1057 throw SALOME::SALOME_Exception(es);
1059 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1061 // Try to find the Salome_file ...
1062 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1063 if (_Proxy_Salome_file_map_it == _map->end()) {
1064 SALOME::ExceptionStruct es;
1065 es.type = SALOME::INTERNAL_ERROR;
1066 es.text = "service doesn't have this Salome_file";
1067 throw SALOME::SALOME_Exception(es);
1070 // Client get the proxy object
1071 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1072 return Sfile->_this();
// Calls configureSalome_file() for every input Salome_file proxy registered
// for `service_name`; does nothing when the service has none.
// (The return type is on a line not visible in this listing.)
1077 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1079 // Try to find the service, if it doesn't exist, nothing to do.
1080 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1081 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1082 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1083 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1084 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
// Map key = file port name, mapped value = proxy object.
1086 for(;begin!=end;begin++) {
1087 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1088 std::string file_port_name = begin->first;
1089 configureSalome_file(service_name, file_port_name, file);
// Calls configureSalome_file() for every output Salome_file proxy registered
// for `service_name`; does nothing when the service has none.
// (The return type is on a line not visible in this listing.)
1096 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1098 // Try to find the service, if it doesn't exist, nothing to do.
1099 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1100 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1101 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1102 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1103 _t_Proxy_Salome_file_map::iterator end = _map->end();
// Map key = file port name, mapped value = proxy object.
1105 for(;begin!=end;begin++) {
1106 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1107 std::string file_port_name = begin->first;
1108 configureSalome_file(service_name, file_port_name, file);
1115 //=============================================================================
1117 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1119 //=============================================================================
// Stores the stringified form of `proxy_ref` in the member _proxy.
// (The return type is on a line not visible in this listing.)
1121 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1122 _proxy = _orb->object_to_string(proxy_ref);
1125 //=============================================================================
1127 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1129 //=============================================================================
// Polls get_parallel_proxy_object() until it returns a non-NULL value.
// NOTE(review): part of the loop body, and the return type, are on lines not
// visible in this listing.
1131 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1132 char * proxy = NULL;
1133 proxy = get_parallel_proxy_object();
1134 while(proxy == NULL)
1137 proxy = get_parallel_proxy_object();
1141 //=============================================================================
1143 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1145 //=============================================================================
// Accessor polled by wait_parallel_object_proxy() and read by the
// set*FileToService() methods to obtain the proxy IOR string.
// NOTE(review): the body and the return type are on lines not visible in this
// listing.
1147 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1152 //=============================================================================
1154 * C++ method: used to configure the Salome_file into the runtime.
1155 * \param service_name name of the service that use this Salome_file
1156 * \param file_port_name name of the Salome_file
1157 * \param file Parallel Salome_file C++ object
1159 //=============================================================================
// Hook called by check{Input,Output}FilesToService() for each registered
// Salome_file proxy. As the comment in the body says, the default does nothing.
// (The return type is on a line not visible in this listing.)
1161 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1162 std::string file_port_name,
1163 Engines::Parallel_Salome_file_proxy_impl * file)
1165 // By default this method does nothing