00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_SELECT_REACTOR_HPP
00012 #define ASIO_DETAIL_SELECT_REACTOR_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/socket_types.hpp"
00021
00022 #include "asio/detail/push_options.hpp"
00023 #include <cstddef>
00024 #include <boost/config.hpp>
00025 #include <boost/date_time/posix_time/posix_time_types.hpp>
00026 #include <vector>
00027 #include "asio/detail/pop_options.hpp"
00028
00029 #include "asio/io_service.hpp"
00030 #include "asio/detail/bind_handler.hpp"
00031 #include "asio/detail/fd_set_adapter.hpp"
00032 #include "asio/detail/mutex.hpp"
00033 #include "asio/detail/noncopyable.hpp"
00034 #include "asio/detail/reactor_op_queue.hpp"
00035 #include "asio/detail/select_interrupter.hpp"
00036 #include "asio/detail/select_reactor_fwd.hpp"
00037 #include "asio/detail/service_base.hpp"
00038 #include "asio/detail/signal_blocker.hpp"
00039 #include "asio/detail/socket_ops.hpp"
00040 #include "asio/detail/socket_types.hpp"
00041 #include "asio/detail/task_io_service.hpp"
00042 #include "asio/detail/thread.hpp"
00043 #include "asio/detail/timer_queue.hpp"
00044
00045 namespace asio {
00046 namespace detail {
00047
00048 template <bool Own_Thread>
00049 class select_reactor
00050 : public asio::detail::service_base<select_reactor<Own_Thread> >
00051 {
00052 public:
00053
00054 select_reactor(asio::io_service& io_service)
00055 : asio::detail::service_base<
00056 select_reactor<Own_Thread> >(io_service),
00057 mutex_(),
00058 select_in_progress_(false),
00059 interrupter_(),
00060 read_op_queue_(),
00061 write_op_queue_(),
00062 except_op_queue_(),
00063 pending_cancellations_(),
00064 stop_thread_(false),
00065 thread_(0),
00066 shutdown_(false)
00067 {
00068 if (Own_Thread)
00069 {
00070 asio::detail::signal_blocker sb;
00071 thread_ = new asio::detail::thread(
00072 bind_handler(&select_reactor::call_run_thread, this));
00073 }
00074 }
00075
00076
00077 ~select_reactor()
00078 {
00079 shutdown_service();
00080 }
00081
00082
00083 void shutdown_service()
00084 {
00085 asio::detail::mutex::scoped_lock lock(mutex_);
00086 shutdown_ = true;
00087 stop_thread_ = true;
00088 lock.unlock();
00089
00090 if (thread_)
00091 {
00092 interrupter_.interrupt();
00093 thread_->join();
00094 delete thread_;
00095 thread_ = 0;
00096 }
00097
00098 read_op_queue_.destroy_operations();
00099 write_op_queue_.destroy_operations();
00100 except_op_queue_.destroy_operations();
00101
00102 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00103 timer_queues_[i]->destroy_timers();
00104 timer_queues_.clear();
00105 }
00106
00107
00108
00109 int register_descriptor(socket_type descriptor)
00110 {
00111 return 0;
00112 }
00113
00114
00115
00116 template <typename Handler>
00117 void start_read_op(socket_type descriptor, Handler handler)
00118 {
00119 asio::detail::mutex::scoped_lock lock(mutex_);
00120 if (!shutdown_)
00121 if (read_op_queue_.enqueue_operation(descriptor, handler))
00122 interrupter_.interrupt();
00123 }
00124
00125
00126
00127 template <typename Handler>
00128 void start_write_op(socket_type descriptor, Handler handler)
00129 {
00130 asio::detail::mutex::scoped_lock lock(mutex_);
00131 if (!shutdown_)
00132 if (write_op_queue_.enqueue_operation(descriptor, handler))
00133 interrupter_.interrupt();
00134 }
00135
00136
00137
00138 template <typename Handler>
00139 void start_except_op(socket_type descriptor, Handler handler)
00140 {
00141 asio::detail::mutex::scoped_lock lock(mutex_);
00142 if (!shutdown_)
00143 if (except_op_queue_.enqueue_operation(descriptor, handler))
00144 interrupter_.interrupt();
00145 }
00146
00147
00148
00149
00150 template <typename Handler>
00151 void start_write_and_except_ops(socket_type descriptor, Handler handler)
00152 {
00153 asio::detail::mutex::scoped_lock lock(mutex_);
00154 if (!shutdown_)
00155 {
00156 bool interrupt = write_op_queue_.enqueue_operation(descriptor, handler);
00157 interrupt = except_op_queue_.enqueue_operation(descriptor, handler)
00158 || interrupt;
00159 if (interrupt)
00160 interrupter_.interrupt();
00161 }
00162 }
00163
00164
00165
00166
00167 void cancel_ops(socket_type descriptor)
00168 {
00169 asio::detail::mutex::scoped_lock lock(mutex_);
00170 cancel_ops_unlocked(descriptor);
00171 }
00172
00173
00174
00175
00176
00177
00178 void enqueue_cancel_ops_unlocked(socket_type descriptor)
00179 {
00180 pending_cancellations_.push_back(descriptor);
00181 }
00182
00183
00184
00185 void close_descriptor(socket_type descriptor)
00186 {
00187 asio::detail::mutex::scoped_lock lock(mutex_);
00188 cancel_ops_unlocked(descriptor);
00189 }
00190
00191
00192 template <typename Time_Traits>
00193 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00194 {
00195 asio::detail::mutex::scoped_lock lock(mutex_);
00196 timer_queues_.push_back(&timer_queue);
00197 }
00198
00199
00200 template <typename Time_Traits>
00201 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00202 {
00203 asio::detail::mutex::scoped_lock lock(mutex_);
00204 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00205 {
00206 if (timer_queues_[i] == &timer_queue)
00207 {
00208 timer_queues_.erase(timer_queues_.begin() + i);
00209 return;
00210 }
00211 }
00212 }
00213
00214
00215
00216 template <typename Time_Traits, typename Handler>
00217 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00218 const typename Time_Traits::time_type& time, Handler handler, void* token)
00219 {
00220 asio::detail::mutex::scoped_lock lock(mutex_);
00221 if (!shutdown_)
00222 if (timer_queue.enqueue_timer(time, handler, token))
00223 interrupter_.interrupt();
00224 }
00225
00226
00227
00228 template <typename Time_Traits>
00229 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00230 {
00231 asio::detail::mutex::scoped_lock lock(mutex_);
00232 return timer_queue.cancel_timer(token);
00233 }
00234
00235 private:
00236 friend class task_io_service<select_reactor<Own_Thread> >;
00237
00238
00239 void run(bool block)
00240 {
00241 asio::detail::mutex::scoped_lock lock(mutex_);
00242
00243
00244
00245 read_op_queue_.dispatch_cancellations();
00246 write_op_queue_.dispatch_cancellations();
00247 except_op_queue_.dispatch_cancellations();
00248
00249
00250 if (stop_thread_)
00251 {
00252
00253
00254 lock.unlock();
00255 read_op_queue_.cleanup_operations();
00256 write_op_queue_.cleanup_operations();
00257 except_op_queue_.cleanup_operations();
00258 return;
00259 }
00260
00261
00262
00263 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00264 && except_op_queue_.empty() && all_timer_queues_are_empty())
00265 {
00266
00267
00268 lock.unlock();
00269 read_op_queue_.cleanup_operations();
00270 write_op_queue_.cleanup_operations();
00271 except_op_queue_.cleanup_operations();
00272 return;
00273 }
00274
00275
00276 fd_set_adapter read_fds;
00277 read_fds.set(interrupter_.read_descriptor());
00278 read_op_queue_.get_descriptors(read_fds);
00279 fd_set_adapter write_fds;
00280 write_op_queue_.get_descriptors(write_fds);
00281 fd_set_adapter except_fds;
00282 except_op_queue_.get_descriptors(except_fds);
00283 socket_type max_fd = read_fds.max_descriptor();
00284 if (write_fds.max_descriptor() > max_fd)
00285 max_fd = write_fds.max_descriptor();
00286 if (except_fds.max_descriptor() > max_fd)
00287 max_fd = except_fds.max_descriptor();
00288
00289
00290
00291 timeval tv_buf = { 0, 0 };
00292 timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
00293 select_in_progress_ = true;
00294 lock.unlock();
00295 asio::error_code ec;
00296 int retval = socket_ops::select(static_cast<int>(max_fd + 1),
00297 read_fds, write_fds, except_fds, tv, ec);
00298 lock.lock();
00299 select_in_progress_ = false;
00300
00301
00302 asio::detail::signal_blocker sb;
00303
00304
00305 if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor()))
00306 interrupter_.reset();
00307
00308
00309 if (retval > 0)
00310 {
00311
00312
00313 except_op_queue_.dispatch_descriptors(except_fds,
00314 asio::error_code());
00315 read_op_queue_.dispatch_descriptors(read_fds,
00316 asio::error_code());
00317 write_op_queue_.dispatch_descriptors(write_fds,
00318 asio::error_code());
00319 except_op_queue_.dispatch_cancellations();
00320 read_op_queue_.dispatch_cancellations();
00321 write_op_queue_.dispatch_cancellations();
00322 }
00323 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00324 timer_queues_[i]->dispatch_timers();
00325
00326
00327 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00328 cancel_ops_unlocked(pending_cancellations_[i]);
00329 pending_cancellations_.clear();
00330
00331
00332
00333 lock.unlock();
00334 read_op_queue_.cleanup_operations();
00335 write_op_queue_.cleanup_operations();
00336 except_op_queue_.cleanup_operations();
00337 }
00338
00339
00340 void run_thread()
00341 {
00342 asio::detail::mutex::scoped_lock lock(mutex_);
00343 while (!stop_thread_)
00344 {
00345 lock.unlock();
00346 run(true);
00347 lock.lock();
00348 }
00349 }
00350
00351
00352 static void call_run_thread(select_reactor* reactor)
00353 {
00354 reactor->run_thread();
00355 }
00356
00357
00358 void interrupt()
00359 {
00360 interrupter_.interrupt();
00361 }
00362
00363
00364 bool all_timer_queues_are_empty() const
00365 {
00366 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00367 if (!timer_queues_[i]->empty())
00368 return false;
00369 return true;
00370 }
00371
00372
00373 timeval* get_timeout(timeval& tv)
00374 {
00375 if (all_timer_queues_are_empty())
00376 return 0;
00377
00378
00379
00380 boost::posix_time::time_duration minimum_wait_duration
00381 = boost::posix_time::minutes(5);
00382
00383 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00384 {
00385 boost::posix_time::time_duration wait_duration
00386 = timer_queues_[i]->wait_duration();
00387 if (wait_duration < minimum_wait_duration)
00388 minimum_wait_duration = wait_duration;
00389 }
00390
00391 if (minimum_wait_duration > boost::posix_time::time_duration())
00392 {
00393 tv.tv_sec = minimum_wait_duration.total_seconds();
00394 tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000;
00395 }
00396 else
00397 {
00398 tv.tv_sec = 0;
00399 tv.tv_usec = 0;
00400 }
00401
00402 return &tv;
00403 }
00404
00405
00406
00407
00408 void cancel_ops_unlocked(socket_type descriptor)
00409 {
00410 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00411 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00412 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00413 if (interrupt)
00414 interrupter_.interrupt();
00415 }
00416
00417
00418 asio::detail::mutex mutex_;
00419
00420
00421 bool select_in_progress_;
00422
00423
00424 select_interrupter interrupter_;
00425
00426
00427 reactor_op_queue<socket_type> read_op_queue_;
00428
00429
00430 reactor_op_queue<socket_type> write_op_queue_;
00431
00432
00433 reactor_op_queue<socket_type> except_op_queue_;
00434
00435
00436 std::vector<timer_queue_base*> timer_queues_;
00437
00438
00439 std::vector<socket_type> pending_cancellations_;
00440
00441
00442 bool stop_thread_;
00443
00444
00445 asio::detail::thread* thread_;
00446
00447
00448 bool shutdown_;
00449 };
00450
00451 }
00452 }
00453
00454 #include "asio/detail/pop_options.hpp"
00455
00456 #endif // ASIO_DETAIL_SELECT_REACTOR_HPP