00001
00002
00003
00004
00005
00006
00007
00008 #include <vector>
00009 #include <iostream>
00010 #include <iomanip>
00011 #include <limits>
00012 #include <boost/bind.hpp>
00013
00014 #include "libtorrent/bt_peer_connection.hpp"
00015 #include "libtorrent/session.hpp"
00016 #include "libtorrent/identify_client.hpp"
00017 #include "libtorrent/entry.hpp"
00018 #include "libtorrent/bencode.hpp"
00019 #include "libtorrent/alert_types.hpp"
00020 #include "libtorrent/invariant_check.hpp"
00021 #include "libtorrent/io.hpp"
00022 #include "libtorrent/version.hpp"
00023 #include "libtorrent/extensions.hpp"
00024 #include "libtorrent/aux_/session_impl.hpp"
00025
00026 using namespace boost::posix_time;
00027 using boost::bind;
00028 using boost::shared_ptr;
00029 using libtorrent::aux::session_impl;
00030
00031 namespace libtorrent
00032 {
00033
00034 const bt_peer_connection::message_handler
00035 bt_peer_connection::m_message_handler[] =
00036 {
00037 &bt_peer_connection::on_choke,
00038 &bt_peer_connection::on_unchoke,
00039 &bt_peer_connection::on_interested,
00040 &bt_peer_connection::on_not_interested,
00041 &bt_peer_connection::on_have,
00042 &bt_peer_connection::on_bitfield,
00043 &bt_peer_connection::on_request,
00044 &bt_peer_connection::on_piece,
00045 &bt_peer_connection::on_cancel,
00046 &bt_peer_connection::on_dht_port,
00047 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
00048 &bt_peer_connection::on_extended
00049 };
00050
00051 bt_peer_connection::bt_peer_connection(
00052 session_impl& ses
00053 , boost::weak_ptr<torrent> tor
00054 , shared_ptr<stream_socket> s
00055 , tcp::endpoint const& remote)
00056 : peer_connection(ses, tor, s, remote, tcp::endpoint())
00057 , m_state(read_protocol_length)
00058 #ifndef TORRENT_DISABLE_EXTENSIONS
00059 , m_supports_extensions(false)
00060 #endif
00061 , m_supports_dht_port(false)
00062 #ifndef NDEBUG
00063 , m_sent_bitfield(false)
00064 , m_in_constructor(true)
00065 #endif
00066 {
00067 #ifdef TORRENT_VERBOSE_LOGGING
00068 (*m_logger) << "*** bt_peer_connection\n";
00069 #endif
00070
00071 write_handshake();
00072
00073
00074
00075 reset_recv_buffer(1);
00076
00077
00078 boost::shared_ptr<torrent> t = associated_torrent().lock();
00079 assert(t);
00080
00081 if (t->ready_for_connections())
00082 write_bitfield(t->pieces());
00083
00084 setup_send();
00085 setup_receive();
00086 #ifndef NDEBUG
00087 m_in_constructor = false;
00088 #endif
00089 }
00090
00091 bt_peer_connection::bt_peer_connection(
00092 session_impl& ses
00093 , boost::shared_ptr<stream_socket> s)
00094 : peer_connection(ses, s)
00095 , m_state(read_protocol_length)
00096 #ifndef TORRENT_DISABLE_EXTENSIONS
00097 , m_supports_extensions(false)
00098 #endif
00099 , m_supports_dht_port(false)
00100 #ifndef NDEBUG
00101 , m_sent_bitfield(false)
00102 , m_in_constructor(true)
00103 #endif
00104 {
00105
00106
00107
00108
00109
00110
00111
00112
00113 m_bandwidth_limit[download_channel].assign(80);
00114 m_bandwidth_limit[upload_channel].assign(80);
00115
00116
00117
00118 reset_recv_buffer(1);
00119 setup_receive();
00120 #ifndef NDEBUG
00121 m_in_constructor = false;
00122 #endif
00123 }
00124
00125 bt_peer_connection::~bt_peer_connection()
00126 {
00127 }
00128
00129 void bt_peer_connection::on_metadata()
00130 {
00131 boost::shared_ptr<torrent> t = associated_torrent().lock();
00132 assert(t);
00133 write_bitfield(t->pieces());
00134 }
00135
00136 void bt_peer_connection::write_dht_port(int listen_port)
00137 {
00138 INVARIANT_CHECK;
00139
00140 buffer::interval packet = allocate_send_buffer(7);
00141 detail::write_uint32(3, packet.begin);
00142 detail::write_uint8(msg_dht_port, packet.begin);
00143 detail::write_uint16(listen_port, packet.begin);
00144 assert(packet.begin == packet.end);
00145 setup_send();
00146 }
00147
00148 void bt_peer_connection::get_peer_info(peer_info& p) const
00149 {
00150 assert(!associated_torrent().expired());
00151
00152 p.down_speed = statistics().download_rate();
00153 p.up_speed = statistics().upload_rate();
00154 p.payload_down_speed = statistics().download_payload_rate();
00155 p.payload_up_speed = statistics().upload_payload_rate();
00156 p.pid = pid();
00157 p.ip = remote();
00158
00159 p.country[0] = m_country[0];
00160 p.country[1] = m_country[1];
00161
00162 p.total_download = statistics().total_payload_download();
00163 p.total_upload = statistics().total_payload_upload();
00164
00165 if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
00166 p.upload_limit = -1;
00167 else
00168 p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
00169
00170 if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
00171 p.download_limit = -1;
00172 else
00173 p.download_limit = m_bandwidth_limit[download_channel].throttle();
00174
00175 p.load_balancing = total_free_upload();
00176
00177 p.download_queue_length = (int)download_queue().size();
00178 p.upload_queue_length = (int)upload_queue().size();
00179
00180 if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
00181 {
00182 p.downloading_piece_index = ret->piece_index;
00183 p.downloading_block_index = ret->block_index;
00184 p.downloading_progress = ret->bytes_downloaded;
00185 p.downloading_total = ret->full_block_bytes;
00186 }
00187 else
00188 {
00189 p.downloading_piece_index = -1;
00190 p.downloading_block_index = -1;
00191 p.downloading_progress = 0;
00192 p.downloading_total = 0;
00193 }
00194
00195 p.flags = 0;
00196 if (is_interesting()) p.flags |= peer_info::interesting;
00197 if (is_choked()) p.flags |= peer_info::choked;
00198 if (is_peer_interested()) p.flags |= peer_info::remote_interested;
00199 if (has_peer_choked()) p.flags |= peer_info::remote_choked;
00200 if (support_extensions()) p.flags |= peer_info::supports_extensions;
00201 if (is_local()) p.flags |= peer_info::local_connection;
00202 if (!is_connecting() && m_state < read_packet_size)
00203 p.flags |= peer_info::handshake;
00204 if (is_connecting() && !is_queued()) p.flags |= peer_info::connecting;
00205 if (is_queued()) p.flags |= peer_info::queued;
00206
00207 p.pieces = get_bitfield();
00208 p.seed = is_seed();
00209
00210 p.client = m_client_version;
00211 p.connection_type = peer_info::standard_bittorrent;
00212 }
00213
00214 bool bt_peer_connection::in_handshake() const
00215 {
00216 return m_state < read_packet_size;
00217 }
00218
00219 void bt_peer_connection::write_handshake()
00220 {
00221 INVARIANT_CHECK;
00222
00223 boost::shared_ptr<torrent> t = associated_torrent().lock();
00224 assert(t);
00225
00226
00227 const char version_string[] = "BitTorrent protocol";
00228 const int string_len = sizeof(version_string)-1;
00229
00230 buffer::interval i = allocate_send_buffer(1 + string_len + 8 + 20 + 20);
00231
00232 *i.begin = string_len;
00233 ++i.begin;
00234
00235
00236 std::copy(
00237 version_string
00238 , version_string + string_len
00239 , i.begin);
00240 i.begin += string_len;
00241
00242
00243 std::fill(i.begin, i.begin + 8, 0);
00244
00245 #ifndef TORRENT_DISABLE_DHT
00246
00247 *(i.begin + 7) = 0x01;
00248 #endif
00249
00250 #ifndef TORRENT_DISABLE_EXTENSIONS
00251
00252 *(i.begin + 5) = 0x10;
00253 #endif
00254
00255 i.begin += 8;
00256
00257
00258 sha1_hash const& ih = t->torrent_file().info_hash();
00259 std::copy(ih.begin(), ih.end(), i.begin);
00260 i.begin += 20;
00261
00262
00263 std::copy(
00264 m_ses.get_peer_id().begin()
00265 , m_ses.get_peer_id().end()
00266 , i.begin);
00267 i.begin += 20;
00268 assert(i.begin == i.end);
00269
00270 #ifdef TORRENT_VERBOSE_LOGGING
00271 using namespace boost::posix_time;
00272 (*m_logger) << to_simple_string(second_clock::universal_time())
00273 << " ==> HANDSHAKE\n";
00274 #endif
00275 setup_send();
00276 }
00277
00278 boost::optional<piece_block_progress> bt_peer_connection::downloading_piece_progress() const
00279 {
00280 boost::shared_ptr<torrent> t = associated_torrent().lock();
00281 assert(t);
00282
00283 buffer::const_interval recv_buffer = receive_buffer();
00284
00285 if (m_state != read_packet
00286 || recv_buffer.left() < 9
00287 || recv_buffer[0] != msg_piece)
00288 return boost::optional<piece_block_progress>();
00289
00290 const char* ptr = recv_buffer.begin + 1;
00291 peer_request r;
00292 r.piece = detail::read_int32(ptr);
00293 r.start = detail::read_int32(ptr);
00294 r.length = packet_size() - 9;
00295
00296
00297 if (!verify_piece(r))
00298 return boost::optional<piece_block_progress>();
00299
00300 piece_block_progress p;
00301
00302 p.piece_index = r.piece;
00303 p.block_index = r.start / t->block_size();
00304 p.bytes_downloaded = recv_buffer.left() - 9;
00305 p.full_block_bytes = r.length;
00306
00307 return boost::optional<piece_block_progress>(p);
00308 }
00309
00310
00311
00312
00313
00314
00315
00316 void bt_peer_connection::on_keepalive()
00317 {
00318 INVARIANT_CHECK;
00319
00320 #ifdef TORRENT_VERBOSE_LOGGING
00321 using namespace boost::posix_time;
00322 (*m_logger) << to_simple_string(second_clock::universal_time())
00323 << " <== KEEPALIVE\n";
00324 #endif
00325 incoming_keepalive();
00326 }
00327
00328
00329
00330
00331
00332 void bt_peer_connection::on_choke(int received)
00333 {
00334 INVARIANT_CHECK;
00335
00336 assert(received > 0);
00337 if (packet_size() != 1)
00338 throw protocol_error("'choke' message size != 1");
00339 m_statistics.received_bytes(0, received);
00340 if (!packet_finished()) return;
00341
00342 #ifndef TORRENT_DISABLE_EXTENSIONS
00343 for (extension_list_t::iterator i = m_extensions.begin()
00344 , end(m_extensions.end()); i != end; ++i)
00345 {
00346 if ((*i)->on_choke()) return;
00347 }
00348 #endif
00349
00350 incoming_choke();
00351 }
00352
00353
00354
00355
00356
00357 void bt_peer_connection::on_unchoke(int received)
00358 {
00359 INVARIANT_CHECK;
00360
00361 assert(received > 0);
00362 if (packet_size() != 1)
00363 throw protocol_error("'unchoke' message size != 1");
00364 m_statistics.received_bytes(0, received);
00365 if (!packet_finished()) return;
00366
00367 #ifndef TORRENT_DISABLE_EXTENSIONS
00368 for (extension_list_t::iterator i = m_extensions.begin()
00369 , end(m_extensions.end()); i != end; ++i)
00370 {
00371 if ((*i)->on_unchoke()) return;
00372 }
00373 #endif
00374
00375 incoming_unchoke();
00376 }
00377
00378
00379
00380
00381
00382 void bt_peer_connection::on_interested(int received)
00383 {
00384 INVARIANT_CHECK;
00385
00386 assert(received > 0);
00387 if (packet_size() != 1)
00388 throw protocol_error("'interested' message size != 1");
00389 m_statistics.received_bytes(0, received);
00390 if (!packet_finished()) return;
00391
00392 #ifndef TORRENT_DISABLE_EXTENSIONS
00393 for (extension_list_t::iterator i = m_extensions.begin()
00394 , end(m_extensions.end()); i != end; ++i)
00395 {
00396 if ((*i)->on_interested()) return;
00397 }
00398 #endif
00399
00400 incoming_interested();
00401 }
00402
00403
00404
00405
00406
00407 void bt_peer_connection::on_not_interested(int received)
00408 {
00409 INVARIANT_CHECK;
00410
00411 assert(received > 0);
00412 if (packet_size() != 1)
00413 throw protocol_error("'not interested' message size != 1");
00414 m_statistics.received_bytes(0, received);
00415 if (!packet_finished()) return;
00416
00417 #ifndef TORRENT_DISABLE_EXTENSIONS
00418 for (extension_list_t::iterator i = m_extensions.begin()
00419 , end(m_extensions.end()); i != end; ++i)
00420 {
00421 if ((*i)->on_not_interested()) return;
00422 }
00423 #endif
00424
00425 incoming_not_interested();
00426 }
00427
00428
00429
00430
00431
00432 void bt_peer_connection::on_have(int received)
00433 {
00434 INVARIANT_CHECK;
00435
00436 assert(received > 0);
00437 if (packet_size() != 5)
00438 throw protocol_error("'have' message size != 5");
00439 m_statistics.received_bytes(0, received);
00440 if (!packet_finished()) return;
00441
00442 buffer::const_interval recv_buffer = receive_buffer();
00443
00444 const char* ptr = recv_buffer.begin + 1;
00445 int index = detail::read_int32(ptr);
00446
00447 #ifndef TORRENT_DISABLE_EXTENSIONS
00448 for (extension_list_t::iterator i = m_extensions.begin()
00449 , end(m_extensions.end()); i != end; ++i)
00450 {
00451 if ((*i)->on_have(index)) return;
00452 }
00453 #endif
00454
00455 incoming_have(index);
00456 }
00457
00458
00459
00460
00461
00462 void bt_peer_connection::on_bitfield(int received)
00463 {
00464 INVARIANT_CHECK;
00465
00466 assert(received > 0);
00467
00468 boost::shared_ptr<torrent> t = associated_torrent().lock();
00469 assert(t);
00470
00471
00472
00473 if (t->valid_metadata()
00474 && packet_size() - 1 != ((int)get_bitfield().size() + 7) / 8)
00475 throw protocol_error("bitfield with invalid size");
00476
00477 m_statistics.received_bytes(0, received);
00478 if (!packet_finished()) return;
00479
00480 buffer::const_interval recv_buffer = receive_buffer();
00481
00482 std::vector<bool> bitfield;
00483
00484 if (!t->valid_metadata())
00485 bitfield.resize((packet_size() - 1) * 8);
00486 else
00487 bitfield.resize(get_bitfield().size());
00488
00489
00490
00491
00492
00493 for (int i = 0; i < (int)bitfield.size(); ++i)
00494 bitfield[i] = (recv_buffer[1 + (i>>3)] & (1 << (7 - (i&7)))) != 0;
00495
00496 #ifndef TORRENT_DISABLE_EXTENSIONS
00497 for (extension_list_t::iterator i = m_extensions.begin()
00498 , end(m_extensions.end()); i != end; ++i)
00499 {
00500 if ((*i)->on_bitfield(bitfield)) return;
00501 }
00502 #endif
00503
00504 incoming_bitfield(bitfield);
00505 }
00506
00507
00508
00509
00510
00511 void bt_peer_connection::on_request(int received)
00512 {
00513 INVARIANT_CHECK;
00514
00515 assert(received > 0);
00516 if (packet_size() != 13)
00517 throw protocol_error("'request' message size != 13");
00518 m_statistics.received_bytes(0, received);
00519 if (!packet_finished()) return;
00520
00521 buffer::const_interval recv_buffer = receive_buffer();
00522
00523 peer_request r;
00524 const char* ptr = recv_buffer.begin + 1;
00525 r.piece = detail::read_int32(ptr);
00526 r.start = detail::read_int32(ptr);
00527 r.length = detail::read_int32(ptr);
00528
00529 #ifndef TORRENT_DISABLE_EXTENSIONS
00530 for (extension_list_t::iterator i = m_extensions.begin()
00531 , end(m_extensions.end()); i != end; ++i)
00532 {
00533 if ((*i)->on_request(r)) return;
00534 }
00535 #endif
00536
00537 incoming_request(r);
00538 }
00539
00540
00541
00542
00543
00544 void bt_peer_connection::on_piece(int received)
00545 {
00546 INVARIANT_CHECK;
00547
00548 assert(received > 0);
00549
00550 buffer::const_interval recv_buffer = receive_buffer();
00551 int recv_pos = recv_buffer.end - recv_buffer.begin;
00552
00553
00554
00555 if (recv_pos <= 9)
00556
00557 m_statistics.received_bytes(0, received);
00558 else if (recv_pos - received >= 9)
00559
00560 m_statistics.received_bytes(received, 0);
00561 else
00562 {
00563
00564 assert(recv_pos - received < 9);
00565 assert(recv_pos > 9);
00566 assert(9 - (recv_pos - received) <= 9);
00567 m_statistics.received_bytes(
00568 recv_pos - 9
00569 , 9 - (recv_pos - received));
00570 }
00571
00572 incoming_piece_fragment();
00573 if (!packet_finished()) return;
00574
00575 const char* ptr = recv_buffer.begin + 1;
00576 peer_request p;
00577 p.piece = detail::read_int32(ptr);
00578 p.start = detail::read_int32(ptr);
00579 p.length = packet_size() - 9;
00580
00581 #ifndef TORRENT_DISABLE_EXTENSIONS
00582 for (extension_list_t::iterator i = m_extensions.begin()
00583 , end(m_extensions.end()); i != end; ++i)
00584 {
00585 if ((*i)->on_piece(p, recv_buffer.begin + 9)) return;
00586 }
00587 #endif
00588
00589 incoming_piece(p, recv_buffer.begin + 9);
00590 }
00591
00592
00593
00594
00595
00596 void bt_peer_connection::on_cancel(int received)
00597 {
00598 INVARIANT_CHECK;
00599
00600 assert(received > 0);
00601 if (packet_size() != 13)
00602 throw protocol_error("'cancel' message size != 13");
00603 m_statistics.received_bytes(0, received);
00604 if (!packet_finished()) return;
00605
00606 buffer::const_interval recv_buffer = receive_buffer();
00607
00608 peer_request r;
00609 const char* ptr = recv_buffer.begin + 1;
00610 r.piece = detail::read_int32(ptr);
00611 r.start = detail::read_int32(ptr);
00612 r.length = detail::read_int32(ptr);
00613
00614 #ifndef TORRENT_DISABLE_EXTENSIONS
00615 for (extension_list_t::iterator i = m_extensions.begin()
00616 , end(m_extensions.end()); i != end; ++i)
00617 {
00618 if ((*i)->on_cancel(r)) return;
00619 }
00620 #endif
00621
00622 incoming_cancel(r);
00623 }
00624
00625
00626
00627
00628
00629 void bt_peer_connection::on_dht_port(int received)
00630 {
00631 INVARIANT_CHECK;
00632
00633 assert(received > 0);
00634 if (packet_size() != 3)
00635 throw protocol_error("'dht_port' message size != 3");
00636 m_statistics.received_bytes(0, received);
00637 if (!packet_finished()) return;
00638
00639 buffer::const_interval recv_buffer = receive_buffer();
00640
00641 const char* ptr = recv_buffer.begin + 1;
00642 int listen_port = detail::read_uint16(ptr);
00643
00644 incoming_dht_port(listen_port);
00645 }
00646
00647
00648
00649
00650
00651 void bt_peer_connection::on_extended(int received)
00652 {
00653 INVARIANT_CHECK;
00654
00655 assert(received > 0);
00656 m_statistics.received_bytes(0, received);
00657 if (packet_size() < 2)
00658 throw protocol_error("'extended' message smaller than 2 bytes");
00659
00660 if (associated_torrent().expired())
00661 throw protocol_error("'extended' message sent before proper handshake");
00662
00663 buffer::const_interval recv_buffer = receive_buffer();
00664 if (recv_buffer.left() < 2) return;
00665
00666 assert(*recv_buffer.begin == msg_extended);
00667 ++recv_buffer.begin;
00668
00669 int extended_id = detail::read_uint8(recv_buffer.begin);
00670
00671 if (extended_id == 0)
00672 {
00673 on_extended_handshake();
00674 return;
00675 }
00676
00677 #ifndef TORRENT_DISABLE_EXTENSIONS
00678 for (extension_list_t::iterator i = m_extensions.begin()
00679 , end(m_extensions.end()); i != end; ++i)
00680 {
00681 if ((*i)->on_extended(packet_size() - 2, extended_id
00682 , recv_buffer))
00683 return;
00684 }
00685 #endif
00686
00687 throw protocol_error("unknown extended message id: "
00688 + boost::lexical_cast<std::string>(extended_id));
00689 }
00690
00691 void bt_peer_connection::on_extended_handshake()
00692 {
00693 if (!packet_finished()) return;
00694
00695 boost::shared_ptr<torrent> t = associated_torrent().lock();
00696 assert(t);
00697
00698 buffer::const_interval recv_buffer = receive_buffer();
00699
00700 entry root;
00701 try
00702 {
00703 root = bdecode(recv_buffer.begin + 2, recv_buffer.end);
00704 }
00705 catch (std::exception& exc)
00706 {
00707 #ifdef TORRENT_VERBOSE_LOGGING
00708 (*m_logger) << "invalid extended handshake: " << exc.what() << "\n";
00709 #endif
00710 return;
00711 }
00712
00713 #ifdef TORRENT_VERBOSE_LOGGING
00714 std::stringstream ext;
00715 root.print(ext);
00716 (*m_logger) << "<== EXTENDED HANDSHAKE: \n" << ext.str();
00717 #endif
00718
00719 #ifndef TORRENT_DISABLE_EXTENSIONS
00720 for (extension_list_t::iterator i = m_extensions.begin()
00721 , end(m_extensions.end()); i != end;)
00722 {
00723
00724
00725 if (!(*i)->on_extension_handshake(root))
00726 i = m_extensions.erase(i);
00727 else
00728 ++i;
00729 }
00730 #endif
00731
00732
00733 if (entry* listen_port = root.find_key("p"))
00734 {
00735 if (listen_port->type() == entry::int_t)
00736 {
00737 tcp::endpoint adr(remote().address()
00738 , (unsigned short)listen_port->integer());
00739 t->get_policy().peer_from_tracker(adr, pid());
00740 }
00741 }
00742
00743
00744
00745 if (entry* client_info = root.find_key("v"))
00746 {
00747 if (client_info->type() == entry::string_t)
00748 m_client_version = client_info->string();
00749 }
00750
00751 if (entry* reqq = root.find_key("reqq"))
00752 {
00753 if (reqq->type() == entry::int_t)
00754 m_max_out_request_queue = reqq->integer();
00755 if (m_max_out_request_queue < 1)
00756 m_max_out_request_queue = 1;
00757 }
00758 }
00759
00760 bool bt_peer_connection::dispatch_message(int received)
00761 {
00762 INVARIANT_CHECK;
00763
00764 assert(received > 0);
00765
00766
00767 if (associated_torrent().expired()) return false;
00768
00769 buffer::const_interval recv_buffer = receive_buffer();
00770
00771 int packet_type = recv_buffer[0];
00772 if (packet_type < 0
00773 || packet_type >= num_supported_messages
00774 || m_message_handler[packet_type] == 0)
00775 {
00776 #ifndef TORRENT_DISABLE_EXTENSIONS
00777 for (extension_list_t::iterator i = m_extensions.begin()
00778 , end(m_extensions.end()); i != end; ++i)
00779 {
00780 if ((*i)->on_unknown_message(packet_size(), packet_type
00781 , buffer::const_interval(recv_buffer.begin+1
00782 , recv_buffer.end)))
00783 return packet_finished();
00784 }
00785 #endif
00786
00787 throw protocol_error("unknown message id: "
00788 + boost::lexical_cast<std::string>(packet_type)
00789 + " size: " + boost::lexical_cast<std::string>(packet_size()));
00790 }
00791
00792 assert(m_message_handler[packet_type] != 0);
00793
00794
00795 (this->*m_message_handler[packet_type])(received);
00796
00797 return packet_finished();
00798 }
00799
00800 void bt_peer_connection::write_keepalive()
00801 {
00802 INVARIANT_CHECK;
00803
00804 char buf[] = {0,0,0,0};
00805 send_buffer(buf, buf + sizeof(buf));
00806 }
00807
00808 void bt_peer_connection::write_cancel(peer_request const& r)
00809 {
00810 INVARIANT_CHECK;
00811
00812 assert(associated_torrent().lock()->valid_metadata());
00813
00814 char buf[] = {0,0,0,13, msg_cancel};
00815
00816 buffer::interval i = allocate_send_buffer(17);
00817
00818 std::copy(buf, buf + 5, i.begin);
00819 i.begin += 5;
00820
00821
00822 detail::write_int32(r.piece, i.begin);
00823
00824 detail::write_int32(r.start, i.begin);
00825
00826 detail::write_int32(r.length, i.begin);
00827 assert(i.begin == i.end);
00828
00829 setup_send();
00830 }
00831
00832 void bt_peer_connection::write_request(peer_request const& r)
00833 {
00834 INVARIANT_CHECK;
00835
00836 assert(associated_torrent().lock()->valid_metadata());
00837
00838 char buf[] = {0,0,0,13, msg_request};
00839
00840 buffer::interval i = allocate_send_buffer(17);
00841
00842 std::copy(buf, buf + 5, i.begin);
00843 i.begin += 5;
00844
00845
00846 detail::write_int32(r.piece, i.begin);
00847
00848 detail::write_int32(r.start, i.begin);
00849
00850 detail::write_int32(r.length, i.begin);
00851 assert(i.begin == i.end);
00852
00853 setup_send();
00854 }
00855
00856 void bt_peer_connection::write_bitfield(std::vector<bool> const& bitfield)
00857 {
00858 INVARIANT_CHECK;
00859
00860 boost::shared_ptr<torrent> t = associated_torrent().lock();
00861 assert(t);
00862 assert(m_sent_bitfield == false);
00863 assert(t->valid_metadata());
00864
00865 #ifdef TORRENT_VERBOSE_LOGGING
00866 using namespace boost::posix_time;
00867 (*m_logger) << to_simple_string(second_clock::universal_time())
00868 << " ==> BITFIELD ";
00869
00870 for (int i = 0; i < (int)get_bitfield().size(); ++i)
00871 {
00872 if (bitfield[i]) (*m_logger) << "1";
00873 else (*m_logger) << "0";
00874 }
00875 (*m_logger) << "\n";
00876 #endif
00877 const int packet_size = ((int)bitfield.size() + 7) / 8 + 5;
00878
00879 buffer::interval i = allocate_send_buffer(packet_size);
00880
00881 detail::write_int32(packet_size - 4, i.begin);
00882 detail::write_uint8(msg_bitfield, i.begin);
00883
00884 std::fill(i.begin, i.end, 0);
00885 for (int c = 0; c < (int)bitfield.size(); ++c)
00886 {
00887 if (bitfield[c])
00888 i.begin[c >> 3] |= 1 << (7 - (c & 7));
00889 }
00890 assert(i.end - i.begin == ((int)bitfield.size() + 7) / 8);
00891 #ifndef NDEBUG
00892 m_sent_bitfield = true;
00893 #endif
00894 setup_send();
00895 }
00896
00897 #ifndef TORRENT_DISABLE_EXTENSIONS
00898 void bt_peer_connection::write_extensions()
00899 {
00900 INVARIANT_CHECK;
00901
00902 #ifdef TORRENT_VERBOSE_LOGGING
00903 using namespace boost::posix_time;
00904 (*m_logger) << to_simple_string(second_clock::universal_time())
00905 << " ==> EXTENSIONS\n";
00906 #endif
00907 assert(m_supports_extensions);
00908
00909 entry handshake(entry::dictionary_t);
00910 entry extension_list(entry::dictionary_t);
00911
00912 handshake["m"] = extension_list;
00913
00914
00915
00916
00917 if (is_local()) handshake["p"] = m_ses.listen_port();
00918 handshake["v"] = m_ses.settings().user_agent;
00919 std::string remote_address;
00920 std::back_insert_iterator<std::string> out(remote_address);
00921 detail::write_address(remote().address(), out);
00922 handshake["ip"] = remote_address;
00923 handshake["reqq"] = m_ses.settings().max_allowed_in_request_queue;
00924
00925
00926
00927 for (extension_list_t::reverse_iterator i = m_extensions.rbegin()
00928 , end(m_extensions.rend()); i != end; ++i)
00929 {
00930 (*i)->add_handshake(handshake);
00931 }
00932
00933 std::vector<char> msg;
00934 bencode(std::back_inserter(msg), handshake);
00935
00936
00937 buffer::interval i = allocate_send_buffer(6 + msg.size());
00938
00939
00940 detail::write_int32((int)msg.size() + 2, i.begin);
00941 detail::write_uint8(msg_extended, i.begin);
00942
00943 detail::write_uint8(0, i.begin);
00944
00945 std::copy(msg.begin(), msg.end(), i.begin);
00946 i.begin += msg.size();
00947 assert(i.begin == i.end);
00948
00949 #ifdef TORRENT_VERBOSE_LOGGING
00950 std::stringstream ext;
00951 handshake.print(ext);
00952 (*m_logger) << "==> EXTENDED HANDSHAKE: \n" << ext.str();
00953 #endif
00954
00955 setup_send();
00956 }
00957 #endif
00958
00959 void bt_peer_connection::write_choke()
00960 {
00961 INVARIANT_CHECK;
00962
00963 if (is_choked()) return;
00964 char msg[] = {0,0,0,1,msg_choke};
00965 send_buffer(msg, msg + sizeof(msg));
00966 }
00967
00968 void bt_peer_connection::write_unchoke()
00969 {
00970 INVARIANT_CHECK;
00971
00972 char msg[] = {0,0,0,1,msg_unchoke};
00973 send_buffer(msg, msg + sizeof(msg));
00974 }
00975
00976 void bt_peer_connection::write_interested()
00977 {
00978 INVARIANT_CHECK;
00979
00980 char msg[] = {0,0,0,1,msg_interested};
00981 send_buffer(msg, msg + sizeof(msg));
00982 }
00983
00984 void bt_peer_connection::write_not_interested()
00985 {
00986 INVARIANT_CHECK;
00987
00988 char msg[] = {0,0,0,1,msg_not_interested};
00989 send_buffer(msg, msg + sizeof(msg));
00990 }
00991
00992 void bt_peer_connection::write_have(int index)
00993 {
00994 assert(associated_torrent().lock()->valid_metadata());
00995 assert(index >= 0);
00996 assert(index < associated_torrent().lock()->torrent_file().num_pieces());
00997 INVARIANT_CHECK;
00998
00999 const int packet_size = 9;
01000 char msg[packet_size] = {0,0,0,5,msg_have};
01001 char* ptr = msg + 5;
01002 detail::write_int32(index, ptr);
01003 send_buffer(msg, msg + packet_size);
01004 }
01005
01006 void bt_peer_connection::write_piece(peer_request const& r)
01007 {
01008 INVARIANT_CHECK;
01009
01010 const int packet_size = 4 + 5 + 4 + r.length;
01011
01012 boost::shared_ptr<torrent> t = associated_torrent().lock();
01013 assert(t);
01014
01015 buffer::interval i = allocate_send_buffer(packet_size);
01016
01017 detail::write_int32(packet_size-4, i.begin);
01018 detail::write_uint8(msg_piece, i.begin);
01019 detail::write_int32(r.piece, i.begin);
01020 detail::write_int32(r.start, i.begin);
01021
01022 t->filesystem().read(
01023 i.begin, r.piece, r.start, r.length);
01024
01025 assert(i.begin + r.length == i.end);
01026
01027 m_payloads.push_back(range(send_buffer_size() - r.length, r.length));
01028 setup_send();
01029 }
01030
01031 namespace
01032 {
01033 struct match_peer_id
01034 {
01035 match_peer_id(peer_id const& id, peer_connection const* pc)
01036 : m_id(id), m_pc(pc)
01037 { assert(pc); }
01038
01039 bool operator()(policy::peer const& p) const
01040 {
01041 return p.connection != m_pc
01042 && p.connection
01043 && p.connection->pid() == m_id
01044 && !p.connection->pid().is_all_zeros();
01045 }
01046
01047 peer_id m_id;
01048 peer_connection const* m_pc;
01049 };
01050 }
01051
01052
01053
01054
01055
01056
01057 void bt_peer_connection::on_receive(asio::error_code const& error
01058 , std::size_t bytes_transferred)
01059 {
01060 INVARIANT_CHECK;
01061
01062 if (error) return;
01063
01064 buffer::const_interval recv_buffer = receive_buffer();
01065
01066 boost::shared_ptr<torrent> t = associated_torrent().lock();
01067
01068 switch(m_state)
01069 {
01070 case read_protocol_length:
01071 {
01072 m_statistics.received_bytes(0, bytes_transferred);
01073 if (!packet_finished()) break;
01074
01075 int packet_size = recv_buffer[0];
01076
01077 #ifdef TORRENT_VERBOSE_LOGGING
01078 (*m_logger) << " protocol length: " << packet_size << "\n";
01079 #endif
01080 if (packet_size > 100 || packet_size <= 0)
01081 {
01082 std::stringstream s;
01083 s << "incorrect protocol length ("
01084 << packet_size
01085 << ") should be 19.";
01086 throw std::runtime_error(s.str());
01087 }
01088 m_state = read_protocol_string;
01089 reset_recv_buffer(packet_size);
01090 }
01091 break;
01092
01093 case read_protocol_string:
01094 {
01095 m_statistics.received_bytes(0, bytes_transferred);
01096 if (!packet_finished()) break;
01097
01098 #ifdef TORRENT_VERBOSE_LOGGING
01099 (*m_logger) << " protocol: '" << std::string(recv_buffer.begin
01100 , recv_buffer.end) << "'\n";
01101 #endif
01102 const char protocol_string[] = "BitTorrent protocol";
01103 if (recv_buffer.end - recv_buffer.begin != 19
01104 || !std::equal(recv_buffer.begin, recv_buffer.end
01105 , protocol_string))
01106 {
01107 const char cmd[] = "version";
01108 if (recv_buffer.end - recv_buffer.begin == 7 && std::equal(
01109 recv_buffer.begin, recv_buffer.end, cmd))
01110 {
01111 #ifdef TORRENT_VERBOSE_LOGGING
01112 (*m_logger) << "sending libtorrent version\n";
01113 #endif
01114 asio::write(*get_socket(), asio::buffer("libtorrent version " LIBTORRENT_VERSION "\n", 27));
01115 throw std::runtime_error("closing");
01116 }
01117 #ifdef TORRENT_VERBOSE_LOGGING
01118 (*m_logger) << "incorrect protocol name\n";
01119 #endif
01120 std::stringstream s;
01121 s << "got invalid protocol name: '"
01122 << std::string(recv_buffer.begin, recv_buffer.end)
01123 << "'";
01124 throw std::runtime_error(s.str());
01125 }
01126
01127 m_state = read_info_hash;
01128 reset_recv_buffer(28);
01129 }
01130 break;
01131
01132 case read_info_hash:
01133 {
01134 m_statistics.received_bytes(0, bytes_transferred);
01135 if (!packet_finished()) break;
01136
01137 #ifdef TORRENT_VERBOSE_LOGGING
01138 for (int i=0; i < 8; ++i)
01139 {
01140 for (int j=0; j < 8; ++j)
01141 {
01142 if (recv_buffer[i] & (0x80 >> j)) (*m_logger) << "1";
01143 else (*m_logger) << "0";
01144 }
01145 }
01146 (*m_logger) << "\n";
01147 if (recv_buffer[7] & 0x01)
01148 (*m_logger) << "supports DHT port message\n";
01149 if (recv_buffer[7] & 0x02)
01150 (*m_logger) << "supports FAST extensions\n";
01151 if (recv_buffer[5] & 0x10)
01152 (*m_logger) << "supports extensions protocol\n";
01153 #endif
01154
01155 #ifndef DISABLE_EXTENSIONS
01156 if ((recv_buffer[5] & 0x10))
01157 m_supports_extensions = true;
01158 #endif
01159 if (recv_buffer[7] & 0x01)
01160 m_supports_dht_port = true;
01161
01162
01163
01164 if (!t)
01165 {
01166
01167
01168 sha1_hash info_hash;
01169 std::copy(recv_buffer.begin + 8, recv_buffer.begin + 28
01170 , (char*)info_hash.begin());
01171
01172 attach_to_torrent(info_hash);
01173 t = associated_torrent().lock();
01174 assert(t);
01175
01176
01177
01178 write_handshake();
01179 if (t->valid_metadata())
01180 write_bitfield(t->pieces());
01181 }
01182 else
01183 {
01184
01185 if (!std::equal(recv_buffer.begin + 8, recv_buffer.begin + 28
01186 , (const char*)t->torrent_file().info_hash().begin()))
01187 {
01188 #ifdef TORRENT_VERBOSE_LOGGING
01189 (*m_logger) << " received invalid info_hash\n";
01190 #endif
01191 throw protocol_error("invalid info-hash in handshake");
01192 }
01193 }
01194
01195 assert(t->get_policy().has_connection(this));
01196
01197 m_state = read_peer_id;
01198 reset_recv_buffer(20);
01199 #ifdef TORRENT_VERBOSE_LOGGING
01200 (*m_logger) << " info_hash received\n";
01201 #endif
01202 }
01203 break;
01204
01205 case read_peer_id:
01206 {
01207 if (!t) return;
01208 m_statistics.received_bytes(0, bytes_transferred);
01209 if (!packet_finished()) break;
01210 assert(packet_size() == 20);
01211
01212 #ifdef TORRENT_VERBOSE_LOGGING
01213 {
01214 peer_id tmp;
01215 std::copy(recv_buffer.begin, recv_buffer.begin + 20, (char*)tmp.begin());
01216 std::stringstream s;
01217 s << "received peer_id: " << tmp << " client: " << identify_client(tmp) << "\n";
01218 s << "as ascii: ";
01219 for (peer_id::iterator i = tmp.begin(); i != tmp.end(); ++i)
01220 {
01221 if (std::isprint(*i)) s << *i;
01222 else s << ".";
01223 }
01224 s << "\n";
01225 (*m_logger) << s.str();
01226 }
01227 #endif
01228 peer_id pid;
01229 std::copy(recv_buffer.begin, recv_buffer.begin + 20, (char*)pid.begin());
01230 set_pid(pid);
01231
01232 if (t->settings().allow_multiple_connections_per_ip)
01233 {
01234
01235 policy& p = t->get_policy();
01236 policy::iterator i = std::find_if(p.begin_peer(), p.end_peer()
01237 , match_peer_id(pid, this));
01238 if (i != p.end_peer())
01239 {
01240 assert(i->connection->pid() == pid);
01241
01242
01243
01244
01245
01246
01247
01248 if (pid < m_ses.get_peer_id() && is_local())
01249 {
01250 i->connection->disconnect();
01251 }
01252 else
01253 {
01254 throw protocol_error("duplicate peer-id, connection closed");
01255 }
01256 }
01257
01258 }
01259
01260 #ifndef TORRENT_DISABLE_DHT
01261 if (m_supports_dht_port && m_ses.m_dht)
01262 write_dht_port(m_ses.kad_settings().service_port);
01263 #endif
01264
01265 m_client_version = identify_client(pid);
01266 boost::optional<fingerprint> f = client_fingerprint(pid);
01267 if (f && std::equal(f->name, f->name + 2, "BC"))
01268 {
01269
01270 if (m_max_out_request_queue > 50) m_max_out_request_queue = 50;
01271 }
01272
01273
01274
01275 if (pid == m_ses.get_peer_id())
01276 throw std::runtime_error("closing connection to ourself");
01277
01278 #ifndef TORRENT_DISABLE_EXTENSIONS
01279 for (extension_list_t::iterator i = m_extensions.begin()
01280 , end(m_extensions.end()); i != end;)
01281 {
01282 if (!(*i)->on_handshake())
01283 {
01284 i = m_extensions.erase(i);
01285 }
01286 else
01287 {
01288 ++i;
01289 }
01290 }
01291
01292 if (m_supports_extensions) write_extensions();
01293 #endif
01294
01295 m_state = read_packet_size;
01296 reset_recv_buffer(4);
01297 }
01298 break;
01299
01300 case read_packet_size:
01301 {
01302 if (!t) return;
01303 m_statistics.received_bytes(0, bytes_transferred);
01304 if (!packet_finished()) break;
01305
01306 const char* ptr = recv_buffer.begin;
01307 int packet_size = detail::read_int32(ptr);
01308
01309
01310 if (packet_size > 1024*1024 || packet_size < 0)
01311 {
01312
01313 throw std::runtime_error("packet > 1 MB ("
01314 + boost::lexical_cast<std::string>(
01315 (unsigned int)packet_size) + " bytes)");
01316 }
01317
01318 if (packet_size == 0)
01319 {
01320 incoming_keepalive();
01321
01322 m_state = read_packet_size;
01323 reset_recv_buffer(4);
01324 }
01325 else
01326 {
01327 m_state = read_packet;
01328 reset_recv_buffer(packet_size);
01329 }
01330 }
01331 break;
01332
01333 case read_packet:
01334 {
01335 if (!t) return;
01336 if (dispatch_message(bytes_transferred))
01337 {
01338 m_state = read_packet_size;
01339 reset_recv_buffer(4);
01340 }
01341 }
01342 break;
01343
01344 }
01345 }
01346
01347
01348
01349
01350
01351
01352 void bt_peer_connection::on_sent(asio::error_code const& error
01353 , std::size_t bytes_transferred)
01354 {
01355 INVARIANT_CHECK;
01356
01357 if (error) return;
01358
01359
01360 int amount_payload = 0;
01361 if (!m_payloads.empty())
01362 {
01363 for (std::deque<range>::iterator i = m_payloads.begin();
01364 i != m_payloads.end(); ++i)
01365 {
01366 i->start -= bytes_transferred;
01367 if (i->start < 0)
01368 {
01369 if (i->start + i->length <= 0)
01370 {
01371 amount_payload += i->length;
01372 }
01373 else
01374 {
01375 amount_payload += -i->start;
01376 i->length -= -i->start;
01377 i->start = 0;
01378 }
01379 }
01380 }
01381 }
01382
01383
01384
01385 m_payloads.erase(
01386 std::remove_if(m_payloads.begin(), m_payloads.end(), range_below_zero)
01387 , m_payloads.end());
01388
01389 assert(amount_payload <= (int)bytes_transferred);
01390 m_statistics.sent_bytes(amount_payload, bytes_transferred - amount_payload);
01391 }
01392
01393 #ifndef NDEBUG
01394 void bt_peer_connection::check_invariant() const
01395 {
01396 if (!m_in_constructor)
01397 peer_connection::check_invariant();
01398
01399 if (!m_payloads.empty())
01400 {
01401 for (std::deque<range>::const_iterator i = m_payloads.begin();
01402 i != m_payloads.end() - 1; ++i)
01403 {
01404 assert(i->start + i->length <= (i+1)->start);
01405 }
01406 }
01407 }
01408 #endif
01409
01410 }
01411