分布式系统架构-samgr/source系统服务开发框架基础代码message.c讲解

Posted 心永向阳

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式系统架构-samgr/source系统服务开发框架基础代码message.c讲解相关的知识,希望对你有一定的参考价值。

本篇概述

本篇主要讲解message.h和message_inner.h以及message.c的部分代码,这三个文件代码核心在于消息(msg)的收发,通过请求与响应的方式,利用消息交换进行外部接口对服务于功能的调用管理,请求与响应像是一对相生相融的兄弟,共同决定了系统的安全运作。

代码框架

message
├── 关键结构体
│ └── Identity
│ └── Request
│ └── Response
│ └── Exchange
├── 宏定义
│ └── Handler
│ └── Exchange
├── 消息功能函数
│ └── SAMGR_SendRequest
│ └── SAMGR_SendResponse
│ └── SAMGR_MsgRecv
│ └── SAMGR_FreeMsg
│ └── SharedSend
│ └── FreeReference
│ └── SAMGR_SendSharedRequest
│ └── SAMGR_SendSharedDirectRequest
│ └── SAMGR_SendResponseByIdentity

消息管理流程图

message.h

关键结构体

关键的结构体包括:

  • Identity:它决定了请求和响应对应服务或功能的id。
  • Request :其中包括了请求的消息id与数据内容。
  • Response :主要包含了响应的内容。
  • ==Exchange ==:这里尤为强调exchange,因为它是消息管理的基础,函数对请求与响应的功能性调用都是以exchange结构体内容为基础的。
/**
 * @brief Identifies a service and feature.
 * 概述:标识服务和特性
 * You can use this structure to identity a {@link IUnknown} feature to which messages will be
 * sent through the asynchronous function of {@link IUnknown}. \\n
 * 你可以使用这个结构去标识一个功能,这个功能的信号将通过异步功能IUnknown发送
 */
struct Identity {
    /** Service ID 服务ID*/
    int16 serviceId;
    /** Feature ID 功能ID*/
    int16 featureId;
    /** Message queue ID  消息队列ID*/
    MQueueId queueId;
};

/**
 * @brief Defines a request.
 * 概述:定义一个请求
 * You can use this structure to define the request that will be sent to a feature through the
 * asynchronous function of {@link IUnknown}. \\n
 * 你可以使用这个框架来定义将通过异步IUnknown接口发送给一个功能的请求
 * Request, which is data that is packed to send to a feature. \\n
 * 请求是被打包发送给一个feature(功能、特征)的数据
 * If the data is not empty and the length is not 0, the system automatically releases the data. \\n
 * 如果数据不为空且长度不为0,系统将自动释放数据
 */
struct Request {
    /** Message ID 消息ID*/
    int16 msgId;
    /** Data length 消息长度*/
    int16 len;
    /** Data content 数据内容*/
    void *data;
    /** Message value, which is defined by developers 被开发者定义的消息值*/
    uint32 msgValue;
};

/**
 * @brief Defines a response.
 * 概述:定义一个响应
 * This structure is used to send a response after the message processing function of a service
 * or feature processes a request. \\n
 * 这个框架是用来在一个服务或功能处理一个请求的消息处理功能之后发送的一个响应
 * If the data is not empty and the length is not 0, the system automatically releases the data. \\n
 * 如果数据不为空且长度不为0,系统将自动释放数据
 */
struct Response {
    /** Data content 响应的数据内容*/
    void *data;
    /** Data length 响应的数据长度*/
    int16 len;
};

struct Exchange {
    Identity id; /**< The target service or feature identity. 目标服务或功能的id*/
    Request request;
    Response response;
    short type;  /**< The exchange type. 交换类型*/
    Handler handler;   /**< async response or immediately request callback function 异步响应或立即请求的回调函数*/
    uint32 *sharedRef; /**< use to share the request and response for saving memory 用于共享请求和响应以节约内存*/
};

函数声明

其中,最重要的函数莫过于:SAMGR_SendRequest()和SAMGR_SendResponse(),它们的注解与声明如下:

