基于嵌入式Linux平台实现RTSP协议
参考源码:
- 简单的RTSP服务器:https://github.com/ImSjt/RtspServer
1.程序的编译与运行
1.获取程序源码
git clone https://github.com/ImSjt/RtspServer.git
如果获取不到,可通过课程配套源码获取!
2.进入工程目录
cd RtspServer/
3.修改Makefile
vi Makefile
修改文件中的工具链路径为全志工具链路径
CROSS_COMPILE = ~/tina-v853-100ask/prebuilt/gcc/linux-x86/arm/toolchain-sunxi-musl/toolchain/bin/arm-openwrt-linux-
4.在终端中指定存放交叉编译所需库文件和头文件的文件夹。
export STAGING_DIR=~/tina-v853-100ask/prebuilt/gcc/linux-x86/arm/toolchain-sunxi-musl/toolchain/arm-openwrt-linux-muslgnueabi/
5.编译程序
make
6.进入可执行文件目录
cd example/
进入之后可以看到生成的可执行程序h264_rtsp_server,将该可执行程序传输到开发板上即可运行。
2.程序的核心
示例代码如下所示:
int main(int argc, char* argv[])
{
if(argc != 2) //检查命令行参数的数量
{
std::cout<<"Usage: "<<argv[0]<<" <h264 file>"<<std::endl;
return -1;
}
//Logger::setLogFile("xxx.log");
Logger::setLogLevel(Logger::LogWarning);//设置日志级别为警告
EventScheduler* scheduler = EventScheduler::createNew(EventScheduler::POLLER_SELECT);//建立一个事件调度器,使用select作为I/O多路复用机制。
ThreadPool* threadPool = ThreadPool::createNew(2);//创建一个包含2个线程的线程池。
UsageEnvironment* env = UsageEnvironment::createNew(scheduler, threadPool);//创建一个使用上述调度器和线程池的环境对象。
Ipv4Address ipAddr("0.0.0.0", 8554);//定义服务器的IP地址和端口号。
RtspServer* server = RtspServer::createNew(env, ipAddr);//创建一个RTSP服务器对象。
MediaSession* session = MediaSession::createNew("live");//创建一个新的媒体会话,名称为"live"。
MediaSource* mediaSource = H264FileMediaSource::createNew(env, argv[1]);//创建一个从指定H.264文件读取数据的媒体源。
RtpSink* rtpSink = H264RtpSink::createNew(env, mediaSource);//创建一个RTP流,用于将媒体源的数据通过RTP协议发送。
session->addRtpSink(MediaSession::TrackId0, rtpSink);//将RTP接收器添加到媒体会话中
//session->startMulticast(); //多播
server->addMeidaSession(session);//将媒体会话添加到RTSP服务器
server->start();//启动RTSP服务器
std::cout<<"Play the media using the URL \""<<server->getUrl(session)<<"\""<<std::endl;
env->scheduler()->loop();//进入事件循环,处理事件。
return 0;
}
上述代码的作用是启动一个RTSP服务器,用于流式传输指定的H.264视频文件,程序的处理流程为:
- 检查命令行参数,确保提供了H.264文件路径。
- 设置日志级别为警告。
- 创建事件调度器和线程池,用于处理I/O操作和多线程任务。
- 创建使用环境,结合调度器和线程池。
- 定义服务器的IP地址和端口号。
- 创建RTSP服务器。
- 创建媒体会话和媒体源,从H.264文件读取数据。
- 创建RtpSink(RTP发送端),通过RTP协议发送媒体源的数据。
- 将RtpSink添加到媒体会话中。
- 将媒体会话添加到RTSP服务器。
- 启动RTSP服务器。
- 输出播放URL,供客户端使用。
- 进入事件循环,处理事件。
2.1 消息响应RtspConnection
程序通过调用RtspConnection类中的handleReadBytes函数处理来自客户端的消息。
/**
 * Handles data that has arrived on the RTSP connection.
 *
 * When RTP-over-TCP is active and the input starts with '$', the data is
 * handed to handleRtpOverTcp(). Otherwise the input is parsed as an RTSP
 * request and dispatched on mMethod. Any parse failure, handler failure or
 * unsupported method ends in handleDisconnection().
 */
