00001
00002
00003
00004
00005
00006
00007
00008 #include <boost/date_time/posix_time/posix_time_types.hpp>
00009 #include <boost/date_time/posix_time/ptime.hpp>
00010 #include <boost/bind.hpp>
00011
00012 #include <libtorrent/io.hpp>
00013 #include <libtorrent/invariant_check.hpp>
00014 #include <libtorrent/kademlia/rpc_manager.hpp>
00015 #include <libtorrent/kademlia/logging.hpp>
00016 #include <libtorrent/kademlia/routing_table.hpp>
00017 #include <libtorrent/hasher.hpp>
00018
00019 #include <fstream>
00020
00021 using boost::posix_time::ptime;
00022 using boost::posix_time::time_duration;
00023 using boost::posix_time::microsec_clock;
00024 using boost::posix_time::seconds;
00025 using boost::posix_time::milliseconds;
00026 using boost::shared_ptr;
00027 using boost::bind;
00028
00029 namespace libtorrent { namespace dht
00030 {
00031
00032 namespace io = libtorrent::detail;
00033
00034 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00035 TORRENT_DEFINE_LOG(rpc)
00036 #endif
00037
00038 node_id generate_id();
00039
00040 rpc_manager::rpc_manager(fun const& f, node_id const& our_id
00041 , routing_table& table, send_fun const& sf)
00042 : m_next_transaction_id(rand() % max_transactions)
00043 , m_oldest_transaction_id(m_next_transaction_id)
00044 , m_incoming(f)
00045 , m_send(sf)
00046 , m_our_id(our_id)
00047 , m_table(table)
00048 , m_timer(boost::posix_time::microsec_clock::universal_time())
00049 , m_random_number(generate_id())
00050 , m_destructing(false)
00051 {
00052 std::srand(time(0));
00053 }
00054
00055 rpc_manager::~rpc_manager()
00056 {
00057 m_destructing = true;
00058 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00059 TORRENT_LOG(rpc) << "Destructing";
00060 #endif
00061 std::for_each(m_aborted_transactions.begin(), m_aborted_transactions.end()
00062 , bind(&observer::abort, _1));
00063
00064 for (transactions_t::iterator i = m_transactions.begin()
00065 , end(m_transactions.end()); i != end; ++i)
00066 {
00067 if (*i) (*i)->abort();
00068 }
00069 }
00070
00071 #ifndef NDEBUG
00072 void rpc_manager::check_invariant() const
00073 {
00074 assert(m_oldest_transaction_id >= 0);
00075 assert(m_oldest_transaction_id < max_transactions);
00076 assert(m_next_transaction_id >= 0);
00077 assert(m_next_transaction_id < max_transactions);
00078 assert(!m_transactions[m_next_transaction_id]);
00079
00080 for (int i = (m_next_transaction_id + 1) % max_transactions;
00081 i != m_oldest_transaction_id; i = (i + 1) % max_transactions)
00082 {
00083 assert(!m_transactions[i]);
00084 }
00085 }
00086 #endif
00087
00088 bool rpc_manager::incoming(msg const& m)
00089 {
00090 INVARIANT_CHECK;
00091
00092 if (m_destructing) return false;
00093
00094 if (m.reply)
00095 {
00096
00097
00098
00099 if (m.transaction_id.size() != 2)
00100 {
00101 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00102 TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
00103 << m.transaction_id.size() << " from " << m.addr;
00104 #endif
00105 return false;
00106 }
00107
00108 std::string::const_iterator i = m.transaction_id.begin();
00109 int tid = io::read_uint16(i);
00110
00111 if (tid >= (int)m_transactions.size()
00112 || tid < 0)
00113 {
00114 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00115 TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
00116 << tid << " from " << m.addr;
00117 #endif
00118 return false;
00119 }
00120
00121 boost::shared_ptr<observer> o = m_transactions[tid];
00122
00123 if (!o)
00124 {
00125 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00126 TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
00127 << tid << " from " << m.addr;
00128 #endif
00129 return false;
00130 }
00131
00132 if (m.addr != o->target_addr)
00133 {
00134 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00135 TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
00136 << tid << " from " << m.addr;
00137 #endif
00138 return false;
00139 }
00140
00141 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00142 std::ofstream reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app);
00143 reply_stats << m.addr << "\t" << (microsec_clock::universal_time()
00144 - o->sent).total_milliseconds() << std::endl;
00145 #endif
00146 o->reply(m);
00147 m_transactions[tid].reset();
00148
00149 if (m.piggy_backed_ping)
00150 {
00151
00152
00153 msg ph;
00154 ph.message_id = messages::ping;
00155 ph.transaction_id = m.ping_transaction_id;
00156 ph.id = m_our_id;
00157 ph.addr = m.addr;
00158
00159 msg empty;
00160
00161 reply(empty, ph);
00162 }
00163 return m_table.node_seen(m.id, m.addr);
00164 }
00165 else
00166 {
00167
00168 m_incoming(m);
00169 }
00170 return false;
00171 }
00172
00173 time_duration rpc_manager::tick()
00174 {
00175 INVARIANT_CHECK;
00176
00177 using boost::posix_time::microsec_clock;
00178
00179 const int timeout_ms = 10 * 1000;
00180
00181
00182
00183 if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms);
00184
00185 std::vector<shared_ptr<observer> > timeouts;
00186
00187 for (;m_next_transaction_id != m_oldest_transaction_id;
00188 m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
00189 {
00190 assert(m_oldest_transaction_id >= 0);
00191 assert(m_oldest_transaction_id < max_transactions);
00192
00193 boost::shared_ptr<observer> o = m_transactions[m_oldest_transaction_id];
00194 if (!o) continue;
00195
00196 time_duration diff = o->sent + milliseconds(timeout_ms)
00197 - microsec_clock::universal_time();
00198 if (diff > seconds(0))
00199 {
00200 if (diff < seconds(1)) return seconds(1);
00201 return diff;
00202 }
00203
00204 try
00205 {
00206 m_transactions[m_oldest_transaction_id].reset();
00207 timeouts.push_back(o);
00208 } catch (std::exception) {}
00209 }
00210
00211 std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1));
00212 timeouts.clear();
00213
00214
00215
00216
00217 std::vector<shared_ptr<observer> >().swap(m_aborted_transactions);
00218 return milliseconds(timeout_ms);
00219 }
00220
00221 unsigned int rpc_manager::new_transaction_id(shared_ptr<observer> o)
00222 {
00223 INVARIANT_CHECK;
00224
00225 unsigned int tid = m_next_transaction_id;
00226 m_next_transaction_id = (m_next_transaction_id + 1) % max_transactions;
00227 if (m_transactions[m_next_transaction_id])
00228 {
00229
00230
00231
00232 m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]);
00233 m_transactions[m_next_transaction_id].reset();
00234 assert(m_oldest_transaction_id == m_next_transaction_id);
00235 }
00236 assert(!m_transactions[tid]);
00237 m_transactions[tid] = o;
00238 if (m_oldest_transaction_id == m_next_transaction_id)
00239 {
00240 m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions;
00241 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00242 TORRENT_LOG(rpc) << "WARNING: transaction limit reached! Too many concurrent"
00243 " messages! limit: " << (int)max_transactions;
00244 #endif
00245 update_oldest_transaction_id();
00246 }
00247
00248 return tid;
00249 }
00250
00251 void rpc_manager::update_oldest_transaction_id()
00252 {
00253 INVARIANT_CHECK;
00254
00255 assert(m_oldest_transaction_id != m_next_transaction_id);
00256 while (!m_transactions[m_oldest_transaction_id])
00257 {
00258 m_oldest_transaction_id = (m_oldest_transaction_id + 1)
00259 % max_transactions;
00260 if (m_oldest_transaction_id == m_next_transaction_id)
00261 break;
00262 }
00263 }
00264
00265 void rpc_manager::invoke(int message_id, udp::endpoint target_addr
00266 , shared_ptr<observer> o)
00267 {
00268 INVARIANT_CHECK;
00269
00270 if (m_destructing)
00271 {
00272 o->abort();
00273 return;
00274 }
00275
00276 msg m;
00277 m.message_id = message_id;
00278 m.reply = false;
00279 m.id = m_our_id;
00280 m.addr = target_addr;
00281 assert(!m_transactions[m_next_transaction_id]);
00282 #ifndef NDEBUG
00283 int potential_new_id = m_next_transaction_id;
00284 #endif
00285 try
00286 {
00287 m.transaction_id.clear();
00288 std::back_insert_iterator<std::string> out(m.transaction_id);
00289 io::write_uint16(m_next_transaction_id, out);
00290
00291 o->send(m);
00292
00293 o->sent = boost::posix_time::microsec_clock::universal_time();
00294 o->target_addr = target_addr;
00295
00296 #ifdef TORRENT_DHT_VERBOSE_LOGGING
00297 TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id]
00298 << " -> " << target_addr;
00299 #endif
00300 m_send(m);
00301 new_transaction_id(o);
00302 }
00303 catch (std::exception& e)
00304 {
00305
00306 assert(potential_new_id == m_next_transaction_id);
00307 o->abort();
00308 }
00309 }
00310
00311 void rpc_manager::reply(msg& m, msg const& reply_to)
00312 {
00313 INVARIANT_CHECK;
00314
00315 if (m_destructing) return;
00316
00317 if (m.message_id != messages::error)
00318 m.message_id = reply_to.message_id;
00319 m.addr = reply_to.addr;
00320 m.reply = true;
00321 m.piggy_backed_ping = false;
00322 m.id = m_our_id;
00323 m.transaction_id = reply_to.transaction_id;
00324
00325 m_send(m);
00326 }
00327
00328 namespace
00329 {
00330 struct dummy_observer : observer
00331 {
00332 virtual void reply(msg const&) {}
00333 virtual void timeout() {}
00334 virtual void send(msg&) {}
00335 void abort() {}
00336 };
00337 }
00338
00339 void rpc_manager::reply_with_ping(msg& m, msg const& reply_to)
00340 {
00341 INVARIANT_CHECK;
00342
00343 if (m_destructing) return;
00344
00345 if (m.message_id != messages::error)
00346 m.message_id = reply_to.message_id;
00347 m.addr = reply_to.addr;
00348 m.reply = true;
00349 m.piggy_backed_ping = true;
00350 m.id = m_our_id;
00351 m.transaction_id = reply_to.transaction_id;
00352
00353 try
00354 {
00355 m.ping_transaction_id.clear();
00356 std::back_insert_iterator<std::string> out(m.ping_transaction_id);
00357 io::write_uint16(m_next_transaction_id, out);
00358
00359 boost::shared_ptr<observer> o(new dummy_observer);
00360 assert(!m_transactions[m_next_transaction_id]);
00361 o->sent = boost::posix_time::microsec_clock::universal_time();
00362 o->target_addr = m.addr;
00363
00364 m_send(m);
00365 new_transaction_id(o);
00366 }
00367 catch (std::exception& e)
00368 {
00369
00370 }
00371 }
00372
00373 } }
00374