版本依据
Kernel-5.10.117 , live555-2020


live555 作为一款 RTSP 开源库被广泛的应用在各种项目中间,但它并不是一个多线程库,虽然单线程应用在响应上具有一定的优势,但作为一个库融入到其他应用中时难免会导致一些性能上的问题,因此这里使用 perf 做一次性能分析

正常分析

使用 perf 记录 live555MediaServer 性能开销,记录拉流下的一小时数据

perf record -e cycles --call-graph dwarf ./live555mediaserver test.264

统计记录结果如下:

从这里可以看出 live555 是使用的 select() 来实现的任务管理,其中系统调用部分中网络发送的开销占用达到了22%,而 tcp_sendmsg_locked() 占了最主要的部分,查看相关的源码:

int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
{
int ret;

lock_sock(sk);
ret = tcp_sendmsg_locked(sk, msg, size);
release_sock(sk);

return ret;
}

static inline void lock_sock(struct sock *sk)
{
lock_sock_nested(sk, 0);
}

void lock_sock_nested(struct sock *sk, int subclass)
{
might_sleep();
spin_lock_bh(&sk->sk_lock.slock);
if (sk->sk_lock.owned)
__lock_sock(sk);
sk->sk_lock.owned = 1;
spin_unlock(&sk->sk_lock.slock);
/*
* The sk_lock has mutex_lock() semantics here:
*/
mutex_acquire(&sk->sk_lock.dep_map, subclass, 0, _RET_IP_);
local_bh_enable();
}

可以得出实际的调用堆栈如下:

flowchart TD
    subgraph "tcp_sendmsg()"
        f_lock_sock("lock_sock()")
        f_tcp_sendmsg_locked("f_tcp_sendmsg_locked()")
        f_release_sock("release_sock()")
        f_lock_sock --> f_tcp_sendmsg_locked
        f_tcp_sendmsg_locked --> f_release_sock
    end

不难看出,这个步骤会对对应的 socket 进行锁操作,同时 lock_sock() 中还存在一个函数 local_bh_enable(), 这个函数用于控制底层硬件中断,但它在 tcp_sendmsg() 中也有不小的占比。当然这对于单体应用来说这样确实没什么问题,但是在多业务应用中是否可以减少这部分的占比来提高性能呢?

优化调用的尝试

源码分析

RTSP 最终发出去的是 RTP 包,因此可以查看下 live555 最终发出 RTP 包的地方,不难找出和发送数据包相关的类与成员函数:

---
title: RTP发包相关函数
---
classDiagram
direction LR
    class Medium  {
        <<Abstract>>
    }
    class RTPInterface{
        + setStreamSocket()
        + addStreamSocket()
        + removeStreamSocket()
        + sendPacket() Boolean
        - sendRTPorRTCPPacketOverTCP() Boolean
        - sendDataOverTCP() Boolean
        - class tcpStreamRecord* fTCPStreams;
    }
    class RTCPInstance {
        + sendAppPacket()
        + sendReport()
        + sendBYE()
        - sendBuiltPacket()
        - OutPacketBuffer* fOutBuf
        - RTPInterface fRTCPInterface
    }
    Medium <|-- RTCPInstance
    RTCPInstance *-- RTPInterface

同时可以分析出发包相关的流程如下:

flowchart LR
    subgraph RTPInterface
        f_sendPacket("sendPacket()")
        f_sendRTPorRTCPPacketOverTCP("sendRTPorRTCPPacketOverTCP()")
        f_sendDataOverTCP("sendDataOverTCP()")
        f_sendDataOverUDP("(send data over udp...)")
        f_sendPacket --> f_sendDataOverUDP
        f_sendPacket --> f_sendRTPorRTCPPacketOverTCP
        f_sendRTPorRTCPPacketOverTCP --> f_sendDataOverTCP
    end
    subgraph RTCPInstance
        f_sendAppPacket("sendAppPacket()")
        f_sendReport("sendReport()")
        f_sendBYE("sendBYE()")
        f_sendBuiltPacket("sendBuiltPacket()")
        f_sendAppPacket --> f_sendBuiltPacket
        f_sendReport --> f_sendBuiltPacket
        f_sendBYE --> f_sendBuiltPacket
    end
    f_sendBuiltPacket --> f_sendPacket

可以看出发包的关键位于 RTPInterface::sendPacket() ,这里会发出 UDP 和 TCP 对应的 RTP 数据包:

Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
Boolean success = True; // we'll return False instead if any of the sends fail
// Normal case: Send as a UDP packet:
if (!fGS->output(envir(), packet, packetSize)) success = False;