void RtspConnection::handleReadBytes()
{
    // '$' marks an interleaved frame rather than an RTSP request.
    if(mIsRtpOverTcp && mInputBuffer.peek()[0] == '$')
    {
        handleRtpOverTcp();
        return;
    }

    const bool parsed = parseRequest();
    if(!parsed)
    {
        LOG_WARNING("failed to parse request\n");
        handleDisconnection();
        return;
    }

    // Dispatch to the per-method handler; `handled` stays false for unknown methods.
    bool handled = false;
    switch (mMethod)
    {
    case OPTIONS:
        handled = handleCmdOption();
        break;

    case DESCRIBE:
        handled = handleCmdDescribe();
        break;

    case SETUP:
        handled = handleCmdSetup();
        break;

    case PLAY:
        handled = handleCmdPlay();
        break;

    case TEARDOWN:
        handled = handleCmdTeardown();
        break;

    case GET_PARAMETER:
        // Handler's declaration is not visible here; keep the explicit comparison.
        handled = (handleCmdGetParamter() == true);
        break;

    default:
        break;
    }

    if(!handled)
        handleDisconnection();
}
2.1.1 OPTIONS响应
/**
 * Answers an OPTIONS request with "200 OK" and a Public header listing the
 * advertised methods, echoing the request's CSeq.
 *
 * @return false when sendMessage() reports a negative result, true otherwise.
 */
bool RtspConnection::handleCmdOption()
{
    snprintf(mBuffer, sizeof(mBuffer),
             "RTSP/1.0 200 OK\r\n"
             "CSeq: %u\r\n"
             "Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY\r\n"
             "\r\n",
             mCSeq);

    const bool sent = !(sendMessage(mBuffer, strlen(mBuffer)) < 0);
    return sent;
}
2.1.2 Describe响应
bool RtspConnection::handleCmdDescribe()
{
MediaSession* session = mRtspServer->loopupMediaSession(mSuffix);
if(!session)
{
LOG_DEBUG("can't loop up %s session\n", mSuffix.c_str());
return false;
}
mSession = session;
std::string sdp = session->generateSDPDescription();
memset((void*)mBuffer, 0, sizeof(mBuffer));
snprintf((char*)mBuffer, sizeof(mBuffer),
"RTSP/1.0 200 OK\r\n"
"CSeq: %u\r\n"
"Content-Length: %u\r\n"
"Content-Type: application/sdp\r\n"
"\r\n"
"%s",
mCSeq,
(unsigned int)sdp.size(),
sdp.c_str());
if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
return false;
return true;
}
2.1.3 Setup响应
/**
 * Answers a SETUP request for track mTrackId of the session named by the
 * first path component of mSuffix.
 *
 * Three transports are handled:
 *  - multicast session: only the reply is produced;
 *  - RTP over TCP: an interleaved RTP instance is created on the RTSP socket;
 *  - RTP over UDP: an RTP/RTCP socket pair is created via createRtpRtcpOverUdp().
 * For the two unicast cases the new RTP instance is tagged with mSessionId and
 * registered with the media session.
 *
 * @return false when the session name cannot be extracted, the session is
 *         unknown, the track id is out of range or already set up, the UDP
 *         transport cannot be created, or sendMessage() reports a negative
 *         result; true otherwise.
 */
