Salome HOME
Workloadmanager deadlock on big foreach. EMC2P_V_1_3_4 V9_8_0b1
authorOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 15 Oct 2021 12:18:38 +0000 (14:18 +0200)
committerOvidiu Mircescu <ovidiu.mircescu@edf.fr>
Fri, 15 Oct 2021 12:18:38 +0000 (14:18 +0200)
src/workloadmanager/WorkloadManager.cxx
src/yacsloader/samples/wlm_complex_foreach.xml [new file with mode: 0644]
src/yacsloader_swig/Test/testWorkloadManager.py

index 98224019f42b0ea2521ee4a5376cfd9a0c6e4a6c..a364146bb385df27892b23b9330159160e4f9cae 100644 (file)
@@ -18,6 +18,9 @@
 //
 #include "WorkloadManager.hxx"
 #include "Task.hxx"
+#include <chrono>
+#include <thread>
+
 
 namespace WorkloadManager
 {
@@ -96,17 +99,21 @@ namespace WorkloadManager
     bool threadStop = false;
     while(!threadStop)
     {
-      std::unique_lock<std::mutex> lock(_data_mutex);
-      _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
-      RunningInfo taskInfo;
-      while(chooseTaskToRun(taskInfo))
       {
-        _runningTasks.emplace(taskInfo.id, std::async(std::launch::async, [this, taskInfo]
-          {
-            runOneTask(taskInfo);
-          }));
+        std::unique_lock<std::mutex> lock(_data_mutex);
+        _startCondition.wait(lock, [this] {return !_algo.empty() || _stop;});
+        RunningInfo taskInfo;
+        while(chooseTaskToRun(taskInfo))
+        {
+          _runningTasks.emplace(taskInfo.id, std::async(std::launch::async, [this, taskInfo]
+            {
+              runOneTask(taskInfo);
+            }));
+        }
+        threadStop = _stop && _algo.empty();
       }
-      threadStop = _stop && _algo.empty();
+      // workaroud to release the lock and give a chance to other tasks to finish
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
 
diff --git a/src/yacsloader/samples/wlm_complex_foreach.xml b/src/yacsloader/samples/wlm_complex_foreach.xml
new file mode 100644 (file)
index 0000000..7da065d
--- /dev/null
@@ -0,0 +1,119 @@
+<?xml version='1.0' encoding='iso-8859-1' ?>
+<proc name="test">
+   <property name="executor" value="workloadmanager"/>
+   <type name="string" kind="string"/>
+   <struct name="Engines/dataref">
+      <member name="ref" type="string"/>
+   </struct>
+   <type name="bool" kind="bool"/>
+   <sequence name="boolvec" content="bool"/>
+   <type name="double" kind="double"/>
+   <sequence name="dblevec" content="double"/>
+   <objref name="file" id="file"/>
+   <type name="int" kind="int"/>
+   <sequence name="intvec" content="int"/>
+   <objref name="pyobj" id="python:obj:1.0"/>
+   <sequence name="list[pyobj]" content="pyobj"/>
+   <struct name="stringpair">
+      <member name="name" type="string"/>
+      <member name="value" type="string"/>
+   </struct>
+   <sequence name="propvec" content="stringpair"/>
+   <sequence name="seqboolvec" content="boolvec"/>
+   <sequence name="seqdblevec" content="dblevec"/>
+   <sequence name="seqintvec" content="intvec"/>
+   <sequence name="seqpyobj" content="pyobj"/>
+   <sequence name="stringvec" content="string"/>
+   <sequence name="seqstringvec" content="stringvec"/>
+   <container name="DefaultContainer">
+      <property name="container_kind" value="Salome"/>
+      <property name="attached_on_cloning" value="0"/>
+      <property name="container_name" value="FactoryServer"/>
+      <property name="name" value="localhost"/>
+   </container>
+   <container name="default_container">
+      <property name="container_kind" value="Salome"/>
+      <property name="attached_on_cloning" value="0"/>
+      <property name="nb_parallel_procs" value="1"/>
+      <property name="type" value="multi"/>
+      <property name="use_py_cache" value="1"/>
+   </container>
+   <bloc name="test/run">
+      <remote name="testgenerate_conditions" elementaryWeight="-1">
+         <script><code><![CDATA[conditions_list=range(1000)
+]]></code></script>
+         <load container="default_container"/>
+         <outport name="conditions_list" type="list[pyobj]"/>
+      </remote>
+      <bloc name="UnBloc">
+         <foreachdyn name="UneForEach" loopWeight="-1" type="pyobj">
+            <bloc name="forEachBloc">
+               <bloc name="test/one_state_calc">
+                  <remote name="testget_search_state_option" elementaryWeight="-1">
+                     <script><code><![CDATA[
+]]></code></script>
+                     <load container="default_container"/>
+                     <inport name="conditions" type="pyobj"/>
+                  </remote>
+                  <remote name="line02" elementaryWeight="0.0001">
+                     <script><code><![CDATA[t=0.1]]></code></script>
+                     <load container="default_container"/>
+                     <outport name="t" type="pyobj"/>
+                  </remote>
+                  <remote name="testwait_for_me" elementaryWeight="-1">
+                     <script><code><![CDATA[import time
+time.sleep(timetowait)
+a=1
+]]></code></script>
+                     <load container="default_container"/>
+                     <inport name="timetowait" type="pyobj"/>
+                     <outport name="a" type="pyobj"/>
+                  </remote>
+                  <inline name="return_internal@PHY2S@Node">
+                     <script><code><![CDATA[res=i0]]></code></script>
+                     <inport name="i0" type="pyobj"/>
+                     <outport name="res" type="pyobj"/>
+                  </inline>
+                  <control> <fromnode>line02</fromnode> <tonode>testwait_for_me</tonode> </control>
+                  <control> <fromnode>testwait_for_me</fromnode> <tonode>return_internal@PHY2S@Node</tonode> </control>
+                  <datalink control="false">
+                     <fromnode>line02</fromnode> <fromport>t</fromport>
+                     <tonode>testwait_for_me</tonode> <toport>timetowait</toport>
+                  </datalink>
+                  <datalink control="false">
+                     <fromnode>testwait_for_me</fromnode> <fromport>a</fromport>
+                     <tonode>return_internal@PHY2S@Node</tonode> <toport>i0</toport>
+                  </datalink>
+               </bloc>
+               <inline name="return_internal@PHY2S@Node">
+                  <script><code><![CDATA[result=i0]]></code></script>
+                  <inport name="i0" type="pyobj"/>
+                  <outport name="result" type="pyobj"/>
+               </inline>
+               <control> <fromnode>test/one_state_calc</fromnode> <tonode>return_internal@PHY2S@Node</tonode> </control>
+               <datalink control="false">
+                  <fromnode>test/one_state_calc.return_internal@PHY2S@Node</fromnode> <fromport>res</fromport>
+                  <tonode>return_internal@PHY2S@Node</tonode> <toport>i0</toport>
+               </datalink>
+            </bloc>
+         </foreachdyn>
+         <datalink control="false">
+            <fromnode>UneForEach</fromnode> <fromport>evalSamples</fromport>
+            <tonode>UneForEach.forEachBloc.test/one_state_calc.testget_search_state_option</tonode> <toport>conditions</toport>
+         </datalink>
+      </bloc>
+      <control> <fromnode>testgenerate_conditions</fromnode> <tonode>UnBloc</tonode> </control>
+      <datalink control="false">
+         <fromnode>testgenerate_conditions</fromnode> <fromport>conditions_list</fromport>
+         <tonode>UnBloc.UneForEach</tonode> <toport>SmplsCollection</toport>
+      </datalink>
+   </bloc>
+   <parameter>
+      <tonode>test/run.UnBloc.UneForEach.forEachBloc.test/one_state_calc.testget_search_state_option</tonode><toport>conditions</toport>
+      <value><objref><![CDATA[lgPq1dvuHePaIDdCdgPqRgsiHev49jNCI.sAI=H4fbOK9WuanWtohjP4pZ1umeNq9dNqdXPmFZOalWtE1clEljfuFeuKBWuKRjfuIDN4JeP49Z.CH3=m9UvAI2cC93.I7UugI4dqVfOaI.tgI4cadaO4fXMiycNCteuqdbMiyau4BeuaI.vgIAt4R5]]></objref></value>
+   </parameter>
+   <parameter>
+      <tonode>test/run.UnBloc.UneForEach.forEachBloc.test/one_state_calc.testwait_for_me</tonode><toport>timetowait</toport>
+      <value><objref><![CDATA[lgPq1dvuHePaIDdCdgPqRgsiHev49jNCI.sAI=H4fbOK9WuanWtohjP4pZ1umeNq9dNqdXPmFZOalWtE1clEljfuFeuKBWuKRjfuIDN4JeP49Z.CH3=m9UvAI2cC93.I7UugI4dqVfOaI.tgI4cadaO4fXMiycNCteuqdbMiyau4BeuaI.vgIAt4R5]]></objref></value>
+   </parameter>
+</proc>
index d1d610d43e933ffcc59cf37897c0103175282c50..ce20db9cb24df5ec7d0f71ada05a133e168d7c1e 100755 (executable)
@@ -119,6 +119,13 @@ class TestEdit(unittest.TestCase):
         self.assertEqual(proc.getState(),pilot.FAILED)
         self.assertEqual(proc.getChildByName("ErrorNode").getState(),pilot.ERROR)
 
+    def test5(self):
+        """ Foreach with 1000 points and several nodes in the block.
+        """
+        proc = self.l.load("samples/wlm_complex_foreach.xml")
+        self.e.RunW(proc,0)
+        self.assertEqual(proc.getState(),pilot.DONE)
+
 if __name__ == '__main__':
   dir_test = tempfile.mkdtemp(suffix=".yacstest")
   file_test = os.path.join(dir_test,"UnitTestsResult")