流媒体学习之路(mediasoup)——流数据传输(4)
文章目录
- 流媒体学习之路(mediasoup)——流数据传输(4)
- 一、Router的建立
- 二、Transport的创建
- 三、流数据传输
- 四、总结
一、Router的建立
　　Router顾名思义就是记录转发信息的类。该类的创建基于上一篇提到的信令WORKER_CREATE_ROUTER:当进入到该处理环节时,Router就被创建了。
// Create a new Router, store it in mapRouters under its id and accept the
// request.
case Channel::Request::MethodId::WORKER_CREATE_ROUTER:
{
    std::string routerId;

    // This may throw.
    SetNewRouterIdFromInternal(request->internal, routerId);

    auto* router = new RTC::Router(routerId);

    this->mapRouters[routerId] = router;

    MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

    request->Accept();

    break;
}
　　Router继承了Transport::Listener。
　　而Transport这个类则是传输控制最主要的类。可以看到,它不仅有流传输模块,还继承了拥塞控制等模块的Listener接口。Listener是Transport类的嵌套类,(C++11之后)嵌套类可以访问外层类的任何成员,也就可以对外层类进行操作。嵌套类主要用于隐藏封装实现:Transport只通过Listener接口回调上层,对Transport而言Router的实现是被隐藏的,从而保证了封装性。
class Transport : public RTC::Producer::Listener,public RTC::Consumer::Listener,public RTC::DataProducer::Listener,public RTC::DataConsumer::Listener,public RTC::SctpAssociation::Listener,public RTC::TransportCongestionControlClient::Listener,public RTC::TransportCongestionControlServer::Listener, #ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATORpublic RTC::SenderBandwidthEstimator::Listener, #endifpublic Timer::Listener......private:// Passed by argument.Listener* listener{
nullptr };// Allocated by this.std::unordered_map<std::string, RTC::Producer*> mapProducers;std::unordered_map<std::string, RTC::Consumer*> mapConsumers;std::unordered_map<std::string, RTC::DataProducer*> mapDataProducers;std::unordered_map<std::string, RTC::DataConsumer*> mapDataConsumers;std::unordered_map<uint32_t, RTC::Consumer*> mapSsrcConsumer;std::unordered_map<uint32_t, RTC::Consumer*> mapRtxSsrcConsumer;......
??
二、Transport的创建
　　上面提到Router类通过MethodId进行不同处理,默认的处理则会查找对应的Router,调用GetRouterFromInternal函数。
　　Router处理对应的请求:
// Handles a channel request addressed to this Router.
//
// Requests that create or close Transports and RtpObservers, or that attach /
// detach a Producer to / from an RtpObserver, are handled here. Every other
// method id is forwarded to the Transport referenced by request->internal.
//
// The lookup / id helpers marked "This may throw" are not caught here, so a
// failure propagates to the caller.
void Router::HandleRequest(Channel::Request* request)
{
    MS_TRACE();

    switch (request->methodId)
    {
        case Channel::Request::MethodId::ROUTER_DUMP:
        {
            json data = json::object();

            FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::Request::MethodId::ROUTER_CREATE_WEBRTC_TRANSPORT:
        {
            std::string transportId;

            // This may throw.
            SetNewTransportIdFromInternal(request->internal, transportId);

            // This may throw.
            auto* webRtcTransport = new RTC::WebRtcTransport(transportId, this, request->data);

            // Insert into the map.
            this->mapTransports[transportId] = webRtcTransport;

            MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str());

            json data = json::object();

            webRtcTransport->FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::Request::MethodId::ROUTER_CREATE_PLAIN_TRANSPORT:
        {
            std::string transportId;

            // This may throw
            SetNewTransportIdFromInternal(request->internal, transportId);

            auto* plainTransport = new RTC::PlainTransport(transportId, this, request->data);

            // Insert into the map.
            this->mapTransports[transportId] = plainTransport;

            MS_DEBUG_DEV("PlainTransport created [transportId:%s]", transportId.c_str());

            json data = json::object();

            plainTransport->FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::Request::MethodId::ROUTER_CREATE_PIPE_TRANSPORT:
        {
            std::string transportId;

            // This may throw
            SetNewTransportIdFromInternal(request->internal, transportId);

            auto* pipeTransport = new RTC::PipeTransport(transportId, this, request->data);

            // Insert into the map.
            this->mapTransports[transportId] = pipeTransport;

            MS_DEBUG_DEV("PipeTransport created [transportId:%s]", transportId.c_str());

            json data = json::object();

            pipeTransport->FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::Request::MethodId::ROUTER_CREATE_DIRECT_TRANSPORT:
        {
            std::string transportId;

            // This may throw
            SetNewTransportIdFromInternal(request->internal, transportId);

            auto* directTransport = new RTC::DirectTransport(transportId, this, request->data);

            // Insert into the map.
            this->mapTransports[transportId] = directTransport;

            MS_DEBUG_DEV("DirectTransport created [transportId:%s]", transportId.c_str());

            json data = json::object();

            directTransport->FillJson(data);

            request->Accept(data);

            break;
        }

        case Channel::Request::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER:
        {
            std::string rtpObserverId;

            // This may throw
            SetNewRtpObserverIdFromInternal(request->internal, rtpObserverId);

            auto* audioLevelObserver = new RTC::AudioLevelObserver(rtpObserverId, request->data);

            // Insert into the map.
            this->mapRtpObservers[rtpObserverId] = audioLevelObserver;

            MS_DEBUG_DEV("AudioLevelObserver created [rtpObserverId:%s]", rtpObserverId.c_str());

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::TRANSPORT_CLOSE:
        {
            // This may throw.
            RTC::Transport* transport = GetTransportFromInternal(request->internal);

            // Tell the Transport to close all its Producers and Consumers so it will
            // notify us about their closures.
            transport->CloseProducersAndConsumers();

            // Remove it from the map and delete it.
            this->mapTransports.erase(transport->id);

            MS_DEBUG_DEV("Transport closed [transportId:%s]", transport->id.c_str());

            // Delete it.
            delete transport;

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::RTP_OBSERVER_CLOSE:
        {
            // This may throw.
            RTC::RtpObserver* rtpObserver = GetRtpObserverFromInternal(request->internal);

            // Remove it from the map.
            this->mapRtpObservers.erase(rtpObserver->id);

            // Iterate all entries in mapProducerRtpObservers and remove the closed one.
            for (auto& kv : this->mapProducerRtpObservers)
            {
                auto& rtpObservers = kv.second;

                rtpObservers.erase(rtpObserver);
            }

            MS_DEBUG_DEV("RtpObserver closed [rtpObserverId:%s]", rtpObserver->id.c_str());

            // Delete it.
            delete rtpObserver;

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::RTP_OBSERVER_PAUSE:
        {
            // This may throw.
            RTC::RtpObserver* rtpObserver = GetRtpObserverFromInternal(request->internal);

            rtpObserver->Pause();

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::RTP_OBSERVER_RESUME:
        {
            // This may throw.
            RTC::RtpObserver* rtpObserver = GetRtpObserverFromInternal(request->internal);

            rtpObserver->Resume();

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::RTP_OBSERVER_ADD_PRODUCER:
        {
            // This may throw.
            RTC::RtpObserver* rtpObserver = GetRtpObserverFromInternal(request->internal);
            RTC::Producer* producer       = GetProducerFromInternal(request->internal);

            rtpObserver->AddProducer(producer);

            // Add to the map.
            this->mapProducerRtpObservers[producer].insert(rtpObserver);

            request->Accept();

            break;
        }

        case Channel::Request::MethodId::RTP_OBSERVER_REMOVE_PRODUCER:
        {
            // This may throw.
            RTC::RtpObserver* rtpObserver = GetRtpObserverFromInternal(request->internal);
            RTC::Producer* producer       = GetProducerFromInternal(request->internal);

            rtpObserver->RemoveProducer(producer);

            // Remove from the map.
            this->mapProducerRtpObservers[producer].erase(rtpObserver);

            request->Accept();

            break;
        }

        // Any other request must be delivered to the corresponding Transport.
        default:
        {
            // This may throw.
            RTC::Transport* transport = GetTransportFromInternal(request->internal);

            transport->HandleRequest(request);

            break;
        }
    }
}
　　上面的代码对不同的MethodId类型分别进行了处理。其中负责创建传输部分的是ROUTER_CREATE_WEBRTC_TRANSPORT这个类型:在这个case里创建了WebRtcTransport对象,而该类继承了前面提到的Transport类。
class WebRtcTransport : public RTC::Transport,public RTC::UdpSocket::Listener,public RTC::TcpServer::Listener,public RTC::TcpConnection::Listener,public RTC::IceServer::Listener,public RTC::DtlsTransport::Listener
　　WebRtcTransport类的构造函数:
// Builds a WebRtcTransport from the JSON options in `data`.
//
// Steps:
//   1. Read the optional booleans enableUdp (default true), enableTcp,
//      preferUdp and preferTcp (default false) and the mandatory, non-empty
//      `listenIps` array (at most 8 entries).
//   2. For every listen IP create a UdpSocket and/or a TcpServer and the
//      matching ICE candidate. The local preference drops by 100 per IP and is
//      raised by 1000 for the preferred protocol.
//   3. Create the IceServer and the DtlsTransport.
//
// Malformed options are reported through MS_THROW_TYPE_ERROR. If a
// MediaSoupError is raised in steps 2-3, everything allocated so far is
// released before the error is rethrown.
WebRtcTransport::WebRtcTransport(const std::string& id, RTC::Transport::Listener* listener, json& data)
  : RTC::Transport::Transport(id, listener, data)
{
    MS_TRACE();

    bool enableUdp{ true };
    auto jsonEnableUdpIt = data.find("enableUdp");

    if (jsonEnableUdpIt != data.end())
    {
        if (!jsonEnableUdpIt->is_boolean())
            MS_THROW_TYPE_ERROR("wrong enableUdp (not a boolean)");

        enableUdp = jsonEnableUdpIt->get<bool>();
    }

    bool enableTcp{ false };
    auto jsonEnableTcpIt = data.find("enableTcp");

    if (jsonEnableTcpIt != data.end())
    {
        if (!jsonEnableTcpIt->is_boolean())
            MS_THROW_TYPE_ERROR("wrong enableTcp (not a boolean)");

        enableTcp = jsonEnableTcpIt->get<bool>();
    }

    bool preferUdp{ false };
    auto jsonPreferUdpIt = data.find("preferUdp");

    if (jsonPreferUdpIt != data.end())
    {
        if (!jsonPreferUdpIt->is_boolean())
            MS_THROW_TYPE_ERROR("wrong preferUdp (not a boolean)");

        preferUdp = jsonPreferUdpIt->get<bool>();
    }

    bool preferTcp{ false };
    auto jsonPreferTcpIt = data.find("preferTcp");

    if (jsonPreferTcpIt != data.end())
    {
        if (!jsonPreferTcpIt->is_boolean())
            MS_THROW_TYPE_ERROR("wrong preferTcp (not a boolean)");

        preferTcp = jsonPreferTcpIt->get<bool>();
    }

    auto jsonListenIpsIt = data.find("listenIps");

    if (jsonListenIpsIt == data.end())
        MS_THROW_TYPE_ERROR("missing listenIps");
    else if (!jsonListenIpsIt->is_array())
        MS_THROW_TYPE_ERROR("wrong listenIps (not an array)");
    else if (jsonListenIpsIt->empty())
        MS_THROW_TYPE_ERROR("wrong listenIps (empty array)");
    else if (jsonListenIpsIt->size() > 8)
        MS_THROW_TYPE_ERROR("wrong listenIps (too many IPs)");

    std::vector<ListenIp> listenIps(jsonListenIpsIt->size());

    for (size_t i{ 0 }; i < jsonListenIpsIt->size(); ++i)
    {
        auto& jsonListenIp = (*jsonListenIpsIt)[i];
        auto& listenIp     = listenIps[i];

        if (!jsonListenIp.is_object())
            MS_THROW_TYPE_ERROR("wrong listenIp (not an object)");

        auto jsonIpIt = jsonListenIp.find("ip");

        if (jsonIpIt == jsonListenIp.end())
            MS_THROW_TYPE_ERROR("missing listenIp.ip");
        else if (!jsonIpIt->is_string())
            MS_THROW_TYPE_ERROR("wrong listenIp.ip (not an string)");

        listenIp.ip.assign(jsonIpIt->get<std::string>());

        // This may throw.
        Utils::IP::NormalizeIp(listenIp.ip);

        auto jsonAnnouncedIpIt = jsonListenIp.find("announcedIp");

        if (jsonAnnouncedIpIt != jsonListenIp.end())
        {
            if (!jsonAnnouncedIpIt->is_string())
                MS_THROW_TYPE_ERROR("wrong listenIp.announcedIp (not an string)");

            listenIp.announcedIp.assign(jsonAnnouncedIpIt->get<std::string>());
        }
    }

    try
    {
        uint16_t iceLocalPreferenceDecrement{ 0 };

        // One candidate per IP and per enabled protocol.
        if (enableUdp && enableTcp)
            this->iceCandidates.reserve(2 * jsonListenIpsIt->size());
        else
            this->iceCandidates.reserve(jsonListenIpsIt->size());

        for (auto& listenIp : listenIps)
        {
            if (enableUdp)
            {
                uint16_t iceLocalPreference =
                  IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                if (preferUdp)
                    iceLocalPreference += 1000;

                uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                // This may throw.
                auto* udpSocket = new RTC::UdpSocket(this, listenIp.ip);

                this->udpSockets[udpSocket] = listenIp.announcedIp;

                if (listenIp.announcedIp.empty())
                    this->iceCandidates.emplace_back(udpSocket, icePriority);
                else
                    this->iceCandidates.emplace_back(udpSocket, icePriority, listenIp.announcedIp);
            }

            if (enableTcp)
            {
                uint16_t iceLocalPreference =
                  IceCandidateDefaultLocalPriority - iceLocalPreferenceDecrement;

                if (preferTcp)
                    iceLocalPreference += 1000;

                uint32_t icePriority = generateIceCandidatePriority(iceLocalPreference);

                // This may throw.
                auto* tcpServer = new RTC::TcpServer(this, this, listenIp.ip);

                this->tcpServers[tcpServer] = listenIp.announcedIp;

                if (listenIp.announcedIp.empty())
                    this->iceCandidates.emplace_back(tcpServer, icePriority);
                else
                    this->iceCandidates.emplace_back(tcpServer, icePriority, listenIp.announcedIp);
            }

            // Decrement initial ICE local preference for next IP.
            iceLocalPreferenceDecrement += 100;
        }

        // Create a ICE server.
        this->iceServer = new RTC::IceServer(
          this, Utils::Crypto::GetRandomString(16), Utils::Crypto::GetRandomString(32));

        // Create a DTLS transport.
        this->dtlsTransport = new RTC::DtlsTransport(this);
    }
    catch (const MediaSoupError& error)
    {
        // Must delete everything since the destructor won't be called.

        delete this->dtlsTransport;
        this->dtlsTransport = nullptr;

        delete this->iceServer;
        this->iceServer = nullptr;

        for (auto& kv : this->udpSockets)
        {
            auto* udpSocket = kv.first;

            delete udpSocket;
        }
        this->udpSockets.clear();

        for (auto& kv : this->tcpServers)
        {
            auto* tcpServer = kv.first;

            delete tcpServer;
        }
        this->tcpServers.clear();

        this->iceCandidates.clear();

        throw;
    }
}
　　构造函数前半段是解析json。随后根据ListenIp初始化了ICE服务的IceCandidate。创建IceServer时,通过GetRandomString(16)产生一个长度为16的随机字符串,GetRandomString(32)则产生一个长度为32的随机字符串,作为连接Transport时的密码。
　　ICE服务器相关问题可以查看:https://developer.mozilla.org/zh-CN/docs/Web/API/WebRTC_API/Protocols
　　需要ICE服务器是为了让无法实现打洞的浏览器通过它进行一个中转,来实现传输。我们都知道WebRTC用的是P2P模式,同时仅支持https协议。DtlsTransport安全传输模块紧接着IceServer之后进行了初始化。
// Creates the OpenSSL objects used by this DTLS transport: the SSL handle, the
// two memory BIOs (from / to the network) and the DTLS timer.
//
// On any allocation failure control jumps to the `error` label, which releases
// whatever was allocated so far and throws through MS_THROW_ERROR.
//
// NOTE: The failure branches must not free anything themselves. The `error`
// label already frees every non-null handle, so freeing in both places would
// release `ssl` / `sslBioFromNetwork` twice.
// NOTE(review): the null checks under `error` assume the three members are
// initialised to nullptr in the class definition - confirm in the header.
DtlsTransport::DtlsTransport(Listener* listener) : listener(listener)
{
    MS_TRACE();

    /* Set SSL. */

    // Create the SSL handle.
    this->ssl = SSL_new(DtlsTransport::sslCtx);

    if (!this->ssl)
    {
        LOG_OPENSSL_ERROR("SSL_new() failed");

        goto error;
    }

    // Attach our data to the SSL handle.
    // Set this as custom data.
    SSL_set_ex_data(this->ssl, 0, static_cast<void*>(this));

    this->sslBioFromNetwork = BIO_new(BIO_s_mem());

    if (!this->sslBioFromNetwork)
    {
        LOG_OPENSSL_ERROR("BIO_new() failed");

        goto error;
    }

    this->sslBioToNetwork = BIO_new(BIO_s_mem());

    if (!this->sslBioToNetwork)
    {
        LOG_OPENSSL_ERROR("BIO_new() failed");

        goto error;
    }

    SSL_set_bio(this->ssl, this->sslBioFromNetwork, this->sslBioToNetwork);

    // Set the MTU so that we don't send packets that are too large with no fragmentation.
    SSL_set_mtu(this->ssl, DtlsMtu);
    DTLS_set_link_mtu(this->ssl, DtlsMtu);

    // Set callback handler for setting DTLS timer interval.
    DTLS_set_timer_cb(this->ssl, onSslDtlsTimer);

    // Set the DTLS timer.
    this->timer = new Timer(this);

    return;

error:

    // NOTE: At this point SSL_set_bio() was not called so we must free BIOs as
    // well.
    if (this->sslBioFromNetwork)
        BIO_free(this->sslBioFromNetwork);

    if (this->sslBioToNetwork)
        BIO_free(this->sslBioToNetwork);

    if (this->ssl)
        SSL_free(this->ssl);

    // NOTE: If this is not catched by the caller the program will abort, but
    // this should never happen.
    MS_THROW_ERROR("DtlsTransport instance creation failed");
}
三、流数据传输
　　在前面提到的WebRtcTransport类中,处理流数据包接收的函数是OnUdpSocketPacketReceived。我们从最底层开始,沿着调用链逐层观察数据包的转发流程。
　　首先,最底层的函数是:onRecv
// UDP receive callback: recovers the UdpSocket stored in handle->data and, if
// one is set, forwards all receive arguments to its OnUvRecv() method.
inline static void onRecv(
  uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
    auto* udpSocket = static_cast<UdpSocket*>(handle->data);

    // Nothing to notify when no socket object is attached to the handle.
    if (!udpSocket)
        return;

    udpSocket->OnUvRecv(nread, buf, addr, flags);
}
　　与信令传输类似,都是从handle中取出data并强转为对应的类型来进行调用。
　　调用OnUvRecv函数:
// Handles the result of a UDP read.
//
// - nread == 0: nothing was read (or the datagram was empty); ignored.
// - UV_UDP_PARTIAL set in flags: the datagram was truncated; it is dropped.
// - nread > 0: the byte counter is updated and the datagram is handed to
//   UserOnUdpDatagramReceived().
// - nread < 0: read error; only logged.
inline void UdpSocket::OnUvRecv(
  ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
    MS_TRACE();

    // NOTE: Ignore if there is nothing to read or if it was an empty datagram.
    if (nread == 0)
        return;

    // Check flags.
    if ((flags & UV_UDP_PARTIAL) != 0u)
    {
        MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");

        return;
    }

    // Data received.
    if (nread > 0)
    {
        // Update received bytes.
        this->recvBytes += nread;

        // Notify the subclass.
        UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
    }
    // Some error.
    else
    {
        MS_DEBUG_DEV("read error: %s", uv_strerror(nread));
    }
}

// Handles the completion of a UDP send: invokes the optional callback with
// true when status is 0 and with false otherwise.
inline void UdpSocket::OnUvSend(int status, UdpSocket::onSendCallback* cb)
{
    MS_TRACE();

    if (status == 0)
    {
        if (cb)
            (*cb)(true);
    }
    else
    {
#if MS_LOG_DEV_LEVEL == 3
        MS_DEBUG_DEV("send error: %s", uv_strerror(status));
#endif

        if (cb)
            (*cb)(false);
    }
}
　　回调函数部分都使用了内联函数来提高传输效率。对数据进行错误检查后,调用UserOnUdpDatagramReceived通知上层。
// Hands a received datagram to the registered listener through
// OnUdpSocketPacketReceived(). If no listener is set the datagram is dropped
// and an error is logged.
void UdpSocket::UserOnUdpDatagramReceived(const uint8_t* data, size_t len, const struct sockaddr* addr)
{
    MS_TRACE();

    if (!this->listener)
    {
        MS_ERROR("no listener set");

        return;
    }

    // Notify the reader.
    this->listener->OnUdpSocketPacketReceived(this, data, len, addr);
}
　　而WebRtcTransport类作为监听者(listener)与UdpSocket关联,当收到udp包时,它的OnUdpSocketPacketReceived函数会被调用。
// UdpSocket listener callback: builds a TransportTuple from the receiving
// socket and the remote address, then passes it on with the raw data to
// OnPacketReceived().
inline void WebRtcTransport::OnUdpSocketPacketReceived(
  RTC::UdpSocket* socket, const uint8_t* data, size_t len, const struct sockaddr* remoteAddr)
{
    MS_TRACE();

    // The tuple lives on the stack; only its address is passed on for the call.
    RTC::TransportTuple udpTuple(socket, remoteAddr);

    OnPacketReceived(&udpTuple, data, len);
}
　　随后调用OnPacketReceived:
// Accounts for the received bytes and dispatches the packet by type.
//
// The checks run in the order STUN, RTCP, RTP, DTLS and the first match wins.
// A packet that matches none of them is ignored with a warning.
inline void WebRtcTransport::OnPacketReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
    MS_TRACE();

    // Increase receive transmission.
    RTC::Transport::DataReceived(len);

    // Check if it's STUN.
    if (RTC::StunPacket::IsStun(data, len))
    {
        OnStunDataReceived(tuple, data, len);
    }
    // Check if it's RTCP.
    else if (RTC::RTCP::Packet::IsRtcp(data, len))
    {
        OnRtcpDataReceived(tuple, data, len);
    }
    // Check if it's RTP.
    else if (RTC::RtpPacket::IsRtp(data, len))
    {
        OnRtpDataReceived(tuple, data, len);
    }
    // Check if it's DTLS.
    else if (RTC::DtlsTransport::IsDtls(data, len))
    {
        OnDtlsDataReceived(tuple, data, len);
    }
    else
    {
        MS_WARN_DEV("ignoring received packet of unknown type");
    }
}
　　从代码的注释中我们可以看出,这个函数对STUN、RTCP、RTP、DTLS四种数据包分别进行了处理。流媒体数据RTP包被分配到了OnRtpDataReceived这个函数中。
// Processes an incoming (S)RTP packet.
//
// The packet is dropped unless DTLS is connected, a receiving SRTP session
// exists and the tuple is accepted by the ICE server. It is then decrypted
// (the buffer is passed to DecryptSrtp() as writable and `len` may be
// updated), parsed, and handed to RTC::Transport::ReceiveRtpPacket().
inline void WebRtcTransport::OnRtpDataReceived(RTC::TransportTuple* tuple, const uint8_t* data, size_t len)
{
    MS_TRACE();

    // Ensure DTLS is connected.
    if (this->dtlsTransport->GetState() != RTC::DtlsTransport::DtlsState::CONNECTED)
    {
        MS_DEBUG_2TAGS(dtls, rtp, "ignoring RTP packet while DTLS not connected");

        return;
    }

    // Ensure there is receiving SRTP session.
    if (!this->srtpRecvSession)
    {
        MS_DEBUG_TAG(srtp, "ignoring RTP packet due to non receiving SRTP session");

        return;
    }

    // Ensure it comes from a valid tuple.
    if (!this->iceServer->IsValidTuple(tuple))
    {
        MS_WARN_TAG(rtp, "ignoring RTP packet coming from an invalid tuple");

        return;
    }

    // Decrypt the SRTP packet.
    if (!this->srtpRecvSession->DecryptSrtp(const_cast<uint8_t*>(data), &len))
    {
        // Parse the packet only to log which stream failed to decrypt.
        RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, len);

        if (!packet)
        {
            MS_WARN_TAG(srtp, "DecryptSrtp() failed due to an invalid RTP packet");
        }
        else
        {
            MS_WARN_TAG(
              srtp,
              "DecryptSrtp() failed [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", seq:%" PRIu16 "]",
              packet->GetSsrc(),
              packet->GetPayloadType(),
              packet->GetSequenceNumber());

            delete packet;
        }

        return;
    }

    RTC::RtpPacket* packet = RTC::RtpPacket::Parse(data, len);

    if (!packet)
    {
        MS_WARN_TAG(rtp, "received data is not a valid RTP packet");

        return;
    }

    // Trick for clients performing aggressive ICE regardless we are ICE-Lite.
    this->iceServer->ForceSelectedTuple(tuple);

    // Pass the packet to the parent transport.
    RTC::Transport::ReceiveRtpPacket(packet);
}
　　从注释可以看出,该函数进行了以下几个操作:
　　1.确定DTLS是否已经连接;
　　2.确定接收方向的SRTP session是否存在;
　　3.确定数据包是否来自IceServer认可的有效元组(tuple);
　　4.解密SRTP packet;
　　5.解析数据包;
　　6.调用ReceiveRtpPacket。
// Entry point for a parsed RTP packet on this Transport.
//
// Applies the Transport's receive header-extension ids to the packet, feeds
// the congestion control server (if any), looks up the Producer for the packet
// and lets it process the packet. The matching transmission counter is then
// updated according to the Producer's result.
//
// The packet is deleted here on every path, so this function owns it.
void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
    MS_TRACE();

    // Apply the Transport RTP header extension ids so the RTP listener can use them.
    packet->SetMidExtensionId(this->recvRtpHeaderExtensionIds.mid);
    packet->SetRidExtensionId(this->recvRtpHeaderExtensionIds.rid);
    packet->SetRepairedRidExtensionId(this->recvRtpHeaderExtensionIds.rrid);
    packet->SetAbsSendTimeExtensionId(this->recvRtpHeaderExtensionIds.absSendTime);
    packet->SetTransportWideCc01ExtensionId(this->recvRtpHeaderExtensionIds.transportWideCc01);

    auto nowMs = DepLibUV::GetTimeMs();

    // Feed the TransportCongestionControlServer.
    if (this->tccServer)
        this->tccServer->IncomingPacket(nowMs, packet);

    // Get the associated Producer.
    RTC::Producer* producer = this->rtpListener.GetProducer(packet);

    if (!producer)
    {
        MS_WARN_TAG(
          rtp,
          "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
          packet->GetSsrc(),
          packet->GetPayloadType());

        // Tell the child class to remove this SSRC.
        RecvStreamClosed(packet->GetSsrc());

        delete packet;

        return;
    }

    // MS_DEBUG_DEV(
    //   "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",
    //   packet->GetSsrc(),
    //   packet->GetPayloadType(),
    //   producer->id.c_str());

    // Pass the RTP packet to the corresponding Producer.
    auto result = producer->ReceiveRtpPacket(packet);

    switch (result)
    {
        case RTC::Producer::ReceiveRtpPacketResult::MEDIA:
            this->recvRtpTransmission.Update(packet);
            break;

        case RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION:
            this->recvRtxTransmission.Update(packet);
            break;

        default:;
    }

    delete packet;
}
　　该函数首先把Transport记录的各个RTP扩展头id设置到数据包上,并把数据包交给拥塞控制模块(tccServer),然后通过rtpListener找到对应的Producer,随后调用Producer::ReceiveRtpPacket函数。
// Processes an RTP packet received for this Producer.
//
// Returns:
//   - DISCARDED when no RTP stream matches the packet or MangleRtpPacket()
//     rejects it.
//   - MEDIA when the packet's SSRC is the stream's media SSRC.
//   - RETRANSMISSION when the packet's SSRC is the stream's RTX SSRC.
//
// MEDIA / RETRANSMISSION are also returned when the stream rejects the packet
// or the Producer is paused; in those cases the listener is not notified.
Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
    MS_TRACE();

    // Reset current packet.
    this->currentRtpPacket = nullptr;

    // Count number of RTP streams.
    auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();

    auto* rtpStream = GetRtpStream(packet);

    if (!rtpStream)
    {
        MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());

        return ReceiveRtpPacketResult::DISCARDED;
    }

    // Pre-process the packet.
    PreProcessRtpPacket(packet);

    ReceiveRtpPacketResult result;
    bool isRtx{ false };

    // Media packet.
    if (packet->GetSsrc() == rtpStream->GetSsrc())
    {
        result = ReceiveRtpPacketResult::MEDIA;

        // Process the packet.
        if (!rtpStream->ReceivePacket(packet))
        {
            // May have to announce a new RTP stream to the listener.
            if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
                NotifyNewRtpStream(rtpStream);

            return result;
        }
    }
    // RTX packet.
    else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
    {
        result = ReceiveRtpPacketResult::RETRANSMISSION;
        isRtx  = true;

        // Process the packet.
        if (!rtpStream->ReceiveRtxPacket(packet))
            return result;
    }
    // Should not happen.
    else
    {
        MS_ABORT("found stream does not match received packet");
    }

    if (packet->IsKeyFrame())
    {
        MS_DEBUG_TAG(
          rtp,
          "key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
          packet->GetSsrc(),
          packet->GetSequenceNumber());

        // Tell the keyFrameRequestManager.
        if (this->keyFrameRequestManager)
            this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc());
    }

    // May have to announce a new RTP stream to the listener.
    if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
    {
        // Request a key frame for this stream since we may have lost the first packets
        // (do not do it if this is a key frame).
        if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
            this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());

        // Update current packet.
        this->currentRtpPacket = packet;

        NotifyNewRtpStream(rtpStream);

        // Reset current packet.
        this->currentRtpPacket = nullptr;
    }

    // If paused stop here.
    if (this->paused)
        return result;

    // May emit 'trace' event.
    EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);

    // Mangle the packet before providing the listener with it.
    if (!MangleRtpPacket(packet, rtpStream))
        return ReceiveRtpPacketResult::DISCARDED;

    // Post-process the packet.
    PostProcessRtpPacket(packet);

    this->listener->OnProducerRtpPacketReceived(this, packet);

    return result;
}
　　可以看到,该函数对这个包的ssrc以及是否是关键帧进行了判断,并执行了后续处理。这部分的内容我们后续再详细分析。
　　接下来调用了OnProducerRtpPacketReceived函数,而该函数内只是调用了OnTransportProducerRtpPacketReceived函数,下面给出这两个函数的源码:
// Producer listener callback: relays the packet to this Transport's own
// listener, adding the Transport itself as the first argument.
inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
{
    MS_TRACE();

    auto* upstream = this->listener;

    upstream->OnTransportProducerRtpPacketReceived(this, producer, packet);
}
// Transport listener callback: fans a Producer's RTP packet out to every
// Consumer registered for that Producer, then to every RtpObserver attached to
// it (if any).
//
// The Producer is looked up with at(), so a Producer with no entry in
// mapProducerConsumers raises std::out_of_range (assuming a standard map type).
inline void Router::OnTransportProducerRtpPacketReceived(
  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
{
    MS_TRACE();

    auto& consumers = this->mapProducerConsumers.at(producer);

    for (auto* consumer : consumers)
    {
        // Update MID RTP extension value.
        const auto& mid = consumer->GetRtpParameters().mid;

        if (!mid.empty())
            packet->UpdateMid(mid);

        consumer->SendRtpPacket(packet);
    }

    auto it = this->mapProducerRtpObservers.find(producer);

    if (it != this->mapProducerRtpObservers.end())
    {
        auto& rtpObservers = it->second;

        for (auto* rtpObserver : rtpObservers)
        {
            rtpObserver->ReceiveRtpPacket(producer, packet);
        }
    }
}
　　经过一系列的处理我们发现,数据包又回到了Router模块:
// Transport listener callback, called for each RTP packet a Producer accepted.
//
// First every Consumer mapped to the Producer gets the packet via
// SendRtpPacket(); a Consumer with a non-empty MID rewrites the packet's MID
// first. Then, if the Producer has RtpObservers attached, each of them
// receives the packet via ReceiveRtpPacket().
//
// mapProducerConsumers is accessed with at(), so the Producer is expected to
// have an entry there.
inline void Router::OnTransportProducerRtpPacketReceived(
  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
{
    MS_TRACE();

    auto& consumers = this->mapProducerConsumers.at(producer);

    for (auto* consumer : consumers)
    {
        // Update MID RTP extension value.
        const auto& mid = consumer->GetRtpParameters().mid;

        if (!mid.empty())
            packet->UpdateMid(mid);

        consumer->SendRtpPacket(packet);
    }

    auto it = this->mapProducerRtpObservers.find(producer);

    if (it != this->mapProducerRtpObservers.end())
    {
        auto& rtpObservers = it->second;

        for (auto* rtpObserver : rtpObservers)
        {
            rtpObserver->ReceiveRtpPacket(producer, packet);
        }
    }
}
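　　可以看到,消费者模块将媒体包直接放入了发送流程。而注册在该生产者上的RtpObserver(例如前面创建的AudioLevelObserver,对应音频的生产者)则会另外收到这个包,单独对音频包的声音进行处理。
　　进行完这些流程后,调用返回到Transport::ReceiveRtpPacket,Producer::ReceiveRtpPacket返回了一个结果。根据结果判断是否是重传包并进行不同的操作:媒体包更新recvRtpTransmission,重传包则更新重传模块(recvRtxTransmission)。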
四、总结
　　本章对流数据包的传输流程进行了梳理,但很多细节都一笔带过。如:音频包为什么需要单独处理,以及STUN、DTLS、ICE等模块的处理流程、重传处理等。还有很多细节没有列出,后面会针对这些内容进行阅读分析。当然,文中会有不少错误的解析,后面会一点一点纠正。