前言

在之前的项目架构中对应用功能进行了分离调整,并使用了 IPC 来处理进程间的交互,虽然是嵌入式设备但是在空间性能足够的情况下采用了 ProtoBuf 来对数据进行解析传递。但到了后面的项目中我发现其他同事采用的数据结构为 Json ,毕竟当时我只提供了一个 IPC 中间件来作为各个应用交互的节点,其他的数据解析则需要根据不同的情况来进行解析处理,因此应用间数据传输的流程如下:

flowchart TB
    subgraph App1
        direction TB
        buffer1("JSON Object")
        data1("JSON data")
        parser1("JSON serialize/deserialize")
        buffer1 <--> parser1 <--> data1
    end
    subgraph App2
        direction TB
        buffer2("JSON Object")
        data2("JSON data")
        parser2("JSON serialize/deserialize")
        buffer2 <--> parser2 <--> data2
    end
    ipc[[      IPC      ]]
    data1 <--> ipc
    data2 <--> ipc

先不说 Json 带来的数据类型转换问题,就作为应用间数据传递的话有几点疑惑:

  • 在 IPC 中我们是否需要传递 Key 值?毕竟用于 IPC 基本上是具有确定的 Key
  • 我们是否需要这么大的数据进行传输?当 KV 变多了,Json 紧缩后得到的字符串的长度可能远远大于预期
  • 每次进行序列/反序列化是否值得?Json 中 KV 是乱序,不一定按照指定的顺序进行排列
    总而言之,使用 Json 用于 IPC 进行数据转换和传输并不值得,需要一套更加紧凑的二进制数据和转换更加高效的方式来满足需求

代码实现

完整代码与测试单元:

设计目标

针对数据传输最通用的方式是设计一个结构体,发送方依次拷贝结构体成员中的内容到指定内存空间中并发送出去,接收方通过同样的结构体将得到的数据进行对应的解析转换。这在 C 中是很常见的做法,但使用的是 C++,所以我希望能做到:

  1. 直接对接对应的容器比如 vectorstring
  2. 实现 BinarystructJson 三者的互转

转换设计&实现

类型转换

对于整型,浮点以及布尔来说,直接按大小进行拷贝即可。而对于 vectorstring 来说,本质上是个数组,所以可以按照常规数组设计即可:

// Array structure
// +----------------+---------------------------------------------+
// | size (4 bytes) | data (size * (element`s size) bytes) |
// +----------------+---------------------------------------------+

转换接口设计

既然是 C++ ,那么可以使用 vector 来作为目标 buffer ,同时可以使用模板来对数组和字符串进行对应转换。那么需要设计两个正反转换的模板类:

// 使用vector作为buffer
using Buffer = std::vector<uint8_t>;
/**
* 反序列化的核心类
* @tparam T 目标类型
*/
template <typename T>
struct FromBuffer {
/**
* 重载()用于转换返回
* @param dst 目标地址
* @param buff 待转换的buffer空间
* @param offset buffer指针偏移量
* @return 正常返回已处理字节数, 异常时返回 -1
*/
int operator()(T& dst, const Buffer& buff, off_t offset) {
return -1;
}
};

/**
* 序列化核心类
* @tparam T 源类型
*/
template <typename T>
struct ToBuffer {
/**
* 重载()用于转换返回
* @param dst 目标buffer
* @param src 源类型
*/
void operator()(Buffer& buffer, const T& src) {}
};

实现

FromBuffer 的实现

对于普通的类型来说直接使用拷贝的方式进行实现,其中 offset 需要考虑 buff 边界问题:

  1. offset 超出 buf 长度时,需要返回异常
  2. offset 与参数类型大小相加后超出 buf 长度时,需要返回异常
  3. offset 等于 buf 长度时,这时候没有需要处理的数据则对目标的元素进行默认赋值

因此不难实现:

/**
* 反序列化的核心类
* @tparam T 目标类型
*/
template <typename T>
struct FromBuffer {
/**
* 重载()用于转换返回
* @param dst 目标地址
* @param buff 待转换的buffer空间
* @param offset buffer指针偏移量
* @return 正常返回已处理字节数, 异常时返回 -1
*/
int operator()(T& dst, const Buffer& buff, off_t offset) {
if (offset > buff.size() || offset + sizeof(T) > buff.size()) return -1;
if (offset == buff.size()) {
dst = std::move(T{});
return 0;
}
memcpy(&dst, buff.data() + offset, sizeof(T));
return sizeof(T);
}
};

