一种基于消息发布-订阅的观察者模式实现

Posted Mic_chen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种基于消息发布-订阅的观察者模式实现相关的知识,希望对你有一定的参考价值。

notice.h
#define OBSERVER_MAX    4


typedef struct notice_msg {
    // public
    char            *sub_name;
    int             cmd;
    int             data_fix;           // 固定数据 
    
    unsigned int    data_ex_len;         // 扩展数据长度,0:没有扩展数据
    void            *pdata_ex;           // 扩展数据指针
}notice_msg_t;

typedef int (*NOTICE_ON_MSG_CB)(notice_msg_t *pmsg);

typedef struct notice_observer {
    ak_queue_t  msg_queue;

}notice_observer_t;


typedef struct notice_subject {
    char *name;
    notice_observer_t* obs_tbl[OBSERVER_MAX];
}notice_subject_t;

  

/******************************************************
*                    Constant         
******************************************************/

/******************************************************
*                    Macro         
******************************************************/
#define NOTICE_MSG_MAX  20

#define CHECK_THIS_IS_NULL() do{if(NULL == this) return -1;}while(0)


/******************************************************
*                    Type Definitions         
******************************************************/


/******************************************************
*               Function Declarations
******************************************************/

// obsever 
int akp_notice_observer_ctor(notice_observer_t *this)
{
    int ret;
    CHECK_THIS_IS_NULL();

    ret = ak_thread_queue_init(&(this->msg_queue), sizeof(notice_msg_t),NOTICE_MSG_MAX);
    if(ret) return -1;
    else  return 0;

}
int akp_notice_observer_dector(notice_observer_t *this)
{
    int ret;
    CHECK_THIS_IS_NULL();

    ret = ak_thread_queue_destroy(&(this->msg_queue));
    if(ret) return -1;
    else  return 0;
}

int akp_notice_on_msg(notice_observer_t *this, NOTICE_ON_MSG_CB fun_cb)
{
    CHECK_THIS_IS_NULL();
    
    notice_msg_t msg;
    ak_thread_queue_wait(&(this->msg_queue), &msg);
    fun_cb(&msg);
    if(msg.data_ex_len > 0 && msg.pdata_ex != NULL) {
        free(msg.pdata_ex);
    }
}



// subject
int akp_notice_subject_ctor(notice_subject_t *this,const char *name)
{
    CHECK_THIS_IS_NULL();
 
    this->name = (char *)name;

    memset(this->obs_tbl,0,sizeof(this->obs_tbl));
    return 0;
}

int akp_notice_subject_dector(notice_subject_t *this)
{
    CHECK_THIS_IS_NULL();
     
    return 0;
}

int akp_notice_add_observer(notice_subject_t *this, notice_observer_t *obs)
{
    CHECK_THIS_IS_NULL();
    
    int i;
    for(i = 0; i< OBSERVER_MAX;i++) {
        if(this->obs_tbl[i] == 0) {
            break;
        }
    }
    if(i >= OBSERVER_MAX) {
        ak_print_error("[notice]: no more room to add\n");
        return -1;
    }

    //ak_print_error("akp_notice_add_observer tbl=%x obs=%x i=%d\n",this->obs_tbl,obs,i);
    this->obs_tbl[i] = obs;
    return 0;
}

int akp_notice_remove_observer(notice_subject_t *this, notice_observer_t *obs)
{
    CHECK_THIS_IS_NULL();

    int i;
    for(i = 0; i< OBSERVER_MAX;i++) {
        if(this->obs_tbl[i] == obs) {
            break;
        }
    }
    if(i >= OBSERVER_MAX) {
        ak_print_error("[notice]: match observer fail\n");
        return -1;
    }

    this->obs_tbl[i] = 0;
    return 0;
}

int akp_notice_notify(notice_subject_t *this, notice_msg_t *pmsg, void *pdata_ex, unsigned int len)
{
    CHECK_THIS_IS_NULL();
    void *p = NULL;
    notice_observer_t *pobs = NULL;

    // clear extend data area
    pmsg->pdata_ex= NULL;
    pmsg->data_ex_len = 0;

    // msg dispense
    int i;
    for(i = 0; i < OBSERVER_MAX;i++) {
        if(this->obs_tbl[i] != 0) {
            if(len > 0) {
                p = malloc(len);
                if(NULL != p){
                    memcpy(p ,pdata_ex,len);
                    pmsg->pdata_ex= p;
                    pmsg->data_ex_len = len;
                } else {
                    ak_print_error_ex("[notice]: malloc error\n");
                }
            }
            //ak_print_normal("[notice]: post msg src name [%s] cmd %d\n",pmsg->sub_name,pmsg->cmd);
            //ak_print_error("[notice]: %d this->obs_tbl[i]=%x\n",i,this->obs_tbl[i]);
            pobs = this->obs_tbl[i];
            ak_thread_queue_post(&(pobs->msg_queue), pmsg);
        }
    }

    return 0;
}

  

以上是关于一种基于消息发布-订阅的观察者模式实现的主要内容,如果未能解决你的问题,请参考以下文章

LiveDataBus

观察者模式与发布订阅模式的区别

java观察者模式的实现

案例分析:设计模式与代码的结构特性

设计模式-观察者模式 实现

Java描述设计模式(11):观察者模式