前言
在之前的项目架构中对应用功能进行了分离调整,并使用了 IPC 来处理进程间的交互,虽然是嵌入式设备但是在空间性能足够的情况下采用了 ProtoBuf 来对数据进行解析传递。但到了后面的项目中我发现其他同事采用的数据结构为 Json ,毕竟当时我只提供了一个 IPC 中间件来作为各个应用交互的节点,其他的数据解析则需要根据不同的情况来进行解析处理,因此应用间数据传输的流程如下:
flowchart TB
subgraph App1
direction TB
buffer1("JSON Object")
data1("JSON data")
parser1("JSON serialize/deserialize")
buffer1 <--> parser1 <--> data1
end
subgraph App2
direction TB
buffer2("JSON Object")
data2("JSON data")
parser2("JSON serialize/deserialize")
buffer2 <--> parser2 <--> data2
end
ipc[[ IPC ]]
data1 <--> ipc
data2 <--> ipc
先不说 Json 带来的数据类型转换问题,就作为应用间数据传递的话有几点疑惑:
- 在 IPC 中我们是否需要传递 Key 值?毕竟用于 IPC 基本上是具有确定的 Key
- 我们是否需要这么大的数据进行传输?当 KV 变多了,Json 紧缩后得到的字符串的长度可能远远大于预期
- 每次进行序列/反序列化是否值得?Json 中 KV 是乱序,不一定按照指定的顺序进行排列
总而言之,使用 Json 用于 IPC 进行数据转换和传输并不值得,需要一套更加紧凑的二进制数据和转换更加高效的方式来满足需求
代码实现
完整代码与测试单元:
设计目标
针对数据传输最通用的方式是设计一个结构体,发送方依次拷贝结构体成员中的内容到指定内存空间中并发送出去,接收方通过同样的结构体将得到的数据进行对应的解析转换。这在 C 中是很常见的做法,但使用的是 C++,所以我希望能做到:
- 直接对接对应的容器比如
vector
和 string
- 实现 Binary,struct,Json 三者的互转
转换设计&实现
类型转换
对于整型,浮点以及布尔来说,直接按大小进行拷贝即可。而对于 vector
和 string
来说,本质上是个数组,所以可以按照常规数组设计即可:
转换接口设计
既然是 C++ ,那么可以使用 vector
来作为目标 buffer ,同时可以使用模板来对数组和字符串进行对应转换。那么需要设计两个正反转换的模板类:
using Buffer = std::vector<uint8_t>;
template <typename T> struct FromBuffer {
int operator()(T& dst, const Buffer& buff, off_t offset) { return -1; } };
template <typename T> struct ToBuffer {
void operator()(Buffer& buffer, const T& src) {} };
|
实现
FromBuffer
的实现
对于普通的类型来说直接使用拷贝的方式进行实现,其中 offset
需要考虑 buff
边界问题:
- 当
offset
超出 buf
长度时,需要返回异常
- 当
offset
与参数类型大小相加后超出 buf
长度时,需要返回异常
- 当
offset
等于 buf
长度时,这时候没有需要处理的数据则对目标的元素进行默认赋值
因此不难实现:
template <typename T> struct FromBuffer {
int operator()(T& dst, const Buffer& buff, off_t offset) { if (offset > buff.size() || offset + sizeof(T) > buff.size()) return -1; if (offset == buff.size()) { dst = std::move(T{}); return 0; } memcpy(&dst, buff.data() + offset, sizeof(T)); return sizeof(T); } };
|
针对于 vector
和 string
来说需要 偏特化 来进行实现,这两者的本质结构即数组,因此使用最常用的格式进行实现:
这里的 size
用于记录的是元素个数,而并非 data
的总体长度。考虑到使用场景,这里 size
选用 uint8_t
类型,可以根据使用场景适当的调整 size
范围。当 vector
出现嵌套不定长的数组时也是遵循这种方式进行排列:
在代码上对于嵌套的实现来说,比如 std::vector<std::string>
在模板中就可以通过递归的方式进行解析。同时两者的实现也需要考虑 offset
的边界问题,因此可以对两者进行分开实现:
using StrSizeType = uint8_t;
using ArrSizeType = uint8_t;
template <> struct FromBuffer<std::string> { int operator()(std::string& dst, const Buffer& buff, off_t offset) { constexpr int SizeLen = sizeof(StrSizeType); if (offset > buff.size() || offset + SizeLen > buff.size()) return -1; if (offset == buff.size()) { dst.clear(); return 0; } auto _data = buff.data(); auto _size = *reinterpret_cast<const StrSizeType*>(_data + offset); dst.assign(reinterpret_cast<const char*>(_data + offset + SizeLen), _size); return _size + SizeLen; } };
template <typename T> struct FromBuffer<std::vector<T> > { int operator()(std::vector<T>& dst, const Buffer& buff, off_t offset) { constexpr int SizeLen = sizeof(ArrSizeType); if (offset > buff.size() || offset + SizeLen > buff.size()) return -1; if (offset == buff.size()) { dst.clear(); return 0; } auto _data = buff.data(); auto _size = *reinterpret_cast<const ArrSizeType*>(_data + offset); if constexpr (std::is_class_v<T>) { off_t _offset = offset + SizeLen; for (int i = 0; i < _size; i++) { T _value; auto res = FromBuffer<T>()(_value, buff, _offset); if (res < 0) return res; _offset += res; dst.push_back(std::move(_value)); } return _offset - offset; } else { auto _array = reinterpret_cast<const T*>(_data + offset + SizeLen); for (int i = 0; i < _size; i++) { dst.push_back(_array[i]); } return _size * sizeof(T) + SizeLen; } } };
|
不难看出其实三者的实现有部分是相同的,那么可以将共同的部分单独拿出来缩短代码的同时也有利于在某些情况下减少代码的生成量,可以定义一个命名空间用来专门放置相关的定义和工具类:
namespace buffer {
using Type = std::vector<uint8_t>;
using StrSizeType = uint8_t;
using ArrSizeType = uint8_t;
struct Helper { enum RANGE_STATUS : int8_t { OUT_OF_RANGE = -1, TO_BUFFER_END = 0, IN_RANGE, }; static RANGE_STATUS IsInRange(size_t buff_size, off_t offset, int size_len) { if (offset > buff_size || offset + size_len > buff_size) return OUT_OF_RANGE; if (offset == buff_size) return TO_BUFFER_END; return IN_RANGE; } };
#define RANGE_CHK(BUFF_SIZE, OFFSET, TYPE_SIZE) \ if (auto res = buffer::Helper::IsInRange(BUFF_SIZE, OFFSET, TYPE_SIZE) != buffer::Helper::IN_RANGE) { return res; }
}
|
因此三者的实现就可以调整为:
template <typename T> struct FromBuffer {
int operator()(T& dst, const buffer::Type& buff, off_t offset) { RANGE_CHK(buff.size(), offset, sizeof(T)) memcpy(&dst, buff.data() + offset, sizeof(T)); return sizeof(T); } };
template <> struct FromBuffer<std::string> { int operator()(std::string& dst, const buffer::Type& buff, off_t offset) { constexpr int SizeLen = sizeof(buffer::StrSizeType); RANGE_CHK(buff.size(), offset, SizeLen) auto _data = buff.data(); auto _size = *reinterpret_cast<const buffer::StrSizeType*>(_data + offset); dst.assign(reinterpret_cast<const char*>(_data + offset + SizeLen), _size); return _size + SizeLen; } };
template <typename T> struct FromBuffer<std::vector<T> > { int operator()(std::vector<T>& dst, const buffer::Type& buff, off_t offset) { constexpr int SizeLen = sizeof(buffer::ArrSizeType); RANGE_CHK(buff.size(), offset, SizeLen) auto _data = buff.data(); auto _size = *reinterpret_cast<const buffer::ArrSizeType*>(_data + offset); if constexpr (std::is_class_v<T>) { off_t _offset = offset + SizeLen; for (int i = 0; i < _size; i++) { T _value; auto res = FromBuffer<T>()(_value, buff, _offset); if (res < 0) return res; _offset += res; dst.push_back(std::move(_value)); } return _offset - offset; } else { auto _array = reinterpret_cast<const T*>(_data + offset + SizeLen); for (int i = 0; i < _size; i++) { dst.push_back(_array[i]); } return _size * sizeof(T) + SizeLen; } } };
|
ToBuffer
的实现
三者的实现很简单,毕竟使用了容器来做目标 buffer 空间:
template <typename T> struct ToBuffer {
void operator()(buffer::Type& buffer, const T& src) { auto ptr = reinterpret_cast<const uint8_t*>(&src); for (int i = 0; i < sizeof(T); i++) { buffer.push_back(ptr[i]); } } };
template <> struct ToBuffer<std::string> { void operator()(buffer::Type& buffer, const std::string& src) { ToBuffer<buffer::StrSizeType>()(buffer, static_cast<buffer::StrSizeType>(src.size())); for (auto i : src) { buffer.push_back(i); } } };
template <typename T> struct ToBuffer<std::vector<T> > { void operator()(buffer::Type& buffer, const std::vector<T>& src) { ToBuffer<buffer::ArrSizeType>()(buffer, static_cast<buffer::ArrSizeType>(src.size())); for (const auto& i : src) { ToBuffer<T>()(buffer, i); } } };
|
包装
由于 C++ 中函数无法进行偏特化,因此使用类模板调用时需要指定类型进行调用:
... tools::ToBuffer<std::string>()(buff, str, 0); ...
|
所以需要进行一次调整和封装来更好的便于使用:
- 将转换的整体逻辑迁移至
_impl
命名空间内
- 在外部的命名空间内实现
Serialized
和 Deserialized
两个接口
因此整体可以这样实现:
namespace cvt { namespace _impl { ... }
struct Binary {
template<typename T> static void Serialized(_impl::buffer::Type& buffer, const T& value) { _impl::ToBuffer<T>()(buffer, value); }
template<typename T> static bool Deserialized(T& value, const _impl::buffer::Type& buffer) { return _impl::FromBuffer<T>()(value, buffer, 0) >= 0; } };
}
|
测试
需要编写一个单元测试来验证转换的逻辑是否正确,这里采用 GoogleTest 进行测试。编写主要的转换逻辑函数,将来回转换的数据进行对比即可:
template <typename T> bool CvtTest(const T& val, T& result) { std::vector<uint8_t> buffer; tools::cvt::Binary::Serialized(buffer, val); return tools::cvt::Binary::Deserialized(result, buffer); }
|