00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_STRAND_SERVICE_HPP
00012 #define ASIO_DETAIL_STRAND_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/push_options.hpp"
00021 #include <boost/aligned_storage.hpp>
00022 #include <boost/assert.hpp>
00023 #include <boost/intrusive_ptr.hpp>
00024 #include "asio/detail/pop_options.hpp"
00025
00026 #include "asio/io_service.hpp"
00027 #include "asio/detail/bind_handler.hpp"
00028 #include "asio/detail/call_stack.hpp"
00029 #include "asio/detail/handler_alloc_helpers.hpp"
00030 #include "asio/detail/handler_invoke_helpers.hpp"
00031 #include "asio/detail/mutex.hpp"
00032 #include "asio/detail/noncopyable.hpp"
00033 #include "asio/detail/service_base.hpp"
00034
00035 namespace asio {
00036 namespace detail {
00037
00038
00039 class strand_service
00040 : public asio::detail::service_base<strand_service>
00041 {
00042 public:
00043 class handler_base;
00044 class invoke_current_handler;
00045 class post_next_waiter_on_exit;
00046
00047
00048 class strand_impl
00049 {
00050 #if defined (__BORLANDC__)
00051 public:
00052 #else
00053 private:
00054 #endif
00055 void add_ref()
00056 {
00057 asio::detail::mutex::scoped_lock lock(mutex_);
00058 ++ref_count_;
00059 }
00060
00061 void release()
00062 {
00063 asio::detail::mutex::scoped_lock lock(mutex_);
00064 --ref_count_;
00065 if (ref_count_ == 0)
00066 {
00067 lock.unlock();
00068 delete this;
00069 }
00070 }
00071
00072 private:
00073
00074 friend class strand_service;
00075 friend class post_next_waiter_on_exit;
00076 friend class invoke_current_handler;
00077
00078 strand_impl(strand_service& owner)
00079 : owner_(owner),
00080 current_handler_(0),
00081 first_waiter_(0),
00082 last_waiter_(0),
00083 ref_count_(0)
00084 {
00085
00086 asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00087 next_ = owner_.impl_list_;
00088 prev_ = 0;
00089 if (owner_.impl_list_)
00090 owner_.impl_list_->prev_ = this;
00091 owner_.impl_list_ = this;
00092 }
00093
00094 ~strand_impl()
00095 {
00096
00097 asio::detail::mutex::scoped_lock lock(owner_.mutex_);
00098 if (owner_.impl_list_ == this)
00099 owner_.impl_list_ = next_;
00100 if (prev_)
00101 prev_->next_ = next_;
00102 if (next_)
00103 next_->prev_= prev_;
00104 next_ = 0;
00105 prev_ = 0;
00106 lock.unlock();
00107
00108 if (current_handler_)
00109 {
00110 current_handler_->destroy();
00111 }
00112
00113 while (first_waiter_)
00114 {
00115 handler_base* next = first_waiter_->next_;
00116 first_waiter_->destroy();
00117 first_waiter_ = next;
00118 }
00119 }
00120
00121
00122 asio::detail::mutex mutex_;
00123
00124
00125 strand_service& owner_;
00126
00127
00128
00129 handler_base* current_handler_;
00130
00131
00132 handler_base* first_waiter_;
00133
00134
00135 handler_base* last_waiter_;
00136
00137
00138 typedef boost::aligned_storage<64> handler_storage_type;
00139 #if defined(__BORLANDC__)
00140 boost::aligned_storage<64> handler_storage_;
00141 #else
00142 handler_storage_type handler_storage_;
00143 #endif
00144
00145
00146 strand_impl* next_;
00147 strand_impl* prev_;
00148
00149
00150 size_t ref_count_;
00151
00152 #if !defined(__BORLANDC__)
00153 friend void intrusive_ptr_add_ref(strand_impl* p)
00154 {
00155 p->add_ref();
00156 }
00157
00158 friend void intrusive_ptr_release(strand_impl* p)
00159 {
00160 p->release();
00161 }
00162 #endif
00163 };
00164
00165 friend class strand_impl;
00166
00167 typedef boost::intrusive_ptr<strand_impl> implementation_type;
00168
00169
00170 class handler_base
00171 {
00172 public:
00173 typedef void (*invoke_func_type)(handler_base*,
00174 strand_service&, implementation_type&);
00175 typedef void (*destroy_func_type)(handler_base*);
00176
00177 handler_base(invoke_func_type invoke_func, destroy_func_type destroy_func)
00178 : next_(0),
00179 invoke_func_(invoke_func),
00180 destroy_func_(destroy_func)
00181 {
00182 }
00183
00184 void invoke(strand_service& service_impl, implementation_type& impl)
00185 {
00186 invoke_func_(this, service_impl, impl);
00187 }
00188
00189 void destroy()
00190 {
00191 destroy_func_(this);
00192 }
00193
00194 protected:
00195 ~handler_base()
00196 {
00197 }
00198
00199 private:
00200 friend class strand_service;
00201 friend class strand_impl;
00202 friend class post_next_waiter_on_exit;
00203 handler_base* next_;
00204 invoke_func_type invoke_func_;
00205 destroy_func_type destroy_func_;
00206 };
00207
00208
00209 class invoke_current_handler
00210 {
00211 public:
00212 invoke_current_handler(strand_service& service_impl,
00213 const implementation_type& impl)
00214 : service_impl_(service_impl),
00215 impl_(impl)
00216 {
00217 }
00218
00219 void operator()()
00220 {
00221 impl_->current_handler_->invoke(service_impl_, impl_);
00222 }
00223
00224 friend void* asio_handler_allocate(std::size_t size,
00225 invoke_current_handler* this_handler)
00226 {
00227 return this_handler->do_handler_allocate(size);
00228 }
00229
00230 friend void asio_handler_deallocate(void*, std::size_t,
00231 invoke_current_handler*)
00232 {
00233 }
00234
00235 void* do_handler_allocate(std::size_t size)
00236 {
00237 #if defined(__BORLANDC__)
00238 BOOST_ASSERT(size <= boost::aligned_storage<64>::size);
00239 #else
00240 BOOST_ASSERT(size <= strand_impl::handler_storage_type::size);
00241 #endif
00242 return impl_->handler_storage_.address();
00243 }
00244
00245
00246
00247
00248
00249 private:
00250 strand_service& service_impl_;
00251 implementation_type impl_;
00252 };
00253
00254
00255 class post_next_waiter_on_exit
00256 {
00257 public:
00258 post_next_waiter_on_exit(strand_service& service_impl,
00259 implementation_type& impl)
00260 : service_impl_(service_impl),
00261 impl_(impl),
00262 cancelled_(false)
00263 {
00264 }
00265
00266 ~post_next_waiter_on_exit()
00267 {
00268 if (!cancelled_)
00269 {
00270 asio::detail::mutex::scoped_lock lock(impl_->mutex_);
00271 impl_->current_handler_ = impl_->first_waiter_;
00272 if (impl_->current_handler_)
00273 {
00274 impl_->first_waiter_ = impl_->first_waiter_->next_;
00275 if (impl_->first_waiter_ == 0)
00276 impl_->last_waiter_ = 0;
00277 lock.unlock();
00278 service_impl_.io_service().post(
00279 invoke_current_handler(service_impl_, impl_));
00280 }
00281 }
00282 }
00283
00284 void cancel()
00285 {
00286 cancelled_ = true;
00287 }
00288
00289 private:
00290 strand_service& service_impl_;
00291 implementation_type& impl_;
00292 bool cancelled_;
00293 };
00294
00295
00296 template <typename Handler>
00297 class handler_wrapper
00298 : public handler_base
00299 {
00300 public:
00301 handler_wrapper(Handler handler)
00302 : handler_base(&handler_wrapper<Handler>::do_invoke,
00303 &handler_wrapper<Handler>::do_destroy),
00304 handler_(handler)
00305 {
00306 }
00307
00308 static void do_invoke(handler_base* base,
00309 strand_service& service_impl, implementation_type& impl)
00310 {
00311
00312 typedef handler_wrapper<Handler> this_type;
00313 this_type* h(static_cast<this_type*>(base));
00314 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00315 handler_ptr<alloc_traits> ptr(h->handler_, h);
00316
00317 post_next_waiter_on_exit p1(service_impl, impl);
00318
00319
00320
00321 Handler handler(h->handler_);
00322
00323
00324
00325
00326
00327 p1.cancel();
00328 post_next_waiter_on_exit p2(service_impl, impl);
00329
00330
00331 ptr.reset();
00332
00333
00334 call_stack<strand_impl>::context ctx(impl.get());
00335
00336
00337 asio_handler_invoke_helpers::invoke(handler, &handler);
00338 }
00339
00340 static void do_destroy(handler_base* base)
00341 {
00342
00343 typedef handler_wrapper<Handler> this_type;
00344 this_type* h(static_cast<this_type*>(base));
00345 typedef handler_alloc_traits<Handler, this_type> alloc_traits;
00346 handler_ptr<alloc_traits> ptr(h->handler_, h);
00347 }
00348
00349 private:
00350 Handler handler_;
00351 };
00352
00353
00354 explicit strand_service(asio::io_service& io_service)
00355 : asio::detail::service_base<strand_service>(io_service),
00356 mutex_(),
00357 impl_list_(0)
00358 {
00359 }
00360
00361
00362 void shutdown_service()
00363 {
00364
00365 asio::detail::mutex::scoped_lock lock(mutex_);
00366 strand_impl* impl = impl_list_;
00367 handler_base* first_handler = 0;
00368 while (impl)
00369 {
00370 if (impl->current_handler_)
00371 {
00372 impl->current_handler_->next_ = first_handler;
00373 first_handler = impl->current_handler_;
00374 impl->current_handler_ = 0;
00375 }
00376 if (impl->first_waiter_)
00377 {
00378 impl->last_waiter_->next_ = first_handler;
00379 first_handler = impl->first_waiter_;
00380 impl->first_waiter_ = 0;
00381 impl->last_waiter_ = 0;
00382 }
00383 impl = impl->next_;
00384 }
00385
00386
00387 lock.unlock();
00388 while (first_handler)
00389 {
00390 handler_base* next = first_handler->next_;
00391 first_handler->destroy();
00392 first_handler = next;
00393 }
00394 }
00395
00396
00397 void construct(implementation_type& impl)
00398 {
00399 impl = implementation_type(new strand_impl(*this));
00400 }
00401
00402
00403 void destroy(implementation_type& impl)
00404 {
00405 implementation_type().swap(impl);
00406 }
00407
00408
00409 template <typename Handler>
00410 void dispatch(implementation_type& impl, Handler handler)
00411 {
00412 if (call_stack<strand_impl>::contains(impl.get()))
00413 {
00414 asio_handler_invoke_helpers::invoke(handler, &handler);
00415 }
00416 else
00417 {
00418 asio::detail::mutex::scoped_lock lock(impl->mutex_);
00419
00420
00421 typedef handler_wrapper<Handler> value_type;
00422 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00423 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00424 handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00425
00426 if (impl->current_handler_ == 0)
00427 {
00428
00429 impl->current_handler_ = ptr.get();
00430 lock.unlock();
00431 this->io_service().dispatch(invoke_current_handler(*this, impl));
00432 ptr.release();
00433 }
00434 else
00435 {
00436
00437
00438
00439 if (impl->last_waiter_)
00440 {
00441 impl->last_waiter_->next_ = ptr.get();
00442 impl->last_waiter_ = impl->last_waiter_->next_;
00443 }
00444 else
00445 {
00446 impl->first_waiter_ = ptr.get();
00447 impl->last_waiter_ = ptr.get();
00448 }
00449 ptr.release();
00450 }
00451 }
00452 }
00453
00454
00455 template <typename Handler>
00456 void post(implementation_type& impl, Handler handler)
00457 {
00458 asio::detail::mutex::scoped_lock lock(impl->mutex_);
00459
00460
00461 typedef handler_wrapper<Handler> value_type;
00462 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00463 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00464 handler_ptr<alloc_traits> ptr(raw_ptr, handler);
00465
00466 if (impl->current_handler_ == 0)
00467 {
00468
00469 impl->current_handler_ = ptr.get();
00470 lock.unlock();
00471 this->io_service().post(invoke_current_handler(*this, impl));
00472 ptr.release();
00473 }
00474 else
00475 {
00476
00477
00478
00479 if (impl->last_waiter_)
00480 {
00481 impl->last_waiter_->next_ = ptr.get();
00482 impl->last_waiter_ = impl->last_waiter_->next_;
00483 }
00484 else
00485 {
00486 impl->first_waiter_ = ptr.get();
00487 impl->last_waiter_ = ptr.get();
00488 }
00489 ptr.release();
00490 }
00491 }
00492
00493 private:
00494
00495 asio::detail::mutex mutex_;
00496
00497
00498 strand_impl* impl_list_;
00499 };
00500
00501 }
00502 }
00503
00504 #if defined(__BORLANDC__)
00505
00506 namespace boost {
00507
00508 inline void intrusive_ptr_add_ref(
00509 asio::detail::strand_service::strand_impl* p)
00510 {
00511 p->add_ref();
00512 }
00513
00514 inline void intrusive_ptr_release(
00515 asio::detail::strand_service::strand_impl* p)
00516 {
00517 p->release();
00518 }
00519
00520 }
00521
00522 #endif // defined(__BORLANDC__)
00523
00524 #include "asio/detail/pop_options.hpp"
00525
00526 #endif // ASIO_DETAIL_STRAND_SERVICE_HPP