// Also, send over each of our TCP sockets:
tcpStreamRecord* nextStream;
for (tcpStreamRecord* stream = fTCPStreams; stream != NULL; stream = nextStream) {
nextStream = stream->fNext; // Set this now, in case the following deletes "stream":
if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
stream->fStreamSocketNum, stream->fStreamChannelId)) {
success = False;
}
}
return success;
}

从这里不难看出 live555 是使用的链表来对 TCP 客户端进行管理,通过 setStreamSocket()addStreamSocket()removeStreamSocket() 这些接口对 fTCPStreams 链表进行操作。同时也不难看出,如果处于高并发的应用场景下 live555 的默认实现并不是一个很好的选择。这里主要分析的是 TCP 方式,所以接下来查看 sendRTPorRTCPPacketOverTCP()

Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
int socketNum, unsigned char streamChannelId) {
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
packetSize, streamChannelId, socketNum); fflush(stderr);
#endif
// Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
// $<streamChannelId><packetSize><packet>
// (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force
// the subsequent "send()" for the <packet> data to succeed, even if we have to do so with
// a blocking "send()".)
do {
u_int8_t framingHeader[4];
framingHeader[0] = '$';
framingHeader[1] = streamChannelId;
framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
framingHeader[3] = (u_int8_t) (packetSize&0xFF);
if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
#endif
return True;
} while (0);
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
#endif
return False;
}

这里可以发现 RTP 发包还有一个前缀,用于标识数据包的相关信息。同时它并不是一次发送完成,而是分成了两步进行发送:

  1. 首先发出包的相关数据
  2. 再将帧数据发送出去

那么这里调用的 RTPInterface::sendDataOverTCP() 如下:

Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
int sendResult = send(socketNum, (char const*)data, dataSize, 0/*flags*/);
if (sendResult < (int)dataSize) {
// The TCP send() failed - at least partially.

unsigned numBytesSentSoFar = sendResult < 0 ? 0 : (unsigned)sendResult;
if (numBytesSentSoFar > 0 || (forceSendToSucceed && envir().getErrno() == EAGAIN)) {
// The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded
// the capacity of the TCP connection!).
// Force this data write to succeed, by blocking if necessary until it does:
unsigned numBytesRemainingToSend = dataSize - numBytesSentSoFar;
#ifdef DEBUG_SEND
fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", numBytesRemainingToSend); fflush(stderr);
#endif
makeSocketBlocking(socketNum, RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS);
sendResult = send(socketNum, (char const*)(&data[numBytesSentSoFar]), numBytesRemainingToSend, 0/*flags*/);
if ((unsigned)sendResult != numBytesRemainingToSend) {
// The blocking "send()" failed, or timed out. In either case, we assume that the
// TCP connection has failed (or is 'hanging' indefinitely), and we stop using it
// (for both RTP and RTP).
// (If we kept using the socket here, the RTP or RTCP packet write would be in an
// incomplete, inconsistent state.)
#ifdef DEBUG_SEND
fprintf(stderr, "sendDataOverTCP: blocking send() failed (delivering %d bytes out of %d); closing socket %d\n", sendResult, numBytesRemainingToSend, socketNum); fflush(stderr);
#endif
removeStreamSocket(socketNum, 0xFF);
return False;
}
makeSocketNonBlocking(socketNum);

return True;
} else if (sendResult < 0 && envir().getErrno() != EAGAIN) {
// Because the "send()" call failed, assume that the socket is now unusable, so stop
// using it (for both RTP and RTCP):
removeStreamSocket(socketNum, 0xFF);
}

return False;
}

return True;
}

可以看到这里的核心是调用 send(),而且 live555 在这也做了重发的逻辑,使用阻塞超时的方式继续/重新发送剩余/未发送成功数据。显然,如果处于高并发的情况下不利用多线程的优势这里会导致后面的客户端收到对应的延迟时间增大

尝试改进

