HI3861学习笔记(24)——MQTT客户端

Posted Leung_ManWah

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HI3861学习笔记(24)——MQTT客户端相关的知识,希望对你有一定的参考价值。

一、MQTT简介

1.1 实现方式

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

  • MQTT服务器的主要工作是数据分发,没有数据保存功能。
  • 可以订阅自己发布的主题,服务器就是回发测试。
  • MQTT让逻辑变得更清晰,需要什么订阅什么。
  • 走标准化流程,解放了私有协议制定、实现、调试、测试一整套复杂的流程。

1.2 Paho MQTT

Eclipse Paho项目是IBM在2011年建立的Eclipse开源项目,该项目包含以C、Java、Python、javascript等语言编写的可用客户端。

二、移植Paho MQTT

2.1 下载源码

嵌入式C语言客户端开源地址https://github.com/eclipse/paho.mqtt.embedded-c

下载之后解压,会得到这么一个文件夹:

  • MQTTClient: 封装MQTTPacket生成的高级别C++客户端程序。
  • MQTTClient-C: 封装MQTTPacket生成的高级别C客户端程序
    • samples目录提供FreeRTOS和Linux两个例程,分别支持FreeRTOS和Linux系统。
    • src目录提供MQTTClient的代码实现能力,以及用于移植到对应平台的网络驱动。
  • MQTTPacket: 提供MQTT数据包的序列化与反序列化,以及部分辅助函数。

2.2 新建BUILD.gn

在鸿蒙系统源码的 third_party 文件夹下创建一个 paho_mqtt 文件夹,然后把解压后的所有文件都拷贝到 paho_mqtt 文件夹下

下一步,我们在 paho_mqtt 文件夹下面新建 BUILD.gn 文件,用来构建编译。

其内容如下:

# Copyright (c) 2020 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import("//build/lite/config/component/lite_component.gni")
import("//build/lite/ndk/ndk.gni")

config("pahomqtt_config") 
    include_dirs = [
        "MQTTPacket/src",
        "MQTTClient-C/src",
        "MQTTClient-C/src/liteOS",
        "//third_party/iot_link/network/mqtt/mqtt_al",
        "//third_party/iot_link/inc",
        "//vendor/hisi/hi3861/hi3861/third_party/lwip_sack/include",
        "//kernel/liteos_m/components/cmsis/2.0",
    ]

    cflags = [ "-Wno-unused-variable" ]
    cflags += [ "-Wno-unused-but-set-variable" ]
    cflags += [ "-Wno-unused-parameter" ]
    cflags += [ "-Wno-sign-compare" ]
    cflags += [ "-Wno-unused-function" ]
    cflags += [ "-Wno-return-type" ]
pahomqtt_sources = [
"MQTTClient-C/src/liteOS/MQTTLiteOS.c",
"MQTTClient-C/src/MQTTClient.c",

"MQTTPacket/src/MQTTConnectClient.c",
"MQTTPacket/src/MQTTConnectServer.c",
"MQTTPacket/src/MQTTDeserializePublish.c",
"MQTTPacket/src/MQTTFormat.c",
"MQTTPacket/src/MQTTPacket.c",
"MQTTPacket/src/MQTTSerializePublish.c",
"MQTTPacket/src/MQTTSubscribeClient.c",
"MQTTPacket/src/MQTTSubscribeServer.c",
"MQTTPacket/src/MQTTUnsubscribeClient.c",
"MQTTPacket/src/MQTTUnsubscribeServer.c",
"MQTTPacket/samples/transport.c",
]

lite_library("pahomqtt_static") 
    target_type = "static_library"
    sources = pahomqtt_sources
    public_configs = [ ":pahomqtt_config" ]


lite_library("pahomqtt_shared") 
    target_type = "shared_library"
    sources = pahomqtt_sources
    public_configs = [ ":pahomqtt_config" ]


ndk_lib("pahomqtt_ndk") 
    if (board_name != "hi3861v100") 
        lib_extension = ".so"
        deps = [
            ":pahomqtt_shared"
        ]
     else 
        deps = [
            ":pahomqtt_static"
        ]
    
    head_files = [
        "//third_party/paho_mqtt"
    ]

2.3 创建LiteOS文件夹

MQTT 已经提供了 Linux 和 Freertos 的移植,这里我们参考,新建文件夹:
third_party\\paho_mqtt\\MQTTClient-C\\src\\liteOS 里面存放两个文件:
MQTTLiteOS.cMQTTLiteOS.h

内容如下:

2.3.1 MQTTLiteOS.c

/*******************************************************************************
 * Copyright (c) 2014, 2015 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Allan Stockdill-Mander - initial API and implementation and/or initial documentation
 *    Ian Craggs - convert to FreeRTOS
 *******************************************************************************/

#include "MQTTLiteOS.h"


int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)

	int rc = 0;
	thread = thread;

	osThreadAttr_t attr;

    attr.name = "MQTTTask";
    attr.attr_bits = 0U;
    attr.cb_mem = NULL;
    attr.cb_size = 0U;
    attr.stack_mem = NULL;
    attr.stack_size = 2048;
    attr.priority = osThreadGetPriority(osThreadGetId());

    rc = (int)osThreadNew((osThreadFunc_t)fn, arg, &attr);

	return rc;


void TimerInit(Timer* timer)

	timer->end_time = (struct timeval)0, 0;


char TimerIsExpired(Timer* timer)

	struct timeval now, res;
	gettimeofday(&now, NULL);
	timersub(&timer->end_time, &now, &res);
	return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);



