00001
00002
00003
00004
00005
00006
00007
00008 #include <iostream>
00009
00010 #include "libtorrent/peer_connection.hpp"
00011
00012 #ifdef _MSC_VER
00013 #pragma warning(push, 1)
00014 #endif
00015
00016 #include <boost/date_time/posix_time/posix_time.hpp>
00017 #include <boost/bind.hpp>
00018
00019 #ifdef _MSC_VER
00020 #pragma warning(pop)
00021 #endif
00022
00023 #include "libtorrent/web_peer_connection.hpp"
00024 #include "libtorrent/policy.hpp"
00025 #include "libtorrent/torrent.hpp"
00026 #include "libtorrent/socket.hpp"
00027 #include "libtorrent/alert_types.hpp"
00028 #include "libtorrent/invariant_check.hpp"
00029 #include "libtorrent/aux_/session_impl.hpp"
00030
00031 namespace libtorrent
00032 {
00033 class peer_connection;
00034 }
00035
00036 using namespace boost::posix_time;
00037 using boost::bind;
00038
00039 namespace
00040 {
00041 using namespace libtorrent;
00042
00043 size_type collect_free_download(
00044 torrent::peer_iterator start
00045 , torrent::peer_iterator end)
00046 {
00047 size_type accumulator = 0;
00048 for (torrent::peer_iterator i = start; i != end; ++i)
00049 {
00050
00051
00052
00053
00054 size_type diff = i->second->share_diff();
00055 assert(diff < std::numeric_limits<size_type>::max());
00056 if (i->second->is_peer_interested() || diff <= 0)
00057 continue;
00058
00059 assert(diff > 0);
00060 i->second->add_free_upload(-diff);
00061 accumulator += diff;
00062 assert(accumulator > 0);
00063 }
00064 assert(accumulator >= 0);
00065 return accumulator;
00066 }
00067
00068
00069
00070 size_type distribute_free_upload(
00071 torrent::peer_iterator start
00072 , torrent::peer_iterator end
00073 , size_type free_upload)
00074 {
00075 if (free_upload <= 0) return free_upload;
00076 int num_peers = 0;
00077 size_type total_diff = 0;
00078 for (torrent::peer_iterator i = start; i != end; ++i)
00079 {
00080 size_type d = i->second->share_diff();
00081 assert(d < std::numeric_limits<size_type>::max());
00082 total_diff += d;
00083 if (!i->second->is_peer_interested() || i->second->share_diff() >= 0) continue;
00084 ++num_peers;
00085 }
00086
00087 if (num_peers == 0) return free_upload;
00088 size_type upload_share;
00089 if (total_diff >= 0)
00090 {
00091 upload_share = std::min(free_upload, total_diff) / num_peers;
00092 }
00093 else
00094 {
00095 upload_share = (free_upload + total_diff) / num_peers;
00096 }
00097 if (upload_share < 0) return free_upload;
00098
00099 for (torrent::peer_iterator i = start; i != end; ++i)
00100 {
00101 peer_connection* p = i->second;
00102 if (!p->is_peer_interested() || p->share_diff() >= 0) continue;
00103 p->add_free_upload(upload_share);
00104 free_upload -= upload_share;
00105 }
00106 return free_upload;
00107 }
00108
00109 struct match_peer_ip
00110 {
00111 match_peer_ip(tcp::endpoint const& ip)
00112 : m_ip(ip)
00113 {}
00114
00115 bool operator()(policy::peer const& p) const
00116 { return p.ip.address() == m_ip.address(); }
00117
00118 tcp::endpoint m_ip;
00119 };
00120
00121 struct match_peer_connection
00122 {
00123 match_peer_connection(peer_connection const& c)
00124 : m_conn(c)
00125 {}
00126
00127 bool operator()(policy::peer const& p) const
00128 { return p.connection == &m_conn; }
00129
00130 const peer_connection& m_conn;
00131 };
00132
00133 }
00134
00135 namespace libtorrent
00136 {
00137
00138
00139
00140
00141 void request_a_block(
00142 torrent& t
00143 , peer_connection& c
00144 , std::vector<peer_connection*> ignore)
00145 {
00146 assert(!t.is_seed());
00147 assert(!c.has_peer_choked());
00148 int num_requests = c.desired_queue_size()
00149 - (int)c.download_queue().size()
00150 - (int)c.request_queue().size();
00151
00152 assert(c.desired_queue_size() > 0);
00153
00154
00155 if (num_requests <= 0) return;
00156
00157 piece_picker& p = t.picker();
00158 std::vector<piece_block> interesting_pieces;
00159 interesting_pieces.reserve(100);
00160
00161 bool prefer_whole_pieces = c.prefer_whole_pieces();
00162 if (!prefer_whole_pieces)
00163 {
00164 prefer_whole_pieces = c.statistics().download_payload_rate()
00165 * t.settings().whole_pieces_threshold
00166 > t.torrent_file().piece_length();
00167 }
00168
00169
00170
00171
00172
00173 assert((c.proxy() == tcp::endpoint() && c.remote() == c.get_socket()->remote_endpoint())
00174 || c.proxy() == c.get_socket()->remote_endpoint());
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184 p.pick_pieces(c.get_bitfield(), interesting_pieces
00185 , num_requests, prefer_whole_pieces, c.remote());
00186
00187
00188
00189
00190
00191
00192 std::vector<piece_block> busy_pieces;
00193 busy_pieces.reserve(10);
00194
00195 for (std::vector<piece_block>::iterator i = interesting_pieces.begin();
00196 i != interesting_pieces.end(); ++i)
00197 {
00198 if (p.is_downloading(*i))
00199 {
00200 busy_pieces.push_back(*i);
00201 continue;
00202 }
00203
00204
00205
00206
00207 c.add_request(*i);
00208 num_requests--;
00209 }
00210
00211 c.send_block_requests();
00212
00213
00214
00215
00216
00217
00218 if (busy_pieces.empty()) return;
00219
00220
00221
00222
00223
00224
00225 while (num_requests > 0)
00226 {
00227 peer_connection* peer = 0;
00228
00229 const int initial_queue_size = (int)c.download_queue().size()
00230 + (int)c.request_queue().size();
00231
00232
00233
00234 float min_weight = initial_queue_size == 0
00235 ? std::numeric_limits<float>::max()
00236 : c.statistics().download_payload_rate() / initial_queue_size;
00237
00238
00239
00240
00241 for (torrent::peer_iterator i = t.begin();
00242 i != t.end(); ++i)
00243 {
00244
00245 if (i->second == &c)
00246 continue;
00247
00248
00249 if (std::find(ignore.begin(), ignore.end(), i->second) != ignore.end())
00250 continue;
00251
00252 const std::deque<piece_block>& download_queue = i->second->download_queue();
00253 const std::deque<piece_block>& request_queue = i->second->request_queue();
00254 const int queue_size = (int)i->second->download_queue().size()
00255 + (int)i->second->request_queue().size();
00256
00257 bool in_request_queue = std::find_first_of(
00258 busy_pieces.begin()
00259 , busy_pieces.end()
00260 , request_queue.begin()
00261 , request_queue.end()) != busy_pieces.end();
00262
00263 bool in_download_queue = std::find_first_of(
00264 busy_pieces.begin()
00265 , busy_pieces.end()
00266 , download_queue.begin()
00267 , download_queue.end()) != busy_pieces.end();
00268
00269
00270
00271
00272
00273
00274
00275 const float weight = (queue_size == 0)
00276 ? std::numeric_limits<float>::max()
00277 : i->second->statistics().download_payload_rate() / queue_size
00278 * in_request_queue ? .1f : 1.f;
00279
00280
00281
00282
00283
00284 if (weight < min_weight && (in_request_queue || in_download_queue))
00285 {
00286 peer = i->second;
00287 min_weight = weight;
00288 }
00289 }
00290
00291 if (peer == 0)
00292 {
00293
00294
00295 break;
00296 }
00297
00298
00299
00300 std::deque<piece_block>::const_reverse_iterator common_block =
00301 std::find_first_of(
00302 peer->request_queue().rbegin()
00303 , peer->request_queue().rend()
00304 , busy_pieces.begin()
00305 , busy_pieces.end());
00306
00307 if (common_block == peer->request_queue().rend())
00308 {
00309 common_block = std::find_first_of(
00310 peer->download_queue().rbegin()
00311 , peer->download_queue().rend()
00312 , busy_pieces.begin()
00313 , busy_pieces.end());
00314 assert(common_block != peer->download_queue().rend());
00315 }
00316
00317 piece_block block = *common_block;
00318
00319
00320
00321
00322
00323 peer->cancel_request(block);
00324 c.add_request(block);
00325 ignore.push_back(&c);
00326 if (!peer->has_peer_choked() && !t.is_seed())
00327 {
00328 request_a_block(t, *peer, ignore);
00329 peer->send_block_requests();
00330 }
00331
00332 num_requests--;
00333
00334 const int queue_size = (int)c.download_queue().size()
00335 + (int)c.request_queue().size();
00336 const float weight = queue_size == 0
00337 ? std::numeric_limits<float>::max()
00338 : c.statistics().download_payload_rate() / queue_size;
00339
00340
00341
00342 if (weight <= min_weight) break;
00343 }
00344 c.send_block_requests();
00345 }
00346
00347 policy::policy(torrent* t)
00348 : m_torrent(t)
00349 , m_num_unchoked(0)
00350 , m_available_free_upload(0)
00351 , m_last_optimistic_disconnect(boost::gregorian::date(1970,boost::gregorian::Jan,1))
00352 { assert(t); }
00353
00354
00355
00356 policy::peer* policy::find_choke_candidate()
00357 {
00358 INVARIANT_CHECK;
00359
00360 peer* worst_peer = 0;
00361 size_type min_weight = std::numeric_limits<int>::min();
00362
00363 #ifndef NDEBUG
00364 int unchoked_counter = m_num_unchoked;
00365 #endif
00366
00367
00368
00369 for (std::vector<peer>::iterator i = m_peers.begin();
00370 i != m_peers.end(); ++i)
00371 {
00372 peer_connection* c = i->connection;
00373
00374 if (c == 0) continue;
00375 if (c->is_choked()) continue;
00376 #ifndef NDEBUG
00377 unchoked_counter--;
00378 #endif
00379 if (c->is_disconnecting()) continue;
00380
00381 if (!c->is_peer_interested())
00382 return &(*i);
00383
00384 size_type diff = i->total_download()
00385 - i->total_upload();
00386
00387 size_type weight = static_cast<int>(c->statistics().download_rate() * 10.f)
00388 + diff
00389 + ((c->is_interesting() && c->has_peer_choked())?-10:10)*1024;
00390
00391 if (weight >= min_weight && worst_peer) continue;
00392
00393 min_weight = weight;
00394 worst_peer = &(*i);
00395 continue;
00396 }
00397 assert(unchoked_counter == 0);
00398 return worst_peer;
00399 }
00400
00401 policy::peer* policy::find_unchoke_candidate()
00402 {
00403 INVARIANT_CHECK;
00404
00405
00406
00407 if (m_num_unchoked == m_torrent->num_peers())
00408 return 0;
00409
00410 using namespace boost::posix_time;
00411 using namespace boost::gregorian;
00412
00413 peer* unchoke_peer = 0;
00414 ptime min_time(date(9999,Jan,1));
00415 float max_down_speed = 0.f;
00416
00417
00418
00419 for (std::vector<peer>::iterator i = m_peers.begin();
00420 i != m_peers.end(); ++i)
00421 {
00422 peer_connection* c = i->connection;
00423 if (c == 0) continue;
00424 if (c->is_disconnecting()) continue;
00425 if (!c->is_choked()) continue;
00426 if (!c->is_peer_interested()) continue;
00427 if (c->share_diff() < -free_upload_amount
00428 && m_torrent->ratio() != 0) continue;
00429 if (c->statistics().download_rate() < max_down_speed) continue;
00430
00431 min_time = i->last_optimistically_unchoked;
00432 max_down_speed = c->statistics().download_rate();
00433 unchoke_peer = &(*i);
00434 }
00435 return unchoke_peer;
00436 }
00437
00438 policy::peer* policy::find_disconnect_candidate()
00439 {
00440 peer *disconnect_peer = 0;
00441 double slowest_transfer_rate = std::numeric_limits<double>::max();
00442
00443 boost::posix_time::ptime local_time
00444 = second_clock::universal_time();
00445
00446 for (std::vector<peer>::iterator i = m_peers.begin();
00447 i != m_peers.end(); ++i)
00448 {
00449 peer_connection* c = i->connection;
00450 if(c == 0)
00451 continue;
00452 if(c->is_disconnecting())
00453 continue;
00454
00455 double transferred_amount
00456 = (double)c->statistics().total_payload_download();
00457
00458 boost::posix_time::time_duration connected_time
00459 = local_time - i->connected;
00460
00461 double connected_time_in_seconds
00462 = connected_time.seconds()
00463 + connected_time.minutes()*60.0
00464 + connected_time.hours()*60.0*60.0;
00465
00466 double transfer_rate
00467 = transferred_amount / (connected_time_in_seconds+1);
00468
00469 if (transfer_rate <= slowest_transfer_rate)
00470 {
00471 slowest_transfer_rate = transfer_rate;
00472 disconnect_peer = &(*i);
00473 }
00474 }
00475 return disconnect_peer;
00476 }
00477
00478 policy::peer *policy::find_connect_candidate()
00479 {
00480 boost::posix_time::ptime local_time=second_clock::universal_time();
00481 boost::posix_time::ptime ptime(local_time);
00482 policy::peer* candidate =0;
00483
00484 for (std::vector<peer>::iterator i = m_peers.begin();
00485 i != m_peers.end(); ++i)
00486 {
00487 if(i->connection) continue;
00488 if(i->banned) continue;
00489 if(i->type == peer::not_connectable) continue;
00490
00491 assert(i->connected <= local_time);
00492
00493 boost::posix_time::ptime next_connect = i->connected;
00494
00495 if (next_connect <= ptime)
00496 {
00497 ptime = next_connect;
00498 candidate = &(*i);
00499 }
00500 }
00501
00502 assert(ptime <= local_time);
00503
00504 return candidate;
00505 }
00506
00507 policy::peer* policy::find_seed_choke_candidate()
00508 {
00509 INVARIANT_CHECK;
00510
00511 assert(m_num_unchoked > 0);
00512
00513
00514
00515 using namespace boost::posix_time;
00516 using namespace boost::gregorian;
00517
00518 peer* candidate = 0;
00519
00520
00521 ptime last_unchoke = ptime(date(1970, Jan, 1));
00522
00523
00524
00525
00526 peer* second_candidate = 0;
00527 size_type lowest_share_diff = 0;
00528
00529 for (std::vector<peer>::iterator i = m_peers.begin();
00530 i != m_peers.end(); ++i)
00531 {
00532 peer_connection* c = i->connection;
00533
00534
00535 if (c == 0) continue;
00536
00537 if (c->is_choked()) continue;
00538 if (c->is_disconnecting()) continue;
00539
00540 size_type share_diff = c->share_diff();
00541
00542
00543
00544 if (!second_candidate || share_diff <= lowest_share_diff)
00545 {
00546 lowest_share_diff = share_diff;
00547 second_candidate = &(*i);
00548 }
00549
00550
00551
00552 if (share_diff > 0) continue;
00553 if (!candidate || last_unchoke > i->last_optimistically_unchoked)
00554 {
00555 last_unchoke = i->last_optimistically_unchoked;
00556 candidate = &(*i);
00557 }
00558 }
00559 if (candidate) return candidate;
00560 if (second_candidate) return second_candidate;
00561 assert(false);
00562 return 0;
00563 }
00564
00565 policy::peer* policy::find_seed_unchoke_candidate()
00566 {
00567 INVARIANT_CHECK;
00568
00569 peer* candidate = 0;
00570 boost::posix_time::ptime last_unchoke
00571 = second_clock::universal_time();
00572
00573 for (std::vector<peer>::iterator i = m_peers.begin();
00574 i != m_peers.end(); ++i)
00575 {
00576 peer_connection* c = i->connection;
00577 if (c == 0) continue;
00578 if (!c->is_choked()) continue;
00579 if (!c->is_peer_interested()) continue;
00580 if (c->is_disconnecting()) continue;
00581 if (last_unchoke < i->last_optimistically_unchoked) continue;
00582 last_unchoke = i->last_optimistically_unchoked;
00583 candidate = &(*i);
00584 }
00585 return candidate;
00586 }
00587
00588 bool policy::seed_unchoke_one_peer()
00589 {
00590 INVARIANT_CHECK;
00591
00592 peer* p = find_seed_unchoke_candidate();
00593 if (p != 0)
00594 {
00595 assert(p->connection->is_choked());
00596 p->connection->send_unchoke();
00597 p->last_optimistically_unchoked
00598 = second_clock::universal_time();
00599 ++m_num_unchoked;
00600 }
00601 return p != 0;
00602 }
00603
00604 void policy::seed_choke_one_peer()
00605 {
00606 INVARIANT_CHECK;
00607
00608 peer* p = find_seed_choke_candidate();
00609 if (p != 0)
00610 {
00611 assert(!p->connection->is_choked());
00612 p->connection->send_choke();
00613 --m_num_unchoked;
00614 }
00615 }
00616
00617 void policy::pulse()
00618 {
00619 INVARIANT_CHECK;
00620
00621 if (m_torrent->is_paused()) return;
00622
00623 using namespace boost::posix_time;
00624
00625
00626 m_peers.erase(
00627 std::remove_if(m_peers.begin()
00628 , m_peers.end()
00629 , old_disconnected_peer())
00630 , m_peers.end());
00631
00632
00633
00634
00635
00636
00637
00638 int num_connected_peers = 0;
00639
00640 for (std::vector<peer>::iterator i = m_peers.begin();
00641 i != m_peers.end(); ++i)
00642 {
00643 if (i->connection && !i->connection->is_disconnecting())
00644 ++num_connected_peers;
00645 }
00646
00647 if (m_torrent->m_connections_quota.given != std::numeric_limits<int>::max())
00648 {
00649
00650 int max_connections = m_torrent->m_connections_quota.given;
00651
00652 if (num_connected_peers >= max_connections)
00653 {
00654
00655
00656 boost::posix_time::ptime local_time = second_clock::universal_time();
00657 if (m_last_optimistic_disconnect + boost::posix_time::seconds(120) <= local_time)
00658 {
00659 m_last_optimistic_disconnect = local_time;
00660 --max_connections;
00661 }
00662 }
00663 else
00664 {
00665
00666 m_last_optimistic_disconnect = second_clock::universal_time();
00667 }
00668
00669 while (num_connected_peers > max_connections)
00670 {
00671 bool ret = disconnect_one_peer();
00672 (void)ret;
00673 assert(ret);
00674 --num_connected_peers;
00675 }
00676 }
00677
00678 while (m_torrent->num_peers() < m_torrent->m_connections_quota.given)
00679 {
00680 if (!connect_one_peer())
00681 break;
00682 }
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699 if (m_torrent->ratio() != 0.f)
00700 {
00701
00702
00703 m_available_free_upload
00704 += collect_free_download(
00705 m_torrent->begin()
00706 , m_torrent->end());
00707
00708
00709 m_available_free_upload = distribute_free_upload(
00710 m_torrent->begin()
00711 , m_torrent->end()
00712 , m_available_free_upload);
00713 }
00714
00715
00716
00717
00718 if (m_torrent->is_seed())
00719 {
00720 if (m_num_unchoked > m_torrent->m_uploads_quota.given)
00721 {
00722 do
00723 {
00724 peer* p = find_seed_choke_candidate();
00725 --m_num_unchoked;
00726 assert(p != 0);
00727 if (p == 0) break;
00728
00729 assert(!p->connection->is_choked());
00730 p->connection->send_choke();
00731 } while (m_num_unchoked > m_torrent->m_uploads_quota.given);
00732 }
00733 else if (m_num_unchoked > 0)
00734 {
00735
00736
00737
00738
00739 assert(m_num_unchoked <= m_torrent->num_peers());
00740 peer* p = find_seed_unchoke_candidate();
00741 if (p)
00742 {
00743 assert(p->connection->is_choked());
00744 seed_choke_one_peer();
00745 p->connection->send_unchoke();
00746 ++m_num_unchoked;
00747 }
00748
00749 }
00750
00751
00752
00753 while (m_num_unchoked < m_torrent->m_uploads_quota.given)
00754 {
00755 if (!seed_unchoke_one_peer()) break;
00756 }
00757 #ifndef NDEBUG
00758 check_invariant();
00759 #endif
00760 }
00761
00762
00763
00764
00765 else
00766 {
00767 if (m_torrent->ratio() != 0)
00768 {
00769
00770 for (std::vector<peer>::iterator i = m_peers.begin();
00771 i != m_peers.end(); ++i)
00772 {
00773 peer_connection* c = i->connection;
00774 if (c == 0) continue;
00775
00776 size_type diff = i->connection->share_diff();
00777 if (diff < -free_upload_amount
00778 && !c->is_choked())
00779 {
00780
00781
00782 c->send_choke();
00783 --m_num_unchoked;
00784 }
00785 }
00786 }
00787
00788 if (m_torrent->m_uploads_quota.given < m_torrent->num_peers())
00789 {
00790 assert(m_torrent->m_uploads_quota.given >= 0);
00791
00792
00793
00794 if (m_num_unchoked > m_torrent->m_uploads_quota.given)
00795 {
00796 do
00797 {
00798 peer* p = find_choke_candidate();
00799 if (!p) break;
00800 assert(p);
00801 assert(!p->connection->is_choked());
00802 p->connection->send_choke();
00803 --m_num_unchoked;
00804 } while (m_num_unchoked > m_torrent->m_uploads_quota.given);
00805 }
00806 else
00807 {
00808
00809
00810
00811
00812 assert(m_num_unchoked <= m_torrent->num_peers());
00813 peer* p = find_unchoke_candidate();
00814 if (p)
00815 {
00816 assert(p->connection->is_choked());
00817 choke_one_peer();
00818 p->connection->send_unchoke();
00819 ++m_num_unchoked;
00820 }
00821 }
00822 }
00823
00824
00825
00826 while (m_num_unchoked < m_torrent->m_uploads_quota.given
00827 && unchoke_one_peer());
00828 }
00829 }
00830
00831 void policy::ban_peer(const peer_connection& c)
00832 {
00833 INVARIANT_CHECK;
00834
00835 std::vector<peer>::iterator i = std::find_if(
00836 m_peers.begin()
00837 , m_peers.end()
00838 , match_peer_connection(c));
00839
00840 if (i == m_peers.end())
00841 {
00842
00843 if (web_peer_connection const* p = dynamic_cast<web_peer_connection const*>(&c))
00844 {
00845 m_torrent->remove_url_seed(p->url());
00846 }
00847 return;
00848 }
00849
00850 i->type = peer::not_connectable;
00851 i->ip.port(0);
00852 i->banned = true;
00853 }
00854
00855 void policy::new_connection(peer_connection& c)
00856 {
00857 assert(!c.is_local());
00858
00859 INVARIANT_CHECK;
00860
00861
00862
00863
00864
00865
00866
00867 assert((c.proxy() == tcp::endpoint() && c.remote() == c.get_socket()->remote_endpoint())
00868 || c.proxy() == c.get_socket()->remote_endpoint());
00869
00870 if (m_torrent->num_peers() >= m_torrent->m_connections_quota.given
00871 && c.remote().address() != m_torrent->current_tracker().address())
00872 {
00873 throw protocol_error("too many connections, refusing incoming connection");
00874 }
00875
00876 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00877 if (c.remote().address() == m_torrent->current_tracker().address())
00878 {
00879 m_torrent->debug_log("overriding connection limit for tracker NAT-check");
00880 }
00881 #endif
00882
00883 std::vector<peer>::iterator i;
00884
00885 if (m_torrent->settings().allow_multiple_connections_per_ip)
00886 {
00887 i = m_peers.end();
00888 }
00889 else
00890 {
00891 i = std::find_if(
00892 m_peers.begin()
00893 , m_peers.end()
00894 , match_peer_ip(c.remote()));
00895 }
00896
00897 if (i != m_peers.end())
00898 {
00899 if (i->banned)
00900 throw protocol_error("ip address banned, closing");
00901
00902 if (i->connection != 0)
00903 {
00904 assert(i->connection != &c);
00905
00906
00907 if (!i->connection->is_connecting() || c.is_local())
00908 {
00909 throw protocol_error("duplicate connection, closing");
00910 }
00911 else
00912 {
00913 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00914 m_torrent->debug_log("duplicate connection. existing connection"
00915 " is connecting and this connection is incoming. closing existing "
00916 "connection in favour of this one");
00917 #endif
00918 i->connection->disconnect();
00919 i->connection = 0;
00920 }
00921 }
00922 }
00923 else
00924 {
00925 using namespace boost::posix_time;
00926 using namespace boost::gregorian;
00927
00928
00929
00930 assert((c.proxy() == tcp::endpoint() && c.remote() == c.get_socket()->remote_endpoint())
00931 || c.proxy() == c.get_socket()->remote_endpoint());
00932
00933 peer p(c.remote(), peer::not_connectable);
00934 m_peers.push_back(p);
00935 i = m_peers.end()-1;
00936 }
00937
00938 assert(i->connection == 0);
00939 c.add_stat(i->prev_amount_download, i->prev_amount_upload);
00940 i->prev_amount_download = 0;
00941 i->prev_amount_upload = 0;
00942 i->connection = &c;
00943 assert(i->connection);
00944 i->connected = second_clock::universal_time();
00945 m_last_optimistic_disconnect = second_clock::universal_time();
00946 }
00947
00948 void policy::peer_from_tracker(const tcp::endpoint& remote, const peer_id& pid)
00949 {
00950 INVARIANT_CHECK;
00951
00952
00953 if(remote.address() == address() || remote.port() == 0)
00954 return;
00955
00956 try
00957 {
00958 std::vector<peer>::iterator i;
00959
00960 if (m_torrent->settings().allow_multiple_connections_per_ip)
00961 {
00962 i = m_peers.end();
00963 }
00964 else
00965 {
00966 i = std::find_if(
00967 m_peers.begin()
00968 , m_peers.end()
00969 , match_peer_ip(remote));
00970 }
00971
00972 bool just_added = false;
00973
00974 if (i == m_peers.end())
00975 {
00976 using namespace boost::posix_time;
00977 using namespace boost::gregorian;
00978
00979
00980
00981 peer p(remote, peer::connectable);
00982 m_peers.push_back(p);
00983
00984
00985 i = m_peers.end() - 1;
00986 just_added = true;
00987 }
00988 else
00989 {
00990 i->type = peer::connectable;
00991
00992
00993
00994
00995 i->ip = remote;
00996
00997 if (i->connection)
00998 {
00999
01000
01001
01002
01003 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01004 m_torrent->debug_log("already connected to peer: " + remote.address().to_string() + ":"
01005 + boost::lexical_cast<std::string>(remote.port()) + " "
01006 + boost::lexical_cast<std::string>(i->connection->pid()));
01007 #endif
01008
01009 assert(i->connection->associated_torrent().lock().get() == m_torrent);
01010 return;
01011 }
01012 }
01013
01014 if (i->banned) return;
01015
01016 if (m_torrent->num_peers() < m_torrent->m_connections_quota.given
01017 && !m_torrent->is_paused())
01018 {
01019 if (!connect_peer(&*i) && just_added)
01020 {
01021
01022
01023
01024 assert(i == m_peers.end() - 1);
01025 m_peers.erase(i);
01026 }
01027 }
01028 return;
01029 }
01030 catch(std::exception& e)
01031 {
01032 if (m_torrent->alerts().should_post(alert::debug))
01033 {
01034 m_torrent->alerts().post_alert(
01035 peer_error_alert(remote, pid, e.what()));
01036 }
01037 }
01038 }
01039
01040
01041
01042
01043 void policy::choked(peer_connection&)
01044 {
01045 }
01046
01047 void policy::piece_finished(int index, bool successfully_verified)
01048 {
01049 INVARIANT_CHECK;
01050
01051 assert(index >= 0 && index < m_torrent->torrent_file().num_pieces());
01052
01053 if (successfully_verified)
01054 {
01055
01056 for (std::vector<peer>::iterator i = m_peers.begin();
01057 i != m_peers.end(); ++i)
01058 {
01059 if (i->connection == 0) continue;
01060
01061 if (!i->connection->is_interesting()) continue;
01062 if (!i->connection->has_piece(index)) continue;
01063
01064 bool interested = false;
01065 const std::vector<bool>& peer_has = i->connection->get_bitfield();
01066 const std::vector<bool>& we_have = m_torrent->pieces();
01067 assert(we_have.size() == peer_has.size());
01068 for (int j = 0; j != (int)we_have.size(); ++j)
01069 {
01070 if (!we_have[j] && peer_has[j])
01071 {
01072 interested = true;
01073 break;
01074 }
01075 }
01076 if (!interested)
01077 i->connection->send_not_interested();
01078 assert(i->connection->is_interesting() == interested);
01079 }
01080 }
01081 }
01082
01083
01084
01085
01086 void policy::block_finished(peer_connection& c, piece_block)
01087 {
01088 INVARIANT_CHECK;
01089
01090
01091 if (!c.has_peer_choked() && !m_torrent->is_seed())
01092 request_a_block(*m_torrent, c);
01093 }
01094
01095
01096
01097
01098 void policy::unchoked(peer_connection& c)
01099 {
01100 INVARIANT_CHECK;
01101 if (c.is_interesting())
01102 {
01103 request_a_block(*m_torrent, c);
01104 }
01105 }
01106
01107
01108 void policy::interested(peer_connection& c)
01109 {
01110 INVARIANT_CHECK;
01111
01112 assert(std::find_if(m_peers.begin(), m_peers.end()
01113 , boost::bind<bool>(std::equal_to<peer_connection*>(), bind(&peer::connection, _1)
01114 , &c)) != m_peers.end());
01115
01116
01117
01118
01119
01120
01121
01122
01123
01124
01125
01126 if (c.is_choked()
01127 && m_num_unchoked < m_torrent->m_uploads_quota.given
01128 && (m_torrent->ratio() == 0
01129 || c.share_diff() >= -free_upload_amount
01130 || m_torrent->is_seed()))
01131 {
01132 c.send_unchoke();
01133 ++m_num_unchoked;
01134 }
01135 }
01136
01137
01138 void policy::not_interested(peer_connection& c)
01139 {
01140 INVARIANT_CHECK;
01141
01142 if (m_torrent->ratio() != 0.f)
01143 {
01144 assert(c.share_diff() < std::numeric_limits<size_type>::max());
01145 size_type diff = c.share_diff();
01146 if (diff > 0 && c.is_seed())
01147 {
01148
01149
01150
01151 m_available_free_upload += diff;
01152 c.add_free_upload(-diff);
01153 }
01154 }
01155 if (!c.is_choked())
01156 {
01157 c.send_choke();
01158 --m_num_unchoked;
01159
01160 if (m_torrent->is_seed()) seed_unchoke_one_peer();
01161 else unchoke_one_peer();
01162 }
01163 }
01164
01165 bool policy::unchoke_one_peer()
01166 {
01167 peer* p = find_unchoke_candidate();
01168 if (p == 0) return false;
01169 assert(p->connection);
01170 assert(!p->connection->is_disconnecting());
01171
01172 assert(p->connection->is_choked());
01173 p->connection->send_unchoke();
01174 p->last_optimistically_unchoked = second_clock::universal_time();
01175 ++m_num_unchoked;
01176 return true;
01177 }
01178
01179 void policy::choke_one_peer()
01180 {
01181 peer* p = find_choke_candidate();
01182 if (p == 0) return;
01183 assert(p->connection);
01184 assert(!p->connection->is_disconnecting());
01185 assert(!p->connection->is_choked());
01186 p->connection->send_choke();
01187 --m_num_unchoked;
01188 }
01189
01190 bool policy::connect_one_peer()
01191 {
01192 if(m_torrent->num_peers() >= m_torrent->m_connections_quota.given)
01193 return false;
01194 peer* p = find_connect_candidate();
01195 if (p == 0) return false;
01196 assert(!p->banned);
01197 assert(!p->connection);
01198 assert(p->type == peer::connectable);
01199
01200 return connect_peer(p);
01201 }
01202
01203 bool policy::connect_peer(peer *p)
01204 {
01205 INVARIANT_CHECK;
01206 try
01207 {
01208 assert(!p->connection);
01209 p->connection = &m_torrent->connect_to_peer(p->ip);
01210 assert(p->connection);
01211 p->connection->add_stat(p->prev_amount_download, p->prev_amount_upload);
01212 p->prev_amount_download = 0;
01213 p->prev_amount_upload = 0;
01214 p->connected =
01215 m_last_optimistic_disconnect =
01216 second_clock::universal_time();
01217 return true;
01218 }
01219 catch (std::exception& e)
01220 {}
01221 return false;
01222 }
01223
01224 bool policy::disconnect_one_peer()
01225 {
01226 peer *p = find_disconnect_candidate();
01227 if(!p)
01228 return false;
01229 #if defined(TORRENT_VERBOSE_LOGGING)
01230 (*p->connection->m_logger) << "*** CLOSING CONNECTION 'too many connections'\n";
01231 #endif
01232
01233 p->connection->disconnect();
01234 return true;
01235 }
01236
01237
01238 void policy::connection_closed(const peer_connection& c) try
01239 {
01240 INVARIANT_CHECK;
01241
01242 bool unchoked = false;
01243
01244 std::vector<peer>::iterator i = std::find_if(
01245 m_peers.begin()
01246 , m_peers.end()
01247 , match_peer_connection(c));
01248
01249
01250 if (i == m_peers.end()) return;
01251 assert(i->connection == &c);
01252
01253 i->connected = second_clock::universal_time();
01254 if (!i->connection->is_choked() && !m_torrent->is_aborted())
01255 {
01256 unchoked = true;
01257 }
01258
01259 if (c.failed())
01260 {
01261 i->type = peer::not_connectable;
01262 i->ip.port(0);
01263 }
01264
01265
01266
01267
01268 if (m_torrent->ratio() != 0.f)
01269 {
01270 assert(i->connection->associated_torrent().lock().get() == m_torrent);
01271 assert(i->connection->share_diff() < std::numeric_limits<size_type>::max());
01272 m_available_free_upload += i->connection->share_diff();
01273 }
01274 i->prev_amount_download += c.statistics().total_payload_download();
01275 i->prev_amount_upload += c.statistics().total_payload_upload();
01276 i->connection = 0;
01277
01278 if (unchoked)
01279 {
01280
01281
01282
01283 --m_num_unchoked;
01284 if (m_torrent->is_seed()) seed_unchoke_one_peer();
01285 else unchoke_one_peer();
01286 }
01287 }
01288 catch (std::exception& e)
01289 {
01290 #ifndef NDEBUG
01291 std::string err = e.what();
01292 #endif
01293 assert(false);
01294 }
01295
01296 void policy::peer_is_interesting(peer_connection& c)
01297 {
01298 INVARIANT_CHECK;
01299
01300 c.send_interested();
01301 if (c.has_peer_choked()) return;
01302 request_a_block(*m_torrent, c);
01303 }
01304
01305 #ifndef NDEBUG
01306 bool policy::has_connection(const peer_connection* c)
01307 {
01308 assert(c);
01309 assert((c->proxy() == tcp::endpoint() && c->remote() == c->get_socket()->remote_endpoint())
01310 || c->proxy() == c->get_socket()->remote_endpoint());
01311
01312 return std::find_if(
01313 m_peers.begin()
01314 , m_peers.end()
01315 , match_peer_connection(*c)) != m_peers.end();
01316 }
01317
01318 void policy::check_invariant() const
01319 {
01320 if (m_torrent->is_aborted()) return;
01321 int actual_unchoked = 0;
01322 int connected_peers = 0;
01323
01324 int total_connections = 0;
01325 int nonempty_connections = 0;
01326
01327
01328 for (std::vector<peer>::const_iterator i = m_peers.begin();
01329 i != m_peers.end(); ++i)
01330 {
01331 ++total_connections;
01332 if (!i->connection) continue;
01333 ++nonempty_connections;
01334 if (!i->connection->is_disconnecting())
01335 ++connected_peers;
01336 if (!i->connection->is_choked()) ++actual_unchoked;
01337 }
01338
01339 assert(actual_unchoked == m_num_unchoked);
01340
01341 int num_torrent_peers = 0;
01342 for (torrent::const_peer_iterator i = m_torrent->begin();
01343 i != m_torrent->end(); ++i)
01344 {
01345 if (i->second->is_disconnecting()) continue;
01346
01347
01348 if (dynamic_cast<web_peer_connection*>(i->second)) continue;
01349 ++num_torrent_peers;
01350 }
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361 assert(connected_peers == num_torrent_peers
01362 || (connected_peers == num_torrent_peers + 1
01363 && connected_peers > 0)
01364 || (connected_peers + 1 == num_torrent_peers
01365 && num_torrent_peers > 0));
01366
01367
01368
01369 }
01370 #endif
01371
01372 policy::peer::peer(const tcp::endpoint& ip_, peer::connection_type t)
01373 : ip(ip_)
01374 , type(t)
01375 , last_optimistically_unchoked(
01376 boost::gregorian::date(1970,boost::gregorian::Jan,1))
01377 , connected(boost::gregorian::date(1970,boost::gregorian::Jan,1))
01378 , prev_amount_upload(0)
01379 , prev_amount_download(0)
01380 , banned(false)
01381 , connection(0)
01382 {
01383 assert(connected < second_clock::universal_time());
01384 }
01385
01386 size_type policy::peer::total_download() const
01387 {
01388 if (connection != 0)
01389 {
01390 assert(prev_amount_download == 0);
01391 return connection->statistics().total_payload_download();
01392 }
01393 else
01394 {
01395 return prev_amount_download;
01396 }
01397 }
01398
01399 size_type policy::peer::total_upload() const
01400 {
01401 if (connection != 0)
01402 {
01403 assert(prev_amount_upload == 0);
01404 return connection->statistics().total_payload_upload();
01405 }
01406 else
01407 {
01408 return prev_amount_upload;
01409 }
01410 }
01411 }
01412