考虑发送成功的情况下,sendRTPorRTCPPacketOverTCP() 中调用 sendDataOverTCP() 就出现了两次内核上下文的切换,虽然 framingHeader 作为的是4个字节的数组,复制到 socket 缓存区中的方式可能会被优化成赋值,但考虑到数据的大小以及SO_SNDBUF设置的大小,是否值得进行一次上下文切换?
在考虑成功的情况下对 sendRTPorRTCPPacketOverTCP() 中进行了修改,采用 iovec 将不连续的内存拼接在一起发送:

Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
int socketNum, unsigned char streamChannelId) {
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
packetSize, streamChannelId, socketNum); fflush(stderr);
#endif
// Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
// $<streamChannelId><packetSize><packet>
// (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force
// the subsequent "send()" for the <packet> data to succeed, even if we have to do so with
// a blocking "send()".)
do {
u_int8_t framingHeader[4];
framingHeader[0] = '$';
framingHeader[1] = streamChannelId;
framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
framingHeader[3] = (u_int8_t) (packetSize&0xFF);
#if USE_IOV
struct iovec iov[2];
iov[0].iov_base = framingHeader;
iov[0].iov_len = 4;
iov[1].iov_base = packet;
iov[1].iov_len = packetSize;
if (writev(socketNum, iov, 2) <= 0) break;
#else
if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
#endif
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
#endif

return True;
} while (0);

#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
#endif
return False;
}

然后再次通过 perf 进行了记录:

可以看到总体的样本数有所下降,而且 tcp_sendmsg() 由前面的 21.02% 下降到了 16.75% ,而且预览并未受到影响,说明通过将非连续的内存连接在一起进行发送会在一定程度上提高系统性能。由于 live555 在用户层对 RTP 进行了分包以至于每次调用 send() 的时数据大小均不会超过 MTU 的 1500 限制,在 MultiFramedRTPSink.cpp 中可以看到分包的限制:

#ifndef RTP_PAYLOAD_MAX_SIZE
#define RTP_PAYLOAD_MAX_SIZE 1456
// Default max packet size (1500, minus allowance for IP, UDP, UMTP headers)
// (Also, make it a multiple of 4 bytes, just in case that matters.)
#endif
#ifndef RTP_PAYLOAD_PREFERRED_SIZE
#define RTP_PAYLOAD_PREFERRED_SIZE ((RTP_PAYLOAD_MAX_SIZE) < 1000 ? (RTP_PAYLOAD_MAX_SIZE) : 1000)
#endif

MultiFramedRTPSink::MultiFramedRTPSink(UsageEnvironment& env,
Groupsock* rtpGS,
unsigned char rtpPayloadType,
unsigned rtpTimestampFrequency,
char const* rtpPayloadFormatName,
unsigned numChannels)
: RTPSink(env, rtpGS, rtpPayloadType, rtpTimestampFrequency,
rtpPayloadFormatName, numChannels),
fOutBuf(NULL), fCurFragmentationOffset(0), fPreviousFrameEndedFragmentation(False),
fOnSendErrorFunc(NULL), fOnSendErrorData(NULL) {

setPacketSizes((RTP_PAYLOAD_PREFERRED_SIZE), (RTP_PAYLOAD_MAX_SIZE));
}

总结

从此可以看到,包头+数据一起的总大小不会超过 1460,也就是 MSS 的最大值,但实际上可以从上面两者的比较看出,ipv4_mtu() 在前后两次的比例分别为 0.17%0.09% ,说明其实虽然分两次发送不需要消耗 MTU 相关函数过多的时间,但是实际上一次性满足分包大小进行发送的开销更好
从这两次可以观察到 skb_do_copy_data_nocache() 这个函数在两次的结果中并没有减少比重(1.75%1.68%),说明将两个数据结合成一次发送并不会减少从用户层向内核层拷贝的比重,只是减少了上下文的切换,那这个部分是否可以减少呢?

零拷贝的尝试

翻了下 tcp_sendmsg_locked() 源码会发现一个标志位 zc

