00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef ASIO_DETAIL_KQUEUE_REACTOR_HPP
00013 #define ASIO_DETAIL_KQUEUE_REACTOR_HPP
00014
00015 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00016 # pragma once
00017 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00018
00019 #include "asio/detail/push_options.hpp"
00020
00021 #include "asio/detail/kqueue_reactor_fwd.hpp"
00022
00023 #if defined(ASIO_HAS_KQUEUE)
00024
00025 #include "asio/detail/push_options.hpp"
00026 #include <cstddef>
00027 #include <vector>
00028 #include <sys/types.h>
00029 #include <sys/event.h>
00030 #include <sys/time.h>
00031 #include <boost/config.hpp>
00032 #include <boost/date_time/posix_time/posix_time_types.hpp>
00033 #include <boost/throw_exception.hpp>
00034 #include "asio/detail/pop_options.hpp"
00035
00036 #include "asio/error.hpp"
00037 #include "asio/io_service.hpp"
00038 #include "asio/system_error.hpp"
00039 #include "asio/detail/bind_handler.hpp"
00040 #include "asio/detail/mutex.hpp"
00041 #include "asio/detail/task_io_service.hpp"
00042 #include "asio/detail/thread.hpp"
00043 #include "asio/detail/reactor_op_queue.hpp"
00044 #include "asio/detail/select_interrupter.hpp"
00045 #include "asio/detail/service_base.hpp"
00046 #include "asio/detail/signal_blocker.hpp"
00047 #include "asio/detail/socket_types.hpp"
00048 #include "asio/detail/timer_queue.hpp"
00049
00050
00051 #if !defined(EV_OOBAND)
00052 # define EV_OOBAND EV_FLAG1
00053 #endif // !defined(EV_OOBAND)
00054
00055 namespace asio {
00056 namespace detail {
00057
00058 template <bool Own_Thread>
00059 class kqueue_reactor
00060 : public asio::detail::service_base<kqueue_reactor<Own_Thread> >
00061 {
00062 public:
00063
00064 kqueue_reactor(asio::io_service& io_service)
00065 : asio::detail::service_base<
00066 kqueue_reactor<Own_Thread> >(io_service),
00067 mutex_(),
00068 kqueue_fd_(do_kqueue_create()),
00069 wait_in_progress_(false),
00070 interrupter_(),
00071 read_op_queue_(),
00072 write_op_queue_(),
00073 except_op_queue_(),
00074 pending_cancellations_(),
00075 stop_thread_(false),
00076 thread_(0),
00077 shutdown_(false)
00078 {
00079
00080 if (Own_Thread)
00081 {
00082 asio::detail::signal_blocker sb;
00083 thread_ = new asio::detail::thread(
00084 bind_handler(&kqueue_reactor::call_run_thread, this));
00085 }
00086
00087
00088 struct kevent event;
00089 EV_SET(&event, interrupter_.read_descriptor(),
00090 EVFILT_READ, EV_ADD, 0, 0, 0);
00091 ::kevent(kqueue_fd_, &event, 1, 0, 0, 0);
00092 }
00093
00094
00095 ~kqueue_reactor()
00096 {
00097 shutdown_service();
00098 close(kqueue_fd_);
00099 }
00100
00101
00102 void shutdown_service()
00103 {
00104 asio::detail::mutex::scoped_lock lock(mutex_);
00105 shutdown_ = true;
00106 stop_thread_ = true;
00107 lock.unlock();
00108
00109 if (thread_)
00110 {
00111 interrupter_.interrupt();
00112 thread_->join();
00113 delete thread_;
00114 thread_ = 0;
00115 }
00116
00117 read_op_queue_.destroy_operations();
00118 write_op_queue_.destroy_operations();
00119 except_op_queue_.destroy_operations();
00120
00121 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00122 timer_queues_[i]->destroy_timers();
00123 timer_queues_.clear();
00124 }
00125
00126
00127
00128 int register_descriptor(socket_type)
00129 {
00130 return 0;
00131 }
00132
00133
00134
00135 template <typename Handler>
00136 void start_read_op(socket_type descriptor, Handler handler)
00137 {
00138 asio::detail::mutex::scoped_lock lock(mutex_);
00139
00140 if (shutdown_)
00141 return;
00142
00143 if (!read_op_queue_.has_operation(descriptor))
00144 if (handler(asio::error_code()))
00145 return;
00146
00147 if (read_op_queue_.enqueue_operation(descriptor, handler))
00148 {
00149 struct kevent event;
00150 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
00151 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00152 {
00153 asio::error_code ec(errno, asio::native_ecat);
00154 read_op_queue_.dispatch_all_operations(descriptor, ec);
00155 }
00156 }
00157 }
00158
00159
00160
00161 template <typename Handler>
00162 void start_write_op(socket_type descriptor, Handler handler)
00163 {
00164 asio::detail::mutex::scoped_lock lock(mutex_);
00165
00166 if (shutdown_)
00167 return;
00168
00169 if (!write_op_queue_.has_operation(descriptor))
00170 if (handler(asio::error_code()))
00171 return;
00172
00173 if (write_op_queue_.enqueue_operation(descriptor, handler))
00174 {
00175 struct kevent event;
00176 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
00177 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00178 {
00179 asio::error_code ec(errno, asio::native_ecat);
00180 write_op_queue_.dispatch_all_operations(descriptor, ec);
00181 }
00182 }
00183 }
00184
00185
00186
00187 template <typename Handler>
00188 void start_except_op(socket_type descriptor, Handler handler)
00189 {
00190 asio::detail::mutex::scoped_lock lock(mutex_);
00191
00192 if (shutdown_)
00193 return;
00194
00195 if (except_op_queue_.enqueue_operation(descriptor, handler))
00196 {
00197 struct kevent event;
00198 if (read_op_queue_.has_operation(descriptor))
00199 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
00200 else
00201 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
00202 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00203 {
00204 asio::error_code ec(errno, asio::native_ecat);
00205 except_op_queue_.dispatch_all_operations(descriptor, ec);
00206 }
00207 }
00208 }
00209
00210
00211
00212
00213 template <typename Handler>
00214 void start_write_and_except_ops(socket_type descriptor, Handler handler)
00215 {
00216 asio::detail::mutex::scoped_lock lock(mutex_);
00217
00218 if (shutdown_)
00219 return;
00220
00221 if (write_op_queue_.enqueue_operation(descriptor, handler))
00222 {
00223 struct kevent event;
00224 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
00225 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00226 {
00227 asio::error_code ec(errno, asio::native_ecat);
00228 write_op_queue_.dispatch_all_operations(descriptor, ec);
00229 }
00230 }
00231
00232 if (except_op_queue_.enqueue_operation(descriptor, handler))
00233 {
00234 struct kevent event;
00235 if (read_op_queue_.has_operation(descriptor))
00236 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
00237 else
00238 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
00239 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00240 {
00241 asio::error_code ec(errno, asio::native_ecat);
00242 except_op_queue_.dispatch_all_operations(descriptor, ec);
00243 write_op_queue_.dispatch_all_operations(descriptor, ec);
00244 }
00245 }
00246 }
00247
00248
00249
00250
00251 void cancel_ops(socket_type descriptor)
00252 {
00253 asio::detail::mutex::scoped_lock lock(mutex_);
00254 cancel_ops_unlocked(descriptor);
00255 }
00256
00257
00258
00259
00260
00261
00262 void enqueue_cancel_ops_unlocked(socket_type descriptor)
00263 {
00264 pending_cancellations_.push_back(descriptor);
00265 }
00266
00267
00268
00269 void close_descriptor(socket_type descriptor)
00270 {
00271 asio::detail::mutex::scoped_lock lock(mutex_);
00272
00273
00274 struct kevent event[2];
00275 EV_SET(&event[0], descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
00276 EV_SET(&event[1], descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00277 ::kevent(kqueue_fd_, event, 2, 0, 0, 0);
00278
00279
00280 cancel_ops_unlocked(descriptor);
00281 }
00282
00283
00284 template <typename Time_Traits>
00285 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
00286 {
00287 asio::detail::mutex::scoped_lock lock(mutex_);
00288 timer_queues_.push_back(&timer_queue);
00289 }
00290
00291
00292 template <typename Time_Traits>
00293 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
00294 {
00295 asio::detail::mutex::scoped_lock lock(mutex_);
00296 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00297 {
00298 if (timer_queues_[i] == &timer_queue)
00299 {
00300 timer_queues_.erase(timer_queues_.begin() + i);
00301 return;
00302 }
00303 }
00304 }
00305
00306
00307
00308 template <typename Time_Traits, typename Handler>
00309 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
00310 const typename Time_Traits::time_type& time, Handler handler, void* token)
00311 {
00312 asio::detail::mutex::scoped_lock lock(mutex_);
00313 if (!shutdown_)
00314 if (timer_queue.enqueue_timer(time, handler, token))
00315 interrupter_.interrupt();
00316 }
00317
00318
00319
00320 template <typename Time_Traits>
00321 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
00322 {
00323 asio::detail::mutex::scoped_lock lock(mutex_);
00324 return timer_queue.cancel_timer(token);
00325 }
00326
00327 private:
00328 friend class task_io_service<kqueue_reactor<Own_Thread> >;
00329
00330
00331 void run(bool block)
00332 {
00333 asio::detail::mutex::scoped_lock lock(mutex_);
00334
00335
00336
00337 read_op_queue_.dispatch_cancellations();
00338 write_op_queue_.dispatch_cancellations();
00339 except_op_queue_.dispatch_cancellations();
00340
00341
00342 if (stop_thread_)
00343 {
00344
00345
00346 lock.unlock();
00347 read_op_queue_.cleanup_operations();
00348 write_op_queue_.cleanup_operations();
00349 except_op_queue_.cleanup_operations();
00350 return;
00351 }
00352
00353
00354
00355 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
00356 && except_op_queue_.empty() && all_timer_queues_are_empty())
00357 {
00358
00359
00360 lock.unlock();
00361 read_op_queue_.cleanup_operations();
00362 write_op_queue_.cleanup_operations();
00363 except_op_queue_.cleanup_operations();
00364 return;
00365 }
00366
00367
00368 timespec timeout_buf = { 0, 0 };
00369 timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf;
00370
00371 wait_in_progress_ = true;
00372 lock.unlock();
00373
00374
00375 struct kevent events[128];
00376 int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
00377
00378 lock.lock();
00379 wait_in_progress_ = false;
00380
00381
00382 asio::detail::signal_blocker sb;
00383
00384
00385 for (int i = 0; i < num_events; ++i)
00386 {
00387 int descriptor = events[i].ident;
00388 if (descriptor == interrupter_.read_descriptor())
00389 {
00390 interrupter_.reset();
00391 }
00392 else if (events[i].filter == EVFILT_READ)
00393 {
00394
00395 bool more_reads = false;
00396 bool more_except = false;
00397 if (events[i].flags & EV_ERROR)
00398 {
00399 asio::error_code error(
00400 events[i].data, asio::native_ecat);
00401 except_op_queue_.dispatch_all_operations(descriptor, error);
00402 read_op_queue_.dispatch_all_operations(descriptor, error);
00403 }
00404 else if (events[i].flags & EV_OOBAND)
00405 {
00406 asio::error_code error;
00407 more_except = except_op_queue_.dispatch_operation(descriptor, error);
00408 if (events[i].data > 0)
00409 more_reads = read_op_queue_.dispatch_operation(descriptor, error);
00410 else
00411 more_reads = read_op_queue_.has_operation(descriptor);
00412 }
00413 else
00414 {
00415 asio::error_code error;
00416 more_reads = read_op_queue_.dispatch_operation(descriptor, error);
00417 more_except = except_op_queue_.has_operation(descriptor);
00418 }
00419
00420
00421 struct kevent event;
00422 if (more_reads)
00423 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
00424 else if (more_except)
00425 EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
00426 else
00427 EV_SET(&event, descriptor, EVFILT_READ, EV_DELETE, 0, 0, 0);
00428 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00429 {
00430 asio::error_code error(errno, asio::native_ecat);
00431 except_op_queue_.dispatch_all_operations(descriptor, error);
00432 read_op_queue_.dispatch_all_operations(descriptor, error);
00433 }
00434 }
00435 else if (events[i].filter == EVFILT_WRITE)
00436 {
00437
00438 bool more_writes = false;
00439 if (events[i].flags & EV_ERROR)
00440 {
00441 asio::error_code error(
00442 events[i].data, asio::native_ecat);
00443 write_op_queue_.dispatch_all_operations(descriptor, error);
00444 }
00445 else
00446 {
00447 asio::error_code error;
00448 more_writes = write_op_queue_.dispatch_operation(descriptor, error);
00449 }
00450
00451
00452 struct kevent event;
00453 if (more_writes)
00454 EV_SET(&event, descriptor, EVFILT_WRITE, EV_ADD, 0, 0, 0);
00455 else
00456 EV_SET(&event, descriptor, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
00457 if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
00458 {
00459 asio::error_code error(errno, asio::native_ecat);
00460 write_op_queue_.dispatch_all_operations(descriptor, error);
00461 }
00462 }
00463 }
00464
00465 read_op_queue_.dispatch_cancellations();
00466 write_op_queue_.dispatch_cancellations();
00467 except_op_queue_.dispatch_cancellations();
00468 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00469 timer_queues_[i]->dispatch_timers();
00470
00471
00472 for (std::size_t i = 0; i < pending_cancellations_.size(); ++i)
00473 cancel_ops_unlocked(pending_cancellations_[i]);
00474 pending_cancellations_.clear();
00475
00476
00477
00478 lock.unlock();
00479 read_op_queue_.cleanup_operations();
00480 write_op_queue_.cleanup_operations();
00481 except_op_queue_.cleanup_operations();
00482 }
00483
00484
00485 void run_thread()
00486 {
00487 asio::detail::mutex::scoped_lock lock(mutex_);
00488 while (!stop_thread_)
00489 {
00490 lock.unlock();
00491 run(true);
00492 lock.lock();
00493 }
00494 }
00495
00496
00497 static void call_run_thread(kqueue_reactor* reactor)
00498 {
00499 reactor->run_thread();
00500 }
00501
00502
00503 void interrupt()
00504 {
00505 interrupter_.interrupt();
00506 }
00507
00508
00509
00510 static int do_kqueue_create()
00511 {
00512 int fd = kqueue();
00513 if (fd == -1)
00514 {
00515 boost::throw_exception(asio::system_error(
00516 asio::error_code(errno, asio::native_ecat),
00517 "kqueue"));
00518 }
00519 return fd;
00520 }
00521
00522
00523 bool all_timer_queues_are_empty() const
00524 {
00525 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00526 if (!timer_queues_[i]->empty())
00527 return false;
00528 return true;
00529 }
00530
00531
00532 timespec* get_timeout(timespec& ts)
00533 {
00534 if (all_timer_queues_are_empty())
00535 return 0;
00536
00537
00538
00539 boost::posix_time::time_duration minimum_wait_duration
00540 = boost::posix_time::minutes(5);
00541
00542 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
00543 {
00544 boost::posix_time::time_duration wait_duration
00545 = timer_queues_[i]->wait_duration();
00546 if (wait_duration < minimum_wait_duration)
00547 minimum_wait_duration = wait_duration;
00548 }
00549
00550 if (minimum_wait_duration > boost::posix_time::time_duration())
00551 {
00552 ts.tv_sec = minimum_wait_duration.total_seconds();
00553 ts.tv_nsec = minimum_wait_duration.total_nanoseconds() % 1000000000;
00554 }
00555 else
00556 {
00557 ts.tv_sec = 0;
00558 ts.tv_nsec = 0;
00559 }
00560
00561 return &ts;
00562 }
00563
00564
00565
00566
00567 void cancel_ops_unlocked(socket_type descriptor)
00568 {
00569 bool interrupt = read_op_queue_.cancel_operations(descriptor);
00570 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
00571 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
00572 if (interrupt)
00573 interrupter_.interrupt();
00574 }
00575
00576
00577 asio::detail::mutex mutex_;
00578
00579
00580 int kqueue_fd_;
00581
00582
00583 bool wait_in_progress_;
00584
00585
00586 select_interrupter interrupter_;
00587
00588
00589 reactor_op_queue<socket_type> read_op_queue_;
00590
00591
00592 reactor_op_queue<socket_type> write_op_queue_;
00593
00594
00595 reactor_op_queue<socket_type> except_op_queue_;
00596
00597
00598 std::vector<timer_queue_base*> timer_queues_;
00599
00600
00601 std::vector<socket_type> pending_cancellations_;
00602
00603
00604 bool stop_thread_;
00605
00606
00607 asio::detail::thread* thread_;
00608
00609
00610 bool shutdown_;
00611 };
00612
00613 }
00614 }
00615
00616 #endif // defined(ASIO_HAS_KQUEUE)
00617
00618 #include "asio/detail/pop_options.hpp"
00619
00620 #endif // ASIO_DETAIL_KQUEUE_REACTOR_HPP