00001
00002
00003
00004
00005
00006
00007
00008 #include <utility>
00009 #include <boost/bind.hpp>
00010 #include <boost/optional.hpp>
00011 #include <boost/function.hpp>
00012 #include <boost/iterator_adaptors.hpp>
00013
00014 #include "libtorrent/io.hpp"
00015 #include "libtorrent/hasher.hpp"
00016 #include "libtorrent/random_sample.hpp"
00017 #include "libtorrent/kademlia/node_id.hpp"
00018 #include "libtorrent/kademlia/rpc_manager.hpp"
00019 #include "libtorrent/kademlia/packet_iterator.hpp"
00020 #include "libtorrent/kademlia/routing_table.hpp"
00021 #include "libtorrent/kademlia/node.hpp"
00022
00023 #include "libtorrent/kademlia/refresh.hpp"
00024 #include "libtorrent/kademlia/closest_nodes.hpp"
00025 #include "libtorrent/kademlia/find_data.hpp"
00026
00027 using boost::bind;
00028 using boost::posix_time::second_clock;
00029 using boost::posix_time::seconds;
00030 using boost::posix_time::minutes;
00031 using boost::posix_time::ptime;
00032 using boost::posix_time::time_duration;
00033
00034 namespace libtorrent { namespace dht
00035 {
00036
00037 #ifdef _MSC_VER
00038 namespace
00039 {
00040 char rand() { return (char)std::rand(); }
00041 }
00042 #endif
00043
00044 typedef boost::shared_ptr<observer> observer_ptr;
00045
00046 enum { announce_interval = 30 };
00047
00048 using asio::ip::udp;
00049
00050 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00051 TORRENT_DEFINE_LOG(node)
00052 #endif
00053
00054 node_id generate_id()
00055 {
00056 char random[20];
00057 std::srand(std::time(0));
00058 #ifdef _MSC_VER
00059 std::generate(random, random + 20, &rand);
00060 #else
00061 std::generate(random, random + 20, &std::rand);
00062 #endif
00063
00064 hasher h;
00065 h.update(random, 20);
00066 return h.final();
00067 }
00068
00069 void purge_peers(std::set<peer_entry>& peers)
00070 {
00071 for (std::set<peer_entry>::iterator i = peers.begin()
00072 , end(peers.end()); i != end;)
00073 {
00074
00075 if (i->added + minutes(int(announce_interval * 1.5f)) < second_clock::universal_time())
00076 {
00077 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00078 TORRENT_LOG(node) << "peer timed out at: " << i->addr.address();
00079 #endif
00080 peers.erase(i++);
00081 }
00082 else
00083 ++i;
00084 }
00085 }
00086
00087 void nop() {}
00088
00089 node_impl::node_impl(boost::function<void(msg const&)> const& f
00090 , dht_settings const& settings, boost::optional<node_id> node_id)
00091 : m_settings(settings)
00092 , m_id(node_id ? *node_id : generate_id())
00093 , m_table(m_id, 8, settings)
00094 , m_rpc(bind(&node_impl::incoming_request, this, _1)
00095 , m_id, m_table, f)
00096 , m_last_tracker_tick(boost::posix_time::second_clock::universal_time())
00097 {
00098 m_secret[0] = std::rand();
00099 m_secret[1] = std::rand();
00100 }
00101
00102 bool node_impl::verify_token(msg const& m)
00103 {
00104 if (m.write_token.type() != entry::string_t)
00105 return false;
00106 std::string const& token = m.write_token.string();
00107 if (token.length() != 4) return false;
00108
00109 hasher h1;
00110 std::string address = m.addr.address().to_string();
00111 h1.update(&address[0], address.length());
00112 h1.update((char*)&m_secret[0], sizeof(m_secret[0]));
00113 h1.update((char*)&m.info_hash[0], sha1_hash::size);
00114
00115 sha1_hash h = h1.final();
00116 if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
00117 return true;
00118
00119 hasher h2;
00120 h2.update(&address[0], address.length());
00121 h2.update((char*)&m_secret[1], sizeof(m_secret[1]));
00122 h = h2.final();
00123 if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
00124 return true;
00125 return false;
00126 }
00127
00128 entry node_impl::generate_token(msg const& m)
00129 {
00130 std::string token;
00131 token.resize(4);
00132 hasher h;
00133 std::string address = m.addr.address().to_string();
00134 h.update(&address[0], address.length());
00135 h.update((char*)&m_secret[0], sizeof(m_secret[0]));
00136 h.update((char*)&m.info_hash[0], sha1_hash::size);
00137
00138 sha1_hash hash = h.final();
00139 std::copy(hash.begin(), hash.begin() + 4, (signed char*)&token[0]);
00140 return entry(token);
00141 }
00142
00143 void node_impl::refresh(node_id const& id
00144 , boost::function0<void> f)
00145 {
00146
00147
00148 std::vector<node_entry> start;
00149 start.reserve(m_table.bucket_size());
00150 m_table.find_node(id, start, false);
00151 refresh::initiate(id, m_settings.search_branching, 10, m_table.bucket_size()
00152 , m_table, start.begin(), start.end(), m_rpc, f);
00153 }
00154
00155 void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
00156 , boost::function0<void> f)
00157 {
00158 std::vector<node_entry> start;
00159 start.reserve(nodes.size());
00160 std::copy(nodes.begin(), nodes.end(), std::back_inserter(start));
00161 refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
00162 , m_table, start.begin(), start.end(), m_rpc, f);
00163 }
00164
00165 void node_impl::refresh()
00166 {
00167 std::vector<node_entry> start;
00168 start.reserve(m_table.size().get<0>());
00169 std::copy(m_table.begin(), m_table.end(), std::back_inserter(start));
00170
00171 refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
00172 , m_table, start.begin(), start.end(), m_rpc, bind(&nop));
00173 }
00174
00175 int node_impl::bucket_size(int bucket)
00176 {
00177 return m_table.bucket_size(bucket);
00178 }
00179
00180 void node_impl::new_write_key()
00181 {
00182 m_secret[1] = m_secret[0];
00183 m_secret[0] = std::rand();
00184 }
00185
00186 void node_impl::refresh_bucket(int bucket) try
00187 {
00188 assert(bucket >= 0 && bucket < 160);
00189
00190
00191 node_id target = generate_id();
00192 int num_bits = 160 - bucket;
00193 node_id mask(0);
00194 for (int i = 0; i < num_bits; ++i)
00195 {
00196 int byte = i / 8;
00197 mask[byte] |= 0x80 >> (i % 8);
00198 }
00199
00200 node_id root = m_id;
00201 root &= mask;
00202 target &= ~mask;
00203 target |= root;
00204
00205
00206
00207
00208 target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8));
00209 target[(num_bits - 1) / 8] |=
00210 (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
00211
00212 assert(distance_exp(m_id, target) == bucket);
00213
00214 std::vector<node_entry> start;
00215 start.reserve(m_table.bucket_size());
00216 m_table.find_node(target, start, false, m_table.bucket_size());
00217
00218 refresh::initiate(target, m_settings.search_branching, 10, m_table.bucket_size()
00219 , m_table, start.begin(), start.end(), m_rpc, bind(&nop));
00220 m_table.touch_bucket(bucket);
00221 }
00222 catch (std::exception&) {}
00223
00224 void node_impl::incoming(msg const& m)
00225 {
00226 if (m_rpc.incoming(m))
00227 {
00228 refresh();
00229 }
00230 }
00231
00232 namespace
00233 {
00234
00235 class announce_observer : public observer
00236 {
00237 public:
00238 announce_observer(sha1_hash const& info_hash, int listen_port
00239 , entry const& write_token)
00240 : m_info_hash(info_hash)
00241 , m_listen_port(listen_port)
00242 , m_token(write_token)
00243 {}
00244
00245 void send(msg& m)
00246 {
00247 m.port = m_listen_port;
00248 m.info_hash = m_info_hash;
00249 m.write_token = m_token;
00250 }
00251
00252 void timeout() {}
00253 void reply(msg const&) {}
00254 void abort() {}
00255
00256 private:
00257 sha1_hash m_info_hash;
00258 int m_listen_port;
00259 entry m_token;
00260 };
00261
00262 class get_peers_observer : public observer
00263 {
00264 public:
00265 get_peers_observer(sha1_hash const& info_hash, int listen_port
00266 , rpc_manager& rpc
00267 , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
00268 : m_info_hash(info_hash)
00269 , m_listen_port(listen_port)
00270 , m_rpc(rpc)
00271 , m_fun(f)
00272 {}
00273
00274 void send(msg& m)
00275 {
00276 m.port = m_listen_port;
00277 m.info_hash = m_info_hash;
00278 }
00279
00280 void timeout() {}
00281 void reply(msg const& r)
00282 {
00283 m_rpc.invoke(messages::announce_peer, r.addr
00284 , boost::shared_ptr<observer>(
00285 new announce_observer(m_info_hash, m_listen_port, r.write_token)));
00286 m_fun(r.peers, m_info_hash);
00287 }
00288 void abort() {}
00289
00290 private:
00291 sha1_hash m_info_hash;
00292 int m_listen_port;
00293 rpc_manager& m_rpc;
00294 boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
00295 };
00296
00297 void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc
00298 , int listen_port, sha1_hash const& ih
00299 , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
00300 {
00301 bool nodes = false;
00302
00303 for (std::vector<node_entry>::const_iterator i = v.begin()
00304 , end(v.end()); i != end; ++i)
00305 {
00306 rpc.invoke(messages::get_peers, i->addr, boost::shared_ptr<observer>(
00307 new get_peers_observer(ih, listen_port, rpc, f)));
00308 nodes = true;
00309 }
00310 }
00311
00312 }
00313
00314 namespace
00315 {
00316 struct dummy_observer : observer
00317 {
00318 virtual void reply(msg const&) {}
00319 virtual void timeout() {}
00320 virtual void send(msg&) {}
00321 virtual void abort() {}
00322 };
00323 }
00324
00325 void node_impl::add_router_node(udp::endpoint router)
00326 {
00327 m_table.add_router_node(router);
00328 }
00329
00330 void node_impl::add_node(udp::endpoint node)
00331 {
00332
00333
00334 observer_ptr p(new dummy_observer());
00335 m_rpc.invoke(messages::ping, node, p);
00336 }
00337
00338 void node_impl::announce(sha1_hash const& info_hash, int listen_port
00339 , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
00340 {
00341
00342
00343 closest_nodes::initiate(info_hash, m_settings.search_branching
00344 , m_table.bucket_size(), m_table, m_rpc
00345 , boost::bind(&announce_fun, _1, boost::ref(m_rpc), listen_port
00346 , info_hash, f));
00347 }
00348
00349 time_duration node_impl::refresh_timeout()
00350 {
00351 int refresh = -1;
00352 ptime now = second_clock::universal_time();
00353 ptime next = now + minutes(15);
00354 try
00355 {
00356 for (int i = 0; i < 160; ++i)
00357 {
00358 ptime r = m_table.next_refresh(i);
00359 if (r <= now)
00360 {
00361 if (refresh == -1) refresh = i;
00362 }
00363 else if (r < next)
00364 {
00365 next = r;
00366 }
00367 }
00368 if (refresh != -1)
00369 {
00370 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00371 TORRENT_LOG(node) << "refreshing bucket: " << refresh;
00372 #endif
00373 refresh_bucket(refresh);
00374 }
00375 }
00376 catch (std::exception&) {}
00377
00378 if (next < now + seconds(5)) return seconds(5);
00379 return next - now;
00380 }
00381
00382 time_duration node_impl::connection_timeout()
00383 {
00384 time_duration d = m_rpc.tick();
00385 try
00386 {
00387 ptime now(second_clock::universal_time());
00388 if (now - m_last_tracker_tick < minutes(10)) return d;
00389 m_last_tracker_tick = now;
00390
00391
00392 for (data_iterator i = begin_data(), end(end_data()); i != end;)
00393 {
00394 torrent_entry& t = i->second;
00395 node_id const& key = i->first;
00396 ++i;
00397 purge_peers(t.peers);
00398
00399
00400 if (t.peers.empty())
00401 {
00402 table_t::iterator i = m_map.find(key);
00403 if (i != m_map.end()) m_map.erase(i);
00404 }
00405 }
00406 }
00407 catch (std::exception&) {}
00408
00409 return d;
00410 }
00411
00412 void node_impl::on_announce(msg const& m, msg& reply)
00413 {
00414 if (!verify_token(m))
00415 {
00416 reply.message_id = messages::error;
00417 reply.error_code = 203;
00418 reply.error_msg = "Incorrect write token in announce_peer message";
00419 return;
00420 }
00421
00422
00423
00424
00425 m_table.node_seen(m.id, m.addr);
00426
00427 torrent_entry& v = m_map[m.info_hash];
00428 peer_entry e;
00429 e.addr = tcp::endpoint(m.addr.address(), m.addr.port());
00430 e.added = second_clock::universal_time();
00431 std::set<peer_entry>::iterator i = v.peers.find(e);
00432 if (i != v.peers.end()) v.peers.erase(i++);
00433 v.peers.insert(i, e);
00434 }
00435
00436 namespace
00437 {
00438 tcp::endpoint get_endpoint(peer_entry const& p)
00439 {
00440 return p.addr;
00441 }
00442 }
00443
00444 bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
00445 {
00446 table_t::const_iterator i = m_map.find(m.info_hash);
00447 if (i == m_map.end()) return false;
00448
00449 torrent_entry const& v = i->second;
00450
00451 int num = (std::min)((int)v.peers.size(), m_settings.max_peers_reply);
00452 peers.clear();
00453 peers.reserve(num);
00454 random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint)
00455 , boost::make_transform_iterator(v.peers.end(), &get_endpoint)
00456 , std::back_inserter(peers), num);
00457
00458 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00459 for (std::vector<tcp::endpoint>::iterator i = peers.begin()
00460 , end(peers.end()); i != end; ++i)
00461 {
00462 TORRENT_LOG(node) << " " << *i;
00463 }
00464 #endif
00465 return true;
00466 }
00467
00468 void node_impl::incoming_request(msg const& m)
00469 {
00470 msg reply;
00471 switch (m.message_id)
00472 {
00473 case messages::ping:
00474 break;
00475 case messages::get_peers:
00476 {
00477 reply.info_hash = m.info_hash;
00478 reply.write_token = generate_token(m);
00479
00480 if (!on_find(m, reply.peers))
00481 {
00482
00483
00484 m_table.find_node(m.info_hash, reply.nodes, false);
00485 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00486 for (std::vector<node_entry>::iterator i = reply.nodes.begin()
00487 , end(reply.nodes.end()); i != end; ++i)
00488 {
00489 TORRENT_LOG(node) << " " << i->id << " " << i->addr;
00490 }
00491 #endif
00492 }
00493 }
00494 break;
00495 case messages::find_node:
00496 {
00497 reply.info_hash = m.info_hash;
00498
00499 m_table.find_node(m.info_hash, reply.nodes, false);
00500 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00501 for (std::vector<node_entry>::iterator i = reply.nodes.begin()
00502 , end(reply.nodes.end()); i != end; ++i)
00503 {
00504 TORRENT_LOG(node) << " " << i->id << " " << i->addr;
00505 }
00506 #endif
00507 }
00508 break;
00509 case messages::announce_peer:
00510 {
00511 on_announce(m, reply);
00512 }
00513 break;
00514 };
00515
00516 if (m_table.need_node(m.id))
00517 m_rpc.reply_with_ping(reply, m);
00518 else
00519 m_rpc.reply(reply, m);
00520 }
00521
00522 } }