int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
{
struct tcp_sock *tp = tcp_sk(sk);
struct ubuf_info *uarg = NULL;
struct sk_buff *skb;
struct sockcm_cookie sockc;
int flags, err, copied = 0;
int mss_now = 0, size_goal, copied_syn = 0;
int process_backlog = 0;
bool zc = false;
long timeo;

flags = msg->msg_flags;

if (flags & MSG_ZEROCOPY && size && sock_flag(sk, SOCK_ZEROCOPY)) {
skb = tcp_write_queue_tail(sk);
uarg = sock_zerocopy_realloc(sk, size, skb_zcopy(skb));
if (!uarg) {
err = -ENOBUFS;
goto out_err;
}
...
}

很显然,Linux 提供了用户层和内核层之间的零拷贝方式,即 SOCK_ZEROCOPYMSG_ZEROCOPY

SOCK_ZEROCOPYMSG_ZEROCOPY 的使用

这部分源于谷歌的 PPT Making TCP faster and cheaper for applications ,这里大致总结下使用方式

  1. 首先需要启用 socketSOCK_ZEROCOPY,这里必须在 bind() 前进行设置,不能随时开启:
// create socket
int const enabled = 1;
auto res = setsockopt(newSocket, SOL_SOCKET, SO_ZEROCOPY, &enabled, sizeof enabled);
// bind
  1. 然后使用 send()sendmsg() 时就可以带上 MSG_ZEROCOPY 标识:
...
send(sock, buff, 0, MSG_ZEROCOPY);
...
  1. 最后需要监听 socket 的错误队列,来得知数据已经被硬件发送出去,可以使用 poll 来监听 socketPOLLERR 事件来减少自旋的时间:
...
do {
ret = recvmsg(socket, &msg, MSG_ERRQUEUE)
} while (ret < 0 && errno == EINTR);

修改代码

因此对代码可以进行修改:

  1. GroupsockHelper.cpp 中的 setupStreamSocket() 启用 SO_ZEROCOPY 选项
int setupStreamSocket(UsageEnvironment& env,
Port port, Boolean makeNonBlocking, Boolean setKeepAlive) {
...
// 附加到最后启用ZEROCOPY
int const enabled = 1;
auto res = setsockopt(newSocket, SOL_SOCKET, SO_ZEROCOPY, &enabled, sizeof enabled);
printf("enable SO_ZEROCOPY: %d\n", res);
return newSocket;
}
  1. 修改 RTPInterface::sendRTPorRTCPPacketOverTCP() 采用 MSG_ZEROCOPY 发送非连续数据:
Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
int socketNum, unsigned char streamChannelId) {
...
do {
u_int8_t framingHeader[4];
framingHeader[0] = '$';
framingHeader[1] = streamChannelId;
framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
framingHeader[3] = (u_int8_t) (packetSize&0xFF);
#if USE_ZEROMSG
// 组合包
struct iovec iov[2];
iov[0].iov_base = framingHeader;
iov[0].iov_len = 4;
iov[1].iov_base = packet;
iov[1].iov_len = packetSize;
// 标识MSG_ZEROCOPY并调用sendmsg
struct msghdr msg;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 2;
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = MSG_ZEROCOPY;
sendmsg(socketNum, &msg, MSG_ZEROCOPY);
// 使用poll监听错误事件
struct pollfd pfd;
pfd.fd = socketNum;
pfd.events = 0;
if (poll(&pfd, 1, -1) != 1 || pfd.revents & POLLERR == 0)
{
perror("poll error");
return False;
}
struct msghdr errMsg;
memset(&errMsg, 0, sizeof(errMsg));
// 等待错误消息, 收到消息说明数据已被发送出去
int ret = 0;
do
{
ret = recvmsg(socketNum, &errMsg, MSG_ERRQUEUE);
}
while (ret < 0 && errno == EINTR);
#else
if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
#endif
#ifdef DEBUG_SEND
fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
#endif

return True;
} while (0);
...
}

修改后再次通过 perf 进行记录:

可以看到 syscall 的总量出现了下降,同时通过零拷贝来进行数据的发送。虽然结果看上去很美好,但客户端预览的画面并不连续,客户端每播放 1s 会出现卡顿导致效果并不如意。而且 poll 的使用不应该处于每次发包的函数中,此处只是为了验证可行性。如需要更好的改造应该是针对 class tcpStreamRecord 实现一套监听机制,但这样就需要引入多线程来进行处理。那是不是有更好的办法来解决?
很不幸,还真没有
虽然零拷贝有多种方案,但是可以将用户空间的内容带入内核空间并进行成功发送的方法目前只有两种:

  1. 使用 SOCK_ZEROCOPYMSG_ZEROCOPY(如上)
  2. 采用 PF_PACKETsocket 直接将数据写入到 TXRingBuffer 中(对于 live555 来说会有巨大的修改量)

或许你还知道通过 vmsplice()splice() 以及 pipe 来将用户空间的 buffer 地址传入到内核空间中进行发送:

但是很可惜,虽然这样做到了零拷贝,但是 splice() 返回的数据长度总和可能等于两个包的大小,但是这个包不一定被硬件发送了出去,总而言之,使用这个方法 splice()不知道数据是否已经被送出去,以至于没法确定重用 buffer 的时机,这也就导致了接收端收到错误的数据

总结

综上,虽然使用 SOCK_ZEROCOPYMSG_ZEROCOPY 取得了一定的效果,但是需要对源码进行大幅度的修改才能达到更好的效果

Ref

splice() (Linus Torvalds)
splice() on Linux
Rethinking splice() [LWN.net]
netdevconf.org/2.1/papers/netdev.pdf
Making TCP faster and cheaper for applications