/**
 * @brief Sends a request to a service or feature of a specified identity.
 * 概述:向一个服务或功能的特定id发送一个请求
 * This function is called by a service to send messages to its own features through the
 * asynchronous function of {@link IUnknown}. \\n
 *
 * @param identity Indicates the pointer to the ID of the feature or service that processes
 * the message.
 * identity代表指向一个服务或功能的id,它会处理这条消息
 * @param request Indicates the pointer to the request.
 * request代表指向请求的指针
 * @param handler Indicates the function handling the response. If the value is <b>NULL</b>,
 * no response is required.
 * handler代表处理响应的功能,如果这个值为空则没有响应被请求
 * @return Returns <b>EC_SUCCESS</b> if the request is sent successfully; returns other error codes
 * if the request fails to be sent. The caller needs to release the memory applied in the request.
 * 如果请求被成功发送将会返回EC_SUCCESS,返回错误代码如果请求发送失败,调用者需要释放请求应用的内存空间
 * @since 1.0
 * @version 1.0
 */
int32 SAMGR_SendRequest(const Identity *identity, const Request *request, Handler handler);

/**
 * @brief Sends a response after processing a request.
 * 概述:在处理请求后发送一个响应
 * This function is called to send a response after processing a request by {@link MessageHandle}
 * of a service or {@link OnMessage} of a feature. \\n
 *
 * @param request Indicates the pointer to the original request.
 * request代表指向原始请求的指针
 * @param response Indicates the pointer to the response content.
 * response代表响应内容的指针
 * @return Returns <b>EC_SUCCESS</b> if the response is sent successfully; returns other error
 * codes if the response fails to be sent.
 * 如果响应被成功发送将会返回EC_SUCCESS,返回错误代码如果请求发送失败
 * @attention
 *  <ul><li>This function can be called only in {@link MessageHandle} or {@link OnMessage}. </li>
 * 这个函数只能在MessageHandle或OnMessage中被调用
 *  <li>The request must be the original one passed from {@link MessageHandle} or
 *  {@link OnMessage}. Otherwise, a memory exception occurs. </li>
 * 请求必须是最初的从MessageHandle或OnMessage发送来的,否则内存例外便会发生
 *  <li> When the caller sends a request, the <b>handler</b> callback function must be carried. </li>
 * 当调用者发送一个请求时,handler回调函数必须被携带
 *  <li>The response is sent to the message queue of the service to which the requester belongs
 *  for processing. Therefore, the requester should wait for the response in non-blocking mode. </li></ul>
 * 响应被发送到请求者所属的服务的消息队列进行处理。因此,请求者应该以非阻塞模式等待响应。
 * @since 1.0
 * @version 1.0
 */
int32 SAMGR_SendResponse(const Request *request, const Response *response);

我们将其余函数声明代码也列在下面供读者参考:

//消息接收
int32 SAMGR_MsgRecv(MQueueId queueId, uint8 *interMsg, uint32 size);

/**
 * The function just release the Message->data and Message->sharedRef(use free), not release the msg entry.
 * 这个函数仅仅释放Message->data和Message->sharedRef(use free),不释放消息入口
 * If you alloc the msg on the heep, you should release it by yourself, you`d better alloc on the stack.
 * 如果你在堆上分配了消息空间,你需要自行释放它,你最好在栈中分配消息空间
 * The function will be called automatically.
 * 这个函数将被自动调用
 * Do not call this function manually, except the SM_SendRequest return error!
 * 不要人为地调用这个函数,除非SM_SendRequest返回错误代码
 **/
//消息释放
int32 SAMGR_FreeMsg(Exchange *exchange);

/**
 * @brief Sends a request to multiple services or features to save memory.
 *
 * This function is used to publish topics for the {@link Broadcast} service to broadcast messages. \\n
 *
 * @param identity Indicates the pointer to the IDs of services or features, to which requests
 * are sent.
 * @param request Indicates the pointer to the request.
 * @param token Indicates the pointer to reference counting.
 * @param handler Indicates the function handling the response. If the value is <b>NULL</b>,
 * no response is required.
 * @retval Returns the token if the request is sent successfully; returns <b>NULL</b> if the
 * request fails to be sent.
 * @attention
 *  <ul><li>Ensure that the thread specified by <b>identity</b> processes the message after
 *  all messages are sent. Common practice: Add a lock before sending a request and add
 *  the same lock during processing. </li>
 *  <li>If <b>NULL</b> is returned, the caller needs to release the memory of the request. </li></ul>
 * @since 1.0
 * @version 1.0
 */