针对于 vectorstring 来说需要 偏特化 来进行实现,这两者的本质结构即数组,因此使用最常用的格式进行实现:

// Array structure
// +----------------+---------------------------------------------+
// | size (1 byte) | data (size * (element`s size) bytes) |
// +----------------+---------------------------------------------+

这里的 size 用于记录的是元素个数,而并非 data 的总体长度。考虑到使用场景,这里 size 选用 uint8_t 类型,可以根据使用场景适当的调整 size 范围。当 vector 出现嵌套不定长的数组时也是遵循这种方式进行排列:

// +------+-------------------+-------------------+  
// | size | Elem 1 | Elem 2 | ....
// +------+------+------------+------+------------+
// | | size | data | size | data | ...
// +------+------+------------+------+------------+

在代码上对于嵌套的实现来说,比如 std::vector<std::string> 在模板中就可以通过递归的方式进行解析。同时两者的实现也需要考虑 offset 的边界问题,因此可以对两者进行分开实现:

// 字符串长度类型
using StrSizeType = uint8_t;
// 数组长度类型
using ArrSizeType = uint8_t;

template <>
struct FromBuffer<std::string> {
int operator()(std::string& dst, const Buffer& buff, off_t offset) {
constexpr int SizeLen = sizeof(StrSizeType);
if (offset > buff.size() || offset + SizeLen > buff.size()) return -1;
if (offset == buff.size()) {
dst.clear();
return 0;
}
auto _data = buff.data();
auto _size = *reinterpret_cast<const StrSizeType*>(_data + offset);
dst.assign(reinterpret_cast<const char*>(_data + offset + SizeLen), _size);
return _size + SizeLen;
}
};

template <typename T>
struct FromBuffer<std::vector<T> > {
int operator()(std::vector<T>& dst, const Buffer& buff, off_t offset) {
constexpr int SizeLen = sizeof(ArrSizeType);
if (offset > buff.size() || offset + SizeLen > buff.size()) return -1;
if (offset == buff.size()) {
dst.clear();
return 0;
}
auto _data = buff.data();
auto _size = *reinterpret_cast<const ArrSizeType*>(_data + offset);
// 判断是否为class, 用于处理嵌套情况
if constexpr (std::is_class_v<T>) {
off_t _offset = offset + SizeLen;
for (int i = 0; i < _size; i++) {
T _value;
// 递归调用, 当解析失败时返回
auto res = FromBuffer<T>()(_value, buff, _offset);
if (res < 0) return res;
_offset += res;
dst.push_back(std::move(_value));
}
return _offset - offset;
} else {
auto _array = reinterpret_cast<const T*>(_data + offset + SizeLen);
for (int i = 0; i < _size; i++) {
dst.push_back(_array[i]);
}
return _size * sizeof(T) + SizeLen;
}
}
};

不难看出其实三者的实现有部分是相同的,那么可以将共同的部分单独拿出来缩短代码的同时也有利于在某些情况下减少代码的生成量,可以定义一个命名空间用来专门放置相关的定义和工具类:

namespace buffer {
// buffer类型
using Type = std::vector<uint8_t>;
// 字符串长度类型
using StrSizeType = uint8_t;
// 数组长度类型
using ArrSizeType = uint8_t;

struct Helper {
enum RANGE_STATUS : int8_t {
OUT_OF_RANGE = -1,
TO_BUFFER_END = 0,
IN_RANGE,
};
// 范围判断
static RANGE_STATUS IsInRange(size_t buff_size, off_t offset, int size_len) {
if (offset > buff_size || offset + size_len > buff_size) return OUT_OF_RANGE;
if (offset == buff_size) return TO_BUFFER_END;
return IN_RANGE;
}
};

#define RANGE_CHK(BUFF_SIZE, OFFSET, TYPE_SIZE) \
if (auto res = buffer::Helper::IsInRange(BUFF_SIZE, OFFSET, TYPE_SIZE) != buffer::Helper::IN_RANGE) { return res; }

} // buffer

因此三者的实现就可以调整为:

/**
* 反序列化的核心类
* @tparam T 目标类型
*/
template <typename T>
struct FromBuffer {
/**
* 重载()用于转换返回
* @param dst 目标地址
* @param buff 待转换的buffer空间
* @param offset buffer指针偏移量
* @return 正常返回已处理字节数, 异常时返回 -1
*/
int operator()(T& dst, const buffer::Type& buff, off_t offset) {
RANGE_CHK(buff.size(), offset, sizeof(T))
memcpy(&dst, buff.data() + offset, sizeof(T));
return sizeof(T);
}
};

