1 // SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
3 // Copyright (C) 2007 OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
4 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
6 // This library is free software; you can redistribute it and/or
7 // modify it under the terms of the GNU Lesser General Public
8 // License as published by the Free Software Foundation; either
9 // version 2.1 of the License.
11 // This library is distributed in the hope that it will be useful,
12 // but WITHOUT ANY WARRANTY; without even the implied warranty of
13 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 // Lesser General Public License for more details.
16 // You should have received a copy of the GNU Lesser General Public
17 // License along with this library; if not, write to the Free Software
18 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 // See http://www.opencascade.org/SALOME/ or email : webmaster.salome@opencascade.org
22 // File : SALOME_ParallelComponent_i.cxx
23 // Author : André RIBES, EDF
24 // Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
26 #include "SALOME_ParallelComponent_i.hxx"
27 #include "SALOME_ParallelContainer_i.hxx"
35 #include "utilities.h"
39 #include <sys/resource.h>
42 #include <sys/timeb.h>
46 #include <paco_dummy.h>
47 #include <paco_omni.h>
51 extern bool _Sleeping ;
52 static Engines_Parallel_Component_i * theEngines_Component ;
54 bool Engines_Parallel_Component_i::_isMultiStudy = true;
55 bool Engines_Parallel_Component_i::_isMultiInstance = false;
57 //=============================================================================
59 * Default constructor, not for use
61 //=============================================================================
63 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank) :
64 InterfaceParallel_impl(orb,ior,rank),
65 Engines::Component_serv(orb,ior,rank),
66 Engines::Parallel_Component_serv(orb,ior,rank)
69 INFOS("Default Constructor...");
72 //=============================================================================
74 * Standard Constructor for generic Component, used in derived class
75 * Connection to Registry and Notification
76 * \param orb Object Request broker given by Container
77 * \parap poa Portable Object Adapter from Container (normally root_poa)
78 * \param contId container CORBA id inside the server
79 * \param instanceName unique instance name for this object (see Container_i)
80 * \param interfaceName component class name
81 * \param notif use of notification
83 //=============================================================================
85 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
86 PortableServer::POA_ptr poa,
87 PortableServer::ObjectId * contId,
88 const char *instanceName,
89 const char *interfaceName,
91 InterfaceParallel_impl(orb,ior,rank),
92 Engines::Component_serv(orb,ior,rank),
93 Engines::Parallel_Component_serv(orb,ior,rank),
94 _instanceName(instanceName),
95 _interfaceName(interfaceName),
96 _myConnexionToRegistry(0),
104 MESSAGE("Component constructor with instanceName "<< _instanceName);
105 //SCRUTE(pd_refCount);
106 _orb = CORBA::ORB::_duplicate(orb);
107 _poa = PortableServer::POA::_duplicate(poa);
109 CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
110 const CORBA::String_var the_ior = _orb->object_to_string(o);
111 _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
112 _instanceName.c_str());
114 _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
116 deploy_mutex = new pthread_mutex_t();
117 pthread_mutex_init(deploy_mutex, NULL);
119 //SCRUTE(pd_refCount);
122 //=============================================================================
124 * Destructor: call Container for decrement of instances count.
125 * When instances count falls to 0, the container tries to remove the
126 * component library (dlclose)
128 //=============================================================================
130 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
132 MESSAGE("Component destructor");
133 Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
134 pthread_mutex_destroy(deploy_mutex);
141 //=============================================================================
143 * CORBA method: return name of the instance, unique in this Container
145 //=============================================================================
147 char* Engines_Parallel_Component_i::instanceName()
149 return CORBA::string_dup(_instanceName.c_str()) ;
152 //=============================================================================
154 * CORBA method: return name of the component class
156 //=============================================================================
158 char* Engines_Parallel_Component_i::interfaceName()
160 return CORBA::string_dup(_interfaceName.c_str()) ;
163 //=============================================================================
165 * CORBA method: Get study Id
166 * \return -1: not initialised (Internal Error)
167 * 0: multistudy component instance
168 * >0: study id associated to this instance
170 //=============================================================================
172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
177 //=============================================================================
179 * CORBA method: Test if instance is alive and responds
181 //=============================================================================
183 void Engines_Parallel_Component_i::ping()
185 // MESSAGE("Engines_Parallel_Component_i::ping_c() pid "<< getpid() << " threadid " << pthread_self());
188 //=============================================================================
190 * CORBA method: Deactivate this instance. CORBA object is deactivated (do not
191 * respond any more to CORBA calls), the connection to Regsitry is removed
192 * (Registry informed of deactivation), internal server reference counter on
193 * the derived servant class is decremented, to allow destruction of the class
194 * (delete) by POA, when there are no more references.
195 * -- TO BE USED BY CONTAINER ONLY (Container housekeeping) --
197 //=============================================================================
199 void Engines_Parallel_Component_i::destroy()
201 MESSAGE("Engines_Parallel_Component_i::destroy()");
202 //SCRUTE(pd_refCount);
204 delete _notifSupplier;
207 delete _myConnexionToRegistry;
208 _myConnexionToRegistry = 0 ;
209 _poa->deactivate_object(*_id) ;
210 CORBA::release(_poa) ;
212 //SCRUTE(pd_refCount);
213 _thisObj->_remove_ref();
214 //SCRUTE(pd_refCount);
215 MESSAGE("Engines_Parallel_Component_i::destroyed") ;
218 //=============================================================================
220 * CORBA method: return CORBA reference of the Container
223 //=============================================================================
225 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
227 MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
228 CORBA::Object_ptr o = _poa->id_to_reference(*_contId) ;
229 return Engines::Container::_narrow(o);
232 //=============================================================================
235 * Gives a sequence of (key=string,value=any) to the component.
236 * Base class component stores the sequence in a map.
237 * The map is cleared before.
238 * This map is for use by derived classes.
239 * \param dico sequence of (key=string,value=any)
241 //=============================================================================
243 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
246 for (CORBA::ULong i=0; i<dico.length(); i++)
248 std::string cle(dico[i].key);
249 _fieldsDict[cle] = dico[i].value;
253 //=============================================================================
256 * returns a previously stored map (key=string,value=any) as a sequence.
257 * (see setProperties)
259 //=============================================================================
261 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
263 Engines::FieldsDict_var copie = new Engines::FieldsDict;
264 copie->length(_fieldsDict.size());
265 map<std::string,CORBA::Any>::iterator it;
267 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
269 std::string cle((*it).first);
270 copie[i].key = CORBA::string_dup(cle.c_str());
271 copie[i].value = _fieldsDict[cle];
273 return copie._retn();
276 //=============================================================================
278 * CORBA method: used by Supervision to give names to this instance
280 //=============================================================================
282 void Engines_Parallel_Component_i::Names( const char * graphName ,
283 const char * nodeName )
285 _graphName = graphName ;
286 _nodeName = nodeName ;
287 INFOS("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '"
288 << _nodeName << "' )");
291 //=============================================================================
293 * CORBA method: used in Supervision
295 //=============================================================================
297 bool Engines_Parallel_Component_i::Kill_impl()
299 // MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
300 // << " pid " << getpid() << " instanceName "
301 // << _instanceName.c_str() << " interface " << _interfaceName.c_str()
302 // << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
303 // << dec << " _ThreadId " << _ThreadId << " this " << hex << this
306 bool RetVal = false ;
308 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
310 RetVal = Killer( _ThreadId , 0 ) ;
311 _ThreadId = (pthread_t ) -1 ;
315 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
317 RetVal = Killer( *_ThreadId , 0 ) ;
318 _ThreadId = (pthread_t* ) 0 ;
325 //=============================================================================
327 * CORBA method: used in Supervision
329 //=============================================================================
331 bool Engines_Parallel_Component_i::Stop_impl()
333 MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
334 << " pid " << getpid() << " instanceName "
335 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
336 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
337 << dec << " _ThreadId " << _ThreadId );
340 bool RetVal = false ;
342 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
344 RetVal = Killer( _ThreadId , 0 ) ;
345 _ThreadId = (pthread_t ) -1 ;
348 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
350 RetVal = Killer( *_ThreadId , 0 ) ;
351 _ThreadId = (pthread_t* ) 0 ;
357 //=============================================================================
359 * CORBA method: used in Supervision
361 //=============================================================================
363 bool Engines_Parallel_Component_i::Suspend_impl()
365 MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
366 << " pid " << getpid() << " instanceName "
367 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
368 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
369 << dec << " _ThreadId " << _ThreadId );
371 bool RetVal = false ;
373 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
375 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
385 RetVal = Killer( _ThreadId ,SIGINT ) ;
387 RetVal = Killer( *_ThreadId ,SIGINT ) ;
389 //if ( RetVal ) _Sleeping = true;
396 //=============================================================================
398 * CORBA method: used in Supervision
400 //=============================================================================
402 bool Engines_Parallel_Component_i::Resume_impl()
404 MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
405 << " pid " << getpid() << " instanceName "
406 << _instanceName.c_str() << " interface " << _interfaceName.c_str()
407 << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
408 << dec << " _ThreadId " << _ThreadId );
409 bool RetVal = false ;
411 if ( _ThreadId > 0 && pthread_self() != _ThreadId )
413 if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
429 //=============================================================================
433 //=============================================================================
435 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
438 if ( _ThreadId || _Executed )
443 if ( pthread_self() != _ThreadId )
445 if ( pthread_self().p != _ThreadId->p )
453 // Get Cpu in the appropriate thread with that object !...
454 theEngines_Component = this ;
456 Killer( _ThreadId ,SIGUSR1 ) ;
458 Killer( *_ThreadId ,SIGUSR11 ) ;
461 cpu = _ThreadCpuUsed ;
465 _ThreadCpuUsed = CpuUsed() ;
466 cpu = _ThreadCpuUsed ;
467 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
468 // << _serviceName << " " << cpu << endl ;
473 cpu = _ThreadCpuUsed ;
474 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
475 // << _serviceName << " " << cpu<< endl ;
480 // cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
481 // <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<endl;
487 //=============================================================================
489 * C++ method: return Container Servant
491 //=============================================================================
493 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
495 return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
498 //=============================================================================
500 * C++ method: set study Id
501 * \param studyId 0 if instance is not associated to a study,
502 * >0 otherwise (== study id)
503 * \return true if the set of study Id is OK
504 * must be set once by Container, at instance creation,
505 * and cannot be changed after.
507 //=============================================================================
509 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
511 ASSERT( studyId >= 0);
512 CORBA::Boolean ret = false;
513 if (_studyId < 0) // --- not yet initialized
519 if ( _studyId == studyId) ret = true;
523 //=============================================================================
525 * C++ method: return CORBA instance id, the id is set in derived class
526 * constructor, when instance is activated.
528 //=============================================================================
530 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
532 // MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
536 //=============================================================================
538 * C++ method: used by derived classes for supervision
540 //=============================================================================
542 void Engines_Parallel_Component_i::beginService(const char *serviceName)
544 MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
545 << endl << "Component instance : " << _instanceName << endl << endl);
547 _ThreadId = pthread_self() ;
549 _ThreadId = new pthread_t;
550 _ThreadId->p = pthread_self().p ;
551 _ThreadId->x = pthread_self().x ;
554 _StartUsed = CpuUsed_impl() ;
557 _serviceName = serviceName ;
558 if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
560 perror("pthread_setcanceltype ") ;
563 if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
565 perror("pthread_setcancelstate ") ;
568 // MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
569 // << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
570 // << " _graphName " << _graphName << " _nodeName " << _nodeName );
572 // --- for supervisor : all strings given with setProperties
573 // are set in environment
574 bool overwrite = true;
575 map<std::string,CORBA::Any>::iterator it;
576 for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
578 std::string cle((*it).first);
579 if ((*it).second.type()->kind() == CORBA::tk_string)
582 (*it).second >>= value;
583 // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
585 //int ret = setenv(cle.c_str(), value, overwrite);
586 setenv(cle.c_str(), value, overwrite);
588 //CCRT porting : setenv not defined in stdlib.h
592 // char* cast because 1st arg of linux putenv function
593 // is not a const char* !
594 //int ret=putenv((char *)s.c_str());
595 putenv((char *)s.c_str());
596 //End of CCRT porting
598 MESSAGE("--- setenv: "<<cle<<" = "<< value);
603 //=============================================================================
605 * C++ method: used by derived classes for supervision
607 //=============================================================================
609 void Engines_Parallel_Component_i::endService(const char *serviceName)
611 _ThreadCpuUsed = CpuUsed_impl() ;
612 MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
613 << endl << " Component instance : " << _instanceName << " StartUsed "
614 << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
618 //=============================================================================
620 * C++ method: -- CHECK IF USED --
622 //=============================================================================
624 char* Engines_Parallel_Component_i::graphName()
626 return CORBA::string_dup( _graphName.c_str() ) ;
629 //=============================================================================
631 * C++ method: -- CHECK IF USED --
633 //=============================================================================
635 char* Engines_Parallel_Component_i::nodeName()
637 return CORBA::string_dup( _nodeName.c_str() ) ;
640 //=============================================================================
642 * C++ method: used in Supervision (see kill_impl)
644 //=============================================================================
646 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
656 if ( pthread_cancel( ThreadId ) )
658 perror("Killer pthread_cancel error") ;
663 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
664 << " pthread_canceled") ;
669 if ( pthread_kill( ThreadId , signum ) == -1 )
671 perror("Killer pthread_kill error") ;
676 MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
677 << " pthread_killed(" << signum << ")") ;
684 //=============================================================================
688 //=============================================================================
692 theEngines_Component->SetCurCpu() ;
695 //=============================================================================
699 //=============================================================================
701 void Engines_Parallel_Component_i::SetCurCpu()
703 _ThreadCpuUsed = CpuUsed() ;
704 // MESSAGE(pthread_self() <<
705 // " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
708 //=============================================================================
712 //=============================================================================
714 long Engines_Parallel_Component_i::CpuUsed()
718 struct rusage usage ;
719 if ( _ThreadId || _Executed )
721 if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
723 perror("Engines_Parallel_Component_i::CpuUsed") ;
726 cpu = usage.ru_utime.tv_sec - _StartUsed ;
727 // cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
728 // << _serviceName << usage.ru_utime.tv_sec << " - " << _StartUsed
729 // << " = " << cpu << endl ;
733 // cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
734 // << _ThreadId << " " << _serviceName<< " _StartUsed "
735 // << _StartUsed << endl ;
738 // NOT implementet yet
745 //=============================================================================
747 * C++ method: Send message to event channel
749 //=============================================================================
751 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
754 _notifSupplier->Send(graphName(), nodeName(), event_type, message);
757 //=============================================================================
759 * C++ method: return standard library name built on component name
761 //=============================================================================
763 string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
771 //=============================================================================
773 * C++ method: DumpPython default implementation
775 //=============================================================================
777 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy,
778 CORBA::Boolean isPublished,
779 CORBA::Boolean& isValidScript)
781 const char* aScript = "def RebuildData(theStudy): pass";
782 char* aBuffer = new char[strlen(aScript)+1];
783 strcpy(aBuffer, aScript);
784 CORBA::Octet* anOctetBuf = (CORBA::Octet*)aBuffer;
785 int aBufferSize = strlen(aBuffer)+1;
786 Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1);
787 isValidScript = true;
788 return aStreamFile._retn();
792 Engines::Salome_file_ptr
793 Engines_Parallel_Component_i::setInputFileToService(const char* service_name,
794 const char* Salome_file_name)
796 // Try to find the service, if it doesn't exist, we add it.
797 _Service_file_map_it = _Input_Service_file_map.find(service_name);
798 if (_Service_file_map_it == _Input_Service_file_map.end()) {
799 _t_Salome_file_map * _map = new _t_Salome_file_map();
800 _Input_Service_file_map[service_name] = _map;
801 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
802 _Proxy_Input_Service_file_map[service_name] = _proxy_map;
803 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
804 _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
806 _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
807 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
808 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
810 pthread_mutex_lock(deploy_mutex);
811 std::string proxy_ior;
813 // Try to find the Salome_file ...
814 _Salome_file_map_it = _map->find(Salome_file_name);
815 if (_Salome_file_map_it == _map->end()) {
817 // We create a new PaCO++ object.
818 // He has the same configuration than
821 // Firstly, we have to create the proxy object
822 // of the Salome_file and transmit his
823 // reference to the other nodes.
824 Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
825 if (getMyRank() == 0) {
826 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
827 new paco_omni_fabrique());
828 proxy->copyGlobalContext(this);
829 PaCO::PacoTopology_t serveur_topo;
830 serveur_topo.total = getTotalNode();
831 proxy->setTopology(serveur_topo);
833 // We register the CORBA objet into the POA
834 CORBA::Object_ptr proxy_ref = proxy->_this();
836 // We send the reference to all the nodes...
837 Engines::Parallel_Component_var component_proxy =
838 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
839 component_proxy->send_parallel_proxy_object(proxy_ref);
841 // Adding proxy into the map
842 (*_proxy_map)[Salome_file_name] = proxy;
845 this->wait_parallel_object_proxy();
848 proxy_ior = this->get_parallel_proxy_object();
849 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
851 // We register each node of the parallel Salome_file object
853 for (int i = 0; i < getTotalNode(); i++) {
854 if (i == getMyRank()) {
855 Parallel_Salome_file_i * servant =
856 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
859 servant->copyGlobalContext(this);
861 // We register the CORBA objet into the POA
862 servant->POA_PaCO::InterfaceParallel::_this();
864 // Register the servant
867 // Adding servant to the map
868 (*_map)[Salome_file_name] = servant;
871 _my_com->paco_barrier();
872 // start parallel object
873 if (getMyRank() == 0) {
875 _my_com->paco_barrier();
878 _my_com->paco_barrier();
880 // Parallel_Salome_file is created and deployed
885 pthread_mutex_unlock(deploy_mutex);
886 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
887 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
888 return Engines::Salome_file::_narrow(proxy_ref);
891 Engines::Salome_file_ptr
892 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name,
893 const char* Salome_file_name)
895 // Try to find the service, if it doesn't exist, we add it.
896 _Service_file_map_it = _Output_Service_file_map.find(service_name);
897 if (_Service_file_map_it == _Output_Service_file_map.end()) {
898 _t_Salome_file_map * _map = new _t_Salome_file_map();
899 _Output_Service_file_map[service_name] = _map;
900 _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
901 _Proxy_Output_Service_file_map[service_name] = _proxy_map;
902 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
903 _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
905 _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
906 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
907 _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
909 pthread_mutex_lock(deploy_mutex);
910 std::string proxy_ior;
912 // Try to find the Salome_file ...
913 _Salome_file_map_it = _map->find(Salome_file_name);
914 Engines::Parallel_Salome_file_proxy_impl * proxy;
915 if (_Salome_file_map_it == _map->end()) {
917 // We create a new PaCO++ object.
918 // He has the same configuration than
921 // Firstly, we have to create the proxy object
922 // of the Salome_file and transmit his
923 // reference to the other nodes.
924 if (getMyRank() == 0) {
925 proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
926 new paco_omni_fabrique());
927 proxy->copyGlobalContext(this);
928 PaCO::PacoTopology_t serveur_topo;
929 serveur_topo.total = getTotalNode();
930 proxy->setTopology(serveur_topo);
932 // We register the CORBA objet into the POA
933 CORBA::Object_ptr proxy_ref = proxy->_this();
935 // We send the reference to all the nodes...
936 Engines::Parallel_Component_var component_proxy =
937 Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
938 component_proxy->send_parallel_proxy_object(proxy_ref);
940 // Adding proxy into the map
941 (*_proxy_map)[Salome_file_name] = proxy;
944 this->wait_parallel_object_proxy();
947 proxy_ior = this->get_parallel_proxy_object();
948 (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
950 // We register each node of the parallel Salome_file object
952 for (int i = 0; i < getTotalNode(); i++) {
953 if (i == getMyRank()) {
954 Parallel_Salome_file_i * servant =
955 new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb),
958 servant->copyGlobalContext(this);
960 // We register the CORBA objet into the POA
961 servant->POA_PaCO::InterfaceParallel::_this();
963 // Register the servant
966 // Adding servant to the map
967 (*_map)[Salome_file_name] = servant;
970 _my_com->paco_barrier();
971 // start parallel object
972 if (getMyRank() == 0) {
974 _my_com->paco_barrier();
977 _my_com->paco_barrier();
980 // Parallel_Salome_file is created and deployed
984 pthread_mutex_unlock(deploy_mutex);
985 proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
986 CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
987 return Engines::Salome_file::_narrow(proxy_ref);
990 Engines::Salome_file_ptr
991 Engines_Parallel_Component_i::getInputFileToService(const char* service_name,
992 const char* Salome_file_name)
994 // Try to find the service, if it doesn't exist, we throw an exception.
995 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
996 if (_Proxy_Service_file_map_it == _Proxy_Input_Service_file_map.end()) {
997 SALOME::ExceptionStruct es;
998 es.type = SALOME::INTERNAL_ERROR;
999 es.text = "service doesn't have salome files";
1000 throw SALOME::SALOME_Exception(es);
1002 _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
1004 // Try to find the Salome_file ...
1005 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1006 if (_Proxy_Salome_file_map_it == _map->end()) {
1007 SALOME::ExceptionStruct es;
1008 es.type = SALOME::INTERNAL_ERROR;
1009 es.text = "service doesn't have this Salome_file";
1010 throw SALOME::SALOME_Exception(es);
1013 // Client get the proxy object
1014 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1015 return Sfile->_this();
1018 Engines::Salome_file_ptr
1019 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name,
1020 const char* Salome_file_name)
1022 // Try to find the service, if it doesn't exist, we throw an exception.
1023 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1024 if (_Proxy_Service_file_map_it == _Proxy_Output_Service_file_map.end()) {
1025 SALOME::ExceptionStruct es;
1026 es.type = SALOME::INTERNAL_ERROR;
1027 es.text = "service doesn't have salome files";
1028 throw SALOME::SALOME_Exception(es);
1030 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1032 // Try to find the Salome_file ...
1033 _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
1034 if (_Proxy_Salome_file_map_it == _map->end()) {
1035 SALOME::ExceptionStruct es;
1036 es.type = SALOME::INTERNAL_ERROR;
1037 es.text = "service doesn't have this Salome_file";
1038 throw SALOME::SALOME_Exception(es);
1041 // Client get the proxy object
1042 Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
1043 return Sfile->_this();
1048 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name)
1050 // Try to find the service, if it doesn't exist, nothing to do.
1051 _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
1052 if (_Proxy_Service_file_map_it != _Proxy_Input_Service_file_map.end()) {
1053 _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
1054 _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
1055 _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
1057 for(;begin!=end;begin++) {
1058 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1059 std::string file_port_name = begin->first;
1060 configureSalome_file(service_name, file_port_name, file);
1067 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name)
1069 // Try to find the service, if it doesn't exist, nothing to do.
1070 _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
1071 if (_Proxy_Service_file_map_it != _Proxy_Output_Service_file_map.end()) {
1072 _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
1073 _t_Proxy_Salome_file_map::iterator begin = _map->begin();
1074 _t_Proxy_Salome_file_map::iterator end = _map->end();
1076 for(;begin!=end;begin++) {
1077 Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
1078 std::string file_port_name = begin->first;
1079 configureSalome_file(service_name, file_port_name, file);
1086 //=============================================================================
1088 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1090 //=============================================================================
1092 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
1093 _proxy = _orb->object_to_string(proxy_ref);
1096 //=============================================================================
1098 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1100 //=============================================================================
1102 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
1103 char * proxy = NULL;
1104 proxy = get_parallel_proxy_object();
1105 while(proxy == NULL)
1108 proxy = get_parallel_proxy_object();
1112 //=============================================================================
1114 * C++ method: Used by the Parallel Component to deploy a Parallel Salome_file
1116 //=============================================================================
1118 Engines_Parallel_Component_i::get_parallel_proxy_object() {
1123 //=============================================================================
1125 * C++ method: used to configure the Salome_file into the runtime.
1126 * \param service_name name of the service that use this Salome_file
1127 * \param file_port_name name of the Salome_file
1128 * \param file Parallel Salome_file C++ object
1130 //=============================================================================
1132 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
1133 std::string file_port_name,
1134 Engines::Parallel_Salome_file_proxy_impl * file)
1136 // By default this method does nothing