#ifndef HV_EVENT_LOOP_THREAD_POOL_HPP_ #define HV_EVENT_LOOP_THREAD_POOL_HPP_ #include "EventLoopThread.h" #include "hbase.h" namespace hv { class EventLoopThreadPool : public Status { public: EventLoopThreadPool(int thread_num = std::thread::hardware_concurrency()) { setStatus(kInitializing); thread_num_ = thread_num; next_loop_idx_ = 0; setStatus(kInitialized); } ~EventLoopThreadPool() { stop(); join(); } int threadNum() { return thread_num_; } void setThreadNum(int num) { thread_num_ = num; } EventLoopPtr nextLoop(load_balance_e lb = LB_RoundRobin) { size_t numLoops = loop_threads_.size(); if (numLoops == 0) return NULL; size_t idx = 0; if (lb == LB_RoundRobin) { if (++next_loop_idx_ >= numLoops) next_loop_idx_ = 0; idx = next_loop_idx_ % numLoops; } else if (lb == LB_Random) { idx = hv_rand(0, numLoops - 1); } else if (lb == LB_LeastConnections) { for (size_t i = 1; i < numLoops; ++i) { if (loop_threads_[i]->loop()->connectionNum < loop_threads_[idx]->loop()->connectionNum) { idx = i; } } } else { // Not Implemented } return loop_threads_[idx]->loop(); } EventLoopPtr loop(int idx = -1) { if (idx >= 0 && idx < (int)loop_threads_.size()) { return loop_threads_[idx]->loop(); } return nextLoop(); } hloop_t* hloop(int idx = -1) { EventLoopPtr ptr = loop(idx); return ptr ? ptr->loop() : NULL; } // @param wait_threads_started: if ture this method will block until all loop_threads started. // @param pre: This functor will be executed when loop_thread started. // @param post:This Functor will be executed when loop_thread stopped. void start(bool wait_threads_started = false, std::function pre = NULL, std::function post = NULL) { if (thread_num_ == 0) return; if (status() >= kStarting && status() < kStopped) return; setStatus(kStarting); auto started_cnt = std::make_shared>(0); auto exited_cnt = std::make_shared>(0); loop_threads_.clear(); for (int i = 0; i < thread_num_; ++i) { auto loop_thread = std::make_shared(); const EventLoopPtr& loop = loop_thread->loop(); loop_thread->start(false, [this, started_cnt, pre, &loop]() { if (++(*started_cnt) == thread_num_) { setStatus(kRunning); } if (pre) pre(loop); return 0; }, [this, exited_cnt, post, &loop]() { if (post) post(loop); if (++(*exited_cnt) == thread_num_) { setStatus(kStopped); } return 0; } ); loop_threads_.push_back(loop_thread); } if (wait_threads_started) { while (status() < kRunning) { hv_delay(1); } } } // @param wait_threads_started: if ture this method will block until all loop_threads stopped. // stop thread-safe void stop(bool wait_threads_stopped = false) { if (status() < kStarting || status() >= kStopping) return; setStatus(kStopping); for (auto& loop_thread : loop_threads_) { loop_thread->stop(false); } if (wait_threads_stopped) { join(); } } // @brief join all loop_threads // @note destructor will join loop_threads if you forget to call this method. void join() { for (auto& loop_thread : loop_threads_) { loop_thread->join(); } } private: int thread_num_; std::vector loop_threads_; std::atomic next_loop_idx_; }; } #endif // HV_EVENT_LOOP_THREAD_POOL_HPP_