template <>
struct FromBuffer<std::string> {
int operator()(std::string& dst, const buffer::Type& buff, off_t offset) {
constexpr int SizeLen = sizeof(buffer::StrSizeType);
RANGE_CHK(buff.size(), offset, SizeLen)
auto _data = buff.data();
auto _size = *reinterpret_cast<const buffer::StrSizeType*>(_data + offset);
dst.assign(reinterpret_cast<const char*>(_data + offset + SizeLen), _size);
return _size + SizeLen;
}
};

template <typename T>
struct FromBuffer<std::vector<T> > {
int operator()(std::vector<T>& dst, const buffer::Type& buff, off_t offset) {
constexpr int SizeLen = sizeof(buffer::ArrSizeType);
RANGE_CHK(buff.size(), offset, SizeLen)
auto _data = buff.data();
auto _size = *reinterpret_cast<const buffer::ArrSizeType*>(_data + offset);
// 判断是否为class, 用于处理嵌套情况
if constexpr (std::is_class_v<T>) {
off_t _offset = offset + SizeLen;
for (int i = 0; i < _size; i++) {
T _value;
// 递归调用, 当解析失败时返回
auto res = FromBuffer<T>()(_value, buff, _offset);
if (res < 0) return res;
_offset += res;
dst.push_back(std::move(_value));
}
return _offset - offset;
} else {
auto _array = reinterpret_cast<const T*>(_data + offset + SizeLen);
for (int i = 0; i < _size; i++) {
dst.push_back(_array[i]);
}
return _size * sizeof(T) + SizeLen;
}
}
};

ToBuffer 的实现

三者的实现很简单,毕竟使用了容器来做目标 buffer 空间:

/**
* 序列化核心类
* @tparam T 源类型
*/
template <typename T>
struct ToBuffer {
/**
* 重载()用于转换返回
* @param dst 目标buffer
* @param src 源类型
*/
void operator()(buffer::Type& buffer, const T& src) {
auto ptr = reinterpret_cast<const uint8_t*>(&src);
for (int i = 0; i < sizeof(T); i++) {
buffer.push_back(ptr[i]);
}
}
};

template <>
struct ToBuffer<std::string> {
void operator()(buffer::Type& buffer, const std::string& src) {
ToBuffer<buffer::StrSizeType>()(buffer, static_cast<buffer::StrSizeType>(src.size()));
for (auto i : src) {
buffer.push_back(i);
}
}
};

template <typename T>
struct ToBuffer<std::vector<T> > {
void operator()(buffer::Type& buffer, const std::vector<T>& src) {
ToBuffer<buffer::ArrSizeType>()(buffer, static_cast<buffer::ArrSizeType>(src.size()));
for (const auto& i : src) {
ToBuffer<T>()(buffer, i);
}
}
};

包装

由于 C++ 中函数无法进行偏特化,因此使用类模板调用时需要指定类型进行调用:

...
tools::ToBuffer<std::string>()(buff, str, 0);
...

所以需要进行一次调整和封装来更好的便于使用:

  • 将转换的整体逻辑迁移至 _impl 命名空间内
  • 在外部的命名空间内实现 SerializedDeserialized 两个接口

因此整体可以这样实现:

namespace cvt {
namespace _impl {
... // 转换的主要逻辑
} // _impl

//////////////////// Binary Cvt Api ////////////////////
struct Binary {
/**
* 单元素序列化
* @tparam T 值类型
* @param buffer 目标空间
* @param value 待转换值
*/
template<typename T>
static void Serialized(_impl::buffer::Type& buffer, const T& value) {
_impl::ToBuffer<T>()(buffer, value);
}

/**
* 单元素反序列化
* @tparam T 目标类型
* @param value 目标值地址
* @param buffer 带转换内存空间
* @return 转换成功与否
*/
template<typename T>
static bool Deserialized(T& value, const _impl::buffer::Type& buffer) {
return _impl::FromBuffer<T>()(value, buffer, 0) >= 0;
}
};

} // cvt

测试

需要编写一个单元测试来验证转换的逻辑是否正确,这里采用 GoogleTest 进行测试。编写主要的转换逻辑函数,将来回转换的数据进行对比即可:

template <typename T>
bool CvtTest(const T& val, T& result) {
std::vector<uint8_t> buffer;
tools::cvt::Binary::Serialized(buffer, val);
return tools::cvt::Binary::Deserialized(result, buffer);
}