Opensplice 读写数据

Posted hgl0417

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Opensplice 读写数据相关的知识,希望对你有一定的参考价值。

介绍完Topic之后,来讲一下opensplice数据的读/写。在Opensplice Overview中我们讲解了数据的读写,但为了能够充分利用DDS,有必要理解读/写操作和Topic instance生命周期之间的关系。

可以这里需要说明一下,我们可以通过理解面向对象的类/对象,来理解Topics/topic datatypes。与对象一样,instance of topic‘s datatype具有:

-> 唯一的标识、键值;

-> 生命周期。

1. 写数据

可以通过DataWriter隐含的语义隐式管理Topics/topic datatypes的实例生命周期,后者可以通过DataWriter API显式控制它。

(1)Topic instance的生命周期

在了解Topic生命周期之前,先说明一下Topic所有可能状态:

-> ALIVE:如果至少有一个DataWriter已经显式或隐式地(通过写入)注册了它,同时,至少一个DataWriter预留了这个Topic instance 的资源,那么Topic就是Alive。同时每个DataReader在读取这个topic instance的时候会在本地同样预留类似的资源(sample)。

-> NOT_ALIVE_NO_WRITERS:当没有更多的DataWriters来更新Topic instance,并且所有的DataWriters释放(注销)了他们之前注册的Topic instance。同时,之前的DataReader不在期望更新,并且释放(注销)了他们之前在本地的预留的类似的Topic instance。那么这个Topic instance就处在NOT_ALIVE_NO_WRITERS。如果本地一个DataWriter忘记注销它不想更新的DataWriter,那么不仅这个这个DataWriter会泄露(保留)这个topic instance资源,其他所有订阅的DataReader一会泄露(保留)这个topic instance资源。

-> NOT_ALIVE_DISPOSED:如果topic instance被隐式(通过QoS默认设置),或者显式地通过一个指定的DataWriter处理掉(dispose),NOT_ALIVE_DISPOSED意味着系统不在需要这个topic instance,并且将要从存储器中擦除。

NOT_ALIVE_NO_WRITERS与NOT_ALIVE_DISPOSED的区别在于:在所有DataWriter注销topic instance之后,前者依然有效,而后者已经无效了。

如果当前的任务销毁了,并且将来在其他任务中可能调用这个任务中的Topic instance,那么Topic instance的状态将从ALIVE变为NOT_ALIVE_NO_WRITERS,这时如果有其他的任务开始运行这个Topic instance,那么这个Topic instance的状态变为ALIVE。如果当前的Topic instance不想被DDS其他的任务调用,那么需要将Topic instance的状态从ALIVE变为NOT_ALIVE_DISPOSED。

(2)自动的生命周期管理

在Opensplice Overview中的DataWriter是在系统中唯一运行的任务,所以DataCommType是在系统中的Topic instance。当程序终止时那么系统OpenSplice将默认将DataCommType从ALIVE转换为NOT_ALIVE_DISPOSED,因为没有其他的任务写DataCommType。如果将DataCommType注销,那么默认会将Topic instance从ALIVE转换成NOT_ALIVE_NO_WRITERS。因为DataCommType可能会重新访问。

(3)显式的生命周期管理

显式地管理意味着我们可以在编写程序的时候直接操作Topic instance的注册(register)、注销(unregister)以及处置(dispose)。

下面我们通过代码来解释

