HI3861学习笔记(24)——MQTT客户端
Posted Leung_ManWah
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HI3861学习笔记(24)——MQTT客户端相关的知识,希望对你有一定的参考价值。
一、MQTT简介
1.1 实现方式
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publisher)、代理(Broker,即服务器)、订阅者(Subscriber)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
- MQTT服务器的主要工作是数据分发,没有数据保存功能。
- 客户端可以订阅自己发布的主题,此时服务器会把消息原样回发,可用于回环测试。
- MQTT让逻辑变得更清晰,需要什么订阅什么。
- 走标准化流程,解放了私有协议制定、实现、调试、测试一整套复杂的流程。
1.2 Paho MQTT
Eclipse Paho项目是IBM在2011年建立的Eclipse开源项目,该项目包含以C、Java、Python、JavaScript等语言编写的可用客户端。
二、移植Paho MQTT
2.1 下载源码
嵌入式C语言客户端开源地址:https://github.com/eclipse/paho.mqtt.embedded-c
下载之后解压,会得到这么一个文件夹:
- MQTTClient: 封装MQTTPacket生成的高级别C++客户端程序。
- MQTTClient-C: 封装MQTTPacket生成的高级别C客户端程序
- samples目录提供FreeRTOS和Linux两个例程,分别支持FreeRTOS和Linux系统。
- src目录提供MQTTClient的代码实现能力,以及用于移植到对应平台的网络驱动。
- MQTTPacket: 提供MQTT数据包的序列化与反序列化,以及部分辅助函数。
2.2 新建BUILD.gn
在鸿蒙系统源码的 third_party 文件夹下创建一个 paho_mqtt 文件夹,然后把解压后的所有文件都拷贝到 paho_mqtt 文件夹下
下一步,我们在 paho_mqtt 文件夹下面新建 BUILD.gn 文件,用来构建编译。
其内容如下:
# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//build/lite/config/component/lite_component.gni")
import("//build/lite/ndk/ndk.gni")
# Include paths and warning suppressions applied to every target that
# uses the paho MQTT sources.
config("pahomqtt_config") {
  include_dirs = [
    "MQTTPacket/src",
    "MQTTClient-C/src",
    "MQTTClient-C/src/liteOS",
    "//third_party/iot_link/network/mqtt/mqtt_al",
    "//third_party/iot_link/inc",
    "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
    "//kernel/liteos_m/components/cmsis/2.0",
  ]

  # Warnings silenced for the third-party sources.
  cflags = [
    "-Wno-unused-variable",
    "-Wno-unused-but-set-variable",
    "-Wno-unused-parameter",
    "-Wno-sign-compare",
    "-Wno-unused-function",
    "-Wno-return-type",
  ]
}
# Source files shared by the static and shared library targets: the LiteOS
# port layer, the MQTTClient-C client, the MQTTPacket serialization code and
# the sample transport helper.
pahomqtt_sources = [
"MQTTClient-C/src/liteOS/MQTTLiteOS.c",
"MQTTClient-C/src/MQTTClient.c",
"MQTTPacket/src/MQTTConnectClient.c",
"MQTTPacket/src/MQTTConnectServer.c",
"MQTTPacket/src/MQTTDeserializePublish.c",
"MQTTPacket/src/MQTTFormat.c",
"MQTTPacket/src/MQTTPacket.c",
"MQTTPacket/src/MQTTSerializePublish.c",
"MQTTPacket/src/MQTTSubscribeClient.c",
"MQTTPacket/src/MQTTSubscribeServer.c",
"MQTTPacket/src/MQTTUnsubscribeClient.c",
"MQTTPacket/src/MQTTUnsubscribeServer.c",
"MQTTPacket/samples/transport.c",
]
# Static-library build of the paho MQTT sources.
lite_library("pahomqtt_static") {
  sources = pahomqtt_sources
  target_type = "static_library"
  public_configs = [ ":pahomqtt_config" ]
}
# Shared-library build of the paho MQTT sources.
lite_library("pahomqtt_shared") {
  sources = pahomqtt_sources
  target_type = "shared_library"
  public_configs = [ ":pahomqtt_config" ]
}
# NDK wrapper: the hi3861v100 board depends on the static library, every
# other board depends on the shared library with a ".so" extension.
ndk_lib("pahomqtt_ndk") {
  if (board_name == "hi3861v100") {
    deps = [ ":pahomqtt_static" ]
  } else {
    lib_extension = ".so"
    deps = [ ":pahomqtt_shared" ]
  }
  head_files = [ "//third_party/paho_mqtt" ]
}
2.3 创建LiteOS文件夹
Paho MQTT 已经提供了 Linux 和 FreeRTOS 的移植,这里我们参考它们,新建文件夹:
third_party\paho_mqtt\MQTTClient-C\src\liteOS,里面存放两个文件:
MQTTLiteOS.c 和 MQTTLiteOS.h
内容如下:
2.3.1 MQTTLiteOS.c
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
* Ian Craggs - convert to FreeRTOS
*******************************************************************************/
#include "MQTTLiteOS.h"
/**
 * Start a new CMSIS-RTOS2 thread named "MQTTTask".
 *
 * The thread gets a 2048-byte stack and the priority of the calling thread.
 *
 * @param thread  Wrapper that receives the new thread's handle in
 *                thread->task; may be NULL if the handle is not needed.
 * @param fn      Entry function of the new thread.
 * @param arg     Argument handed to fn.
 * @return The handle returned by osThreadNew() converted to int.
 *         NOTE(review): osThreadNew() is documented to return NULL on
 *         failure, so 0 should mean "not created" - confirm against callers.
 */
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{
    /* Zero the whole structure so members not assigned below are defined. */
    osThreadAttr_t attr = {0};
    osThreadId_t id;

    attr.name = "MQTTTask";
    attr.stack_size = 2048;
    attr.priority = osThreadGetPriority(osThreadGetId());

    id = osThreadNew((osThreadFunc_t)fn, arg, &attr);

    /* Keep the handle so the caller can refer to the task later. */
    if (thread != NULL)
    {
        thread->task = id;
    }

    return (int)id;
}
/**
 * Reset a timer so that its end time is zero seconds and zero microseconds.
 *
 * @param timer  Timer to reset.
 */