bool RtspConnection::handleCmdSetup()
{
    char sessionName[100];
    // The field width bounds the copy to sizeof(sessionName) - 1 characters.
    // mSuffix presumably originates from the client's request URL (TODO
    // confirm), so an unbounded "%[" conversion would let an over-long name
    // overflow this stack buffer.
    if(sscanf(mSuffix.c_str(), "%99[^/]/", sessionName) != 1)
    {
        return false;
    }

    MediaSession* session = mRtspServer->loopupMediaSession(sessionName);
    if(!session)
    {
        LOG_DEBUG("can't loop up %s session\n", sessionName);
        return false;
    }

    // Reject an invalid track id and a track that has already been set up.
    if(mTrackId >= MEDIA_MAX_TRACK_NUM || mRtpInstances[mTrackId] || mRtcpInstances[mTrackId])
        return false;

    if(session->isStartMulticast())
    {
        snprintf((char*)mBuffer, sizeof(mBuffer),
                 "RTSP/1.0 200 OK\r\n"
                 "CSeq: %d\r\n"
                 "Transport: RTP/AVP;multicast;"
                 "destination=%s;source=%s;port=%d-%d;ttl=255\r\n"
                 "Session: %08x\r\n"
                 "\r\n",
                 mCSeq,
                 session->getMulticastDestAddr().c_str(),
                 sockets::getLocalIp().c_str(),
                 session->getMulticastDestRtpPort(mTrackId),
                 session->getMulticastDestRtpPort(mTrackId)+1,
                 mSessionId);
    }
    else
    {
        if(mIsRtpOverTcp) // RTP over TCP
        {
            /* Create the interleaved RTP instance on the RTSP socket. */
            createRtpOverTcp(mTrackId, mSocket.fd(), mRtpChannel);
            mRtpInstances[mTrackId]->setSessionId(mSessionId);
            session->addRtpInstance(mTrackId, mRtpInstances[mTrackId]);

            snprintf((char*)mBuffer, sizeof(mBuffer),
                     "RTSP/1.0 200 OK\r\n"
                     "CSeq: %d\r\n"
                     "Transport: RTP/AVP/TCP;unicast;interleaved=%hhu-%hhu\r\n"
                     "Session: %08x\r\n"
                     "\r\n",
                     mCSeq,
                     mRtpChannel,
                     mRtpChannel+1,
                     mSessionId);
        }
        else // RTP over UDP
        {
            // Create the RTP and RTCP instances over UDP.
            if(createRtpRtcpOverUdp(mTrackId, mPeerIp, mPeerRtpPort, mPeerRtcpPort) != true)
            {
                LOG_WARNING("failed to create rtp and rtcp\n");
                return false;
            }

            mRtpInstances[mTrackId]->setSessionId(mSessionId);
            mRtcpInstances[mTrackId]->setSessionId(mSessionId);

            /* Register the RTP instance with the media session. */
            session->addRtpInstance(mTrackId, mRtpInstances[mTrackId]);

            snprintf((char*)mBuffer, sizeof(mBuffer),
                     "RTSP/1.0 200 OK\r\n"
                     "CSeq: %u\r\n"
                     "Transport: RTP/AVP;unicast;client_port=%hu-%hu;server_port=%hu-%hu\r\n"
                     "Session: %08x\r\n"
                     "\r\n",
                     mCSeq,
                     mPeerRtpPort,
                     mPeerRtcpPort,
                     mRtpInstances[mTrackId]->getLocalPort(),
                     mRtcpInstances[mTrackId]->getLocalPort(),
                     mSessionId);
        }
    }

    if(sendMessage(mBuffer, strlen(mBuffer)) < 0)
        return false;

    return true;
}
2.1.4 Play响应
/**
 * Answers a PLAY request and then marks every RTP/RTCP instance that exists
 * on this connection as alive.
 *
 * @return false when sendMessage() reports a negative result (no instance is
 *         touched in that case), true otherwise.
 */
bool RtspConnection::handleCmdPlay()
{
    snprintf((char*)mBuffer, sizeof(mBuffer),
             "RTSP/1.0 200 OK\r\n"
             "CSeq: %d\r\n"
             "Range: npt=0.000-\r\n"
             "Session: %08x; timeout=60\r\n"
             "\r\n",
             mCSeq,
             mSessionId);

    const bool sent = !(sendMessage(mBuffer, strlen(mBuffer)) < 0);
    if(!sent)
    {
        return false;
    }

    // Only tracks that were set up have non-null instances.
    for(int track = 0; track < MEDIA_MAX_TRACK_NUM; ++track)
    {
        if(mRtpInstances[track])
        {
            mRtpInstances[track]->setAlive(true);
        }

        if(mRtcpInstances[track])
        {
            mRtcpInstances[track]->setAlive(true);
        }
    }

    return true;
}
2.1.5 Teardown响应
/**
 * Answers a TEARDOWN request with a bare "200 OK" that echoes the CSeq.
 *
 * @return false when sendMessage() reports a negative result, true otherwise.
 */
