8-6_基于嵌入式Linux平台实现RTSP协议

基于嵌入式Linux平台实现RTSP协议

参考源码:

1.程序的编译与运行

1.获取程序源码

git clone https://github.com/ImSjt/RtspServer.git

如果获取不到,可通过课程配套源码获取!

2.进入工程目录

cd RtspServer/

3.修改Makefile

vi Makefile

修改文件中的工具链路径为全志工具链路径

CROSS_COMPILE = ~/tina-v853-100ask/prebuilt/gcc/linux-x86/arm/toolchain-sunxi-musl/toolchain/bin/arm-openwrt-linux-

4.在终端指定存放交叉编译需要使用的库文件头文件的文件夹。

export STAGING_DIR=~/tina-v853-100ask/prebuilt/gcc/linux-x86/arm/toolchain-sunxi-musl/toolchain/arm-openwrt-linux-muslgnueabi/

5.编译程序

make

6.进入可执行文件目录

cd example/

进入之后可以看到生成可执行h264_rtsp_server,将可执行程序传输到开发板上即可运行。

2.程序的核心

示例代码如下所示:

int main(int argc, char* argv[])
{
    if(argc !=  2) //检查命令行参数的数量
    {
        std::cout<<"Usage: "<<argv[0]<<" <h264 file>"<<std::endl;
        return -1;
    }

    //Logger::setLogFile("xxx.log");
    Logger::setLogLevel(Logger::LogWarning);//设置日志级别为警告

    EventScheduler* scheduler = EventScheduler::createNew(EventScheduler::POLLER_SELECT);//建立一个事件调度器,使用select作为I/O多路复用机制。
    ThreadPool* threadPool = ThreadPool::createNew(2);//创建一个包含2个线程的线程池。
    UsageEnvironment* env = UsageEnvironment::createNew(scheduler, threadPool);//创建一个使用上述调度器和线程池的环境对象。

    Ipv4Address ipAddr("0.0.0.0", 8554);//定义服务器的IP地址和端口号。
    RtspServer* server = RtspServer::createNew(env, ipAddr);//创建一个RTSP服务器对象。
    MediaSession* session = MediaSession::createNew("live");//创建一个新的媒体会话,名称为"live"。
    MediaSource* mediaSource = H264FileMediaSource::createNew(env, argv[1]);//创建一个从指定H.264文件读取数据的媒体源。
    RtpSink* rtpSink = H264RtpSink::createNew(env, mediaSource);//创建一个RTP流,用于将媒体源的数据通过RTP协议发送。

    session->addRtpSink(MediaSession::TrackId0, rtpSink);//将RTP接收器添加到媒体会话中
    //session->startMulticast(); //多播

    server->addMeidaSession(session);//将媒体会话添加到RTSP服务器
    server->start();//启动RTSP服务器

    std::cout<<"Play the media using the URL \""<<server->getUrl(session)<<"\""<<std::endl;

    env->scheduler()->loop();//进入事件循环,处理事件。

    return 0;
}

上述代码的作用启动一个RTSP服务器,用于流式传输指定的H.264视频文件,程序的处理流程为:

  1. 检查命令行参数,确保提供了H.264文件路径。
  2. 设置日志级别为警告。
  3. 创建事件调度器和线程池,用于处理I/O操作和多线程任务。
  4. 创建使用环境,结合调度器和线程池。
  5. 定义服务器的IP地址和端口号
  6. 创建RTSP服务器
  7. 创建媒体会话和媒体源,从H.264文件读取数据。
  8. 创建RTP流,通过RTP协议发送媒体数据。
  9. 将RTP接收器添加到媒体会话中
  10. 将媒体会话添加到RTSP服务器
  11. 启动RTSP服务器
  12. 输出播放URL,供客户端使用。
  13. 进入事件循环,处理事件。

2.1 消息响应RtspConnection

程序通过调用RtspConnection类中handleReadBytes函数处理来自客户端的消息。

