fastDDS之Publisher
Posted Captain--Jack
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了fastDDS之Publisher相关的知识,希望对你有一定的参考价值。
发布定义了DataWriter和Publisher的关联。要开始发布数据实例的值,应用程序在Publisher中创建一个新的DataWriter。此DataWriter将绑定到描述正在传输的数据类型的Topic上。与此Topic匹配的远程订阅将能够从DataWriter接收数据值更新。
PublisherQos
Publisher作为一个容器,会在PublisherQos给出的公共配置下对不同的DataWriter对象进行分组。Publisher可以包含不同的主题和数据类型的DataWriter对象,属于同一Publisher的DataWriter对象之间除了PublisherQos之外没有任何其他关系,它们独立工作。PublisherQos控制着Publisher的行为,内部包含以下QosPolicy对象:
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
// Error
return;
// Create a Publisher with default PublisherQos
Publisher* publisher =
participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher)
// Error
return;
// Get the current QoS or create a new one from scratch
PublisherQos qos = publisher->get_qos();
// Modify QoS attributes
// (...)
// Assign the new Qos to the object
publisher->set_qos(qos);
通过DomainParticipant对象的get_default_publisher_qos()成员函数可以获取默认的PublisherQos,常量对象PUBLISHER_QOS_DEFAULT可以作为 create_publisher() 和Publisher::set_qos()承运函数的Qos参数,代表使用当前的默认PublisherQos 。
当系统启动时,默认的PublisherQos等同于默认构造值PublisherQos()。默认的PublisherQos可以在任何时候使用DomainParticipant实例上的set_default_publisher_qos()成员函数进行修改。修改默认的PublisherQos不会影响已经存在的Publisher实例。
set_default_publisher_qos()成员函数也接受常量对象PUBLISHER_QOS_DEFAULT作为输入参数。这将把当前默认的PublisherQos重置为默认的构造值PublisherQos()。
PublisherListener
PublisherListener是一个抽象类,定义了将在响应Publisher的状态更改时触发的回调。默认情况下,这些回调函数都是空的,不做任何事情。用户应该实现该类的特化,重写应用程序上需要的回调。未被重写的回调将维持它们空的实现。
PublisherListener继承自DataWriterListener。因此,它能够对报告给DataWriter的所有事件做出反应。由于事件总是被通知给能够处理事件的最特定的实体监听器,因此只有在触发的DataWriter没有附加监听器,或者DataWriter上的StatusMask禁用了回调时,才会调用PublisherListener从DataWriterListener继承的回调。
PublisherListener不添加任何新的回调。
Publisher创建删除
Publisher隶属于一个DomainParticipant,它的创建是通过DomainParticipant实例上的create_publisher()成员函数完成。
Publisher* create_publisher(const PublisherQos& qos,PublisherListener* listener = nullptr,const StatusMask& mask = StatusMask::all())
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
// Error
return;
// Create a Publisher with default PublisherQos and no Listener
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
Publisher* publisher_with_default_qos =
participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher_with_default_qos)
// Error
return;
// A custom PublisherQos can be provided to the creation method
PublisherQos custom_qos;
// Modify QoS attributes
// (...)
Publisher* publisher_with_custom_qos =
participant->create_publisher(custom_qos);
if (nullptr == publisher_with_custom_qos)
// Error
return;
// Create a Publisher with default QoS and a custom Listener.
// CustomPublisherListener inherits from PublisherListener.
// The value PUBLISHER_QOS_DEFAULT is used to denote the default QoS.
CustomPublisherListener custom_listener;
Publisher* publisher_with_default_qos_and_custom_listener =
participant->create_publisher(PUBLISHER_QOS_DEFAULT, &custom_listener);
if (nullptr == publisher_with_default_qos_and_custom_listener)
// Error
return;
在创建Publisher的DomainParticipant实例上使用delete_publisher()成员函数,可以删除Publisher。
// Create a DomainParticipant in the desired domain
DomainParticipant* participant =
DomainParticipantFactory::get_instance()->create_participant(0, PARTICIPANT_QOS_DEFAULT);
if (nullptr == participant)
// Error
return;
// Create a Publisher
Publisher* publisher =
participant->create_publisher(PUBLISHER_QOS_DEFAULT);
if (nullptr == publisher)
// Error
return;
// Use the Publisher to communicate
// (...)
// Delete the entities the Publisher created.
if (publisher->delete_contained_entities() != ReturnCode_t::RETCODE_OK)
// Publisher failed to delete the entities it created.
return;
// Delete the Publisher
if (participant->delete_publisher(publisher) != ReturnCode_t::RETCODE_OK)
// Error
return;
DataWriter
DataWriter依附于一个作为其工厂的Publisher。每个DataWriter一创建就会被绑定到单个Topic。此Topic必须在创建DataWriter之前存在,并且必须绑定到DataWriter要发布的数据类型。
Publisher为特定主题创建新的DataWriter的会发布一个具有主题所描述的名称和数据类型的消息。一旦创建了DataWriter,应用程序就可以使用DataWriter上的write()成员函数通知数据值的更改。这些更改将传输到与此发布匹配的所有订阅。
DataWriterQos控制DataWriter的行为。在内部它包含以下QosPolicy对象:
已经创建的DataWriter的QoS值可以使用成员函数DataWriter::set_qos()修改。
// Create a DataWriter with default DataWriterQos
DataWriter* data_writer =
publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
// Error
return;
// Get the current QoS or create a new one from scratch
DataWriterQos qos = data_writer->get_qos();
// Modify QoS attributes
// (...)
// Assign the new Qos to the object
data_writer->set_qos(qos);
默认的DataWriterQos是由Publisher实例的get_default_datawriter_qos()成员函数返回的值。常量对象DATAWRITER_QOS_DEFAULT可作为create_datawriter()或DataWriter::set_qos()成员函数上的QoS参数,表示应使用当前默认的DataWriterQos。
当系统启动时,默认的DataWriterQos相当于默认构造值DataWriterQos()。默认的DataWriterQos可以在任何时候使用Publisher实例上的set_default_datawriter_qos()成员函数进行修改。修改默认的DataWriterQos不会影响已经存在的DataWriter实例。
set_default_datawriter_qos()成员函数也接受特殊值DATAWRITER_QOS_DEFAULT作为输入参数。这将把当前默认的DataWriterQos重置为默认构造值DataWriterQos()。
DataWriterListener
DataWriterListener是一个抽象类,定义了在响应DataWriter上的状态更改时将触发的回调。默认情况下,所有这些回调函数都是空的,不做任何事情。用户应该实现该类的特化,覆盖应用程序上需要的回调。未被覆盖的回调将保持它们的空实现。
DataWriterListener定义了以下回调:
1.on_publication_matched(): DataReader已经找到了主题匹配,具有公共分区和兼容的QoS的DataReader,或者停止与先前认为匹配的DataReader进行匹配。
2.on_offered_deadline_missed():在DataWriterQos上配置的截止时间内,DataWriter无法提供数据。它将在DataWriter未能提供数据的每个截止日期期间和数据实例中被调用。
3.on_offered_incompatible_qos(): DataWriter已经找到了一个与主题匹配的DataReader,并且有一个公共分区,但是请求的QoS与DataWriter上定义的QoS不兼容。
4.on_liveliness_lost(): DataWriter没有按照DataWriterQos上的活动配置执行,因此,DataReader将认为datwriter不再活动。
5.on_unacknowledgement d_sample_removed(): DataWriter删除一个没有被任何DataReader匹配确认的样本。这可能发生在受限的网络中,或者如果发布吞吐量要求太高。这个回调可以用来检测这些情况,这样发布应用程序就可以应用一些解决方案来缓解这个问题,比如降低发布速率。
DataWriter创建删除
DataWriter始终属于一个Publisher。DataWriter的创建是通过Publisher实例上的create_datawriter()成员函数完成的,该函数充当DataWriter的工厂。
DataWriter* create_datawriter(
Topic* topic,
const DataWriterQos& qos,
DataWriterListener* listener = nullptr,
const StatusMask& mask = StatusMask::all())
下面是创建的示例:
// Create a DataWriter with default DataWriterQos and no Listener
// The value DATAWRITER_QOS_DEFAULT is used to denote the default QoS.
DataWriter* data_writer_with_default_qos =
publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer_with_default_qos)
// Error
return;
// A custom DataWriterQos can be provided to the creation method
DataWriterQos custom_qos;
// Modify QoS attributes
// (...)
DataWriter* data_writer_with_custom_qos =
publisher->create_datawriter(topic, custom_qos);
if (nullptr == data_writer_with_custom_qos)
// Error
return;
// Create a DataWriter with default QoS and a custom Listener.
// CustomDataWriterListener inherits from DataWriterListener.
// The value DATAWRITER_QOS_DEFAULT is used to denote the default QoS.
CustomDataWriterListener custom_listener;
DataWriter* data_writer_with_default_qos_and_custom_listener =
publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT, &custom_listener);
if (nullptr == data_writer_with_default_qos_and_custom_listener)
// Error
return;
在创建DataWriter的Publisher上使用delete_datawriter()成员函数删除DataWriter
// Create a DataWriter
DataWriter* data_writer =
publisher->create_datawriter(topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
// Error
return;
// Use the DataWriter to communicate
// (...)
// Delete the DataWriter
if (publisher->delete_datawriter(data_writer) != ReturnCode_t::RETCODE_OK)
// Error
return;
发布数据
用户通过writer上的write()成员函数通知数据实例的值发生变化。然后,此更改将传达给与DataWriter匹配的每个DataReader。
bool write(void* data);
ReturnCode_t write( void* data,const InstanceHandle_t& handle);
以下是发送例程:
// Register the data type in the DomainParticipant.
TypeSupport custom_type_support(new CustomDataType());
custom_type_support.register_type(participant, custom_type_support.get_type_name());
// Create a Topic with the registered type.
Topic* custom_topic =
participant->create_topic("topic_name", custom_type_support.get_type_name(), TOPIC_QOS_DEFAULT);
if (nullptr == custom_topic)
// Error
return;
// Create a DataWriter
DataWriter* data_writer =
publisher->create_datawriter(custom_topic, DATAWRITER_QOS_DEFAULT);
if (nullptr == data_writer)
// Error
return;
// Get a data instance
void* data = custom_type_support->createData();
// Fill the data values
// (...)
// Publish the new value, deduce the instance handle
if (data_writer->write(data, eprosima::fastrtps::rtps::InstanceHandle_t()) != ReturnCode_t::RETCODE_OK)
// Error
return;
// The data instance can be reused to publish new values,
// but delete it at the end to avoid leaks
custom_type_support->deleteData(data);
通信中间件 Fast DDS 基础概念简述与通信示例
从事汽车软件开发,通信中间件绕不开,当前最火热的无非有 2 种:Some/IP 和 DDS。DDS 是一种分布式通信标准,有很多商业和开源的实现,Fast DDS 是其中的一种。它在 ROS2 中被应用,而 Apollo 中的 CyberRT 框架中也有它的身影。
讲 Fast DDS 之前先讲什么是 DDS。
1. 什么是 DDS ?
DDS 是 OMG 组织发布的一种中间件协议和 API 标准,它将系统的组件集成在一起,提供业务和任务关键型物联网 (IoT) 应用程序所需的低延迟数据连接、极高的可靠性和可扩展架构。
DDS(Data Distribution Service,数据分发服务) 是一种以数据为中心的通信协议,用于分布式软件应用程序通信。
它描述了支持 数据提供者(Data Providers) 和 数据消费者(Data Consumers) 之间通信的通信应用程序编程接口 (API) 和通信语义。
要学习 DDS 就不能忽略它的模型:DCPS(以数据为中心的发布订阅模型)。
DCPS 有 3 个关键实体:
- publication entities: 定义消息生成对象及相关属性
- subscription entities:定义消息消费对象及相关属性
- configuration entities:定义传输相关的属性如 Topic 类型,通信的 QoS(服务质量)。
QoS 是一个非常重要的概念,DDS 使用 QoS 来定义 DDS 实体的行为特征。 QoS 由单独的 QoS 策略(源自 QoSPolicy 的类型的对象)组成。
2. 什么是 Fast-DDS?
DDS 是一套标准,它有很多实现,有商业的,也有开源的。
商业:RTI
开源: Cyclone DDS、Fast-DDS
所以,Fast DDS 是一种开源的 DDS 标准实现,它由 ePromise 公司发布并维护。
3. 什么是 DCPS?
先看看 Fast DDS 官方文档中的一张图。
DDS 是以数据为中心的通信模型,那么这个数据中心是什么呢?
我个人的理解是以 Topic 为代表的消息对象就是 DDS 中的数据中心。
通过 Topic 的纽带关系,可以将数据生成为数据消费对象连接起来,并且可以通过 QoS 执行通信服务质量策略。
在 DCPS 模型中,有 4 个基础的概念:
- Publisher:它是负责创建和配置其实现的 DataWriters 的 DCPS 实体。 DataWriter 是负责实际发布消息的实体。每个 DataWriter 都有一个分配的 Topic,在该 Topic 下发布消息。
- Subscriber:它负责接收在其订阅的 Topic下发布的数据。它为一个或多个 DataReader 对象提供服务,这些对象负责将新数据的可用性传达给应用程序。
- Topic(话题):它是绑定发布和订阅的实体。它在 DDS 域中是唯一的。通过TopicDescription,它允许发布和订阅数据类型的统一。
- Domain(领域):这是用于链接所有发布者和订阅者的概念,属于一个或多个应用程序,它们在不同主题下交换数据。这些参与域的单个应用程序称为 DomainParticipant。 DDS 域由域 ID 标识。 DomainParticipant 定义域 ID 以指定它所属的 DDS 域。具有不同 ID 的两个 DomainParticipants 不知道彼此在网络中的存在。因此,可以创建多个通信通道。这适用于涉及多个DDS应用程序的场景,它们各自的 DomainParticipants 相互通信,但这些应用程序不得干扰。 DomainParticipant 充当其他 DCPS 实体的容器,充当发布者、订阅者和主题实体的工厂,并在域中提供管理服务。
DDS 是一种通过信息,而 DCPS 是一个抽象的模型概念,实际上映射到具体的代码维度,则需要 DomainParticipants 作为容器去承载 Publisher、Subsriber、Topic 等等。
可以这样理解:
- Domain 是项目组
- Topic 是项目生产过程各类相关产出物,如需求文档、概要设计、产品方案、测试用例等
- DomainParticipant 代表项目中不同的参与组织如UI小组、产品小组、开发小组、测试小组等等
- Publisher 和 Subscriber 代表不同小组中对外输出文档和接受信息的窗口角色
- QoS 代表不同的文档在各个小组流转时,双方对于文件传输状态的质量要求
最后说明一下,如何理解 QoS 呢?
你可以联想到,假如你是写代码的,产品经理传递文件给你时的方法和需要你反馈的时效,以及测试人员传递文档给你要求的时效是不一样的。
当然,这里只是类比,为了帮助大家加深理解,真正的 DDS 不一定这样。
4. 什么是 RTPS ?
RTPS 是 Real-Time Publish Subscribe 的缩写,它是 DDS 的通信中间件,是发布-订阅模式,通信能力强大,支持 UDP/IP、TCP 及共享内存。
RTPS 是 DDS 通信的根基,它内部有一样重要的概念:
- Domain
- RTPSParticipant
- Topic
- Attributes
- Change
- History
- RTPSWriter
- RTPSReader
RTPS 中定义了一个 Domain 的概念,它定义了一个单独的通信平面。几个域可以同时独立地共存。一个域包含任意数量的 RTPSParticipant,即能够发送和接收数据的元素。
RTPSParticipants 使用 EndPoint 进行通信:
- RTPSWriter:能够发送数据的 EndPoint 端点。
- RTPSReader:能够接收数据的 EndPoint 端点。
RTPSParticipant 可以有任意数量的写入器和读取器端点。
Topic 定义和标记正在交换的数据。主题不属于特定 DomainParticipant。DomainParticipant 通过 RTPSWriters 对 Topic 发布的数据进行更改,并通过 RTPSReaders 接收与其订阅的 Topic 相关的数据。
在 Fast DDS 中最基础的通信单元称为 Change,它表示在 Topic 下写入的数据的更新。 RTPSReaders/RTPSWriters 在其 History 中注册这些 Change,History 是一种用作最近更改缓存的数据结构。
在 eProsima Fast DDS 的默认配置中,当您通过 RTPSWriter 端点发布更改时,会在后台执行以下步骤:
-
Change 将添加到 RTPSWriter 的 History 中。
-
RTPSWriter 将 Change 发送到它知道的任何 RTPSReaders。
-
接收到数据后,RTPSReaders 用新的 Change 更新他们的 History。
Fast DDS 支持多种配置,允许更改 RTPSWriters/RTPSReaders 的行为。修改 RTPS 实体的默认配置意味着 RTPSWriters 和 RTPSReaders 之间的数据交换流发生变化。此外,通过选择服务质量 (QoS) 策略,您可以通过多种方式影响这些历史缓存的管理方式,但通信循环保持不变。
5. Fast DDS 和 RTPS 关系?
前文说过 RTPS 是 DDS 的基础,实际上完整的 Fast DDS 架构分为 4 层:
- Application Layer
- FAST DDS Layer
- RTPS Layer
- Tranport Layer
Application 指的是采用 Fast DDS API 的各类应用。
DDS Layer 主要定义一个系统中不同的 Domain,在同一个 Domain 下 Topic 按规则通信。
Fast RTPS 是通信协调层,是下层 Transport 的抽象。
Transport 层处理底层 UDP、TCP、SHM(共享内存)。
6. 一个简单的 Fast DDS 示例
要使用 Fast DDS 首先需要安装它,有 bin、Source、docker image 3 种方式,但 bin 和 docker image 需要到官网预留个人信息才能下载,所以,我们可以考虑源码下载。
要下载 3 份源码:
- vendor
- fast cdr
- fast dds
我选择的是在 ubuntu 下用 cmake 方式编译。
可以参考这个地址:DDS安装
当然,还要下载编译 Fast DDS Gen,它是一个工具,能够将 IDL 文件转换成 C++ 代码。
现在考虑写一个最基础的 DDS 应用。
我们首先需要知道一个最小的 DDS 应该包含什么。
- 消息.IDL
- 数据发布器对应的.cpp
- 数据接收器对应的.cpp
- CMakeLists.txt
消息数据通过 IDL 文件定义。
IDL 功能很强大,定义了基础数据类型、数组、窗器、map、枚举、注解等等。[3]
fastddsgen 可以将其转换成 c++ 数据结构体。
通过 fastddsgen 可以转换成 C++ 类。
现在我们可以编写一个简单的 IDL
然后可以通过 fastddsgen 快速生成代码。
最终会自动产生好几个代码文件。
FrankTestDDS.idl 被转换成 FrankTestDDS.cxx 和它应对的 .h 文件。
其它的 FrankTestDDSPubSubMain 之类是 fastddsgen 自动生成的,用于实现发布和订阅演示代码。
我们先观察 CMakeLists.txt。
我们可以发现,整个工程依赖于 fastcdr 和 fastrtps 两个库,之后,代表消息数据经 idl 转换后的 FrankTestDDS.cxx 被编译成库的形式。
这样后面编译的 FrankTestDDS 这个可执行文件就可以链接消息库,保证了应用代码和消息的解耦。
现在我要试验 FastDDS 的发布-订阅功能。我在生成的 FrankTestDDSPublisher.cxx 中添加了一些代码。
st 是我们的消息体,我将其中的 msg 赋值。
同时,我还得修改 FrankTestSubscriber.cxx 的代码。
现在,我们可以编译代码并尝试运行了。
mkdir build
cd build
cmake ..
make
然后,分别在两个终端中运行 publisher 和 subscriber
./FrankTestDDS publisher
./FrankTestDDS subscriber
我们可以看到,通信正常,这也说明我们可以开始通过 fast dds 干活了。
至于高阶内容,需要结合业务实际需求了,比如大量传输摄像头图片、点云数据、控制命令等等。我们得处理好相应的数据结构转换和 QOS 定义。这个在本文中就不展开了。
参考
- https://www.dds-foundation.org/what-is-dds-3/
- https://fast-dds.docs.eprosima.com/en/latest/fastdds/getting_started/definitions.html
- https://www.omg.org/spec/IDL/4.2/PDF
以上是关于fastDDS之Publisher的主要内容,如果未能解决你的问题,请参考以下文章