torrent 是怎么样交换信息的?
torrent 本身像爬虫:
- 自身建立端口, udp datagram
- 自身找到DHT中心,寻找其他节点
- 发送查询请求给到其他节点,让其他节点介绍新节点
- 响应其他节点的四种操作
- 如果其他节点需要查询 infohash,则对 infohash 记录并给它介绍其他节点。(说明它要的infohash是当前热门的)
去中心服务器? 通过infohash下载torrent文件。 得到torrent文件可以进行解析,获得文件名,大小,类型,文件列表。 详情可以看 libtorrent cpp github 源码
版本 RC2.0, 从 test/bt-get3 得知 下载bt的时候会建立 session,
在 session 去解析 magnet 去得到 torrent
torrent_handle session_handle::add_torrent(add_torrent_params&& params)
{
TORRENT_ASSERT_PRECOND(!params.save_path.empty());
#if TORRENT_ABI_VERSION < 3
if (!params.info_hashes.has_v1() && !params.info_hashes.has_v2() && !params.ti)
params.info_hashes.v1 = params.info_hash;
#endif
// the internal torrent object keeps and mutates state in the
// torrent_info object. We can't let that leak back to the client
if (params.ti)
params.ti = std::make_shared<torrent_info>(*params.ti);
#if TORRENT_ABI_VERSION == 1
handle_backwards_compatible_resume_data(params);
#endif
error_code ec;
auto ecr = std::ref(ec);
torrent_handle r = sync_call_ret<torrent_handle>(&session_impl::add_torrent, std::move(params), ecr);
if (ec) aux::throw_ex<system_error>(ec);
return r;
}
void session_handle::async_add_torrent(add_torrent_params&& params)
{
TORRENT_ASSERT_PRECOND(!params.save_path.empty());
#if TORRENT_ABI_VERSION < 3
if (!params.info_hashes.has_v1() && !params.info_hashes.has_v2() && !params.ti)
params.info_hashes.v1 = params.info_hash;
#endif
// the internal torrent object keeps and mutates state in the
// torrent_info object. We can't let that leak back to the client
if (params.ti)
params.ti = std::make_shared<torrent_info>(*params.ti);
// we cannot capture a unique_ptr into a lambda in c++11, so we use a raw
// pointer for now. async_call uses a lambda expression to post the call
// to the main thread
// TODO: in C++14, use unique_ptr and move it into the lambda
auto* p = new add_torrent_params(std::move(params));
auto guard = aux::scope_end([p]{ delete p; });
p->save_path = complete(p->save_path);
#if TORRENT_ABI_VERSION == 1
handle_backwards_compatible_resume_data(*p);
#endif
async_call(&session_impl::async_add_torrent, p);
guard.disarm();
}
// 关键 add_torrent
void session_impl::async_add_torrent(add_torrent_params* params)
{
std::unique_ptr<add_torrent_params> holder(params);
error_code ec;
add_torrent(std::move(*params), ec);
}
// 关键 add_torrent_impl
torrent_handle session_impl::add_torrent(add_torrent_params&& params
, error_code& ec)
{
std::shared_ptr<torrent> torrent_ptr;
// in case there's an error, make sure to abort the torrent before leaving
// the scope
auto abort_torrent = aux::scope_end([&]{ if (torrent_ptr) torrent_ptr->abort(); });
#ifndef TORRENT_DISABLE_EXTENSIONS
auto extensions = std::move(params.extensions);
auto const userdata = std::move(params.userdata);
#endif
// copy the most important fields from params to pass back in the
// add_torrent_alert
add_torrent_params alert_params;
alert_params.flags = params.flags;
alert_params.ti = params.ti;
alert_params.name = params.name;
alert_params.save_path = params.save_path;
alert_params.userdata = params.userdata;
alert_params.trackerid = params.trackerid;
auto const flags = params.flags;
info_hash_t info_hash;
bool added;
std::tie(torrent_ptr, info_hash, added) = add_torrent_impl(std::move(params), ec);
alert_params.info_hashes = info_hash;
torrent_handle handle(torrent_ptr);
if (!torrent_ptr)
{
m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);
return handle;
}
TORRENT_ASSERT(info_hash.has_v1() || info_hash.has_v2());
#if TORRENT_ABI_VERSION == 1
if (m_alerts.should_post<torrent_added_alert>())
m_alerts.emplace_alert<torrent_added_alert>(handle);
#endif
// if this was an existing torrent, we can't start it again, or add
// another set of plugins etc. we're done
if (!added)
{
abort_torrent.disarm();
m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);
return handle;
}
torrent_ptr->set_ip_filter(m_ip_filter);
torrent_ptr->start();
#ifndef TORRENT_DISABLE_EXTENSIONS
for (auto& ext : extensions)
{
std::shared_ptr<torrent_plugin> tp(ext(handle, userdata));
if (tp) torrent_ptr->add_extension(std::move(tp));
}
add_extensions_to_torrent(torrent_ptr, userdata);
#endif
TORRENT_ASSERT(info_hash == torrent_ptr->torrent_file().info_hashes());
insert_torrent(info_hash, torrent_ptr);
m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);
// once we successfully add the torrent, we can disarm the abort action
abort_torrent.disarm();
// recalculate auto-managed torrents sooner (or put it off)
// if another torrent will be added within one second from now
// we want to put it off again anyway. So that while we're adding
// a boat load of torrents, we postpone the recalculation until
// we're done adding them all (since it's kind of an expensive operation)
if (flags & torrent_flags::auto_managed)
{
const int max_downloading = settings().get_int(settings_pack::active_downloads);
const int max_seeds = settings().get_int(settings_pack::active_seeds);
const int max_active = settings().get_int(settings_pack::active_limit);
const int num_downloading
= int(torrent_list(session_interface::torrent_downloading_auto_managed).size());
const int num_seeds
= int(torrent_list(session_interface::torrent_seeding_auto_managed).size());
const int num_active = num_downloading + num_seeds;
// there's no point in triggering the auto manage logic early if we
// don't have a reason to believe anything will change. It's kind of
// expensive.
if ((num_downloading < max_downloading
|| num_seeds < max_seeds)
&& num_active < max_active)
{
trigger_auto_manage();
}
}
return handle;
}
std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>
session_impl::add_torrent_impl(add_torrent_params&& params, error_code& ec)
{
TORRENT_ASSERT(!params.save_path.empty());
using ptr_t = std::shared_ptr<torrent>;
using ret_t = std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>;
#if TORRENT_ABI_VERSION == 1
if (string_begins_no_case("magnet:", params.url.c_str()))
{
parse_magnet_uri(params.url, params, ec);
if (ec) return ret_t{ptr_t(), params.info_hashes, false};
params.url.clear();
}
#endif
if (params.ti && !params.ti->is_valid())
{
ec = errors::no_metadata;
return ret_t{ptr_t(), params.info_hashes, false};
}
if (params.ti && params.ti->is_valid() && params.ti->num_files() == 0)
{
ec = errors::no_files_in_torrent;
return ret_t{ptr_t(), params.info_hashes, false};
}
if (params.ti
&& ((params.info_hashes.has_v1() && params.info_hashes.v1 != params.ti->info_hashes().v1)
|| (params.info_hashes.has_v2() && params.info_hashes.v2 != params.ti->info_hashes().v2)
))
{
ec = errors::mismatching_info_hash;
return ret_t{ptr_t(), params.info_hashes, false};
}
#ifndef TORRENT_DISABLE_DHT
// add params.dht_nodes to the DHT, if enabled
for (auto const& n : params.dht_nodes)
add_dht_node_name(n);
if (params.ti)
{
for (auto const& n : params.ti->nodes())
add_dht_node_name(n);
}
#endif
INVARIANT_CHECK;
if (is_aborted())
{
ec = errors::session_is_closing;
return ret_t{ptr_t(), params.info_hashes, false};
}
// figure out the info hash of the torrent and make sure
// params.info_hashes is set correctly
if (params.ti)
{
params.info_hashes = params.ti->info_hashes();
#if TORRENT_ABI_VERSION < 3
params.info_hash = params.info_hashes.get_best();
#endif
}
if (!params.info_hashes.has_v1() && !params.info_hashes.has_v2())
{
ec = errors::missing_info_hash_in_uri;
return ret_t{ptr_t(), params.info_hashes, false};
}
// is the torrent already active?
std::shared_ptr<torrent> torrent_ptr = find_torrent(params.info_hashes).lock();
if (torrent_ptr)
{
if (!(params.flags & torrent_flags::duplicate_is_error))
return ret_t{std::move(torrent_ptr), params.info_hashes, false};
ec = errors::duplicate_torrent;
return ret_t{ptr_t(), params.info_hashes, false};
}
// make sure we have enough memory in the torrent lists up-front,
// since when torrents changes states, we cannot allocate memory that
// might fail.
size_t const num_torrents = m_torrents.size();
for (auto& l : m_torrent_lists)
{
l.reserve(num_torrents + 1);
}
try
{
torrent_ptr = std::make_shared<torrent>(*this, m_paused, std::move(params));
torrent_ptr->set_queue_position(m_download_queue.end_index());
}
catch (system_error const& e)
{
ec = e.code();
return ret_t{ptr_t(), params.info_hashes, false};
}
// it's fine to copy this moved-from info_hash_t object, since its move
// construction is just a copy.
return ret_t{std::move(torrent_ptr), params.info_hashes, true};
}
从上面代码可以看出, 始终在尝试建立 torrent_ptr, 最终会得到 torrent。把 magnet 里面 url 解析以后给到 torrent 类实例化。 一万多行的代码,有点难看。
// we need to start announcing since we don't have any
// metadata. To receive peers to ask for it.
set_state(torrent_status::downloading_metadata);
void torrent::start_announcing()
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(state() != torrent_status::checking_files);
if (is_paused())
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("start_announcing(), paused");
#endif
return;
}
// if we don't have metadata, we need to announce
// before checking files, to get peers to
// request the metadata from
if (!m_files_checked && valid_metadata())
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("start_announcing(), files not checked (with valid metadata)");
#endif
return;
}
if (m_announcing) return;
m_announcing = true;
#ifndef TORRENT_DISABLE_DHT
if ((!m_peer_list || m_peer_list->num_peers() < 50) && m_ses.dht())
{
// we don't have any peers, prioritize
// announcing this torrent with the DHT
m_ses.prioritize_dht(shared_from_this());
}
#endif
if (!m_trackers.empty())
{
// tell the tracker that we're back
for (auto& t : m_trackers) t.reset();
}
// reset the stats, since from the tracker's
// point of view, this is a new session
m_total_failed_bytes = 0;
m_total_redundant_bytes = 0;
m_stat.clear();
update_want_tick();
announce_with_tracker();
lsd_announce();
}
前面都是异常处理, 重要的是
- update_want_tick
- announce_with_tracker
- lsd_announce
update_want_tick 更新列表用的
void torrent::update_want_tick()
{
update_list(aux::session_interface::torrent_want_tick, want_tick());
}
announce_with_tracker 给 tracker 们发信息
m_ses.queue_tracker_request(req, tl);
void announce_with_tracker(event_t = event_t::none);
void torrent::announce_with_tracker(event_t e)
{
TORRENT_ASSERT(is_single_thread());
TORRENT_ASSERT(e == event_t::stopped || state() != torrent_status::checking_files);
INVARIANT_CHECK;
if (m_trackers.empty())
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("*** announce: no trackers");
#endif
return;
}
if (m_abort) e = event_t::stopped;
// having stop_tracker_timeout <= 0 means that there is
// no need to send any request to trackers or trigger any
// related logic when the event is stopped
if (e == event_t::stopped
&& settings().get_int(settings_pack::stop_tracker_timeout) <= 0)
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("*** announce: event == stopped && stop_tracker_timeout <= 0");
#endif
return;
}
// if we're not announcing to trackers, only allow
// stopping
if (e != event_t::stopped && !m_announce_to_trackers)
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("*** announce: event != stopped && !m_announce_to_trackers");
#endif
return;
}
// if we're not allowing peers, there's no point in announcing
if (e != event_t::stopped && m_paused)
{
#ifndef TORRENT_DISABLE_LOGGING
debug_log("*** announce: event != stopped && m_paused");
#endif
return;
}
TORRENT_ASSERT(!m_paused || e == event_t::stopped);
if (e == event_t::none && is_finished() && !is_seed())
e = event_t::paused;
tracker_request req;
if (settings().get_bool(settings_pack::apply_ip_filter_to_trackers)
&& m_apply_ip_filter)
{
req.filter = m_ip_filter;
}
req.private_torrent = m_torrent_file->priv();
req.pid = m_peer_id;
req.downloaded = m_stat.total_payload_download() - m_total_failed_bytes;
req.uploaded = m_stat.total_payload_upload();
req.corrupt = m_total_failed_bytes;
req.left = value_or(bytes_left(), 16*1024);
#ifdef TORRENT_SSL_PEERS
// if this torrent contains an SSL certificate, make sure
// any SSL tracker presents a certificate signed by it
req.ssl_ctx = m_ssl_ctx.get();
#endif
req.redundant = m_total_redundant_bytes;
// exclude redundant bytes if we should
if (!settings().get_bool(settings_pack::report_true_downloaded))
{
req.downloaded -= m_total_redundant_bytes;
// if the torrent is complete we know that all incoming pieces will be
// marked redundant so add them to the redundant count
// this is mainly needed to cover the case where a torrent has just completed
// but still has partially downloaded pieces
// if the incoming pieces are not accounted for it could cause the downloaded
// amount to exceed the total size of the torrent which upsets some trackers
if (is_seed())
{
for (auto c : m_connections)
{
TORRENT_INCREMENT(m_iterating_connections);
auto const pbp = c->downloading_piece_progress();
if (pbp.bytes_downloaded > 0)
{
req.downloaded -= pbp.bytes_downloaded;
req.redundant += pbp.bytes_downloaded;
}
}
}
}
if (req.downloaded < 0) req.downloaded = 0;
req.event = e;
// since sending our IPv4/v6 address to the tracker may be sensitive. Only
// do that if we're not in anonymous mode and if it's a private torrent
if (!settings().get_bool(settings_pack::anonymous_mode)
&& m_torrent_file
&& m_torrent_file->priv())
{
m_ses.for_each_listen_socket([&](aux::listen_socket_handle const& s)
{
if (s.is_ssl() != is_ssl_torrent()) return;
tcp::endpoint const ep = s.get_local_endpoint();
if (ep.address().is_unspecified()) return;
if (aux::is_v6(ep))
{
if (!aux::is_local(ep.address()) && !ep.address().is_loopback())
req.ipv6.push_back(ep.address().to_v6());
}
else
{
if (!aux::is_local(ep.address()) && !ep.address().is_loopback())
req.ipv4.push_back(ep.address().to_v4());
}
});
}
// if we are aborting. we don't want any new peers
req.num_want = (req.event == event_t::stopped)
? 0 : settings().get_int(settings_pack::num_want);
// some older versions of clang had a bug where it would fire this warning here
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wmissing-braces"
#endif
aux::array<bool const, num_protocols, protocol_version> const supports_protocol
{ {
m_info_hash.has_v1(),
m_info_hash.has_v2()
} };
#ifdef __clang__
#pragma clang diagnostic pop
#endif
time_point32 const now = aux::time_now32();
// each listen socket gets its own announce state
// so that each one should get at least one announce
std::vector<announce_state> listen_socket_states;
#ifndef TORRENT_DISABLE_LOGGING
int idx = -1;
if (should_log())
{
debug_log("*** announce: "
"[ announce_to_all_tiers: %d announce_to_all_trackers: %d num_trackers: %d ]"
, settings().get_bool(settings_pack::announce_to_all_tiers)
, settings().get_bool(settings_pack::announce_to_all_trackers)
, int(m_trackers.size()));
}
#endif
for (auto& ae : m_trackers)
{
#ifndef TORRENT_DISABLE_LOGGING
++idx;
#endif
refresh_endpoint_list(m_ses, ae.url, is_ssl_torrent(), bool(m_complete_sent), ae.endpoints);
// if trackerid is not specified for tracker use default one, probably set explicitly
req.trackerid = ae.trackerid.empty() ? m_trackerid : ae.trackerid;
req.url = ae.url;
#if TORRENT_USE_I2P
if (is_i2p_url(req.url))
{
req.kind |= tracker_request::i2p;
}
else if (is_i2p() && !settings().get_bool(settings_pack::allow_i2p_mixed))
{
// if we don't allow mixing normal peers into this i2p
// torrent, skip this announce
continue;
}
#endif
for (auto& aep : ae.endpoints)
{
// do not add code which continues to the next endpoint here!
// listen_socket_states needs to be populated even if none of the endpoints
// will be announcing for this tracker
// otherwise the early bail out when neither announce_to_all_trackers
// nor announce_to_all_tiers is set may be triggered prematurely
auto aep_state_iter = std::find_if(listen_socket_states.begin(), listen_socket_states.end()
, [&](announce_state const& s) { return s.socket == aep.socket; });
if (aep_state_iter == listen_socket_states.end())
{
listen_socket_states.emplace_back(aep.socket);
aep_state_iter = listen_socket_states.end() - 1;
}
announce_state& ep_state = *aep_state_iter;
if (!aep.enabled) continue;
for (protocol_version const ih : all_versions)
{
if (!supports_protocol[ih]) continue;
auto& state = ep_state.state[ih];
auto& a = aep.info_hashes[ih];
// if we haven't sent an event=start to the tracker, there's no
// point in sending an event=stopped
if (!a.start_sent && req.event == event_t::stopped)
continue;
if (state.done) continue;
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
debug_log("*** tracker: (%d) [ep: %s ] \"%s\" [ "
" i->tier: %d tier: %d working: %d limit: %d"
" can: %d sent: %d ]"
, idx, print_endpoint(aep.local_endpoint).c_str()
, ae.url.c_str(), ae.tier, state.tier, a.is_working(), ae.fail_limit
, a.can_announce(now, is_seed(), ae.fail_limit), state.sent_announce);
}
#endif
if (settings().get_bool(settings_pack::announce_to_all_tiers)
&& !settings().get_bool(settings_pack::announce_to_all_trackers)
&& state.sent_announce
&& ae.tier <= state.tier
&& state.tier != INT_MAX)
continue;
if (ae.tier > state.tier && state.sent_announce
&& !settings().get_bool(settings_pack::announce_to_all_tiers)) continue;
if (a.is_working()) { state.tier = ae.tier; state.sent_announce = false; }
if (!a.can_announce(now, is_seed(), ae.fail_limit))
{
// this counts
if (a.is_working())
{
state.sent_announce = true;
if (!settings().get_bool(settings_pack::announce_to_all_trackers)
&& !settings().get_bool(settings_pack::announce_to_all_tiers))
{
state.done = true;
}
}
continue;
}
req.event = e;
if (req.event == event_t::none)
{
if (!a.start_sent) req.event = event_t::started;
else if (!m_complete_sent
&& !a.complete_sent
&& is_seed())
{
req.event = event_t::completed;
}
}
req.triggered_manually = a.triggered_manually;
a.triggered_manually = false;
#if TORRENT_ABI_VERSION == 1
req.auth = tracker_login();
#endif
req.key = tracker_key();
req.outgoing_socket = aep.socket;
req.info_hash = m_torrent_file->info_hashes().get(ih);
#ifndef TORRENT_DISABLE_LOGGING
if (should_log())
{
debug_log("==> TRACKER REQUEST \"%s\" event: %s abort: %d ssl: %p "
"port: %d ssl-port: %d fails: %d upd: %d ep: %s"
, req.url.c_str()
, (req.event == event_t::stopped ? "stopped"
: req.event == event_t::started ? "started" : "")
, m_abort
#ifdef TORRENT_SSL_PEERS
, static_cast<void*>(req.ssl_ctx)
#else
, static_cast<void*>(nullptr)
#endif
, m_ses.listen_port()
, m_ses.ssl_listen_port()
, a.fails
, a.updating
, print_endpoint(aep.local_endpoint).c_str());
}
// if we're not logging session logs, don't bother creating an
// observer object just for logging
if (m_abort && m_ses.should_log())
{
auto tl = std::make_shared<aux::tracker_logger>(m_ses);
m_ses.queue_tracker_request(req, tl);
}
else
#endif
{
m_ses.queue_tracker_request(req, shared_from_this());
}
a.updating = true;
a.next_announce = now;
a.min_announce = now;
if (m_ses.alerts().should_post<tracker_announce_alert>())
{
m_ses.alerts().emplace_alert<tracker_announce_alert>(
get_handle(), aep.local_endpoint, req.url, ih, req.event);
}
state.sent_announce = true;
if (a.is_working()
&& !settings().get_bool(settings_pack::announce_to_all_trackers)
&& !settings().get_bool(settings_pack::announce_to_all_tiers))
{
state.done = true;
}
}
}
if (std::all_of(listen_socket_states.begin(), listen_socket_states.end()
, [supports_protocol](announce_state const& s) {
for (protocol_version const ih : all_versions)
{
if (supports_protocol[ih] && !s.state[ih].done)
return false;
}
return true;
}))
break;
}
update_tracker_timer(now);
}
追调用堆栈太多了。跳着看。搜索关键字,metadata,可以看到核心函数:
bool received_metadata(ut_metadata_peer_plugin& source
, span<char const> buf, int piece, int total_size);
// returns a piece of the metadata that
// we should request.
// returns -1 if we should hold off the request
int metadata_request(bool has_metadata);
bool on_extended(int const length
, int const extended_msg, span<char const> body) override
{
if (extended_msg != 2) return false;
if (m_message_index == 0) return false;
if (length > 17 * 1024)
{
#ifndef TORRENT_DISABLE_LOGGING
m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
, "packet too big %d", length);
#endif
m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
return true;
}
if (!m_pc.packet_finished()) return true;
error_code ec;
bdecode_node msg = bdecode(body, ec);
if (msg.type() != bdecode_node::dict_t)
{
#ifndef TORRENT_DISABLE_LOGGING
m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
, "not a dictionary");
#endif
m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
return true;
}
bdecode_node const& type_ent = msg.dict_find_int("msg_type");
bdecode_node const& piece_ent = msg.dict_find_int("piece");
if (!type_ent || !piece_ent)
{
#ifndef TORRENT_DISABLE_LOGGING
m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
, "missing or invalid keys");
#endif
m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
return true;
}
auto const type = msg_t(type_ent.int_value());
auto const piece = static_cast<int>(piece_ent.int_value());
#ifndef TORRENT_DISABLE_LOGGING
m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
, "type: %d piece: %d", static_cast<int>(type), piece);
#endif
switch (type)
{
case msg_t::request:
{
if (!m_torrent.valid_metadata()
|| piece < 0 || piece >= (m_tp.metadata().size() + 16 * 1024 - 1) / (16 * 1024))
{
#ifndef TORRENT_DISABLE_LOGGING
if (m_pc.should_log(peer_log_alert::info))
{
m_pc.peer_log(peer_log_alert::info, "UT_METADATA"
, "have: %d invalid piece %d metadata size: %d"
, int(m_torrent.valid_metadata()), piece
, m_torrent.valid_metadata()
? int(m_tp.metadata().size()) : 0);
}
#endif
write_metadata_packet(msg_t::dont_have, piece);
return true;
}
if (m_pc.send_buffer_size() < send_buffer_limit)
write_metadata_packet(msg_t::piece, piece);
else if (m_incoming_requests.size() < max_incoming_requests)
m_incoming_requests.push_back(piece);
else
write_metadata_packet(msg_t::dont_have, piece);
}
break;
case msg_t::piece:
{
auto const i = std::find(m_sent_requests.begin()
, m_sent_requests.end(), piece);
// unwanted piece?
if (i == m_sent_requests.end())
{
#ifndef TORRENT_DISABLE_LOGGING
m_pc.peer_log(peer_log_alert::info, "UT_METADATA"
, "UNWANTED / TIMED OUT");
#endif
return true;
}
m_sent_requests.erase(i);
auto const len = msg.data_section().size();
auto const total_size = msg.dict_find_int_value("total_size", 0);
m_tp.received_metadata(*this, body.subspan(len), piece, static_cast<int>(total_size));
maybe_send_request();
}
break;
case msg_t::dont_have:
{
m_request_limit = std::max(aux::time_now() + minutes(1), m_request_limit);
auto const i = std::find(m_sent_requests.begin()
, m_sent_requests.end(), piece);
// unwanted piece?
if (i == m_sent_requests.end()) return true;
m_sent_requests.erase(i);
}
break;
}
m_pc.stats_counters().inc_stats_counter(counters::num_incoming_metadata);
return true;
}
void bt_peer_connection::on_extended(int received)
{
INVARIANT_CHECK;
TORRENT_ASSERT(received >= 0);
received_bytes(0, received);
if (m_recv_buffer.packet_size() < 2)
{
disconnect(errors::invalid_extended, operation_t::bittorrent, peer_error);
return;
}
if (associated_torrent().expired())
{
disconnect(errors::invalid_extended, operation_t::bittorrent, peer_error);
return;
}
span<char const> recv_buffer = m_recv_buffer.get();
if (int(recv_buffer.size()) < 2) return;
TORRENT_ASSERT(recv_buffer.front() == msg_extended);
recv_buffer = recv_buffer.subspan(1);
int const extended_id = aux::read_uint8(recv_buffer);
if (extended_id == 0)
{
on_extended_handshake();
disconnect_if_redundant();
return;
}
if (extended_id == upload_only_msg)
{
if (!m_recv_buffer.packet_finished()) return;
if (m_recv_buffer.packet_size() != 3)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "UPLOAD_ONLY"
, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
return;
}
bool const ul = aux::read_uint8(recv_buffer) != 0;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "UPLOAD_ONLY"
, "%s", (ul?"true":"false"));
#endif
set_upload_only(ul);
return;
}
#ifndef TORRENT_DISABLE_SHARE_MODE
if (extended_id == share_mode_msg)
{
if (!m_recv_buffer.packet_finished()) return;
if (m_recv_buffer.packet_size() != 3)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "SHARE_MODE"
, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
return;
}
bool sm = aux::read_uint8(recv_buffer) != 0;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "SHARE_MODE"
, "%s", (sm?"true":"false"));
#endif
set_share_mode(sm);
return;
}
#endif // TORRENT_DISABLE_SHARE_MODE
if (extended_id == holepunch_msg)
{
if (!m_recv_buffer.packet_finished()) return;
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "HOLEPUNCH");
#endif
on_holepunch();
return;
}
if (extended_id == dont_have_msg)
{
if (!m_recv_buffer.packet_finished()) return;
if (m_recv_buffer.packet_size() != 6)
{
#ifndef TORRENT_DISABLE_LOGGING
peer_log(peer_log_alert::incoming_message, "DONT_HAVE"
, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
return;
}
piece_index_t const piece(aux::read_int32(recv_buffer));
incoming_dont_have(piece);
return;
}
#ifndef TORRENT_DISABLE_LOGGING
if (m_recv_buffer.packet_finished())
peer_log(peer_log_alert::incoming_message, "EXTENSION_MESSAGE"
, "msg: %d size: %d", extended_id, m_recv_buffer.packet_size());
#endif
#ifndef TORRENT_DISABLE_EXTENSIONS
for (auto const& e : m_extensions)
{
if (e->on_extended(m_recv_buffer.packet_size() - 2, extended_id
, recv_buffer))
return;
}
#endif
disconnect(errors::invalid_message, operation_t::bittorrent, peer_error);
}
至此,答案明了了。
总结:
下载者拿着 info_hash 会从 dht 中获取 peer, 如果有 tracker, 也在 tracker 中询问 peer,然后对 peer 询问有无相同文件的 metadata ,如果有,获取 peer 传输的 metadata。
on_extended 是 peer_connection 的一种插件设计,如果有包收到,则检查插件列表。