1 // Copyright (C) 2007-2008 CEA/DEN, EDF R&D, OPEN CASCADE
3 // Copyright (C) 2003-2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
22 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
23 // File : SALOME_ParallelComponent_i.cxx
24 // Author : André RIBES, EDF
25 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
27 #include "SALOME_ParallelComponent_i.hxx"
28 #include "SALOME_ParallelContainer_i.hxx"
36 #include "utilities.h"
40 #include <sys/resource.h>
43 #include <sys/timeb.h>
47 #include <paco_dummy.h>
48 #include <paco_omni.h>
// Flag declared here only; its definition lives in another translation unit.
52 extern bool _Sleeping ;
// File-local pointer to a component instance: CpuUsed_impl() stores `this`
// in it, and it is dereferenced elsewhere in this file to call SetCurCpu().
53 static Engines_Parallel_Component_i * theEngines_Component ;
// Definitions of the class-level (static member) flags with their initial values.
55 bool Engines_Parallel_Component_i::_isMultiStudy = true;
56 bool Engines_Parallel_Component_i::_isMultiInstance = false;
58 //=============================================================================
60 * Default constructor, not for use
62 //=============================================================================
64 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank) :
65 InterfaceParallel_impl(orb,ior,rank),
66 Engines::Component_serv(orb,ior,rank),
67 Engines::Parallel_Component_serv(orb,ior,rank)
70 INFOS("Default Constructor...");
73 //=============================================================================
75 * Standard Constructor for generic Component, used in derived class
76 * Connection to Registry and Notification
77 * \param orb Object Request broker given by Container
78 * \param poa Portable Object Adapter from Container (normally root_poa)
79 * \param contId container CORBA id inside the server
80 * \param instanceName unique instance name for this object (see Container_i)
81 * \param interfaceName component class name
82 * \param notif use of notification
84 //=============================================================================
86 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
87 PortableServer::POA_ptr poa,
88 PortableServer::ObjectId * contId,
89 const char *instanceName,
90 const char *interfaceName,
92 InterfaceParallel_impl(orb,ior,rank),
93 Engines::Component_serv(orb,ior,rank),
94 Engines::Parallel_Component_serv(orb,ior,rank),
95 _instanceName(instanceName),
96 _interfaceName(interfaceName),
97 _myConnexionToRegistry(0),
105 MESSAGE("Component constructor with instanceName "<< _instanceName);
106 //SCRUTE(pd_refCount);
107 _orb = CORBA::ORB::_duplicate(orb);
108 _poa = PortableServer::POA::_duplicate(poa);
110 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
111 const CORBA::String_var the_ior = _orb->object_to_string(o);
112 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
113 _instanceName.c_str());
115 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
117 deploy_mutex = new pthread_mutex_t();
118 pthread_mutex_init(deploy_mutex, NULL);
120 //SCRUTE(pd_refCount);
123 //=============================================================================
125 * Destructor: call Container for decrement of instances count.
126 * When instances count falls to 0, the container tries to remove the
127 * component library (dlclose)
129 //=============================================================================
131 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
133 MESSAGE("Component destructor");
134 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
135 pthread_mutex_destroy(deploy_mutex);
142 //=============================================================================
144 * CORBA method: return name of the instance, unique in this Container
146 //=============================================================================
148 char* Engines_Parallel_Component_i::instanceName()
150 return CORBA::string_dup(_instanceName.c_str()) ;
153 //=============================================================================
155 * CORBA method: return name of the component class
157 //=============================================================================
159 char* Engines_Parallel_Component_i::interfaceName()
161 return CORBA::string_dup(_interfaceName.c_str()) ;
164 //=============================================================================
166 * CORBA method: Get study Id
167 * \return -1: not initialised (Internal Error)
168 * 0: multistudy component instance
169 * >0: study id associated to this instance
171 //=============================================================================
173 CORBA::Long Engines_Parallel_Component_i::getStudyId()
178 //=============================================================================
180 * CORBA method: Test if instance is alive and responds
182 //=============================================================================
184 void Engines_Parallel_Component_i::ping()
186 // MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
189 //=============================================================================
191 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
192 * respond any more to CORBA calls), the connection to Registry is removed
193 * (Registry informed of deactivation), internal server reference counter on
194 * the derived servant class is decremented, to allow destruction of the class
195 * (delete) by POA, when there are no more references.
196 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
198 //=============================================================================
200 void Engines_Parallel_Component_i::destroy()
202 MESSAGE("Engines_Parallel_Component_i::destroy()");
203 //SCRUTE(pd_refCount);
205 delete _notifSupplier;
208 delete _myConnexionToRegistry;
209 _myConnexionToRegistry = 0 ;
210 _poa->deactivate_object(*_id) ;
211 CORBA::release(_poa) ;
213 //SCRUTE(pd_refCount);
214 _thisObj->_remove_ref();
215 //SCRUTE(pd_refCount);
216 MESSAGE("Engines_Parallel_Component_i::destroyed") ;
219 //=============================================================================
221 * CORBA method: return CORBA reference of the Container
224 //=============================================================================
226 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
228 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
229 CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
230 return Engines::Container::_narrow(o);
233 //=============================================================================
236 * Gives a sequence of (key=string,value=any) to the component.
237 * Base class component stores the sequence in a map.
238 * The map is cleared before.
239 * This map is for use by derived classes.
240 * \param dico sequence of (key=string,value=any)
242 //=============================================================================
244 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
247 for (CORBA::ULong i=0; i<dico.length(); i++)
249 std::string cle(dico[i].key);
250 _fieldsDict[cle] = dico[i].value;
254 //=============================================================================
257 * returns a previously stored map (key=string,value=any) as a sequence.
258 * (see setProperties)
260 //=============================================================================
262 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
264 Engines::FieldsDict_var copie = new Engines::FieldsDict;
265 copie->length(_fieldsDict.size());
266 map<std::string,CORBA::Any>::iterator it;
268 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
270 std::string cle((*it).first);
271 copie[i].key = CORBA::string_dup(cle.c_str());
272 copie[i].value = _fieldsDict[cle];
274 return copie._retn();
277 //=============================================================================
279 * CORBA method: used by Supervision to give names to this instance
281 //=============================================================================
283 void Engines_Parallel_Component_i::Names( const char * graphName ,
284 const char * nodeName )
286 _graphName = graphName ;
287 _nodeName = nodeName ;
288 INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
289 << _nodeName << "' )");
292 //=============================================================================
294 * CORBA method: used in Supervision
296 //=============================================================================
298 bool Engines_Parallel_Component_i::Kill_impl()
300 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
301 // << " pid " << getpid() << " instanceName "
302 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
303 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
304 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
307 bool RetVal = false ;
309 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
311 RetVal = Killer( _ThreadId , 0 ) ;
312 _ThreadId = (pthread_t ) -1 ;
316 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
318 RetVal = Killer( *_ThreadId , 0 ) ;
319 _ThreadId = (pthread_t* ) 0 ;
326 //=============================================================================
328 * CORBA method: used in Supervision
330 //=============================================================================
332 bool Engines_Parallel_Component_i::Stop_impl()
334 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
335 << " pid " << getpid() << " instanceName "
336 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
337 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
338 << dec << " _ThreadId " << _ThreadId );
341 bool RetVal = false ;
343 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
345 RetVal = Killer( _ThreadId , 0 ) ;
346 _ThreadId = (pthread_t ) -1 ;
349 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
351 RetVal = Killer( *_ThreadId , 0 ) ;
352 _ThreadId = (pthread_t* ) 0 ;
358 //=============================================================================
360 * CORBA method: used in Supervision
362 //=============================================================================
364 bool Engines_Parallel_Component_i::Suspend_impl()
366 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
367 << " pid " << getpid() << " instanceName "
368 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
369 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
370 << dec << " _ThreadId " << _ThreadId );
372 bool RetVal = false ;
374 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
376 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
386 RetVal = Killer( _ThreadId ,SIGINT ) ;
388 RetVal = Killer( *_ThreadId ,SIGINT ) ;
390 //if ( RetVal ) _Sleeping = true;
397 //=============================================================================
399 * CORBA method: used in Supervision
401 //=============================================================================
403 bool Engines_Parallel_Component_i::Resume_impl()
405 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
406 << " pid " << getpid() << " instanceName "
407 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
408 << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
409 << dec << " _ThreadId " << _ThreadId );
410 bool RetVal = false ;
412 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
414 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
430 //=============================================================================
434 //=============================================================================
436 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
439 if ( _ThreadId || _Executed )
444 if ( pthread_self() != _ThreadId )
446 if ( pthread_self().p != _ThreadId->p )
454 // Get Cpu in the appropriate thread with that object !...
455 theEngines_Component = this ;
457 Killer( _ThreadId ,SIGUSR1 ) ;
459 Killer( *_ThreadId ,SIGUSR11 ) ;
462 cpu = _ThreadCpuUsed ;
466 _ThreadCpuUsed = CpuUsed() ;
467 cpu = _ThreadCpuUsed ;
468 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
469 // << _serviceName << " " << cpu << endl ;
474 cpu = _ThreadCpuUsed ;
475 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
476 // << _serviceName << " " << cpu<< endl ;
481 // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
482 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
488 //=============================================================================
490 * C++ method: return Container Servant
492 //=============================================================================
494 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
496 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
499 //=============================================================================
501 * C++ method: set study Id
502 * \param studyId 0 if instance is not associated to a study,
503 * >0 otherwise (== study id)
504 * \return true if the set of study Id is OK
505 * must be set once by Container, at instance creation,
506 * and cannot be changed after.
508 //=============================================================================
510 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
512 ASSERT( studyId >= 0);
513 CORBA::Boolean ret = false;
514 if (_studyId < 0) // --- not yet initialized
520 if ( _studyId == studyId) ret = true;
524 //=============================================================================
526 * C++ method: return CORBA instance id, the id is set in derived class
527 * constructor, when instance is activated.
529 //=============================================================================
531 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
533 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
537 //=============================================================================
539 * C++ method: used by derived classes for supervision
541 //=============================================================================
543 void Engines_Parallel_Component_i::beginService(const char *serviceName)
545 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
546 << endl << "Component instance : " << _instanceName << endl << endl);
548 _ThreadId = pthread_self() ;
550 _ThreadId = new pthread_t;
551 _ThreadId->p = pthread_self().p ;
552 _ThreadId->x = pthread_self().x ;
555 _StartUsed = CpuUsed_impl() ;
558 _serviceName = serviceName ;
559 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
561 perror("pthread_setcanceltype ") ;
564 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
566 perror("pthread_setcancelstate ") ;
569 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
570 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
571 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
573 // --- for supervisor : all strings given with setProperties
574 // are set in environment
575 bool overwrite = true;
576 map<std::string,CORBA::Any>::iterator it;
577 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
579 std::string cle((*it).first);
580 if ((*it).second.type()->kind() == CORBA::tk_string)
583 (*it).second >>= value;
584 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
586 //int ret = setenv(cle.c_str(), value, overwrite);
587 setenv(cle.c_str(), value, overwrite);
589 //CCRT porting : setenv not defined in stdlib.h
593 // char* cast because 1st arg of linux putenv function
594 // is not a const char* !
595 //int ret=putenv((char *)s.c_str());
596 putenv((char *)s.c_str());
597 //End of CCRT porting
599 MESSAGE("--- setenv: "<<cle<<" = "<< value);
604 //=============================================================================
606 * C++ method: used by derived classes for supervision
608 //=============================================================================
610 void Engines_Parallel_Component_i::endService(const char *serviceName)
612 _ThreadCpuUsed = CpuUsed_impl() ;
613 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
614 << endl << " Component instance : " << _instanceName << " StartUsed "
615 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
619 //=============================================================================
621 * C++ method: -- CHECK IF USED --
623 //=============================================================================
625 char* Engines_Parallel_Component_i::graphName()
627 return CORBA::string_dup( _graphName.c_str() ) ;
630 //=============================================================================
632 * C++ method: -- CHECK IF USED --
634 //=============================================================================
636 char* Engines_Parallel_Component_i::nodeName()
638 return CORBA::string_dup( _nodeName.c_str() ) ;
641 //=============================================================================
643 * C++ method: used in Supervision (see kill_impl)
645 //=============================================================================
647 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
657 if ( pthread_cancel( ThreadId ) )
659 perror("Killer pthread_cancel error") ;
664 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
665 << " pthread_canceled") ;
670 if ( pthread_kill( ThreadId , signum ) == -1 )
672 perror("Killer pthread_kill error") ;
677 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
678 << " pthread_killed(" << signum << ")") ;
685 //=============================================================================
689 //=============================================================================
693 theEngines_Component->SetCurCpu() ;
696 //=============================================================================
700 //=============================================================================
702 void Engines_Parallel_Component_i::SetCurCpu()
704 _ThreadCpuUsed = CpuUsed() ;
705 // MESSAGE(pthread_self() <<
706 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
709 //=============================================================================
713 //=============================================================================
715 long Engines_Parallel_Component_i::CpuUsed()
719 struct rusage usage ;
720 if ( _ThreadId || _Executed )
722 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
724 perror("Engines_Parallel_Component_i::CpuUsed") ;
727 cpu = usage.ru_utime.tv_sec - _StartUsed ;
728 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
729 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
730 // << " = " << cpu << endl ;
734 // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
735 // << _ThreadId << " " << _serviceName<< " _StartUsed "
736 // << _StartUsed << endl ;
739 // NOT implemented yet
746 //=============================================================================
748 * C++ method: Send message to event channel
750 //=============================================================================
752 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
755 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
758 //=============================================================================
760 * C++ method: return standard library name built on component name
762 //=============================================================================
764 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
772 //=============================================================================
774 * C++ method: DumpPython default implementation
776 //=============================================================================
778 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
779 CORBA::Boolean isPublished,
780 CORBA::Boolean& isValidScript)
782 const char* aScript = "def RebuildData(theStudy): pass";
783 char* aBuffer = new char[strlen(aScript)+1];
784 strcpy(aBuffer, aScript);
785 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
786 int aBufferSize = strlen(aBuffer)+1;
787 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
788 isValidScript = true;
789 return aStreamFile._retn();
793 Engines::Salome_file_ptr
794 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
795 const char* Salome_file_name)
797 // Try to find the service, if it doesn't exist, we add it.
798 _Service_file_map_it = _Input_Service_file_map.find(service_name);
799 if (_Service_file_map_it == _Input_Service_file_map.end()) {
800 _t_Salome_file_map * _map = new _t_Salome_file_map();
801 _Input_Service_file_map[service_name] = _map;
802 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
803 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
804 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
805 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
807 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
808 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
809 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
811 pthread_mutex_lock(deploy_mutex);
812 std::string proxy_ior;
814 // Try to find the Salome_file ...
815 _Salome_file_map_it = _map->find(Salome_file_name);
816 if (_Salome_file_map_it == _map->end()) {
818 // We create a new PaCO++ object.
819 // He has the same configuration than
822 // Firstly, we have to create the proxy object
823 // of the Salome_file and transmit his
824 // reference to the other nodes.
825 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
826 if (getMyRank() == 0) {
827 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
828 new paco_omni_fabrique());
829 proxy->copyGlobalContext(this);
830 PaCO::PacoTopology_t serveur_topo;
831 serveur_topo.total = getTotalNode();
832 proxy->setTopology(serveur_topo);
834 // We register the CORBA object into the POA
835 CORBA::Object_ptr proxy_ref = proxy->_this();
837 // We send the reference to all the nodes...
838 Engines::Parallel_Component_var component_proxy =
839 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
840 component_proxy->send_parallel_proxy_object(proxy_ref);
842 // Adding proxy into the map
843 (*_proxy_map)[Salome_file_name] = proxy;
846 this->wait_parallel_object_proxy();
849 proxy_ior = this->get_parallel_proxy_object();
850 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
852 // We register each node of the parallel Salome_file object
854 for (int i = 0; i < getTotalNode(); i++) {
855 if (i == getMyRank()) {
856 Parallel_Salome_file_i * servant =
857 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
860 servant->copyGlobalContext(this);
862 // We register the CORBA object into the POA
863 servant->POA_PaCO::InterfaceParallel::_this();
865 // Register the servant
868 // Adding servant to the map
869 (*_map)[Salome_file_name] = servant;
872 _my_com->paco_barrier();
873 // start parallel object
874 if (getMyRank() == 0) {
876 _my_com->paco_barrier();
879 _my_com->paco_barrier();
881 // Parallel_Salome_file is created and deployed
886 pthread_mutex_unlock(deploy_mutex);
887 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
888 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
889 return Engines::Salome_file::_narrow(proxy_ref);
892 Engines::Salome_file_ptr
893 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
894 const char* Salome_file_name)
896 // Try to find the service, if it doesn't exist, we add it.
897 _Service_file_map_it = _Output_Service_file_map.find(service_name);
898 if (_Service_file_map_it == _Output_Service_file_map.end()) {
899 _t_Salome_file_map * _map = new _t_Salome_file_map();
900 _Output_Service_file_map[service_name] = _map;
901 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
902 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
903 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
904 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
906 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
907 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
908 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
910 pthread_mutex_lock(deploy_mutex);
911 std::string proxy_ior;
913 // Try to find the Salome_file ...
914 _Salome_file_map_it = _map->find(Salome_file_name);
915 Engines::Parallel_Salome_file_proxy_impl * proxy;
916 if (_Salome_file_map_it == _map->end()) {
918 // We create a new PaCO++ object.
919 // He has the same configuration than
922 // Firstly, we have to create the proxy object
923 // of the Salome_file and transmit his
924 // reference to the other nodes.
925 if (getMyRank() == 0) {
926 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
927 new paco_omni_fabrique());
928 proxy->copyGlobalContext(this);
929 PaCO::PacoTopology_t serveur_topo;
930 serveur_topo.total = getTotalNode();
931 proxy->setTopology(serveur_topo);
933 // We register the CORBA object into the POA
934 CORBA::Object_ptr proxy_ref = proxy->_this();
936 // We send the reference to all the nodes...
937 Engines::Parallel_Component_var component_proxy =
938 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
939 component_proxy->send_parallel_proxy_object(proxy_ref);
941 // Adding proxy into the map
942 (*_proxy_map)[Salome_file_name] = proxy;
945 this->wait_parallel_object_proxy();
948 proxy_ior = this->get_parallel_proxy_object();
949 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
951 // We register each node of the parallel Salome_file object
953 for (int i = 0; i < getTotalNode(); i++) {
954 if (i == getMyRank()) {
955 Parallel_Salome_file_i * servant =
956 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
959 servant->copyGlobalContext(this);
961 // We register the CORBA object into the POA
962 servant->POA_PaCO::InterfaceParallel::_this();
964 // Register the servant
967 // Adding servant to the map
968 (*_map)[Salome_file_name] = servant;
971 _my_com->paco_barrier();
972 // start parallel object
973 if (getMyRank() == 0) {
975 _my_com->paco_barrier();
978 _my_com->paco_barrier();
981 // Parallel_Salome_file is created and deployed
985 pthread_mutex_unlock(deploy_mutex);
986 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
987 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
988 return Engines::Salome_file::_narrow(proxy_ref);
991 Engines::Salome_file_ptr
992 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
993 const char* Salome_file_name)
995 // Try to find the service, if it doesn't exist, we throw an exception.
996 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
997 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
998 SALOME::ExceptionStruct es;
999 es.type = SALOME::INTERNAL_ERROR;
1000 es.text = "service doesn't have salome files";
1001 throw SALOME::SALOME_Exception(es);
1003 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1005 // Try to find the Salome_file ...
1006 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1007 if (_Proxy_Salome_file_map_it == _map->end()) {
1008 SALOME::ExceptionStruct es;
1009 es.type = SALOME::INTERNAL_ERROR;
1010 es.text = "service doesn't have this Salome_file";
1011 throw SALOME::SALOME_Exception(es);
1014 // Client get the proxy object
1015 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1016 return Sfile->_this();
1019 Engines::Salome_file_ptr
1020 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1021 const char* Salome_file_name)
1023 // Try to find the service, if it doesn't exist, we throw an exception.
1024 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1025 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1026 SALOME::ExceptionStruct es;
1027 es.type = SALOME::INTERNAL_ERROR;
1028 es.text = "service doesn't have salome files";
1029 throw SALOME::SALOME_Exception(es);
1031 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1033 // Try to find the Salome_file ...
1034 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1035 if (_Proxy_Salome_file_map_it == _map->end()) {
1036 SALOME::ExceptionStruct es;
1037 es.type = SALOME::INTERNAL_ERROR;
1038 es.text = "service doesn't have this Salome_file";
1039 throw SALOME::SALOME_Exception(es);
1042 // Client get the proxy object
1043 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1044 return Sfile->_this();
1049 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1051 // Try to find the service, if it doesn't exist, nothing to do.
1052 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1053 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1054 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1055 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1056 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1058 for(;begin!=end;begin++) {
1059 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1060 std::string file_port_name = begin->first;
1061 configureSalome_file(service_name, file_port_name, file);
1068 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1070 // Try to find the service, if it doesn't exist, nothing to do.
1071 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1072 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1073 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1074 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1075 _t_Proxy_Salome_file_map::iterator end = _map->end();
1077 for(;begin!=end;begin++) {
1078 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1079 std::string file_port_name = begin->first;
1080 configureSalome_file(service_name, file_port_name, file);
1087 //=============================================================================
1089 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1091 //=============================================================================
1093 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1094 _proxy = _orb->object_to_string(proxy_ref);
1097 //=============================================================================
1099 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1101 //=============================================================================
1103 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1104 char * proxy = NULL;
1105 proxy = get_parallel_proxy_object();
1106 while(proxy == NULL)
1109 proxy = get_parallel_proxy_object();
1113 //=============================================================================
1115 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1117 //=============================================================================
1119 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1124 //=============================================================================
1126 * C++ method: used to configure the Salome_file into the runtime.
1127 * \param service_name name of the service that use this Salome_file
1128 * \param file_port_name name of the Salome_file
1129 * \param file Parallel Salome_file C++ object
1131 //=============================================================================
1133 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1134 std::string file_port_name,
1135 Engines::Parallel_Salome_file_proxy_impl * file)
1137 // By default this method does nothing