uint32 *SAMGR_SendSharedRequest(const Identity *identity, const Request *request, uint32 *token, Handler handler);

/**
 * @brief Sends a request and response of a caller to the feature thread. The handler is directly
 * called to process the request and response without using the message processing functions.
 * (Customized function for the broadcast service)
 *
 * This function is used to publish topics for the {@link Broadcast} service to broadcast messages. \\n
 * The value of reference counting is incremented by one each time this function is called. \\n
 *
 * @param id Indicates the pointer to the IDs of services or features, to which the request and
 * response are sent.
 * @param request Indicates the pointer to the request.
 * @param resp Indicates the pointer to the response.
 * @param ref Indicates the reference counting.
 * @param handler Indicates the function for handling the request and response. This parameter
 * cannot be <b>NULL</b>.
 * @return Returns <b>EC_SUCCESS</b> if the request and response are sent successfully; returns
 * other error codes if the request and response fail to be sent.
 * @attention
 *  <ul><li>Ensure that the thread specified by <b>identity</b> processes the message after all
 *  messages are sent. Common practice: Add a lock before sending a request and add the same lock
 *  during processing. </li>
 *  <li>If <b>NULL</b> is returned, the caller needs to release the memory of the request and
 *  response. </li>
 *  <li>If the response changes each time when a request is sent, ensure that the response
 *  will not be released. (Set <b>len</b> to <b>0</b>, the <b>data</b> of response will be
 *  the resident memory.) </li></ul>
 * @since 1.0
 * @version 1.0
 */
int32 SAMGR_SendSharedDirectRequest(const Identity *id, const Request *req, const Response *resp, uint32 **ref,
                                    Handler handler);
/**
 * @brief Sends a response to a specified service or feature after processing the original request.
 * (Customized function for <b>bootstrap</b>)
 *
 * This function is called to send a response after processing a request by {@link MessageHandle}
 * of a service or {@link OnMessage} of a feature. \\n
 * This function can be customized to implement phased startup of different types of services. \\n
 *
 * @param id Indicates the pointer to the ID of a service or feature. The response is sent to the
 * thread of the service or feature for processing.
 * @param request Indicates the pointer to the original request.
 * @param response Indicates the pointer to the response content.
 * @return Returns <b>EC_SUCCESS</b> if the response is sent successfully; returns other error
 * codes if the response fails to be sent.
 * @attention
 *  <ul><li>This function can be called only in <b>MessageHandle</b> or <b>OnMessage</b>. </li>
 *  <li>The request must be the original one passed from <b>MessageHandle</b> or <b>OnMessage</b>.
 *  Otherwise, a memory exception occurs. </li>
 *  <li> When the caller sends a request, the <b>handler</b> callback function must be carried. </li>
 *  <li>The response is sent to the message queue of a specified ID for processing. Therefore,
 *  wait for the response in non-blocking mode. </li></ul>
 * @since 1.0
 * @version 1.0
 */
int32 SAMGR_SendResponseByIdentity(const Identity *id, const Request *request, const Response *response);

static int32 SharedSend(MQueueId queueId, Exchange *exchange, int initRef);
static BOOL FreeReference(Exchange *exchange);

message.c

发送请求与响应

详细代码及注解如下:

//定义发送请求函数
int32 SAMGR_SendRequest(const Identity *identity, const Request *request, Handler handler)
{
    if (request == NULL || identity == NULL) {
        return EC_INVALID;
    }
    //初始化消息交换结构体
    Exchange exchange = {*identity, *request, {NULL, 0}, MSG_NON, handler, NULL};
    //将交换消息需要的目标服务或功能id的队列id置空
    exchange.id.queueId = NULL;
    //如果处理器的值非空则重新赋值队列id以及交换类型
    if (handler != NULL) {
        exchange.id.queueId = SAMGR_GetCurrentQueueID();
        exchange.type = MSG_CON;
    }

    //将id队列中的执行信息压入exchange中
    return QUEUE_Put(identity->queueId, &exchange, 0, DONT_WAIT);
}

