00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_TASK_IO_SERVICE_HPP
00012 #define ASIO_DETAIL_TASK_IO_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/error_code.hpp"
00021 #include "asio/io_service.hpp"
00022 #include "asio/detail/call_stack.hpp"
00023 #include "asio/detail/event.hpp"
00024 #include "asio/detail/handler_alloc_helpers.hpp"
00025 #include "asio/detail/handler_invoke_helpers.hpp"
00026 #include "asio/detail/mutex.hpp"
00027 #include "asio/detail/service_base.hpp"
00028 #include "asio/detail/task_io_service_fwd.hpp"
00029
00030 namespace asio {
00031 namespace detail {
00032
00033 template <typename Task>
00034 class task_io_service
00035 : public asio::detail::service_base<task_io_service<Task> >
00036 {
00037 public:
00038
00039 task_io_service(asio::io_service& io_service)
00040 : asio::detail::service_base<task_io_service<Task> >(io_service),
00041 mutex_(),
00042 task_(use_service<Task>(io_service)),
00043 outstanding_work_(0),
00044 handler_queue_(&task_handler_),
00045 handler_queue_end_(&task_handler_),
00046 stopped_(false),
00047 shutdown_(false),
00048 first_idle_thread_(0)
00049 {
00050 }
00051
// Initialisation hook. This implementation ignores its argument and does
// nothing.
void init(size_t /* unused */)
{
}
00055
00056
00057 void shutdown_service()
00058 {
00059 asio::detail::mutex::scoped_lock lock(mutex_);
00060 shutdown_ = true;
00061 lock.unlock();
00062
00063
00064 while (handler_queue_)
00065 {
00066 handler_base* h = handler_queue_;
00067 handler_queue_ = h->next_;
00068 if (h != &task_handler_)
00069 h->destroy();
00070 }
00071
00072
00073 handler_queue_ = &task_handler_;
00074 handler_queue_end_ = &task_handler_;
00075 }
00076
00077
00078 size_t run(asio::error_code& ec)
00079 {
00080 typename call_stack<task_io_service>::context ctx(this);
00081
00082 idle_thread_info this_idle_thread;
00083 this_idle_thread.prev = &this_idle_thread;
00084 this_idle_thread.next = &this_idle_thread;
00085
00086 asio::detail::mutex::scoped_lock lock(mutex_);
00087
00088 size_t n = 0;
00089 while (do_one(lock, &this_idle_thread, ec))
00090 if (n != (std::numeric_limits<size_t>::max)())
00091 ++n;
00092 return n;
00093 }
00094
00095
00096 size_t run_one(asio::error_code& ec)
00097 {
00098 typename call_stack<task_io_service>::context ctx(this);
00099
00100 idle_thread_info this_idle_thread;
00101 this_idle_thread.prev = &this_idle_thread;
00102 this_idle_thread.next = &this_idle_thread;
00103
00104 asio::detail::mutex::scoped_lock lock(mutex_);
00105
00106 return do_one(lock, &this_idle_thread, ec);
00107 }
00108
00109
00110 size_t poll(asio::error_code& ec)
00111 {
00112 typename call_stack<task_io_service>::context ctx(this);
00113
00114 asio::detail::mutex::scoped_lock lock(mutex_);
00115
00116 size_t n = 0;
00117 while (do_one(lock, 0, ec))
00118 if (n != (std::numeric_limits<size_t>::max)())
00119 ++n;
00120 return n;
00121 }
00122
00123
00124 size_t poll_one(asio::error_code& ec)
00125 {
00126 typename call_stack<task_io_service>::context ctx(this);
00127
00128 asio::detail::mutex::scoped_lock lock(mutex_);
00129
00130 return do_one(lock, 0, ec);
00131 }
00132
00133
00134 void stop()
00135 {
00136 asio::detail::mutex::scoped_lock lock(mutex_);
00137 stop_all_threads();
00138 }
00139
00140
00141 void reset()
00142 {
00143 asio::detail::mutex::scoped_lock lock(mutex_);
00144 stopped_ = false;
00145 }
00146
00147
00148 void work_started()
00149 {
00150 asio::detail::mutex::scoped_lock lock(mutex_);
00151 ++outstanding_work_;
00152 }
00153
00154
00155 void work_finished()
00156 {
00157 asio::detail::mutex::scoped_lock lock(mutex_);
00158 if (--outstanding_work_ == 0)
00159 stop_all_threads();
00160 }
00161
00162
00163 template <typename Handler>
00164 void dispatch(Handler handler)
00165 {
00166 if (call_stack<task_io_service>::contains(this))
00167 asio_handler_invoke_helpers::invoke(handler, &handler);
00168 else
00169 post(handler);
00170 }
00171
00172
00173 template <typename Handler>
00174 void post(Handler handler)
00175 {
00176
00177 typedef handler_wrapper<Handler> value_type;
00178 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00179 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00180 handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00181
00182 asio::detail::mutex::scoped_lock lock(mutex_);
00183
00184
00185 if (shutdown_)
00186 return;
00187
00188
00189 if (handler_queue_end_)
00190 {
00191 handler_queue_end_->next_ = ptr.get();
00192 handler_queue_end_ = ptr.get();
00193 }
00194 else
00195 {
00196 handler_queue_ = handler_queue_end_ = ptr.get();
00197 }
00198 ptr.release();
00199
00200
00201 ++outstanding_work_;
00202
00203
00204 if (!interrupt_one_idle_thread())
00205 if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_)
00206 task_.interrupt();
00207 }
00208
00209 private:
00210 struct idle_thread_info;
00211
00212 size_t do_one(asio::detail::mutex::scoped_lock& lock,
00213 idle_thread_info* this_idle_thread, asio::error_code& ec)
00214 {
00215 if (outstanding_work_ == 0 && !stopped_)
00216 {
00217 stop_all_threads();
00218 ec = asio::error_code();
00219 return 0;
00220 }
00221
00222 bool polling = !this_idle_thread;
00223 bool task_has_run = false;
00224 while (!stopped_)
00225 {
00226 if (handler_queue_)
00227 {
00228
00229 handler_base* h = handler_queue_;
00230 handler_queue_ = h->next_;
00231 if (handler_queue_ == 0)
00232 handler_queue_end_ = 0;
00233 bool more_handlers = (handler_queue_ != 0);
00234 lock.unlock();
00235
00236 if (h == &task_handler_)
00237 {
00238
00239 if (task_has_run && polling)
00240 {
00241 ec = asio::error_code();
00242 return 0;
00243 }
00244 task_has_run = true;
00245
00246 task_cleanup c(lock, *this);
00247
00248
00249
00250
00251 task_.run(!more_handlers && !polling);
00252 }
00253 else
00254 {
00255 handler_cleanup c(lock, *this);
00256
00257
00258 h->call();
00259
00260 ec = asio::error_code();
00261 return 1;
00262 }
00263 }
00264 else if (this_idle_thread)
00265 {
00266
00267 if (first_idle_thread_)
00268 {
00269 this_idle_thread->next = first_idle_thread_;
00270 this_idle_thread->prev = first_idle_thread_->prev;
00271 first_idle_thread_->prev->next = this_idle_thread;
00272 first_idle_thread_->prev = this_idle_thread;
00273 }
00274 first_idle_thread_ = this_idle_thread;
00275 this_idle_thread->wakeup_event.clear();
00276 lock.unlock();
00277 this_idle_thread->wakeup_event.wait();
00278 lock.lock();
00279 if (this_idle_thread->next == this_idle_thread)
00280 {
00281 first_idle_thread_ = 0;
00282 }
00283 else
00284 {
00285 if (first_idle_thread_ == this_idle_thread)
00286 first_idle_thread_ = this_idle_thread->next;
00287 this_idle_thread->next->prev = this_idle_thread->prev;
00288 this_idle_thread->prev->next = this_idle_thread->next;
00289 this_idle_thread->next = this_idle_thread;
00290 this_idle_thread->prev = this_idle_thread;
00291 }
00292 }
00293 else
00294 {
00295 ec = asio::error_code();
00296 return 0;
00297 }
00298 }
00299
00300 ec = asio::error_code();
00301 return 0;
00302 }
00303
00304
00305 void stop_all_threads()
00306 {
00307 stopped_ = true;
00308 interrupt_all_idle_threads();
00309 if (task_handler_.next_ == 0 && handler_queue_end_ != &task_handler_)
00310 task_.interrupt();
00311 }
00312
00313
00314
00315 bool interrupt_one_idle_thread()
00316 {
00317 if (first_idle_thread_)
00318 {
00319 first_idle_thread_->wakeup_event.signal();
00320 first_idle_thread_ = first_idle_thread_->next;
00321 return true;
00322 }
00323 return false;
00324 }
00325
00326
00327 void interrupt_all_idle_threads()
00328 {
00329 if (first_idle_thread_)
00330 {
00331 first_idle_thread_->wakeup_event.signal();
00332 idle_thread_info* current_idle_thread = first_idle_thread_->next;
00333 while (current_idle_thread != first_idle_thread_)
00334 {
00335 current_idle_thread->wakeup_event.signal();
00336 current_idle_thread = current_idle_thread->next;
00337 }
00338 }
00339 }
00340
00341 class task_cleanup;
00342 friend class task_cleanup;
00343
00344
00345
00346 class handler_base
00347 {
00348 public:
00349 typedef void (*call_func_type)(handler_base*);
00350 typedef void (*destroy_func_type)(handler_base*);
00351
00352 handler_base(call_func_type call_func, destroy_func_type destroy_func)
00353 : next_(0),
00354 call_func_(call_func),
00355 destroy_func_(destroy_func)
00356 {
00357 }
00358
00359 void call()
00360 {
00361 call_func_(this);
00362 }
00363
00364 void destroy()
00365 {
00366 destroy_func_(this);
00367 }
00368
00369 protected:
00370
00371 ~handler_base()
00372 {
00373 }
00374
00375 private:
00376 friend class task_io_service<Task>;
00377 friend class task_cleanup;
00378 handler_base* next_;
00379 call_func_type call_func_;
00380 destroy_func_type destroy_func_;
00381 };
00382
00383
00384 template <typename Handler>
00385 class handler_wrapper
00386 : public handler_base
00387 {
00388 public:
00389 handler_wrapper(Handler handler)
00390 : handler_base(&handler_wrapper<Handler>::do_call,
00391 &handler_wrapper<Handler>::do_destroy),
00392 handler_(handler)
00393 {
00394 }
00395
00396 static void do_call(handler_base* base)
00397 {
00398
00399 typedef handler_wrapper<Handler> this_type;
00400 this_type* h(static_cast<this_type*>(base));
00401 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00402 handler_ptr<alloc_traits> ptr(h->handler_, h);
00403
00404
00405
00406 Handler handler(h->handler_);
00407
00408
00409 ptr.reset();
00410
00411
00412 asio_handler_invoke_helpers::invoke(handler, &handler);
00413 }
00414
00415 static void do_destroy(handler_base* base)
00416 {
00417
00418 typedef handler_wrapper<Handler> this_type;
00419 this_type* h(static_cast<this_type*>(base));
00420 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00421 handler_ptr<alloc_traits> ptr(h->handler_, h);
00422 }
00423
00424 private:
00425 Handler handler_;
00426 };
00427
00428
00429 class task_cleanup
00430 {
00431 public:
00432 task_cleanup(asio::detail::mutex::scoped_lock& lock,
00433 task_io_service& task_io_svc)
00434 : lock_(lock),
00435 task_io_service_(task_io_svc)
00436 {
00437 }
00438
00439 ~task_cleanup()
00440 {
00441
00442 lock_.lock();
00443 task_io_service_.task_handler_.next_ = 0;
00444 if (task_io_service_.handler_queue_end_)
00445 {
00446 task_io_service_.handler_queue_end_->next_
00447 = &task_io_service_.task_handler_;
00448 task_io_service_.handler_queue_end_
00449 = &task_io_service_.task_handler_;
00450 }
00451 else
00452 {
00453 task_io_service_.handler_queue_
00454 = task_io_service_.handler_queue_end_
00455 = &task_io_service_.task_handler_;
00456 }
00457 }
00458
00459 private:
00460 asio::detail::mutex::scoped_lock& lock_;
00461 task_io_service& task_io_service_;
00462 };
00463
00464
00465 class handler_cleanup;
00466 friend class handler_cleanup;
00467 class handler_cleanup
00468 {
00469 public:
00470 handler_cleanup(asio::detail::mutex::scoped_lock& lock,
00471 task_io_service& task_io_svc)
00472 : lock_(lock),
00473 task_io_service_(task_io_svc)
00474 {
00475 }
00476
00477 ~handler_cleanup()
00478 {
00479 lock_.lock();
00480 if (--task_io_service_.outstanding_work_ == 0)
00481 task_io_service_.stop_all_threads();
00482 }
00483
00484 private:
00485 asio::detail::mutex::scoped_lock& lock_;
00486 task_io_service& task_io_service_;
00487 };
00488
00489
00490 asio::detail::mutex mutex_;
00491
00492
00493 Task& task_;
00494
00495
00496 class task_handler
00497 : public handler_base
00498 {
00499 public:
00500 task_handler()
00501 : handler_base(0, 0)
00502 {
00503 }
00504 } task_handler_;
00505
00506
00507 int outstanding_work_;
00508
00509
00510 handler_base* handler_queue_;
00511
00512
00513 handler_base* handler_queue_end_;
00514
00515
00516 bool stopped_;
00517
00518
00519 bool shutdown_;
00520
00521
00522 struct idle_thread_info
00523 {
00524 event wakeup_event;
00525 idle_thread_info* prev;
00526 idle_thread_info* next;
00527 };
00528
00529
00530 idle_thread_info* first_idle_thread_;
00531 };
00532
00533 }
00534 }
00535
00536 #include "asio/detail/pop_options.hpp"
00537
00538 #endif // ASIO_DETAIL_TASK_IO_SERVICE_HPP