硬件 & 版本依据:ARM A53,ARMv7,ARM GCC 11.1.0
前言
最近学习性能优化时想起这么多项目当前使用的数据缓冲队列,虽然经历了多个产品版本迭代,但是并没得到重视因此并没有进行良好的改进(毕竟代码和人一个能跑就行)。因此这里做一次优化分析和改进,顺便掌握下性能工具的用法
分析
使用流程分析
从业务场景出发分析,大致的流程如下:
flowchart TD
subgraph Kernel
Camera(相机)-->Encode(硬件编码)
end
subgraph CameraService
SetofFrameQueue(多分辨率 std::set<FrameQueue*> 数组)
subgraph 事件处理
CreateMem(创建 FrameBufferMemory*)
CreateMem --> ChooseResoultion(选择对应分辨率集合)
ChooseResoultion --> Push(将数据指针压入队列)
Push --> WaitForNext([等待下一次触发])
end
SetofFrameQueue <--> ChooseResoultion
end
Encode --> |通知| 事件处理
Worker(业务模块 1..N)
Worker --> |创建| FrameQueue(FrameQueue 1..N)
FrameQueue --> |注册| SetofFrameQueue
Worker ----> PopTimedOut(超时等待数据指针弹出队列)
FrameQueue <----> PopTimedOut
PopTimedOut --> Process([业务处理])
从整体的流程看出,一个相机服务下面其实托管了多个分辨率的 std::set<FrameQueue*>
数组,由于历史原因,选择对应数组的下标与既定的分辨率数组和硬件相对应,大致代码如下:
#define MAX_CHANNEL 8 struct RESOLUTION { int32_t w; int32_t h; int32_t maxFps; }; class CameraService { ... private: RESOLUTION m_resolutions[MAX_CHANNEL]; std::set<std::shared_ptr<FrameQueue> > m_outputs[MAX_CHANNEL]; };
|
由于是嵌入式业务高并发需求相对较低,但是通过选用数组+集合的形式来管理是否有点过于小题大做?是否可以通过一个 FrameQueue
采用订阅发布模式进行处理呢?
代码分析
最主要的实现为 Buffer
和 Queue
,UML 如下:
classDiagram
direction RL
class FrameBuffer {
<< Abstract >>
+uint32_t getFrameSize(PIXEL_FMT type, int32_t w, int32_t h)$
+void setSeq(uint32_t seq)
+uint32_t getSeq()
+void setTimeStamp(uint64_t timestamp)
+uint64_t getTimeStamp()
+void setPixelExtFmt(const PIXEL_EXT_FMT& fmt)*
+PIXEL_EXT_FMT getPixelExtFmt()*
+PIXEL_FMT getPixelFmt()*
+int32_t getWidth()*
+int32_t getHeight()*
+char* getVirAddr(int32_t idx, uint32_t& bufSize)*
+uint64_t getPhyAddr(int32_t idx)*
+void setDataLen(uint32_t len)*
+uint32_t getDataLen()*
+void setExtData(void* pBuf, uint32_t bufSize)*
+void* getExtData(uint32_t& bufSize)*
+uint32_t writeData(const char* data, uint32_t dataLen, uint32_t offset)*
+uint32_t readData(char* buf, uint32_t bufSize, uint32_t offset)*
- m_seq : uint32_t
- m_timestamp : uint64_t
}
class FrameQueue {
+FrameQueue(uint32_t maxSize)
+void push(const std::shared_ptr~FrameBuffer~& buffer)
+std::shared_ptr~FrameBuffer~ pop(uint32_t wait)
+bool empty()
+bool full()
+void clear()
+uint32_t size()
-std::mutex m_mutex
-std::condition_variable m_cond
-std::list~std::shared_ptr~FrameBuffer~ ~ m_bufferList
-int32_t m_w
-int32_t m_h
-PIXEL_FMT m_fmt
-uint32_t m_maxSize
}
class PIXEL_FMT {
<< Enumeration >>
PIXEL_FMT_UNKNOWN
PIXEL_FMT_BAYER_RGGB16
PIXEL_FMT_YUYV
PIXEL_FMT_NV12
PIXEL_FMT_H264
PIXEL_FMT_H265
PIXEL_FMT_MJPEG
PIXEL_FMT_RAW16
PIXEL_FMT_AAC
PIXEL_FMT_PCM
}
class PIXEL_EXT_FMT {
<< Enumeration >>
PIXEL_FMT_EXT_UNKNOWN
PIXEL_FMT_EXT_I
PIXEL_FMT_EXT_P
PIXEL_FMT_EXT_B
}
FrameBuffer--PIXEL_FMT
FrameBuffer--PIXEL_EXT_FMT
FrameBufferMemory-->FrameBuffer
FrameQueue*--FrameBuffer
随着基线的切换很难找到最初始的版本,但设计的意图比较明显:从 FrameBuffer
出发对不同类型视频帧进行实现,然后使用 FrameQueue
形成缓冲管理,不过随着业务发展外加缺乏 Code Review 导致现在拥有了一个奇怪的 class FrameBufferMemory
继承然后又混入了音频帧类型,然后音频帧的 Buffer 也拥有视频帧的成员函数?既然是这样,那这个父类又有何意义?这里列出 FrameBufferMemory
成员变量以及对应的实现:
class { private: PIXEL_FMT m_type; PIXEL_EXT_FMT m_extFmt; char* m_buffer; uint32_t m_dataLen; uint32_t m_bufferSize; int32_t m_width; int32_t m_height; std::mutex m_mutex; void* m_pExtData; uint32_t m_ExtDataSizeInBytes; };
|
实现:
FrameBufferMemory::FrameBufferMemory(PIXEL_FMT type, void* buf, int32_t w, int32_t h, uint32_t bufSize) : m_type(type), m_extFmt(PIXEL_FMT_EXT_UNKNOWN), m_buffer((char*)buf), m_bufferSize(bufSize), m_width(w), m_height(h) { m_pExtData = NULL; m_ExtDataSizeInBytes = 0; }
FrameBufferMemory::~FrameBufferMemory() { free(m_buffer); if (m_pExtData != NULL) { free(m_pExtData); m_pExtData = NULL; } m_ExtDataSizeInBytes = 0; }
void FrameBufferMemory::setPixelExtFmt(const PIXEL_EXT_FMT& fmt) { std::lock_guard<std::mutex> lock(m_mutex); m_extFmt = fmt; }
PIXEL_FMT FrameBufferMemory::getPixelFmt() { std::lock_guard<std::mutex> lock(m_mutex); return m_type; }
PIXEL_EXT_FMT FrameBufferMemory::getPixelExtFmt() { std::lock_guard<std::mutex> lock(m_mutex); return m_extFmt; }
int32_t FrameBufferMemory::getWidth() { std::lock_guard<std::mutex> lock(m_mutex); return m_width; }
int32_t FrameBufferMemory::getHeight() { std::lock_guard<std::mutex> lock(m_mutex); return m_height; }
char* FrameBufferMemory::getVirAddr(int32_t idx, uint32_t& bufSize) { std::lock_guard<std::mutex> lock(m_mutex); bufSize = m_bufferSize; return (char*)m_buffer; }
uint32_t FrameBufferMemory::getDataLen() { std::lock_guard<std::mutex> lock(m_mutex); return m_dataLen; }
void FrameBufferMemory::setDataLen(uint32_t len) { std::lock_guard<std::mutex> lock(m_mutex); m_dataLen = len; if (m_dataLen > m_bufferSize) { m_dataLen = m_bufferSize; } }
void FrameBufferMemory::setExtData(void* pBuf, uint32_t bufSize) { std::lock_guard<std::mutex> lock(m_mutex); if (m_pExtData != NULL) { free(m_pExtData); m_pExtData = NULL; } m_pExtData = pBuf; m_ExtDataSizeInBytes = bufSize; }
void* FrameBufferMemory::getExtData(uint32_t& bufSize) { std::lock_guard<std::mutex> lock(m_mutex); void* pBuf = m_pExtData; bufSize = m_ExtDataSizeInBytes; return pBuf; }
uint32_t FrameBufferMemory::writeData(const char* data, uint32_t dataLen, uint32_t offset) { std::lock_guard<std::mutex> lock(m_mutex); if (offset + dataLen > m_bufferSize) { dataLen = m_bufferSize - offset; } memcpy(m_buffer + offset, data, dataLen); m_dataLen = dataLen + offset; return dataLen; }
uint32_t FrameBufferMemory::readData(char* buf, uint32_t bufSize, uint32_t offset) { std::lock_guard<std::mutex> lock(m_mutex); if (offset >= m_dataLen) { return 0; }
if (bufSize > m_dataLen - offset) { bufSize = m_dataLen - offset; } memcpy(buf, m_buffer + offset, bufSize); return bufSize; }
|
不难发现一些问题:
- 不相关的帧类型也可以调用无关的成员函数
- 锁的粒度太大:一些只有 get 操作的成员变量也有对锁的操作,倘若出现一个线程在写入内存数据而另一个线程只需要快速查询 width 和 height 的场景时,查询线程必须等待写入线程释放锁才能继续
- 锁的数量过多:这需要和
FrameQueue
结合来分析,就以目前项目中最大容量 4 来看,总共 8 个不同的分辨率,就会有 32 把锁,如果容量和分辨率数量增多,那锁的数量超乎你的想象
- 内存的申请和释放不在同一个对象里管理:这里只有析构里面调用
free
,但是所申请的内存是由外面传入,这有可能导致调用者出现二次释放的危险
- 函数接口设计混乱:比如对 extdata 的设计,set 和 get 的设计十分混乱
- 缺乏闭包设计:很多参数可以通过设计合适的结构体来传入/获取,比如构造函数中的参数就过多了,如果后续继续增加那么就会出现参数压栈的情况
总而言之,这是一个很差的实现,接下来看下 FrameQueue
:
FrameQueue::FrameQueue(uint32_t maxSize) : m_maxSize(maxSize) { m_w = 0; m_h = 0; m_fmt = PIXEL_FMT_UNKNOWN; }
FrameQueue::~FrameQueue() {}
void FrameQueue::push(const std::shared_ptr<FrameBuffer>& buffer) { std::lock_guard<std::mutex> lock(m_mutex); if (m_w != 0 && buffer->getWidth() != m_w) { return; }
if (m_h != 0 && buffer->getHeight() != m_h) { return; }
if (m_fmt != PIXEL_FMT_UNKNOWN && buffer->getPixelFmt() != m_fmt) { return; }
if (m_bufferList.size() == m_maxSize) { m_bufferList.pop_front(); } m_bufferList.push_back(buffer); if (m_bufferList.size() == 1) { m_cond.notify_all(); } }
std::shared_ptr<FrameBuffer> FrameQueue::pop(uint32_t wait) { std::unique_lock<std::mutex> lock(m_mutex); if (wait == 0 && m_bufferList.empty()) { return nullptr; }
while (m_bufferList.empty()) { if (m_cond.wait_for(lock, std::chrono::milliseconds(wait)) == std::cv_status::timeout) { return nullptr; } }
if (m_bufferList.empty()) { return nullptr; }
auto front = m_bufferList.front(); m_bufferList.pop_front(); return front; }
void FrameQueue::clear() { std::lock_guard<std::mutex> lock(m_mutex); m_bufferList.clear(); }
uint32_t FrameQueue::size() { std::lock_guard<std::mutex> lock(m_mutex); return m_bufferList.size(); }
bool FrameQueue::empty() { std::lock_guard<std::mutex> lock(m_mutex); return m_bufferList.empty(); }
bool FrameQueue::full() { std::lock_guard<std::mutex> lock(m_mutex); return m_bufferList.size() == m_maxSize; }
|
总结下设计问题:
- 缺乏事件驱动设计:只存在超时
pop
的设计,毕竟 cv 的 wait_for
中设计存在循环
- 存在隐性的内存泄漏风险:根据
pop
的设计可以看出,当数据从 m_bufferList
中拿出后,这块内存就不再由 FrameQueue
管理,数据的释放取决于最后一个模块,也就是引用计数归 0 时,但如果出现某个模块占有时间过长,那么就会出现泄露的风险
- 释放的性能问题:根据现在的设计,既然依靠智能指针对
FrameBuffer
进行管理,那么最终释放内存归还给系统的路径在最后一个模块使用完毕的地方
- 未能复用内存:复用内存能得到更好的性能,比如采用常见的环形队列,这样可以减少容器使用的开销
从代码上分析不难看出 Buffer 和 Queue 两者的设计确实差劲