void TimerInit(Timer* timer)
{
    timer->end_time.tv_sec = 0;
    timer->end_time.tv_usec = 0;
}
/**
 * Report whether a timer's end time has been reached.
 *
 * @param timer  Timer to check.
 * @return Non-zero when the end time is not later than the current time,
 *         0 otherwise.
 */
char TimerIsExpired(Timer* timer)
{
    struct timeval current;
    struct timeval remaining;

    gettimeofday(&current, NULL);
    timersub(&timer->end_time, &current, &remaining);

    if (remaining.tv_sec < 0)
    {
        return 1;
    }
    return (remaining.tv_sec == 0 && remaining.tv_usec <= 0);
}
/**
 * Arm a timer to end a given number of milliseconds from now.
 *
 * @param timer    Timer to arm.
 * @param timeout  Duration in milliseconds.
 */
void TimerCountdownMS(Timer* timer, unsigned int timeout)
{
    struct timeval start;
    struct timeval delta;

    gettimeofday(&start, NULL);

    /* Split the milliseconds into whole seconds plus microseconds. */
    delta.tv_sec = timeout / 1000;
    delta.tv_usec = (timeout % 1000) * 1000;

    timeradd(&start, &delta, &timer->end_time);
}
/**
 * Arm a timer to end a given number of seconds from now.
 *
 * @param timer    Timer to arm.
 * @param timeout  Duration in seconds.
 */
void TimerCountdown(Timer* timer, unsigned int timeout)
{
    struct timeval start;
    struct timeval delta;

    gettimeofday(&start, NULL);

    delta.tv_sec = timeout;
    delta.tv_usec = 0;

    timeradd(&start, &delta, &timer->end_time);
}
/**
 * Compute how many milliseconds remain before a timer's end time.
 *
 * @param timer  Timer to query.
 * @return Remaining time in milliseconds, or 0 when the seconds part of the
 *         difference between the end time and now is negative.
 */