//定义发送响应函数
int32 SAMGR_SendResponse(const Request *request, const Response *response)
{
    // we need send the default the con message or not?
    if (request == NULL) {
        return EC_INVALID;
    }

    //获取交换请求对象
    Exchange *exchange = GET_OBJECT(request, Exchange, request);
    //交换类型一定要是需要被确认的请求
    if (exchange->type != MSG_CON) {
        return EC_INVALID;
    }

    if (exchange->handler == NULL) {
        return EC_SUCCESS;
    }
    //对交换类型、响应数据及长度进行初始化
    exchange->type = MSG_ACK;
    exchange->response.data = NULL;
    exchange->response.len = 0;
    //将响应的内容赋值到exchange结构体中去
    if (response != NULL) {
        exchange->response = *response;
    }

    // If there is no task queue, we will call the response processor in current task.
    //如果没有任务队列,则调用当前任务中的响应处理器。
    if (exchange->id.queueId == NULL) {
        exchange->handler(&exchange->request, &exchange->response);
        return EC_SUCCESS;
    }

    // Send back to the origin to process the task.
    // 如果上述情况都不满足则进行发送分享,这里的含义是将任务发送回原点以处理任务,即重置任务。
    int32 ret = SharedSend(exchange->id.queueId, exchange, 1);
    if (ret != EC_SUCCESS) {
        exchange->handler(&exchange->request, &exchange->response);
        (void)FreeReference(exchange);
    }
    return EC_SUCCESS;
}

接收消息与释放消息

详细代码及注解如下:

//定义消息接收函数
int32 SAMGR_MsgRecv(MQueueId queueId, uint8 *interMsg, uint32 size)
{
    if (queueId == NULL || interMsg == NULL || size == 0) {
        return EC_INVALID;
    }

    //将传入的消息复制入地址为0x00的位置
    if (memset_s(interMsg, size, 0x00, size) != EOK) {
        return EC_FAILURE;
    }

    //在执行队列ID中加入接收到的消息信息
    return QUEUE_Pop(queueId, interMsg, 0, WAIT_FOREVER);
}

//定义释放消息
int32 SAMGR_FreeMsg(Exchange *exchange)
{
    //检查exchange的索引是否需要置空,调用函数以达成free索引的功能
    if (!FreeReference(exchange)) {
        return EC_SUCCESS;
    }
    //检查请求并置空请求数据
    if (exchange->request.len > 0) {
        SAMGR_Free(exchange->request.data);
        exchange->request.data = NULL;
    }
    //检查响应并置空响应数据
    if (exchange->response.len > 0) {
        SAMGR_Free(exchange->response.data);
        exchange->response.data = NULL;
    }
    return EC_SUCCESS;
}

共享

我们可以利用共享消息来节省一定的时间效率与空间效率,详细代码及注解如下:

//发送共享请求
uint32 *SAMGR_SendSharedRequest(const Identity *identity, const Request *request, uint32 *token, Handler handler)
{
    if (identity == NULL || request == NULL) {
        return NULL;
    }
    Exchange exchange = {*identity, *request, {NULL, 0}, MSG_NON, handler, token};
    exchange.type = (handler == NULL) ? MSG_NON : MSG_CON;
    exchange.id.queueId = NULL;
    int32 err = SharedSend(identity->queueId, &exchange, 0);
    if (err != EC_SUCCESS) {
        HILOG_ERROR(HILOG_MODULE_SAMGR, "SharedSend [%p] failed(%d)!", identity->queueId, err);
        (void)FreeReference(&exchange);
    }
    return exchange.sharedRef;
}

