00001
00002
00003
00004
00005
00006
00007
00008 #include <fstream>
00009 #include <set>
00010 #include <numeric>
00011 #include <stdexcept>
00012 #include <boost/bind.hpp>
00013 #include <boost/date_time/posix_time/posix_time_types.hpp>
00014 #include <boost/date_time/posix_time/ptime.hpp>
00015 #include <boost/ref.hpp>
00016 #include <boost/optional.hpp>
00017 #include <boost/lexical_cast.hpp>
00018 #include <boost/filesystem/operations.hpp>
00019
00020 #include "libtorrent/kademlia/node.hpp"
00021 #include "libtorrent/kademlia/node_id.hpp"
00022 #include "libtorrent/kademlia/traversal_algorithm.hpp"
00023 #include "libtorrent/kademlia/dht_tracker.hpp"
00024
00025 #include "libtorrent/socket.hpp"
00026 #include "libtorrent/bencode.hpp"
00027 #include "libtorrent/io.hpp"
00028 #include "libtorrent/version.hpp"
00029
00030 using boost::posix_time::ptime;
00031 using boost::posix_time::time_duration;
00032 using boost::posix_time::second_clock;
00033 using boost::posix_time::microsec_clock;
00034 using boost::posix_time::seconds;
00035 using boost::posix_time::minutes;
00036 using boost::posix_time::hours;
00037 using boost::posix_time::milliseconds;
00038 using boost::ref;
00039 using boost::lexical_cast;
00040 using libtorrent::dht::node_impl;
00041 using libtorrent::dht::node_id;
00042 using libtorrent::dht::packet_t;
00043 using libtorrent::dht::msg;
00044 using libtorrent::dht::packet_iterator;
00045 namespace messages = libtorrent::dht::messages;
00046 using namespace libtorrent::detail;
00047
00048 using asio::ip::udp;
00049 typedef asio::ip::address_v4 address;
00050
00051 namespace
00052 {
00053 const int tick_period = 1;
00054
00055 struct count_peers
00056 {
00057 int& count;
00058 count_peers(int& c): count(c) {}
00059 void operator()(std::pair<libtorrent::dht::node_id
00060 , libtorrent::dht::torrent_entry> const& t)
00061 {
00062 count += std::distance(t.second.peers.begin()
00063 , t.second.peers.end());
00064 }
00065 };
00066
00067 boost::optional<node_id> read_id(libtorrent::entry const& d)
00068 {
00069 using namespace libtorrent;
00070 using libtorrent::dht::node_id;
00071
00072 if (d.type() != entry::dictionary_t) return boost::optional<node_id>();
00073 entry const* nid = d.find_key("node-id");
00074 if (!nid
00075 || nid->type() != entry::string_t
00076 || nid->string().length() != 40)
00077 return boost::optional<node_id>();
00078 return boost::optional<node_id>(
00079 boost::lexical_cast<node_id>(nid->string()));
00080 }
00081
00082 template <class EndpointType>
00083 void read_endpoint_list(libtorrent::entry const* n, std::vector<EndpointType>& epl)
00084 {
00085 using namespace libtorrent;
00086 entry::list_type const& contacts = n->list();
00087 for (entry::list_type::const_iterator i = contacts.begin()
00088 , end(contacts.end()); i != end; ++i)
00089 {
00090 std::string const& p = i->string();
00091 if (p.size() < 6) continue;
00092 std::string::const_iterator in = p.begin();
00093 if (p.size() == 6)
00094 epl.push_back(read_v4_endpoint<EndpointType>(in));
00095 else if (p.size() == 18)
00096 epl.push_back(read_v6_endpoint<EndpointType>(in));
00097 }
00098 }
00099
00100 }
00101
00102 namespace libtorrent { namespace dht
00103 {
00104
00105 void intrusive_ptr_add_ref(dht_tracker const* c)
00106 {
00107 assert(c != 0);
00108 assert(c->m_refs >= 0);
00109 ++c->m_refs;
00110 }
00111
00112 void intrusive_ptr_release(dht_tracker const* c)
00113 {
00114 assert(c != 0);
00115 assert(c->m_refs > 0);
00116 if (--c->m_refs == 0)
00117 delete c;
00118 }
00119
00120 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00121 TORRENT_DEFINE_LOG(dht_tracker)
00122 #endif
00123
00124
00125
00126 dht_tracker::dht_tracker(asio::io_service& ios, dht_settings const& settings
00127 , asio::ip::address listen_interface, entry const& bootstrap)
00128 : m_strand(ios)
00129 , m_socket(ios, udp::endpoint(listen_interface, settings.service_port))
00130 , m_dht(bind(&dht_tracker::send_packet, this, _1), settings
00131 , read_id(bootstrap))
00132 , m_buffer(0)
00133 , m_last_refresh(second_clock::universal_time() - hours(1))
00134 , m_timer(ios)
00135 , m_connection_timer(ios)
00136 , m_refresh_timer(ios)
00137 , m_settings(settings)
00138 , m_refresh_bucket(160)
00139 , m_host_resolver(ios)
00140 , m_refs(0)
00141 {
00142 using boost::bind;
00143
00144 m_in_buf[0].resize(1000);
00145 m_in_buf[1].resize(1000);
00146 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00147 m_counter = 0;
00148 std::fill_n(m_replies_bytes_sent, 5, 0);
00149 std::fill_n(m_queries_bytes_received, 5, 0);
00150 std::fill_n(m_replies_sent, 5, 0);
00151 std::fill_n(m_queries_received, 5, 0);
00152 m_announces = 0;
00153 m_failed_announces = 0;
00154 m_total_message_input = 0;
00155 m_ut_message_input = 0;
00156 m_lt_message_input = 0;
00157 m_mp_message_input = 0;
00158 m_gr_message_input = 0;
00159 m_mo_message_input = 0;
00160 m_total_in_bytes = 0;
00161 m_total_out_bytes = 0;
00162 m_queries_out_bytes = 0;
00163
00164
00165
00166 #endif
00167 std::vector<udp::endpoint> initial_nodes;
00168
00169 if (bootstrap.type() == entry::dictionary_t)
00170 {
00171 try
00172 {
00173 if (entry const* nodes = bootstrap.find_key("nodes"))
00174 read_endpoint_list<udp::endpoint>(nodes, initial_nodes);
00175 } catch (std::exception&) {}
00176 }
00177
00178 m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0]
00179 , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer]
00180 , m_strand.wrap(bind(&dht_tracker::on_receive, self(), _1, _2)));
00181 m_timer.expires_from_now(seconds(1));
00182 m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, self(), _1)));
00183
00184 m_connection_timer.expires_from_now(seconds(10));
00185 m_connection_timer.async_wait(m_strand.wrap(
00186 bind(&dht_tracker::connection_timeout, self(), _1)));
00187
00188 m_refresh_timer.expires_from_now(minutes(15));
00189 m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1)));
00190
00191 m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self()));
00192 }
00193
00194 void dht_tracker::stop()
00195 {
00196 m_timer.cancel();
00197 m_connection_timer.cancel();
00198 m_refresh_timer.cancel();
00199 m_socket.close();
00200 }
00201
00202 void dht_tracker::dht_status(session_status& s)
00203 {
00204 boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size();
00205 s.dht_torrents = m_dht.data_size();
00206 }
00207
00208 void dht_tracker::connection_timeout(asio::error_code const& e)
00209 try
00210 {
00211 if (e) return;
00212 time_duration d = m_dht.connection_timeout();
00213 m_connection_timer.expires_from_now(d);
00214 m_connection_timer.async_wait(m_strand.wrap(bind(&dht_tracker::connection_timeout, self(), _1)));
00215 }
00216 catch (std::exception& exc)
00217 {
00218 #ifndef NDEBUG
00219 std::cerr << "exception-type: " << typeid(exc).name() << std::endl;
00220 std::cerr << "what: " << exc.what() << std::endl;
00221 assert(false);
00222 #endif
00223 };
00224
00225 void dht_tracker::refresh_timeout(asio::error_code const& e)
00226 try
00227 {
00228 if (e) return;
00229 time_duration d = m_dht.refresh_timeout();
00230 m_refresh_timer.expires_from_now(d);
00231 m_refresh_timer.async_wait(m_strand.wrap(
00232 bind(&dht_tracker::refresh_timeout, self(), _1)));
00233 }
00234 catch (std::exception&)
00235 {
00236 assert(false);
00237 };
00238
00239 void dht_tracker::rebind(asio::ip::address listen_interface, int listen_port)
00240 {
00241 m_socket.close();
00242 udp::endpoint ep(listen_interface, listen_port);
00243 m_socket.open(ep.protocol());
00244 m_socket.bind(ep);
00245 }
00246
00247 void dht_tracker::tick(asio::error_code const& e)
00248 try
00249 {
00250 if (e) return;
00251 m_timer.expires_from_now(minutes(tick_period));
00252 m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1)));
00253
00254 m_dht.new_write_key();
00255
00256 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00257 static bool first = true;
00258 if (first)
00259 {
00260 boost::filesystem::create_directory("libtorrent_logs");
00261 }
00262
00263 std::ofstream st("libtorrent_logs/routing_table_state.txt", std::ios_base::trunc);
00264 m_dht.print_state(st);
00265
00266
00267 int torrents = std::distance(m_dht.begin_data(), m_dht.end_data());
00268
00269
00270 int peers = 0;
00271 std::for_each(m_dht.begin_data(), m_dht.end_data(), count_peers(peers));
00272
00273 std::ofstream pc("libtorrent_logs/dht_stats.log", std::ios_base::app);
00274 if (first)
00275 {
00276 first = false;
00277 using boost::posix_time::to_simple_string;
00278 pc << "\n\n ***** starting log at " << to_simple_string(
00279 second_clock::universal_time()) << " *****\n\n"
00280 << "minute:active nodes:passive nodes"
00281 ":ping replies sent:ping queries recvd:ping"
00282 ":ping replies sent:ping queries recvd:ping"
00283 ":find_node replies bytes sent:find_node queries bytes recv"
00284 ":find_node replies bytes sent:find_node queries bytes recv"
00285 ":get_peers replies sent:get_peers queries recvd:get_peers"
00286 ":get_peers replies bytes sent:get_peers queries bytes recv"
00287 ":announce_peer replies sent:announce_peer queries recvd:announce_peer"
00288 ":announce_peer replies bytes sent:announce_peer queries bytes recv"
00289 ":error replies sent:error queries recvd:error"
00290 ":error replies bytes sent:error queries bytes recv"
00291 ":num torrents:num peers:announces per min"
00292 ":failed announces per min:total msgs per min"
00293 ":ut msgs per min:lt msgs per min:mp msgs per min"
00294 ":gr msgs per min:bytes in per sec:bytes out per sec"
00295 ":queries out bytes per sec\n\n";
00296 }
00297
00298 int active;
00299 int passive;
00300 boost::tie(active, passive) = m_dht.size();
00301 pc << (m_counter * tick_period)
00302 << "\t" << active
00303 << "\t" << passive;
00304 for (int i = 0; i < 5; ++i)
00305 pc << "\t" << (m_replies_sent[i] / float(tick_period))
00306 << "\t" << (m_queries_received[i] / float(tick_period))
00307 << "\t" << (m_replies_bytes_sent[i] / float(tick_period*60))
00308 << "\t" << (m_queries_bytes_received[i] / float(tick_period*60));
00309
00310 pc << "\t" << torrents
00311 << "\t" << peers
00312 << "\t" << m_announces / float(tick_period)
00313 << "\t" << m_failed_announces / float(tick_period)
00314 << "\t" << (m_total_message_input / float(tick_period))
00315 << "\t" << (m_ut_message_input / float(tick_period))
00316 << "\t" << (m_lt_message_input / float(tick_period))
00317 << "\t" << (m_mp_message_input / float(tick_period))
00318 << "\t" << (m_gr_message_input / float(tick_period))
00319 << "\t" << (m_mo_message_input / float(tick_period))
00320 << "\t" << (m_total_in_bytes / float(tick_period*60))
00321 << "\t" << (m_total_out_bytes / float(tick_period*60))
00322 << "\t" << (m_queries_out_bytes / float(tick_period*60))
00323 << std::endl;
00324 ++m_counter;
00325 std::fill_n(m_replies_bytes_sent, 5, 0);
00326 std::fill_n(m_queries_bytes_received, 5, 0);
00327 std::fill_n(m_replies_sent, 5, 0);
00328 std::fill_n(m_queries_received, 5, 0);
00329 m_announces = 0;
00330 m_failed_announces = 0;
00331 m_total_message_input = 0;
00332 m_ut_message_input = 0;
00333 m_lt_message_input = 0;
00334 m_total_in_bytes = 0;
00335 m_total_out_bytes = 0;
00336 m_queries_out_bytes = 0;
00337 #endif
00338 }
00339 catch (std::exception&)
00340 {
00341 assert(false);
00342 };
00343
00344 void dht_tracker::announce(sha1_hash const& ih, int listen_port
00345 , boost::function<void(std::vector<tcp::endpoint> const&
00346 , sha1_hash const&)> f)
00347 {
00348 m_dht.announce(ih, listen_port, f);
00349 }
00350
00351
00352
00353 void dht_tracker::on_receive(asio::error_code const& error, size_t bytes_transferred)
00354 try
00355 {
00356 if (error == asio::error::operation_aborted) return;
00357
00358 int current_buffer = m_buffer;
00359 m_buffer = (m_buffer + 1) & 1;
00360 m_socket.async_receive_from(asio::buffer(&m_in_buf[m_buffer][0]
00361 , m_in_buf[m_buffer].size()), m_remote_endpoint[m_buffer]
00362 , m_strand.wrap(bind(&dht_tracker::on_receive, self(), _1, _2)));
00363
00364 if (error) return;
00365
00366 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00367 ++m_total_message_input;
00368 m_total_in_bytes += bytes_transferred;
00369 #endif
00370
00371 try
00372 {
00373 using libtorrent::entry;
00374 using libtorrent::bdecode;
00375
00376 assert(bytes_transferred > 0);
00377
00378 entry e = bdecode(m_in_buf[current_buffer].begin()
00379 , m_in_buf[current_buffer].end());
00380
00381 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00382 TORRENT_LOG(dht_tracker) << microsec_clock::universal_time()
00383 << " RECEIVED [" << m_remote_endpoint[current_buffer]
00384 << "]:";
00385 #endif
00386
00387 libtorrent::dht::msg m;
00388 m.message_id = 0;
00389 m.addr = m_remote_endpoint[current_buffer];
00390 m.transaction_id = e["t"].string();
00391
00392 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00393 try
00394 {
00395 entry const* ver = e.find_key("v");
00396 if (!ver) throw std::exception();
00397
00398 std::string const& client = ver->string();
00399 if (client.size() > 1 && std::equal(client.begin(), client.begin() + 2, "UT"))
00400 {
00401 ++m_ut_message_input;
00402 TORRENT_LOG(dht_tracker) << " client: uTorrent";
00403 }
00404 else if (client.size() > 1 && std::equal(client.begin(), client.begin() + 2, "LT"))
00405 {
00406 ++m_lt_message_input;
00407 TORRENT_LOG(dht_tracker) << " client: libtorrent";
00408 }
00409 else if (client.size() > 1 && std::equal(client.begin(), client.begin() + 2, "MP"))
00410 {
00411 ++m_mp_message_input;
00412 TORRENT_LOG(dht_tracker) << " client: MooPolice";
00413 }
00414 else if (client.size() > 1 && std::equal(client.begin(), client.begin() + 2, "GR"))
00415 {
00416 ++m_gr_message_input;
00417 TORRENT_LOG(dht_tracker) << " client: GetRight";
00418 }
00419 else if (client.size() > 1 && std::equal(client.begin(), client.begin() + 2, "MO"))
00420 {
00421 ++m_mo_message_input;
00422 TORRENT_LOG(dht_tracker) << " client: Mono Torrent";
00423 }
00424 else
00425 {
00426 TORRENT_LOG(dht_tracker) << " client: generic";
00427 }
00428 }
00429 catch (std::exception&)
00430 {
00431 TORRENT_LOG(dht_tracker) << " client: generic";
00432 };
00433 #endif
00434
00435 std::string const& msg_type = e["y"].string();
00436
00437 if (msg_type == "r")
00438 {
00439 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00440 TORRENT_LOG(dht_tracker) << " reply: transaction: "
00441 << m.transaction_id;
00442 #endif
00443
00444 m.reply = true;
00445 entry const& r = e["r"];
00446 std::string const& id = r["id"].string();
00447 if (id.size() != 20) throw std::runtime_error("invalid size of id");
00448 std::copy(id.begin(), id.end(), m.id.begin());
00449
00450 if (entry const* n = r.find_key("values"))
00451 {
00452 m.peers.clear();
00453 if (n->list().size() == 1)
00454 {
00455
00456 std::string const& peers = n->list().front().string();
00457 std::string::const_iterator i = peers.begin();
00458 std::string::const_iterator end = peers.end();
00459
00460 while (std::distance(i, end) >= 6)
00461 m.peers.push_back(read_v4_endpoint<tcp::endpoint>(i));
00462 }
00463 else
00464 {
00465
00466 read_endpoint_list<tcp::endpoint>(n, m.peers);
00467 }
00468 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00469 TORRENT_LOG(dht_tracker) << " peers: " << m.peers.size();
00470 #endif
00471 }
00472
00473 m.nodes.clear();
00474 if (entry const* n = r.find_key("nodes"))
00475 {
00476 std::string const& nodes = n->string();
00477 std::string::const_iterator i = nodes.begin();
00478 std::string::const_iterator end = nodes.end();
00479
00480 while (std::distance(i, end) >= 26)
00481 {
00482 node_id id;
00483 std::copy(i, i + 20, id.begin());
00484 i += 20;
00485 m.nodes.push_back(libtorrent::dht::node_entry(
00486 id, read_v4_endpoint<udp::endpoint>(i)));
00487 }
00488 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00489 TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size();
00490 #endif
00491 }
00492
00493 if (entry const* n = r.find_key("nodes2"))
00494 {
00495 entry::list_type const& contacts = n->list();
00496 for (entry::list_type::const_iterator i = contacts.begin()
00497 , end(contacts.end()); i != end; ++i)
00498 {
00499 std::string const& p = i->string();
00500 if (p.size() < 6 + 20) continue;
00501 std::string::const_iterator in = p.begin();
00502
00503 node_id id;
00504 std::copy(in, in + 20, id.begin());
00505 in += 20;
00506 if (p.size() == 6 + 20)
00507 m.nodes.push_back(libtorrent::dht::node_entry(
00508 id, read_v4_endpoint<udp::endpoint>(in)));
00509 else if (p.size() == 18 + 20)
00510 m.nodes.push_back(libtorrent::dht::node_entry(
00511 id, read_v6_endpoint<udp::endpoint>(in)));
00512 }
00513 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00514 TORRENT_LOG(dht_tracker) << " nodes2 + nodes: " << m.nodes.size();
00515 #endif
00516 }
00517
00518 entry const* token = r.find_key("token");
00519 if (token) m.write_token = *token;
00520 }
00521 else if (msg_type == "q")
00522 {
00523 m.reply = false;
00524 entry const& a = e["a"];
00525 std::string const& id = a["id"].string();
00526 if (id.size() != 20) throw std::runtime_error("invalid size of id");
00527 std::copy(id.begin(), id.end(), m.id.begin());
00528
00529 std::string request_kind(e["q"].string());
00530 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00531 TORRENT_LOG(dht_tracker) << " query: " << request_kind;
00532 #endif
00533
00534 if (request_kind == "ping")
00535 {
00536 m.message_id = libtorrent::dht::messages::ping;
00537 }
00538 else if (request_kind == "find_node")
00539 {
00540 std::string const& target = a["target"].string();
00541 if (target.size() != 20) throw std::runtime_error("invalid size of target id");
00542 std::copy(target.begin(), target.end(), m.info_hash.begin());
00543 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00544 TORRENT_LOG(dht_tracker) << " target: "
00545 << boost::lexical_cast<std::string>(m.info_hash);
00546 #endif
00547
00548 m.message_id = libtorrent::dht::messages::find_node;
00549 }
00550 else if (request_kind == "get_peers")
00551 {
00552 std::string const& info_hash = a["info_hash"].string();
00553 if (info_hash.size() != 20) throw std::runtime_error("invalid size of info-hash");
00554 std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin());
00555 m.message_id = libtorrent::dht::messages::get_peers;
00556 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00557 TORRENT_LOG(dht_tracker) << " info_hash: "
00558 << boost::lexical_cast<std::string>(m.info_hash);
00559 #endif
00560 }
00561 else if (request_kind == "announce_peer")
00562 {
00563 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00564 ++m_announces;
00565 #endif
00566 std::string const& info_hash = a["info_hash"].string();
00567 if (info_hash.size() != 20)
00568 throw std::runtime_error("invalid size of info-hash");
00569 std::copy(info_hash.begin(), info_hash.end(), m.info_hash.begin());
00570 m.port = a["port"].integer();
00571 m.write_token = a["token"];
00572 m.message_id = libtorrent::dht::messages::announce_peer;
00573 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00574 TORRENT_LOG(dht_tracker) << " info_hash: "
00575 << boost::lexical_cast<std::string>(m.info_hash);
00576 TORRENT_LOG(dht_tracker) << " port: " << m.port;
00577
00578 if (!m_dht.verify_token(m))
00579 ++m_failed_announces;
00580 #endif
00581 }
00582 else
00583 {
00584 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00585 TORRENT_LOG(dht_tracker) << " *** UNSUPPORTED REQUEST *** : "
00586 << request_kind;
00587 #endif
00588 throw std::runtime_error("unsupported request: " + request_kind);
00589 }
00590 }
00591 else if (msg_type == "e")
00592 {
00593 entry::list_type const& list = e["e"].list();
00594 m.message_id = messages::error;
00595 m.error_msg = list.back().string();
00596 m.error_code = list.front().integer();
00597 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00598 TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " "
00599 << m.error_msg;
00600 #endif
00601 }
00602 else
00603 {
00604 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00605 TORRENT_LOG(dht_tracker) << " *** UNSUPPORTED MESSAGE TYPE *** : "
00606 << msg_type;
00607 #endif
00608 throw std::runtime_error("unsupported message type: " + msg_type);
00609 }
00610
00611 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00612 if (!m.reply)
00613 {
00614 ++m_queries_received[m.message_id];
00615 m_queries_bytes_received[m.message_id] += int(bytes_transferred);
00616 }
00617 TORRENT_LOG(dht_tracker) << e;
00618 #endif
00619
00620 m_dht.incoming(m);
00621 }
00622 catch (std::exception& e)
00623 {
00624 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00625 TORRENT_LOG(dht_tracker) << "invalid incoming packet: "
00626 << e.what();
00627 #endif
00628 }
00629 }
00630 catch (std::exception& e)
00631 {
00632 assert(false);
00633 };
00634
00635 entry dht_tracker::state() const
00636 {
00637 entry ret(entry::dictionary_t);
00638 {
00639 entry nodes(entry::list_t);
00640 for (node_impl::iterator i(m_dht.begin())
00641 , end(m_dht.end()); i != end; ++i)
00642 {
00643 std::string node;
00644 std::back_insert_iterator<std::string> out(node);
00645 write_endpoint(i->addr, out);
00646 nodes.list().push_back(entry(node));
00647 }
00648 bucket_t cache;
00649 m_dht.replacement_cache(cache);
00650 for (bucket_t::iterator i(cache.begin())
00651 , end(cache.end()); i != end; ++i)
00652 {
00653 std::string node;
00654 std::back_insert_iterator<std::string> out(node);
00655 write_endpoint(i->addr, out);
00656 nodes.list().push_back(entry(node));
00657 }
00658 if (!nodes.list().empty())
00659 ret["nodes"] = nodes;
00660 }
00661
00662 ret["node-id"] = boost::lexical_cast<std::string>(m_dht.nid());
00663 return ret;
00664 }
00665
00666 void dht_tracker::add_node(udp::endpoint node)
00667 {
00668 m_dht.add_node(node);
00669 }
00670
00671 void dht_tracker::add_node(std::pair<std::string, int> const& node)
00672 {
00673 udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
00674 m_host_resolver.async_resolve(q, m_strand.wrap(
00675 bind(&dht_tracker::on_name_lookup, self(), _1, _2)));
00676 }
00677
00678 void dht_tracker::on_name_lookup(asio::error_code const& e
00679 , udp::resolver::iterator host) try
00680 {
00681 if (e || host == udp::resolver::iterator()) return;
00682 add_node(host->endpoint());
00683 }
00684 catch (std::exception&)
00685 {
00686 assert(false);
00687 };
00688
00689 void dht_tracker::add_router_node(std::pair<std::string, int> const& node)
00690 {
00691 udp::resolver::query q(node.first, lexical_cast<std::string>(node.second));
00692 m_host_resolver.async_resolve(q, m_strand.wrap(
00693 bind(&dht_tracker::on_router_name_lookup, self(), _1, _2)));
00694 }
00695
00696 void dht_tracker::on_router_name_lookup(asio::error_code const& e
00697 , udp::resolver::iterator host) try
00698 {
00699 if (e || host == udp::resolver::iterator()) return;
00700 m_dht.add_router_node(host->endpoint());
00701 }
00702 catch (std::exception&)
00703 {
00704 assert(false);
00705 };
00706
00707 void dht_tracker::on_bootstrap()
00708 {}
00709
00710 void dht_tracker::send_packet(msg const& m)
00711 {
00712 using libtorrent::bencode;
00713 using libtorrent::entry;
00714 entry e(entry::dictionary_t);
00715 e["t"] = m.transaction_id;
00716 std::string version_str("LT ");
00717 std::string::iterator i = version_str.begin() + 2;
00718 detail::write_uint8(LIBTORRENT_VERSION_MAJOR, i);
00719 detail::write_uint8(LIBTORRENT_VERSION_MINOR, i);
00720 e["v"] = version_str;
00721
00722 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00723 TORRENT_LOG(dht_tracker) << microsec_clock::universal_time()
00724 << " SENDING [" << m.addr << "]:";
00725 TORRENT_LOG(dht_tracker) << " transaction: " << m.transaction_id;
00726
00727 #endif
00728
00729 if (m.message_id == messages::error)
00730 {
00731 assert(m.reply);
00732 e["y"] = "e";
00733 entry error_list(entry::list_t);
00734 error_list.list().push_back(entry(m.error_code));
00735 error_list.list().push_back(entry(m.error_msg));
00736 e["e"] = error_list;
00737 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00738 TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " "
00739 << m.error_msg;
00740 #endif
00741 }
00742 else if (m.reply)
00743 {
00744 e["y"] = "r";
00745 e["r"] = entry(entry::dictionary_t);
00746 entry& r = e["r"];
00747 r["id"] = std::string(m.id.begin(), m.id.end());
00748
00749 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00750 TORRENT_LOG(dht_tracker) << " reply: "
00751 << messages::ids[m.message_id];
00752 #endif
00753
00754 if (m.write_token.type() != entry::undefined_t)
00755 r["token"] = m.write_token;
00756
00757 switch (m.message_id)
00758 {
00759 case messages::ping:
00760 break;
00761 case messages::find_node:
00762 {
00763 bool ipv6_nodes = false;
00764 r["nodes"] = entry(entry::string_t);
00765 entry& n = r["nodes"];
00766 std::back_insert_iterator<std::string> out(n.string());
00767 for (msg::nodes_t::const_iterator i = m.nodes.begin()
00768 , end(m.nodes.end()); i != end; ++i)
00769 {
00770 if (!i->addr.address().is_v4())
00771 {
00772 ipv6_nodes = true;
00773 continue;
00774 }
00775 std::copy(i->id.begin(), i->id.end(), out);
00776 write_endpoint(i->addr, out);
00777 }
00778
00779 if (ipv6_nodes)
00780 {
00781 r["nodes2"] = entry(entry::list_t);
00782 entry& p = r["nodes2"];
00783 std::string endpoint;
00784 endpoint.resize(6);
00785 for (msg::nodes_t::const_iterator i = m.nodes.begin()
00786 , end(m.nodes.end()); i != end; ++i)
00787 {
00788 std::string::iterator out = endpoint.begin();
00789 std::copy(i->id.begin(), i->id.end(), out);
00790 write_endpoint(i->addr, out);
00791 p.list().push_back(entry(endpoint));
00792 }
00793 }
00794 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00795 TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size();
00796 #endif
00797 break;
00798 }
00799 case messages::get_peers:
00800 {
00801 if (m.peers.empty())
00802 {
00803 r["nodes"] = entry(entry::string_t);
00804 entry& n = r["nodes"];
00805 std::back_insert_iterator<std::string> out(n.string());
00806 for (msg::nodes_t::const_iterator i = m.nodes.begin()
00807 , end(m.nodes.end()); i != end; ++i)
00808 {
00809 if (!i->addr.address().is_v4()) continue;
00810 std::copy(i->id.begin(), i->id.end(), out);
00811 write_endpoint(i->addr, out);
00812 }
00813 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00814 TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size();
00815 #endif
00816 }
00817 else
00818 {
00819 r["values"] = entry(entry::list_t);
00820 entry& p = r["values"];
00821 std::string endpoint;
00822 endpoint.resize(6);
00823 for (msg::peers_t::const_iterator i = m.peers.begin()
00824 , end(m.peers.end()); i != end; ++i)
00825 {
00826 std::string::iterator out = endpoint.begin();
00827 write_endpoint(*i, out);
00828 p.list().push_back(entry(endpoint));
00829 }
00830 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00831 TORRENT_LOG(dht_tracker) << " peers: " << m.peers.size();
00832 #endif
00833 }
00834 break;
00835 }
00836
00837 case messages::announce_peer:
00838 break;
00839 break;
00840 }
00841 }
00842 else
00843 {
00844 e["y"] = "q";
00845 e["a"] = entry(entry::dictionary_t);
00846 entry& a = e["a"];
00847 a["id"] = std::string(m.id.begin(), m.id.end());
00848
00849 if (m.write_token.type() != entry::undefined_t)
00850 a["token"] = m.write_token;
00851 assert(m.message_id <= messages::error);
00852 e["q"] = messages::ids[m.message_id];
00853
00854 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00855 TORRENT_LOG(dht_tracker) << " query: "
00856 << messages::ids[m.message_id];
00857 #endif
00858
00859 switch (m.message_id)
00860 {
00861 case messages::find_node:
00862 {
00863 a["target"] = std::string(m.info_hash.begin(), m.info_hash.end());
00864 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00865 TORRENT_LOG(dht_tracker) << " target: "
00866 << boost::lexical_cast<std::string>(m.info_hash);
00867 #endif
00868 break;
00869 }
00870 case messages::get_peers:
00871 {
00872 a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end());
00873 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00874 TORRENT_LOG(dht_tracker) << " info_hash: "
00875 << boost::lexical_cast<std::string>(m.info_hash);
00876 #endif
00877 break;
00878 }
00879 case messages::announce_peer:
00880 a["port"] = m_settings.service_port;
00881 a["info_hash"] = std::string(m.info_hash.begin(), m.info_hash.end());
00882 a["token"] = m.write_token;
00883 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00884 TORRENT_LOG(dht_tracker) << " port: "
00885 << m_settings.service_port
00886 << " info_hash: " << boost::lexical_cast<std::string>(m.info_hash);
00887 #endif
00888 break;
00889 default: break;
00890 }
00891
00892 }
00893
00894 m_send_buf.clear();
00895 bencode(std::back_inserter(m_send_buf), e);
00896 m_socket.send_to(asio::buffer(&m_send_buf[0]
00897 , (int)m_send_buf.size()), m.addr);
00898
00899 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00900 m_total_out_bytes += m_send_buf.size();
00901
00902 if (m.reply)
00903 {
00904 ++m_replies_sent[m.message_id];
00905 m_replies_bytes_sent[m.message_id] += int(m_send_buf.size());
00906 }
00907 else
00908 {
00909 m_queries_out_bytes += m_send_buf.size();
00910 }
00911 TORRENT_LOG(dht_tracker) << e;
00912 #endif
00913
00914 if (!m.piggy_backed_ping) return;
00915
00916 msg pm;
00917 pm.reply = false;
00918 pm.piggy_backed_ping = false;
00919 pm.message_id = messages::ping;
00920 pm.transaction_id = m.ping_transaction_id;
00921 pm.id = m.id;
00922 pm.addr = m.addr;
00923
00924 send_packet(pm);
00925 }
00926
00927 }}
00928