bool RtspConnection::handleCmdTeardown()
{
    snprintf((char*)mBuffer, sizeof(mBuffer),
             "RTSP/1.0 200 OK\r\n"
             "CSeq: %d\r\n"
             "\r\n",
             mCSeq);

    const bool sent = !(sendMessage(mBuffer, strlen(mBuffer)) < 0);
    return sent;
}
2.2 生成SDP数据包
std::string MediaSession::generateSDPDescription()
{
if(!mSdp.empty())
return mSdp;
std::string ip = sockets::getLocalIp();
char buf[2048] = {0};
snprintf(buf, sizeof(buf),
"v=0\r\n"
"o=- 9%ld 1 IN IP4 %s\r\n"
"t=0 0\r\n"
"a=control:*\r\n"
"a=type:broadcast\r\n",
(long)time(NULL), ip.c_str());
if(isStartMulticast())
{
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"a=rtcp-unicast: reflection\r\n");
}
for(int i = 0; i < MEDIA_MAX_TRACK_NUM; ++i)
{
uint16_t port = 0;
if(mTracks[i].mIsAlive != true)
continue;
if(isStartMulticast())
port = getMulticastDestRtpPort((TrackId)i);
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"%s\r\n", mTracks[i].mRtpSink->getMediaDescription(port).c_str());
if(isStartMulticast())
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"c=IN IP4 %s/255\r\n", getMulticastDestAddr().c_str());
else
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"c=IN IP4 0.0.0.0\r\n");
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"%s\r\n", mTracks[i].mRtpSink->getAttribute().c_str());
snprintf(buf+strlen(buf), sizeof(buf)-strlen(buf),
"a=control:track%d\r\n", mTracks[i].mTrackId);
}
mSdp = buf;
return mSdp;
}
2.3 通过UDP创建RTP和RTCP连接
/**
 * Creates the UDP RTP and RTCP instances for one track.
 *
 * Up to 10 attempts are made to bind a pair of fresh UDP sockets to a random
 * even port (RTP) and the following odd port (RTCP) on 0.0.0.0. On success the
 * instances are stored in mRtpInstances[trackId] / mRtcpInstances[trackId].
 *
 * @param trackId       Track the instances belong to.
 * @param peerIp        Address of the receiving client.
 * @param peerRtpPort   Client port for RTP.
 * @param peerRtcpPort  Client port for RTCP.
 * @return false when the track already has an instance, a socket cannot be
 *         created, or no port pair could be bound; true otherwise.
 */
bool RtspConnection::createRtpRtcpOverUdp(MediaSession::TrackId trackId, std::string peerIp,
                                          uint16_t peerRtpPort, uint16_t peerRtcpPort)
{
    int rtpSockfd, rtcpSockfd;
    // Ports are unsigned 16-bit values; a signed type would turn every port
    // >= 32768 into a negative number.
    uint16_t rtpPort, rtcpPort;
    bool ret;

    if(mRtpInstances[trackId] || mRtcpInstances[trackId])
        return false;

    int i;
    for(i = 0; i < 10; ++i)
    {
        rtpSockfd = sockets::createUdpSock();
        if(rtpSockfd < 0)
        {
            return false;
        }

        rtcpSockfd = sockets::createUdpSock();
        if(rtcpSockfd < 0)
        {
            sockets::close(rtpSockfd);
            return false;
        }

        // Random even port, at least 10000; the RTCP port is the next odd one.
        uint16_t port = rand() & 0xfffe;
        if(port < 10000)
            port += 10000;

        rtpPort = port;
        rtcpPort = port+1;

        ret = sockets::bind(rtpSockfd, "0.0.0.0", rtpPort);
        if(ret != true)
        {
            // Port taken: drop both sockets and retry with another pair.
            sockets::close(rtpSockfd);
            sockets::close(rtcpSockfd);
            continue;
        }

        ret = sockets::bind(rtcpSockfd, "0.0.0.0", rtcpPort);
        if(ret != true)
        {
            sockets::close(rtpSockfd);
            sockets::close(rtcpSockfd);
            continue;
        }

        break;
    }

    // Every attempt failed; the sockets of the last attempt are already closed.
    if(i == 10)
        return false;

    mRtpInstances[trackId] = RtpInstance::createNewOverUdp(rtpSockfd, rtpPort,
                                                           peerIp, peerRtpPort);
    mRtcpInstances[trackId] = RtcpInstance::createNew(rtcpSockfd, rtcpPort,
                                                      peerIp, peerRtcpPort);

    return true;
}
2.5 回调函数处理RTP包的发送
void RtpSink::timeoutCallback(void* arg)
{
RtpSink* rtpSink = (RtpSink*)arg;
AVFrame* frame = rtpSink->mMediaSource->getFrame();
if(!frame)
{
return;
}
rtpSink->handleFrame(frame);
rtpSink->mMediaSource->putFrame(frame);
}
2.5.1 处理和发送RTP包
/**
 * Packetizes one H.264 NAL unit into RTP and sends it.
 *
 * A NAL unit of at most RTP_MAX_PKT_SIZE bytes is sent as-is in one packet.
 * A larger one is split into FU-A fragments: the one-byte NAL header is
 * dropped and replaced, in every fragment, by an FU indicator and an FU
 * header. mSeq is incremented per packet sent. mTimestamp advances by one
 * frame interval, except after a single-packet SPS or PPS.
 *
 * @param frame  The NAL unit (frame->mFrame, frame->mFrameSize bytes).
 *               An empty frame is ignored.
 */