//发送共享直接请求
int32 SAMGR_SendSharedDirectRequest(const Identity *id, const Request *req, const Response *resp, uint32 **ref,
                                    Handler handler)
{
    if (handler == NULL || ref == NULL) {
        return EC_INVALID;
    }

    Exchange exchange = {0};
    if (req != NULL) {
        exchange.request = *req;
    }

    if (resp != NULL) {
        exchange.response = *resp;
    }

    exchange.handler = handler;
    exchange.sharedRef = *ref;
    exchange.type = MSG_DIRECT;
    exchange.id = *id;
    exchange.id.queueId = NULL;
    int32 err = SharedSend(id->queueId, &exchange, 0);
    if (err != EC_SUCCESS) {
        HILOG_ERROR(HILOG_MODULE_SAMGR, "SharedSend [%p] failed(%d)!", id->queueId, err);
        (void)FreeReference(&exchange);
    }
    *ref = exchange.sharedRef;
    return err;
}

//通过id发送响应
int32 SAMGR_SendResponseByIdentity(const Identity *id, const Request *request, const Response *response)
{
    // we need send the default the con message or not?
    if (request == NULL || id == NULL) {
        return EC_INVALID;
    }

    Exchange *exchange = GET_OBJECT(request, Exchange, request);
    if (exchange->type == MSG_NON) {
        return EC_INVALID;
    }
    exchange->id.queueId = id->queueId;

    return SAMGR_SendResponse(request, response);
}

//定义共享发送函数
static int32 SharedSend(MQueueId queueId, Exchange *exchange, int initRef)
{
    /* if the msg data and response is NULL, we just direct copy, no need shared the message. */
    //如果消息数据和响应是空值,我们只需要直接复制,不需要分享消息
    if ((exchange->request.data == NULL || exchange->request.len <= 0) &&
        (exchange->response.data == NULL || exchange->response.len <= 0)) {
        return QUEUE_Put(queueId, exchange, 0, DONT_WAIT);
        //这里是直接将消息队列id复制入exchange执行task的id中
    }

    /* 1.add reference */
    MUTEX_GlobalLock();//加入全局互斥锁
    if (exchange->sharedRef == NULL) {
        //加入引用
        exchange->sharedRef = (uint32*)SAMGR_Malloc(sizeof(uint32));
        if (exchange->sharedRef == NULL) {
            MUTEX_GlobalUnlock();
            return EC_NOMEMORY;
        }
        *(exchange->sharedRef) = initRef;
    }
    //地址移1位
    (*(exchange->sharedRef))++;
    MUTEX_GlobalUnlock();
    //将任务队列中的id加入exchange中
    return QUEUE_Put(queueId, exchange, 0, DONT_WAIT);
}

//释放exchange中的索引值
static BOOL FreeReference(Exchange *exchange)
{
    if (exchange == NULL) {
        return FALSE;
    }

    BOOL needFree = TRUE;
    /* 检查共享索引 */
    MUTEX_GlobalLock();
    if (exchange->sharedRef != NULL) {
        //共享索引地址移1位,但不能为0,因此当出现0时需要及时进行重置
        if (*(exchange->sharedRef) > 0) {
            (*(exchange->sharedRef))--;
        }

        if (*(exchange->sharedRef) > 0) {
            needFree = FALSE;
        }
    }
    MUTEX_GlobalUnlock();

    if (needFree) {
        SAMGR_Free(exchange->sharedRef);
        exchange->sharedRef = NULL;
    }
    return needFree;
}

知识补充

在exchange结构体中,我们对The exchange type.即交换类型有详细的枚举类型定义:

enum ExchangeType {
    MSG_EXIT = -1,//退出
    MSG_NON = 0,//不需要确认
    MSG_CON = 1,//需要确认
    MSG_ACK = 2, //应答消息
    MSG_SYNC = 3,//同步消息
    MSG_DIRECT = 4,//直接交换
};

事实上,消息的传导,即响应请求模式是基于COAP协议的。
详情请见:COAP协议简介。

以上是关于分布式系统架构-samgr/source系统服务开发框架基础代码message.c讲解的主要内容,如果未能解决你的问题,请参考以下文章

分布式系统开发微服务架构,基于JavaSE部署环境提供REST微服务

新课重磅发布-Java开发微信朋友圈PC版系统(架构2.0+分布式中间件)

架构设计基础:单服务.集群.分布式,基本区别和联系

分布式系统-秒杀系统架构设计第二章 paxos算法-一致性协议

java 的电商系统的完整源码

10个微服务架构设计的最佳实践