From 88853328dddaec0b429a5aebfdac8aa5478fbe6b Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 14 Aug 2020 12:45:27 +0800 Subject: [PATCH 01/11] Add parallel_for --- src/support/parallel_for.cc | 39 +++++++++ src/support/parallel_for.h | 148 +++++++++++++++++++++++++++++++++ tests/cpp/parallel_for_test.cc | 65 +++++++++++++++ 3 files changed, 252 insertions(+) create mode 100644 src/support/parallel_for.cc create mode 100644 src/support/parallel_for.h create mode 100644 tests/cpp/parallel_for_test.cc diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc new file mode 100644 index 000000000000..a46842dbef10 --- /dev/null +++ b/src/support/parallel_for.cc @@ -0,0 +1,39 @@ +#include "parallel_for.h" + +#include + +namespace tvm { +namespace support { + +ThreadPool& ThreadPool::Global() { + static ThreadPool* pool = new ThreadPool(); + static int ct = 0; + + ct = (ct + 1) % ThreadPool::REFRESH_EVERY; + + if (ct == 0) { + pool->Abort(); + delete pool; + pool = new ThreadPool(); + } + + if (pool->NumWorkers() == 0) { + pool->Launch(std::thread::hardware_concurrency()); + } + + return *pool; +} + +void parallel_for(int begin, int end, const std::function& f, int step) { + auto& pf = ThreadPool::Global(); + int batch_count = (end - begin) / step; + CHECK_GT(batch_count, 0); + pf.BeginBatch(batch_count); + for (int i = begin; i < end; i += step) { + pf.Enqueue(f, i); + } + pf.WaitBatch(); +} + +} // namespace support +} // namespace tvm diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h new file mode 100644 index 000000000000..578193b98b93 --- /dev/null +++ b/src/support/parallel_for.h @@ -0,0 +1,148 @@ +#ifndef TVM_SUPPORT_PARALLEL_FOR_H_ +#define TVM_SUPPORT_PARALLEL_FOR_H_ + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace tvm { +namespace support { + +class ThreadPool { + public: + /*! + * \brief Set the thread number used in this pool. + * \param n The thread number of this pool. + */ + void Launch(size_t n = 1) { + for (std::size_t i = 0; i < n; ++i) { + threads_.emplace_back([this] { WorkerFunc(); }); + } + } + + /*! + * \brief Set the total task number to be executed in this parallel for run batch. + * \param n The task number of this parallel for run batch. + */ + void BeginBatch(int n) { + CHECK(is_finished_) << "Last run batch didn't finished."; + finish_ct_ = n; + is_finished_ = n <= 0; + } + + /*! + * \brief Add run task to task queue. The task added will be run in thread pool immediately. + * \param f The task function to be executed. + * \param args The args of the task function. + * \return The result of the task function. + */ + template ::type> + std::future Enqueue(F&& f, Args&&... args) { + std::packaged_task p(std::bind(f, args...)); + + auto r = p.get_future(); + { + std::unique_lock l(m_); + work_.emplace_back(std::move(p)); + } + work_signal_.notify_one(); + return r; + } + + /*! \brief Wait until the parallel for run batch is finished. */ + void WaitBatch() { + std::unique_lock l(finish_mutex_); + if (!is_finished_) { + finish_signal_.wait(l); + } + CHECK(is_finished_); + } + + /*! \brief Stop the running process. */ + void Abort() { + CancelPending(); + Join(); + } + + /*! \brief Cancel all the tasks in task queue. */ + void CancelPending() { + std::unique_lock l(m_); + work_.clear(); + } + + /*! \brief Wait until all of the threads are finished. */ + void Join() { + { + std::unique_lock l(m_); + for (size_t i = 0; i < threads_.size(); ++i) { + work_.push_back({}); + } + } + work_signal_.notify_all(); + for (auto& t : threads_) { + t.join(); + } + threads_.clear(); + } + + /*! + * \brief Get the working thread number of this pool. + * \return The thread number of this pool. + */ + size_t NumWorkers() { return threads_.size(); } + + static const int REFRESH_EVERY = 128; + static ThreadPool& Global(); + + ~ThreadPool() { Join(); } + + private: + void WorkerFunc() { + std::packaged_task f; + while (true) { + { + std::unique_lock l(m_); + if (work_.empty()) { + work_signal_.wait(l, [&] { return !work_.empty(); }); + } + f = std::move(work_.front()); + work_.pop_front(); + } + if (!f.valid()) { + return; + } + + f(); + + finish_ct_--; + if (finish_ct_ == 0) { + std::unique_lock l(finish_mutex_); + is_finished_ = true; + finish_signal_.notify_one(); + } + } + } + + std::mutex m_; + std::condition_variable work_signal_; + std::deque> work_; + std::vector threads_; + + bool is_finished_ = true; + std::mutex finish_mutex_; + std::atomic finish_ct_; + std::condition_variable finish_signal_; +}; + +void parallel_for(int begin, int end, const std::function& f, int step = 1); + +} // namespace support +} // namespace tvm + +#endif // TVM_SUPPORT_PARALLEL_FOR_H_ diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc new file mode 100644 index 000000000000..d5e21c9b2308 --- /dev/null +++ b/tests/cpp/parallel_for_test.cc @@ -0,0 +1,65 @@ +#include "../src/support/parallel_for.h" + +#include +#include +#include + +TEST(ParallelFor, Basic) { + using namespace tvm::support; + + int a[100], b[100]; + + for (int i = 0; i < 100; i++) { + a[i] = i; + } + + parallel_for(0, 100, [&b](int i) { + b[i] = i; + }); + + for (int i = 0; i < 100; i++) { + CHECK_EQ(a[i], b[i]); + } +} + +TEST(ParallelFor, Nested) { + using namespace tvm::support; + + int a[100][100], b[100][100], c[100][100]; + + for (int i = 0; i < 100; i++) { + for (int j = 0; j < 100; j++) { + a[i][j] = i * j; + } + } + + parallel_for(0, 100, [&b](int i) { + for (int j = 0; j < 100; j++) { + b[i][j] = i * j; + } + }); + + for (int i = 0; i < 100; i++) { + for (int j = 0; j < 100 ; j++) { + CHECK_EQ(a[i][j], b[i][j]); + } + } + + for (int i = 0; i < 100; i++) { + parallel_for(0, 100, [&c, &i](int j) { + c[i][j] = i * j; + }); + } + + for (int i = 0; i < 100; i++) { + for (int j = 0; j < 100 ; j++) { + CHECK_EQ(a[i][j], c[i][j]); + } + } +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + testing::FLAGS_gtest_death_test_style = "threadsafe"; + return RUN_ALL_TESTS(); +} From 0c04e79ab220fe6d55a5a548f72bb07fa127aecd Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 14 Aug 2020 13:34:59 +0800 Subject: [PATCH 02/11] Add parallel_for --- src/support/parallel_for.cc | 161 +++++++++++++++++++++++++++++- src/support/parallel_for.h | 172 ++++++++------------------------- tests/cpp/parallel_for_test.cc | 15 ++- 3 files changed, 204 insertions(+), 144 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index a46842dbef10..5beddd151652 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -1,10 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \file parallel_for.cc + * \brief An implementation to run loop in parallel. + */ #include "parallel_for.h" -#include +#include + +#include +#include +#include +#include +#include +#include +#include namespace tvm { namespace support { +class ThreadPool { + public: + /*! + * \brief Set the thread number used in this pool. + * \param n The thread number of this pool. + */ + void Launch(size_t n = 1) { + for (std::size_t i = 0; i < n; ++i) { + threads_.emplace_back([this] { WorkerFunc(); }); + } + } + + /*! + * \brief Set the total task number to be executed in this parallel for run batch. + * \param n The task number of this parallel for run batch. + */ + void BeginBatch(int n) { + CHECK(is_finished_) << "Last run batch didn't finished."; + finish_ct_ = n; + is_finished_ = n <= 0; + } + + /*! + * \brief Add run task to task queue. The task added will be run in thread pool immediately. + * \param f The task function to be executed. + * \param args The args of the task function. + * \return The result of the task function. + */ + template ::type> + std::future Enqueue(F&& f, Args&&... args) { + std::packaged_task p(std::bind(f, args...)); + + auto r = p.get_future(); + { + std::unique_lock l(m_); + work_.emplace_back(std::move(p)); + } + work_signal_.notify_one(); + return r; + } + + /*! \brief Wait until the parallel for run batch is finished. */ + void WaitBatch() { + std::unique_lock l(finish_mutex_); + if (!is_finished_) { + finish_signal_.wait(l); + } + CHECK(is_finished_); + } + + /*! \brief Stop the running process. */ + void Abort() { + CancelPending(); + Join(); + } + + /*! \brief Cancel all the tasks in task queue. */ + void CancelPending() { + std::unique_lock l(m_); + work_.clear(); + } + + /*! \brief Wait until all of the threads are finished. */ + void Join() { + { + std::unique_lock l(m_); + for (size_t i = 0; i < threads_.size(); ++i) { + work_.push_back({}); + } + } + work_signal_.notify_all(); + for (auto& t : threads_) { + t.join(); + } + threads_.clear(); + } + + /*! + * \brief Get the working thread number of this pool. + * \return The thread number of this pool. + */ + size_t NumWorkers() { return threads_.size(); } + + static const int REFRESH_EVERY = 128; + static ThreadPool& Global(); + + ~ThreadPool() { Join(); } + + private: + /*! \brief The function to run in the worker threads. */ + void WorkerFunc() { + std::packaged_task f; + while (true) { + { + std::unique_lock l(m_); + if (work_.empty()) { + work_signal_.wait(l, [&] { return !work_.empty(); }); + } + f = std::move(work_.front()); + work_.pop_front(); + } + if (!f.valid()) { + return; + } + + // Run the function + f(); + + finish_ct_--; + if (finish_ct_ == 0) { + std::unique_lock l(finish_mutex_); + is_finished_ = true; + finish_signal_.notify_one(); + } + } + } + + std::mutex m_; + std::condition_variable work_signal_; + std::deque> work_; + std::vector threads_; + + bool is_finished_ = true; + std::mutex finish_mutex_; + std::atomic finish_ct_; + std::condition_variable finish_signal_; +}; + ThreadPool& ThreadPool::Global() { static ThreadPool* pool = new ThreadPool(); static int ct = 0; diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h index 578193b98b93..963a61688dc6 100644 --- a/src/support/parallel_for.h +++ b/src/support/parallel_for.h @@ -1,145 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/*! + * \file parallel_for.h + * \brief An implementation to run loop in parallel. + */ #ifndef TVM_SUPPORT_PARALLEL_FOR_H_ #define TVM_SUPPORT_PARALLEL_FOR_H_ -#include #include -#include -#include -#include -#include -#include - -#include namespace tvm { namespace support { -class ThreadPool { - public: - /*! - * \brief Set the thread number used in this pool. - * \param n The thread number of this pool. - */ - void Launch(size_t n = 1) { - for (std::size_t i = 0; i < n; ++i) { - threads_.emplace_back([this] { WorkerFunc(); }); - } - } - - /*! - * \brief Set the total task number to be executed in this parallel for run batch. - * \param n The task number of this parallel for run batch. - */ - void BeginBatch(int n) { - CHECK(is_finished_) << "Last run batch didn't finished."; - finish_ct_ = n; - is_finished_ = n <= 0; - } - - /*! - * \brief Add run task to task queue. The task added will be run in thread pool immediately. - * \param f The task function to be executed. - * \param args The args of the task function. - * \return The result of the task function. - */ - template ::type> - std::future Enqueue(F&& f, Args&&... args) { - std::packaged_task p(std::bind(f, args...)); - - auto r = p.get_future(); - { - std::unique_lock l(m_); - work_.emplace_back(std::move(p)); - } - work_signal_.notify_one(); - return r; - } - - /*! \brief Wait until the parallel for run batch is finished. */ - void WaitBatch() { - std::unique_lock l(finish_mutex_); - if (!is_finished_) { - finish_signal_.wait(l); - } - CHECK(is_finished_); - } - - /*! \brief Stop the running process. */ - void Abort() { - CancelPending(); - Join(); - } - - /*! \brief Cancel all the tasks in task queue. */ - void CancelPending() { - std::unique_lock l(m_); - work_.clear(); - } - - /*! \brief Wait until all of the threads are finished. */ - void Join() { - { - std::unique_lock l(m_); - for (size_t i = 0; i < threads_.size(); ++i) { - work_.push_back({}); - } - } - work_signal_.notify_all(); - for (auto& t : threads_) { - t.join(); - } - threads_.clear(); - } - - /*! - * \brief Get the working thread number of this pool. - * \return The thread number of this pool. - */ - size_t NumWorkers() { return threads_.size(); } - - static const int REFRESH_EVERY = 128; - static ThreadPool& Global(); - - ~ThreadPool() { Join(); } - - private: - void WorkerFunc() { - std::packaged_task f; - while (true) { - { - std::unique_lock l(m_); - if (work_.empty()) { - work_signal_.wait(l, [&] { return !work_.empty(); }); - } - f = std::move(work_.front()); - work_.pop_front(); - } - if (!f.valid()) { - return; - } - - f(); - - finish_ct_--; - if (finish_ct_ == 0) { - std::unique_lock l(finish_mutex_); - is_finished_ = true; - finish_signal_.notify_one(); - } - } - } - - std::mutex m_; - std::condition_variable work_signal_; - std::deque> work_; - std::vector threads_; - - bool is_finished_ = true; - std::mutex finish_mutex_; - std::atomic finish_ct_; - std::condition_variable finish_signal_; -}; - +/*! + * \brief A runtime api provided to run the task function in parallel. + * e.g. A for loop: + * for (int i = 0; i < 10; i++) { + * std::cout << index << "\n"; + * } + * should work the same as: + * parallel_for(0, 10, [](int index) { + * std::cout << index << "\n"; + * }); + * \param begin The start index of this parallel loop(inclusive). + * \param end The end index of this parallel loop(exclusive). + * \param f The task function to be excuted. Assert to take an int index as input with no output. + * \param step The traversal step to the index. + */ void parallel_for(int begin, int end, const std::function& f, int step = 1); } // namespace support diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index d5e21c9b2308..1b1cebcaa5d2 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -2,6 +2,7 @@ #include #include + #include TEST(ParallelFor, Basic) { @@ -13,9 +14,7 @@ TEST(ParallelFor, Basic) { a[i] = i; } - parallel_for(0, 100, [&b](int i) { - b[i] = i; - }); + parallel_for(0, 100, [&b](int i) { b[i] = i; }); for (int i = 0; i < 100; i++) { CHECK_EQ(a[i], b[i]); @@ -35,24 +34,22 @@ TEST(ParallelFor, Nested) { parallel_for(0, 100, [&b](int i) { for (int j = 0; j < 100; j++) { - b[i][j] = i * j; + b[i][j] = i * j; } }); for (int i = 0; i < 100; i++) { - for (int j = 0; j < 100 ; j++) { + for (int j = 0; j < 100; j++) { CHECK_EQ(a[i][j], b[i][j]); } } for (int i = 0; i < 100; i++) { - parallel_for(0, 100, [&c, &i](int j) { - c[i][j] = i * j; - }); + parallel_for(0, 100, [&c, &i](int j) { c[i][j] = i * j; }); } for (int i = 0; i < 100; i++) { - for (int j = 0; j < 100 ; j++) { + for (int j = 0; j < 100; j++) { CHECK_EQ(a[i][j], c[i][j]); } } From ba6b40f783f00a3c85bc0c96f666269e2e38b6cc Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 14 Aug 2020 13:44:29 +0800 Subject: [PATCH 03/11] Lint fix --- tests/cpp/parallel_for_test.cc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index 1b1cebcaa5d2..0c28cbdfca6e 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + #include "../src/support/parallel_for.h" #include From 3e2ee1b075bda5992a6550e9d9b86b15817510a9 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 14 Aug 2020 14:25:08 +0800 Subject: [PATCH 04/11] Update exception catching --- src/support/parallel_for.cc | 59 +++++++++++++++++++++------------- src/support/parallel_for.h | 1 + tests/cpp/parallel_for_test.cc | 12 +++++++ 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index 5beddd151652..8fa923827ec7 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -67,7 +67,6 @@ class ThreadPool { template ::type> std::future Enqueue(F&& f, Args&&... args) { std::packaged_task p(std::bind(f, args...)); - auto r = p.get_future(); { std::unique_lock l(m_); @@ -119,8 +118,31 @@ class ThreadPool { */ size_t NumWorkers() { return threads_.size(); } + /*! + * \brief The global singleton entry of ThreadPool. + * \return The ThreadPool reference; + */ + static ThreadPool& Global() { + static ThreadPool* pool = new ThreadPool(); + static int ct = 0; + + ct = (ct + 1) % ThreadPool::REFRESH_EVERY; + + if (ct == 0) { + pool->Abort(); + delete pool; + pool = new ThreadPool(); + } + + if (pool->NumWorkers() == 0) { + pool->Launch(std::thread::hardware_concurrency()); + } + + return *pool; + } + + /*! \brief Refresh the thread pool in every several runs. */ static const int REFRESH_EVERY = 128; - static ThreadPool& Global(); ~ThreadPool() { Join(); } @@ -164,34 +186,25 @@ class ThreadPool { std::condition_variable finish_signal_; }; -ThreadPool& ThreadPool::Global() { - static ThreadPool* pool = new ThreadPool(); - static int ct = 0; - - ct = (ct + 1) % ThreadPool::REFRESH_EVERY; - - if (ct == 0) { - pool->Abort(); - delete pool; - pool = new ThreadPool(); - } - - if (pool->NumWorkers() == 0) { - pool->Launch(std::thread::hardware_concurrency()); - } - - return *pool; -} - void parallel_for(int begin, int end, const std::function& f, int step) { auto& pf = ThreadPool::Global(); + int batch_count = (end - begin) / step; - CHECK_GT(batch_count, 0); + CHECK_GT(batch_count, 0) << "Infinite loop condition, check the setting of begin, end, step."; + + std::vector> res_vec; pf.BeginBatch(batch_count); for (int i = begin; i < end; i += step) { - pf.Enqueue(f, i); + res_vec.push_back(pf.Enqueue(f, i)); } pf.WaitBatch(); + try { + for (auto& i : res_vec) { + i.get(); + } + } catch (const std::exception& e) { + LOG(FATAL) << "Parallel_for error with " << e.what(); + } } } // namespace support diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h index 963a61688dc6..3006a06fd35a 100644 --- a/src/support/parallel_for.h +++ b/src/support/parallel_for.h @@ -43,6 +43,7 @@ namespace support { * \param end The end index of this parallel loop(exclusive). * \param f The task function to be excuted. Assert to take an int index as input with no output. * \param step The traversal step to the index. + * \note Currently do not support nested parallel_for. */ void parallel_for(int begin, int end, const std::function& f, int step = 1); diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index 0c28cbdfca6e..9c4afd9bbcf0 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -74,6 +74,18 @@ TEST(ParallelFor, Nested) { } } +TEST(ParallelFor, Exception) { + using namespace tvm::support; + + bool exception = false; + try { + parallel_for(0, 100, [](int i) { LOG(FATAL) << "error"; }); + } catch (const std::exception& e) { + exception = true; + } + CHECK(exception); +} + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); testing::FLAGS_gtest_death_test_style = "threadsafe"; From 73e7243436039bd39957d82ea79caf48030d2261 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 14 Aug 2020 17:45:39 +0800 Subject: [PATCH 05/11] Link fix --- src/support/parallel_for.h | 4 +++- tests/cpp/parallel_for_test.cc | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h index 3006a06fd35a..8c7170dfadd4 100644 --- a/src/support/parallel_for.h +++ b/src/support/parallel_for.h @@ -24,6 +24,8 @@ #ifndef TVM_SUPPORT_PARALLEL_FOR_H_ #define TVM_SUPPORT_PARALLEL_FOR_H_ +#include + #include namespace tvm { @@ -45,7 +47,7 @@ namespace support { * \param step The traversal step to the index. * \note Currently do not support nested parallel_for. */ -void parallel_for(int begin, int end, const std::function& f, int step = 1); +TVM_DLL void parallel_for(int begin, int end, const std::function& f, int step = 1); } // namespace support } // namespace tvm diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index 9c4afd9bbcf0..e348843a556d 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -17,7 +17,7 @@ * under the License. */ -#include "../src/support/parallel_for.h" +#include "../../src/support/parallel_for.h" #include #include @@ -25,7 +25,7 @@ #include TEST(ParallelFor, Basic) { - using namespace tvm::support; + using tvm::support::parallel_for; int a[100], b[100]; @@ -41,7 +41,7 @@ TEST(ParallelFor, Basic) { } TEST(ParallelFor, Nested) { - using namespace tvm::support; + using tvm::support::parallel_for; int a[100][100], b[100][100], c[100][100]; @@ -75,7 +75,7 @@ TEST(ParallelFor, Nested) { } TEST(ParallelFor, Exception) { - using namespace tvm::support; + using tvm::support::parallel_for; bool exception = false; try { From 2b7d23d2fe068cb9accfa7e89dac70b6828b30f6 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Sat, 15 Aug 2020 18:30:21 +0800 Subject: [PATCH 06/11] Remove the threadpool --- src/support/parallel_for.cc | 183 ++++++------------------------------ src/support/parallel_for.h | 19 +++- 2 files changed, 46 insertions(+), 156 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index 8fa923827ec7..c5744661e2d5 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -36,168 +36,41 @@ namespace tvm { namespace support { -class ThreadPool { - public: - /*! - * \brief Set the thread number used in this pool. - * \param n The thread number of this pool. - */ - void Launch(size_t n = 1) { - for (std::size_t i = 0; i < n; ++i) { - threads_.emplace_back([this] { WorkerFunc(); }); - } - } - - /*! - * \brief Set the total task number to be executed in this parallel for run batch. - * \param n The task number of this parallel for run batch. - */ - void BeginBatch(int n) { - CHECK(is_finished_) << "Last run batch didn't finished."; - finish_ct_ = n; - is_finished_ = n <= 0; - } - - /*! - * \brief Add run task to task queue. The task added will be run in thread pool immediately. - * \param f The task function to be executed. - * \param args The args of the task function. - * \return The result of the task function. - */ - template ::type> - std::future Enqueue(F&& f, Args&&... args) { - std::packaged_task p(std::bind(f, args...)); - auto r = p.get_future(); - { - std::unique_lock l(m_); - work_.emplace_back(std::move(p)); - } - work_signal_.notify_one(); - return r; - } - - /*! \brief Wait until the parallel for run batch is finished. */ - void WaitBatch() { - std::unique_lock l(finish_mutex_); - if (!is_finished_) { - finish_signal_.wait(l); - } - CHECK(is_finished_); - } - - /*! \brief Stop the running process. */ - void Abort() { - CancelPending(); - Join(); - } - - /*! \brief Cancel all the tasks in task queue. */ - void CancelPending() { - std::unique_lock l(m_); - work_.clear(); - } - - /*! \brief Wait until all of the threads are finished. */ - void Join() { - { - std::unique_lock l(m_); - for (size_t i = 0; i < threads_.size(); ++i) { - work_.push_back({}); - } - } - work_signal_.notify_all(); - for (auto& t : threads_) { - t.join(); - } - threads_.clear(); - } - - /*! - * \brief Get the working thread number of this pool. - * \return The thread number of this pool. - */ - size_t NumWorkers() { return threads_.size(); } - - /*! - * \brief The global singleton entry of ThreadPool. - * \return The ThreadPool reference; - */ - static ThreadPool& Global() { - static ThreadPool* pool = new ThreadPool(); - static int ct = 0; - - ct = (ct + 1) % ThreadPool::REFRESH_EVERY; - - if (ct == 0) { - pool->Abort(); - delete pool; - pool = new ThreadPool(); - } - - if (pool->NumWorkers() == 0) { - pool->Launch(std::thread::hardware_concurrency()); - } - - return *pool; +std::vector> rr_partitioner(int begin, int end, int step, int num_threads) { + int total_task_count = (end - begin) / step; + CHECK_GT(total_task_count, 0) << "Infinite loop condition, check the input value of " + << "`begin`, `end`, `step`."; + + std::vector> ret(num_threads); + for (int thread = 0; begin < end; begin += step, thread = (thread + 1) % num_threads) { + ret[thread].push_back(begin); } + return ret; +} - /*! \brief Refresh the thread pool in every several runs. */ - static const int REFRESH_EVERY = 128; - - ~ThreadPool() { Join(); } - - private: - /*! \brief The function to run in the worker threads. */ - void WorkerFunc() { - std::packaged_task f; - while (true) { - { - std::unique_lock l(m_); - if (work_.empty()) { - work_signal_.wait(l, [&] { return !work_.empty(); }); - } - f = std::move(work_.front()); - work_.pop_front(); - } - if (!f.valid()) { - return; - } - - // Run the function - f(); +void parallel_for(int begin, int end, const std::function& f, int step, + const PartitionerFuncType partitioner) { + int default_num_threads = std::thread::hardware_concurrency(); + const auto& run_partitions = partitioner(begin, end, step, default_num_threads); - finish_ct_--; - if (finish_ct_ == 0) { - std::unique_lock l(finish_mutex_); - is_finished_ = true; - finish_signal_.notify_one(); - } + std::vector threads; + std::vector> res_vec; + for (const auto& run_partition : run_partitions) { + if (!run_partition.empty()) { + std::packaged_task&, const std::function&)> task( + [](const std::vector& run_pattition, const std::function& f) { + for (const auto& i : run_pattition) { + f(i); + } + }); + res_vec.emplace_back(task.get_future()); + threads.emplace_back(std::move(task), run_partition, f); } } - std::mutex m_; - std::condition_variable work_signal_; - std::deque> work_; - std::vector threads_; - - bool is_finished_ = true; - std::mutex finish_mutex_; - std::atomic finish_ct_; - std::condition_variable finish_signal_; -}; - -void parallel_for(int begin, int end, const std::function& f, int step) { - auto& pf = ThreadPool::Global(); - - int batch_count = (end - begin) / step; - CHECK_GT(batch_count, 0) << "Infinite loop condition, check the setting of begin, end, step."; - - std::vector> res_vec; - pf.BeginBatch(batch_count); - for (int i = begin; i < end; i += step) { - res_vec.push_back(pf.Enqueue(f, i)); + for (auto& thread : threads) { + thread.join(); } - pf.WaitBatch(); try { for (auto& i : res_vec) { i.get(); diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h index 8c7170dfadd4..e1720f1516ff 100644 --- a/src/support/parallel_for.h +++ b/src/support/parallel_for.h @@ -27,10 +27,24 @@ #include #include +#include namespace tvm { namespace support { +using PartitionerFuncType = std::function>(int, int, int, int)>; + +/*! + * \brief A partitioner to split the task to each thread in Round-robin manner. + * \param begin The start index of this parallel loop(inclusive). + * \param end The end index of this parallel loop(exclusive). + * \param step The traversal step to the index. + * \param num_threads The number of threads(the number of tasks to be partitioned to). + * \return A list with `num_threads` elements, and each is a list of integers indicating the loop + * indexes for the corresponding thread to process. + */ +std::vector> rr_partitioner(int begin, int end, int step, int num_threads); + /*! * \brief A runtime api provided to run the task function in parallel. * e.g. A for loop: @@ -45,9 +59,12 @@ namespace support { * \param end The end index of this parallel loop(exclusive). * \param f The task function to be excuted. Assert to take an int index as input with no output. * \param step The traversal step to the index. + * \param partitioner A partition function to split tasks to different threads. Use Round-robin + * partitioner in default. * \note Currently do not support nested parallel_for. */ -TVM_DLL void parallel_for(int begin, int end, const std::function& f, int step = 1); +TVM_DLL void parallel_for(int begin, int end, const std::function& f, int step = 1, + const PartitionerFuncType partitioner = rr_partitioner); } // namespace support } // namespace tvm From 93795818e6bef34ba5ed370a0c83f820d5b61e32 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Sat, 15 Aug 2020 18:35:44 +0800 Subject: [PATCH 07/11] Remove unused head file --- src/support/parallel_for.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index c5744661e2d5..99f7eb0c08f1 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -25,10 +25,7 @@ #include -#include -#include #include -#include #include #include #include From f1c3d6662cbf33248866ad962dd5e45629583cc3 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Sat, 15 Aug 2020 18:46:39 +0800 Subject: [PATCH 08/11] Update UT --- tests/cpp/parallel_for_test.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index e348843a556d..ee370fcb5c11 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -29,6 +29,7 @@ TEST(ParallelFor, Basic) { int a[100], b[100]; + // Default for (int i = 0; i < 100; i++) { a[i] = i; } @@ -38,6 +39,18 @@ TEST(ParallelFor, Basic) { for (int i = 0; i < 100; i++) { CHECK_EQ(a[i], b[i]); } + + // Check for step != 1 + for (int i = 0; i < 100; i += 2) { + a[i] *= 2; + } + + parallel_for( + 0, 100, [&b](int i) { b[i] *= 2; }, 2); + + for (int i = 0; i < 100; i++) { + CHECK_EQ(a[i], b[i]); + } } TEST(ParallelFor, Nested) { From 45abc179acfd09340114f63942ec1685f4e125a0 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Mon, 17 Aug 2020 10:02:26 +0800 Subject: [PATCH 09/11] Update --- src/support/parallel_for.cc | 29 ++++++++++--------- src/support/parallel_for.h | 13 +++++---- tests/cpp/parallel_for_test.cc | 53 ++++++++++++++++++---------------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index 99f7eb0c08f1..71a3f74b69d2 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -37,9 +37,12 @@ std::vector> rr_partitioner(int begin, int end, int step, int n int total_task_count = (end - begin) / step; CHECK_GT(total_task_count, 0) << "Infinite loop condition, check the input value of " << "`begin`, `end`, `step`."; - - std::vector> ret(num_threads); - for (int thread = 0; begin < end; begin += step, thread = (thread + 1) % num_threads) { + std::vector> ret; + ret.reserve(num_threads); + for (size_t thread = 0; begin < end; begin += step, thread = (thread + 1) % num_threads) { + if (thread >= ret.size()) { + ret.push_back(std::vector()); + } ret[thread].push_back(begin); } return ret; @@ -51,18 +54,18 @@ void parallel_for(int begin, int end, const std::function& f, int ste const auto& run_partitions = partitioner(begin, end, step, default_num_threads); std::vector threads; + threads.reserve(run_partitions.size()); std::vector> res_vec; + res_vec.reserve(run_partitions.size()); for (const auto& run_partition : run_partitions) { - if (!run_partition.empty()) { - std::packaged_task&, const std::function&)> task( - [](const std::vector& run_pattition, const std::function& f) { - for (const auto& i : run_pattition) { - f(i); - } - }); - res_vec.emplace_back(task.get_future()); - threads.emplace_back(std::move(task), run_partition, f); - } + std::packaged_task&, const std::function&)> task( + [](const std::vector& run_pattition, const std::function& f) { + for (const auto& i : run_pattition) { + f(i); + } + }); + res_vec.emplace_back(task.get_future()); + threads.emplace_back(std::move(task), run_partition, f); } for (auto& thread : threads) { diff --git a/src/support/parallel_for.h b/src/support/parallel_for.h index e1720f1516ff..49a9d4889e33 100644 --- a/src/support/parallel_for.h +++ b/src/support/parallel_for.h @@ -43,25 +43,26 @@ using PartitionerFuncType = std::function>(int, int * \return A list with `num_threads` elements, and each is a list of integers indicating the loop * indexes for the corresponding thread to process. */ -std::vector> rr_partitioner(int begin, int end, int step, int num_threads); +TVM_DLL std::vector> rr_partitioner(int begin, int end, int step, int num_threads); /*! * \brief A runtime api provided to run the task function in parallel. * e.g. A for loop: * for (int i = 0; i < 10; i++) { - * std::cout << index << "\n"; + * a[i] = i; * } * should work the same as: - * parallel_for(0, 10, [](int index) { - * std::cout << index << "\n"; + * parallel_for(0, 10, [&a](int index) { + * a[i] = i; * }); * \param begin The start index of this parallel loop(inclusive). * \param end The end index of this parallel loop(exclusive). * \param f The task function to be excuted. Assert to take an int index as input with no output. * \param step The traversal step to the index. * \param partitioner A partition function to split tasks to different threads. Use Round-robin - * partitioner in default. - * \note Currently do not support nested parallel_for. + * partitioner by default. + * \note 1. Currently do not support nested parallel_for; 2. The order of execution in each thread + * is not guaranteed, the for loop task should be thread independent and thread safe. */ TVM_DLL void parallel_for(int begin, int end, const std::function& f, int step = 1, const PartitionerFuncType partitioner = rr_partitioner); diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index ee370fcb5c11..4db055c8acd7 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -27,61 +27,64 @@ TEST(ParallelFor, Basic) { using tvm::support::parallel_for; - int a[100], b[100]; + int a[1000], b[1000]; - // Default - for (int i = 0; i < 100; i++) { + // Check for a small size of parallel + for (int i = 0; i < 10; i++) { a[i] = i; } + parallel_for(0, 10, [&b](int i) { b[i] = i; }); + for (int i = 0; i < 10; i++) { + CHECK_EQ(a[i], b[i]); + } - parallel_for(0, 100, [&b](int i) { b[i] = i; }); - - for (int i = 0; i < 100; i++) { + // Check for a large size of parallel + for (int i = 0; i < 1000; i++) { + a[i] = i; + } + parallel_for(0, 1000, [&b](int i) { b[i] = i; }); + for (int i = 0; i < 1000; i++) { CHECK_EQ(a[i], b[i]); } // Check for step != 1 - for (int i = 0; i < 100; i += 2) { + for (int i = 0; i < 1000; i += 2) { a[i] *= 2; } - parallel_for( - 0, 100, [&b](int i) { b[i] *= 2; }, 2); - - for (int i = 0; i < 100; i++) { + 0, 1000, [&b](int i) { b[i] *= 2; }, 2); + for (int i = 0; i < 1000; i++) { CHECK_EQ(a[i], b[i]); } } -TEST(ParallelFor, Nested) { +TEST(ParallelFor, NestedWithNormalForLoop) { using tvm::support::parallel_for; - int a[100][100], b[100][100], c[100][100]; + int a[500][500], b[500][500], c[500][500]; - for (int i = 0; i < 100; i++) { - for (int j = 0; j < 100; j++) { + for (int i = 0; i < 500; i++) { + for (int j = 0; j < 500; j++) { a[i][j] = i * j; } } - parallel_for(0, 100, [&b](int i) { - for (int j = 0; j < 100; j++) { + parallel_for(0, 500, [&b](int i) { + for (int j = 0; j < 500; j++) { b[i][j] = i * j; } }); - - for (int i = 0; i < 100; i++) { - for (int j = 0; j < 100; j++) { + for (int i = 0; i < 500; i++) { + for (int j = 0; j < 500; j++) { CHECK_EQ(a[i][j], b[i][j]); } } - for (int i = 0; i < 100; i++) { - parallel_for(0, 100, [&c, &i](int j) { c[i][j] = i * j; }); + for (int i = 0; i < 500; i++) { + parallel_for(0, 500, [&c, &i](int j) { c[i][j] = i * j; }); } - - for (int i = 0; i < 100; i++) { - for (int j = 0; j < 100; j++) { + for (int i = 0; i < 500; i++) { + for (int j = 0; j < 500; j++) { CHECK_EQ(a[i][j], c[i][j]); } } From 233a6ea94956a0258f9221fc451f97d1486591f4 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Mon, 17 Aug 2020 14:55:59 +0800 Subject: [PATCH 10/11] Update universal reference --- src/support/parallel_for.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index 71a3f74b69d2..9870f82c1eca 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -68,11 +68,11 @@ void parallel_for(int begin, int end, const std::function& f, int ste threads.emplace_back(std::move(task), run_partition, f); } - for (auto& thread : threads) { + for (auto&& thread : threads) { thread.join(); } try { - for (auto& i : res_vec) { + for (auto&& i : res_vec) { i.get(); } } catch (const std::exception& e) { From 0d76fb30d2f3739b60da5e30f22485eab1f70c4e Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Tue, 18 Aug 2020 17:26:53 +0800 Subject: [PATCH 11/11] Move parallel_for.h to include --- {src => include/tvm}/support/parallel_for.h | 0 src/support/parallel_for.cc | 3 +-- tests/cpp/parallel_for_test.cc | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) rename {src => include/tvm}/support/parallel_for.h (100%) diff --git a/src/support/parallel_for.h b/include/tvm/support/parallel_for.h similarity index 100% rename from src/support/parallel_for.h rename to include/tvm/support/parallel_for.h diff --git a/src/support/parallel_for.cc b/src/support/parallel_for.cc index 9870f82c1eca..30f39fbee6f9 100644 --- a/src/support/parallel_for.cc +++ b/src/support/parallel_for.cc @@ -21,9 +21,8 @@ * \file parallel_for.cc * \brief An implementation to run loop in parallel. */ -#include "parallel_for.h" - #include +#include #include #include diff --git a/tests/cpp/parallel_for_test.cc b/tests/cpp/parallel_for_test.cc index 4db055c8acd7..3d586fc1aa15 100644 --- a/tests/cpp/parallel_for_test.cc +++ b/tests/cpp/parallel_for_test.cc @@ -17,10 +17,9 @@ * under the License. */ -#include "../../src/support/parallel_for.h" - #include #include +#include #include