void RtspConnection::handleReadBytes()
{
    bool ret;

    if(mIsRtpOverTcp)
    {
        if(mInputBuffer.peek()[0] == '$')
        {
            handleRtpOverTcp();
            return;
        }
    }

    ret = parseRequest();
    if(ret != true)
    {
        LOG_WARNING("failed to parse request\n");
        goto err;
    }

    switch (mMethod)
    {
    case OPTIONS:
        if(handleCmdOption() != true)
            goto err;
        break;
    case DESCRIBE:
        if(handleCmdDescribe() != true)
            goto err;
        break;
    case SETUP:
        if(handleCmdSetup() != true)
            goto err;
        break;
    case PLAY:
        if(handleCmdPlay() != true)
            goto err;
        break;
    case TEARDOWN:
        if(handleCmdTeardown() != true)
            goto err;
        break;
    case GET_PARAMETER:
        if(handleCmdGetParamter() != true)
            goto err;
        break;
    default:
        goto err;
        break;
    }

    return;
err:
    handleDisconnection();
}

2.1.1 OPTIONS响应

bool RtspConnection::handleCmdOption()
{
    snprintf(mBuffer, sizeof(mBuffer),
            "RTSP/1.0 200 OK\r\n"
            "CSeq: %u\r\n"
            "Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY\r\n"
            "\r\n", mCSeq);

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
        return false;

    return true;
}

2.1.2 Describe响应

bool RtspConnection::handleCmdDescribe()
{
    MediaSession* session = mRtspServer->loopupMediaSession(mSuffix);
    if(!session)
    {
        LOG_DEBUG("can't loop up %s session\n", mSuffix.c_str());
        return false;
    }

    mSession = session;
    std::string sdp = session->generateSDPDescription();

    memset((void*)mBuffer, 0, sizeof(mBuffer));
    snprintf((char*)mBuffer, sizeof(mBuffer),
            "RTSP/1.0 200 OK\r\n"
            "CSeq: %u\r\n"
            "Content-Length: %u\r\n"
            "Content-Type: application/sdp\r\n"
            "\r\n"
            "%s",
            mCSeq,
            (unsigned int)sdp.size(),
            sdp.c_str());

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
            return false;

    return true;
}

2.1.3 Setup响应

bool RtspConnection::handleCmdSetup()
{
    char sessionName[100];
    if(sscanf(mSuffix.c_str(), "%[^/]/", sessionName) != 1)
    {
        return false;
    }

    MediaSession* session = mRtspServer->loopupMediaSession(sessionName);
    if(!session)
    {
        LOG_DEBUG("can't loop up %s session\n", sessionName);
        return false;
    }

    if(mTrackId >= MEDIA_MAX_TRACK_NUM || mRtpInstances[mTrackId] || mRtcpInstances[mTrackId])
        return false;

    if(session->isStartMulticast())
    {
        snprintf((char*)mBuffer, sizeof(mBuffer),
                    "RTSP/1.0 200 OK\r\n"
                    "CSeq: %d\r\n"
                    "Transport: RTP/AVP;multicast;"
                    "destination=%s;source=%s;port=%d-%d;ttl=255\r\n"
                    "Session: %08x\r\n"
                    "\r\n",
                    mCSeq,
                    session->getMulticastDestAddr().c_str(),
                    sockets::getLocalIp().c_str(),
                    session->getMulticastDestRtpPort(mTrackId),
                    session->getMulticastDestRtpPort(mTrackId)+1,
                    mSessionId);
    }
    else
    {
        if(mIsRtpOverTcp) //rtp over tcp
        {
            /* 创建rtp over tcp */
            createRtpOverTcp(mTrackId, mSocket.fd(), mRtpChannel);
            mRtpInstances[mTrackId]->setSessionId(mSessionId);
            session->addRtpInstance(mTrackId, mRtpInstances[mTrackId]);

            snprintf((char*)mBuffer, sizeof(mBuffer),
                        "RTSP/1.0 200 OK\r\n"
                        "CSeq: %d\r\n"
                        "Transport: RTP/AVP/TCP;unicast;interleaved=%hhu-%hhu\r\n"
                        "Session: %08x\r\n"
                        "\r\n",
                        mCSeq,
                        mRtpChannel,
                        mRtpChannel+1,
                        mSessionId);
        }
        else //rtp over udp
        {
            if(createRtpRtcpOverUdp(mTrackId, mPeerIp, mPeerRtpPort, mPeerRtcpPort) != true)//通过UDP创建RTP和RTCP连接
            {
                LOG_WARNING("failed to create rtp and rtcp\n");
                return false;
            }

            mRtpInstances[mTrackId]->setSessionId(mSessionId);
            mRtcpInstances[mTrackId]->setSessionId(mSessionId);

            /* 添加到会话中 */
            session->addRtpInstance(mTrackId, mRtpInstances[mTrackId]);

            snprintf((char*)mBuffer, sizeof(mBuffer),
                    "RTSP/1.0 200 OK\r\n"
                    "CSeq: %u\r\n"
                    "Transport: RTP/AVP;unicast;client_port=%hu-%hu;server_port=%hu-%hu\r\n"
                    "Session: %08x\r\n"
                    "\r\n",
                    mCSeq,
                    mPeerRtpPort,
                    mPeerRtcpPort,
                    mRtpInstances[mTrackId]->getLocalPort(), 
                    mRtcpInstances[mTrackId]->getLocalPort(),
                    mSessionId);
        }
        
    }

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
        return false;

    return true;
}

