流数据接入之WebSocket

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了流数据接入之WebSocket相关的知识,希望对你有一定的参考价值。

参考技术A

之前我们已经讲述了流处理流模型的web界面配置介绍https://blog.csdn.net/supermapsupport/article/details/94003055,本篇文章将介绍接入websocket数据的具体配置使用

接收器配置

在流处理模型编辑中,将websocket接收器拖拽出来,点击模块进行编辑接收器配置

元数据选择StreamingMetadata,进行元数据的结构配置
epsg:数据投影的epsgcode
id字段名:数据对象的唯一标识
接收的数据类型选择,目前流处理模型只支point类型
时间格式:可选,如果带了时间字段,可以设置对应的时间格式

过滤器、转换器(可选)

过滤器和转换器是属于可选配置,可以根据自己的需要进行相应的配置
这里我们选择一个字段添加转换器作为示例
字段添加位置:即字段插入的位置
添加字段名称:插入的字段名称
添加字段类型
运算表达式:字段值是根据运算表达式运算结果得出的

发送器配置

根据需要将对应的发送器拖拽到中间,我们这里选择的是websocket发送器
结果信息格式选择,这里也有三中格式供选择SVFormatter,JsonFormatter,GeoJsonFormatter

websocket服务地址,可以选择将数据推送到其他的websocket服务上,也可以选择推送到iServer的流数据服务中,这里我们选择填写我们事先发布的流数据服务地址(流服务可以在快速发布中选择进行发布),注意的是,需要是服务地址后面加上token

验证

我们可以在发布的流数据服务的订阅页面,点击订阅进行验证是否成功

"type":"Feature","properties":"XX":106.67124,"destinationLabel":"CDG","originLabel":"AUH","x":53.33562,"y":25.71172,"id":"ETD37","geometry":"type":"Point","coordinates":[53.33562,25.71172]

以上就是流数据接入Websocket的配置内容。后面我们还将介绍更多的流数据接入数据格式

如何将 websocket 二进制消息作为流发送到 Google Speech API?

【中文标题】如何将 websocket 二进制消息作为流发送到 Google Speech API?【英文标题】:How to send websocket binary messages as stream to Google Speech API? 【发布时间】:2017-03-09 20:11:19 【问题描述】:

我正在尝试将 websocket 连接的音频流发送到 Google Speech API。 websocket 以 20ms 的增量发送二进制消息。它以增量方式发送它使我相信我将不得不以某种方式临时读取数据并将其写入本地文件以避免终止与 Google 的连接。然而,这并不理想。

有没有办法将 websocket 流直接通过管道传输到recognizeStream

Google streamingRecognize 文档中的示例:

const request = 
   config: 
      encoding: encoding,
      sampleRate: sampleRate
   
;

const recognizeStream = speech.createRecognizeStream(request)
  .on('error', console.error)
  .on('data', (data) => process.stdout.write(data.results));

record.start(
  sampleRate: sampleRate,
  threshold: 0
).pipe(recognizeStream);

Websocket 连接:

var HttpDispatcher = require('httpdispatcher');
var dispatcher     = new HttpDispatcher();
var WebSocketServer = require('websocket').server;


var server = http.createServer(handleRequest);

var wsServer = new WebSocketServer(
    httpServer: server,
    autoAcceptConnections: true,

);

function handleRequest(request, response)
    try 
        //log the request on console
        console.log(request.url);
        //Dispatch
        dispatcher.dispatch(request, response);
     catch(err) 
        console.log(err);
    



wsServer.on('connect', function(connection) 
    console.log((new Date()) + ' Connection accepted' + ' - Protocol Version ' + connection.webSocketVersion);
    connection.on('message', function(message) 
        if (message.type === 'utf8') 
            console.log(message.utf8Data);
        

        else if (message.type === 'binary') 

          //Send to Google Speech API by passing into recognizeStream

        
    );

    connection.on('close', function(reasonCode, description) 
        console.log((new Date()) + ' Peer ' + connection.remoteAddress + ' disconnected.');
    );


);

【问题讨论】:

【参考方案1】:

这其实很简单。如此简单,以至于我因为没有看到它而感到有点不好意思。根据代码在 OP 中的确切编写方式,这完美地工作:

else if (message.type === 'binary') 

  //Send to Google Speech API by passing into recognizeStream
  recognizeStream.write(message.binaryData)


【讨论】:

【参考方案2】:

最好的解决方案是使用专门的流解决方案而不是自己做,这将处理所有缓冲区并为您提供适合 Google Speech API 的稳定流。尝试使用类似的东西,

https://www.npmjs.com/package/websocket-stream

【讨论】:

以上是关于流数据接入之WebSocket的主要内容,如果未能解决你的问题,请参考以下文章

Apache NiFi之Kafka流数据到HBase

WindowsLinuxARMAndroidiOS全平台支持的RTMP推流组件EasyRTMP-iOS如何接入软编码?

限流nginx接入层限流

TSINGSEE青犀视频使用海康硬盘录像机接入摄像机rtsp流步骤介绍

可视化限流管理,Sentinel 控制台启动和接入

Django之网络请求与响应