00001
00002
00003
00004
00005
00006
00007
00008 #include <vector>
00009 #include <iostream>
00010 #include <iomanip>
00011 #include <limits>
00012 #include <boost/bind.hpp>
00013
00014 #include "libtorrent/peer_connection.hpp"
00015 #include "libtorrent/identify_client.hpp"
00016 #include "libtorrent/entry.hpp"
00017 #include "libtorrent/bencode.hpp"
00018 #include "libtorrent/alert_types.hpp"
00019 #include "libtorrent/invariant_check.hpp"
00020 #include "libtorrent/io.hpp"
00021 #include "libtorrent/file.hpp"
00022 #include "libtorrent/version.hpp"
00023 #include "libtorrent/extensions.hpp"
00024 #include "libtorrent/aux_/session_impl.hpp"
00025
00026 using namespace boost::posix_time;
00027 using boost::bind;
00028 using boost::shared_ptr;
00029 using libtorrent::aux::session_impl;
00030
00031 namespace libtorrent
00032 {
00033
00034 void intrusive_ptr_add_ref(peer_connection const* c)
00035 {
00036 assert(c->m_refs >= 0);
00037 assert(c != 0);
00038 ++c->m_refs;
00039 }
00040
00041 void intrusive_ptr_release(peer_connection const* c)
00042 {
00043 assert(c->m_refs > 0);
00044 assert(c != 0);
00045 if (--c->m_refs == 0)
00046 delete c;
00047 }
00048
00049 peer_connection::peer_connection(
00050 session_impl& ses
00051 , boost::weak_ptr<torrent> tor
00052 , shared_ptr<stream_socket> s
00053 , tcp::endpoint const& remote
00054 , tcp::endpoint const& proxy)
00055 :
00056 #ifndef NDEBUG
00057 m_last_choke(boost::posix_time::second_clock::universal_time()
00058 - hours(1))
00059 ,
00060 #endif
00061 m_ses(ses)
00062 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
00063 , m_timeout(m_ses.settings().peer_timeout)
00064 , m_last_piece(second_clock::universal_time())
00065 , m_packet_size(0)
00066 , m_recv_pos(0)
00067 , m_current_send_buffer(0)
00068 , m_write_pos(0)
00069 , m_last_receive(second_clock::universal_time())
00070 , m_last_sent(second_clock::universal_time())
00071 , m_socket(s)
00072 , m_remote(remote)
00073 , m_remote_proxy(proxy)
00074 , m_torrent(tor)
00075 , m_active(true)
00076 , m_peer_interested(false)
00077 , m_peer_choked(true)
00078 , m_interesting(false)
00079 , m_choked(true)
00080 , m_failed(false)
00081 , m_num_pieces(0)
00082 , m_desired_queue_size(2)
00083 , m_free_upload(0)
00084 , m_trust_points(0)
00085 , m_assume_fifo(false)
00086 , m_num_invalid_requests(0)
00087 , m_disconnecting(false)
00088 , m_became_uninterested(second_clock::universal_time())
00089 , m_became_uninteresting(second_clock::universal_time())
00090 , m_connecting(true)
00091 , m_queued(true)
00092 , m_writing(false)
00093 , m_reading(false)
00094 , m_prefer_whole_pieces(false)
00095 , m_request_large_blocks(false)
00096 , m_non_prioritized(false)
00097 , m_refs(0)
00098 , m_upload_limit(resource_request::inf)
00099 , m_download_limit(resource_request::inf)
00100 #ifndef NDEBUG
00101 , m_in_constructor(true)
00102 #endif
00103 {
00104 std::fill(m_country, m_country + 2, 0);
00105 #ifdef TORRENT_VERBOSE_LOGGING
00106 m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
00107 + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
00108 (*m_logger) << "*** OUTGOING CONNECTION\n";
00109 #endif
00110
00111 boost::shared_ptr<torrent> t = m_torrent.lock();
00112 assert(t);
00113 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
00114
00115 if (t->ready_for_connections())
00116 init();
00117 }
00118
00119 peer_connection::peer_connection(
00120 session_impl& ses
00121 , boost::shared_ptr<stream_socket> s)
00122 :
00123 #ifndef NDEBUG
00124 m_last_choke(boost::posix_time::second_clock::universal_time()
00125 - hours(1))
00126 ,
00127 #endif
00128 m_ses(ses)
00129 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
00130 , m_timeout(m_ses.settings().peer_timeout)
00131 , m_last_piece(second_clock::universal_time())
00132 , m_packet_size(0)
00133 , m_recv_pos(0)
00134 , m_current_send_buffer(0)
00135 , m_write_pos(0)
00136 , m_last_receive(second_clock::universal_time())
00137 , m_last_sent(second_clock::universal_time())
00138 , m_socket(s)
00139 , m_active(false)
00140 , m_peer_interested(false)
00141 , m_peer_choked(true)
00142 , m_interesting(false)
00143 , m_choked(true)
00144 , m_failed(false)
00145 , m_num_pieces(0)
00146 , m_desired_queue_size(2)
00147 , m_free_upload(0)
00148 , m_trust_points(0)
00149 , m_assume_fifo(false)
00150 , m_num_invalid_requests(0)
00151 , m_disconnecting(false)
00152 , m_became_uninterested(second_clock::universal_time())
00153 , m_became_uninteresting(second_clock::universal_time())
00154 , m_connecting(false)
00155 , m_queued(false)
00156 , m_writing(false)
00157 , m_reading(false)
00158 , m_prefer_whole_pieces(false)
00159 , m_request_large_blocks(false)
00160 , m_non_prioritized(false)
00161 , m_refs(0)
00162 , m_upload_limit(resource_request::inf)
00163 , m_download_limit(resource_request::inf)
00164 #ifndef NDEBUG
00165 , m_in_constructor(true)
00166 #endif
00167 {
00168 std::fill(m_country, m_country + 2, 0);
00169 m_remote = m_socket->remote_endpoint();
00170
00171 #ifdef TORRENT_VERBOSE_LOGGING
00172 assert(m_socket->remote_endpoint() == remote());
00173 m_logger = m_ses.create_log(remote().address().to_string() + "_"
00174 + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
00175 (*m_logger) << "*** INCOMING CONNECTION\n";
00176 #endif
00177
00178 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
00179 }
00180
00181 #ifndef TORRENT_DISABLE_EXTENSIONS
00182 void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
00183 {
00184 m_extensions.push_back(ext);
00185 }
00186 #endif
00187
00188 void peer_connection::init()
00189 {
00190 INVARIANT_CHECK;
00191
00192 boost::shared_ptr<torrent> t = m_torrent.lock();
00193 assert(t);
00194 assert(t->valid_metadata());
00195 assert(t->ready_for_connections());
00196
00197 m_have_piece.resize(t->torrent_file().num_pieces(), false);
00198
00199
00200
00201
00202
00203 m_num_pieces = 0;
00204 std::vector<int> piece_list;
00205 for (int i = 0; i < (int)m_have_piece.size(); ++i)
00206 {
00207 if (m_have_piece[i])
00208 {
00209 ++m_num_pieces;
00210 piece_list.push_back(i);
00211 }
00212 }
00213
00214
00215
00216 bool interesting = false;
00217 for (std::vector<int>::reverse_iterator i = piece_list.rbegin();
00218 i != piece_list.rend(); ++i)
00219 {
00220 int index = *i;
00221 t->peer_has(index);
00222 if (!t->have_piece(index)
00223 && !t->picker().is_filtered(index))
00224 interesting = true;
00225 }
00226
00227 if (piece_list.size() == m_have_piece.size())
00228 {
00229 #ifdef TORRENT_VERBOSE_LOGGING
00230 (*m_logger) << " *** THIS IS A SEED ***\n";
00231 #endif
00232
00233 if (t->is_seed())
00234 {
00235 #ifdef TORRENT_VERBOSE_LOGGING
00236 (*m_logger) << " we're also a seed, disconnecting\n";
00237 #endif
00238 throw std::runtime_error("seed to seed connection redundant, disconnecting");
00239 }
00240 }
00241
00242 if (interesting)
00243 t->get_policy().peer_is_interesting(*this);
00244 }
00245
00246 peer_connection::~peer_connection()
00247 {
00248
00249 assert(m_disconnecting);
00250
00251 #ifdef TORRENT_VERBOSE_LOGGING
00252 using namespace boost::posix_time;
00253 if (m_logger)
00254 {
00255 (*m_logger) << to_simple_string(second_clock::universal_time())
00256 << " *** CONNECTION CLOSED\n";
00257 }
00258 #endif
00259 #ifndef NDEBUG
00260 boost::shared_ptr<torrent> t = m_torrent.lock();
00261 if (t) assert(t->connection_for(remote()) != this);
00262 #endif
00263 }
00264
00265 void peer_connection::announce_piece(int index)
00266 {
00267
00268
00269 if (has_piece(index)) return;
00270
00271 #ifdef TORRENT_VERBOSE_LOGGING
00272 using namespace boost::posix_time;
00273 (*m_logger) << to_simple_string(second_clock::universal_time())
00274 << " ==> HAVE [ piece: " << index << "]\n";
00275 #endif
00276 write_have(index);
00277 #ifndef NDEBUG
00278 boost::shared_ptr<torrent> t = m_torrent.lock();
00279 assert(t);
00280 assert(t->have_piece(index));
00281 #endif
00282 }
00283
00284 bool peer_connection::has_piece(int i) const
00285 {
00286 INVARIANT_CHECK;
00287
00288 boost::shared_ptr<torrent> t = m_torrent.lock();
00289 assert(t);
00290 assert(t->valid_metadata());
00291 assert(i >= 0);
00292 assert(i < t->torrent_file().num_pieces());
00293 return m_have_piece[i];
00294 }
00295
00296 std::deque<piece_block> const& peer_connection::request_queue() const
00297 {
00298 return m_request_queue;
00299 }
00300
00301 std::deque<piece_block> const& peer_connection::download_queue() const
00302 {
00303 return m_download_queue;
00304 }
00305
00306 std::deque<peer_request> const& peer_connection::upload_queue() const
00307 {
00308 return m_requests;
00309 }
00310
00311 void peer_connection::add_stat(size_type downloaded, size_type uploaded)
00312 {
00313 INVARIANT_CHECK;
00314
00315 m_statistics.add_stat(downloaded, uploaded);
00316 }
00317
00318 std::vector<bool> const& peer_connection::get_bitfield() const
00319 {
00320 return m_have_piece;
00321 }
00322
00323 void peer_connection::received_valid_data(int index)
00324 {
00325 INVARIANT_CHECK;
00326
00327 #ifndef TORRENT_DISABLE_EXTENSIONS
00328 for (extension_list_t::iterator i = m_extensions.begin()
00329 , end(m_extensions.end()); i != end; ++i)
00330 {
00331 try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
00332 }
00333 #endif
00334
00335 m_trust_points++;
00336
00337 if (m_trust_points > 20) m_trust_points = 20;
00338 }
00339
00340 void peer_connection::received_invalid_data(int index)
00341 {
00342 INVARIANT_CHECK;
00343
00344 #ifndef TORRENT_DISABLE_EXTENSIONS
00345 for (extension_list_t::iterator i = m_extensions.begin()
00346 , end(m_extensions.end()); i != end; ++i)
00347 {
00348 try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
00349 }
00350 #endif
00351
00352
00353
00354
00355 m_trust_points -= 2;
00356 if (m_trust_points < -7) m_trust_points = -7;
00357 }
00358
00359 int peer_connection::trust_points() const
00360 {
00361 return m_trust_points;
00362 }
00363
00364 size_type peer_connection::total_free_upload() const
00365 {
00366 return m_free_upload;
00367 }
00368
00369 void peer_connection::add_free_upload(size_type free_upload)
00370 {
00371 INVARIANT_CHECK;
00372
00373 m_free_upload += free_upload;
00374 }
00375
00376
00377
00378 bool peer_connection::verify_piece(const peer_request& p) const
00379 {
00380 INVARIANT_CHECK;
00381
00382 boost::shared_ptr<torrent> t = m_torrent.lock();
00383 assert(t);
00384
00385 assert(t->valid_metadata());
00386
00387 return p.piece >= 0
00388 && p.piece < t->torrent_file().num_pieces()
00389 && p.length > 0
00390 && p.start >= 0
00391 && (p.length == t->block_size()
00392 || (p.length < t->block_size()
00393 && p.piece == t->torrent_file().num_pieces()-1
00394 && p.start + p.length == t->torrent_file().piece_size(p.piece))
00395 || (m_request_large_blocks
00396 && p.length <= t->torrent_file().piece_size(p.piece)))
00397 && p.start + p.length <= t->torrent_file().piece_size(p.piece)
00398 && (p.start % t->block_size() == 0);
00399 }
00400
00401 struct disconnect_torrent
00402 {
00403 disconnect_torrent(boost::weak_ptr<torrent>& t): m_t(&t) {}
00404 ~disconnect_torrent() { if (m_t) m_t->reset(); }
00405 void cancel() { m_t = 0; }
00406 private:
00407 boost::weak_ptr<torrent>* m_t;
00408 };
00409
00410 void peer_connection::attach_to_torrent(sha1_hash const& ih)
00411 {
00412 INVARIANT_CHECK;
00413
00414 assert(!m_disconnecting);
00415 m_torrent = m_ses.find_torrent(ih);
00416
00417 boost::shared_ptr<torrent> t = m_torrent.lock();
00418
00419 if (t && t->is_aborted())
00420 {
00421 m_torrent.reset();
00422 t.reset();
00423 }
00424
00425 if (!t)
00426 {
00427
00428 #ifdef TORRENT_VERBOSE_LOGGING
00429 (*m_logger) << " couldn't find a torrent with the given info_hash\n";
00430 #endif
00431 throw std::runtime_error("got info-hash that is not in our session");
00432 }
00433
00434 disconnect_torrent disconnect(m_torrent);
00435 if (t->is_paused())
00436 {
00437
00438
00439 #ifdef TORRENT_VERBOSE_LOGGING
00440 (*m_logger) << " rejected connection to paused torrent\n";
00441 #endif
00442 throw std::runtime_error("connection rejected by paused torrent");
00443 }
00444
00445
00446
00447 t->attach_peer(this);
00448
00449
00450
00451
00452 if (t->ready_for_connections()) init();
00453
00454
00455
00456
00457 assert(m_num_pieces == 0);
00458 std::fill(m_have_piece.begin(), m_have_piece.end(), false);
00459 disconnect.cancel();
00460 }
00461
00462
00463
00464
00465
00466
00467
00468 void peer_connection::incoming_keepalive()
00469 {
00470 INVARIANT_CHECK;
00471
00472 #ifdef TORRENT_VERBOSE_LOGGING
00473 using namespace boost::posix_time;
00474 (*m_logger) << to_simple_string(second_clock::universal_time())
00475 << " <== KEEPALIVE\n";
00476 #endif
00477 }
00478
00479
00480
00481
00482
00483 void peer_connection::incoming_choke()
00484 {
00485 INVARIANT_CHECK;
00486
00487 boost::shared_ptr<torrent> t = m_torrent.lock();
00488 assert(t);
00489
00490 #ifdef TORRENT_VERBOSE_LOGGING
00491 using namespace boost::posix_time;
00492 (*m_logger) << to_simple_string(second_clock::universal_time())
00493 << " <== CHOKE\n";
00494 #endif
00495 m_peer_choked = true;
00496 t->get_policy().choked(*this);
00497
00498 if (!t->is_seed())
00499 {
00500 piece_picker& p = t->picker();
00501
00502
00503 for (std::deque<piece_block>::iterator i = m_download_queue.begin();
00504 i != m_download_queue.end(); ++i)
00505 {
00506 p.abort_download(*i);
00507 }
00508 for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
00509 , end(m_request_queue.end()); i != end; ++i)
00510 {
00511
00512
00513 p.abort_download(*i);
00514 }
00515 }
00516
00517 m_download_queue.clear();
00518 m_request_queue.clear();
00519 }
00520
00521
00522
00523
00524
00525 void peer_connection::incoming_unchoke()
00526 {
00527 INVARIANT_CHECK;
00528
00529 boost::shared_ptr<torrent> t = m_torrent.lock();
00530 assert(t);
00531
00532 #ifdef TORRENT_VERBOSE_LOGGING
00533 using namespace boost::posix_time;
00534 (*m_logger) << to_simple_string(second_clock::universal_time())
00535 << " <== UNCHOKE\n";
00536 #endif
00537 m_peer_choked = false;
00538 t->get_policy().unchoked(*this);
00539 }
00540
00541
00542
00543
00544
00545 void peer_connection::incoming_interested()
00546 {
00547 INVARIANT_CHECK;
00548
00549 boost::shared_ptr<torrent> t = m_torrent.lock();
00550 assert(t);
00551
00552 #ifdef TORRENT_VERBOSE_LOGGING
00553 using namespace boost::posix_time;
00554 (*m_logger) << to_simple_string(second_clock::universal_time())
00555 << " <== INTERESTED\n";
00556 #endif
00557 m_peer_interested = true;
00558 t->get_policy().interested(*this);
00559 }
00560
00561
00562
00563
00564
00565 void peer_connection::incoming_not_interested()
00566 {
00567 INVARIANT_CHECK;
00568
00569 m_became_uninterested = second_clock::universal_time();
00570
00571
00572 m_requests.clear();
00573
00574 #ifdef TORRENT_VERBOSE_LOGGING
00575 using namespace boost::posix_time;
00576 (*m_logger) << to_simple_string(second_clock::universal_time())
00577 << " <== NOT_INTERESTED\n";
00578 #endif
00579
00580 boost::shared_ptr<torrent> t = m_torrent.lock();
00581 assert(t);
00582
00583 m_peer_interested = false;
00584 t->get_policy().not_interested(*this);
00585 }
00586
00587
00588
00589
00590
00591 void peer_connection::incoming_have(int index)
00592 {
00593 INVARIANT_CHECK;
00594
00595 boost::shared_ptr<torrent> t = m_torrent.lock();
00596 assert(t);
00597
00598 #ifdef TORRENT_VERBOSE_LOGGING
00599 using namespace boost::posix_time;
00600 (*m_logger) << to_simple_string(second_clock::universal_time())
00601 << " <== HAVE [ piece: " << index << "]\n";
00602 #endif
00603
00604
00605 if (index >= (int)m_have_piece.size() || index < 0)
00606 throw protocol_error("got 'have'-message with higher index "
00607 "than the number of pieces");
00608
00609 if (m_have_piece[index])
00610 {
00611 #ifdef TORRENT_VERBOSE_LOGGING
00612 (*m_logger) << " got redundant HAVE message for index: " << index << "\n";
00613 #endif
00614 }
00615 else
00616 {
00617 m_have_piece[index] = true;
00618
00619
00620
00621
00622
00623 if (t->valid_metadata())
00624 {
00625 ++m_num_pieces;
00626 t->peer_has(index);
00627
00628 if (!t->have_piece(index)
00629 && !t->is_seed()
00630 && !is_interesting()
00631 && !t->picker().is_filtered(index))
00632 t->get_policy().peer_is_interesting(*this);
00633 }
00634
00635 if (t->is_seed() && is_seed())
00636 {
00637 throw protocol_error("seed to seed connection redundant, disconnecting");
00638 }
00639 }
00640 }
00641
00642
00643
00644
00645
00646 void peer_connection::incoming_bitfield(std::vector<bool> const& bitfield)
00647 {
00648 INVARIANT_CHECK;
00649
00650 boost::shared_ptr<torrent> t = m_torrent.lock();
00651 assert(t);
00652
00653 #ifdef TORRENT_VERBOSE_LOGGING
00654 using namespace boost::posix_time;
00655 (*m_logger) << to_simple_string(second_clock::universal_time())
00656 << " <== BITFIELD\n";
00657 #endif
00658
00659
00660
00661 if (t->valid_metadata()
00662 && (bitfield.size() / 8) != (m_have_piece.size() / 8))
00663 throw protocol_error("got bitfield with invalid size: "
00664 + boost::lexical_cast<std::string>(bitfield.size() / 8)
00665 + "bytes. expected: "
00666 + boost::lexical_cast<std::string>(m_have_piece.size() / 8)
00667 + "bytes");
00668
00669
00670
00671
00672
00673 if (!t->ready_for_connections())
00674 {
00675 m_have_piece = bitfield;
00676 m_num_pieces = std::count(bitfield.begin(), bitfield.end(), true);
00677 return;
00678 }
00679
00680
00681
00682 bool interesting = false;
00683 for (int i = 0; i < (int)m_have_piece.size(); ++i)
00684 {
00685 bool have = bitfield[i];
00686 if (have && !m_have_piece[i])
00687 {
00688 m_have_piece[i] = true;
00689 ++m_num_pieces;
00690 t->peer_has(i);
00691 if (!t->have_piece(i)
00692 && !t->picker().is_filtered(i))
00693 interesting = true;
00694 }
00695 else if (!have && m_have_piece[i])
00696 {
00697
00698 m_have_piece[i] = false;
00699 --m_num_pieces;
00700 t->peer_lost(i);
00701 }
00702 }
00703
00704 if (m_num_pieces == int(m_have_piece.size()))
00705 {
00706 #ifdef TORRENT_VERBOSE_LOGGING
00707 (*m_logger) << " *** THIS IS A SEED ***\n";
00708 #endif
00709
00710 if (t->is_seed())
00711 {
00712 throw protocol_error("seed to seed connection redundant, disconnecting");
00713 }
00714 }
00715
00716 if (interesting) t->get_policy().peer_is_interesting(*this);
00717 }
00718
00719
00720
00721
00722
00723 void peer_connection::incoming_request(peer_request const& r)
00724 {
00725 INVARIANT_CHECK;
00726
00727 boost::shared_ptr<torrent> t = m_torrent.lock();
00728 assert(t);
00729
00730 if (!t->valid_metadata())
00731 {
00732
00733
00734 #ifdef TORRENT_VERBOSE_LOGGING
00735 using namespace boost::posix_time;
00736 (*m_logger) << to_simple_string(second_clock::universal_time())
00737 << " <== UNEXPECTED_REQUEST [ "
00738 "piece: " << r.piece << " | "
00739 "s: " << r.start << " | "
00740 "l: " << r.length << " | "
00741 "i: " << m_peer_interested << " | "
00742 "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
00743 "n: " << t->torrent_file().num_pieces() << " ]\n";
00744 #endif
00745 return;
00746 }
00747
00748 if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
00749 {
00750
00751
00752
00753
00754 #ifdef TORRENT_VERBOSE_LOGGING
00755 using namespace boost::posix_time;
00756 (*m_logger) << to_simple_string(second_clock::universal_time())
00757 << " <== TOO MANY REQUESTS [ "
00758 "piece: " << r.piece << " | "
00759 "s: " << r.start << " | "
00760 "l: " << r.length << " | "
00761 "i: " << m_peer_interested << " | "
00762 "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
00763 "n: " << t->torrent_file().num_pieces() << " ]\n";
00764 #endif
00765 return;
00766 }
00767
00768
00769
00770
00771 if (r.piece >= 0
00772 && r.piece < t->torrent_file().num_pieces()
00773 && t->have_piece(r.piece)
00774 && r.start >= 0
00775 && r.start < t->torrent_file().piece_size(r.piece)
00776 && r.length > 0
00777 && r.length + r.start <= t->torrent_file().piece_size(r.piece)
00778 && m_peer_interested)
00779 {
00780 #ifdef TORRENT_VERBOSE_LOGGING
00781 using namespace boost::posix_time;
00782 (*m_logger) << to_simple_string(second_clock::universal_time())
00783 << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
00784 #endif
00785
00786
00787 if (m_choked)
00788 return;
00789
00790 m_requests.push_back(r);
00791 fill_send_buffer();
00792 }
00793 else
00794 {
00795 #ifdef TORRENT_VERBOSE_LOGGING
00796 using namespace boost::posix_time;
00797 (*m_logger) << to_simple_string(second_clock::universal_time())
00798 << " <== INVALID_REQUEST [ "
00799 "piece: " << r.piece << " | "
00800 "s: " << r.start << " | "
00801 "l: " << r.length << " | "
00802 "i: " << m_peer_interested << " | "
00803 "t: " << (int)t->torrent_file().piece_size(r.piece) << " | "
00804 "n: " << t->torrent_file().num_pieces() << " | "
00805 "h: " << t->have_piece(r.piece) << " ]\n";
00806 #endif
00807
00808 ++m_num_invalid_requests;
00809
00810 if (t->alerts().should_post(alert::debug))
00811 {
00812 t->alerts().post_alert(invalid_request_alert(
00813 r
00814 , t->get_handle()
00815 , m_remote
00816 , m_peer_id
00817 , "peer sent an illegal piece request, ignoring"));
00818 }
00819 }
00820 }
00821
00822 void peer_connection::incoming_piece_fragment()
00823 {
00824 m_last_piece = second_clock::universal_time();
00825 }
00826
00827 #ifndef NDEBUG
00828 struct check_postcondition
00829 {
00830 check_postcondition(boost::shared_ptr<torrent> const& t_
00831 , bool init_check = true): t(t_) { if (init_check) check(); }
00832
00833 ~check_postcondition() { check(); }
00834
00835 void check()
00836 {
00837 if (!t->is_seed())
00838 {
00839 const int blocks_per_piece = static_cast<int>(
00840 t->torrent_file().piece_length() / t->block_size());
00841
00842 std::vector<piece_picker::downloading_piece> const& dl_queue
00843 = t->picker().get_download_queue();
00844
00845 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
00846 dl_queue.begin(); i != dl_queue.end(); ++i)
00847 {
00848 assert(int(i->finished_blocks.count()) < blocks_per_piece);
00849 }
00850 }
00851 }
00852
00853 shared_ptr<torrent> t;
00854 };
00855 #endif
00856
00857
00858
00859
00860
00861 void peer_connection::incoming_piece(peer_request const& p, char const* data)
00862 {
00863 INVARIANT_CHECK;
00864
00865 boost::shared_ptr<torrent> t = m_torrent.lock();
00866 assert(t);
00867 #ifndef NDEBUG
00868 check_postcondition post_checker_(t);
00869 t->check_invariant();
00870 #endif
00871
00872 #ifdef TORRENT_VERBOSE_LOGGING
00873 (*m_logger) << to_simple_string(second_clock::universal_time())
00874 << " <== PIECE [ piece: " << p.piece << " | "
00875 "s: " << p.start << " | "
00876 "l: " << p.length << " | "
00877 "ds: " << statistics().download_rate() << " | "
00878 "qs: " << m_desired_queue_size << " ]\n";
00879 #endif
00880
00881 if (!verify_piece(p))
00882 {
00883 #ifdef TORRENT_VERBOSE_LOGGING
00884 using namespace boost::posix_time;
00885 (*m_logger) << to_simple_string(second_clock::universal_time())
00886 << " <== INVALID_PIECE [ piece: " << p.piece << " | "
00887 "start: " << p.start << " | "
00888 "length: " << p.length << " ]\n";
00889 #endif
00890 throw protocol_error("got invalid piece packet");
00891 }
00892
00893 using namespace boost::posix_time;
00894
00895
00896
00897 if (t->is_seed())
00898 {
00899 t->received_redundant_data(p.length);
00900 return;
00901 }
00902
00903 piece_picker& picker = t->picker();
00904 piece_manager& fs = t->filesystem();
00905 policy& pol = t->get_policy();
00906
00907 std::vector<piece_block> finished_blocks;
00908 piece_block block_finished(p.piece, p.start / t->block_size());
00909 assert(p.start % t->block_size() == 0);
00910 assert(p.length == t->block_size()
00911 || p.length == t->torrent_file().total_size() % t->block_size());
00912
00913 std::deque<piece_block>::iterator b
00914 = std::find(
00915 m_download_queue.begin()
00916 , m_download_queue.end()
00917 , block_finished);
00918
00919
00920
00921 peer_connection* request_peer = 0;
00922
00923 if (b != m_download_queue.end())
00924 {
00925 if (m_assume_fifo)
00926 {
00927 for (std::deque<piece_block>::iterator i = m_download_queue.begin();
00928 i != b; ++i)
00929 {
00930 #ifdef TORRENT_VERBOSE_LOGGING
00931 (*m_logger) << to_simple_string(second_clock::universal_time())
00932 << " *** SKIPPED_PIECE [ piece: " << i->piece_index << " | "
00933 "b: " << i->block_index << " ] ***\n";
00934 #endif
00935
00936
00937
00938 picker.abort_download(*i);
00939 }
00940
00941
00942
00943
00944 m_download_queue.erase(m_download_queue.begin()
00945 , boost::next(b));
00946 }
00947 else
00948 {
00949 m_download_queue.erase(b);
00950 }
00951 }
00952 else
00953 {
00954
00955
00956 boost::optional<tcp::endpoint> peer
00957 = t->picker().get_downloader(block_finished);
00958 if (peer)
00959 {
00960 assert(!t->picker().is_finished(block_finished));
00961 peer_connection* pc = t->connection_for(*peer);
00962 if (pc && pc != this)
00963 {
00964 pc->cancel_request(block_finished);
00965 request_peer = pc;
00966 }
00967 }
00968 else
00969 {
00970 if (t->alerts().should_post(alert::debug))
00971 {
00972 t->alerts().post_alert(
00973 peer_error_alert(
00974 m_remote
00975 , m_peer_id
00976 , "got a block that was not requested"));
00977 }
00978 #ifdef TORRENT_VERBOSE_LOGGING
00979 (*m_logger) << " *** The block we just got was not in the "
00980 "request queue ***\n";
00981 #endif
00982 }
00983 }
00984
00985
00986 if (picker.is_finished(block_finished))
00987 {
00988 t->received_redundant_data(t->block_size());
00989 pol.block_finished(*this, block_finished);
00990 send_block_requests();
00991
00992 if (request_peer && !request_peer->has_peer_choked() && !t->is_seed())
00993 {
00994 request_a_block(*t, *request_peer);
00995 request_peer->send_block_requests();
00996 }
00997 return;
00998 }
00999
01000 fs.write(data, p.piece, p.start, p.length);
01001
01002 picker.mark_as_finished(block_finished, m_remote);
01003
01004 try
01005 {
01006 pol.block_finished(*this, block_finished);
01007 send_block_requests();
01008 }
01009 catch (std::exception const&) {}
01010
01011 if (request_peer && !request_peer->has_peer_choked() && !t->is_seed())
01012 {
01013 request_a_block(*t, *request_peer);
01014 request_peer->send_block_requests();
01015 }
01016
01017 #ifndef NDEBUG
01018 try
01019 {
01020 #endif
01021
01022 bool was_seed = t->is_seed();
01023 bool was_finished = picker.num_filtered() + t->num_pieces()
01024 == t->torrent_file().num_pieces();
01025
01026
01027 if (picker.is_piece_finished(p.piece))
01028 {
01029 #ifndef NDEBUG
01030 check_postcondition post_checker2_(t, false);
01031 #endif
01032 bool verified = t->verify_piece(p.piece);
01033 if (verified)
01034 {
01035
01036
01037 t->announce_piece(p.piece);
01038 assert(t->valid_metadata());
01039
01040
01041 if (!was_finished
01042 && (t->is_seed()
01043 || picker.num_filtered() + t->num_pieces()
01044 == t->torrent_file().num_pieces()))
01045 {
01046
01047
01048
01049
01050 try { t->finished(); }
01051 catch (std::exception&)
01052 {
01053 assert(false);
01054 }
01055 }
01056 }
01057 else
01058 {
01059 t->piece_failed(p.piece);
01060 }
01061
01062 pol.piece_finished(p.piece, verified);
01063
01064 if (!was_seed && t->is_seed())
01065 {
01066 assert(verified);
01067 t->completed();
01068 }
01069 }
01070
01071 #ifndef NDEBUG
01072 }
01073 catch (std::exception const& e)
01074 {
01075 std::string err = e.what();
01076 assert(false);
01077 }
01078 #endif
01079 }
01080
01081
01082
01083
01084
01085 void peer_connection::incoming_cancel(peer_request const& r)
01086 {
01087 INVARIANT_CHECK;
01088
01089 #ifdef TORRENT_VERBOSE_LOGGING
01090 using namespace boost::posix_time;
01091 (*m_logger) << to_simple_string(second_clock::universal_time())
01092 << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
01093 #endif
01094
01095 std::deque<peer_request>::iterator i
01096 = std::find(m_requests.begin(), m_requests.end(), r);
01097
01098 if (i != m_requests.end())
01099 {
01100 m_requests.erase(i);
01101 }
01102 else
01103 {
01104 #ifdef TORRENT_VERBOSE_LOGGING
01105 using namespace boost::posix_time;
01106 (*m_logger) << to_simple_string(second_clock::universal_time())
01107 << " *** GOT CANCEL NOT IN THE QUEUE\n";
01108 #endif
01109 }
01110 }
01111
01112
01113
01114
01115
01116 void peer_connection::incoming_dht_port(int listen_port)
01117 {
01118 INVARIANT_CHECK;
01119
01120 #ifdef TORRENT_VERBOSE_LOGGING
01121 using namespace boost::posix_time;
01122 (*m_logger) << to_simple_string(second_clock::universal_time())
01123 << " <== DHT_PORT [ p: " << listen_port << " ]\n";
01124 #endif
01125 #ifndef TORRENT_DISABLE_DHT
01126 m_ses.add_dht_node(udp::endpoint(
01127 m_remote.address(), listen_port));
01128 #endif
01129 }
01130
01131 void peer_connection::add_request(piece_block const& block)
01132 {
01133 INVARIANT_CHECK;
01134
01135 boost::shared_ptr<torrent> t = m_torrent.lock();
01136 assert(t);
01137
01138 assert(t->valid_metadata());
01139 assert(block.piece_index >= 0);
01140 assert(block.piece_index < t->torrent_file().num_pieces());
01141 assert(block.block_index >= 0);
01142 assert(block.block_index < t->torrent_file().piece_size(block.piece_index));
01143 assert(!t->picker().is_downloading(block));
01144
01145 t->picker().mark_as_downloading(block, m_remote);
01146 m_request_queue.push_back(block);
01147 }
01148
01149 void peer_connection::cancel_request(piece_block const& block)
01150 {
01151 INVARIANT_CHECK;
01152
01153 boost::shared_ptr<torrent> t = m_torrent.lock();
01154 assert(t);
01155
01156 assert(t->valid_metadata());
01157
01158 assert(block.piece_index >= 0);
01159 assert(block.piece_index < t->torrent_file().num_pieces());
01160 assert(block.block_index >= 0);
01161 assert(block.block_index < t->torrent_file().piece_size(block.piece_index));
01162 assert(t->picker().is_downloading(block));
01163
01164 t->picker().abort_download(block);
01165
01166 std::deque<piece_block>::iterator it
01167 = std::find(m_download_queue.begin(), m_download_queue.end(), block);
01168 if (it == m_download_queue.end())
01169 {
01170 it = std::find(m_request_queue.begin(), m_request_queue.end(), block);
01171 assert(it != m_request_queue.end());
01172 if (it == m_request_queue.end()) return;
01173 m_request_queue.erase(it);
01174
01175
01176 return;
01177 }
01178 else
01179 {
01180 m_download_queue.erase(it);
01181 }
01182
01183 int block_offset = block.block_index * t->block_size();
01184 int block_size
01185 = std::min((int)t->torrent_file().piece_size(block.piece_index)-block_offset,
01186 t->block_size());
01187 assert(block_size > 0);
01188 assert(block_size <= t->block_size());
01189
01190 peer_request r;
01191 r.piece = block.piece_index;
01192 r.start = block_offset;
01193 r.length = block_size;
01194
01195 write_cancel(r);
01196
01197 #ifdef TORRENT_VERBOSE_LOGGING
01198 using namespace boost::posix_time;
01199 (*m_logger) << to_simple_string(second_clock::universal_time())
01200 << " ==> CANCEL [ piece: " << block.piece_index << " | s: "
01201 << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
01202 #endif
01203 }
01204
01205 void peer_connection::send_choke()
01206 {
01207 INVARIANT_CHECK;
01208
01209 if (m_choked) return;
01210 write_choke();
01211 m_choked = true;
01212
01213 #ifdef TORRENT_VERBOSE_LOGGING
01214 using namespace boost::posix_time;
01215 (*m_logger) << to_simple_string(second_clock::universal_time())
01216 << " ==> CHOKE\n";
01217 #endif
01218 #ifndef NDEBUG
01219 using namespace boost::posix_time;
01220 m_last_choke = second_clock::universal_time();
01221 #endif
01222 m_num_invalid_requests = 0;
01223 m_requests.clear();
01224 }
01225
01226 void peer_connection::send_unchoke()
01227 {
01228 INVARIANT_CHECK;
01229
01230 #ifndef NDEBUG
01231
01232
01233
01234
01235 using namespace boost::posix_time;
01236
01237 #endif
01238
01239 if (!m_choked) return;
01240 write_unchoke();
01241 m_choked = false;
01242
01243 #ifdef TORRENT_VERBOSE_LOGGING
01244 using namespace boost::posix_time;
01245 (*m_logger) << to_simple_string(second_clock::universal_time())
01246 << " ==> UNCHOKE\n";
01247 #endif
01248 }
01249
01250 void peer_connection::send_interested()
01251 {
01252 INVARIANT_CHECK;
01253
01254 if (m_interesting) return;
01255 write_interested();
01256 m_interesting = true;
01257
01258 #ifdef TORRENT_VERBOSE_LOGGING
01259 using namespace boost::posix_time;
01260 (*m_logger) << to_simple_string(second_clock::universal_time())
01261 << " ==> INTERESTED\n";
01262 #endif
01263 }
01264
01265 void peer_connection::send_not_interested()
01266 {
01267 INVARIANT_CHECK;
01268
01269 if (!m_interesting) return;
01270 write_not_interested();
01271 m_interesting = false;
01272
01273 m_became_uninteresting = second_clock::universal_time();
01274
01275 #ifdef TORRENT_VERBOSE_LOGGING
01276 using namespace boost::posix_time;
01277 (*m_logger) << to_simple_string(second_clock::universal_time())
01278 << " ==> NOT_INTERESTED\n";
01279 #endif
01280 }
01281
01282 void peer_connection::send_block_requests()
01283 {
01284 INVARIANT_CHECK;
01285
01286 if (has_peer_choked()) return;
01287
01288 boost::shared_ptr<torrent> t = m_torrent.lock();
01289 assert(t);
01290
01291 assert(!has_peer_choked());
01292
01293 if ((int)m_download_queue.size() >= m_desired_queue_size) return;
01294
01295 while (!m_request_queue.empty()
01296 && (int)m_download_queue.size() < m_desired_queue_size)
01297 {
01298 piece_block block = m_request_queue.front();
01299
01300 int block_offset = block.block_index * t->block_size();
01301 int block_size = std::min((int)t->torrent_file().piece_size(
01302 block.piece_index) - block_offset, t->block_size());
01303 assert(block_size > 0);
01304 assert(block_size <= t->block_size());
01305
01306 peer_request r;
01307 r.piece = block.piece_index;
01308 r.start = block_offset;
01309 r.length = block_size;
01310
01311 m_request_queue.pop_front();
01312 m_download_queue.push_back(block);
01313
01314
01315
01316 if (m_request_large_blocks)
01317 {
01318 while (!m_request_queue.empty()
01319 && m_request_queue.front().piece_index == r.piece
01320 && m_request_queue.front().block_index == block.block_index + 1)
01321 {
01322 block = m_request_queue.front();
01323 m_request_queue.pop_front();
01324 m_download_queue.push_back(block);
01325
01326 block_offset = block.block_index * t->block_size();
01327 block_size = std::min((int)t->torrent_file().piece_size(
01328 block.piece_index) - block_offset, t->block_size());
01329 assert(block_size > 0);
01330 assert(block_size <= t->block_size());
01331
01332 r.length += block_size;
01333 }
01334 }
01335
01336 assert(verify_piece(r));
01337
01338 #ifndef TORRENT_DISABLE_EXTENSIONS
01339 bool handled = false;
01340 for (extension_list_t::iterator i = m_extensions.begin()
01341 , end(m_extensions.end()); i != end; ++i)
01342 {
01343 if (handled = (*i)->write_request(r)) break;
01344 }
01345 if (!handled) write_request(r);
01346 #else
01347 write_request(r);
01348 #endif
01349
01350 using namespace boost::posix_time;
01351
01352 #ifdef TORRENT_VERBOSE_LOGGING
01353 (*m_logger) << to_simple_string(second_clock::universal_time())
01354 << " ==> REQUEST [ "
01355 "piece: " << r.piece << " | "
01356 "s: " << r.start << " | "
01357 "l: " << r.length << " | "
01358 "ds: " << statistics().download_rate() << " B/s | "
01359 "qs: " << m_desired_queue_size << " ]\n";
01360 #endif
01361 }
01362 m_last_piece = second_clock::universal_time();
01363 }
01364
01365 void close_socket_ignore_error(boost::shared_ptr<stream_socket> s)
01366 {
01367 asio::error_code e;
01368 s->close(e);
01369 }
01370
01371 void peer_connection::disconnect()
01372 {
01373 boost::intrusive_ptr<peer_connection> me(this);
01374
01375 INVARIANT_CHECK;
01376
01377 if (m_disconnecting) return;
01378 m_disconnecting = true;
01379 m_ses.m_io_service.post(boost::bind(&close_socket_ignore_error, m_socket));
01380
01381 boost::shared_ptr<torrent> t = m_torrent.lock();
01382
01383 if (t)
01384 {
01385 if (t->valid_metadata() && !t->is_seed())
01386 {
01387 piece_picker& picker = t->picker();
01388
01389 while (!m_download_queue.empty())
01390 {
01391 picker.abort_download(m_download_queue.back());
01392 m_download_queue.pop_back();
01393 }
01394 while (!m_request_queue.empty())
01395 {
01396 picker.abort_download(m_request_queue.back());
01397 m_request_queue.pop_back();
01398 }
01399 }
01400
01401 t->remove_peer(this);
01402
01403 m_torrent.reset();
01404 }
01405
01406 m_ses.close_connection(me);
01407 }
01408
01409 void peer_connection::set_upload_limit(int limit)
01410 {
01411 assert(limit >= -1);
01412 if (limit == -1) limit = resource_request::inf;
01413 if (limit < 10) limit = 10;
01414 m_upload_limit = limit;
01415 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
01416 }
01417
01418 void peer_connection::set_download_limit(int limit)
01419 {
01420 assert(limit >= -1);
01421 if (limit == -1) limit = resource_request::inf;
01422 if (limit < 10) limit = 10;
01423 m_download_limit = limit;
01424 m_bandwidth_limit[download_channel].throttle(m_download_limit);
01425 }
01426
01427 size_type peer_connection::share_diff() const
01428 {
01429 INVARIANT_CHECK;
01430
01431 boost::shared_ptr<torrent> t = m_torrent.lock();
01432 assert(t);
01433
01434 float ratio = t->ratio();
01435
01436
01437
01438 if (ratio == 0.f)
01439 return std::numeric_limits<size_type>::max();
01440
01441 return m_free_upload
01442 + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
01443 - m_statistics.total_payload_upload();
01444 }
01445
01446 void peer_connection::cut_receive_buffer(int size, int packet_size)
01447 {
01448 INVARIANT_CHECK;
01449
01450 assert(packet_size > 0);
01451 assert((int)m_recv_buffer.size() >= size);
01452
01453 std::copy(m_recv_buffer.begin() + size, m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.begin());
01454
01455 assert(m_recv_pos >= size);
01456 m_recv_pos -= size;
01457
01458 #ifndef NDEBUG
01459 std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
01460 #endif
01461
01462 m_packet_size = packet_size;
01463 if (m_packet_size >= m_recv_pos) m_recv_buffer.resize(m_packet_size);
01464 }
01465
01466 void peer_connection::second_tick(float tick_interval)
01467 {
01468 INVARIANT_CHECK;
01469
01470 ptime now(second_clock::universal_time());
01471
01472 boost::shared_ptr<torrent> t = m_torrent.lock();
01473 assert(t);
01474
01475 on_tick();
01476
01477 #ifndef TORRENT_DISABLE_EXTENSIONS
01478 for (extension_list_t::iterator i = m_extensions.begin()
01479 , end(m_extensions.end()); i != end; ++i)
01480 {
01481 (*i)->tick();
01482 }
01483 #endif
01484
01485 m_statistics.second_tick(tick_interval);
01486
01487 if (!t->valid_metadata()) return;
01488
01489
01490 const float queue_time = m_ses.settings().request_queue_time;
01491
01492
01493
01494
01495
01496
01497 const int block_size = m_request_large_blocks
01498 ? t->torrent_file().piece_length() : t->block_size();
01499 assert(block_size > 0);
01500
01501 m_desired_queue_size = static_cast<int>(queue_time
01502 * statistics().download_rate() / block_size);
01503 if (m_desired_queue_size > m_max_out_request_queue)
01504 m_desired_queue_size = m_max_out_request_queue;
01505 if (m_desired_queue_size < min_request_queue)
01506 m_desired_queue_size = min_request_queue;
01507
01508 if (!m_download_queue.empty()
01509 && now - m_last_piece > seconds(m_ses.settings().piece_timeout))
01510 {
01511
01512
01513
01514
01515 #ifdef TORRENT_VERBOSE_LOGGING
01516 (*m_logger) << to_simple_string(now)
01517 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
01518 << " " << to_simple_string(now - m_last_piece) << "] ***\n";
01519 #endif
01520
01521 if (t->is_seed())
01522 {
01523 m_download_queue.clear();
01524 m_request_queue.clear();
01525 }
01526 else
01527 {
01528 piece_picker& picker = t->picker();
01529 while (!m_download_queue.empty())
01530 {
01531 picker.abort_download(m_download_queue.back());
01532 m_download_queue.pop_back();
01533 }
01534 while (!m_request_queue.empty())
01535 {
01536 picker.abort_download(m_request_queue.back());
01537 m_request_queue.pop_back();
01538 }
01539
01540
01541
01542
01543 m_assume_fifo = true;
01544
01545 request_a_block(*t, *this);
01546 send_block_requests();
01547 }
01548 }
01549
01550
01551
01552
01553
01554
01555
01556
01557 if (t->is_seed() || is_choked() || t->ratio() == 0.0f)
01558 {
01559
01560
01561
01562 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
01563 }
01564 else
01565 {
01566 size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
01567
01568 double break_even_time = 15;
01569 size_type have_uploaded = m_statistics.total_payload_upload();
01570 size_type have_downloaded = m_statistics.total_payload_download();
01571 double download_speed = m_statistics.download_rate();
01572
01573 size_type soon_downloaded =
01574 have_downloaded + (size_type)(download_speed * break_even_time*1.5);
01575
01576 if (t->ratio() != 1.f)
01577 soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
01578
01579 double upload_speed_limit = std::min((soon_downloaded - have_uploaded
01580 + bias) / break_even_time, double(m_upload_limit));
01581
01582 upload_speed_limit = std::min(upload_speed_limit,
01583 (double)std::numeric_limits<int>::max());
01584
01585 m_bandwidth_limit[upload_channel].throttle(
01586 std::min(std::max((int)upload_speed_limit, 20)
01587 , m_upload_limit));
01588 }
01589
01590 fill_send_buffer();
01591
01592 }
01593
01594 void peer_connection::fill_send_buffer()
01595 {
01596 INVARIANT_CHECK;
01597
01598 boost::shared_ptr<torrent> t = m_torrent.lock();
01599 if (!t) return;
01600
01601
01602
01603
01604 while (!m_requests.empty()
01605 && (send_buffer_size() < t->block_size() * 6)
01606 && !m_choked)
01607 {
01608 assert(t->valid_metadata());
01609 peer_request& r = m_requests.front();
01610
01611 assert(r.piece >= 0);
01612 assert(r.piece < (int)m_have_piece.size());
01613 assert(t->have_piece(r.piece));
01614 assert(r.start + r.length <= t->torrent_file().piece_size(r.piece));
01615 assert(r.length > 0 && r.start >= 0);
01616
01617 write_piece(r);
01618
01619 #ifdef TORRENT_VERBOSE_LOGGING
01620 using namespace boost::posix_time;
01621 (*m_logger) << to_simple_string(second_clock::universal_time())
01622 << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
01623 << " | l: " << r.length << " ]\n";
01624 #endif
01625
01626 m_requests.erase(m_requests.begin());
01627
01628 if (m_requests.empty()
01629 && m_num_invalid_requests > 0
01630 && is_peer_interested()
01631 && !is_seed())
01632 {
01633
01634
01635
01636
01637 send_choke();
01638 send_unchoke();
01639 }
01640 }
01641 }
01642
01643 void peer_connection::assign_bandwidth(int channel, int amount)
01644 {
01645 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01646
01647 #ifdef TORRENT_VERBOSE_LOGGING
01648 (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
01649 #endif
01650
01651 m_bandwidth_limit[channel].assign(amount);
01652 if (channel == upload_channel)
01653 {
01654 m_writing = false;
01655 setup_send();
01656 }
01657 else if (channel == download_channel)
01658 {
01659 m_reading = false;
01660 setup_receive();
01661 }
01662 }
01663
01664 void peer_connection::expire_bandwidth(int channel, int amount)
01665 {
01666 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01667
01668 m_bandwidth_limit[channel].expire(amount);
01669 if (channel == upload_channel)
01670 {
01671 setup_send();
01672 }
01673 else if (channel == download_channel)
01674 {
01675 setup_receive();
01676 }
01677 }
01678
01679 void peer_connection::setup_send()
01680 {
01681 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01682
01683 INVARIANT_CHECK;
01684
01685 if (m_writing) return;
01686
01687 shared_ptr<torrent> t = m_torrent.lock();
01688
01689 if (m_bandwidth_limit[upload_channel].quota_left() == 0
01690 && (!m_send_buffer[m_current_send_buffer].empty()
01691 || !m_send_buffer[(m_current_send_buffer + 1) & 1].empty())
01692 && !m_connecting
01693 && t)
01694 {
01695
01696
01697
01698 assert(t);
01699 if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
01700 {
01701 #ifdef TORRENT_VERBOSE_LOGGING
01702 (*m_logger) << "req bandwidth [ " << upload_channel << " ]\n";
01703 #endif
01704
01705
01706 t->request_bandwidth(upload_channel, self(), false);
01707 m_writing = true;
01708 }
01709 return;
01710 }
01711
01712 if (!can_write()) return;
01713
01714 assert(!m_writing);
01715
01716 int sending_buffer = (m_current_send_buffer + 1) & 1;
01717 if (m_send_buffer[sending_buffer].empty())
01718 {
01719
01720
01721 std::swap(m_current_send_buffer, sending_buffer);
01722 m_write_pos = 0;
01723 }
01724
01725
01726 if (!m_send_buffer[sending_buffer].empty())
01727 {
01728 int amount_to_send
01729 = std::min(m_bandwidth_limit[upload_channel].quota_left()
01730 , (int)m_send_buffer[sending_buffer].size() - m_write_pos);
01731
01732 assert(amount_to_send > 0);
01733
01734 assert(m_write_pos < (int)m_send_buffer[sending_buffer].size());
01735 m_socket->async_write_some(asio::buffer(
01736 &m_send_buffer[sending_buffer][m_write_pos], amount_to_send)
01737 , bind(&peer_connection::on_send_data, self(), _1, _2));
01738
01739 m_writing = true;
01740 }
01741 }
01742
01743 void peer_connection::setup_receive()
01744 {
01745 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01746
01747 INVARIANT_CHECK;
01748
01749 if (m_reading) return;
01750
01751 shared_ptr<torrent> t = m_torrent.lock();
01752
01753 if (m_bandwidth_limit[download_channel].quota_left() == 0
01754 && !m_connecting
01755 && t)
01756 {
01757 assert(t);
01758 if (m_bandwidth_limit[download_channel].max_assignable() > 0)
01759 {
01760 #ifdef TORRENT_VERBOSE_LOGGING
01761 (*m_logger) << "req bandwidth [ " << download_channel << " ]\n";
01762 #endif
01763 t->request_bandwidth(download_channel, self(), m_non_prioritized);
01764 m_reading = true;
01765 }
01766 return;
01767 }
01768
01769 if (!can_read()) return;
01770
01771 assert(m_packet_size > 0);
01772 int max_receive = std::min(
01773 m_bandwidth_limit[download_channel].quota_left()
01774 , m_packet_size - m_recv_pos);
01775 assert(max_receive > 0);
01776
01777 assert(m_recv_pos >= 0);
01778 assert(m_packet_size > 0);
01779 assert(max_receive > 0);
01780
01781 assert(can_read());
01782 m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
01783 , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
01784 m_reading = true;
01785 }
01786
01787 void peer_connection::reset_recv_buffer(int packet_size)
01788 {
01789 assert(packet_size > 0);
01790 if (m_recv_pos > m_packet_size)
01791 {
01792 cut_receive_buffer(m_packet_size, packet_size);
01793 return;
01794 }
01795 m_recv_pos = 0;
01796 m_packet_size = packet_size;
01797 if (int(m_recv_buffer.size()) < m_packet_size)
01798 m_recv_buffer.resize(m_packet_size);
01799 }
01800
01801 void peer_connection::send_buffer(char const* begin, char const* end)
01802 {
01803 std::vector<char>& buf = m_send_buffer[m_current_send_buffer];
01804 buf.insert(buf.end(), begin, end);
01805 setup_send();
01806 }
01807
01808 buffer::interval peer_connection::allocate_send_buffer(int size)
01809 {
01810 std::vector<char>& buf = m_send_buffer[m_current_send_buffer];
01811 buf.resize(buf.size() + size);
01812 buffer::interval ret(&buf[0] + buf.size() - size, &buf[0] + buf.size());
01813 return ret;
01814 }
01815
01816 template<class T>
01817 struct set_to_zero
01818 {
01819 set_to_zero(T& v, bool cond): m_val(v), m_cond(cond) {}
01820 void fire() { if (!m_cond) return; m_cond = false; m_val = 0; }
01821 ~set_to_zero() { if (m_cond) m_val = 0; }
01822 private:
01823 T& m_val;
01824 bool m_cond;
01825 };
01826
01827
01828
01829
01830
01831
01832 void peer_connection::on_receive_data(const asio::error_code& error
01833 , std::size_t bytes_transferred) try
01834 {
01835 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01836
01837 INVARIANT_CHECK;
01838
01839 assert(m_reading);
01840 m_reading = false;
01841
01842 m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
01843
01844 if (error)
01845 {
01846 #ifdef TORRENT_VERBOSE_LOGGING
01847 (*m_logger) << "**ERROR**: " << error.message() << "\n";
01848 #endif
01849 on_receive(error, bytes_transferred);
01850 throw std::runtime_error(error.message());
01851 }
01852
01853 if (m_disconnecting) return;
01854
01855 assert(m_packet_size > 0);
01856 assert(bytes_transferred > 0);
01857
01858 m_last_receive = second_clock::universal_time();
01859 m_recv_pos += bytes_transferred;
01860 assert(m_recv_pos <= int(m_recv_buffer.size()));
01861
01862 {
01863 INVARIANT_CHECK;
01864 on_receive(error, bytes_transferred);
01865 }
01866
01867 assert(m_packet_size > 0);
01868
01869 setup_receive();
01870 }
01871 catch (file_error& e)
01872 {
01873 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01874
01875 boost::shared_ptr<torrent> t = m_torrent.lock();
01876 if (!t)
01877 {
01878 m_ses.connection_failed(m_socket, remote(), e.what());
01879 return;
01880 }
01881
01882 if (t->alerts().should_post(alert::fatal))
01883 {
01884 t->alerts().post_alert(
01885 file_error_alert(t->get_handle()
01886 , std::string("torrent paused: ") + e.what()));
01887 }
01888 t->pause();
01889 }
01890 catch (std::exception& e)
01891 {
01892 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01893 m_ses.connection_failed(m_socket, remote(), e.what());
01894 }
01895 catch (...)
01896 {
01897
01898 assert(false);
01899 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01900 m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
01901 }
01902
01903 bool peer_connection::can_write() const
01904 {
01905 INVARIANT_CHECK;
01906
01907
01908
01909 return (!m_send_buffer[m_current_send_buffer].empty()
01910 || !m_send_buffer[(m_current_send_buffer + 1) & 1].empty())
01911 && m_bandwidth_limit[upload_channel].quota_left() > 0
01912 && !m_connecting;
01913 }
01914
01915 bool peer_connection::can_read() const
01916 {
01917 INVARIANT_CHECK;
01918
01919 return m_bandwidth_limit[download_channel].quota_left() > 0
01920 && !m_connecting;
01921 }
01922
01923 void peer_connection::connect()
01924 {
01925 INVARIANT_CHECK;
01926
01927 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01928 (*m_ses.m_logger) << "CONNECTING: " << m_remote.address().to_string() << "\n";
01929 #endif
01930
01931 boost::shared_ptr<torrent> t = m_torrent.lock();
01932 assert(t);
01933
01934 m_queued = false;
01935 assert(m_connecting);
01936 m_socket->open(t->get_interface().protocol());
01937 m_socket->bind(t->get_interface());
01938 if (m_remote_proxy != tcp::endpoint())
01939 {
01940 m_socket->async_connect(m_remote_proxy
01941 , bind(&peer_connection::on_connection_complete, self(), _1));
01942 }
01943 else
01944 {
01945 m_socket->async_connect(m_remote
01946 , bind(&peer_connection::on_connection_complete, self(), _1));
01947 }
01948
01949 if (t->alerts().should_post(alert::debug))
01950 {
01951 t->alerts().post_alert(peer_error_alert(
01952 m_remote, m_peer_id, "connecting to peer"));
01953 }
01954 }
01955
01956 void peer_connection::on_connection_complete(asio::error_code const& e) try
01957 {
01958 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01959
01960 INVARIANT_CHECK;
01961
01962 if (e)
01963 {
01964 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01965 (*m_ses.m_logger) << "CONNECTION FAILED: " << m_remote.address().to_string()
01966 << ": " << e.message() << "\n";
01967 #endif
01968 m_ses.connection_failed(m_socket, m_remote, e.message().c_str());
01969 return;
01970 }
01971
01972 if (m_disconnecting) return;
01973 m_last_receive = second_clock::universal_time();
01974
01975
01976
01977 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01978 (*m_ses.m_logger) << "COMPLETED: " << m_remote.address().to_string() << "\n";
01979 #endif
01980
01981 m_ses.connection_completed(self());
01982 m_connecting = false;
01983 on_connected();
01984 setup_send();
01985 setup_receive();
01986 }
01987 catch (std::exception& ex)
01988 {
01989 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01990 m_ses.connection_failed(m_socket, remote(), ex.what());
01991 }
01992 catch (...)
01993 {
01994
01995 assert(false);
01996 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
01997 m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
01998 }
01999
02000
02001
02002
02003
02004
02005 void peer_connection::on_send_data(asio::error_code const& error
02006 , std::size_t bytes_transferred) try
02007 {
02008 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
02009
02010 INVARIANT_CHECK;
02011
02012 assert(m_writing);
02013 m_writing = false;
02014
02015 m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
02016 m_write_pos += bytes_transferred;
02017
02018 if (error)
02019 {
02020 #ifdef TORRENT_VERBOSE_LOGGING
02021 (*m_logger) << "**ERROR**: " << error.message() << "\n";
02022 #endif
02023 throw std::runtime_error(error.message());
02024 }
02025 if (m_disconnecting) return;
02026
02027 assert(!m_connecting);
02028 assert(bytes_transferred > 0);
02029
02030 int sending_buffer = (m_current_send_buffer + 1) & 1;
02031
02032 assert(int(m_send_buffer[sending_buffer].size()) >= m_write_pos);
02033 if (int(m_send_buffer[sending_buffer].size()) == m_write_pos)
02034 {
02035 m_send_buffer[sending_buffer].clear();
02036 m_write_pos = 0;
02037 }
02038
02039 m_last_sent = second_clock::universal_time();
02040
02041 on_sent(error, bytes_transferred);
02042 fill_send_buffer();
02043 setup_send();
02044 }
02045 catch (std::exception& e)
02046 {
02047 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
02048 m_ses.connection_failed(m_socket, remote(), e.what());
02049 }
02050 catch (...)
02051 {
02052
02053 assert(false);
02054 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
02055 m_ses.connection_failed(m_socket, remote(), "connection failed for unkown reason");
02056 }
02057
02058 #ifndef NDEBUG
02059 void peer_connection::check_invariant() const
02060 {
02061 boost::shared_ptr<torrent> t = m_torrent.lock();
02062 if (!t)
02063 {
02064 typedef session_impl::torrent_map torrent_map;
02065 torrent_map& m = m_ses.m_torrents;
02066 for (torrent_map::iterator i = m.begin(), end(m.end()); i != end; ++i)
02067 {
02068 torrent& t = *i->second;
02069 assert(t.connection_for(m_remote) != this);
02070 }
02071 return;
02072 }
02073
02074 if (!m_in_constructor && t->connection_for(remote()) != this)
02075 {
02076 assert(false);
02077 }
02078
02079 assert(m_write_pos <= int(m_send_buffer[
02080 (m_current_send_buffer + 1) & 1].size()));
02081
02082 }
02083 #endif
02084
02085 bool peer_connection::has_timed_out() const
02086 {
02087
02088 INVARIANT_CHECK;
02089
02090 #ifndef NDEBUG
02091
02092 return false;
02093 #endif
02094
02095 using namespace boost::posix_time;
02096
02097 ptime now(second_clock::universal_time());
02098
02099
02100
02101
02102 if (m_connecting) return false;
02103
02104
02105
02106 time_duration d;
02107 d = second_clock::universal_time() - m_last_receive;
02108 if (d > seconds(m_timeout)) return true;
02109
02110
02111
02112
02113 time_duration d1;
02114 time_duration d2;
02115 d1 = now - m_became_uninterested;
02116 d2 = now - m_became_uninteresting;
02117
02118 if (!m_interesting
02119 && !m_peer_interested
02120 && d1 > minutes(10)
02121 && d2 > minutes(10))
02122 {
02123 return true;
02124 }
02125
02126 return false;
02127 }
02128
02129 void peer_connection::keep_alive()
02130 {
02131 INVARIANT_CHECK;
02132
02133 boost::posix_time::time_duration d;
02134 d = second_clock::universal_time() - m_last_sent;
02135 if (d.total_seconds() < m_timeout / 2) return;
02136
02137 if (m_connecting) return;
02138
02139 write_keepalive();
02140 }
02141
02142 bool peer_connection::is_seed() const
02143 {
02144 INVARIANT_CHECK;
02145
02146
02147 return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0;
02148 }
02149 }
02150