#include <iostream>
#include <DataComm_DCPS.hpp>
int main(int, char **) {
    dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());
    dds::topic::Topic<DataComm::DataCommType> topic(dp, "DataCommTopic");
    dds::pub::Publisher pub(dp);

    // 避免处理掉注销的topic instance。
    dds::pub::qos::DataWriterQos dwqos = pub.default_datawriter_qos()
    << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

    
    // 通过Qos创建DataWriter
    dds::pub::DataWriter<DataComm::DataCommType> dw(pub, topic, dwqos);
   // 自动注册状态为0的topic instance,并在将数据写入。
    DataComm::DataCommType data(0, 24.3F, 0.5F);
    dw.write(data);

    DataComm::DataCommType key;
    short id = 1;
    key.id(id);

    // 显式地注册id分别为1,2,3的topic instance,id为0,1,2,3的topic instance的状态为ALIVE。
    dds::core::InstanceHandle h1 = dw.register_instance(key);
    id = 2;
    key.id(id);
    dds::core::InstanceHandle h2 = dw.register_instance(key);
    id = 3;
    key.id(id);
    dds::core::InstanceHandle h3 = dw.register_instance(key);
    // 由于是key topic instance,分别对id为1,2,3的topic instance写入数据。
    dw << DataComm::DataCommType(1, 24.3F, 0.5F);
    dw << DataComm::DataCommType(2, 23.5F, 0.6F);
    dw << DataComm::DataCommType(3, 21.7F, 0.5F);

    // 注销id为1的topic instance,这是id为1的topic instance转换为NOT_ALIVE_NO_WRITERS
    dw.unregister_instance(h1);

    // 处理掉id为2的topic instance,这时id为2的topic instance转换为NOT_ALIVE_DISPOSED
    dw.dispose_instance(h2);

    // 程序结束,id为0,3的topic instance将注销
    // 程序结束,dw将要被销毁。

    return 0;
}

(4)keyless topic life-cycle

由于keyless topic仅能生成一个instance,所以keyless topic instance仅与DataWriter相关。

(5)写操作是否阻塞(Blocking or Non-Blocking Write)

在Opensplice中,写操作是不阻塞的。但是,后面会将到,写操作是可以通过服务来阻塞的。写操作阻塞是为了避免数据丢失。

 

2. 数据访问(Accessing Data)

Opensplice通过内容(content)与状态(state)来选择数据。控制方法主要由两种:read与take(remove from cache)

(1)read与take的比较

Opensplice通过DataReader实现数据访问:包含两种方式:read与take。

DataReader::read方式访问数据,数据不会从cahe中移除,即在整个生命周期,数据始终保持可读。

DataReader::take方式访问数据,数据从cahe中移除,即一旦执行take,那么就不能在继续执行read或者take。

read与take操作使Opensplice作为分布式的cache(distributed cache),或者队列系统(queue system)或者两者皆可。这种结合在其他的中间件是很少见的。

一般,read操作用于状态(state),take操作用于读取事件(event)

(2)数据样本(Data and Meta-Data)

在这里回顾一下,本章开头介绍DataWriter控制topic instance的生命周期(life-cycle),同时DataReader根据topic instance的生命周期(life-cycle)以及一些其他的数据样本(data sample)判断是否可读,并且采用的读取方式(read or take)。所以数据样本(data sample)应该具有相应的SampleInfo,包括:

-> 数据样本状态(Data Sample):READ 以及 NOT_READ。影响这两个状态的因素是数据样本(data sample)是否被读取;

-> 实例状态(Instance State):在topic 生命周期介绍过,分为:ALIVE, NOT_ALIVE_NO_WRITERS, or NOT_ALIVE_DISPOSED;

-> 更新状态(View State):NEW 以及 NOT_NEW。影响这两个状态的是否为第一个数据样本(data sample)。

SampleInfo还包含计数器,可以让我们决定状态转移的次数,比如从ALIVE到NOT_ALIVE_NO_WRITERS的次数。

最后SampleInfo还包含了时间戳(timestamp),用以标记数据是否可用。

(3)数据样本选择(selecting sample)

 对于read或者take操作,数据选择的机制是一样的,仅仅是更换程序中的函数(将read更换为take)。

Opensplice基于状态(state)与内容(content)选择数据样本(selecting sample):

-> 基于状态(state)的选择:由view state, instance state and sample state的值决定。

-> 基于内容(content)的选择:基于样本(sample)的值。

基于状态的选择

在这里用代码说明:(以下的代码read()全部可以替换成take())