int TimerLeftMS(Timer* timer)
{
    struct timeval current;
    struct timeval remaining;

    gettimeofday(&current, NULL);
    timersub(&timer->end_time, &current, &remaining);

    if (remaining.tv_sec < 0)
    {
        return 0;
    }
    return remaining.tv_sec * 1000 + remaining.tv_usec / 1000;
}
/**
 * Create the semaphore that backs a Mutex.
 *
 * The semaphore is created with osSemaphoreNew(1, 1, NULL) and its handle
 * is stored in mutex->sem.
 *
 * @param mutex  Mutex to initialise.
 */
void MutexInit(Mutex* mutex)
{
    osSemaphoreId_t handle = osSemaphoreNew(1, 1, NULL);

    mutex->sem = handle;
}
/**
 * Acquire a Mutex, waiting with the LOS_WAIT_FOREVER timeout.
 *
 * @param mutex  Mutex to lock.
 * @return The status returned by osSemaphoreAcquire().
 */
int MutexLock(Mutex* mutex)
{
    int result = osSemaphoreAcquire(mutex->sem, LOS_WAIT_FOREVER);

    return result;
}
/**
 * Release a Mutex.
 *
 * @param mutex  Mutex to unlock.
 * @return The status returned by osSemaphoreRelease().
 */
int MutexUnlock(Mutex* mutex)
{
    int result = osSemaphoreRelease(mutex->sem);

    return result;
}
/**
 * Read up to len bytes from the network socket.
 *
 * The receive timeout of the socket is set from timeout_ms; a timeout that
 * is zero or negative is replaced by 100 microseconds.
 *
 * @param n           Network whose my_socket is read from.
 * @param buffer      Destination buffer of at least len bytes.
 * @param len         Number of bytes wanted.
 * @param timeout_ms  Receive timeout in milliseconds.
 * @return The number of bytes read so far when recv() fails with EAGAIN or
 *         EWOULDBLOCK, -1 on any other recv() error, 0 when recv() returns
 *         0, otherwise len.
 */
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
    struct timeval wait = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
    int total = 0;

    if (wait.tv_sec < 0 || (wait.tv_sec == 0 && wait.tv_usec <= 0))
    {
        wait.tv_sec = 0;
        wait.tv_usec = 100;
    }

    setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&wait, sizeof(struct timeval));

    while (total < len)
    {
        int got = recv(n->my_socket, &buffer[total], (size_t)(len - total), 0);

        if (got == -1)
        {
            /* A timeout keeps the partial count; any other error reports -1. */
            if (errno != EAGAIN && errno != EWOULDBLOCK)
            {
                total = -1;
            }
            break;
        }
        if (got == 0)
        {
            total = 0;
            break;
        }
        total += got;
    }

    return total;
}
/**
 * Send len bytes on the network socket with a send timeout.
 *
 * @param n           Network whose my_socket is written to.
 * @param buffer      Data to send.
 * @param len         Number of bytes to send.
 * @param timeout_ms  Send timeout in milliseconds.
 * @return The value returned by send().
 */
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
    struct timeval tv;

    /*
     * Split the timeout into seconds and microseconds. Putting the whole
     * value into tv_usec yields an out-of-range field for timeouts of one
     * second or more and can overflow int for large timeouts.
     */
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv, sizeof(struct timeval));

    return send(n->my_socket, buffer, len, 0);
}
/**
 * Prepare a Network for use: install the read/write callbacks and set the
 * socket field to 0.
 *
 * @param n  Network to initialise.
 */
void NetworkInit(Network* n)
{
    n->mqttwrite = linux_write;
    n->mqttread = linux_read;
    n->my_socket = 0;
}
/**
 * Resolve a host name and open a TCP connection to its first IPv4 address.
 *
 * @param n     Network that receives the connected socket in my_socket.
 * @param addr  Host name or address string passed to getaddrinfo().
 * @param port  TCP port in host byte order.
 * @return 0 on success; the getaddrinfo() error code when resolution fails;
 *         -1 when no IPv4 address is found or the socket cannot be created;
 *         otherwise the value returned by connect(). When connect() fails
 *         the socket is closed and my_socket is set to -1.
 */
