00001
00002
00003
00004
00005
00006
00007
00008 #include "libtorrent/invariant_check.hpp"
00009 #include "libtorrent/bandwidth_manager.hpp"
00010 #include "libtorrent/peer_connection.hpp"
00011 #if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
00012 #include "libtorrent/aux_/session_impl.hpp"
00013 #endif
00014
00015 namespace libtorrent
00016 {
00017 namespace
00018 {
00019 const pt::time_duration window_size = pt::seconds(1);
00020 }
00021
00022 history_entry::history_entry(intrusive_ptr<peer_connection> p
00023 , weak_ptr<torrent> t, int a, pt::ptime exp)
00024 : expires_at(exp), amount(a), peer(p), tor(t)
00025 {}
00026
00027 bw_queue_entry::bw_queue_entry(intrusive_ptr<peer_connection> const& pe
00028 , bool no_prio)
00029 : peer(pe), non_prioritized(no_prio)
00030 {}
00031
00032 bandwidth_manager::bandwidth_manager(io_service& ios, int channel)
00033 : m_ios(ios)
00034 , m_history_timer(m_ios)
00035 , m_limit(bandwidth_limit::inf)
00036 , m_current_quota(0)
00037 , m_channel(channel)
00038 {}
00039
00040 void bandwidth_manager::request_bandwidth(intrusive_ptr<peer_connection> peer
00041 , bool non_prioritized)
00042 {
00043 INVARIANT_CHECK;
00044
00045
00046
00047 #ifndef NDEBUG
00048 for (std::deque<bw_queue_entry>::iterator i = m_queue.begin()
00049 , end(m_queue.end()); i != end; ++i)
00050 {
00051 assert(i->peer < peer || peer < i->peer);
00052 }
00053 #endif
00054
00055 assert(peer->max_assignable_bandwidth(m_channel) > 0);
00056
00057
00058
00059
00060
00061 if (m_queue.empty() || non_prioritized)
00062 {
00063 m_queue.push_back(bw_queue_entry(peer, non_prioritized));
00064 }
00065 else
00066 {
00067
00068
00069 std::deque<bw_queue_entry>::reverse_iterator i = m_queue.rbegin();
00070 while (i != m_queue.rend() && i->non_prioritized) ++i;
00071 m_queue.insert(i.base(), bw_queue_entry(peer, non_prioritized));
00072 }
00073 if (m_queue.size() == 1) hand_out_bandwidth();
00074 }
00075
00076 #ifndef NDEBUG
00077 void bandwidth_manager::check_invariant() const
00078 {
00079 int current_quota = 0;
00080 for (std::deque<history_entry>::const_iterator i
00081 = m_history.begin(), end(m_history.end()); i != end; ++i)
00082 {
00083 current_quota += i->amount;
00084 }
00085
00086 assert(current_quota == m_current_quota);
00087 }
00088 #endif
00089
00090 void bandwidth_manager::add_history_entry(history_entry const& e) try
00091 {
00092 INVARIANT_CHECK;
00093 #if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
00094
00095 #endif
00096
00097 m_history.push_front(e);
00098 m_current_quota += e.amount;
00099
00100
00101 if (m_history.size() > 1) return;
00102
00103 m_history_timer.expires_at(e.expires_at);
00104 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
00105 }
00106 catch (std::exception&) { assert(false); }
00107
00108 void bandwidth_manager::on_history_expire(asio::error_code const& e) try
00109 {
00110 INVARIANT_CHECK;
00111
00112 if (e) return;
00113
00114 #if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
00115
00116 #endif
00117
00118 assert(!m_history.empty());
00119
00120 pt::ptime now(pt::microsec_clock::universal_time());
00121 while (!m_history.empty() && m_history.back().expires_at <= now)
00122 {
00123 history_entry e = m_history.back();
00124 m_history.pop_back();
00125 m_current_quota -= e.amount;
00126 assert(m_current_quota >= 0);
00127 intrusive_ptr<peer_connection> c = e.peer;
00128 shared_ptr<torrent> t = e.tor.lock();
00129 if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
00130 if (t) t->expire_bandwidth(m_channel, e.amount);
00131 }
00132
00133
00134 if (!m_history.empty())
00135 {
00136 m_history_timer.expires_at(m_history.back().expires_at);
00137 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
00138 }
00139
00140
00141
00142
00143 if (!m_queue.empty()) hand_out_bandwidth();
00144 }
00145 catch (std::exception&)
00146 {
00147 assert(false);
00148 };
00149
00150 void bandwidth_manager::hand_out_bandwidth() try
00151 {
00152 INVARIANT_CHECK;
00153 #if defined TORRENT_LOGGING || defined TORRENT_VERBOSE_LOGGING
00154
00155 #endif
00156
00157 pt::ptime now(pt::microsec_clock::universal_time());
00158
00159 mutex_t::scoped_lock l(m_mutex);
00160 int limit = m_limit;
00161 l.unlock();
00162
00163
00164 int amount = limit - m_current_quota;
00165
00166 int bandwidth_block_size_limit = max_bandwidth_block_size;
00167 if (m_queue.size() > 3 && bandwidth_block_size_limit > limit / int(m_queue.size()))
00168 bandwidth_block_size_limit = std::max(max_bandwidth_block_size / int(m_queue.size() - 3)
00169 , min_bandwidth_block_size);
00170
00171 while (!m_queue.empty() && amount > 0)
00172 {
00173 assert(amount == limit - m_current_quota);
00174 bw_queue_entry qe = m_queue.front();
00175 m_queue.pop_front();
00176
00177 shared_ptr<torrent> t = qe.peer->associated_torrent().lock();
00178 if (!t) continue;
00179 if (qe.peer->is_disconnecting())
00180 {
00181 t->expire_bandwidth(m_channel, -1);
00182 continue;
00183 }
00184
00185
00186
00187
00188
00189 int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
00190 if (max_assignable == 0)
00191 {
00192 t->expire_bandwidth(m_channel, -1);
00193 continue;
00194 }
00195
00196
00197 if (max_assignable > t->bandwidth_throttle(m_channel))
00198 max_assignable = t->bandwidth_throttle(m_channel);
00199
00200
00201
00202
00203 int single_amount = std::min(amount
00204 , std::min(bandwidth_block_size_limit
00205 , max_assignable));
00206 assert(single_amount > 0);
00207 amount -= single_amount;
00208 qe.peer->assign_bandwidth(m_channel, single_amount);
00209 t->assign_bandwidth(m_channel, single_amount);
00210 add_history_entry(history_entry(qe.peer, t, single_amount, now + window_size));
00211 }
00212 }
00213 catch (std::exception& e)
00214 { assert(false); };
00215
00216 }