dds::sub::LoanedSamples<DataComm::DataCommType> samples;
// any()表示无论view state, instance state and sample state的值是什么,都会读取值。
samples = dr.select().state(dds::sub::status::DataState::any()).read();
// 仅仅读取没有被读取过的数据
samples = dr.select().state(dds::sub::status::SampleState::not_read()).read();
// 读取新的可用数据样本
samples = dr.select().state(dds::sub::status::DataState::new_data()).read();
// 仅读取与首次在系统中出现的数据样本,注意:这种方式只读与获取每个topic instance最新的一次写操作。
dds::sub::status::DataState ds;
ds << dds::sub::status::SampleState::not_read()
   << dds::sub::status::ViewState::new_view()
   << dds::sub::status::InstanceState::alive();
samples = dr.select().state(ds).read();

基于状态的数据样本选择非常适合解决第一次出现在系统中的应用。举个例子,比如一个新的航班进入新的控制区等等。

值得一提的是基于状态选择的省略形式:

// 相当于采用NOT_READ_SAMPLE_STATE, ALIVE_INSTANCE_STATE and ANY_VIEW_STATE 状态读数据样本。
auto samples2 = dr.read();

基于内容的选择

基于内容的选择是通过查询(queries)实现的。与contest filter有些类似,但本质上不同。contest filter是关于控制数据读取器接收的数据:与过滤器不匹配的数据不插入到数据读取器高速缓存中。 另一方面,query是关于选择(已经)在数据读取器高速缓存中的数据。

还是通过代码说明:

// 定义查询(query)的表达式
std::string expression =
    "(data1 NOT BETWEEN (%20.5 AND %21.5))     OR     (data2 NOT BETWEEN (%30 and %50))";
// 定义查询(query)的参数
std::vector<std::string> params = {"20.5", "21.5", "30", "50"};
dds::sub::Query query(dr, expression, params);
auto samples = dr.select().content(query).read();

上述代码的含义

内容查询(query)支持的语法与过滤器表达式的语法相同,这里就不在叙述了(见Opensplice Topic中的表3)。

基于实例的选择

在一些情况下,我们仅想查看来自于特定topic instance的数据。这时会想到使用内容滤波器来选择特定的值。这样做非常没有效率。Opensplice提供了有效的方式,通过每个instance的handler直接查询。代码如下:

DataComm::DataCommType key;
key.id() = 123;
auto handle = dr.lookup_instance(key);

auto samples = dr.select().instance(handle).read();

(4)迭代器或者容器?(Iterators or Containers?)

到现在为止,我们没有对数据进行“存储”。然而,当我们想将数据存进我们自己选择的容器中,我们希望可以用基于迭代器(iterator-based)方式进行read或者take的操作。

基于迭代器(iterator-based)方式的read或者take API支持前向迭代器(forward iterators)同时也支持后向插入迭代器(back-inserting iterators)。read或者take API支持我们选择的任何结构,只要可以定义一个前向或者后向插入迭代器。见如下代码:

// Forward iterator using array.
dds::sub::Sample<DataComm::DataCommType> samples[MAXSAMPLES]; 
unsigned int readSamples = dr.read(&samples, MAXSAMPLES);
// Forward iterator using vector.
std::vector<dds::sub::Sample<DataComm::DataCommType>> fSamples(MAXSAMPLES);
readSamples = dr.read(fSamples.begin(), MAXSAMPLES);
// Back-inserting iterator using vector. 
std::vector<dds::sub::Sample<DataComm::DataCommType>> biSamples;
uint32_t readBiSamples = dr.read(std::back_inserter(biSamples));

上述代码以前向迭代器为例,后向插入迭代器类似。

(5)read或者take是否阻塞?

read或者take是不阻塞的,而且如果没有数据可读取时会立刻返回。

 

3.  同步及异步(Waiting and being Notified)