2.1.4 Play响应

bool RtspConnection::handleCmdPlay()
{
    snprintf((char*)mBuffer, sizeof(mBuffer),
            "RTSP/1.0 200 OK\r\n"
            "CSeq: %d\r\n"
            "Range: npt=0.000-\r\n"
            "Session: %08x; timeout=60\r\n"
            "\r\n",
            mCSeq,
            mSessionId);

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
        return false;

    for(int i = 0; i < MEDIA_MAX_TRACK_NUM; ++i)
    {
        if(mRtpInstances[i])
            mRtpInstances[i]->setAlive(true);
        
        if(mRtcpInstances[i])
            mRtcpInstances[i]->setAlive(true);
    }

    return true;
}

2.1.5 Teardown响应

bool RtspConnection::handleCmdTeardown()
{
    snprintf((char*)mBuffer, sizeof(mBuffer),
            "RTSP/1.0 200 OK\r\n"
            "CSeq: %d\r\n"
            "\r\n",
            mCSeq);

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
    {
        return false;
    }
    
    return true;
}

2.2 生成SDP数据包

std::string MediaSession::generateSDPDescription()
{
    if(!mSdp.empty())
        return mSdp;
    
    std::string ip = sockets::getLocalIp();
    char buf[2048] = {0};

    snprintf(buf, sizeof(buf),
        "v=0\r\n"
        "o=- 9%ld 1 IN IP4 %s\r\n"
        "t=0 0\r\n"
        "a=control:*\r\n"
        "a=type:broadcast\r\n",
        (long)time(NULL), ip.c_str());

    if(isStartMulticast())
    {
        snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
                "a=rtcp-unicast: reflection\r\n");
    }

    for(int i = 0; i < MEDIA_MAX_TRACK_NUM; ++i)
    {
        uint16_t port = 0;

        if(mTracks[i].mIsAlive != true)
            continue;

        if(isStartMulticast())
            port = getMulticastDestRtpPort((TrackId)i);
        
        snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
                    "%s\r\n", mTracks[i].mRtpSink->getMediaDescription(port).c_str());

        if(isStartMulticast())
            snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
                        "c=IN IP4 %s/255\r\n", getMulticastDestAddr().c_str());
        else
            snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
                        "c=IN IP4 0.0.0.0\r\n");

        snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
                    "%s\r\n", mTracks[i].mRtpSink->getAttribute().c_str());

        snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),											
                "a=control:track%d\r\n", mTracks[i].mTrackId);
    }

    mSdp = buf;
    return mSdp;
}

2.3 通过UDP创建RTP和RTCP连接