void TimerCountdownMS(Timer* timer, unsigned int timeout)

	struct timeval now;
	gettimeofday(&now, NULL);
	struct timeval interval = timeout / 1000, (timeout % 1000) * 1000;
	timeradd(&now, &interval, &timer->end_time);



void TimerCountdown(Timer* timer, unsigned int timeout)

	struct timeval now;
	gettimeofday(&now, NULL);
	struct timeval interval = timeout, 0;
	timeradd(&now, &interval, &timer->end_time);



int TimerLeftMS(Timer* timer)

	struct timeval now, res;
	gettimeofday(&now, NULL);
	timersub(&timer->end_time, &now, &res);
	//printf("left %d ms\\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
	return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;





void MutexInit(Mutex* mutex)

	mutex->sem = osSemaphoreNew(1, 1, NULL);


int MutexLock(Mutex* mutex)

	return osSemaphoreAcquire(mutex->sem, LOS_WAIT_FOREVER);


int MutexUnlock(Mutex* mutex)

	return osSemaphoreRelease(mutex->sem);


int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms)

	struct timeval interval = timeout_ms / 1000, (timeout_ms % 1000) * 1000;
	if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
	
		interval.tv_sec = 0;
		interval.tv_usec = 100;
	

	setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));

	int bytes = 0;
	while (bytes < len)
	
		int rc = recv(n->my_socket, &buffer[bytes], (size_t)(len - bytes), 0);
		if (rc == -1)
		
			if (errno != EAGAIN && errno != EWOULDBLOCK)
			  bytes = -1;
			break;
		
		else if (rc == 0)
		
			bytes = 0;
			break;
		
		else
			bytes += rc;
	
	return bytes;



int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms)

	struct timeval tv;

	tv.tv_sec = 0;  /* 30 Secs Timeout */
	tv.tv_usec = timeout_ms * 1000;  // Not init'ing this can cause strange errors

	setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval));
	int	rc = send(n->my_socket, buffer, len, 0);
	return rc;



void NetworkInit(Network* n)

	n->my_socket = 0;
	n->mqttread = linux_read;
	n->mqttwrite = linux_write;



int NetworkConnect(Network* n, char* addr, int port)

	int type = SOCK_STREAM;
	struct sockaddr_in address;
	int rc = -1;
	sa_family_t family = AF_INET;
	struct addrinfo *result = NULL;
	struct addrinfo hints = 0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL;

	if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
	
		struct addrinfo* res = result;

		/* prefer ip4 addresses */
		while (res)
		
			if (res->ai_family == AF_INET)
			
				result = res;
				break;
			
			res = res->ai_next;
		

		if (result->ai_family == AF_INET)
		
			address.sin_port = htons(port);
			address.sin_family = family = AF_INET;
			address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
		
		else
			rc = -1;

		freeaddrinfo(result);
	

	if (rc == 0)
	
		n->my_socket = socket(family, type, 0);
		if (n->my_socket != -1)
			rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
		else
			rc = -1;
	

	return rc;



void NetworkDisconnect(Network* n)

	close(n->my_socket);

2.3.2 MQTTLiteOS.h

/*******************************************************************************
 * Copyright (c) 2014, 2015 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Allan Stockdill-Mander - initial API and implementation and/or initial documentation
 *******************************************************************************/

#if !defined(MQTTLiteOS_H)
#define MQTTLiteOS_H


#include <sys/types.h>

#if !defined(SOCKET_ERROR)
	/** error in socket operation */
	#define SOCKET_ERROR -1
#endif

#if defined(WIN32)
/* default on Windows is 64 - increase to make Linux and Windows the same */
#define FD_SETSIZE 1024
#include <winsock2.h>
#include <ws2tcpip.h>
#define MAXHOSTNAMELEN 256
#define EAGAIN WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define EINVAL WSAEINVAL
#define EINPROGRESS WSAEINPROGRESS
#define EWOULDBLOCK WSAEWOULDBLOCK
#define ENOTCONN WSAENOTCONN
#define ECONNRESET WSAECONNRESET
#define ioctl ioctlsocket
#define socklen_t int
#else
#define INVALID_SOCKET SOCKET_ERROR
#include <sys/socket.h>
#include <sys/param.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#endif

#if defined(WIN32)
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#include <net/if.h>
#endif

#include "ohos_init.h"
#include "cmsis_os2.h"

#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"

#include "lwip/sockets.h"

#define MQTT_TASK

typedef struct Thread

	osThreadId_t task;
 Thread;

int ThreadStart(Thread*, void (*fn)(void*), void* arg);

typedef struct Timer

	struct timeval end_time;
 Timer;

typedef struct Mutex

	osSemaphoreId_t sem;
 Mutex;

void MutexInit(Mutex*);
int MutexLock(Mutex*);
int MutexUnlock(Mutex*);

void TimerInit(Timer*);
char TimerIsExpired(Timer*);
void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*);

typedef struct Network

	int my_socket;
	int (*mqttread) (struct Network*, unsigned char*, int, int);
	int (*mqttwrite) (struct Network*, unsigned char*, int, int);
 Network;

int linux_read(Network*, unsigned char*, int, int);
int linux_write(Network*, unsigned char*, int, int);

 void NetworkInit(Network*);
 int NetworkConnectHI3861学习笔记(19)——WiFi接口使用(STA和AP模式)

HI3861学习笔记(20)——TCP客户端

HI3861学习笔记(22)——UDP客户端

HI3861学习笔记(22)——UDP客户端

HI3861学习笔记(20)——TCP客户端

HI3861学习笔记(11)——GPIO输出接口使用