DDS通过轮询的方式每隔一段时间read或者take topic中的数据。轮询对于一些应用是最合理的方式,但是,有些应用想要数据的可用,或者等待数据的可用,这样就不能采用轮询的方式。DDS支持同步及异步的调整方式,即等待(waitsets)与监听(listener)。

(1)waitsets

waitsets是Opensplice的同步处理机制。

Opensplice对于条件等待(waiting on condition)提供了通用的机制。其中一种机制是ReadConditions,它可用于等待一个或多个DataReader上的可用性数据。代码如下:

// 创建waitset
dds::core::cond::WaitSet ws;
// 为DataReader创建ReadCondition并且配置成基于状态的选择(新的可用数据)
dds::sub::cond::ReadCondition rc(dr, dds::sub::status::DataState::new_data());
// 加入条件
ws += rc;
// 等待新的数据可用 ws.wait(); // 读取数据 auto samples = dr.read(); std::for_each(samples.begin(), samples.end(), [](const dds::sub::Sample<DataComm::DataCommType>& s) {
   std::cout << s.data() << std::endl;
});

以上的代码通过调用Waitset :: wait方法,该方法返回活动条件列表。Waitset :: wait是通过同步数据实现的方式。另一种同步数据的方法是调用Waitset :: dispatch。

Opensplice条件可以与functor对象相关联,然后在触发条件时用于执行特定于应用逻辑。Opensplice事件处理机制允许您将任何想要的事物绑定到事件,这意味着您可以将函数,类方法甚至lambda函数绑定。下面的代码将这种机制附加到waitset上,在这里我们调用Waitset :: dispatch函数,这会导致基础结构在解除阻塞之前自动调用与每个触发条件关联的函数,如使用WaitSet中所示分派到传入的数据。

下面的程序单独演示:

// 创建waitset
dds::core::cond::WaitSet ws;

// 为DataReader创建ReadCondition,同时new_data()基于状态的条件绑定到DataReader()函数,使其可以按照条件执行。
dds::sub::cond::ReadCondition rc(dr,
             dds::sub::status::DataState::new_data(),
             [](const dds::sub::ReadCondition& srcCond) {
                dds::sub::DataReader<DataComm::DataCommType> srcReader = srcCond.data_reader();
                // 读数据
                auto samples = srcReader.read();
                std::for_each(samples.begin(),
                    samples.end(),
                    [](const dds::sub::Sample<DataComm::DataCommType>& s) {
                      std::cout << s.data() << std::endl;
                    });
             });
// 加入条件等待
ws += rc;
// 等待新数据可用 ws.dispatch();

(2)Listener

Listener是Opensplice异步处理机制

OpenSplice通过注册handler通知(notified)数据。我们将处理程序与DataReader通过on_data_available事件连接起来。如下代码:

class DataCommListener :
public dds::sub::NoOpDataReaderListener<DataComm::DataCommType>
{
public:
    virtual void on_data_available(dds::sub::DataReader<DataComm::DataCommType>& dr) {
        auto samples = dr.read();
        std::for_each(samples.begin(), samples.end(),
                      [](const dds::sub::Sample<DataComm::DataCommType>& s) {
                       std::cout << s.data().id() << std::endl;
                     });
    }
};    
DataCommListener listener;
dr.listener(&listener, dds::core::status::StatusMask::data_available());

NoOpDataReaderListener类是通过API实现的一个实用类,这个API提供了listener操作的简单实现。

这里应该注意一下:handler代码应该在middleware的线程中实现,由于listener应该花费尽量少的时间。

 

原创博文,转载请标明出处。

以上是关于Opensplice 读写数据的主要内容,如果未能解决你的问题,请参考以下文章

为 OpenSplice 模板创建容器?

Opensplice DDS 持久数据可以有多大?

Opensplice 无法构建 dcpsisocpp2

是啥导致 OpenSplice 中出现此 Java 错误?

OpenSplice DDS 开源实现支持——数据库事务处理?

Opensplice QoS(Quality of Service)