00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
00012 #define ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/push_options.hpp"
00021 #include <boost/shared_ptr.hpp>
00022 #include "asio/detail/pop_options.hpp"
00023
00024 #include "asio/buffer.hpp"
00025 #include "asio/error.hpp"
00026 #include "asio/io_service.hpp"
00027 #include "asio/socket_base.hpp"
00028 #include "asio/detail/bind_handler.hpp"
00029 #include "asio/detail/noncopyable.hpp"
00030 #include "asio/detail/service_base.hpp"
00031 #include "asio/detail/socket_holder.hpp"
00032 #include "asio/detail/socket_ops.hpp"
00033 #include "asio/detail/socket_types.hpp"
00034
00035 namespace asio {
00036 namespace detail {
00037
00038 template <typename Protocol, typename Reactor>
00039 class reactive_socket_service
00040 : public asio::detail::service_base<
00041 reactive_socket_service<Protocol, Reactor> >
00042 {
00043 public:
00044
00045 typedef Protocol protocol_type;
00046
00047
00048 typedef typename Protocol::endpoint endpoint_type;
00049
00050
00051 typedef socket_type native_type;
00052
00053
00054 class implementation_type
00055 : private asio::detail::noncopyable
00056 {
00057 public:
00058
00059 implementation_type()
00060 : socket_(invalid_socket),
00061 flags_(0),
00062 protocol_(endpoint_type().protocol())
00063 {
00064 }
00065
00066 private:
00067
00068 friend class reactive_socket_service<Protocol, Reactor>;
00069
00070
00071 socket_type socket_;
00072
00073 enum
00074 {
00075 user_set_non_blocking = 1,
00076 internal_non_blocking = 2,
00077 enable_connection_aborted = 4,
00078 user_set_linger = 8
00079 };
00080
00081
00082 unsigned char flags_;
00083
00084
00085 protocol_type protocol_;
00086 };
00087
00088
00089 enum { max_buffers = 16 };
00090
00091
00092 reactive_socket_service(asio::io_service& io_service)
00093 : asio::detail::service_base<
00094 reactive_socket_service<Protocol, Reactor> >(io_service),
00095 reactor_(asio::use_service<Reactor>(io_service))
00096 {
00097 }
00098
00099
00100 void shutdown_service()
00101 {
00102 }
00103
00104
00105 void construct(implementation_type& impl)
00106 {
00107 impl.socket_ = invalid_socket;
00108 impl.flags_ = 0;
00109 }
00110
00111
00112 void destroy(implementation_type& impl)
00113 {
00114 if (impl.socket_ != invalid_socket)
00115 {
00116 reactor_.close_descriptor(impl.socket_);
00117
00118 if (impl.flags_ & implementation_type::internal_non_blocking)
00119 {
00120 ioctl_arg_type non_blocking = 0;
00121 asio::error_code ignored_ec;
00122 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
00123 impl.flags_ &= ~implementation_type::internal_non_blocking;
00124 }
00125
00126 if (impl.flags_ & implementation_type::user_set_linger)
00127 {
00128 ::linger opt;
00129 opt.l_onoff = 0;
00130 opt.l_linger = 0;
00131 asio::error_code ignored_ec;
00132 socket_ops::setsockopt(impl.socket_,
00133 SOL_SOCKET, SO_LINGER, &opt, sizeof(opt), ignored_ec);
00134 }
00135
00136 asio::error_code ignored_ec;
00137 socket_ops::close(impl.socket_, ignored_ec);
00138
00139 impl.socket_ = invalid_socket;
00140 }
00141 }
00142
00143
00144 asio::error_code open(implementation_type& impl,
00145 const protocol_type& protocol, asio::error_code& ec)
00146 {
00147 if (is_open(impl))
00148 {
00149 ec = asio::error::already_open;
00150 return ec;
00151 }
00152
00153 socket_holder sock(socket_ops::socket(protocol.family(),
00154 protocol.type(), protocol.protocol(), ec));
00155 if (sock.get() == invalid_socket)
00156 return ec;
00157
00158 if (int err = reactor_.register_descriptor(sock.get()))
00159 {
00160 ec = asio::error_code(err, asio::native_ecat);
00161 return ec;
00162 }
00163
00164 impl.socket_ = sock.release();
00165 impl.flags_ = 0;
00166 impl.protocol_ = protocol;
00167 ec = asio::error_code();
00168 return ec;
00169 }
00170
00171
00172 asio::error_code assign(implementation_type& impl,
00173 const protocol_type& protocol, const native_type& native_socket,
00174 asio::error_code& ec)
00175 {
00176 if (is_open(impl))
00177 {
00178 ec = asio::error::already_open;
00179 return ec;
00180 }
00181
00182 if (int err = reactor_.register_descriptor(native_socket))
00183 {
00184 ec = asio::error_code(err, asio::native_ecat);
00185 return ec;
00186 }
00187
00188 impl.socket_ = native_socket;
00189 impl.flags_ = 0;
00190 impl.protocol_ = protocol;
00191 ec = asio::error_code();
00192 return ec;
00193 }
00194
00195
00196 bool is_open(const implementation_type& impl) const
00197 {
00198 return impl.socket_ != invalid_socket;
00199 }
00200
00201
00202 asio::error_code close(implementation_type& impl,
00203 asio::error_code& ec)
00204 {
00205 if (is_open(impl))
00206 {
00207 reactor_.close_descriptor(impl.socket_);
00208
00209 if (impl.flags_ & implementation_type::internal_non_blocking)
00210 {
00211 ioctl_arg_type non_blocking = 0;
00212 asio::error_code ignored_ec;
00213 socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ignored_ec);
00214 impl.flags_ &= ~implementation_type::internal_non_blocking;
00215 }
00216
00217 if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
00218 return ec;
00219
00220 impl.socket_ = invalid_socket;
00221 }
00222
00223 ec = asio::error_code();
00224 return ec;
00225 }
00226
00227
00228 native_type native(implementation_type& impl)
00229 {
00230 return impl.socket_;
00231 }
00232
00233
00234 asio::error_code cancel(implementation_type& impl,
00235 asio::error_code& ec)
00236 {
00237 if (!is_open(impl))
00238 {
00239 ec = asio::error::bad_descriptor;
00240 return ec;
00241 }
00242
00243 reactor_.cancel_ops(impl.socket_);
00244 ec = asio::error_code();
00245 return ec;
00246 }
00247
00248
00249 bool at_mark(const implementation_type& impl,
00250 asio::error_code& ec) const
00251 {
00252 if (!is_open(impl))
00253 {
00254 ec = asio::error::bad_descriptor;
00255 return false;
00256 }
00257
00258 asio::detail::ioctl_arg_type value = 0;
00259 socket_ops::ioctl(impl.socket_, SIOCATMARK, &value, ec);
00260 #if defined(ENOTTY)
00261 if (ec.value() == ENOTTY)
00262 ec = asio::error::not_socket;
00263 #endif // defined(ENOTTY)
00264 return ec ? false : value != 0;
00265 }
00266
00267
00268 std::size_t available(const implementation_type& impl,
00269 asio::error_code& ec) const
00270 {
00271 if (!is_open(impl))
00272 {
00273 ec = asio::error::bad_descriptor;
00274 return 0;
00275 }
00276
00277 asio::detail::ioctl_arg_type value = 0;
00278 socket_ops::ioctl(impl.socket_, FIONREAD, &value, ec);
00279 #if defined(ENOTTY)
00280 if (ec.value() == ENOTTY)
00281 ec = asio::error::not_socket;
00282 #endif // defined(ENOTTY)
00283 return ec ? static_cast<std::size_t>(0) : static_cast<std::size_t>(value);
00284 }
00285
00286
00287 asio::error_code bind(implementation_type& impl,
00288 const endpoint_type& endpoint, asio::error_code& ec)
00289 {
00290 if (!is_open(impl))
00291 {
00292 ec = asio::error::bad_descriptor;
00293 return ec;
00294 }
00295
00296 socket_ops::bind(impl.socket_, endpoint.data(), endpoint.size(), ec);
00297 return ec;
00298 }
00299
00300
00301 asio::error_code listen(implementation_type& impl, int backlog,
00302 asio::error_code& ec)
00303 {
00304 if (!is_open(impl))
00305 {
00306 ec = asio::error::bad_descriptor;
00307 return ec;
00308 }
00309
00310 socket_ops::listen(impl.socket_, backlog, ec);
00311 return ec;
00312 }
00313
00314
00315 template <typename Option>
00316 asio::error_code set_option(implementation_type& impl,
00317 const Option& option, asio::error_code& ec)
00318 {
00319 if (!is_open(impl))
00320 {
00321 ec = asio::error::bad_descriptor;
00322 return ec;
00323 }
00324
00325 if (option.level(impl.protocol_) == custom_socket_option_level
00326 && option.name(impl.protocol_) == enable_connection_aborted_option)
00327 {
00328 if (option.size(impl.protocol_) != sizeof(int))
00329 {
00330 ec = asio::error::invalid_argument;
00331 }
00332 else
00333 {
00334 if (*reinterpret_cast<const int*>(option.data(impl.protocol_)))
00335 impl.flags_ |= implementation_type::enable_connection_aborted;
00336 else
00337 impl.flags_ &= ~implementation_type::enable_connection_aborted;
00338 ec = asio::error_code();
00339 }
00340 return ec;
00341 }
00342 else
00343 {
00344 if (option.level(impl.protocol_) == SOL_SOCKET
00345 && option.name(impl.protocol_) == SO_LINGER)
00346 {
00347 impl.flags_ |= implementation_type::user_set_linger;
00348 }
00349
00350 socket_ops::setsockopt(impl.socket_,
00351 option.level(impl.protocol_), option.name(impl.protocol_),
00352 option.data(impl.protocol_), option.size(impl.protocol_), ec);
00353 return ec;
00354 }
00355 }
00356
00357
00358 template <typename Option>
00359 asio::error_code get_option(const implementation_type& impl,
00360 Option& option, asio::error_code& ec) const
00361 {
00362 if (!is_open(impl))
00363 {
00364 ec = asio::error::bad_descriptor;
00365 return ec;
00366 }
00367
00368 if (option.level(impl.protocol_) == custom_socket_option_level
00369 && option.name(impl.protocol_) == enable_connection_aborted_option)
00370 {
00371 if (option.size(impl.protocol_) != sizeof(int))
00372 {
00373 ec = asio::error::invalid_argument;
00374 }
00375 else
00376 {
00377 int* target = reinterpret_cast<int*>(option.data(impl.protocol_));
00378 if (impl.flags_ & implementation_type::enable_connection_aborted)
00379 *target = 1;
00380 else
00381 *target = 0;
00382 option.resize(impl.protocol_, sizeof(int));
00383 ec = asio::error_code();
00384 }
00385 return ec;
00386 }
00387 else
00388 {
00389 size_t size = option.size(impl.protocol_);
00390 socket_ops::getsockopt(impl.socket_,
00391 option.level(impl.protocol_), option.name(impl.protocol_),
00392 option.data(impl.protocol_), &size, ec);
00393 if (!ec)
00394 option.resize(impl.protocol_, size);
00395 return ec;
00396 }
00397 }
00398
00399
00400 template <typename IO_Control_Command>
00401 asio::error_code io_control(implementation_type& impl,
00402 IO_Control_Command& command, asio::error_code& ec)
00403 {
00404 if (!is_open(impl))
00405 {
00406 ec = asio::error::bad_descriptor;
00407 return ec;
00408 }
00409
00410 if (command.name() == static_cast<int>(FIONBIO))
00411 {
00412 if (command.get())
00413 impl.flags_ |= implementation_type::user_set_non_blocking;
00414 else
00415 impl.flags_ &= ~implementation_type::user_set_non_blocking;
00416 ec = asio::error_code();
00417 }
00418 else
00419 {
00420 socket_ops::ioctl(impl.socket_, command.name(),
00421 static_cast<ioctl_arg_type*>(command.data()), ec);
00422 }
00423 return ec;
00424 }
00425
00426
00427 endpoint_type local_endpoint(const implementation_type& impl,
00428 asio::error_code& ec) const
00429 {
00430 if (!is_open(impl))
00431 {
00432 ec = asio::error::bad_descriptor;
00433 return endpoint_type();
00434 }
00435
00436 endpoint_type endpoint;
00437 socket_addr_len_type addr_len = endpoint.capacity();
00438 if (socket_ops::getsockname(impl.socket_, endpoint.data(), &addr_len, ec))
00439 return endpoint_type();
00440 endpoint.resize(addr_len);
00441 return endpoint;
00442 }
00443
00444
00445 endpoint_type remote_endpoint(const implementation_type& impl,
00446 asio::error_code& ec) const
00447 {
00448 if (!is_open(impl))
00449 {
00450 ec = asio::error::bad_descriptor;
00451 return endpoint_type();
00452 }
00453
00454 endpoint_type endpoint;
00455 socket_addr_len_type addr_len = endpoint.capacity();
00456 if (socket_ops::getpeername(impl.socket_, endpoint.data(), &addr_len, ec))
00457 return endpoint_type();
00458 endpoint.resize(addr_len);
00459 return endpoint;
00460 }
00461
00463 asio::error_code shutdown(implementation_type& impl,
00464 socket_base::shutdown_type what, asio::error_code& ec)
00465 {
00466 if (!is_open(impl))
00467 {
00468 ec = asio::error::bad_descriptor;
00469 return ec;
00470 }
00471
00472 socket_ops::shutdown(impl.socket_, what, ec);
00473 return ec;
00474 }
00475
00476
00477 template <typename ConstBufferSequence>
00478 size_t send(implementation_type& impl, const ConstBufferSequence& buffers,
00479 socket_base::message_flags flags, asio::error_code& ec)
00480 {
00481 if (!is_open(impl))
00482 {
00483 ec = asio::error::bad_descriptor;
00484 return 0;
00485 }
00486
00487
00488 socket_ops::buf bufs[max_buffers];
00489 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00490 typename ConstBufferSequence::const_iterator end = buffers.end();
00491 size_t i = 0;
00492 size_t total_buffer_size = 0;
00493 for (; iter != end && i < max_buffers; ++iter, ++i)
00494 {
00495 asio::const_buffer buffer(*iter);
00496 socket_ops::init_buf(bufs[i],
00497 asio::buffer_cast<const void*>(buffer),
00498 asio::buffer_size(buffer));
00499 total_buffer_size += asio::buffer_size(buffer);
00500 }
00501
00502
00503 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00504 {
00505 ec = asio::error_code();
00506 return 0;
00507 }
00508
00509
00510 for (;;)
00511 {
00512
00513 int bytes_sent = socket_ops::send(impl.socket_, bufs, i, flags, ec);
00514
00515
00516 if (bytes_sent >= 0)
00517 return bytes_sent;
00518
00519
00520 if ((impl.flags_ & implementation_type::user_set_non_blocking)
00521 || (ec != asio::error::would_block
00522 && ec != asio::error::try_again))
00523 return 0;
00524
00525
00526 if (socket_ops::poll_write(impl.socket_, ec) < 0)
00527 return 0;
00528 }
00529 }
00530
00531 template <typename ConstBufferSequence, typename Handler>
00532 class send_handler
00533 {
00534 public:
00535 send_handler(socket_type socket, asio::io_service& io_service,
00536 const ConstBufferSequence& buffers, socket_base::message_flags flags,
00537 Handler handler)
00538 : socket_(socket),
00539 io_service_(io_service),
00540 work_(io_service),
00541 buffers_(buffers),
00542 flags_(flags),
00543 handler_(handler)
00544 {
00545 }
00546
00547 bool operator()(const asio::error_code& result)
00548 {
00549
00550 if (result)
00551 {
00552 io_service_.post(bind_handler(handler_, result, 0));
00553 return true;
00554 }
00555
00556
00557 socket_ops::buf bufs[max_buffers];
00558 typename ConstBufferSequence::const_iterator iter = buffers_.begin();
00559 typename ConstBufferSequence::const_iterator end = buffers_.end();
00560 size_t i = 0;
00561 for (; iter != end && i < max_buffers; ++iter, ++i)
00562 {
00563 asio::const_buffer buffer(*iter);
00564 socket_ops::init_buf(bufs[i],
00565 asio::buffer_cast<const void*>(buffer),
00566 asio::buffer_size(buffer));
00567 }
00568
00569
00570 asio::error_code ec;
00571 int bytes = socket_ops::send(socket_, bufs, i, flags_, ec);
00572
00573
00574 if (ec == asio::error::would_block
00575 || ec == asio::error::try_again)
00576 return false;
00577
00578 io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
00579 return true;
00580 }
00581
00582 private:
00583 socket_type socket_;
00584 asio::io_service& io_service_;
00585 asio::io_service::work work_;
00586 ConstBufferSequence buffers_;
00587 socket_base::message_flags flags_;
00588 Handler handler_;
00589 };
00590
00591
00592
00593 template <typename ConstBufferSequence, typename Handler>
00594 void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
00595 socket_base::message_flags flags, Handler handler)
00596 {
00597 if (!is_open(impl))
00598 {
00599 this->io_service().post(bind_handler(handler,
00600 asio::error::bad_descriptor, 0));
00601 }
00602 else
00603 {
00604 if (impl.protocol_.type() == SOCK_STREAM)
00605 {
00606
00607 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00608 typename ConstBufferSequence::const_iterator end = buffers.end();
00609 size_t i = 0;
00610 size_t total_buffer_size = 0;
00611 for (; iter != end && i < max_buffers; ++iter, ++i)
00612 {
00613 asio::const_buffer buffer(*iter);
00614 total_buffer_size += asio::buffer_size(buffer);
00615 }
00616
00617
00618 if (total_buffer_size == 0)
00619 {
00620 this->io_service().post(bind_handler(handler,
00621 asio::error_code(), 0));
00622 return;
00623 }
00624 }
00625
00626
00627 if (!(impl.flags_ & implementation_type::internal_non_blocking))
00628 {
00629 ioctl_arg_type non_blocking = 1;
00630 asio::error_code ec;
00631 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00632 {
00633 this->io_service().post(bind_handler(handler, ec, 0));
00634 return;
00635 }
00636 impl.flags_ |= implementation_type::internal_non_blocking;
00637 }
00638
00639 reactor_.start_write_op(impl.socket_,
00640 send_handler<ConstBufferSequence, Handler>(
00641 impl.socket_, this->io_service(), buffers, flags, handler));
00642 }
00643 }
00644
00645
00646
00647 template <typename ConstBufferSequence>
00648 size_t send_to(implementation_type& impl, const ConstBufferSequence& buffers,
00649 const endpoint_type& destination, socket_base::message_flags flags,
00650 asio::error_code& ec)
00651 {
00652 if (!is_open(impl))
00653 {
00654 ec = asio::error::bad_descriptor;
00655 return 0;
00656 }
00657
00658
00659 socket_ops::buf bufs[max_buffers];
00660 typename ConstBufferSequence::const_iterator iter = buffers.begin();
00661 typename ConstBufferSequence::const_iterator end = buffers.end();
00662 size_t i = 0;
00663 for (; iter != end && i < max_buffers; ++iter, ++i)
00664 {
00665 asio::const_buffer buffer(*iter);
00666 socket_ops::init_buf(bufs[i],
00667 asio::buffer_cast<const void*>(buffer),
00668 asio::buffer_size(buffer));
00669 }
00670
00671
00672 for (;;)
00673 {
00674
00675 int bytes_sent = socket_ops::sendto(impl.socket_, bufs, i, flags,
00676 destination.data(), destination.size(), ec);
00677
00678
00679 if (bytes_sent >= 0)
00680 return bytes_sent;
00681
00682
00683 if ((impl.flags_ & implementation_type::user_set_non_blocking)
00684 || (ec != asio::error::would_block
00685 && ec != asio::error::try_again))
00686 return 0;
00687
00688
00689 if (socket_ops::poll_write(impl.socket_, ec) < 0)
00690 return 0;
00691 }
00692 }
00693
00694 template <typename ConstBufferSequence, typename Handler>
00695 class send_to_handler
00696 {
00697 public:
00698 send_to_handler(socket_type socket, asio::io_service& io_service,
00699 const ConstBufferSequence& buffers, const endpoint_type& endpoint,
00700 socket_base::message_flags flags, Handler handler)
00701 : socket_(socket),
00702 io_service_(io_service),
00703 work_(io_service),
00704 buffers_(buffers),
00705 destination_(endpoint),
00706 flags_(flags),
00707 handler_(handler)
00708 {
00709 }
00710
00711 bool operator()(const asio::error_code& result)
00712 {
00713
00714 if (result)
00715 {
00716 io_service_.post(bind_handler(handler_, result, 0));
00717 return true;
00718 }
00719
00720
00721 socket_ops::buf bufs[max_buffers];
00722 typename ConstBufferSequence::const_iterator iter = buffers_.begin();
00723 typename ConstBufferSequence::const_iterator end = buffers_.end();
00724 size_t i = 0;
00725 for (; iter != end && i < max_buffers; ++iter, ++i)
00726 {
00727 asio::const_buffer buffer(*iter);
00728 socket_ops::init_buf(bufs[i],
00729 asio::buffer_cast<const void*>(buffer),
00730 asio::buffer_size(buffer));
00731 }
00732
00733
00734 asio::error_code ec;
00735 int bytes = socket_ops::sendto(socket_, bufs, i, flags_,
00736 destination_.data(), destination_.size(), ec);
00737
00738
00739 if (ec == asio::error::would_block
00740 || ec == asio::error::try_again)
00741 return false;
00742
00743 io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
00744 return true;
00745 }
00746
00747 private:
00748 socket_type socket_;
00749 asio::io_service& io_service_;
00750 asio::io_service::work work_;
00751 ConstBufferSequence buffers_;
00752 endpoint_type destination_;
00753 socket_base::message_flags flags_;
00754 Handler handler_;
00755 };
00756
00757
00758
00759 template <typename ConstBufferSequence, typename Handler>
00760 void async_send_to(implementation_type& impl,
00761 const ConstBufferSequence& buffers,
00762 const endpoint_type& destination, socket_base::message_flags flags,
00763 Handler handler)
00764 {
00765 if (!is_open(impl))
00766 {
00767 this->io_service().post(bind_handler(handler,
00768 asio::error::bad_descriptor, 0));
00769 }
00770 else
00771 {
00772
00773 if (!(impl.flags_ & implementation_type::internal_non_blocking))
00774 {
00775 ioctl_arg_type non_blocking = 1;
00776 asio::error_code ec;
00777 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00778 {
00779 this->io_service().post(bind_handler(handler, ec, 0));
00780 return;
00781 }
00782 impl.flags_ |= implementation_type::internal_non_blocking;
00783 }
00784
00785 reactor_.start_write_op(impl.socket_,
00786 send_to_handler<ConstBufferSequence, Handler>(
00787 impl.socket_, this->io_service(), buffers,
00788 destination, flags, handler));
00789 }
00790 }
00791
00792
00793 template <typename MutableBufferSequence>
00794 size_t receive(implementation_type& impl,
00795 const MutableBufferSequence& buffers,
00796 socket_base::message_flags flags, asio::error_code& ec)
00797 {
00798 if (!is_open(impl))
00799 {
00800 ec = asio::error::bad_descriptor;
00801 return 0;
00802 }
00803
00804
00805 socket_ops::buf bufs[max_buffers];
00806 typename MutableBufferSequence::const_iterator iter = buffers.begin();
00807 typename MutableBufferSequence::const_iterator end = buffers.end();
00808 size_t i = 0;
00809 size_t total_buffer_size = 0;
00810 for (; iter != end && i < max_buffers; ++iter, ++i)
00811 {
00812 asio::mutable_buffer buffer(*iter);
00813 socket_ops::init_buf(bufs[i],
00814 asio::buffer_cast<void*>(buffer),
00815 asio::buffer_size(buffer));
00816 total_buffer_size += asio::buffer_size(buffer);
00817 }
00818
00819
00820 if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
00821 {
00822 ec = asio::error_code();
00823 return 0;
00824 }
00825
00826
00827 for (;;)
00828 {
00829
00830 int bytes_recvd = socket_ops::recv(impl.socket_, bufs, i, flags, ec);
00831
00832
00833 if (bytes_recvd > 0)
00834 return bytes_recvd;
00835
00836
00837 if (bytes_recvd == 0)
00838 {
00839 ec = asio::error::eof;
00840 return 0;
00841 }
00842
00843
00844 if ((impl.flags_ & implementation_type::user_set_non_blocking)
00845 || (ec != asio::error::would_block
00846 && ec != asio::error::try_again))
00847 return 0;
00848
00849
00850 if (socket_ops::poll_read(impl.socket_, ec) < 0)
00851 return 0;
00852 }
00853 }
00854
00855 template <typename MutableBufferSequence, typename Handler>
00856 class receive_handler
00857 {
00858 public:
00859 receive_handler(socket_type socket, asio::io_service& io_service,
00860 const MutableBufferSequence& buffers, socket_base::message_flags flags,
00861 Handler handler)
00862 : socket_(socket),
00863 io_service_(io_service),
00864 work_(io_service),
00865 buffers_(buffers),
00866 flags_(flags),
00867 handler_(handler)
00868 {
00869 }
00870
00871 bool operator()(const asio::error_code& result)
00872 {
00873
00874 if (result)
00875 {
00876 io_service_.post(bind_handler(handler_, result, 0));
00877 return true;
00878 }
00879
00880
00881 socket_ops::buf bufs[max_buffers];
00882 typename MutableBufferSequence::const_iterator iter = buffers_.begin();
00883 typename MutableBufferSequence::const_iterator end = buffers_.end();
00884 size_t i = 0;
00885 for (; iter != end && i < max_buffers; ++iter, ++i)
00886 {
00887 asio::mutable_buffer buffer(*iter);
00888 socket_ops::init_buf(bufs[i],
00889 asio::buffer_cast<void*>(buffer),
00890 asio::buffer_size(buffer));
00891 }
00892
00893
00894 asio::error_code ec;
00895 int bytes = socket_ops::recv(socket_, bufs, i, flags_, ec);
00896 if (bytes == 0)
00897 ec = asio::error::eof;
00898
00899
00900 if (ec == asio::error::would_block
00901 || ec == asio::error::try_again)
00902 return false;
00903
00904 io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
00905 return true;
00906 }
00907
00908 private:
00909 socket_type socket_;
00910 asio::io_service& io_service_;
00911 asio::io_service::work work_;
00912 MutableBufferSequence buffers_;
00913 socket_base::message_flags flags_;
00914 Handler handler_;
00915 };
00916
00917
00918
00919 template <typename MutableBufferSequence, typename Handler>
00920 void async_receive(implementation_type& impl,
00921 const MutableBufferSequence& buffers,
00922 socket_base::message_flags flags, Handler handler)
00923 {
00924 if (!is_open(impl))
00925 {
00926 this->io_service().post(bind_handler(handler,
00927 asio::error::bad_descriptor, 0));
00928 }
00929 else
00930 {
00931 if (impl.protocol_.type() == SOCK_STREAM)
00932 {
00933
00934 typename MutableBufferSequence::const_iterator iter = buffers.begin();
00935 typename MutableBufferSequence::const_iterator end = buffers.end();
00936 size_t i = 0;
00937 size_t total_buffer_size = 0;
00938 for (; iter != end && i < max_buffers; ++iter, ++i)
00939 {
00940 asio::mutable_buffer buffer(*iter);
00941 total_buffer_size += asio::buffer_size(buffer);
00942 }
00943
00944
00945 if (total_buffer_size == 0)
00946 {
00947 this->io_service().post(bind_handler(handler,
00948 asio::error_code(), 0));
00949 return;
00950 }
00951 }
00952
00953
00954 if (!(impl.flags_ & implementation_type::internal_non_blocking))
00955 {
00956 ioctl_arg_type non_blocking = 1;
00957 asio::error_code ec;
00958 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
00959 {
00960 this->io_service().post(bind_handler(handler, ec, 0));
00961 return;
00962 }
00963 impl.flags_ |= implementation_type::internal_non_blocking;
00964 }
00965
00966 if (flags & socket_base::message_out_of_band)
00967 {
00968 reactor_.start_except_op(impl.socket_,
00969 receive_handler<MutableBufferSequence, Handler>(
00970 impl.socket_, this->io_service(), buffers, flags, handler));
00971 }
00972 else
00973 {
00974 reactor_.start_read_op(impl.socket_,
00975 receive_handler<MutableBufferSequence, Handler>(
00976 impl.socket_, this->io_service(), buffers, flags, handler));
00977 }
00978 }
00979 }
00980
00981
00982
00983 template <typename MutableBufferSequence>
00984 size_t receive_from(implementation_type& impl,
00985 const MutableBufferSequence& buffers,
00986 endpoint_type& sender_endpoint, socket_base::message_flags flags,
00987 asio::error_code& ec)
00988 {
00989 if (!is_open(impl))
00990 {
00991 ec = asio::error::bad_descriptor;
00992 return 0;
00993 }
00994
00995
00996 socket_ops::buf bufs[max_buffers];
00997 typename MutableBufferSequence::const_iterator iter = buffers.begin();
00998 typename MutableBufferSequence::const_iterator end = buffers.end();
00999 size_t i = 0;
01000 for (; iter != end && i < max_buffers; ++iter, ++i)
01001 {
01002 asio::mutable_buffer buffer(*iter);
01003 socket_ops::init_buf(bufs[i],
01004 asio::buffer_cast<void*>(buffer),
01005 asio::buffer_size(buffer));
01006 }
01007
01008
01009 for (;;)
01010 {
01011
01012 socket_addr_len_type addr_len = sender_endpoint.capacity();
01013 int bytes_recvd = socket_ops::recvfrom(impl.socket_, bufs, i, flags,
01014 sender_endpoint.data(), &addr_len, ec);
01015
01016
01017 if (bytes_recvd > 0)
01018 {
01019 sender_endpoint.resize(addr_len);
01020 return bytes_recvd;
01021 }
01022
01023
01024 if (bytes_recvd == 0)
01025 {
01026 ec = asio::error::eof;
01027 return 0;
01028 }
01029
01030
01031 if ((impl.flags_ & implementation_type::user_set_non_blocking)
01032 || (ec != asio::error::would_block
01033 && ec != asio::error::try_again))
01034 return 0;
01035
01036
01037 if (socket_ops::poll_read(impl.socket_, ec) < 0)
01038 return 0;
01039 }
01040 }
01041
01042 template <typename MutableBufferSequence, typename Handler>
01043 class receive_from_handler
01044 {
01045 public:
01046 receive_from_handler(socket_type socket,
01047 asio::io_service& io_service,
01048 const MutableBufferSequence& buffers, endpoint_type& endpoint,
01049 socket_base::message_flags flags, Handler handler)
01050 : socket_(socket),
01051 io_service_(io_service),
01052 work_(io_service),
01053 buffers_(buffers),
01054 sender_endpoint_(endpoint),
01055 flags_(flags),
01056 handler_(handler)
01057 {
01058 }
01059
01060 bool operator()(const asio::error_code& result)
01061 {
01062
01063 if (result != 0)
01064 {
01065 io_service_.post(bind_handler(handler_, result, 0));
01066 return true;
01067 }
01068
01069
01070 socket_ops::buf bufs[max_buffers];
01071 typename MutableBufferSequence::const_iterator iter = buffers_.begin();
01072 typename MutableBufferSequence::const_iterator end = buffers_.end();
01073 size_t i = 0;
01074 for (; iter != end && i < max_buffers; ++iter, ++i)
01075 {
01076 asio::mutable_buffer buffer(*iter);
01077 socket_ops::init_buf(bufs[i],
01078 asio::buffer_cast<void*>(buffer),
01079 asio::buffer_size(buffer));
01080 }
01081
01082
01083 socket_addr_len_type addr_len = sender_endpoint_.capacity();
01084 asio::error_code ec;
01085 int bytes = socket_ops::recvfrom(socket_, bufs, i, flags_,
01086 sender_endpoint_.data(), &addr_len, ec);
01087 if (bytes == 0)
01088 ec = asio::error::eof;
01089
01090
01091 if (ec == asio::error::would_block
01092 || ec == asio::error::try_again)
01093 return false;
01094
01095 sender_endpoint_.resize(addr_len);
01096 io_service_.post(bind_handler(handler_, ec, bytes < 0 ? 0 : bytes));
01097 return true;
01098 }
01099
01100 private:
01101 socket_type socket_;
01102 asio::io_service& io_service_;
01103 asio::io_service::work work_;
01104 MutableBufferSequence buffers_;
01105 endpoint_type& sender_endpoint_;
01106 socket_base::message_flags flags_;
01107 Handler handler_;
01108 };
01109
01110
01111
01112
01113 template <typename MutableBufferSequence, typename Handler>
01114 void async_receive_from(implementation_type& impl,
01115 const MutableBufferSequence& buffers, endpoint_type& sender_endpoint,
01116 socket_base::message_flags flags, Handler handler)
01117 {
01118 if (!is_open(impl))
01119 {
01120 this->io_service().post(bind_handler(handler,
01121 asio::error::bad_descriptor, 0));
01122 }
01123 else
01124 {
01125
01126 if (!(impl.flags_ & implementation_type::internal_non_blocking))
01127 {
01128 ioctl_arg_type non_blocking = 1;
01129 asio::error_code ec;
01130 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01131 {
01132 this->io_service().post(bind_handler(handler, ec, 0));
01133 return;
01134 }
01135 impl.flags_ |= implementation_type::internal_non_blocking;
01136 }
01137
01138 reactor_.start_read_op(impl.socket_,
01139 receive_from_handler<MutableBufferSequence, Handler>(
01140 impl.socket_, this->io_service(), buffers,
01141 sender_endpoint, flags, handler));
01142 }
01143 }
01144
01145
01146 template <typename Socket>
01147 asio::error_code accept(implementation_type& impl,
01148 Socket& peer, endpoint_type* peer_endpoint, asio::error_code& ec)
01149 {
01150 if (!is_open(impl))
01151 {
01152 ec = asio::error::bad_descriptor;
01153 return ec;
01154 }
01155
01156
01157 if (peer.is_open())
01158 {
01159 ec = asio::error::already_open;
01160 return ec;
01161 }
01162
01163
01164 for (;;)
01165 {
01166
01167 asio::error_code ec;
01168 socket_holder new_socket;
01169 socket_addr_len_type addr_len = 0;
01170 if (peer_endpoint)
01171 {
01172 addr_len = peer_endpoint->capacity();
01173 new_socket.reset(socket_ops::accept(impl.socket_,
01174 peer_endpoint->data(), &addr_len, ec));
01175 }
01176 else
01177 {
01178 new_socket.reset(socket_ops::accept(impl.socket_, 0, 0, ec));
01179 }
01180
01181
01182 if (new_socket.get() >= 0)
01183 {
01184 if (peer_endpoint)
01185 peer_endpoint->resize(addr_len);
01186 peer.assign(impl.protocol_, new_socket.get(), ec);
01187 if (!ec)
01188 new_socket.release();
01189 return ec;
01190 }
01191
01192
01193 if (ec == asio::error::would_block
01194 || ec == asio::error::try_again)
01195 {
01196 if (impl.flags_ & implementation_type::user_set_non_blocking)
01197 return ec;
01198
01199 }
01200 else if (ec == asio::error::connection_aborted)
01201 {
01202 if (impl.flags_ & implementation_type::enable_connection_aborted)
01203 return ec;
01204
01205 }
01206 else
01207 return ec;
01208
01209
01210 if (socket_ops::poll_read(impl.socket_, ec) < 0)
01211 return ec;
01212 }
01213 }
01214
01215 template <typename Socket, typename Handler>
01216 class accept_handler
01217 {
01218 public:
01219 accept_handler(socket_type socket, asio::io_service& io_service,
01220 Socket& peer, const protocol_type& protocol,
01221 endpoint_type* peer_endpoint, bool enable_connection_aborted,
01222 Handler handler)
01223 : socket_(socket),
01224 io_service_(io_service),
01225 work_(io_service),
01226 peer_(peer),
01227 protocol_(protocol),
01228 peer_endpoint_(peer_endpoint),
01229 enable_connection_aborted_(enable_connection_aborted),
01230 handler_(handler)
01231 {
01232 }
01233
01234 bool operator()(const asio::error_code& result)
01235 {
01236
01237 if (result)
01238 {
01239 io_service_.post(bind_handler(handler_, result));
01240 return true;
01241 }
01242
01243
01244 asio::error_code ec;
01245 socket_holder new_socket;
01246 socket_addr_len_type addr_len = 0;
01247 if (peer_endpoint_)
01248 {
01249 addr_len = peer_endpoint_->capacity();
01250 new_socket.reset(socket_ops::accept(socket_,
01251 peer_endpoint_->data(), &addr_len, ec));
01252 }
01253 else
01254 {
01255 new_socket.reset(socket_ops::accept(socket_, 0, 0, ec));
01256 }
01257
01258
01259 if (ec == asio::error::would_block
01260 || ec == asio::error::try_again)
01261 return false;
01262 if (ec == asio::error::connection_aborted
01263 && !enable_connection_aborted_)
01264 return false;
01265
01266
01267 if (!ec)
01268 {
01269 if (peer_endpoint_)
01270 peer_endpoint_->resize(addr_len);
01271 peer_.assign(protocol_, new_socket.get(), ec);
01272 if (!ec)
01273 new_socket.release();
01274 }
01275
01276 io_service_.post(bind_handler(handler_, ec));
01277 return true;
01278 }
01279
01280 private:
01281 socket_type socket_;
01282 asio::io_service& io_service_;
01283 asio::io_service::work work_;
01284 Socket& peer_;
01285 protocol_type protocol_;
01286 endpoint_type* peer_endpoint_;
01287 bool enable_connection_aborted_;
01288 Handler handler_;
01289 };
01290
01291
01292
01293 template <typename Socket, typename Handler>
01294 void async_accept(implementation_type& impl, Socket& peer,
01295 endpoint_type* peer_endpoint, Handler handler)
01296 {
01297 if (!is_open(impl))
01298 {
01299 this->io_service().post(bind_handler(handler,
01300 asio::error::bad_descriptor));
01301 }
01302 else if (peer.is_open())
01303 {
01304 this->io_service().post(bind_handler(handler,
01305 asio::error::already_open));
01306 }
01307 else
01308 {
01309
01310 if (!(impl.flags_ & implementation_type::internal_non_blocking))
01311 {
01312 ioctl_arg_type non_blocking = 1;
01313 asio::error_code ec;
01314 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01315 {
01316 this->io_service().post(bind_handler(handler, ec));
01317 return;
01318 }
01319 impl.flags_ |= implementation_type::internal_non_blocking;
01320 }
01321
01322 reactor_.start_read_op(impl.socket_,
01323 accept_handler<Socket, Handler>(
01324 impl.socket_, this->io_service(),
01325 peer, impl.protocol_, peer_endpoint,
01326 (impl.flags_ & implementation_type::enable_connection_aborted) != 0,
01327 handler));
01328 }
01329 }
01330
01331
01332 asio::error_code connect(implementation_type& impl,
01333 const endpoint_type& peer_endpoint, asio::error_code& ec)
01334 {
01335 if (!is_open(impl))
01336 {
01337 ec = asio::error::bad_descriptor;
01338 return ec;
01339 }
01340
01341 if (impl.flags_ & implementation_type::internal_non_blocking)
01342 {
01343
01344 ioctl_arg_type non_blocking = 0;
01345 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01346 return ec;
01347 impl.flags_ &= ~implementation_type::internal_non_blocking;
01348 }
01349
01350
01351 socket_ops::connect(impl.socket_,
01352 peer_endpoint.data(), peer_endpoint.size(), ec);
01353 return ec;
01354 }
01355
01356 template <typename Handler>
01357 class connect_handler
01358 {
01359 public:
01360 connect_handler(socket_type socket, boost::shared_ptr<bool> completed,
01361 asio::io_service& io_service, Reactor& reactor, Handler handler)
01362 : socket_(socket),
01363 completed_(completed),
01364 io_service_(io_service),
01365 work_(io_service),
01366 reactor_(reactor),
01367 handler_(handler)
01368 {
01369 }
01370
01371 bool operator()(const asio::error_code& result)
01372 {
01373
01374
01375 if (*completed_)
01376 return true;
01377
01378
01379 *completed_ = true;
01380 reactor_.enqueue_cancel_ops_unlocked(socket_);
01381
01382
01383 if (result)
01384 {
01385 io_service_.post(bind_handler(handler_, result));
01386 return true;
01387 }
01388
01389
01390 int connect_error = 0;
01391 size_t connect_error_len = sizeof(connect_error);
01392 asio::error_code ec;
01393 if (socket_ops::getsockopt(socket_, SOL_SOCKET, SO_ERROR,
01394 &connect_error, &connect_error_len, ec) == socket_error_retval)
01395 {
01396 io_service_.post(bind_handler(handler_, ec));
01397 return true;
01398 }
01399
01400
01401 if (connect_error)
01402 {
01403 ec = asio::error_code(connect_error,
01404 asio::native_ecat);
01405 io_service_.post(bind_handler(handler_, ec));
01406 return true;
01407 }
01408
01409
01410 io_service_.post(bind_handler(handler_, ec));
01411 return true;
01412 }
01413
01414 private:
01415 socket_type socket_;
01416 boost::shared_ptr<bool> completed_;
01417 asio::io_service& io_service_;
01418 asio::io_service::work work_;
01419 Reactor& reactor_;
01420 Handler handler_;
01421 };
01422
01423
01424 template <typename Handler>
01425 void async_connect(implementation_type& impl,
01426 const endpoint_type& peer_endpoint, Handler handler)
01427 {
01428 if (!is_open(impl))
01429 {
01430 this->io_service().post(bind_handler(handler,
01431 asio::error::bad_descriptor));
01432 return;
01433 }
01434
01435
01436 if (!(impl.flags_ & implementation_type::internal_non_blocking))
01437 {
01438 ioctl_arg_type non_blocking = 1;
01439 asio::error_code ec;
01440 if (socket_ops::ioctl(impl.socket_, FIONBIO, &non_blocking, ec))
01441 {
01442 this->io_service().post(bind_handler(handler, ec));
01443 return;
01444 }
01445 impl.flags_ |= implementation_type::internal_non_blocking;
01446 }
01447
01448
01449
01450 asio::error_code ec;
01451 if (socket_ops::connect(impl.socket_, peer_endpoint.data(),
01452 peer_endpoint.size(), ec) == 0)
01453 {
01454
01455
01456 this->io_service().post(bind_handler(handler,
01457 asio::error_code()));
01458 }
01459 else if (ec == asio::error::in_progress
01460 || ec == asio::error::would_block)
01461 {
01462
01463
01464 boost::shared_ptr<bool> completed(new bool(false));
01465 reactor_.start_write_and_except_ops(impl.socket_,
01466 connect_handler<Handler>(
01467 impl.socket_, completed, this->io_service(), reactor_, handler));
01468 }
01469 else
01470 {
01471
01472 this->io_service().post(bind_handler(handler, ec));
01473 }
01474 }
01475
01476 private:
01477
01478 Reactor& reactor_;
01479 };
01480
01481 }
01482 }
01483
01484 #include "asio/detail/pop_options.hpp"
01485
01486 #endif // ASIO_DETAIL_REACTIVE_SOCKET_SERVICE_HPP