int NetworkConnect(Network* n, char* addr, int port)
{
    struct sockaddr_in address = {0};
    struct addrinfo hints = {0};
    struct addrinfo *list = NULL;
    struct addrinfo *entry;
    int found = 0;
    int rc;

    /* Assign by field name; a positional initializer depends on field order. */
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    rc = getaddrinfo(addr, NULL, &hints, &list);
    if (rc != 0)
    {
        return rc;
    }

    /*
     * Walk the list with a separate cursor so that the head pointer is
     * preserved and freeaddrinfo() releases the complete list.
     */
    for (entry = list; entry != NULL; entry = entry->ai_next)
    {
        if (entry->ai_family == AF_INET)
        {
            address.sin_family = AF_INET;
            address.sin_port = htons(port);
            address.sin_addr = ((struct sockaddr_in*)(entry->ai_addr))->sin_addr;
            found = 1;
            break;
        }
    }
    freeaddrinfo(list);

    if (!found)
    {
        return -1;
    }

    n->my_socket = socket(AF_INET, SOCK_STREAM, 0);
    if (n->my_socket == -1)
    {
        return -1;
    }

    rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
    if (rc != 0)
    {
        /* Do not leak the descriptor when the connection attempt fails. */
        close(n->my_socket);
        n->my_socket = -1;
    }

    return rc;
}
/**
 * Close the socket held by a Network.
 *
 * @param n  Network whose my_socket is closed.
 */
void NetworkDisconnect(Network* n)
{
    int fd = n->my_socket;

    close(fd);
}
2.3.2 MQTTLiteOS.h
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTLiteOS_H)
#define MQTTLiteOS_H
#include <sys/types.h>
#if !defined(SOCKET_ERROR)
/** error in socket operation */
#define SOCKET_ERROR -1
#endif
#if defined(WIN32)
/* default on Windows is 64 - increase to make Linux and Windows the same */
#define FD_SETSIZE 1024
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINVAL WSAEINVAL
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define ioctl ioctlsocket
#define socklen_t int
#else
#define INVALID_SOCKET SOCKET_ERROR
#include <sys/socket.h>
#include <sys/param.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#endif
#if defined(WIN32)
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#include <net/if.h>
#endif
#include "ohos_init.h"
#include "cmsis_os2.h"
#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"
#include "lwip/sockets.h"
#define MQTT_TASK
typedef struct Thread
osThreadId_t task;
Thread;
int ThreadStart(Thread*, void (*fn)(void*), void* arg);
/** Countdown timer expressed as an absolute end time. */
typedef struct Timer
{
    struct timeval end_time; /* time at which the timer expires */
} Timer;
typedef struct Mutex
osSemaphoreId_t sem;
Mutex;
void MutexInit(Mutex*);
int MutexLock(Mutex*);
int MutexUnlock(Mutex*);
/** Reset a timer's end time. */
void TimerInit(Timer* timer);

/** Return non-zero when the timer's end time has been reached. */
char TimerIsExpired(Timer* timer);

/** Arm the timer to end after timeout milliseconds. */
void TimerCountdownMS(Timer* timer, unsigned int timeout);

/** Arm the timer to end after timeout seconds. */
void TimerCountdown(Timer* timer, unsigned int timeout);

/** Return the milliseconds left before the timer's end time. */
int TimerLeftMS(Timer* timer);
/** Socket plus the read/write callbacks used to move bytes over it. */
typedef struct Network
{
    int my_socket; /* socket descriptor */
    /* Read callback: (network, buffer, length, timeout in ms). */
    int (*mqttread) (struct Network*, unsigned char*, int, int);
    /* Write callback: (network, buffer, length, timeout in ms). */
    int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;

/** Read up to len bytes from the network socket. */
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms);

/** Send len bytes on the network socket. */
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms);

/** Install the read/write callbacks and reset the socket field. */
void NetworkInit(Network* n);
int NetworkConnectHI3861学习笔记(19)——WiFi接口使用(STA和AP模式)