阿里云物联网智能视频服务接入
Posted GMaya
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阿里云物联网智能视频服务接入相关的知识,希望对你有一定的参考价值。
物联网视频服务(LinkVisual)支持视频流上云、存储、转发、告警事件等视频基础能力,提供丰富的视频算法以及云边协同(算法云端训练、云端下发、边缘计算推理)服务。旨在帮助视频设备厂商、方案商与服务提供商,快速将存量或者新增的视频设备上云。
文章目录
前言
提示:主要记录物联网智能视频服务的接入过程:
后端服务主要是微服务Springcloud。
云平台主要是阿里云物联网智能视频服务-企业版实例
主要业务场景是,户外版4G摄像头,在无人观看是不产生上行流量,在有人观看是再进行推流查看直播画面,以及支持设备主动抓图。
提示:以下是本篇文章正文内容,下面案例可供参考
一、购买企业版实例
首先在物联网平台-购买企业版实例。下面的视频服务一定要开启
二、使用步骤
1.新增设备
主要在购买的实例上进行新增产品-新增设备,以及设备端烧录,设备端开发一般都摄像头厂商进行对接,此处不在进行记录。
2.服务器端开发
主要记录服务器端与阿里云平台的对接
主动触发IPC设备拍摄图片并上传至云端
官网文档api 可以参考官方对接文档,其他相关接口和此接口对接方式相同,不在粘贴代码了
/**
* 调用该接口查询IPC设备获取的图片信息。。
* @param iotInstanceId 实例id
* @param productKey 产品key
* @param deviceName 设备名称
* @param captureId 图片id
* @return
* @throws Exception
*/
public QueryDevicePictureFileResponseBody.QueryDevicePictureFileResponseBodyData queryDevicePictureFileWithOptions(String iotInstanceId, String productKey, String deviceName, String captureId) throws Exception
// 调用该接口主动触发IPC设备拍摄图片并上传至云端
Client client = getClient();
QueryDevicePictureFileRequest queryLiveStreamingRequest = new QueryDevicePictureFileRequest();
queryLiveStreamingRequest.setDeviceName(deviceName);
queryLiveStreamingRequest.setProductKey(productKey);
queryLiveStreamingRequest.setIotInstanceId(iotInstanceId);
queryLiveStreamingRequest.setCaptureId(captureId);
com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
try
// 复制代码运行请自行打印 API 的返回值
log.info("调用该接口查询IPC设备获取的图片信息,入参:",JSON.toJSONString(queryLiveStreamingRequest));
QueryDevicePictureFileResponse queryDevicePictureFileResponse = client.queryDevicePictureFileWithOptions(queryLiveStreamingRequest, runtime);
log.info("调用该接口查询IPC设备获取的图片信息,结果:",JSON.toJSONString(queryDevicePictureFileResponse));
// "body":"code":"200","data":"iotId":"Z50Qxs5BSuUoxO06mKRUi3rb00","picCreateTime":1672135564877,"picId":"WGJmWkYxU2dNaE5HSmpEWDJiQjY3T3ZnT2dNdl90LTlSVXhBVjU5SjJ5WS9lZmVmYzc0YmU2OTY0ZWY2YmVjNzg1MWM5MTA5YmY0Y18xNjcyMTM1NTY0ODc3","picUrl":"https://link-vision-picture-sh.oss-cn-shanghai.aliyuncs.com/XbfZF1SgMhNGJjDX2bB67OvgOgMv_t-9RUxAV59J2yY/efefc74be6964ef6bec7851c9109bf4c?Expires=1672139359&OSSAccessKeyId=LTAILduaCDAC561K&Signature=z3jXpII0Hjijbf8%2FGaJv%2FterhAw%3D","thumbUrl":"https://link-vision-picture-sh.oss-cn-shanghai.aliyuncs.com/XbfZF1SgMhNGJjDX2bB67OvgOgMv_t-9RUxAV59J2yY/efefc74be6964ef6bec7851c9109bf4c?Expires=1672139359&OSSAccessKeyId=LTAILduaCDAC561K&Signature=yZw6elXowc%2F2R3GQ753H8OR3E44%3D&x-oss-process=image%2Fauto-orient%2C1%2Fresize%2Cm_lfit%2Cw_400%2Climit_0%2Fquality%2Cq_90","requestId":"DCEAE36F-FBD2-5187-A035-317AFC1380A5","success":true,"headers":"access-control-allow-origin":"*","date":"Tue, 27 Dec 2022 10:09:19 GMT","content-length":"866","x-acs-request-id":"DCEAE36F-FBD2-5187-A035-317AFC1380A5","connection":"keep-alive","content-type":"application/json;charset=utf-8","x-acs-trace-id":"853464dc94d2787fef2e9b5d5fc21556","statusCode":200
QueryDevicePictureFileResponseBody body = queryDevicePictureFileResponse.getBody();
if(body.getSuccess())
return body.getData();
else
log.error("调用该接口查询IPC设备获取的图片信息 有误,", JSON.toJSONString(queryDevicePictureFileResponse));
throw new ServiceWarnException(body.getErrorMessage() == null ? "查询IPC设备获取的图片有误,请联系管理员。" : body.getErrorMessage());
catch (TeaException error)
// 如有需要,请打印 error
log.error("调用该接口查询IPC设备获取的图片信息,", error.message);
catch (Exception _error)
TeaException error = new TeaException(_error.getMessage(), _error);
// 如有需要,请打印 error
log.error("调用该接口查询IPC设备获取的图片信息,", error.message);
return null;
private Client getClient() throws Exception
Config config = new Config().setAccessKeyId(aliiotConfig.getAccessKeyId()).setAccessKeySecret(aliiotConfig.getAccessKeySecret());
config.endpoint = aliiotConfig.getEndpoint();
return new Client(config);
主要引入的包
import com.aliyun.iot20180120.models.BatchGetDeviceStateRequest;
import com.aliyun.iot20180120.models.BatchGetDeviceStateResponse;
import com.aliyun.iot20180120.models.BatchGetDeviceStateResponseBody;
import com.aliyun.linkvisual20180120.Client;
import com.aliyun.linkvisual20180120.models.*;
import com.aliyun.tea.TeaException;
import com.aliyun.teaopenapi.models.Config;
服务器端订阅AMQP
官方文档
官方文档介绍的AmqpClient.java,主要是main方法运行,肯定不适用咱们微服务启动的,所以还需要单独调整或者修改方式,接入自己的代码平台中。
改版后:
AmqpClient.java
package com.dindo.monitoring.mq;
import cn.hutool.core.util.IdUtil;
import com.dindo.monitoring.config.AliiotConfig;
import com.dindo.monitoring.mq.aliiot.JmsConnectionImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
/**
* @author GMaya
* @create 2022/12/28 17:26
* @Description 类描述: TODO 当正式环境使用时, 测试环境不在进行消息订阅,避免和正式环境消息订阅产生冲突
*/
@Slf4j
@Component
public class AmqpClient
@Resource
private AliiotConfig aliiotConfig;
// 告警消息
private static String consumerGroupId_alarm = "xxxxxxxxx";
// 设备运行状态消息
private static String consumerGroupId_run = "xxxxxxxxx";
// 设备基本信息消费组
private static String consumerGroupId_info = "xxxxxxxxxxxx";
private static String clientId = IdUtil.simpleUUID();
private static int connectionCount = 1;
@Resource
@Qualifier("MessageListenerImpl")
private MessageListener messageListenerImpl;
@Resource
private JmsConnectionImpl jmsConnectionImpl;
@PostConstruct
public void init() throws Exception
List<Connection> connections = new ArrayList<>();
List<String> consumerGroupIdList = new ArrayList<>();
consumerGroupIdList.add(consumerGroupId_alarm);
consumerGroupIdList.add(consumerGroupId_run);
consumerGroupIdList.add(consumerGroupId_info);
for (String consumerGroupId : consumerGroupIdList)
//参数说明,请参见AMQP客户端接入说明文档。
for (int i = 0; i < connectionCount; i++)
long timeStamp = System.currentTimeMillis();
//签名方法:支持hmacmd5、hmacsha1和hmacsha256。
String signMethod = "hmacsha1";
//userName组装方法,请参见AMQP客户端接入说明文档。
String userName = clientId + "-" + i + "|authMode=aksign"
+ ",signMethod=" + signMethod
+ ",timestamp=" + timeStamp
+ ",authId=" + aliiotConfig.getAccessKeyId()
+ ",iotInstanceId=" + aliiotConfig.getIotInstanceId()
+ ",consumerGroupId=" + consumerGroupId
+ "|";
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
String signContent = "authId=" + aliiotConfig.getAccessKeyId() + "×tamp=" + timeStamp;
String password = doSign(signContent, aliiotConfig.getAccessKeySecret(), signMethod);
String connectionUrl = "failover:(amqps://" + aliiotConfig.getAmqpHost() + ":5671?amqp.idleTimeout=80000)"
+ "?failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("QUEUE");
// 创建连接。
Connection connection = cf.createConnection(userName, password);
connections.add(connection);
((JmsConnection) connection).addConnectionListener(jmsConnectionImpl);
// 创建会话。
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// 创建Receiver连接。
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListenerImpl);
/**
* 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
*/
private String doSign(String toSignString, String secret, String signMethod) throws Exception
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Base64.encodeBase64String(rawHmac);
JmsConnectionImpl.java
package com.dindo.monitoring.mq.aliiot;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.springframework.stereotype.Component;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.net.URI;
/**
* @author GMaya
* @create 2022/12/28 18:33
* @Description 类描述:
*/
@Slf4j
@Component
public class JmsConnectionImpl implements JmsConnectionListener
/**
* 连接成功建立。
*/
@Override
public void onConnectionEstablished(URI remoteURI)
log.info("连接成功建立。, remoteUri:", remoteURI);
/**
* 尝试过最大重试次数之后,最终连接失败。
*/
@Override
public void onConnectionFailure(Throwable error)
log.error("尝试过最大重试次数之后,最终连接失败, ", error.getMessage());
/**
* 连接中断。
*/
@Override
public void onConnectionInterrupted(URI remoteURI)
log.info("连接中断。, remoteUri:", remoteURI);
/**
* 连接中断后又自动重连上。
*/
@Override
public void onConnectionRestored(URI remoteURI)
log.info("连接中断后又自动重连上, remoteUri:", remoteURI);
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope)
@Override
public void onSessionClosed(Session session, Throwable cause)
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause)
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause)
MessageListenerImpl.java
package com.dindo.monitoring.mq.aliiot;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
* @author GMaya
* @create 2022/12/28 18:30
* @Description 类描述: 消息消费类,订阅的消费组消息都会到此处
*/
@Component
@Slf4j
public class MessageListenerImpl implements MessageListener
@Resource
private AliiotMessage aliiotMessage;
@Override
public void onMessage(Message message)
try
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
// processMessage(message);
aliiotMessage.processMessage(message);
catch (Exception e)
log.error("阿里物联网消息推送处理失败", e);
private static void processMessage(Message message)
try
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
log.info("获取的阿里iot推送的相关消息 message"
+ ",\\n topic = " + topic
+ ",\\n messageId = " + messageId
+ ",\\n content = " + content);
catch (Exception e)
log.error("processMessage occurs error ", e);
具体的消费处理类
AliiotMessage.java
package com.dindo.monitoring.mq.aliiot;
import com.alibaba.fastjson
节选自微信公众号:
6. 设备端输出log
-
通过HTTPS认证接入,获取MQTT的用户名和密码

-
MQTT使用获取到的用户名和密钥接入

-
使用MQTT订阅和发布消息
