00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #ifndef ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
00012 #define ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP
00013
00014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
00015 # pragma once
00016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
00017
00018 #include "asio/detail/push_options.hpp"
00019
00020 #include "asio/detail/win_iocp_io_service_fwd.hpp"
00021
00022 #if defined(ASIO_HAS_IOCP)
00023
00024 #include "asio/detail/push_options.hpp"
00025 #include <limits>
00026 #include <boost/throw_exception.hpp>
00027 #include "asio/detail/pop_options.hpp"
00028
00029 #include "asio/io_service.hpp"
00030 #include "asio/system_error.hpp"
00031 #include "asio/detail/call_stack.hpp"
00032 #include "asio/detail/handler_alloc_helpers.hpp"
00033 #include "asio/detail/handler_invoke_helpers.hpp"
00034 #include "asio/detail/service_base.hpp"
00035 #include "asio/detail/socket_types.hpp"
00036 #include "asio/detail/win_iocp_operation.hpp"
00037
00038 namespace asio {
00039 namespace detail {
00040
00041 class win_iocp_io_service
00042 : public asio::detail::service_base<win_iocp_io_service>
00043 {
00044 public:
00045
00046 typedef win_iocp_operation operation;
00047
00048
00049 win_iocp_io_service(asio::io_service& io_service)
00050 : asio::detail::service_base<win_iocp_io_service>(io_service),
00051 iocp_(),
00052 outstanding_work_(0),
00053 stopped_(0),
00054 shutdown_(0)
00055 {
00056 }
00057
00058 void init(size_t concurrency_hint)
00059 {
00060 iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
00061 static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0))));
00062 if (!iocp_.handle)
00063 {
00064 DWORD last_error = ::GetLastError();
00065 asio::system_error e(
00066 asio::error_code(last_error, asio::native_ecat),
00067 "iocp");
00068 boost::throw_exception(e);
00069 }
00070 }
00071
00072
00073 void shutdown_service()
00074 {
00075 ::InterlockedExchange(&shutdown_, 1);
00076
00077 for (;;)
00078 {
00079 DWORD bytes_transferred = 0;
00080 #if (WINVER < 0x0500)
00081 DWORD completion_key = 0;
00082 #else
00083 DWORD_PTR completion_key = 0;
00084 #endif
00085 LPOVERLAPPED overlapped = 0;
00086 ::SetLastError(0);
00087 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
00088 &bytes_transferred, &completion_key, &overlapped, 0);
00089 DWORD last_error = ::GetLastError();
00090 if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT)
00091 break;
00092 if (overlapped)
00093 static_cast<operation*>(overlapped)->destroy();
00094 }
00095 }
00096
00097
00098 void register_handle(HANDLE handle)
00099 {
00100 ::CreateIoCompletionPort(handle, iocp_.handle, 0, 0);
00101 }
00102
00103
00104 size_t run(asio::error_code& ec)
00105 {
00106 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00107 {
00108 ec = asio::error_code();
00109 return 0;
00110 }
00111
00112 call_stack<win_iocp_io_service>::context ctx(this);
00113
00114 size_t n = 0;
00115 while (do_one(true, ec))
00116 if (n != (std::numeric_limits<size_t>::max)())
00117 ++n;
00118 return n;
00119 }
00120
00121
00122 size_t run_one(asio::error_code& ec)
00123 {
00124 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00125 {
00126 ec = asio::error_code();
00127 return 0;
00128 }
00129
00130 call_stack<win_iocp_io_service>::context ctx(this);
00131
00132 return do_one(true, ec);
00133 }
00134
00135
00136 size_t poll(asio::error_code& ec)
00137 {
00138 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00139 {
00140 ec = asio::error_code();
00141 return 0;
00142 }
00143
00144 call_stack<win_iocp_io_service>::context ctx(this);
00145
00146 size_t n = 0;
00147 while (do_one(false, ec))
00148 if (n != (std::numeric_limits<size_t>::max)())
00149 ++n;
00150 return n;
00151 }
00152
00153
00154 size_t poll_one(asio::error_code& ec)
00155 {
00156 if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
00157 {
00158 ec = asio::error_code();
00159 return 0;
00160 }
00161
00162 call_stack<win_iocp_io_service>::context ctx(this);
00163
00164 return do_one(false, ec);
00165 }
00166
00167
00168 void stop()
00169 {
00170 if (::InterlockedExchange(&stopped_, 1) == 0)
00171 {
00172 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
00173 {
00174 DWORD last_error = ::GetLastError();
00175 asio::system_error e(
00176 asio::error_code(last_error, asio::native_ecat),
00177 "pqcs");
00178 boost::throw_exception(e);
00179 }
00180 }
00181 }
00182
00183
00184 void reset()
00185 {
00186 ::InterlockedExchange(&stopped_, 0);
00187 }
00188
00189
00190 void work_started()
00191 {
00192 ::InterlockedIncrement(&outstanding_work_);
00193 }
00194
00195
00196 void work_finished()
00197 {
00198 if (::InterlockedDecrement(&outstanding_work_) == 0)
00199 stop();
00200 }
00201
00202
00203 template <typename Handler>
00204 void dispatch(Handler handler)
00205 {
00206 if (call_stack<win_iocp_io_service>::contains(this))
00207 asio_handler_invoke_helpers::invoke(handler, &handler);
00208 else
00209 post(handler);
00210 }
00211
00212
00213 template <typename Handler>
00214 void post(Handler handler)
00215 {
00216
00217 if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
00218 return;
00219
00220
00221 typedef handler_operation<Handler> value_type;
00222 typedef handler_alloc_traits<Handler, value_type> alloc_traits;
00223 raw_handler_ptr<alloc_traits> raw_ptr(handler);
00224 handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
00225
00226
00227 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
00228 {
00229 DWORD last_error = ::GetLastError();
00230 asio::system_error e(
00231 asio::error_code(last_error, asio::native_ecat),
00232 "pqcs");
00233 boost::throw_exception(e);
00234 }
00235
00236
00237 ptr.release();
00238 }
00239
00240
00241 void post_completion(win_iocp_operation* op, DWORD op_last_error,
00242 DWORD bytes_transferred)
00243 {
00244
00245 if (!::PostQueuedCompletionStatus(iocp_.handle,
00246 bytes_transferred, op_last_error, op))
00247 {
00248 DWORD last_error = ::GetLastError();
00249 asio::system_error e(
00250 asio::error_code(last_error, asio::native_ecat),
00251 "pqcs");
00252 boost::throw_exception(e);
00253 }
00254 }
00255
00256 private:
00257
00258
00259
00260 size_t do_one(bool block, asio::error_code& ec)
00261 {
00262 for (;;)
00263 {
00264
00265 DWORD bytes_transferred = 0;
00266 #if (WINVER < 0x0500)
00267 DWORD completion_key = 0;
00268 #else
00269 DWORD_PTR completion_key = 0;
00270 #endif
00271 LPOVERLAPPED overlapped = 0;
00272 ::SetLastError(0);
00273 BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
00274 &completion_key, &overlapped, block ? 1000 : 0);
00275 DWORD last_error = ::GetLastError();
00276
00277 if (!ok && overlapped == 0)
00278 {
00279 if (block && last_error == WAIT_TIMEOUT)
00280 continue;
00281 ec = asio::error_code();
00282 return 0;
00283 }
00284
00285 if (overlapped)
00286 {
00287
00288 if (last_error == 0)
00289 {
00290 last_error = completion_key;
00291 }
00292
00293
00294
00295 auto_work work(*this);
00296
00297
00298 operation* op = static_cast<operation*>(overlapped);
00299 op->do_completion(last_error, bytes_transferred);
00300
00301 ec = asio::error_code();
00302 return 1;
00303 }
00304 else
00305 {
00306
00307
00308 if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
00309 {
00310
00311 if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
00312 {
00313 DWORD last_error = ::GetLastError();
00314 ec = asio::error_code(last_error,
00315 asio::native_ecat);
00316 return 0;
00317 }
00318
00319 ec = asio::error_code();
00320 return 0;
00321 }
00322 }
00323 }
00324 }
00325
00326 struct auto_work
00327 {
00328 auto_work(win_iocp_io_service& io_service)
00329 : io_service_(io_service)
00330 {
00331 io_service_.work_started();
00332 }
00333
00334 ~auto_work()
00335 {
00336 io_service_.work_finished();
00337 }
00338
00339 private:
00340 win_iocp_io_service& io_service_;
00341 };
00342
00343 template <typename Handler>
00344 struct handler_operation
00345 : public operation
00346 {
00347 handler_operation(win_iocp_io_service& io_service,
00348 Handler handler)
00349 : operation(&handler_operation<Handler>::do_completion_impl,
00350 &handler_operation<Handler>::destroy_impl),
00351 io_service_(io_service),
00352 handler_(handler)
00353 {
00354 io_service_.work_started();
00355 }
00356
00357 ~handler_operation()
00358 {
00359 io_service_.work_finished();
00360 }
00361
00362 private:
00363
00364 handler_operation(const handler_operation&);
00365 void operator=(const handler_operation&);
00366
00367 static void do_completion_impl(operation* op, DWORD, size_t)
00368 {
00369
00370 typedef handler_operation<Handler> op_type;
00371 op_type* handler_op(static_cast<op_type*>(op));
00372 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00373 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00374
00375
00376
00377 Handler handler(handler_op->handler_);
00378
00379
00380 ptr.reset();
00381
00382
00383 asio_handler_invoke_helpers::invoke(handler, &handler);
00384 }
00385
00386 static void destroy_impl(operation* op)
00387 {
00388
00389 typedef handler_operation<Handler> op_type;
00390 op_type* handler_op(static_cast<op_type*>(op));
00391 typedef handler_alloc_traits<Handler, op_type> alloc_traits;
00392 handler_ptr<alloc_traits> ptr(handler_op->handler_, handler_op);
00393 }
00394
00395 win_iocp_io_service& io_service_;
00396 Handler handler_;
00397 };
00398
00399
00400 struct iocp_holder
00401 {
00402 HANDLE handle;
00403 iocp_holder() : handle(0) {}
00404 ~iocp_holder() { if (handle) ::CloseHandle(handle); }
00405 } iocp_;
00406
00407
00408 long outstanding_work_;
00409
00410
00411 long stopped_;
00412
00413
00414 long shutdown_;
00415 };
00416
00417 }
00418 }
00419
00420 #endif // defined(ASIO_HAS_IOCP)
00421
00422 #include "asio/detail/pop_options.hpp"
00423
00424 #endif // ASIO_DETAIL_WIN_IOCP_IO_SERVICE_HPP