// Implement parallel computation of Hexa with c++ thread implementation
template<typename Iterator, class Function>
- void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1)
+ void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const unsigned int nthreads = 1)
{
- const unsigned int group = ((last-first))/std::abs(nthreads);
+ MESSAGE("Start parallel computation of Hexa with c++ threads...");
- std::vector<std::thread> threads;
+ assert(nthreads > 0);
+
+ const unsigned int numTasksTotal = last - first;
+ std::vector<std::thread> threads;
+ Iterator it = first;
+
+ MESSAGE("Number of elements to compute: " << numTasksTotal << "; num of threads: " << nthreads);
+
+ // Distribute tasks among threads
+ if (numTasksTotal <= nthreads)
+ {
+ // A simple case - just one task executed in one thread.
+ // TODO: check if it's faster to do it sequentially
+ threads.reserve(numTasksTotal);
+ for (; it < last; ++it)
+ {
+ threads.emplace_back(f, std::ref(*it));
+ }
+ }
+ else
+ {
+ // Calculate how to distribute elements among threads evenly
+ const unsigned int numTasksInThread = numTasksTotal / nthreads;
+ MESSAGE("Number of tasks in thread: " << numTasksInThread);
+
+ // Store the numbers of tasks per thread
+ std::vector<unsigned int> distTasksInThreads(nthreads, numTasksInThread);
+
+ // Distribute a remainder among saved numbers
+ const unsigned int remainder = numTasksTotal % nthreads;
+ MESSAGE("Remainder of tasks " << remainder << " will be evenly distributed among threads");
+ for (unsigned int i = 0; i < remainder; ++i)
+ {
+ ++distTasksInThreads[i];
+ }
+
+ // Create threads for each number of tasks
threads.reserve(nthreads);
- Iterator it = first;
- for (; it < last-group; it += group) {
- // to create a thread
- // Pass iterators by value and the function by reference!
- auto lambda = [=,&f](){ std::for_each(it, std::min(it+group, last), f);};
+ for (const auto i : distTasksInThreads)
+ {
+ Iterator curLast = it + i;
- // stack the threads
- threads.push_back( std::thread( lambda ) );
+ // Pass iterators by value and the function by reference!
+ auto lambda = [=,&f](){ std::for_each(it, curLast, f); };
+
+ // Create a thread
+ threads.emplace_back(lambda);
+
+ // Advance iterator to the next step
+ it = curLast;
}
+ }
+
+ std::for_each(threads.begin(), threads.end(), [](std::thread& x){ x.join(); });
- std::for_each(it, last, f); // last steps while we wait for other threads
- std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
+ MESSAGE("Parallel computation was finished successfully");
}
//================================================================================
/*!