00001
00002
00003
00004
00005
00006
00007
00008 #include <ctime>
00009 #include <iostream>
00010 #include <fstream>
00011 #include <iomanip>
00012 #include <iterator>
00013 #include <algorithm>
00014 #include <set>
00015 #include <cctype>
00016 #include <algorithm>
00017
00018 #ifdef _MSC_VER
00019 #pragma warning(push, 1)
00020 #endif
00021
00022 #include <boost/lexical_cast.hpp>
00023 #include <boost/filesystem/convenience.hpp>
00024 #include <boost/filesystem/exception.hpp>
00025 #include <boost/limits.hpp>
00026 #include <boost/bind.hpp>
00027
00028 #ifdef _MSC_VER
00029 #pragma warning(pop)
00030 #endif
00031
00032 #include "libtorrent/peer_id.hpp"
00033 #include "libtorrent/torrent_info.hpp"
00034 #include "libtorrent/tracker_manager.hpp"
00035 #include "libtorrent/bencode.hpp"
00036 #include "libtorrent/hasher.hpp"
00037 #include "libtorrent/entry.hpp"
00038 #include "libtorrent/session.hpp"
00039 #include "libtorrent/fingerprint.hpp"
00040 #include "libtorrent/entry.hpp"
00041 #include "libtorrent/alert_types.hpp"
00042 #include "libtorrent/invariant_check.hpp"
00043 #include "libtorrent/file.hpp"
00044 #include "libtorrent/allocate_resources.hpp"
00045 #include "libtorrent/bt_peer_connection.hpp"
00046 #include "libtorrent/ip_filter.hpp"
00047 #include "libtorrent/socket.hpp"
00048 #include "libtorrent/aux_/session_impl.hpp"
00049 #include "libtorrent/kademlia/dht_tracker.hpp"
00050
00051 using namespace boost::posix_time;
00052 using boost::shared_ptr;
00053 using boost::weak_ptr;
00054 using boost::bind;
00055 using boost::mutex;
00056 using libtorrent::aux::session_impl;
00057
00058 namespace libtorrent { namespace detail
00059 {
00060
// Builds the "user:passwd" credential string.
// An empty user name yields an empty string, whatever passwd contains.
std::string generate_auth_string(std::string const& user
	, std::string const& passwd)
{
	std::string auth;
	if (!user.empty())
	{
		auth += user;
		auth += ":";
		auth += passwd;
	}
	return auth;
}
00067
00068
00069 } namespace aux {
00070
00071
00072
00073
00074
00075
00076
00077 void checker_impl::operator()()
00078 {
00079 eh_initializer();
00080
00081
00082 boost::shared_ptr<piece_checker_data> processing;
00083 boost::shared_ptr<piece_checker_data> t;
00084 for (;;)
00085 {
00086
00087 try
00088 {
00089 t.reset();
00090 {
00091 boost::mutex::scoped_lock l(m_mutex);
00092
00093 INVARIANT_CHECK;
00094
00095
00096
00097
00098 while (m_torrents.empty() && !m_abort && !processing)
00099 m_cond.wait(l);
00100
00101 if (m_abort)
00102 {
00103
00104
00105 processing.reset();
00106 t.reset();
00107 std::for_each(m_torrents.begin(), m_torrents.end()
00108 , boost::bind(&torrent::abort
00109 , boost::bind(&shared_ptr<torrent>::get
00110 , boost::bind(&piece_checker_data::torrent_ptr, _1))));
00111 m_torrents.clear();
00112 std::for_each(m_processing.begin(), m_processing.end()
00113 , boost::bind(&torrent::abort
00114 , boost::bind(&shared_ptr<torrent>::get
00115 , boost::bind(&piece_checker_data::torrent_ptr, _1))));
00116 m_processing.clear();
00117 return;
00118 }
00119
00120 if (!m_torrents.empty())
00121 {
00122 t = m_torrents.front();
00123 if (t->abort)
00124 {
00125
00126
00127
00128
00129 l.unlock();
00130 session_impl::mutex_t::scoped_lock l2(m_ses.m_mutex);
00131 l.lock();
00132
00133 t->torrent_ptr->abort();
00134 m_torrents.pop_front();
00135 continue;
00136 }
00137 }
00138 }
00139
00140 if (t)
00141 {
00142 std::string error_msg;
00143 t->parse_resume_data(t->resume_data, t->torrent_ptr->torrent_file()
00144 , error_msg);
00145
00146 if (!error_msg.empty() && m_ses.m_alerts.should_post(alert::warning))
00147 {
00148 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
00149 m_ses.m_alerts.post_alert(fastresume_rejected_alert(
00150 t->torrent_ptr->get_handle()
00151 , error_msg));
00152 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00153 (*m_ses.m_logger) << "fastresume data for "
00154 << t->torrent_ptr->torrent_file().name() << " rejected: "
00155 << error_msg << "\n";
00156 #endif
00157 }
00158
00159
00160 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
00161 mutex::scoped_lock l2(m_mutex);
00162
00163
00164 t->resume_data = entry();
00165 bool up_to_date = t->torrent_ptr->check_fastresume(*t);
00166
00167 if (up_to_date)
00168 {
00169 INVARIANT_CHECK;
00170
00171 assert(m_torrents.front() == t);
00172
00173 t->torrent_ptr->files_checked(t->unfinished_pieces);
00174 m_torrents.pop_front();
00175
00176
00177 if (!m_ses.is_aborted())
00178 {
00179 m_ses.m_torrents.insert(std::make_pair(t->info_hash, t->torrent_ptr));
00180 if (t->torrent_ptr->is_seed() && m_ses.m_alerts.should_post(alert::info))
00181 {
00182 m_ses.m_alerts.post_alert(torrent_finished_alert(
00183 t->torrent_ptr->get_handle()
00184 , "torrent is complete"));
00185 }
00186
00187 peer_id id;
00188 std::fill(id.begin(), id.end(), 0);
00189 for (std::vector<tcp::endpoint>::const_iterator i = t->peers.begin();
00190 i != t->peers.end(); ++i)
00191 {
00192 t->torrent_ptr->get_policy().peer_from_tracker(*i, id);
00193 }
00194 }
00195 else
00196 {
00197 t->torrent_ptr->abort();
00198 }
00199 t.reset();
00200 continue;
00201 }
00202
00203 l.unlock();
00204
00205
00206
00207 assert(m_torrents.front() == t);
00208
00209 m_torrents.pop_front();
00210 m_processing.push_back(t);
00211 if (!processing)
00212 {
00213 processing = t;
00214 processing->processing = true;
00215 t.reset();
00216 }
00217 }
00218 }
00219 catch (const std::exception& e)
00220 {
00221
00222
00223 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
00224 mutex::scoped_lock l2(m_mutex);
00225
00226 if (m_ses.m_alerts.should_post(alert::fatal))
00227 {
00228 m_ses.m_alerts.post_alert(
00229 file_error_alert(
00230 t->torrent_ptr->get_handle()
00231 , e.what()));
00232 }
00233 t->torrent_ptr->abort();
00234
00235 assert(!m_torrents.empty());
00236 m_torrents.pop_front();
00237 }
00238 catch(...)
00239 {
00240 #ifndef NDEBUG
00241 std::cerr << "error while checking resume data\n";
00242 #endif
00243 mutex::scoped_lock l(m_mutex);
00244 assert(!m_torrents.empty());
00245 m_torrents.pop_front();
00246 assert(false);
00247 }
00248
00249 if (!processing) continue;
00250
00251 try
00252 {
00253 assert(processing);
00254
00255 float finished = false;
00256 float progress = 0.f;
00257 boost::tie(finished, progress) = processing->torrent_ptr->check_files();
00258
00259 {
00260 mutex::scoped_lock l(m_mutex);
00261
00262 INVARIANT_CHECK;
00263
00264 processing->progress = progress;
00265 if (processing->abort)
00266 {
00267 assert(!m_processing.empty());
00268 assert(m_processing.front() == processing);
00269
00270 processing->torrent_ptr->abort();
00271
00272 processing.reset();
00273 m_processing.pop_front();
00274 if (!m_processing.empty())
00275 {
00276 processing = m_processing.front();
00277 processing->processing = true;
00278 }
00279 continue;
00280 }
00281 }
00282 if (finished)
00283 {
00284
00285 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
00286 mutex::scoped_lock l2(m_mutex);
00287
00288 INVARIANT_CHECK;
00289
00290 assert(!m_processing.empty());
00291 assert(m_processing.front() == processing);
00292
00293
00294
00295
00296 if (!m_ses.is_aborted())
00297 {
00298 processing->torrent_ptr->files_checked(processing->unfinished_pieces);
00299 m_ses.m_torrents.insert(std::make_pair(
00300 processing->info_hash, processing->torrent_ptr));
00301 if (processing->torrent_ptr->is_seed()
00302 && m_ses.m_alerts.should_post(alert::info))
00303 {
00304 m_ses.m_alerts.post_alert(torrent_finished_alert(
00305 processing->torrent_ptr->get_handle()
00306 , "torrent is complete"));
00307 }
00308
00309 peer_id id;
00310 std::fill(id.begin(), id.end(), 0);
00311 for (std::vector<tcp::endpoint>::const_iterator i = processing->peers.begin();
00312 i != processing->peers.end(); ++i)
00313 {
00314 processing->torrent_ptr->get_policy().peer_from_tracker(*i, id);
00315 }
00316 }
00317 else
00318 {
00319 processing->torrent_ptr->abort();
00320 }
00321 processing.reset();
00322 m_processing.pop_front();
00323 if (!m_processing.empty())
00324 {
00325 processing = m_processing.front();
00326 processing->processing = true;
00327 }
00328 }
00329 }
00330 catch(std::exception const& e)
00331 {
00332
00333 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
00334 mutex::scoped_lock l2(m_mutex);
00335
00336 if (m_ses.m_alerts.should_post(alert::fatal))
00337 {
00338 m_ses.m_alerts.post_alert(
00339 file_error_alert(
00340 processing->torrent_ptr->get_handle()
00341 , e.what()));
00342 }
00343 assert(!m_processing.empty());
00344
00345 processing->torrent_ptr->abort();
00346
00347 processing.reset();
00348 m_processing.pop_front();
00349 if (!m_processing.empty())
00350 {
00351 processing = m_processing.front();
00352 processing->processing = true;
00353 }
00354 }
00355 catch(...)
00356 {
00357 #ifndef NDEBUG
00358 std::cerr << "error while checking files\n";
00359 #endif
00360 mutex::scoped_lock l(m_mutex);
00361 assert(!m_processing.empty());
00362
00363 processing.reset();
00364 m_processing.pop_front();
00365 if (!m_processing.empty())
00366 {
00367 processing = m_processing.front();
00368 processing->processing = true;
00369 }
00370
00371 assert(false);
00372 }
00373 }
00374 }
00375
00376 aux::piece_checker_data* checker_impl::find_torrent(sha1_hash const& info_hash)
00377 {
00378 INVARIANT_CHECK;
00379 for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
00380 = m_torrents.begin(); i != m_torrents.end(); ++i)
00381 {
00382 if ((*i)->info_hash == info_hash) return i->get();
00383 }
00384 for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
00385 = m_processing.begin(); i != m_processing.end(); ++i)
00386 {
00387
00388 if ((*i)->info_hash == info_hash) return i->get();
00389 }
00390
00391 return 0;
00392 }
00393
00394 void checker_impl::remove_torrent(sha1_hash const& info_hash)
00395 {
00396 INVARIANT_CHECK;
00397 for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
00398 = m_torrents.begin(); i != m_torrents.end(); ++i)
00399 {
00400 if ((*i)->info_hash == info_hash)
00401 {
00402 assert((*i)->processing == false);
00403 m_torrents.erase(i);
00404 return;
00405 }
00406 }
00407 for (std::deque<boost::shared_ptr<piece_checker_data> >::iterator i
00408 = m_processing.begin(); i != m_processing.end(); ++i)
00409 {
00410 if ((*i)->info_hash == info_hash)
00411 {
00412 assert((*i)->processing == false);
00413 m_processing.erase(i);
00414 return;
00415 }
00416 }
00417
00418 assert(false);
00419 }
00420
00421 #ifndef NDEBUG
00422 void checker_impl::check_invariant() const
00423 {
00424 for (std::deque<boost::shared_ptr<piece_checker_data> >::const_iterator i
00425 = m_torrents.begin(); i != m_torrents.end(); ++i)
00426 {
00427 assert(*i);
00428 assert((*i)->torrent_ptr);
00429 }
00430 for (std::deque<boost::shared_ptr<piece_checker_data> >::const_iterator i
00431 = m_processing.begin(); i != m_processing.end(); ++i)
00432 {
00433 assert(*i);
00434 assert((*i)->torrent_ptr);
00435 }
00436 }
00437 #endif
00438
00439 struct seed_random_generator
00440 {
00441 seed_random_generator()
00442 {
00443 std::srand((unsigned int)(boost::posix_time::microsec_clock::
00444 universal_time().time_of_day().total_microseconds()));
00445 }
00446 };
00447
00448 session_impl::session_impl(
00449 std::pair<int, int> listen_port_range
00450 , fingerprint const& cl_fprint
00451 , char const* listen_interface)
00452 : m_strand(m_io_service)
00453 , m_dl_bandwidth_manager(m_io_service, peer_connection::download_channel)
00454 , m_ul_bandwidth_manager(m_io_service, peer_connection::upload_channel)
00455 , m_tracker_manager(m_settings)
00456 , m_listen_port_range(listen_port_range)
00457 , m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
00458 , m_abort(false)
00459 , m_max_uploads(-1)
00460 , m_max_connections(-1)
00461 , m_half_open_limit(-1)
00462 , m_incoming_connection(false)
00463 , m_files(40)
00464 , m_last_tick(microsec_clock::universal_time())
00465 , m_timer(m_io_service)
00466 , m_checker_impl(*this)
00467 {
00468
00469 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00470 m_logger = create_log("main_session", listen_port(), false);
00471 using boost::posix_time::second_clock;
00472 using boost::posix_time::to_simple_string;
00473 (*m_logger) << to_simple_string(second_clock::universal_time()) << "\n";
00474
00475 m_stats_logger = create_log("session_stats", listen_port(), false);
00476 (*m_stats_logger) <<
00477 "1. second\n"
00478 "2. hard upload quota\n"
00479 "3. hard download quota\n"
00480 "\n";
00481 m_second_counter = 0;
00482 m_dl_bandwidth_manager.m_ses = this;
00483 m_ul_bandwidth_manager.m_ses = this;
00484 #endif
00485
00486
00487 static seed_random_generator seeder;
00488
00489 m_key = rand() + (rand() << 15) + (rand() << 30);
00490 std::string print = cl_fprint.to_string();
00491 assert(print.length() <= 20);
00492
00493
00494 std::copy(
00495 print.begin()
00496 , print.begin() + print.length()
00497 , m_peer_id.begin());
00498
00499
00500 static char const printable[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
00501 "abcdefghijklmnopqrstuvwxyz-_.!~*'()";
00502
00503
00504 for (unsigned char* i = m_peer_id.begin() + print.length();
00505 i != m_peer_id.end(); ++i)
00506 {
00507 *i = printable[rand() % (sizeof(printable)-1)];
00508 }
00509
00510 m_timer.expires_from_now(seconds(1));
00511 m_timer.async_wait(m_strand.wrap(
00512 bind(&session_impl::second_tick, this, _1)));
00513
00514 m_thread.reset(new boost::thread(boost::ref(*this)));
00515 m_checker_thread.reset(new boost::thread(boost::ref(m_checker_impl)));
00516 }
00517
00518 #ifndef TORRENT_DISABLE_EXTENSIONS
00519 void session_impl::add_extension(
00520 boost::function<boost::shared_ptr<torrent_plugin>(torrent*)> ext)
00521 {
00522 m_extensions.push_back(ext);
00523 }
00524 #endif
00525
00526 #ifndef TORRENT_DISABLE_DHT
00527 void session_impl::add_dht_node(udp::endpoint n)
00528 {
00529 if (m_dht) m_dht->add_node(n);
00530 }
00531 #endif
00532
00533 void session_impl::abort()
00534 {
00535 mutex_t::scoped_lock l(m_mutex);
00536 assert(!m_abort);
00537
00538 m_abort = true;
00539 m_io_service.stop();
00540 l.unlock();
00541
00542 mutex::scoped_lock l2(m_checker_impl.m_mutex);
00543
00544 m_checker_impl.m_abort = true;
00545 }
00546
00547 void session_impl::set_ip_filter(ip_filter const& f)
00548 {
00549 mutex_t::scoped_lock l(m_mutex);
00550 m_ip_filter = f;
00551
00552
00553
00554 for (session_impl::connection_map::iterator i
00555 = m_connections.begin(); i != m_connections.end();)
00556 {
00557 tcp::endpoint sender = i->first->remote_endpoint();
00558 if (m_ip_filter.access(sender.address()) & ip_filter::blocked)
00559 {
00560 #if defined(TORRENT_VERBOSE_LOGGING)
00561 (*i->second->m_logger) << "*** CONNECTION FILTERED\n";
00562 #endif
00563 session_impl::connection_map::iterator j = i;
00564 ++i;
00565 j->second->disconnect();
00566 }
00567 else ++i;
00568 }
00569 }
00570
00571 void session_impl::set_settings(session_settings const& s)
00572 {
00573 mutex_t::scoped_lock l(m_mutex);
00574 m_settings = s;
00575 m_files.resize(m_settings.file_pool_size);
00576
00577 std::string::iterator i = m_settings.user_agent.begin();
00578 while ((i = std::find(i, m_settings.user_agent.end(), '\n'))
00579 != m_settings.user_agent.end())
00580 *i = ' ';
00581 }
00582
00583 void session_impl::open_listen_port()
00584 {
00585 try
00586 {
00587
00588 m_listen_socket = boost::shared_ptr<socket_acceptor>(new socket_acceptor(m_io_service));
00589
00590 for(;;)
00591 {
00592 try
00593 {
00594 m_listen_socket->open(m_listen_interface.protocol());
00595 m_listen_socket->bind(m_listen_interface);
00596 m_listen_socket->listen();
00597 break;
00598 }
00599 catch (asio::system_error& e)
00600 {
00601
00602 if (e.code() == asio::error::host_not_found)
00603 {
00604 if (m_alerts.should_post(alert::fatal))
00605 {
00606 std::string msg = "cannot listen on the given interface '"
00607 + m_listen_interface.address().to_string() + "'";
00608 m_alerts.post_alert(listen_failed_alert(msg));
00609 }
00610 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00611 std::string msg = "cannot listen on the given interface '"
00612 + m_listen_interface.address().to_string() + "'";
00613 (*m_logger) << msg << "\n";
00614 #endif
00615 assert(m_listen_socket.unique());
00616 m_listen_socket.reset();
00617 break;
00618 }
00619 m_listen_socket->close();
00620 m_listen_interface.port(m_listen_interface.port() + 1);
00621 if (m_listen_interface.port() > m_listen_port_range.second)
00622 {
00623 std::stringstream msg;
00624 msg << "none of the ports in the range ["
00625 << m_listen_port_range.first
00626 << ", " << m_listen_port_range.second
00627 << "] could be opened for listening";
00628 m_alerts.post_alert(listen_failed_alert(msg.str()));
00629 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00630 (*m_logger) << msg.str() << "\n";
00631 #endif
00632 m_listen_socket.reset();
00633 break;
00634 }
00635 }
00636 }
00637 }
00638 catch (asio::system_error& e)
00639 {
00640 if (m_alerts.should_post(alert::fatal))
00641 {
00642 m_alerts.post_alert(listen_failed_alert(
00643 std::string("failed to open listen port: ") + e.what()));
00644 }
00645 }
00646
00647 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00648 if (m_listen_socket)
00649 {
00650 (*m_logger) << "listening on port: " << m_listen_interface.port() << "\n";
00651 }
00652 #endif
00653 if (m_listen_socket) async_accept();
00654 }
00655
00656 void session_impl::process_connection_queue()
00657 {
00658 while (!m_connection_queue.empty())
00659 {
00660 if ((int)m_half_open.size() >= m_half_open_limit
00661 && m_half_open_limit > 0)
00662 return;
00663
00664 connection_queue::value_type c = m_connection_queue.front();
00665
00666 try
00667 {
00668 m_connection_queue.pop_front();
00669 assert(c->associated_torrent().lock().get());
00670 c->connect();
00671 m_half_open.insert(std::make_pair(c->get_socket(), c));
00672 }
00673 catch (std::exception& e)
00674 {
00675 c->disconnect();
00676
00677 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00678 (*m_logger) << "connect failed [" << c->remote() << "]: "
00679 << e.what() << "\n";
00680 #endif
00681 }
00682 }
00683 }
00684
00685 void session_impl::async_accept()
00686 {
00687 shared_ptr<stream_socket> c(new stream_socket(m_io_service));
00688 m_listen_socket->async_accept(*c
00689 , bind(&session_impl::on_incoming_connection, this, c
00690 , weak_ptr<socket_acceptor>(m_listen_socket), _1));
00691 }
00692
00693 void session_impl::on_incoming_connection(shared_ptr<stream_socket> const& s
00694 , weak_ptr<socket_acceptor> const& listen_socket, asio::error_code const& e) try
00695 {
00696 if (listen_socket.expired())
00697 return;
00698
00699 if (e == asio::error::operation_aborted)
00700 return;
00701
00702 mutex_t::scoped_lock l(m_mutex);
00703 assert(listen_socket.lock() == m_listen_socket);
00704
00705 if (m_abort) return;
00706
00707 async_accept();
00708 if (e)
00709 {
00710 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00711 std::string msg = "error accepting connection on '"
00712 + m_listen_interface.address().to_string() + "'";
00713 (*m_logger) << msg << "\n";
00714 #endif
00715 assert(m_listen_socket.unique());
00716 return;
00717 }
00718
00719
00720 m_incoming_connection = true;
00721 tcp::endpoint endp = s->remote_endpoint();
00722
00723 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00724 (*m_logger) << endp << " <== INCOMING CONNECTION\n";
00725 #endif
00726 if (m_ip_filter.access(endp.address()) & ip_filter::blocked)
00727 {
00728 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00729 (*m_logger) << "filtered blocked ip\n";
00730 #endif
00731
00732 return;
00733 }
00734
00735 boost::intrusive_ptr<peer_connection> c(
00736 new bt_peer_connection(*this, s));
00737 #ifndef NDEBUG
00738 c->m_in_constructor = false;
00739 #endif
00740
00741 m_connections.insert(std::make_pair(s, c));
00742 }
00743 catch (std::exception& exc)
00744 {
00745 #ifndef NDEBUG
00746 std::string err = exc.what();
00747 #endif
00748 }
00749
00750 void session_impl::connection_failed(boost::shared_ptr<stream_socket> const& s
00751 , tcp::endpoint const& a, char const* message)
00752 #ifndef NDEBUG
00753 try
00754 #endif
00755 {
00756 mutex_t::scoped_lock l(m_mutex);
00757
00758 connection_map::iterator p = m_connections.find(s);
00759
00760
00761 if (p != m_connections.end())
00762 {
00763 if (m_alerts.should_post(alert::debug))
00764 {
00765 m_alerts.post_alert(
00766 peer_error_alert(
00767 a
00768 , p->second->pid()
00769 , message));
00770 }
00771
00772 #if defined(TORRENT_VERBOSE_LOGGING)
00773 (*p->second->m_logger) << "*** CONNECTION FAILED " << message << "\n";
00774 #endif
00775 p->second->set_failed();
00776 p->second->disconnect();
00777 }
00778 else
00779 {
00780
00781
00782 p = m_half_open.find(s);
00783 if (p != m_half_open.end())
00784 {
00785 if (m_alerts.should_post(alert::debug))
00786 {
00787 m_alerts.post_alert(
00788 peer_error_alert(
00789 a
00790 , p->second->pid()
00791 , message));
00792 }
00793 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
00794 (*m_logger) << "CLOSED: " << a.address().to_string()
00795 << " " << message << "\n";
00796 #endif
00797 p->second->set_failed();
00798 p->second->disconnect();
00799 }
00800 }
00801 }
00802 #ifndef NDEBUG
00803 catch (...)
00804 {
00805 assert(false);
00806 };
00807 #endif
00808
00809 void session_impl::close_connection(boost::intrusive_ptr<peer_connection> const& p)
00810 {
00811 mutex_t::scoped_lock l(m_mutex);
00812
00813 assert(p->is_disconnecting());
00814
00815 if (p->is_connecting())
00816 {
00817 assert(p->is_local());
00818 assert(m_connections.find(p->get_socket()) == m_connections.end());
00819
00820
00821 connection_map::iterator i = m_half_open.find(p->get_socket());
00822 if (i == m_half_open.end())
00823 {
00824
00825
00826 connection_queue::iterator j = std::find(
00827 m_connection_queue.begin(), m_connection_queue.end(), p);
00828
00829
00830
00831
00832 if (j != m_connection_queue.end())
00833 m_connection_queue.erase(j);
00834 }
00835 else
00836 {
00837 m_half_open.erase(i);
00838 process_connection_queue();
00839 }
00840 }
00841 else
00842 {
00843 assert(m_half_open.find(p->get_socket()) == m_half_open.end());
00844 assert(std::find(m_connection_queue.begin()
00845 , m_connection_queue.end(), p) == m_connection_queue.end());
00846 connection_map::iterator i = m_connections.find(p->get_socket());
00847
00848 if (i != m_connections.end())
00849 m_connections.erase(i);
00850 }
00851 }
00852
00853 void session_impl::set_peer_id(peer_id const& id)
00854 {
00855 mutex_t::scoped_lock l(m_mutex);
00856 m_peer_id = id;
00857 }
00858
00859 void session_impl::set_key(int key)
00860 {
00861 mutex_t::scoped_lock l(m_mutex);
00862 m_key = key;
00863 }
00864
00865 void session_impl::second_tick(asio::error_code const& e) try
00866 {
00867 session_impl::mutex_t::scoped_lock l(m_mutex);
00868
00869 if (e)
00870 {
00871 #if defined(TORRENT_LOGGING)
00872 (*m_logger) << "*** SECOND TIMER FAILED " << e.message() << "\n";
00873 #endif
00874 m_abort = true;
00875 m_io_service.stop();
00876 return;
00877 }
00878
00879 if (m_abort) return;
00880 float tick_interval = (microsec_clock::universal_time()
00881 - m_last_tick).total_milliseconds() / 1000.f;
00882 m_last_tick = microsec_clock::universal_time();
00883
00884 m_timer.expires_from_now(seconds(1));
00885 m_timer.async_wait(m_strand.wrap(
00886 bind(&session_impl::second_tick, this, _1)));
00887
00888
00889
00890
00891
00892 for (connection_map::iterator i = m_connections.begin();
00893 i != m_connections.end();)
00894 {
00895
00896
00897 connection_map::iterator j = i;
00898 ++i;
00899
00900
00901 peer_connection& c = *j->second;
00902 if (c.has_timed_out())
00903 {
00904 if (m_alerts.should_post(alert::debug))
00905 {
00906 m_alerts.post_alert(
00907 peer_error_alert(
00908 c.remote()
00909 , c.pid()
00910 , "connection timed out"));
00911 }
00912 #if defined(TORRENT_VERBOSE_LOGGING)
00913 (*c.m_logger) << "*** CONNECTION TIMED OUT\n";
00914 #endif
00915
00916 c.set_failed();
00917 c.disconnect();
00918 continue;
00919 }
00920
00921 c.keep_alive();
00922 }
00923
00924
00925
00926 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
00927 = m_torrents.begin(); i != m_torrents.end();)
00928 {
00929 torrent& t = *i->second;
00930 assert(!t.is_aborted());
00931 if (t.should_request())
00932 {
00933 tracker_request req = t.generate_tracker_request();
00934 req.listen_port = m_listen_interface.port();
00935 req.key = m_key;
00936 m_tracker_manager.queue_request(m_strand, req, t.tracker_login()
00937 , m_listen_interface.address(), i->second);
00938
00939 if (m_alerts.should_post(alert::info))
00940 {
00941 m_alerts.post_alert(
00942 tracker_announce_alert(
00943 t.get_handle(), "tracker announce"));
00944 }
00945 }
00946
00947
00948 t.second_tick(m_stat, tick_interval);
00949 ++i;
00950 }
00951
00952 m_stat.second_tick(tick_interval);
00953
00954
00955
00956 assert(m_max_uploads >= -1);
00957 assert(m_max_connections >= -1);
00958
00959 allocate_resources(m_max_uploads == -1
00960 ? std::numeric_limits<int>::max()
00961 : m_max_uploads
00962 , m_torrents
00963 , &torrent::m_uploads_quota);
00964
00965 allocate_resources(m_max_connections == -1
00966 ? std::numeric_limits<int>::max()
00967 : m_max_connections
00968 , m_torrents
00969 , &torrent::m_connections_quota);
00970
00971 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
00972 = m_torrents.begin(); i != m_torrents.end(); ++i)
00973 {
00974 #ifndef NDEBUG
00975 i->second->check_invariant();
00976 #endif
00977 i->second->distribute_resources(tick_interval);
00978 }
00979 }
00980 catch (std::exception& exc)
00981 {
00982 #ifndef NDEBUG
00983 std::string err = exc.what();
00984 #endif
00985 };
00986
00987 void session_impl::connection_completed(
00988 boost::intrusive_ptr<peer_connection> const& p)
00989 #ifndef NDEBUG
00990 try
00991 #endif
00992 {
00993 mutex_t::scoped_lock l(m_mutex);
00994
00995 connection_map::iterator i = m_half_open.find(p->get_socket());
00996 m_connections.insert(std::make_pair(p->get_socket(), p));
00997 assert(i != m_half_open.end());
00998 if (i != m_half_open.end()) m_half_open.erase(i);
00999
01000 if (m_abort) return;
01001
01002 process_connection_queue();
01003 }
01004 #ifndef NDEBUG
01005 catch (std::exception& e)
01006 {
01007 assert(false);
01008 };
01009 #endif
01010
01011 void session_impl::operator()()
01012 {
01013 eh_initializer();
01014
01015 if (m_listen_port_range.first != 0 && m_listen_port_range.second != 0)
01016 {
01017 session_impl::mutex_t::scoped_lock l(m_mutex);
01018 open_listen_port();
01019 }
01020
01021 boost::posix_time::ptime timer = second_clock::universal_time();
01022
01023 do
01024 {
01025 try
01026 {
01027 m_io_service.run();
01028 assert(m_abort == true);
01029 }
01030 catch (std::exception& e)
01031 {
01032 #ifndef NDEBUG
01033 std::cerr << e.what() << "\n";
01034 std::string err = e.what();
01035 #endif
01036 assert(false);
01037 }
01038 }
01039 while (!m_abort);
01040
01041 deadline_timer tracker_timer(m_io_service);
01042
01043 session_impl::mutex_t::scoped_lock l(m_mutex);
01044
01045 m_tracker_manager.abort_all_requests();
01046 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i =
01047 m_torrents.begin(); i != m_torrents.end(); ++i)
01048 {
01049 i->second->abort();
01050
01051
01052
01053
01054 if ((!i->second->is_paused() || i->second->should_request())
01055 && !i->second->trackers().empty())
01056 {
01057 tracker_request req = i->second->generate_tracker_request();
01058 req.listen_port = m_listen_interface.port();
01059 req.key = m_key;
01060 std::string login = i->second->tracker_login();
01061 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01062 boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
01063 m_tracker_loggers.push_back(tl);
01064 m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address(), tl);
01065 #else
01066 m_tracker_manager.queue_request(m_strand, req, login, m_listen_interface.address());
01067 #endif
01068 }
01069 }
01070
01071 ptime start(microsec_clock::universal_time());
01072 l.unlock();
01073
01074 while (microsec_clock::universal_time() - start < seconds(
01075 m_settings.stop_tracker_timeout)
01076 && !m_tracker_manager.empty())
01077 {
01078 tracker_timer.expires_from_now(boost::posix_time::milliseconds(100));
01079 tracker_timer.async_wait(m_strand.wrap(
01080 bind(&io_service::stop, &m_io_service)));
01081
01082 m_io_service.reset();
01083 m_io_service.run();
01084 }
01085
01086 l.lock();
01087 assert(m_abort);
01088 m_abort = true;
01089
01090 m_connection_queue.clear();
01091
01092 while (!m_half_open.empty())
01093 m_half_open.begin()->second->disconnect();
01094
01095 while (!m_connections.empty())
01096 m_connections.begin()->second->disconnect();
01097
01098 #ifndef NDEBUG
01099 for (torrent_map::iterator i = m_torrents.begin();
01100 i != m_torrents.end(); ++i)
01101 {
01102 assert(i->second->num_peers() == 0);
01103 }
01104 #endif
01105
01106 m_torrents.clear();
01107
01108 assert(m_torrents.empty());
01109 assert(m_connections.empty());
01110 }
01111
01112
01113
01114 boost::weak_ptr<torrent> session_impl::find_torrent(sha1_hash const& info_hash)
01115 {
01116 std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
01117 = m_torrents.find(info_hash);
01118 #ifndef NDEBUG
01119 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator j
01120 = m_torrents.begin(); j != m_torrents.end(); ++j)
01121 {
01122 torrent* p = boost::get_pointer(j->second);
01123 assert(p);
01124 }
01125 #endif
01126 if (i != m_torrents.end()) return i->second;
01127 return boost::weak_ptr<torrent>();
01128 }
01129
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
// Creates a logger for the file name "<name>.log". The instance number
// and the append flag are passed through to the logger unchanged.
boost::shared_ptr<logger> session_impl::create_log(std::string const& name
	, int instance, bool append)
{
	std::string const filename = name + ".log";
	boost::shared_ptr<logger> log(new logger(filename, instance, append));
	return log;
}
#endif
01138
01139 std::vector<torrent_handle> session_impl::get_torrents()
01140 {
01141 mutex_t::scoped_lock l(m_mutex);
01142 mutex::scoped_lock l2(m_checker_impl.m_mutex);
01143 std::vector<torrent_handle> ret;
01144 for (std::deque<boost::shared_ptr<aux::piece_checker_data> >::iterator i
01145 = m_checker_impl.m_torrents.begin()
01146 , end(m_checker_impl.m_torrents.end()); i != end; ++i)
01147 {
01148 if ((*i)->abort) continue;
01149 ret.push_back(torrent_handle(this, &m_checker_impl
01150 , (*i)->info_hash));
01151 }
01152
01153 for (std::deque<boost::shared_ptr<aux::piece_checker_data> >::iterator i
01154 = m_checker_impl.m_processing.begin()
01155 , end(m_checker_impl.m_processing.end()); i != end; ++i)
01156 {
01157 if ((*i)->abort) continue;
01158 ret.push_back(torrent_handle(this, &m_checker_impl
01159 , (*i)->info_hash));
01160 }
01161
01162 for (session_impl::torrent_map::iterator i
01163 = m_torrents.begin(), end(m_torrents.end());
01164 i != end; ++i)
01165 {
01166 if (i->second->is_aborted()) continue;
01167 ret.push_back(torrent_handle(this, &m_checker_impl
01168 , i->first));
01169 }
01170 return ret;
01171 }
01172
01173 torrent_handle session_impl::find_torrent_handle(sha1_hash const& info_hash)
01174 {
01175 return torrent_handle(this, &m_checker_impl, info_hash);
01176 }
01177
01178 torrent_handle session_impl::add_torrent(
01179 torrent_info const& ti
01180 , boost::filesystem::path const& save_path
01181 , entry const& resume_data
01182 , bool compact_mode
01183 , int block_size)
01184 {
01185
01186 #ifndef NDEBUG
01187 for (int i = 0; i < 32; ++i)
01188 {
01189 if (block_size & (1 << i))
01190 {
01191 assert((block_size & ~(1 << i)) == 0);
01192 break;
01193 }
01194 }
01195 #endif
01196
01197 assert(!save_path.empty());
01198
01199 if (ti.begin_files() == ti.end_files())
01200 throw std::runtime_error("no files in torrent");
01201
01202
01203 mutex_t::scoped_lock l(m_mutex);
01204 mutex::scoped_lock l2(m_checker_impl.m_mutex);
01205
01206 if (is_aborted())
01207 throw std::runtime_error("session is closing");
01208
01209
01210 if (!find_torrent(ti.info_hash()).expired())
01211 throw duplicate_torrent();
01212
01213
01214 if (m_checker_impl.find_torrent(ti.info_hash()))
01215 throw duplicate_torrent();
01216
01217
01218
01219
01220 boost::shared_ptr<torrent> torrent_ptr(
01221 new torrent(*this, m_checker_impl, ti, save_path
01222 , m_listen_interface, compact_mode, block_size
01223 , settings()));
01224
01225 #ifndef TORRENT_DISABLE_EXTENSIONS
01226 for (extension_list_t::iterator i = m_extensions.begin()
01227 , end(m_extensions.end()); i != end; ++i)
01228 {
01229 boost::shared_ptr<torrent_plugin> tp((*i)(torrent_ptr.get()));
01230 if (tp) torrent_ptr->add_extension(tp);
01231 }
01232 #endif
01233
01234 boost::shared_ptr<aux::piece_checker_data> d(
01235 new aux::piece_checker_data);
01236 d->torrent_ptr = torrent_ptr;
01237 d->save_path = save_path;
01238 d->info_hash = ti.info_hash();
01239 d->resume_data = resume_data;
01240
01241 #ifndef TORRENT_DISABLE_DHT
01242 if (m_dht)
01243 {
01244 torrent_info::nodes_t const& nodes = ti.nodes();
01245 std::for_each(nodes.begin(), nodes.end(), bind(
01246 (void(dht::dht_tracker::*)(std::pair<std::string, int> const&))
01247 &dht::dht_tracker::add_node
01248 , boost::ref(m_dht), _1));
01249 }
01250 #endif
01251
01252
01253 m_checker_impl.m_torrents.push_back(d);
01254
01255
01256 m_checker_impl.m_cond.notify_one();
01257
01258 return torrent_handle(this, &m_checker_impl, ti.info_hash());
01259 }
01260
01261 torrent_handle session_impl::add_torrent(
01262 char const* tracker_url
01263 , sha1_hash const& info_hash
01264 , char const* name
01265 , boost::filesystem::path const& save_path
01266 , entry const&
01267 , bool compact_mode
01268 , int block_size)
01269 {
01270
01271 #ifndef NDEBUG
01272 for (int i = 0; i < 32; ++i)
01273 {
01274 if (block_size & (1 << i))
01275 {
01276 assert((block_size & ~(1 << i)) == 0);
01277 break;
01278 }
01279 }
01280 #endif
01281
01282
01283 assert(!save_path.empty());
01284 {
01285
01286 mutex::scoped_lock l(m_checker_impl.m_mutex);
01287
01288
01289 if (m_checker_impl.find_torrent(info_hash))
01290 throw duplicate_torrent();
01291 }
01292
01293
01294 session_impl::mutex_t::scoped_lock l(m_mutex);
01295
01296
01297 if (!find_torrent(info_hash).expired())
01298 throw duplicate_torrent();
01299
01300
01301 assert(!is_aborted());
01302
01303
01304
01305
01306 boost::shared_ptr<torrent> torrent_ptr(
01307 new torrent(*this, m_checker_impl, tracker_url, info_hash, name
01308 , save_path, m_listen_interface, compact_mode, block_size
01309 , settings()));
01310
01311 #ifndef TORRENT_DISABLE_EXTENSIONS
01312 for (extension_list_t::iterator i = m_extensions.begin()
01313 , end(m_extensions.end()); i != end; ++i)
01314 {
01315 boost::shared_ptr<torrent_plugin> tp((*i)(torrent_ptr.get()));
01316 if (tp) torrent_ptr->add_extension(tp);
01317 }
01318 #endif
01319
01320 m_torrents.insert(
01321 std::make_pair(info_hash, torrent_ptr)).first;
01322
01323 return torrent_handle(this, &m_checker_impl, info_hash);
01324 }
01325
01326 void session_impl::remove_torrent(const torrent_handle& h)
01327 {
01328 if (h.m_ses != this) return;
01329 assert(h.m_chk == &m_checker_impl || h.m_chk == 0);
01330 assert(h.m_ses != 0);
01331
01332 mutex_t::scoped_lock l(m_mutex);
01333 session_impl::torrent_map::iterator i =
01334 m_torrents.find(h.m_info_hash);
01335 if (i != m_torrents.end())
01336 {
01337 torrent& t = *i->second;
01338 t.abort();
01339
01340 if ((!t.is_paused() || t.should_request())
01341 && !t.torrent_file().trackers().empty())
01342 {
01343 tracker_request req = t.generate_tracker_request();
01344 assert(req.event == tracker_request::stopped);
01345 req.listen_port = m_listen_interface.port();
01346 req.key = m_key;
01347
01348 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01349 boost::shared_ptr<tracker_logger> tl(new tracker_logger(*this));
01350 m_tracker_loggers.push_back(tl);
01351 m_tracker_manager.queue_request(m_strand, req
01352 , t.tracker_login(), m_listen_interface.address(), tl);
01353 #else
01354 m_tracker_manager.queue_request(m_strand, req
01355 , t.tracker_login(), m_listen_interface.address());
01356 #endif
01357
01358 if (m_alerts.should_post(alert::info))
01359 {
01360 m_alerts.post_alert(
01361 tracker_announce_alert(
01362 t.get_handle(), "tracker announce, event=stopped"));
01363 }
01364 }
01365 #ifndef NDEBUG
01366 sha1_hash i_hash = t.torrent_file().info_hash();
01367 #endif
01368 m_torrents.erase(i);
01369 assert(m_torrents.find(i_hash) == m_torrents.end());
01370 return;
01371 }
01372 l.unlock();
01373
01374 if (h.m_chk)
01375 {
01376 mutex::scoped_lock l(m_checker_impl.m_mutex);
01377
01378 aux::piece_checker_data* d = m_checker_impl.find_torrent(h.m_info_hash);
01379 if (d != 0)
01380 {
01381 if (d->processing) d->abort = true;
01382 else m_checker_impl.remove_torrent(h.m_info_hash);
01383 return;
01384 }
01385 }
01386 }
01387
01388 bool session_impl::listen_on(
01389 std::pair<int, int> const& port_range
01390 , const char* net_interface)
01391 {
01392 session_impl::mutex_t::scoped_lock l(m_mutex);
01393
01394 tcp::endpoint new_interface;
01395 if (net_interface && std::strlen(net_interface) > 0)
01396 new_interface = tcp::endpoint(address::from_string(net_interface), port_range.first);
01397 else
01398 new_interface = tcp::endpoint(address(), port_range.first);
01399
01400 m_listen_port_range = port_range;
01401
01402
01403
01404 if (new_interface == m_listen_interface
01405 && m_listen_socket) return true;
01406
01407 if (m_listen_socket)
01408 m_listen_socket.reset();
01409
01410 #ifndef TORRENT_DISABLE_DHT
01411 if (m_listen_interface.address() != new_interface.address()
01412 && m_dht)
01413 {
01414
01415 m_dht->rebind(new_interface.address()
01416 , m_dht_settings.service_port);
01417 }
01418 #endif
01419
01420 m_incoming_connection = false;
01421 m_listen_interface = new_interface;
01422
01423 open_listen_port();
01424
01425 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
01426 m_logger = create_log("main_session", listen_port(), false);
01427 using boost::posix_time::second_clock;
01428 using boost::posix_time::to_simple_string;
01429 (*m_logger) << to_simple_string(second_clock::universal_time()) << "\n";
01430 #endif
01431
01432 return m_listen_socket;
01433 }
01434
01435 unsigned short session_impl::listen_port() const
01436 {
01437 mutex_t::scoped_lock l(m_mutex);
01438 return m_listen_interface.port();
01439 }
01440
01441 session_status session_impl::status() const
01442 {
01443 mutex_t::scoped_lock l(m_mutex);
01444 session_status s;
01445 s.has_incoming_connections = m_incoming_connection;
01446 s.num_peers = (int)m_connections.size();
01447
01448 s.download_rate = m_stat.download_rate();
01449 s.upload_rate = m_stat.upload_rate();
01450
01451 s.payload_download_rate = m_stat.download_payload_rate();
01452 s.payload_upload_rate = m_stat.upload_payload_rate();
01453
01454 s.total_download = m_stat.total_protocol_download()
01455 + m_stat.total_payload_download();
01456
01457 s.total_upload = m_stat.total_protocol_upload()
01458 + m_stat.total_payload_upload();
01459
01460 s.total_payload_download = m_stat.total_payload_download();
01461 s.total_payload_upload = m_stat.total_payload_upload();
01462
01463 #ifndef TORRENT_DISABLE_DHT
01464 if (m_dht)
01465 {
01466 m_dht->dht_status(s);
01467 }
01468 else
01469 {
01470 s.dht_nodes = 0;
01471 s.dht_node_cache = 0;
01472 s.dht_torrents = 0;
01473 }
01474 #endif
01475
01476 return s;
01477 }
01478
01479 #ifndef TORRENT_DISABLE_DHT
01480
01481 void session_impl::start_dht(entry const& startup_state)
01482 {
01483 mutex_t::scoped_lock l(m_mutex);
01484 if (m_dht)
01485 {
01486 m_dht->stop();
01487 m_dht = 0;
01488 }
01489 m_dht = new dht::dht_tracker(m_io_service
01490 , m_dht_settings, m_listen_interface.address()
01491 , startup_state);
01492 }
01493
01494 void session_impl::stop_dht()
01495 {
01496 mutex_t::scoped_lock l(m_mutex);
01497 if (!m_dht) return;
01498 m_dht->stop();
01499 m_dht = 0;
01500 }
01501
01502 void session_impl::set_dht_settings(dht_settings const& settings)
01503 {
01504 mutex_t::scoped_lock l(m_mutex);
01505 if (settings.service_port != m_dht_settings.service_port
01506 && m_dht)
01507 {
01508 m_dht->rebind(m_listen_interface.address()
01509 , settings.service_port);
01510 }
01511 m_dht_settings = settings;
01512 }
01513
01514 entry session_impl::dht_state() const
01515 {
01516 assert(m_dht);
01517 mutex_t::scoped_lock l(m_mutex);
01518 return m_dht->state();
01519 }
01520
01521 void session_impl::add_dht_node(std::pair<std::string, int> const& node)
01522 {
01523 assert(m_dht);
01524 mutex_t::scoped_lock l(m_mutex);
01525 m_dht->add_node(node);
01526 }
01527
01528 void session_impl::add_dht_router(std::pair<std::string, int> const& node)
01529 {
01530 assert(m_dht);
01531 mutex_t::scoped_lock l(m_mutex);
01532 m_dht->add_router_node(node);
01533 }
01534
01535 #endif
01536
01537 void session_impl::set_download_rate_limit(int bytes_per_second)
01538 {
01539 assert(bytes_per_second > 0 || bytes_per_second == -1);
01540 mutex_t::scoped_lock l(m_mutex);
01541 if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf;
01542 m_dl_bandwidth_manager.throttle(bytes_per_second);
01543 }
01544
01545 bool session_impl::is_listening() const
01546 {
01547 mutex_t::scoped_lock l(m_mutex);
01548 return m_listen_socket;
01549 }
01550
01551 session_impl::~session_impl()
01552 {
01553 #ifndef TORRENT_DISABLE_DHT
01554 stop_dht();
01555 #endif
01556
01557 mutex_t::scoped_lock l(m_mutex);
01558 m_abort = true;
01559 m_io_service.stop();
01560 l.unlock();
01561
01562 m_thread->join();
01563
01564
01565
01566
01567
01568
01569
01570
01571 {
01572 mutex::scoped_lock l(m_checker_impl.m_mutex);
01573
01574 m_checker_impl.m_abort = true;
01575
01576
01577 if (!m_checker_impl.m_torrents.empty())
01578 {
01579 m_checker_impl.m_torrents.front()->abort = true;
01580 }
01581 m_checker_impl.m_cond.notify_one();
01582 }
01583
01584 m_checker_thread->join();
01585
01586 assert(m_torrents.empty());
01587 assert(m_connections.empty());
01588 }
01589
01590 void session_impl::set_max_uploads(int limit)
01591 {
01592 assert(limit > 0 || limit == -1);
01593 mutex_t::scoped_lock l(m_mutex);
01594 m_max_uploads = limit;
01595 }
01596
01597 void session_impl::set_max_connections(int limit)
01598 {
01599 assert(limit > 0 || limit == -1);
01600 mutex_t::scoped_lock l(m_mutex);
01601 m_max_connections = limit;
01602 }
01603
01604 void session_impl::set_max_half_open_connections(int limit)
01605 {
01606 assert(limit > 0 || limit == -1);
01607 mutex_t::scoped_lock l(m_mutex);
01608
01609 m_half_open_limit = limit;
01610 }
01611
01612 void session_impl::set_upload_rate_limit(int bytes_per_second)
01613 {
01614 assert(bytes_per_second > 0 || bytes_per_second == -1);
01615 mutex_t::scoped_lock l(m_mutex);
01616 if (bytes_per_second == -1) bytes_per_second = bandwidth_limit::inf;
01617 m_ul_bandwidth_manager.throttle(bytes_per_second);
01618 }
01619
01620 int session_impl::num_uploads() const
01621 {
01622 int uploads = 0;
01623 mutex_t::scoped_lock l(m_mutex);
01624 for (torrent_map::const_iterator i = m_torrents.begin()
01625 , end(m_torrents.end()); i != end; i++)
01626 {
01627 uploads += i->second->get_policy().num_uploads();
01628 }
01629 return uploads;
01630 }
01631
01632 int session_impl::num_connections() const
01633 {
01634 mutex_t::scoped_lock l(m_mutex);
01635 return m_connections.size() + m_half_open.size();
01636 }
01637
01638 std::auto_ptr<alert> session_impl::pop_alert()
01639 {
01640 mutex_t::scoped_lock l(m_mutex);
01641 if (m_alerts.pending())
01642 return m_alerts.get();
01643 return std::auto_ptr<alert>(0);
01644 }
01645
01646 void session_impl::set_severity_level(alert::severity_t s)
01647 {
01648 mutex_t::scoped_lock l(m_mutex);
01649 m_alerts.set_severity(s);
01650 }
01651
01652 int session_impl::upload_rate_limit() const
01653 {
01654 mutex_t::scoped_lock l(m_mutex);
01655 return m_ul_bandwidth_manager.throttle();
01656 }
01657
01658 int session_impl::download_rate_limit() const
01659 {
01660 mutex_t::scoped_lock l(m_mutex);
01661 return m_dl_bandwidth_manager.throttle();
01662 }
01663
01664 #ifndef NDEBUG
01665 void session_impl::check_invariant(const char *place)
01666 {
01667 assert(place);
01668
01669 for (connection_map::iterator i = m_half_open.begin();
01670 i != m_half_open.end(); ++i)
01671 {
01672 assert(i->second->is_connecting());
01673 }
01674
01675 for (connection_map::iterator i = m_connections.begin();
01676 i != m_connections.end(); ++i)
01677 {
01678 assert(i->second);
01679 assert(!i->second->is_connecting());
01680 if (i->second->is_connecting())
01681 {
01682 std::ofstream error_log("error.log", std::ios_base::app);
01683 boost::intrusive_ptr<peer_connection> p = i->second;
01684 error_log << "peer_connection::is_connecting() " << p->is_connecting() << "\n";
01685 error_log << "peer_connection::can_write() " << p->can_write() << "\n";
01686 error_log << "peer_connection::can_read() " << p->can_read() << "\n";
01687 error_log << "peer_connection::get_peer_id " << p->pid() << "\n";
01688 error_log << "place: " << place << "\n";
01689 error_log.flush();
01690 assert(false);
01691 }
01692
01693 boost::shared_ptr<torrent> t = i->second->associated_torrent().lock();
01694
01695 if (t)
01696 {
01697 assert(t->get_policy().has_connection(boost::get_pointer(i->second)));
01698 }
01699 }
01700 }
01701 #endif
01702
01703 void piece_checker_data::parse_resume_data(
01704 const entry& resume_data
01705 , const torrent_info& info
01706 , std::string& error)
01707 {
01708
01709 if (resume_data.type() == entry::undefined_t) return;
01710
01711 entry rd = resume_data;
01712
01713 try
01714 {
01715 if (rd["file-format"].string() != "libtorrent resume file")
01716 {
01717 error = "missing file format tag";
01718 return;
01719 }
01720
01721 if (rd["file-version"].integer() > 1)
01722 {
01723 error = "incompatible file version "
01724 + boost::lexical_cast<std::string>(rd["file-version"].integer());
01725 return;
01726 }
01727
01728
01729 sha1_hash hash = rd["info-hash"].string();
01730 if (hash != info.info_hash())
01731 {
01732 error = "mismatching info-hash: " + boost::lexical_cast<std::string>(hash);
01733 return;
01734 }
01735
01736
01737
01738 if (rd.find_key("peers"))
01739 {
01740 entry::list_type& peer_list = rd["peers"].list();
01741
01742 std::vector<tcp::endpoint> tmp_peers;
01743 tmp_peers.reserve(peer_list.size());
01744 for (entry::list_type::iterator i = peer_list.begin();
01745 i != peer_list.end(); ++i)
01746 {
01747 tcp::endpoint a(
01748 address::from_string((*i)["ip"].string())
01749 , (unsigned short)(*i)["port"].integer());
01750 tmp_peers.push_back(a);
01751 }
01752
01753 peers.swap(tmp_peers);
01754 }
01755
01756
01757 const entry::list_type& slots = rd["slots"].list();
01758 if ((int)slots.size() > info.num_pieces())
01759 {
01760 error = "file has more slots than torrent (slots: "
01761 + boost::lexical_cast<std::string>(slots.size()) + " size: "
01762 + boost::lexical_cast<std::string>(info.num_pieces()) + " )";
01763 return;
01764 }
01765
01766 std::vector<int> tmp_pieces;
01767 tmp_pieces.reserve(slots.size());
01768 for (entry::list_type::const_iterator i = slots.begin();
01769 i != slots.end(); ++i)
01770 {
01771 int index = (int)i->integer();
01772 if (index >= info.num_pieces() || index < -2)
01773 {
01774 error = "too high index number in slot map (index: "
01775 + boost::lexical_cast<std::string>(index) + " size: "
01776 + boost::lexical_cast<std::string>(info.num_pieces()) + ")";
01777 return;
01778 }
01779 tmp_pieces.push_back(index);
01780 }
01781
01782
01783
01784
01785 std::vector<piece_picker::downloading_piece> tmp_unfinished;
01786 int num_blocks_per_piece = (int)rd["blocks per piece"].integer();
01787 if (num_blocks_per_piece == info.piece_length() / torrent_ptr->block_size())
01788 {
01789
01790
01791 entry::list_type& unfinished = rd["unfinished"].list();
01792
01793 tmp_unfinished.reserve(unfinished.size());
01794 for (entry::list_type::iterator i = unfinished.begin();
01795 i != unfinished.end(); ++i)
01796 {
01797 piece_picker::downloading_piece p;
01798
01799 p.index = (int)(*i)["piece"].integer();
01800 if (p.index < 0 || p.index >= info.num_pieces())
01801 {
01802 error = "invalid piece index in unfinished piece list (index: "
01803 + boost::lexical_cast<std::string>(p.index) + " size: "
01804 + boost::lexical_cast<std::string>(info.num_pieces()) + ")";
01805 return;
01806 }
01807
01808 const std::string& bitmask = (*i)["bitmask"].string();
01809
01810 const int num_bitmask_bytes = std::max(num_blocks_per_piece / 8, 1);
01811 if ((int)bitmask.size() != num_bitmask_bytes)
01812 {
01813 error = "invalid size of bitmask (" + boost::lexical_cast<std::string>(bitmask.size()) + ")";
01814 return;
01815 }
01816 for (int j = 0; j < num_bitmask_bytes; ++j)
01817 {
01818 unsigned char bits = bitmask[j];
01819 for (int k = 0; k < 8; ++k)
01820 {
01821 const int bit = j * 8 + k;
01822 if (bits & (1 << k))
01823 p.finished_blocks[bit] = true;
01824 }
01825 }
01826
01827 if (p.finished_blocks.count() == 0) continue;
01828
01829 std::vector<int>::iterator slot_iter
01830 = std::find(tmp_pieces.begin(), tmp_pieces.end(), p.index);
01831 if (slot_iter == tmp_pieces.end())
01832 {
01833
01834
01835 error = "piece " + boost::lexical_cast<std::string>(p.index) + " is "
01836 "marked as unfinished, but doesn't have any storage";
01837 return;
01838 }
01839
01840 assert(*slot_iter == p.index);
01841 int slot_index = static_cast<int>(slot_iter - tmp_pieces.begin());
01842 unsigned long adler
01843 = torrent_ptr->filesystem().piece_crc(
01844 slot_index
01845 , torrent_ptr->block_size()
01846 , p.finished_blocks);
01847
01848 const entry& ad = (*i)["adler32"];
01849
01850
01851 if (ad.integer() != entry::integer_type(adler))
01852 {
01853 error = "checksum mismatch on piece " + boost::lexical_cast<std::string>(p.index);
01854 return;
01855 }
01856
01857 tmp_unfinished.push_back(p);
01858 }
01859 }
01860
01861
01862
01863 std::vector<std::pair<size_type, std::time_t> > file_sizes;
01864 entry::list_type& l = rd["file sizes"].list();
01865
01866 for (entry::list_type::iterator i = l.begin();
01867 i != l.end(); ++i)
01868 {
01869 file_sizes.push_back(std::pair<size_type, std::time_t>(
01870 i->list().front().integer()
01871 , i->list().back().integer()));
01872 }
01873
01874 if ((int)tmp_pieces.size() == info.num_pieces()
01875 && std::find_if(tmp_pieces.begin(), tmp_pieces.end()
01876 , boost::bind<bool>(std::less<int>(), _1, 0)) == tmp_pieces.end())
01877 {
01878 if (info.num_files() != (int)file_sizes.size())
01879 {
01880 error = "the number of files does not match the torrent (num: "
01881 + boost::lexical_cast<std::string>(file_sizes.size()) + " actual: "
01882 + boost::lexical_cast<std::string>(info.num_files()) + ")";
01883 return;
01884 }
01885
01886 std::vector<std::pair<size_type, std::time_t> >::iterator
01887 fs = file_sizes.begin();
01888
01889
01890 for (torrent_info::file_iterator i = info.begin_files()
01891 , end(info.end_files()); i != end; ++i, ++fs)
01892 {
01893 if (i->size != fs->first)
01894 {
01895 error = "file size for '" + i->path.native_file_string() + "' was expected to be "
01896 + boost::lexical_cast<std::string>(i->size) + " bytes";
01897 return;
01898 }
01899 }
01900 }
01901
01902 if (!match_filesizes(info, save_path, file_sizes, &error))
01903 return;
01904
01905 piece_map.swap(tmp_pieces);
01906 unfinished_pieces.swap(tmp_unfinished);
01907 }
01908 catch (invalid_encoding)
01909 {
01910 return;
01911 }
01912 catch (type_error)
01913 {
01914 return;
01915 }
01916 catch (file_error)
01917 {
01918 return;
01919 }
01920 }
01921 }}
01922