SALOME platform Git repositories - modules/yacs.git/blob - src/engine/Executor.cxx
Salome HOME
PR: first version from Antony GEAY, with directory restructuration
[modules/yacs.git] / src / engine / Executor.cxx
1 #include "Executor.hxx"
2 #include "Task.hxx"
3 #include "Scheduler.hxx"
4 #include <pthread.h>
5 #include <iostream>
6
7 using namespace YACS::ENGINE;
8
9 using YACS::BASES::Mutex;
10 using YACS::BASES::Thread;
11 using YACS::BASES::Semaphore;
12
13 Executor::Executor():_nbOfConcurrentThreads(0)//,_cond(PTHREAD_COND_INITIALIZER)
14 {
15 }
16
17 Executor::~Executor()
18 {
19   for(std::list<Thread *>::iterator iter=_groupOfAllThreadsCreated.begin();iter!=_groupOfAllThreadsCreated.end();iter++)
20     delete *iter;
21 }
22
23 void Executor::RunW(Scheduler *graph)
24 {
25   _mainSched=graph;
26   bool isMore;
27   int i=0;
28   graph->init();
29   std::vector<Task *> tasks;
30   std::vector<Task *>::iterator iter;
31   bool toContinue=true;
32   wakeUp();
33   while(toContinue)
34     {
35       sleepWhileNoEventsFromAnyRunningTask();
36       {//Critical section
37         _mutexForSchedulerUpdate.lock();
38         tasks=graph->getNextTasks(isMore);
39         graph->selectRunnableTasks(tasks);
40         _mutexForSchedulerUpdate.unlock();
41       }//End of critical section
42       for(iter=tasks.begin();iter!=tasks.end();iter++)
43         launchTask(*iter);
44       {//Critical section
45         _mutexForSchedulerUpdate.lock();
46         toContinue=!graph->isFinished();
47         _mutexForSchedulerUpdate.unlock();
48       }//End of critical section
49       i++;
50     }
51 }
52
53 void Executor::launchTask(Task *task)
54 {
55   void **args=new void *[3];
56   _mutexForNbOfConcurrentThreads.lock();
57   _groupOfAllThreadsCreated.push_back(0);
58   std::list<Thread *>::iterator iter=_groupOfAllThreadsCreated.end();
59   iter--;
60   _mutexForNbOfConcurrentThreads.unlock();
61   args[0]=(void *)task;
62   args[1]=(void *)_mainSched;
63   args[2]=(void *)this;
64   {//Critical section
65     _mutexForSchedulerUpdate.lock();
66     task->begin();
67     _mainSched->notifyFrom(task,YACS::START);
68     _mutexForSchedulerUpdate.unlock();
69   }//End of critical section
70   _mutexForNbOfConcurrentThreads.lock();
71   //functionForTaskExecution(args);//MultiThreaded=NO
72   //  *iter=
73   new Thread(functionForTaskExecution,args);//MultiThreaded=YES
74   _mutexForNbOfConcurrentThreads.unlock();
75 }
76
77 void Executor::sleepWhileNoEventsFromAnyRunningTask()
78 {
79   _semForNewTasksToPerform.wait();
80 }
81
82 void Executor::notifyEndOfThread(YACS::BASES::Thread *thread)
83 {
84   /*_mutexForNbOfConcurrentThreads.lock();
85   _groupOfAllThreadsCreated.remove(thread);
86   delete thread;
87   _mutexForNbOfConcurrentThreads.unlock();*/
88 }
89
90 void Executor::wakeUp()
91 {
92   int val=_semForNewTasksToPerform.getValue();
93   if(!val)
94     _semForNewTasksToPerform.post();
95 }
96
97 int Executor::getNbOfThreads()
98 {
99   int ret;
100   _mutexForNbOfConcurrentThreads.lock();
101   ret=_groupOfAllThreadsCreated.size();
102   _mutexForNbOfConcurrentThreads.unlock();
103   return ret;
104 }
105
106 void *Executor::functionForTaskExecution(void *arg)
107 {
108   void **argT=(void **)arg;
109   Task *task=(Task *)argT[0];
110   Scheduler *sched=(Scheduler *)argT[1];
111   Executor *execInst=(Executor *)argT[2];
112   delete [] argT;
113   task->execute();
114   {//Critical section
115     execInst->_mutexForSchedulerUpdate.lock();
116     task->finished();
117     sched->notifyFrom(task,YACS::FINISH);
118     execInst->_mutexForSchedulerUpdate.unlock();
119   }//End of critical section
120   execInst->wakeUp();
121   execInst->notifyEndOfThread(0);
122   return 0;
123 }