00001
00002
00003
00004
00005
00006
00007
00008 #include <libtorrent/kademlia/refresh.hpp>
00009 #include <libtorrent/kademlia/routing_table.hpp>
00010 #include <libtorrent/kademlia/rpc_manager.hpp>
00011 #include <libtorrent/kademlia/logging.hpp>
00012
00013 #include <libtorrent/io.hpp>
00014
00015 #include <boost/bind.hpp>
00016
00017 using boost::bind;
00018
00019 namespace libtorrent { namespace dht
00020 {
00021
00022 using asio::ip::udp;
00023
00024 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00025 TORRENT_DEFINE_LOG(refresh)
00026 #endif
00027
00028 typedef boost::shared_ptr<observer> observer_ptr;
00029
00030 class refresh_observer : public observer
00031 {
00032 public:
00033 refresh_observer(
00034 boost::intrusive_ptr<refresh> const& algorithm
00035 , node_id self
00036 , node_id target
00037 )
00038 : m_target(target)
00039 , m_self(self)
00040 , m_algorithm(algorithm)
00041 {}
00042 ~refresh_observer();
00043
00044 void send(msg& m)
00045 {
00046 m.info_hash = m_target;
00047 }
00048
00049 void timeout();
00050 void reply(msg const& m);
00051 void abort() { m_algorithm = 0; }
00052
00053 private:
00054 node_id const m_target;
00055 node_id const m_self;
00056 boost::intrusive_ptr<refresh> m_algorithm;
00057 };
00058
00059 refresh_observer::~refresh_observer()
00060 {
00061 if (m_algorithm) m_algorithm->failed(m_self, true);
00062 }
00063
00064 void refresh_observer::reply(msg const& in)
00065 {
00066 if (!m_algorithm) return;
00067
00068 if (!in.nodes.empty())
00069 {
00070 for (msg::nodes_t::const_iterator i = in.nodes.begin()
00071 , end(in.nodes.end()); i != end; ++i)
00072 {
00073 m_algorithm->traverse(i->id, i->addr);
00074 }
00075 }
00076 m_algorithm->finished(m_self);
00077 m_algorithm = 0;
00078 }
00079
00080 void refresh_observer::timeout()
00081 {
00082 if (!m_algorithm) return;
00083 m_algorithm->failed(m_self);
00084 m_algorithm = 0;
00085 }
00086
00087 class ping_observer : public observer
00088 {
00089 public:
00090 ping_observer(
00091 boost::intrusive_ptr<refresh> const& algorithm
00092 , node_id self
00093 )
00094 : m_self(self)
00095 , m_algorithm(algorithm)
00096 {}
00097 ~ping_observer();
00098
00099 void send(msg& p) {}
00100 void timeout();
00101 void reply(msg const& m);
00102 void abort() { m_algorithm = 0; }
00103
00104 private:
00105 node_id const m_self;
00106 boost::intrusive_ptr<refresh> m_algorithm;
00107 };
00108
00109 ping_observer::~ping_observer()
00110 {
00111 if (m_algorithm) m_algorithm->ping_timeout(m_self, true);
00112 }
00113
00114 void ping_observer::reply(msg const& m)
00115 {
00116 if (!m_algorithm) return;
00117
00118 m_algorithm->ping_reply(m_self);
00119 m_algorithm = 0;
00120 }
00121
00122 void ping_observer::timeout()
00123 {
00124 if (!m_algorithm) return;
00125 m_algorithm->ping_timeout(m_self);
00126 m_algorithm = 0;
00127 }
00128
00129 void refresh::invoke(node_id const& nid, udp::endpoint addr)
00130 {
00131 observer_ptr p(new refresh_observer(
00132 this
00133 , nid
00134 , m_target
00135 ));
00136
00137 m_rpc.invoke(messages::find_node, addr, p);
00138 }
00139
00140 void refresh::done()
00141 {
00142 m_leftover_nodes_iterator = (int)m_results.size() > m_max_results ?
00143 m_results.begin() + m_max_results : m_results.end();
00144
00145 invoke_pings_or_finish();
00146 }
00147
00148 void refresh::ping_reply(node_id nid)
00149 {
00150 m_active_pings--;
00151 invoke_pings_or_finish();
00152 }
00153
00154 void refresh::ping_timeout(node_id nid, bool prevent_request)
00155 {
00156 m_active_pings--;
00157 invoke_pings_or_finish(prevent_request);
00158 }
00159
00160 void refresh::invoke_pings_or_finish(bool prevent_request)
00161 {
00162 if (prevent_request)
00163 {
00164 --m_max_active_pings;
00165 if (m_max_active_pings <= 0)
00166 m_max_active_pings = 1;
00167 }
00168 else
00169 {
00170 while (m_active_pings < m_max_active_pings)
00171 {
00172 if (m_leftover_nodes_iterator == m_results.end()) break;
00173
00174 result const& node = *m_leftover_nodes_iterator;
00175
00176
00177 if (node.flags & result::initial)
00178 {
00179 ++m_leftover_nodes_iterator;
00180 continue;
00181 }
00182
00183 try
00184 {
00185 observer_ptr p(new ping_observer(this, node.id));
00186 m_rpc.invoke(messages::ping, node.addr, p);
00187 ++m_active_pings;
00188 ++m_leftover_nodes_iterator;
00189 }
00190 catch (std::exception& e) {}
00191 }
00192 }
00193
00194 if (m_active_pings == 0)
00195 {
00196 m_done_callback();
00197 }
00198 }
00199
00200 } }
00201