当前位置: 代码迷 >> 综合 >> 流媒体学习之路(mediasoup)——信令传输(3)
  详细解决方案

流媒体学习之路(mediasoup)——信令传输(3)

热度:32   发布时间:2023-12-13 23:42:07.0

流媒体学习之路(mediasoup)——信令传输(3)

文章目录

  • 流媒体学习之路(mediasoup)——信令传输(3)
  • 一、Node.js部分
  • 二、C++部分
    • 2.1 UnixStreamSocket创建
    • 2.2 UnixStreamSocket
    • 2.3 OnRead函数:
    • 2.4 OnUvRead函数:
    • 2.5 UserOnUnixStreamRead:
    • 2.6 OnConsumerSocketMessage
    • 2.7 Request
    • 2.8 OnChannelRequest
  • 三、总结


一、Node.js部分

mediasoup中Node.js部分与C++部分的信令交互主要通过Socket管道来进行。C++部分(worker)作为Node.js的子进程运行在本机上,两者通过创建子进程时建立的管道进行通信,实现交互。
前面两篇提到,Node.js部分worker建立后需要建立channel来进行通信,我们再来回顾一下这部分通信的代码。下面是Worker的构造函数:

 constructor({
      logLevel, logTags, rtcMinPort, rtcMaxPort, dtlsCertificateFile, dtlsPrivateKeyFile, appData }) {
    super();// Closed flag.this._closed = false;// Routers set.this._routers = new Set();// Observer instance.this._observer = new EnhancedEventEmitter_1.EnhancedEventEmitter();logger.debug('constructor()');let spawnBin = workerBin;let spawnArgs = [];if (process.env.MEDIASOUP_USE_VALGRIND === 'true') {
    spawnBin = process.env.MEDIASOUP_VALGRIND_BIN || 'valgrind';if (process.env.MEDIASOUP_VALGRIND_OPTIONS)spawnArgs = spawnArgs.concat(process.env.MEDIASOUP_VALGRIND_OPTIONS.split(/\s+/));spawnArgs.push(workerBin);}if (typeof logLevel === 'string' && logLevel)spawnArgs.push(`--logLevel=${
      logLevel}`);for (const logTag of (Array.isArray(logTags) ? logTags : [])) {
    if (typeof logTag === 'string' && logTag)spawnArgs.push(`--logTag=${
      logTag}`);}if (typeof rtcMinPort === 'number' && !Number.isNaN(rtcMinPort))spawnArgs.push(`--rtcMinPort=${
      rtcMinPort}`);if (typeof rtcMaxPort === 'number' && !Number.isNaN(rtcMaxPort))spawnArgs.push(`--rtcMaxPort=${
      rtcMaxPort}`);if (typeof dtlsCertificateFile === 'string' && dtlsCertificateFile)spawnArgs.push(`--dtlsCertificateFile=${
      dtlsCertificateFile}`);if (typeof dtlsPrivateKeyFile === 'string' && dtlsPrivateKeyFile)spawnArgs.push(`--dtlsPrivateKeyFile=${
      dtlsPrivateKeyFile}`);logger.debug('spawning worker process: %s %s', spawnBin, spawnArgs.join(' '));this._child = child_process_1.spawn(// commandspawnBin, // argsspawnArgs, // options{
    env: {
    MEDIASOUP_VERSION: '3.6.13'},detached: false,// fd 0 (stdin) : Just ignore it.// fd 1 (stdout) : Pipe it for 3rd libraries that log their own stuff.// fd 2 (stderr) : Same as stdout.// fd 3 (channel) : Producer Channel fd.// fd 4 (channel) : Consumer Channel fd.// fd 5 (channel) : Producer PayloadChannel fd.// fd 6 (channel) : Consumer PayloadChannel fd.stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'],windowsHide: true});this._pid = this._child.pid;this._channel = new Channel_1.Channel({
    producerSocket: this._child.stdio[3],consumerSocket: this._child.stdio[4],pid: this._pid});this._payloadChannel = new PayloadChannel_1.PayloadChannel({
    // NOTE: TypeScript does not like more than 5 fds.// @ts-ignoreproducerSocket: this._child.stdio[5],// @ts-ignoreconsumerSocket: this._child.stdio[6]});this._appData = appData;let spawnDone = false;// Listen for 'running' notification.this._channel.once(String(this._pid), (event) => {
    if (!spawnDone && event === 'running') {
    spawnDone = true;logger.debug('worker process running [pid:%s]', this._pid);this.emit('@success');}});this._child.on('exit', (code, signal) => {
    this._child = undefined;this.close();if (!spawnDone) {
    spawnDone = true;if (code === 42) {
    logger.error('worker process failed due to wrong settings [pid:%s]', this._pid);this.emit('@failure', new TypeError('wrong settings'));}else {
    logger.error('worker process failed unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);this.emit('@failure', new Error(`[pid:${
      this._pid}, code:${
      code}, signal:${
      signal}]`));}}else {
    logger.error('worker process died unexpectedly [pid:%s, code:%s, signal:%s]', this._pid, code, signal);this.safeEmit('died', new Error(`[pid:${
      this._pid}, code:${
      code}, signal:${
      signal}]`));}});this._child.on('error', (error) => {
    this._child = undefined;this.close();if (!spawnDone) {
    spawnDone = true;logger.error('worker process failed [pid:%s]: %s', this._pid, error.message);this.emit('@failure', error);}else {
    logger.error('worker process error [pid:%s]: %s', this._pid, error.message);this.safeEmit('died', error);}});// Be ready for 3rd party worker libraries logging to stdout.this._child.stdout.on('data', (buffer) => {
    for (const line of buffer.toString('utf8').split('\n')) {
    if (line)workerLogger.debug(`(stdout) ${
      line}`);}});// In case of a worker bug, mediasoup will log to stderr.this._child.stderr.on('data', (buffer) => {
    for (const line of buffer.toString('utf8').split('\n')) {
    if (line)workerLogger.error(`(stderr) ${
      line}`);}});}

可见创建子进程(worker)后,创建了两个通道:channel和payloadChannel,它们分别对应cpp代码中的两个通道。创建channel时传入的三个参数——producerSocket、consumerSocket、pid 分别代表:子进程的生产者Socket、子进程的消费者Socket以及子进程(worker-cpp)的进程id。下面是Channel的构造函数:

 constructor({
      producerSocket, consumerSocket, pid }) {
    super();// Closed flag.this._closed = false;// Next id for messages sent to the worker process.this._nextId = 0;// Map of pending sent requests.this._sents = new Map();logger.debug('constructor()');this._producerSocket = producerSocket;this._consumerSocket = consumerSocket;// Read Channel responses/notifications from the worker.this._consumerSocket.on('data', (buffer) => {
    if (!this._recvBuffer) {
    this._recvBuffer = buffer;}else {
    this._recvBuffer = Buffer.concat([this._recvBuffer, buffer], this._recvBuffer.length + buffer.length);}if (this._recvBuffer.length > NS_PAYLOAD_MAX_LEN) {
    logger.error('receiving buffer is full, discarding all data into it');// Reset the buffer and exit.this._recvBuffer = undefined;return;}while (true) // eslint-disable-line no-constant-condition{
    let nsPayload;try {
    nsPayload = netstring.nsPayload(this._recvBuffer);}catch (error) {
    logger.error('invalid netstring data received from the worker process: %s', String(error));// Reset the buffer and exit.this._recvBuffer = undefined;return;}// Incomplete netstring message.if (nsPayload === -1)return;try {
    // We can receive JSON messages (Channel messages) or log strings.switch (nsPayload[0]) {
    // 123 = '{' (a Channel JSON messsage).case 123:this._processMessage(JSON.parse(nsPayload.toString('utf8')));break;// 68 = 'D' (a debug log).case 68:logger.debug(`[pid:${
      pid}] ${
      nsPayload.toString('utf8', 1)}`);break;// 87 = 'W' (a warn log).case 87:logger.warn(`[pid:${
      pid}] ${
      nsPayload.toString('utf8', 1)}`);break;// 69 = 'E' (an error log).case 69:logger.error(`[pid:${
      pid} ${
      nsPayload.toString('utf8', 1)}`);break;// 88 = 'X' (a dump log).case 88:// eslint-disable-next-line no-consoleconsole.log(nsPayload.toString('utf8', 1));break;default:// eslint-disable-next-line no-consoleconsole.warn(`worker[pid:${
      pid}] unexpected data: %s`, nsPayload.toString('utf8', 1));}}catch (error) {
    logger.error('received invalid message from the worker process: %s', String(error));}// Remove the read payload from the buffer.this._recvBuffer =this._recvBuffer.slice(netstring.nsLength(this._recvBuffer));if (!this._recvBuffer.length) {
    this._recvBuffer = undefined;return;}}});this._consumerSocket.on('end', () => (logger.debug('Consumer Channel ended by the worker process')));this._consumerSocket.on('error', (error) => (logger.error('Consumer Channel error: %s', String(error))));this._producerSocket.on('end', () => (logger.debug('Producer Channel ended by the worker process')));this._producerSocket.on('error', (error) => (logger.error('Producer Channel error: %s', String(error))));}

在channel的'data'回调中执行了一个循环:收到的数据先追加到接收缓冲区,然后不断从缓冲区中解析出完整的netstring消息并逐条处理(来自worker的响应/通知或日志),直到缓冲区中没有完整的消息为止。这样Node.js部分就可以与worker进行通信。

二、C++部分

为了方便理解,我将cpp代码信令部分按语义进行了分层。分层图大致如下:
在这里插入图片描述

2.1 UnixStreamSocket创建

这里有一个很容易让人头晕的类名,就是UnixStreamSocket。在命名空间Channel中以及全局空间中各有一个同名的类。
a. 全局空间的类是生产者/消费者类的真正基类,它会保存Socket的fd、最大buf大小、角色类型(CONSUMER 或者 PRODUCER)。
b. Channel内部的类则是主函数中调用的类,该类负责初始化生产者、消费者,同时该类也继承了消费者类内的监听类(Listener)。这样内层收到数据时就可以回调到外层的函数。
下面列一下伪代码:

// Simplified sketch of the class layout (declarations only, no definitions).

// Global-namespace base class of the producer/consumer sockets.
class UnixStreamSocket
{
public:
	// Role passed to the constructor. It has to be declared here, otherwise
	// the constructor signature below refers to an unknown type.
	enum class Role
	{
		PRODUCER,
		CONSUMER
	};

public:
	UnixStreamSocket(int fd, size_t bufferSize, UnixStreamSocket::Role role);
};

namespace Channel
{
	// Consumer side: derives from the global ::UnixStreamSocket.
	class ConsumerSocket : public ::UnixStreamSocket
	{
	public:
		// Callback interface implemented by the owner of the socket.
		class Listener
		{
		};

	public:
		ConsumerSocket(int fd, size_t bufferSize, Listener* listener);
	};

	// Channel-level class: it is the ConsumerSocket listener and is built
	// from the consumer and producer fds.
	class UnixStreamSocket : public ConsumerSocket::Listener
	{
	public:
		explicit UnixStreamSocket(int consumerFd, int producerFd);
	};
} // namespace Channel

2.2 UnixStreamSocket

在C++代码的main函数中,最初就构造了两个channel。其中,全局UnixStreamSocket基类的构造函数中首先创建了一个名为uvHandle的uv_pipe_t对象,该对象用于接入libUV。这里做了一下libUV的基础简介:https://blog.csdn.net/qw225967/article/details/119319970
构造函数中先用uv_pipe_init初始化uv_pipe,随后调用uv_pipe_open打开传入的fd,然后(仅消费者角色)调用uv_read_start开始读取。这样该管道就挂到了libUV的loop上。
uv_read_start这个函数是用来启动读操作的,它有三个参数:
a. uvHandle,管道句柄,其data字段里存放的是这个对象本身(this)
b. onAlloc,libUV需要buffer存放读到的数据时回调这个函数,由它提供buffer
c. onRead,读到pipe另一端发送的数据时回调
此处引用熠熠微光作者的文章:https://blog.csdn.net/Frederick_Fung/article/details/107063392

/**
 * Wraps the given fd in a libuv pipe handle.
 *
 * The handle's `data` field is set to `this` so that the static libuv
 * callbacks can get back to the object. Only sockets with the CONSUMER role
 * start reading. Every libuv failure is reported via MS_THROW_ERROR_STD.
 */
UnixStreamSocket::UnixStreamSocket(int fd, size_t bufferSize, UnixStreamSocket::Role role)
  : bufferSize(bufferSize), role(role)
{
	MS_TRACE_STD();

	this->uvHandle       = new uv_pipe_t;
	this->uvHandle->data = static_cast<void*>(this);

	int status = uv_pipe_init(DepLibUV::GetLoop(), this->uvHandle, 0);

	if (status != 0)
	{
		// The handle was never initialized, so it is freed directly.
		delete this->uvHandle;
		this->uvHandle = nullptr;

		MS_THROW_ERROR_STD("uv_pipe_init() failed: %s", uv_strerror(status));
	}

	status = uv_pipe_open(this->uvHandle, fd);

	if (status != 0)
	{
		uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));

		MS_THROW_ERROR_STD("uv_pipe_open() failed: %s", uv_strerror(status));
	}

	// Only the consumer side reads from the pipe.
	if (this->role != UnixStreamSocket::Role::CONSUMER)
		return;

	// Start reading.
	status = uv_read_start(
	  reinterpret_cast<uv_stream_t*>(this->uvHandle),
	  static_cast<uv_alloc_cb>(onAlloc),
	  static_cast<uv_read_cb>(onRead));

	if (status != 0)
	{
		uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));

		MS_THROW_ERROR_STD("uv_read_start() failed: %s", uv_strerror(status));
	}

	// NOTE: Don't allocate the buffer here. Instead wait for the first uv_alloc_cb().
}

2.3 OnRead函数:

/**
 * libuv read callback: forwards the read result to the UnixStreamSocket
 * stored in the handle's `data` field. Does nothing when `data` is null.
 */
inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
	auto* self = static_cast<UnixStreamSocket*>(handle->data);

	if (!self)
		return;

	self->OnUvRead(nread, buf);
}

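libUV回调时传入了流句柄(handle)、本次读取的字节数(nread)以及buf的位置。
这里把handle->data转换回UnixStreamSocket*后取出socket,然后使用socket调用OnUvRead函数。为什么转换后可以调用呢?是因为句柄的data成员是一个留给使用者的void*指针,构造函数里已经把this存了进去(this->uvHandle->data = static_cast<void*>(this)),所以这里用static_cast就能还原出原来的对象指针。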

2.4 OnUvRead函数:

该函数根据读到的数据长度nread进行判断:nread为0时直接返回;有数据时更新缓冲区数据长度并调用子类的读函数;对端断开或读取出错时关闭管道并通知子类。

/**
 * Handles the result of a libuv read.
 *
 * - nread == 0: nothing to do.
 * - nread > 0: the buffer data length grows by nread and the subclass is
 *   notified through UserOnUnixStreamRead().
 * - UV_EOF / UV_ECONNRESET: flagged as closed by the peer.
 * - any other value: logged and flagged as an error.
 * In the last two cases the socket is closed and the subclass is notified
 * through UserOnUnixStreamSocketClosed().
 */
inline void UnixStreamSocket::OnUvRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
	MS_TRACE_STD();

	if (nread == 0)
		return;

	// Data received.
	if (nread > 0)
	{
		// Update the buffer data length.
		this->bufferDataLen += static_cast<size_t>(nread);

		// Notify the subclass.
		UserOnUnixStreamRead();

		return;
	}

	const bool peerDisconnected = (nread == UV_EOF || nread == UV_ECONNRESET);

	if (peerDisconnected)
	{
		this->isClosedByPeer = true;
	}
	// Some error.
	else
	{
		MS_ERROR_STD("read error, closing the pipe: %s", uv_strerror(nread));

		this->hasError = true;
	}

	// Both cases end the same way: close the local side of the pipe and
	// notify the subclass.
	Close();
	UserOnUnixStreamSocketClosed();
}

2.5 UserOnUnixStreamRead:

void ConsumerSocket::UserOnUnixStreamRead(){
    MS_TRACE();// Be ready to parse more than a single message in a single chunk.while (true){
    if (IsClosed())return;size_t readLen = this->bufferDataLen - this->msgStart;char* msgStart = nullptr;size_t msgLen;int nsRet = netstring_read(reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen);if (nsRet != 0){
    switch (nsRet){
    case NETSTRING_ERROR_TOO_SHORT:{
    // Check if the buffer is full.if (this->bufferDataLen == this->bufferSize){
    // First case: the incomplete message does not begin at position 0 of// the buffer, so move the incomplete message to the position 0.if (this->msgStart != 0){
    std::memmove(this->buffer, this->buffer + this->msgStart, readLen);this->msgStart      = 0;this->bufferDataLen = readLen;}// Second case: the incomplete message begins at position 0 of the buffer.// The message is too big, so discard it.else{
    MS_ERROR("no more space in the buffer for the unfinished message being parsed, ""discarding it");this->msgStart      = 0;this->bufferDataLen = 0;}}// Otherwise the buffer is not full, just wait.return;}case NETSTRING_ERROR_TOO_LONG:{
    MS_ERROR("NETSTRING_ERROR_TOO_LONG");break;}case NETSTRING_ERROR_NO_COLON:{
    MS_ERROR("NETSTRING_ERROR_NO_COLON");break;}case NETSTRING_ERROR_NO_COMMA:{
    MS_ERROR("NETSTRING_ERROR_NO_COMMA");break;}case NETSTRING_ERROR_LEADING_ZERO:{
    MS_ERROR("NETSTRING_ERROR_LEADING_ZERO");break;}case NETSTRING_ERROR_NO_LENGTH:{
    MS_ERROR("NETSTRING_ERROR_NO_LENGTH");break;}}// Error, so reset and exit the parsing loop.this->msgStart      = 0;this->bufferDataLen = 0;return;}// If here it means that msgStart points to the beginning of a message// with msgLen bytes length, so recalculate readLen.readLen =reinterpret_cast<const uint8_t*>(msgStart) - (this->buffer + this->msgStart) + msgLen + 1;this->listener->OnConsumerSocketMessage(this, msgStart, msgLen);// If there is no more space available in the buffer and that is because// the latest parsed message filled it, then empty the full buffer.if ((this->msgStart + readLen) == this->bufferSize){
    this->msgStart      = 0;this->bufferDataLen = 0;}// If there is still space in the buffer, set the beginning of the next// parsing to the next position after the parsed message.else{
    this->msgStart += readLen;}// If there is more data in the buffer after the parsed message// then parse again. Otherwise break here and wait for more data.if (this->bufferDataLen > this->msgStart){
    continue;}break;}}

子类的读函数会循环解析缓冲区中的netstring消息:一次读取可能包含多条消息,因此会一直解析到缓冲区中没有更多数据为止;如果消息还不完整,则保留在缓冲区中等待后续数据。进行错误判断后,每解析出一条完整消息就会调用监听者,向上层回调到“业务处理层”。

2.6 OnConsumerSocketMessage

/**
 * Called for every complete message read from the consumer socket.
 *
 * Parses the message as JSON, builds a Channel::Request from it and hands the
 * request to the listener. MediaSoupTypeError / MediaSoupError thrown by the
 * listener are answered through request.TypeError() / request.Error().
 * JSON parse errors and errors thrown while building the request are logged.
 *
 * @param msg    Start of the message payload.
 * @param msgLen Payload length in bytes.
 */
void UnixStreamSocket::OnConsumerSocketMessage(
  ConsumerSocket* /*consumerSocket*/, char* msg, size_t msgLen)
{
	MS_TRACE_STD();

	try
	{
		json jsonMessage = json::parse(msg, msg + msgLen);

		// Automatic storage: the request is destroyed on every exit path,
		// including exceptions other than MediaSoupError thrown by the
		// listener (the previous new/delete pair leaked in that case).
		Channel::Request request(this, jsonMessage);

		// Notify the listener.
		try
		{
			this->listener->OnChannelRequest(this, &request);
		}
		catch (const MediaSoupTypeError& error)
		{
			request.TypeError(error.what());
		}
		catch (const MediaSoupError& error)
		{
			request.Error(error.what());
		}
	}
	catch (const json::parse_error& error)
	{
		MS_ERROR_STD("JSON parsing error: %s", error.what());
	}
	catch (const MediaSoupError& /*error*/)
	{
		MS_ERROR_STD("discarding wrong Channel request");
	}
}

该函数将接收到的信息解析成json,并用它构造一个Request对象向上传递。
Request类的定义如下:

class Request{
    public:enum class MethodId{
    WORKER_DUMP = 1,WORKER_GET_RESOURCE_USAGE,WORKER_UPDATE_SETTINGS,WORKER_CREATE_ROUTER,ROUTER_CLOSE,ROUTER_DUMP,ROUTER_CREATE_WEBRTC_TRANSPORT,ROUTER_CREATE_PLAIN_TRANSPORT,ROUTER_CREATE_PIPE_TRANSPORT,ROUTER_CREATE_DIRECT_TRANSPORT,ROUTER_CREATE_AUDIO_LEVEL_OBSERVER,TRANSPORT_CLOSE,TRANSPORT_DUMP,TRANSPORT_GET_STATS,TRANSPORT_CONNECT,TRANSPORT_SET_MAX_INCOMING_BITRATE,TRANSPORT_RESTART_ICE,TRANSPORT_PRODUCE,TRANSPORT_CONSUME,TRANSPORT_PRODUCE_DATA,TRANSPORT_CONSUME_DATA,TRANSPORT_ENABLE_TRACE_EVENT,PRODUCER_CLOSE,PRODUCER_DUMP,PRODUCER_GET_STATS,PRODUCER_PAUSE,PRODUCER_RESUME,PRODUCER_ENABLE_TRACE_EVENT,CONSUMER_CLOSE,CONSUMER_DUMP,CONSUMER_GET_STATS,CONSUMER_PAUSE,CONSUMER_RESUME,CONSUMER_SET_PREFERRED_LAYERS,CONSUMER_SET_PRIORITY,CONSUMER_REQUEST_KEY_FRAME,CONSUMER_ENABLE_TRACE_EVENT,DATA_PRODUCER_CLOSE,DATA_PRODUCER_DUMP,DATA_PRODUCER_GET_STATS,DATA_CONSUMER_CLOSE,DATA_CONSUMER_DUMP,DATA_CONSUMER_GET_STATS,RTP_OBSERVER_CLOSE,RTP_OBSERVER_PAUSE,RTP_OBSERVER_RESUME,RTP_OBSERVER_ADD_PRODUCER,RTP_OBSERVER_REMOVE_PRODUCER};private:static std::unordered_map<std::string, MethodId> string2MethodId;public:Request(Channel::UnixStreamSocket* channel, json& jsonRequest);virtual ~Request();void Accept();void Accept(json& data);void Error(const char* reason = nullptr);void TypeError(const char* reason = nullptr);public:// Passed by argument.Channel::UnixStreamSocket* channel{
     nullptr };uint32_t id{
     0u };std::string method;MethodId methodId;json internal;json data;// Others.bool replied{
     false };};

2.7 Request

Request的构造函数中,从json中取出id、method、internal、data等字段完成初始化;id或method缺失、method未知时会抛出错误。

/**
 * Builds a Request from a parsed JSON message.
 *
 * Reads "id" and "method" (both mandatory) and maps the method name to its
 * MethodId. "internal" and "data" are copied when present and objects,
 * otherwise they are set to empty JSON objects.
 *
 * Uses MS_THROW_ERROR when "id" is missing or not a positive integer, when
 * "method" is missing or not a string, or when the method is unknown (in
 * that last case Error("unknown method") is called first).
 */
Request::Request(Channel::UnixStreamSocket* channel, json& jsonRequest) : channel(channel)
{
	MS_TRACE();

	auto jsonIdIt = jsonRequest.find("id");

	if (jsonIdIt == jsonRequest.end() || !Utils::Json::IsPositiveInteger(*jsonIdIt))
		MS_THROW_ERROR("missing id");

	this->id = jsonIdIt->get<uint32_t>();

	auto jsonMethodIt = jsonRequest.find("method");

	if (jsonMethodIt == jsonRequest.end() || !jsonMethodIt->is_string())
		MS_THROW_ERROR("missing method");

	this->method = jsonMethodIt->get<std::string>();

	auto methodIdIt = Request::string2MethodId.find(this->method);

	if (methodIdIt == Request::string2MethodId.end())
	{
		Error("unknown method");

		MS_THROW_ERROR("unknown method '%s'", this->method.c_str());
	}

	this->methodId = methodIdIt->second;

	auto jsonInternalIt = jsonRequest.find("internal");

	if (jsonInternalIt != jsonRequest.end() && jsonInternalIt->is_object())
		this->internal = *jsonInternalIt;
	else
		this->internal = json::object();

	auto jsonDataIt = jsonRequest.find("data");

	if (jsonDataIt != jsonRequest.end() && jsonDataIt->is_object())
		this->data = *jsonDataIt;
	else
		this->data = json::object();
}

将json内容转换完成后,把Request对象继续向上传递。

2.8 OnChannelRequest

当Worker接收到Request的数据后,便能做出相应的处理了,接下来进入Worker.cpp的OnChannelRequest()。

switch里会根据methodId做出相应的处理,当它为:
a. WORKER_DUMP,表示将Worker的dump信息(例如其中的Router信息)填入json,并通过Accept作为响应返回
b. WORKER_GET_RESOURCE_USAGE,表示将资源使用情况(resource usage)的参数信息填入json,并通过Accept作为响应返回
c. WORKER_UPDATE_SETTINGS,表示更新设置
d. WORKER_CREATE_ROUTER,表示创建Router
e. ROUTER_CLOSE,表示关闭并删除对应的Router
f. 其他请求则交给internal中指定的Router继续处理

/**
 * Dispatches a Channel request according to its methodId.
 *
 * Worker-level methods (dump, resource usage, settings update, router
 * creation and router close) are handled here. Any other request is
 * forwarded to the Router obtained from request->internal.
 *
 * Per the original comments, SetNewRouterIdFromInternal() and
 * GetRouterFromInternal() may throw.
 */
inline void Worker::OnChannelRequest(Channel::UnixStreamSocket* /*channel*/, Channel::Request* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

	switch (request->methodId)
	{
		case Channel::Request::MethodId::WORKER_DUMP:
		{
			json data = json::object();

			FillJson(data);

			request->Accept(data);

			break;
		}

		case Channel::Request::MethodId::WORKER_GET_RESOURCE_USAGE:
		{
			json data = json::object();

			FillJsonResourceUsage(data);

			request->Accept(data);

			break;
		}

		case Channel::Request::MethodId::WORKER_UPDATE_SETTINGS:
		{
			Settings::HandleRequest(request);

			break;
		}

		case Channel::Request::MethodId::WORKER_CREATE_ROUTER:
		{
			std::string routerId;

			// This may throw.
			SetNewRouterIdFromInternal(request->internal, routerId);

			auto* router = new RTC::Router(routerId);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::Request::MethodId::ROUTER_CLOSE:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			// Log while the router is still alive: router->id must not be
			// read after the delete below.
			MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

			// Remove it from the map and delete it.
			this->mapRouters.erase(router->id);
			delete router;

			request->Accept();

			break;
		}

		// Any other request must be delivered to the corresponding Router.
		default:
		{
			// This may throw.
			RTC::Router* router = GetRouterFromInternal(request->internal);

			router->HandleRequest(request);

			break;
		}
	}
}

三、总结

本章主要捋了一下Channel这部分的信令传输流程,可以看到主要回调形式是通过观察者(Listener)向上传递,并且对信令数据做了分层处理。后续我们会对各个信令的意义进行更详细的分析。