bool RtspConnection::createRtpRtcpOverUdp(MediaSession::TrackId trackId, std::string peerIp,
                                    uint16_t peerRtpPort, uint16_t peerRtcpPort)
{
    int rtpSockfd, rtcpSockfd;
    int16_t rtpPort, rtcpPort;
    bool ret;

    if(mRtpInstances[trackId] || mRtcpInstances[trackId])
        return false;

    int i;
    for(i = 0; i < 10; ++i)
    {
        rtpSockfd = sockets::createUdpSock();
        if(rtpSockfd < 0)
        {
            return false;
        }

        rtcpSockfd = sockets::createUdpSock();
        if(rtcpSockfd < 0)
        {
            close(rtpSockfd);
            return false;
        }

        uint16_t port = rand() & 0xfffe;
        if(port < 10000)
            port += 10000;
        
        rtpPort = port;
        rtcpPort = port+1;

        ret = sockets::bind(rtpSockfd, "0.0.0.0", rtpPort);
        if(ret != true)
        {
            sockets::close(rtpSockfd);
            sockets::close(rtcpSockfd);
            continue;
        }

        ret = sockets::bind(rtcpSockfd, "0.0.0.0", rtcpPort);
        if(ret != true)
        {
            sockets::close(rtpSockfd);
            sockets::close(rtcpSockfd);
            continue;
        }

        break;
    }

    if(i == 10)
        return false;

    mRtpInstances[trackId] = RtpInstance::createNewOverUdp(rtpSockfd, rtpPort,
                                                        peerIp, peerRtpPort);
    mRtcpInstances[trackId] = RtcpInstance::createNew(rtcpSockfd, rtcpPort,
                                                        peerIp, peerRtcpPort);

    return true;
}

2.5 回调函数处理RTP包的发送

void RtpSink::timeoutCallback(void* arg)
{
    RtpSink* rtpSink = (RtpSink*)arg;
    AVFrame* frame = rtpSink->mMediaSource->getFrame();
    if(!frame)
    {
        return;
    }

    rtpSink->handleFrame(frame);

    rtpSink->mMediaSource->putFrame(frame);
}

2.5.1 处理和发送RTP包

void H264RtpSink::handleFrame(AVFrame* frame)
{
    RtpHeader* rtpHeader = mRtpPacket.mRtpHeadr;
    uint8_t naluType = frame->mFrame[0];

    if(frame->mFrameSize <= RTP_MAX_PKT_SIZE)
    {
        memcpy(rtpHeader->payload, frame->mFrame, frame->mFrameSize);
        mRtpPacket.mSize = frame->mFrameSize;
        sendRtpPacket(&mRtpPacket);
        mSeq++;

        if ((naluType & 0x1F) == 7 || (naluType & 0x1F) == 8) // 如果是SPS、PPS就不需要加时间戳
            return;
    }
    else
    {
        int pktNum = frame->mFrameSize / RTP_MAX_PKT_SIZE;       // 有几个完整的包
        int remainPktSize = frame->mFrameSize % RTP_MAX_PKT_SIZE; // 剩余不完整包的大小
        int i, pos = 1;

        /* 发送完整的包 */
        for (i = 0; i < pktNum; i++)
        {
            /*
            *     FU Indicator
            *    0 1 2 3 4 5 6 7
            *   +-+-+-+-+-+-+-+-+
            *   |F|NRI|  Type   |
            *   +---------------+
            * */
            rtpHeader->payload[0] = (naluType & 0x60) | 28; //(naluType & 0x60)表示nalu的重要性,28表示为分片
            
            /*
            *      FU Header
            *    0 1 2 3 4 5 6 7
            *   +-+-+-+-+-+-+-+-+
            *   |S|E|R|  Type   |
            *   +---------------+
            * */
            rtpHeader->payload[1] = naluType & 0x1F;
            
            if (i == 0) //第一包数据
                rtpHeader->payload[1] |= 0x80; // start
            else if (remainPktSize == 0 && i == pktNum - 1) //最后一包数据
                rtpHeader->payload[1] |= 0x40; // end

            memcpy(rtpHeader->payload+2, frame->mFrame+pos, RTP_MAX_PKT_SIZE);
            mRtpPacket.mSize = RTP_MAX_PKT_SIZE+2;
            sendRtpPacket(&mRtpPacket);

            mSeq++;
            pos += RTP_MAX_PKT_SIZE;
        }

        /* 发送剩余的数据 */
        if (remainPktSize > 0)
        {
            rtpHeader->payload[0] = (naluType & 0x60) | 28;
            rtpHeader->payload[1] = naluType & 0x1F;
            rtpHeader->payload[1] |= 0x40; //end

            memcpy(rtpHeader->payload+2, frame->mFrame+pos, remainPktSize);
            mRtpPacket.mSize = remainPktSize+2;
            sendRtpPacket(&mRtpPacket);

            mSeq++;
        }
    }
    
    mTimestamp += mClockRate/mFps;
}