libtorrent 源码分析

2024-11-02

torrent 是怎么样交换信息的?

torrent 本身像爬虫:

  1. 自身建立端口, udp datagram
  2. 自身找到DHT中心,寻找其他节点
  3. 发送查询请求给到其他节点,让其他节点介绍新节点
  4. 响应其他节点的四种操作
  5. 如果其他节点需要查询 infohash,则对 infohash 记录并给它介绍其他节点。(说明它要的infohash是当前热门的)

去中心服务器? 通过infohash下载torrent文件。 得到torrent文件可以进行解析,获得文件名,大小,类型,文件列表。 详情可以看 libtorrent cpp github 源码

版本 RC_2_0。从 test/bt-get3 可以看到,下载 bt 的时候会先建立 session,

在 session 去解析 magnet 去得到 torrent


	// Synchronously adds a torrent to the session and returns its handle.
	//
	// The request is forwarded to session_impl::add_torrent through
	// sync_call_ret. If that call reports an error, it is rethrown here as a
	// system_error.
	torrent_handle session_handle::add_torrent(add_torrent_params&& params)
	{
		TORRENT_ASSERT_PRECOND(!params.save_path.empty());

#if TORRENT_ABI_VERSION < 3
		// when neither info_hashes nor a torrent_info identifies the torrent,
		// fall back to the info_hash field as the v1 hash
		bool const has_any_hash = params.info_hashes.has_v1() || params.info_hashes.has_v2();
		if (!has_any_hash && !params.ti)
		{
			params.info_hashes.v1 = params.info_hash;
		}
#endif

		// the torrent object keeps and mutates state inside torrent_info, so
		// hand it a private copy rather than the client's instance
		if (params.ti)
		{
			auto private_copy = std::make_shared<torrent_info>(*params.ti);
			params.ti = std::move(private_copy);
		}

#if TORRENT_ABI_VERSION == 1
		handle_backwards_compatible_resume_data(params);
#endif

		error_code err;
		auto err_ref = std::ref(err);
		torrent_handle added = sync_call_ret<torrent_handle>(
			&session_impl::add_torrent, std::move(params), err_ref);

		if (err)
		{
			aux::throw_ex<system_error>(err);
		}
		return added;
	}

	// Queues a torrent to be added and returns without waiting for the
	// result. The parameters are moved to the heap and their address is
	// passed to session_impl::async_add_torrent through async_call.
	void session_handle::async_add_torrent(add_torrent_params&& params)
	{
		TORRENT_ASSERT_PRECOND(!params.save_path.empty());

#if TORRENT_ABI_VERSION < 3
		// when neither info_hashes nor a torrent_info identifies the torrent,
		// fall back to the info_hash field as the v1 hash
		bool const has_any_hash = params.info_hashes.has_v1() || params.info_hashes.has_v2();
		if (!has_any_hash && !params.ti)
		{
			params.info_hashes.v1 = params.info_hash;
		}
#endif

		// the torrent object keeps and mutates state inside torrent_info, so
		// hand it a private copy rather than the client's instance
		if (params.ti)
		{
			auto private_copy = std::make_shared<torrent_info>(*params.ti);
			params.ti = std::move(private_copy);
		}

		// async_call is given a raw pointer. Until the call has been posted
		// the heap copy stays owned here, so it is freed if anything throws
		// on the way
		std::unique_ptr<add_torrent_params> heap_params(
			new add_torrent_params(std::move(params)));
		heap_params->save_path = complete(heap_params->save_path);

#if TORRENT_ABI_VERSION == 1
		handle_backwards_compatible_resume_data(*heap_params);
#endif

		auto* raw = heap_params.get();
		async_call(&session_impl::async_add_torrent, raw);

		// the call was posted; the receiving side is now responsible for
		// deleting the parameters
		heap_params.release();
	}


// 关键 add_torrent
	// Takes ownership of the heap-allocated parameters and forwards them to
	// add_torrent(). The parameters are deleted when this function returns.
	void session_impl::async_add_torrent(add_torrent_params* params)
	{
		// adopt the pointer first so it is released on every path
		std::unique_ptr<add_torrent_params> const owned(params);

		// neither the error code nor the returned handle is used here
		error_code ignored;
		add_torrent(std::move(*owned), ignored);
	}


// 关键 add_torrent(内部调用 add_torrent_impl)
	// Adds a torrent on behalf of the session and returns a handle to it.
	//
	// The torrent object itself comes from add_torrent_impl(). When that
	// yields a new torrent it is started, given its extensions and inserted
	// into the session. An add_torrent_alert is posted on every path. When no
	// torrent could be produced, the returned handle wraps a null torrent and
	// ec carries whatever add_torrent_impl() reported.
	torrent_handle session_impl::add_torrent(add_torrent_params&& params
		, error_code& ec)
	{
		std::shared_ptr<torrent> new_torrent;

		// unless disarmed below, leaving this scope aborts whatever torrent
		// object has been created by then
		auto abort_guard = aux::scope_end([&]{ if (new_torrent) new_torrent->abort(); });

#ifndef TORRENT_DISABLE_EXTENSIONS
		auto extensions = std::move(params.extensions);
		auto const userdata = std::move(params.userdata);
#endif

		// params is handed off to add_torrent_impl() below, so copy the
		// fields that are reported back in the add_torrent_alert first
		add_torrent_params alert_params;
		alert_params.flags = params.flags;
		alert_params.ti = params.ti;
		alert_params.name = params.name;
		alert_params.save_path = params.save_path;
		alert_params.userdata = params.userdata;
		alert_params.trackerid = params.trackerid;

		auto const requested_flags = params.flags;

		info_hash_t hashes;
		bool is_new;
		std::tie(new_torrent, hashes, is_new) = add_torrent_impl(std::move(params), ec);

		alert_params.info_hashes = hashes;

		torrent_handle handle(new_torrent);

		// no torrent object: report the outcome and hand back the empty handle
		if (!new_torrent)
		{
			m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);
			return handle;
		}

		TORRENT_ASSERT(hashes.has_v1() || hashes.has_v2());

#if TORRENT_ABI_VERSION == 1
		if (m_alerts.should_post<torrent_added_alert>())
			m_alerts.emplace_alert<torrent_added_alert>(handle);
#endif

		// the torrent was already in the session. It must not be started or
		// given another set of plugins, and it must not be aborted on exit
		if (!is_new)
		{
			abort_guard.disarm();
			m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);
			return handle;
		}

		new_torrent->set_ip_filter(m_ip_filter);
		new_torrent->start();

#ifndef TORRENT_DISABLE_EXTENSIONS
		// first the plugins requested for this torrent specifically ...
		for (auto& make_plugin : extensions)
		{
			std::shared_ptr<torrent_plugin> plugin(make_plugin(handle, userdata));
			if (plugin) new_torrent->add_extension(std::move(plugin));
		}

		// ... then whatever add_extensions_to_torrent() attaches
		add_extensions_to_torrent(new_torrent, userdata);
#endif

		TORRENT_ASSERT(hashes == new_torrent->torrent_file().info_hashes());
		insert_torrent(hashes, new_torrent);

		m_alerts.emplace_alert<add_torrent_alert>(handle, std::move(alert_params), ec);

		// the torrent is now part of the session, so keep it alive on exit
		abort_guard.disarm();

		// Recalculating auto-managed torrents is expensive. Only trigger it
		// early when a slot is actually free, i.e. when the new torrent could
		// change the outcome. While a batch of torrents is being added the
		// recalculation keeps being pushed back until the batch is done.
		if (requested_flags & torrent_flags::auto_managed)
		{
			int const download_limit = settings().get_int(settings_pack::active_downloads);
			int const seed_limit = settings().get_int(settings_pack::active_seeds);
			int const active_limit = settings().get_int(settings_pack::active_limit);

			int const downloading
				= int(torrent_list(session_interface::torrent_downloading_auto_managed).size());
			int const seeding
				= int(torrent_list(session_interface::torrent_seeding_auto_managed).size());

			bool const slot_free = downloading < download_limit || seeding < seed_limit;
			bool const below_total = downloading + seeding < active_limit;

			if (slot_free && below_total)
			{
				trigger_auto_manage();
			}
		}

		return handle;
	}


    
	// Validates params and creates the torrent object for them, without
	// starting it.
	//
	// Returns a tuple of the torrent (null on failure), the info-hashes held
	// by params, and a flag telling whether a new torrent object was created.
	// Every failure sets ec. If a torrent with the same info-hashes is already
	// in the session, it is returned with the flag set to false, unless
	// torrent_flags::duplicate_is_error is set, which makes it a failure.
	std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>
	session_impl::add_torrent_impl(add_torrent_params&& params, error_code& ec)
	{
		TORRENT_ASSERT(!params.save_path.empty());

		using torrent_sp = std::shared_ptr<torrent>;
		using result_t = std::tuple<std::shared_ptr<torrent>, info_hash_t, bool>;

		// all failure paths return the same shape: no torrent, the info-hashes
		// params holds at that point, and "not added"
		auto const failure = [&params]() -> result_t
		{
			return result_t{torrent_sp(), params.info_hashes, false};
		};

#if TORRENT_ABI_VERSION == 1
		if (string_begins_no_case("magnet:", params.url.c_str()))
		{
			parse_magnet_uri(params.url, params, ec);
			if (ec) return failure();
			params.url.clear();
		}
#endif

		// a torrent_info that was passed in must be valid ...
		if (params.ti && !params.ti->is_valid())
		{
			ec = errors::no_metadata;
			return failure();
		}

		// ... must contain at least one file ...
		if (params.ti && params.ti->is_valid() && params.ti->num_files() == 0)
		{
			ec = errors::no_files_in_torrent;
			return failure();
		}

		// ... and must agree with any info-hash given next to it
		if (params.ti
			&& ((params.info_hashes.has_v1()
					&& params.info_hashes.v1 != params.ti->info_hashes().v1)
				|| (params.info_hashes.has_v2()
					&& params.info_hashes.v2 != params.ti->info_hashes().v2)))
		{
			ec = errors::mismatching_info_hash;
			return failure();
		}

#ifndef TORRENT_DISABLE_DHT
		// pass the DHT nodes from params and from the torrent file on to the
		// DHT
		for (auto const& node : params.dht_nodes)
			add_dht_node_name(node);

		if (params.ti)
		{
			for (auto const& node : params.ti->nodes())
				add_dht_node_name(node);
		}
#endif

		INVARIANT_CHECK;

		if (is_aborted())
		{
			ec = errors::session_is_closing;
			return failure();
		}

		// when a torrent_info is present, its info-hashes are authoritative
		if (params.ti)
		{
			params.info_hashes = params.ti->info_hashes();
#if TORRENT_ABI_VERSION < 3
			params.info_hash = params.info_hashes.get_best();
#endif
		}

		if (!params.info_hashes.has_v1() && !params.info_hashes.has_v2())
		{
			ec = errors::missing_info_hash_in_uri;
			return failure();
		}

		// is this torrent already part of the session?
		torrent_sp existing = find_torrent(params.info_hashes).lock();
		if (existing)
		{
			if (params.flags & torrent_flags::duplicate_is_error)
			{
				ec = errors::duplicate_torrent;
				return failure();
			}
			return result_t{std::move(existing), params.info_hashes, false};
		}

		// grow the torrent lists up-front. When a torrent changes state later
		// on, there is no room for an allocation that might fail
		size_t const torrent_count = m_torrents.size();
		for (auto& list : m_torrent_lists)
		{
			list.reserve(torrent_count + 1);
		}

		torrent_sp created;
		try
		{
			created = std::make_shared<torrent>(*this, m_paused, std::move(params));
			created->set_queue_position(m_download_queue.end_index());
		}
		catch (system_error const& e)
		{
			ec = e.code();
			return failure();
		}

		// reading info_hashes from the moved-from params is fine, since the
		// move construction of info_hash_t is just a copy
		return result_t{std::move(created), params.info_hashes, true};
	}


从上面代码可以看出,整个流程始终在尝试建立 torrent_ptr,最终会得到一个 torrent 对象:把 magnet 里面的 url 解析以后,交给 torrent 类去实例化。torrent.cpp 有一万多行代码,读起来有点吃力。

torrent.cpp

			// we need to start announcing since we don't have any
			// metadata. To receive peers to ask for it.
			set_state(torrent_status::downloading_metadata);


            
	void torrent::start_announcing()
	{
		TORRENT_ASSERT(is_single_thread());
		TORRENT_ASSERT(state() != torrent_status::checking_files);
		if (is_paused())
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("start_announcing(), paused");
#endif
			return;
		}
		// if we don't have metadata, we need to announce
		// before checking files, to get peers to
		// request the metadata from
		if (!m_files_checked && valid_metadata())
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("start_announcing(), files not checked (with valid metadata)");
#endif
			return;
		}
		if (m_announcing) return;

		m_announcing = true;

#ifndef TORRENT_DISABLE_DHT
		if ((!m_peer_list || m_peer_list->num_peers() < 50) && m_ses.dht())
		{
			// we don't have any peers, prioritize
			// announcing this torrent with the DHT
			m_ses.prioritize_dht(shared_from_this());
		}
#endif

		if (!m_trackers.empty())
		{
			// tell the tracker that we're back
			for (auto& t : m_trackers) t.reset();
		}

		// reset the stats, since from the tracker's
		// point of view, this is a new session
		m_total_failed_bytes = 0;
		m_total_redundant_bytes = 0;
		m_stat.clear();

		update_want_tick();

		announce_with_tracker();

		lsd_announce();
	}

前面都是状态检查(已暂停、有 metadata 但文件尚未校验、已经在 announce 中,都会直接返回)以及 tracker 和统计数据的重置,重要的是最后三个调用:

  • update_want_tick
  • announce_with_tracker
  • lsd_announce

update_want_tick:把 want_tick() 的结果交给 update_list,用来更新 torrent_want_tick 这个列表

	void torrent::update_want_tick()
	{
		update_list(aux::session_interface::torrent_want_tick, want_tick());
	}

announce_with_tracker 给 tracker 们发信息

						m_ses.queue_tracker_request(req, tl);
	void announce_with_tracker(event_t = event_t::none);


	// Sends an announce for this torrent to its trackers.
	//
	// e is the event to report. It is forced to event_t::stopped when the
	// torrent is aborting, and event_t::none may be replaced by paused,
	// started or completed depending on the state of the torrent and of the
	// individual tracker endpoint.
	//
	// Returns early, without announcing, when there are no trackers, when a
	// stopped event comes with a non-positive stop_tracker_timeout, or when a
	// non-stopped event is requested while announcing to trackers is disabled
	// or the torrent is paused.
	//
	// A single tracker_request is filled in with the transfer statistics and
	// then queued once for every tracker, listen-socket endpoint and supported
	// protocol version that is allowed to announce right now.
	// update_tracker_timer() is called at the end.
	void torrent::announce_with_tracker(event_t e)
	{
		TORRENT_ASSERT(is_single_thread());
		TORRENT_ASSERT(e == event_t::stopped || state() != torrent_status::checking_files);
		INVARIANT_CHECK;

		if (m_trackers.empty())
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("*** announce: no trackers");
#endif
			return;
		}

		// an aborting torrent only ever announces that it stopped
		if (m_abort) e = event_t::stopped;

		// having stop_tracker_timeout <= 0 means that there is
		// no need to send any request to trackers or trigger any
		// related logic when the event is stopped
		if (e == event_t::stopped
			&& settings().get_int(settings_pack::stop_tracker_timeout) <= 0)
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("*** announce: event == stopped && stop_tracker_timeout <= 0");
#endif
			return;
		}

		// if we're not announcing to trackers, only allow
		// stopping
		if (e != event_t::stopped && !m_announce_to_trackers)
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("*** announce: event != stopped && !m_announce_to_trackers");
#endif
			return;
		}

		// if we're not allowing peers, there's no point in announcing
		if (e != event_t::stopped && m_paused)
		{
#ifndef TORRENT_DISABLE_LOGGING
			debug_log("*** announce: event != stopped && m_paused");
#endif
			return;
		}

		TORRENT_ASSERT(!m_paused || e == event_t::stopped);

		// finished but not a seed: report the paused event instead of none
		if (e == event_t::none && is_finished() && !is_seed())
			e = event_t::paused;

		tracker_request req;
		if (settings().get_bool(settings_pack::apply_ip_filter_to_trackers)
			&& m_apply_ip_filter)
		{
			req.filter = m_ip_filter;
		}

		req.private_torrent = m_torrent_file->priv();

		req.pid = m_peer_id;
		req.downloaded = m_stat.total_payload_download() - m_total_failed_bytes;
		req.uploaded = m_stat.total_payload_upload();
		req.corrupt = m_total_failed_bytes;
		// NOTE(review): 16 KiB looks like the fallback used when bytes_left()
		// has no value - confirm against value_or() and bytes_left()
		req.left = value_or(bytes_left(), 16*1024);
#ifdef TORRENT_SSL_PEERS
		// if this torrent contains an SSL certificate, make sure
		// any SSL tracker presents a certificate signed by it
		req.ssl_ctx = m_ssl_ctx.get();
#endif

		req.redundant = m_total_redundant_bytes;
		// exclude redundant bytes if we should
		if (!settings().get_bool(settings_pack::report_true_downloaded))
		{
			req.downloaded -= m_total_redundant_bytes;

			// if the torrent is complete we know that all incoming pieces will be
			// marked redundant so add them to the redundant count
			// this is mainly needed to cover the case where a torrent has just completed
			// but still has partially downloaded pieces
			// if the incoming pieces are not accounted for it could cause the downloaded
			// amount to exceed the total size of the torrent which upsets some trackers
			if (is_seed())
			{
				for (auto c : m_connections)
				{
					TORRENT_INCREMENT(m_iterating_connections);
					auto const pbp = c->downloading_piece_progress();
					if (pbp.bytes_downloaded > 0)
					{
						req.downloaded -= pbp.bytes_downloaded;
						req.redundant += pbp.bytes_downloaded;
					}
				}
			}
		}
		// the subtractions above may have pushed the counter below zero
		if (req.downloaded < 0) req.downloaded = 0;

		req.event = e;

		// since sending our IPv4/v6 address to the tracker may be sensitive. Only
		// do that if we're not in anonymous mode and if it's a private torrent
		if (!settings().get_bool(settings_pack::anonymous_mode)
			&& m_torrent_file
			&& m_torrent_file->priv())
		{
			m_ses.for_each_listen_socket([&](aux::listen_socket_handle const& s)
			{
				// only consider sockets whose SSL-ness matches the torrent and
				// that are bound to a specific, non-local, non-loopback address
				if (s.is_ssl() != is_ssl_torrent()) return;
				tcp::endpoint const ep = s.get_local_endpoint();
				if (ep.address().is_unspecified()) return;
				if (aux::is_v6(ep))
				{
					if (!aux::is_local(ep.address()) && !ep.address().is_loopback())
						req.ipv6.push_back(ep.address().to_v6());
				}
				else
				{
					if (!aux::is_local(ep.address()) && !ep.address().is_loopback())
						req.ipv4.push_back(ep.address().to_v4());
				}
			});
		}

		// if we are aborting. we don't want any new peers
		req.num_want = (req.event == event_t::stopped)
			? 0 : settings().get_int(settings_pack::num_want);

// some older versions of clang had a bug where it would fire this warning here
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wmissing-braces"
#endif
		// which protocol versions (v1 / v2) this torrent has an info-hash for
		aux::array<bool const, num_protocols, protocol_version> const supports_protocol
		{ {
			m_info_hash.has_v1(),
			m_info_hash.has_v2()
		} };
#ifdef __clang__
#pragma clang diagnostic pop
#endif

		time_point32 const now = aux::time_now32();

		// each listen socket gets its own announce state
		// so that each one should get at least one announce
		std::vector<announce_state> listen_socket_states;

#ifndef TORRENT_DISABLE_LOGGING
		// index of the tracker currently being visited, used in log output only
		int idx = -1;
		if (should_log())
		{
			debug_log("*** announce: "
				"[ announce_to_all_tiers: %d announce_to_all_trackers: %d num_trackers: %d ]"
				, settings().get_bool(settings_pack::announce_to_all_tiers)
				, settings().get_bool(settings_pack::announce_to_all_trackers)
				, int(m_trackers.size()));
		}
#endif

		for (auto& ae : m_trackers)
		{
#ifndef TORRENT_DISABLE_LOGGING
			++idx;
#endif
			refresh_endpoint_list(m_ses, ae.url, is_ssl_torrent(), bool(m_complete_sent), ae.endpoints);

			// if trackerid is not specified for tracker use default one, probably set explicitly
			req.trackerid = ae.trackerid.empty() ? m_trackerid : ae.trackerid;
			req.url = ae.url;

#if TORRENT_USE_I2P
			if (is_i2p_url(req.url))
			{
				req.kind |= tracker_request::i2p;
			}
			else if (is_i2p() && !settings().get_bool(settings_pack::allow_i2p_mixed))
			{
				// if we don't allow mixing normal peers into this i2p
				// torrent, skip this announce
				continue;
			}
#endif

			for (auto& aep : ae.endpoints)
			{
				// do not add code which continues to the next endpoint here!
				// listen_socket_states needs to be populated even if none of the endpoints
				// will be announcing for this tracker
				// otherwise the early bail out when neither announce_to_all_trackers
				// nor announce_to_all_tiers is set may be triggered prematurely

				// find the announce state of this endpoint's listen socket,
				// creating it the first time the socket is seen
				auto aep_state_iter = std::find_if(listen_socket_states.begin(), listen_socket_states.end()
					, [&](announce_state const& s) { return s.socket == aep.socket; });
				if (aep_state_iter == listen_socket_states.end())
				{
					listen_socket_states.emplace_back(aep.socket);
					aep_state_iter = listen_socket_states.end() - 1;
				}
				announce_state& ep_state = *aep_state_iter;

				if (!aep.enabled) continue;

				for (protocol_version const ih : all_versions)
				{
					if (!supports_protocol[ih]) continue;

					// state is kept per listen socket, a per tracker endpoint
					auto& state = ep_state.state[ih];
					auto& a = aep.info_hashes[ih];

					// if we haven't sent an event=start to the tracker, there's no
					// point in sending an event=stopped
					if (!a.start_sent && req.event == event_t::stopped)
						continue;

					if (state.done) continue;

#ifndef TORRENT_DISABLE_LOGGING
					if (should_log())
					{
						debug_log("*** tracker:  (%d) [ep: %s ] \"%s\" [ "
							" i->tier: %d tier: %d working: %d limit: %d"
							" can: %d sent: %d ]"
							, idx, print_endpoint(aep.local_endpoint).c_str()
							, ae.url.c_str(), ae.tier, state.tier, a.is_working(), ae.fail_limit
							, a.can_announce(now, is_seed(), ae.fail_limit), state.sent_announce);
					}
#endif

					// announcing to every tier but not to every tracker: an
					// announce was already sent for this tier (or a later one),
					// so skip this tracker
					if (settings().get_bool(settings_pack::announce_to_all_tiers)
						&& !settings().get_bool(settings_pack::announce_to_all_trackers)
						&& state.sent_announce
						&& ae.tier <= state.tier
						&& state.tier != INT_MAX)
						continue;

					// not announcing to every tier: once an announce has been
					// sent, trackers in later tiers are skipped
					if (ae.tier > state.tier && state.sent_announce
						&& !settings().get_bool(settings_pack::announce_to_all_tiers)) continue;
					if (a.is_working()) { state.tier = ae.tier; state.sent_announce = false; }
					if (!a.can_announce(now, is_seed(), ae.fail_limit))
					{
						// this counts
						if (a.is_working())
						{
							state.sent_announce = true;
							if (!settings().get_bool(settings_pack::announce_to_all_trackers)
								&& !settings().get_bool(settings_pack::announce_to_all_tiers))
							{
								state.done = true;
							}
						}
						continue;
					}

					// req is reused across iterations, so start over from the
					// requested event and refine event_t::none for this endpoint
					req.event = e;
					if (req.event == event_t::none)
					{
						if (!a.start_sent) req.event = event_t::started;
						else if (!m_complete_sent
							&& !a.complete_sent
							&& is_seed())
						{
							req.event = event_t::completed;
						}
					}

					req.triggered_manually = a.triggered_manually;
					a.triggered_manually = false;

#if TORRENT_ABI_VERSION == 1
					req.auth = tracker_login();
#endif
					req.key = tracker_key();

					req.outgoing_socket = aep.socket;
					req.info_hash = m_torrent_file->info_hashes().get(ih);

#ifndef TORRENT_DISABLE_LOGGING
					if (should_log())
					{
						debug_log("==> TRACKER REQUEST \"%s\" event: %s abort: %d ssl: %p "
							"port: %d ssl-port: %d fails: %d upd: %d ep: %s"
							, req.url.c_str()
							, (req.event == event_t::stopped ? "stopped"
								: req.event == event_t::started ? "started" : "")
							, m_abort
#ifdef TORRENT_SSL_PEERS
							, static_cast<void*>(req.ssl_ctx)
#else
							, static_cast<void*>(nullptr)
#endif
							, m_ses.listen_port()
							, m_ses.ssl_listen_port()
							, a.fails
							, a.updating
							, print_endpoint(aep.local_endpoint).c_str());
					}

					// if we're not logging session logs, don't bother creating an
					// observer object just for logging
					if (m_abort && m_ses.should_log())
					{
						auto tl = std::make_shared<aux::tracker_logger>(m_ses);
						m_ses.queue_tracker_request(req, tl);
					}
					else
#endif
					{
						// the torrent itself is passed along with the request
						m_ses.queue_tracker_request(req, shared_from_this());
					}

					// flag the announce as in progress and reset both announce
					// times to now
					a.updating = true;
					a.next_announce = now;
					a.min_announce = now;

					if (m_ses.alerts().should_post<tracker_announce_alert>())
					{
						m_ses.alerts().emplace_alert<tracker_announce_alert>(
							get_handle(), aep.local_endpoint, req.url, ih, req.event);
					}

					state.sent_announce = true;
					if (a.is_working()
						&& !settings().get_bool(settings_pack::announce_to_all_trackers)
						&& !settings().get_bool(settings_pack::announce_to_all_tiers))
					{
						state.done = true;
					}
				}
			}

			// stop walking the tracker list once every listen socket is done
			// for all protocol versions this torrent supports
			if (std::all_of(listen_socket_states.begin(), listen_socket_states.end()
				, [supports_protocol](announce_state const& s) {
					for (protocol_version const ih : all_versions)
					{
						if (supports_protocol[ih] && !s.state[ih].done)
							return false;
					}
					return true;
				}))
				break;
		}
		update_tracker_timer(now);
	}

追调用堆栈太多了。跳着看。搜索关键字,metadata,可以看到核心函数:


		// called by a peer plugin (source) when it has received one piece of
		// the metadata. buf holds the bytes of that piece, piece is its index
		// and total_size is the "total_size" value sent along with it.
		// NOTE(review): the meaning of the return value is not visible from
		// this declaration - confirm against the definition
		bool received_metadata(ut_metadata_peer_plugin& source
			, span<char const> buf, int piece, int total_size);

		// returns a piece of the metadata that
		// we should request.
		// returns -1 if we should hold off the request
		int metadata_request(bool has_metadata);

	
		bool on_extended(int const length
			, int const extended_msg, span<char const> body) override
		{
			if (extended_msg != 2) return false;
			if (m_message_index == 0) return false;

			if (length > 17 * 1024)
			{
#ifndef TORRENT_DISABLE_LOGGING
				m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
					, "packet too big %d", length);
#endif
				m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
				return true;
			}

			if (!m_pc.packet_finished()) return true;

			error_code ec;
			bdecode_node msg = bdecode(body, ec);
			if (msg.type() != bdecode_node::dict_t)
			{
#ifndef TORRENT_DISABLE_LOGGING
				m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
					, "not a dictionary");
#endif
				m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
				return true;
			}

			bdecode_node const& type_ent = msg.dict_find_int("msg_type");
			bdecode_node const& piece_ent = msg.dict_find_int("piece");
			if (!type_ent || !piece_ent)
			{
#ifndef TORRENT_DISABLE_LOGGING
				m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
					, "missing or invalid keys");
#endif
				m_pc.disconnect(errors::invalid_metadata_message, operation_t::bittorrent, peer_connection_interface::peer_error);
				return true;
			}
			auto const type = msg_t(type_ent.int_value());
			auto const piece = static_cast<int>(piece_ent.int_value());

#ifndef TORRENT_DISABLE_LOGGING
			m_pc.peer_log(peer_log_alert::incoming_message, "UT_METADATA"
				, "type: %d piece: %d", static_cast<int>(type), piece);
#endif

			switch (type)
			{
				case msg_t::request:
				{
					if (!m_torrent.valid_metadata()
						|| piece < 0 || piece >= (m_tp.metadata().size() + 16 * 1024 - 1) / (16 * 1024))
					{
#ifndef TORRENT_DISABLE_LOGGING
						if (m_pc.should_log(peer_log_alert::info))
						{
							m_pc.peer_log(peer_log_alert::info, "UT_METADATA"
								, "have: %d invalid piece %d metadata size: %d"
								, int(m_torrent.valid_metadata()), piece
								, m_torrent.valid_metadata()
									? int(m_tp.metadata().size()) : 0);
						}
#endif
						write_metadata_packet(msg_t::dont_have, piece);
						return true;
					}
					if (m_pc.send_buffer_size() < send_buffer_limit)
						write_metadata_packet(msg_t::piece, piece);
					else if (m_incoming_requests.size() < max_incoming_requests)
						m_incoming_requests.push_back(piece);
					else
						write_metadata_packet(msg_t::dont_have, piece);
				}
				break;
				case msg_t::piece:
				{
					auto const i = std::find(m_sent_requests.begin()
						, m_sent_requests.end(), piece);

					// unwanted piece?
					if (i == m_sent_requests.end())
					{
#ifndef TORRENT_DISABLE_LOGGING
						m_pc.peer_log(peer_log_alert::info, "UT_METADATA"
							, "UNWANTED / TIMED OUT");
#endif
						return true;
					}

					m_sent_requests.erase(i);
					auto const len = msg.data_section().size();
					auto const total_size = msg.dict_find_int_value("total_size", 0);
					m_tp.received_metadata(*this, body.subspan(len), piece, static_cast<int>(total_size));
					maybe_send_request();
				}
				break;
				case msg_t::dont_have:
				{
					m_request_limit = std::max(aux::time_now() + minutes(1), m_request_limit);
					auto const i = std::find(m_sent_requests.begin()
						, m_sent_requests.end(), piece);
					// unwanted piece?
					if (i == m_sent_requests.end()) return true;
					m_sent_requests.erase(i);
				}
				break;
			}

			m_pc.stats_counters().inc_stats_counter(counters::num_incoming_metadata);

			return true;
		}


	void bt_peer_connection::on_extended(int received)
	{
		INVARIANT_CHECK;

		TORRENT_ASSERT(received >= 0);
		received_bytes(0, received);
		if (m_recv_buffer.packet_size() < 2)
		{
			disconnect(errors::invalid_extended, operation_t::bittorrent, peer_error);
			return;
		}

		if (associated_torrent().expired())
		{
			disconnect(errors::invalid_extended, operation_t::bittorrent, peer_error);
			return;
		}

		span<char const> recv_buffer = m_recv_buffer.get();
		if (int(recv_buffer.size()) < 2) return;

		TORRENT_ASSERT(recv_buffer.front() == msg_extended);
		recv_buffer = recv_buffer.subspan(1);

		int const extended_id = aux::read_uint8(recv_buffer);

		if (extended_id == 0)
		{
			on_extended_handshake();
			disconnect_if_redundant();
			return;
		}

		if (extended_id == upload_only_msg)
		{
			if (!m_recv_buffer.packet_finished()) return;
			if (m_recv_buffer.packet_size() != 3)
			{
#ifndef TORRENT_DISABLE_LOGGING
				peer_log(peer_log_alert::incoming_message, "UPLOAD_ONLY"
					, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
				return;
			}
			bool const ul = aux::read_uint8(recv_buffer) != 0;
#ifndef TORRENT_DISABLE_LOGGING
			peer_log(peer_log_alert::incoming_message, "UPLOAD_ONLY"
				, "%s", (ul?"true":"false"));
#endif
			set_upload_only(ul);
			return;
		}

#ifndef TORRENT_DISABLE_SHARE_MODE
		if (extended_id == share_mode_msg)
		{
			if (!m_recv_buffer.packet_finished()) return;
			if (m_recv_buffer.packet_size() != 3)
			{
#ifndef TORRENT_DISABLE_LOGGING
				peer_log(peer_log_alert::incoming_message, "SHARE_MODE"
					, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
				return;
			}
			bool sm = aux::read_uint8(recv_buffer) != 0;
#ifndef TORRENT_DISABLE_LOGGING
			peer_log(peer_log_alert::incoming_message, "SHARE_MODE"
				, "%s", (sm?"true":"false"));
#endif
			set_share_mode(sm);
			return;
		}
#endif // TORRENT_DISABLE_SHARE_MODE

		if (extended_id == holepunch_msg)
		{
			if (!m_recv_buffer.packet_finished()) return;
#ifndef TORRENT_DISABLE_LOGGING
			peer_log(peer_log_alert::incoming_message, "HOLEPUNCH");
#endif
			on_holepunch();
			return;
		}

		if (extended_id == dont_have_msg)
		{
			if (!m_recv_buffer.packet_finished()) return;
			if (m_recv_buffer.packet_size() != 6)
			{
#ifndef TORRENT_DISABLE_LOGGING
				peer_log(peer_log_alert::incoming_message, "DONT_HAVE"
					, "ERROR: unexpected packet size: %d", m_recv_buffer.packet_size());
#endif
				return;
			}
			piece_index_t const piece(aux::read_int32(recv_buffer));
			incoming_dont_have(piece);
			return;
		}

#ifndef TORRENT_DISABLE_LOGGING
		if (m_recv_buffer.packet_finished())
			peer_log(peer_log_alert::incoming_message, "EXTENSION_MESSAGE"
				, "msg: %d size: %d", extended_id, m_recv_buffer.packet_size());
#endif

#ifndef TORRENT_DISABLE_EXTENSIONS
		for (auto const& e : m_extensions)
		{
			if (e->on_extended(m_recv_buffer.packet_size() - 2, extended_id
				, recv_buffer))
				return;
		}
#endif

		disconnect(errors::invalid_message, operation_t::bittorrent, peer_error);
	}

至此,答案明了了。

总结:

下载者拿着 info_hash 会从 dht 中获取 peer, 如果有 tracker, 也在 tracker 中询问 peer,然后对 peer 询问有无相同文件的 metadata ,如果有,获取 peer 传输的 metadata。

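on_extended 是 peer_connection 的一种插件(扩展)设计:收到扩展消息后,握手、upload_only、share_mode、holepunch、dont_have 这几种消息由 bt_peer_connection 自己处理,其余的消息依次交给 m_extensions 里每个插件的 on_extended,直到有插件返回 true;ut_metadata 插件就是通过这条路径收到 metadata 的。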

copyright ©2019-2024 shenzhen
粤ICP备20041170号-1