00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_EPOLL_REACTOR_HPP
00012 #define ASIO_DETAIL_EPOLL_REACTOR_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/epoll_reactor_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_EPOLL)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <cstddef>
00026 #include <vector>
00027 #include <sys/epoll.h>
00028 #include <boost/config.hpp>
00029 #include <boost/date_time/posix_time/posix_time_types.hpp>
00030 #include <boost/throw_exception.hpp>
00031 #include "asio/detail/pop_options.hpp"
00032
00033 #include "asio/error.hpp"
00034 #include "asio/io_service.hpp"
00035 #include "asio/system_error.hpp"
00036 #include "asio/detail/bind_handler.hpp"
00037 #include "asio/detail/hash_map.hpp"
00038 #include "asio/detail/mutex.hpp"
00039 #include "asio/detail/task_io_service.hpp"
00040 #include "asio/detail/thread.hpp"
00041 #include "asio/detail/reactor_op_queue.hpp"
00042 #include "asio/detail/select_interrupter.hpp"
00043 #include "asio/detail/service_base.hpp"
00044 #include "asio/detail/signal_blocker.hpp"
00045 #include "asio/detail/socket_types.hpp"
00046 #include "asio/detail/timer_queue.hpp"
00047
00048 namespace asio {
00049 namespace detail {
00050
00051 template <bool Own_Thread>
00052 class epoll_reactor
00053 : public asio::detail::service_base<epoll_reactor<Own_Thread> >
00054 {
00055 public:
00056
00057 epoll_reactor(asio::io_service& io_service)
00058 : asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service),
00059 mutex_(),
00060 epoll_fd_(do_epoll_create()),
00061 wait_in_progress_(false),
00062 interrupter_(),
00063 read_op_queue_(),
00064 write_op_queue_(),
00065 except_op_queue_(),
00066 pending_cancellations_(),
00067 stop_thread_(false),
00068 thread_(0),
00069 shutdown_(false)
00070 {
00071
00072 if (Own_Thread)
00073 {
00074 asio::detail::signal_blocker sb;
00075 thread_ = new asio::detail::thread(
00076 bind_handler(&epoll_reactor::call_run_thread, this));
00077 }
00078
00079
00080 epoll_event ev = { 0, { 0 } };
00081 ev.events = EPOLLIN | EPOLLERR;
00082 ev.data.fd = interrupter_.read_descriptor();
00083 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
00084 }
00085
00086
00087 ~epoll_reactor()
00088 {
00089 shutdown_service();
00090 close(epoll_fd_);
00091 }
00092
00093
00094 void shutdown_service()
00095 {
00096 asio::detail::mutex::scoped_lock lock(mutex_);
00097 shutdown_ = true;
00098 stop_thread_ = true;
00099 lock.unlock();
00100
00101 if (thread_)
00102 {
00103 interrupter_.interrupt();
00104 thread_->join();
00105 delete thread_;
00106 thread_ = 0;
00107 }
00108
00109 read_op_queue_.destroy_operations();
00110 write_op_queue_.destroy_operations();
00111 except_op_queue_.destroy_operations();
00112
00113 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00114 timer_queues_[i]->destroy_timers();
00115 timer_queues_.clear();
00116 }
00117
00118
00119
00120 int register_descriptor(socket_type descriptor)
00121 {
00122
00123
00124 epoll_event ev = { 0, { 0 } };
00125 ev.events = 0;
00126 ev.data.fd = descriptor;
00127 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
00128 if (result != 0)
00129 return errno;
00130 return 0;
00131 }
00132
00133
00134
00135 template <typename Handler>
00136 void start_read_op(socket_type descriptor, Handler handler)
00137 {
00138 asio::detail::mutex::scoped_lock lock(mutex_);
00139
00140 if (shutdown_)
00141 return;
00142
00143 if (!read_op_queue_.has_operation(descriptor))
00144 if (handler(asio::error_code()))
00145 return;
00146
00147 if (read_op_queue_.enqueue_operation(descriptor, handler))
00148 {
00149 epoll_event ev = { 0, { 0 } };
00150 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
00151 if (write_op_queue_.has_operation(descriptor))
00152 ev.events |= EPOLLOUT;
00153 if (except_op_queue_.has_operation(descriptor))
00154 ev.events |= EPOLLPRI;
00155 ev.data.fd = descriptor;
00156
00157 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00158 if (result != 0)
00159 {
00160 asio::error_code ec(errno, asio::native_ecat);
00161 read_op_queue_.dispatch_all_operations(descriptor, ec);
00162 }
00163 }
00164 }
00165
00166
00167
00168 template <typename Handler>
00169 void start_write_op(socket_type descriptor, Handler handler)
00170 {
00171 asio::detail::mutex::scoped_lock lock(mutex_);
00172
00173 if (shutdown_)
00174 return;
00175
00176 if (!write_op_queue_.has_operation(descriptor))
00177 if (handler(asio::error_code()))
00178 return;
00179
00180 if (write_op_queue_.enqueue_operation(descriptor, handler))
00181 {
00182 epoll_event ev = { 0, { 0 } };
00183 ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00184 if (read_op_queue_.has_operation(descriptor))
00185 ev.events |= EPOLLIN;
00186 if (except_op_queue_.has_operation(descriptor))
00187 ev.events |= EPOLLPRI;
00188 ev.data.fd = descriptor;
00189
00190 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00191 if (result != 0)
00192 {
00193 asio::error_code ec(errno, asio::native_ecat);
00194 write_op_queue_.dispatch_all_operations(descriptor, ec);
00195 }
00196 }
00197 }
00198
00199
00200
00201 template <typename Handler>
00202 void start_except_op(socket_type descriptor, Handler handler)
00203 {
00204 asio::detail::mutex::scoped_lock lock(mutex_);
00205
00206 if (shutdown_)
00207 return;
00208
00209 if (except_op_queue_.enqueue_operation(descriptor, handler))
00210 {
00211 epoll_event ev = { 0, { 0 } };
00212 ev.events = EPOLLPRI | EPOLLERR | EPOLLHUP;
00213 if (read_op_queue_.has_operation(descriptor))
00214 ev.events |= EPOLLIN;
00215 if (write_op_queue_.has_operation(descriptor))
00216 ev.events |= EPOLLOUT;
00217 ev.data.fd = descriptor;
00218
00219 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00220 if (result != 0)
00221 {
00222 asio::error_code ec(errno, asio::native_ecat);
00223 except_op_queue_.dispatch_all_operations(descriptor, ec);
00224 }
00225 }
00226 }
00227
00228
00229
00230
00231 template <typename Handler>
00232 void start_write_and_except_ops(socket_type descriptor, Handler handler)
00233 {
00234 asio::detail::mutex::scoped_lock lock(mutex_);
00235
00236 if (shutdown_)
00237 return;
00238
00239 bool need_mod = write_op_queue_.enqueue_operation(descriptor, handler);
00240 need_mod = except_op_queue_.enqueue_operation(descriptor, handler)
00241 && need_mod;
00242 if (need_mod)
00243 {
00244 epoll_event ev = { 0, { 0 } };
00245 ev.events = EPOLLOUT | EPOLLPRI | EPOLLERR | EPOLLHUP;
00246 if (read_op_queue_.has_operation(descriptor))
00247 ev.events |= EPOLLIN;
00248 ev.data.fd = descriptor;
00249
00250 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00251 if (result != 0)
00252 {
00253 asio::error_code ec(errno, asio::native_ecat);
00254 write_op_queue_.dispatch_all_operations(descriptor, ec);
00255 except_op_queue_.dispatch_all_operations(descriptor, ec);
00256 }
00257 }
00258 }
00259
00260
00261
00262
00263 void cancel_ops(socket_type descriptor)
00264 {
00265 asio::detail::mutex::scoped_lock lock(mutex_);
00266 cancel_ops_unlocked(descriptor);
00267 }
00268
00269
00270
00271
00272
00273
00274 void enqueue_cancel_ops_unlocked(socket_type descriptor)
00275 {
00276 pending_cancellations_.push_back(descriptor);
00277 }
00278
00279
00280
00281 void close_descriptor(socket_type descriptor)
00282 {
00283 asio::detail::mutex::scoped_lock lock(mutex_);
00284
00285
00286 epoll_event ev = { 0, { 0 } };
00287 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
00288
00289
00290 cancel_ops_unlocked(descriptor);
00291 }
00292
00293
00294 template <typename Time_Traits>
00295 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00296 {
00297 asio::detail::mutex::scoped_lock lock(mutex_);
00298 timer_queues_.push_back(&timer_queue);
00299 }
00300
00301
00302 template <typename Time_Traits>
00303 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00304 {
00305 asio::detail::mutex::scoped_lock lock(mutex_);
00306 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00307 {
00308 if (timer_queues_[i] == &timer_queue)
00309 {
00310 timer_queues_.erase(timer_queues_.begin() + i);
00311 return;
00312 }
00313 }
00314 }
00315
00316
00317
00318 template <typename Time_Traits, typename Handler>
00319 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00320 const typename Time_Traits::time_type& time, Handler handler, void* token)
00321 {
00322 asio::detail::mutex::scoped_lock lock(mutex_);
00323 if (!shutdown_)
00324 if (timer_queue.enqueue_timer(time, handler, token))
00325 interrupter_.interrupt();
00326 }
00327
00328
00329
00330 template <typename Time_Traits>
00331 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00332 {
00333 asio::detail::mutex::scoped_lock lock(mutex_);
00334 return timer_queue.cancel_timer(token);
00335 }
00336
00337 private:
00338 friend class task_io_service<epoll_reactor<Own_Thread> >;
00339
00340
00341 void run(bool block)
00342 {
00343 asio::detail::mutex::scoped_lock lock(mutex_);
00344
00345
00346
00347 read_op_queue_.dispatch_cancellations();
00348 write_op_queue_.dispatch_cancellations();
00349 except_op_queue_.dispatch_cancellations();
00350
00351
00352 if (stop_thread_)
00353 {
00354
00355
00356 lock.unlock();
00357 read_op_queue_.cleanup_operations();
00358 write_op_queue_.cleanup_operations();
00359 except_op_queue_.cleanup_operations();
00360 return;
00361 }
00362
00363
00364
00365 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00366 && except_op_queue_.empty() && all_timer_queues_are_empty())
00367 {
00368
00369
00370 lock.unlock();
00371 read_op_queue_.cleanup_operations();
00372 write_op_queue_.cleanup_operations();
00373 except_op_queue_.cleanup_operations();
00374 return;
00375 }
00376
00377 int timeout = block ? get_timeout() : 0;
00378 wait_in_progress_ = true;
00379 lock.unlock();
00380
00381
00382 epoll_event events[128];
00383 int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
00384
00385 lock.lock();
00386 wait_in_progress_ = false;
00387
00388
00389 asio::detail::signal_blocker sb;
00390
00391
00392 for (int i = 0; i < num_events; ++i)
00393 {
00394 int descriptor = events[i].data.fd;
00395 if (descriptor == interrupter_.read_descriptor())
00396 {
00397 interrupter_.reset();
00398 }
00399 else
00400 {
00401 if (events[i].events & (EPOLLERR | EPOLLHUP))
00402 {
00403 asio::error_code ec;
00404 except_op_queue_.dispatch_all_operations(descriptor, ec);
00405 read_op_queue_.dispatch_all_operations(descriptor, ec);
00406 write_op_queue_.dispatch_all_operations(descriptor, ec);
00407
00408 epoll_event ev = { 0, { 0 } };
00409 ev.events = 0;
00410 ev.data.fd = descriptor;
00411 epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00412 }
00413 else
00414 {
00415 bool more_reads = false;
00416 bool more_writes = false;
00417 bool more_except = false;
00418 asio::error_code ec;
00419
00420
00421
00422 if (events[i].events & EPOLLPRI)
00423 more_except = except_op_queue_.dispatch_operation(descriptor, ec);
00424 else
00425 more_except = except_op_queue_.has_operation(descriptor);
00426
00427 if (events[i].events & EPOLLIN)
00428 more_reads = read_op_queue_.dispatch_operation(descriptor, ec);
00429 else
00430 more_reads = read_op_queue_.has_operation(descriptor);
00431
00432 if (events[i].events & EPOLLOUT)
00433 more_writes = write_op_queue_.dispatch_operation(descriptor, ec);
00434 else
00435 more_writes = write_op_queue_.has_operation(descriptor);
00436
00437 epoll_event ev = { 0, { 0 } };
00438 ev.events = EPOLLERR | EPOLLHUP;
00439 if (more_reads)
00440 ev.events |= EPOLLIN;
00441 if (more_writes)
00442 ev.events |= EPOLLOUT;
00443 if (more_except)
00444 ev.events |= EPOLLPRI;
00445 ev.data.fd = descriptor;
00446 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
00447 if (result != 0)
00448 {
00449 ec = asio::error_code(errno, asio::native_ecat);
00450 read_op_queue_.dispatch_all_operations(descriptor, ec);
00451 write_op_queue_.dispatch_all_operations(descriptor, ec);
00452 except_op_queue_.dispatch_all_operations(descriptor, ec);
00453 }
00454 }
00455 }
00456 }
00457 read_op_queue_.dispatch_cancellations();
00458 write_op_queue_.dispatch_cancellations();
00459 except_op_queue_.dispatch_cancellations();
00460 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00461 timer_queues_[i]->dispatch_timers();
00462
00463
00464 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
00465 cancel_ops_unlocked(pending_cancellations_[i]);
00466 pending_cancellations_.clear();
00467
00468
00469
00470 lock.unlock();
00471 read_op_queue_.cleanup_operations();
00472 write_op_queue_.cleanup_operations();
00473 except_op_queue_.cleanup_operations();
00474 }
00475
00476
00477 void run_thread()
00478 {
00479 asio::detail::mutex::scoped_lock lock(mutex_);
00480 while (!stop_thread_)
00481 {
00482 lock.unlock();
00483 run(true);
00484 lock.lock();
00485 }
00486 }
00487
00488
00489 static void call_run_thread(epoll_reactor* reactor)
00490 {
00491 reactor->run_thread();
00492 }
00493
00494
00495 void interrupt()
00496 {
00497 interrupter_.interrupt();
00498 }
00499
00500
00501 enum { epoll_size = 20000 };
00502
00503
00504
00505 static int do_epoll_create()
00506 {
00507 int fd = epoll_create(epoll_size);
00508 if (fd == -1)
00509 {
00510 boost::throw_exception(asio::system_error(
00511 asio::error_code(errno, asio::native_ecat),
00512 "epoll"));
00513 }
00514 return fd;
00515 }
00516
00517
00518 bool all_timer_queues_are_empty() const
00519 {
00520 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00521 if (!timer_queues_[i]->empty())
00522 return false;
00523 return true;
00524 }
00525
00526
00527
00528
00529 int get_timeout()
00530 {
00531 if (all_timer_queues_are_empty())
00532 return -1;
00533
00534
00535
00536 boost::posix_time::time_duration minimum_wait_duration
00537 = boost::posix_time::minutes(5);
00538
00539 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00540 {
00541 boost::posix_time::time_duration wait_duration
00542 = timer_queues_[i]->wait_duration();
00543 if (wait_duration < minimum_wait_duration)
00544 minimum_wait_duration = wait_duration;
00545 }
00546
00547 if (minimum_wait_duration > boost::posix_time::time_duration())
00548 {
00549 return minimum_wait_duration.total_milliseconds();
00550 }
00551 else
00552 {
00553 return 0;
00554 }
00555 }
00556
00557
00558
00559
00560 void cancel_ops_unlocked(socket_type descriptor)
00561 {
00562 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00563 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00564 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00565 if (interrupt)
00566 interrupter_.interrupt();
00567 }
00568
00569
00570 asio::detail::mutex mutex_;
00571
00572
00573 int epoll_fd_;
00574
00575
00576 bool wait_in_progress_;
00577
00578
00579 select_interrupter interrupter_;
00580
00581
00582 reactor_op_queue<socket_type> read_op_queue_;
00583
00584
00585 reactor_op_queue<socket_type> write_op_queue_;
00586
00587
00588 reactor_op_queue<socket_type> except_op_queue_;
00589
00590
00591 std::vector<timer_queue_base*> timer_queues_;
00592
00593
00594 std::vector<socket_type> pending_cancellations_;
00595
00596
00597 bool stop_thread_;
00598
00599
00600 asio::detail::thread* thread_;
00601
00602
00603 bool shutdown_;
00604 };
00605
00606 }
00607 }
00608
00609 #endif // defined(ASIO_HAS_EPOLL)
00610
00611 #include "asio/detail/pop_options.hpp"
00612
00613 #endif // ASIO_DETAIL_EPOLL_REACTOR_HPP