void H264RtpSink::handleFrame(AVFrame* frame)
{
    // Nothing to send; also avoids reading mFrame[0] of an empty frame.
    if(frame->mFrameSize <= 0)
        return;

    RtpHeader* rtpHeader = mRtpPacket.mRtpHeadr;
    const uint8_t naluType = frame->mFrame[0];

    if(frame->mFrameSize <= RTP_MAX_PKT_SIZE)
    {
        /* Single NAL unit packet. */
        memcpy(rtpHeader->payload, frame->mFrame, frame->mFrameSize);
        mRtpPacket.mSize = frame->mFrameSize;
        sendRtpPacket(&mRtpPacket);
        mSeq++;

        // SPS (7) and PPS (8) do not advance the timestamp.
        if ((naluType & 0x1F) == 7 || (naluType & 0x1F) == 8)
            return;
    }
    else
    {
        /*
         *  FU Indicator
         *   0 1 2 3 4 5 6 7
         *  +-+-+-+-+-+-+-+-+
         *  |F|NRI|  Type   |
         *  +---------------+
         *  NRI is copied from the NAL header; type 28 marks an FU-A fragment.
         */
        const uint8_t fuIndicator = (naluType & 0x60) | 28;

        /*
         *  FU Header
         *   0 1 2 3 4 5 6 7
         *  +-+-+-+-+-+-+-+-+
         *  |S|E|R|  Type   |
         *  +---------------+
         *  Type is the original NAL unit type; S/E flag the first/last fragment.
         */
        const uint8_t fuType = naluType & 0x1F;

        // The NAL header byte is not part of the fragmented payload, so only
        // mFrameSize - 1 bytes, starting at offset 1, remain to be sent.
        // Counting the full mFrameSize from offset 1 would read one byte past
        // the end of the frame.
        int pos = 1;
        int remaining = frame->mFrameSize - 1;
        bool first = true;

        while (remaining > 0)
        {
            int chunk = RTP_MAX_PKT_SIZE;
            if (remaining < chunk)
                chunk = remaining;

            // A payload of exactly RTP_MAX_PKT_SIZE bytes would fit in one
            // fragment, but a fragment must not carry both S and E; split it.
            if (first && chunk == remaining && remaining > 1)
                chunk = remaining / 2;

            uint8_t fuHeader = fuType;
            if (first)
                fuHeader |= 0x80; // start
            else if (chunk == remaining)
                fuHeader |= 0x40; // end

            rtpHeader->payload[0] = fuIndicator;
            rtpHeader->payload[1] = fuHeader;
            memcpy(rtpHeader->payload+2, frame->mFrame+pos, chunk);
            mRtpPacket.mSize = chunk+2;
            sendRtpPacket(&mRtpPacket);
            mSeq++;

            pos += chunk;
            remaining -= chunk;
            first = false;
        }
    }

    mTimestamp += mClockRate/mFps;
}