gRPC 实现服务端消息推送

Posted DaleLee

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了gRPC 实现服务端消息推送相关的知识,希望对你有一定的参考价值。

1. gRPC 简介

gRPC 是一种高性能、开源和通用的 RPC 框架,支持多种编程语言。在 gRPC 中,有四种类型的 RPC,分别是 Unary RPC、Server Streaming RPC、Client Streaming RPC 和 Bidirectional Streaming RPC。

  1. Unary RPC:一元 RPC
    一元 RPC 是最简单的 RPC 类型,它是一种单向的请求-响应模式。客户端向服务端发送一个请求,并等待服务端响应。

  2. Server Streaming RPC:服务器流式 RPC
    服务器流式 RPC 是一种服务端主动向客户端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端向服务端发送一条请求消息,并等待服务端的响应。与 Unary RPC 不同的是,服务端在响应客户端的请求消息后,会向客户端发送一系列的响应消息,客户端在接收到消息后可以进行相应的处理。

  3. Client Streaming RPC:客户端流式 RPC
    客户端流式 RPC 是一种客户端主动向服务端发送流式数据的 RPC 类型。在这种 RPC 类型中,客户端可以向服务端发送多条请求消息,服务端在接收到请求消息后进行相应的处理,并最终向客户端发送一条响应消息。

  4. Bidirectional Streaming RPC:双向流式 RPC
    双向流式 RPC 是一种双向流式数据传输的 RPC 类型。在这种 RPC 类型中,客户端和服务端都可以主动向对方发送数据,并可以同时接收对方发送的数据。

若要实现服务端的消息推送,应该使用 Server Streaming RPC。

2. 代码示例

(1) 编写 proto 文件

syntax = "proto3";

package com.example.grpcdemo;
option java_multiple_files = true;
// 定义传输的消息格式
message Message 
  string content = 1;


// 定义服务接口
service MessageService 
  // 定义服务方法,客户端向服务端发送一个请求,并返回一个流式响应
  rpc streamMessages (Message) returns (stream Message);

(2) 实现服务端

public class MessageServiceImpl extends MessageServiceGrpc.MessageServiceImplBase 
    @Override
    public void streamMessages(Message request, final StreamObserver<Message> responseObserver) 
        System.out.println("Received message: " + request.getContent());
        // 启动一个定时任务,每秒钟向客户端发送一条消息
        Timer timer = new Timer();
        timer.schedule(new TimerTask() 
            int count = 0;

            @Override
            public void run() 
                String message = "Message " + count++;
                System.out.println("Sending message: " + message);

                // 构造消息对象并发送给客户端
                Message response = Message.newBuilder().setContent(message).build();
                responseObserver.onNext(response);
            
        , 0, 1000);
    
public class MessageServer 
    public static void main(String[] args) throws IOException, InterruptedException 
        int port = 8080;
        // 创建 gRPC 服务器
        Server server = ServerBuilder.forPort(port)
                .addService(new MessageServiceImpl())
                .build();

        // 启动 gRPC 服务器
        server.start();

        // 阻塞线程以等待关闭信号
        System.out.println("Server started at port " + port);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> 
            System.out.println("Server stopped.");
            server.shutdown();
        ));
        server.awaitTermination();
    

(3) 实现客户端

public class MessageClient 
    public static void main(String[] args) throws InterruptedException 
        // 创建 gRPC channel 和 stub
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();
        MessageServiceGrpc.MessageServiceStub stub = MessageServiceGrpc.newStub(channel);

        // 创建请求对象
        Message request = Message.newBuilder().setContent("Hello, server!").build();

        // 调用服务方法,并接收来自服务端推送的消息
        stub.streamMessages(request, new StreamObserver<Message>() 
            @Override
            public void onNext(Message response) 
                System.out.println("Received message: " + response.getContent());
            

            @Override
            public void onError(Throwable t) 
                t.printStackTrace();
            

            @Override
            public void onCompleted() 
                System.out.println("Server closed the stream.");
            
        );

        // 阻塞线程以等待来自服务端推送的消息
        Thread.sleep(10000);

        // 关闭gRPC通道
        channel.shutdown();
    

服务端输出如下:

Server started at port 8080
Received message: Hello, server!
Sending message: Message 0
Sending message: Message 1
Sending message: Message 2
Sending message: Message 3
Sending message: Message 4
Sending message: Message 5

客户端输出如下:

Received message: Message 0
Received message: Message 1
Received message: Message 2
Received message: Message 3
Received message: Message 4
Received message: Message 5

使用极光推送实现分组发送和服务端集成

推送功能在手机应用开发中越来越重要,几乎成为所有App必备的功能,由于Android本身没有消息推送机制,通常采用的是基于XMPP协议的推送,但这种开发很麻烦,因此在市场上应运而生了提供消息推送服务的诸多产品,例如:百度云、个推、极光等。

  极光推送正是一个整合了Android推送、iOS推送的统一推送服务平台。下面讲解一下如何使用极光实现消息推送应用,并重点讲解一下如何实现向分组发送消息及推送服务端和自身应用集成,具体实现过程如下:

目录:

一、注册应用

二、环境搭建

三、Android开发,实现指定人群推送消息

四、开发消息推送服务端

  1、环境搭建

  2、服务端实现步骤

  3、服务端代码实现

一、注册应用

1.进入官网

   首先进入官网首页https://www.jpush.cn/,注册账号

2.注册应用

  登录到用户平台,点击创建应用如图所示:

 

  包名:创建应用项目的基础包,单击创建,产生应用信息如图所示:

  产生AppKey和API主密码,每个客户端应用使用唯一一个AppKey

二、环境搭建

1.SDK下载

   官网资源地址: https://www.jpush.cn/downloads/sdk/android/  下载 JPUSH Android – SDK。

2.导入SDK开发包

  •   复制 libs/jpush-sdk-release1.x.y.jar 到工程 libs/ 目录下
  •   复制 libs/armeabi/libjpush.so 到工程 libs/armeabi 目录下

3.配置AndroidManifest.xml

    详细请参照官网http://docs.jpush.io/guideline/android_guide/#sdk文档中说明进行配置

4.测试是否成功

   集成完成后,JPush  提供的推送服务是默认就已经开启,这时客户端就可以接收到来自服务的消息了,可通过登录到JPush用户平台,发送通知测试集成是否成功

   如图所示:

  如终端收到消息说明集成成功

三、Android端开发,实现向指定人群推送消息

   集成成功之后,服务端发送消息会被所有的安装应用的客户端所接收,我们再继续看看如何在应用中向指定的人群进行消息推送?

要指定向某一个特定的人,或者某一群特定的人,则相对复杂。因为对于 JPush 来说,某一个人就是一个注册ID,这个注册ID与开发者App没有任何关系,或者说对开发者App是没有意义的。

  如果要对开发者App有意义的某个特定的用户推送消息,则需要:把 JPush 注册用户与开发者App 用户绑定起来。我们可以使用别名与标签的功能

1.设置别名(alias)

  为安装了应用程序的用户,取个别名来标识。以后给该用户 Push 消息时,就可以用此别名来指定。每个用户只能指定一个别名。同一个应用中,尽可能为每个客户端用户标识唯一名称,以便服务端能通过该名称来唯一确定用户。

  调用JPushInterface方法:

public static void setAlias(Context context, String alias,  TagAliasCallback callback)

 参数说明:

  • Alias 设置别名
    • 实现TagAliasCallback的组件对象, 该组件提实现 gotResult 方法。采用回调机制返回执行结果,对应的参数
    • responseCode状态码:0为成功
    • Alias别名称
    • Tags标签名,没有为null
    • "" (空字符串)表示取消之前的设置。
    • 每次调用设置有效的别名,覆盖之前的设置。
    • 有效的别名组成:字母(区分大小写)、数字、下划线、汉字。
    • 限制:alias 命名长度限制为 40 字节。(判断长度需采用UTF-8编码)
  • callback

  如下所示:

new TagAliasCallback() {                

                @Override
                public void gotResult(int responseCode, String alias, Set<String> tags) {
                switch (code) {
                    case 0:
                        logs = "Set tag and alias success";
                        Log.i(TAG, logs);
                        break;
                        
                    case 6002:
                        logs = "Failed to set alias and tags due to timeout. Try again after 60s.";
                        Log.i(TAG, logs);
                        break;
                    default:
                        logs = "Failed with errorCode = " + code;
                        Log.e(TAG, logs);
                    }                
}
}

2.设置标签(tag)

   标签实质就是将应用客户根据应用的需要按类别进行分组,服务端可以组为单位来批量下发 Push 消息,这样组内客户端所有用户都可以收到该消息。一个用户可以打多个标签,标签没有唯一性要求。

  调用JPushInterface方法:

public static void setTags(Context context, Set<String> tags,TagAliasCallback callback)

 参数说明:

  • Tags 为用户设置组,一个用户可设置多个组
    • 空数组或列表表示取消之前的设置。
    • 每次调用至少设置一个 tag,覆盖之前的设置,不是新增。
    • 有效的标签组成:字母(区分大小写)、数字、下划线、汉字。
    • 限制:每个 tag 命名长度限制为 40 字节,最多支持设置 100 个 tag,但总长度不得超过1K字节。(判断长度需采用UTF-8编码)
  • Callback 同上

3.同时设置组和标签

  也可调用JPushInterface的setAliasAndTags方法同时设置别名和标签,方法如下:

 public static void setAliasAndTags(Context context, String alias,  Set<String> tags, TagAliasCallback callback)

  案例实现:

  下面代码实现为登录的学生设置别名和标签,别名为其学号,每个用户有两个标签分别是所属班级和所属专业,user是用户登录成功后的信息

if(user!=null){

         // TODO 
Set<String> tags=new HashSet<String>();
if(user.getClassName()!=null&&!user.getClassName().equals("")){
                tags.add(user.getClassName());//记录学生所属班级
}
if(user.getMajorName()!=null&&!user.getMajorName().equals("")){
     tags.add(user.getMajorName());//记录学生所属专业
}
//将学生设置到班级和专业组,并设置别名为学生的学号
JPushInterface.setAliasAndTags(getApplicationContext(), user.getUserNo(),tags, new TagAliasCallback() {                
                @Override
                public void gotResult(int responseCode, String alias, Set<String> tags) {
                    // TODO 
                    if(responseCode==0){
                        Log.i("tags", tags.toString());
                    }
                }
            });
        }

  这样服务端可以以专业和班级为单位实现批量发送消息,也可以按学号向指定学生发消息

四、开发消息推送服务端

一环境搭建

1.下载服务端SDK

  下载地址: http://docs.jpush.cn/download/attachments/2228302/jpush-client-3.2.3.zip?version=2&modificationDate=1415166491000

2.在项目中加入jar包

 

 

二服务端实现步骤

1.创建JPushClient

JPushClient jpushClient = new JPushClient(masterSecret, appKey, 3);

 参数说明:

     masterSecret:注册应用的主密码,即API 主密码

     appKey:注册应用的应用Key

     maxRetryTime:最大的尝试次数,设为3表示:跟服务器进行建立连接若失败会尝试再进行两次尝试

2. 构建一个PushPayload对象(推送对象)

  确定推送消息的目标,包括推送的平台(Android、IOS)、消息内容和目标(所有人、别名、标签),构建简单的推送对象:向所有平台,所有人,推送内容为 content 的通知。

 public static PushPayload buildPushObject_all_all_alert(String content) {

        return PushPayload.alertAll(content); 

  构建推送对象:所有平台,推送目标是别名为alias,通知内容为 content。 

 public static PushPayload buildPushObject_all_alias_alert(String alias,String content) {

        return PushPayload.newBuilder()
         .setPlatform(Platform.all())//所有平台
         .setAudience(Audience.alias(alias))//向选定的人推送
         .setNotification(Notification.alert(content))//消息内容
         .build();}

  构建推送对象:向android平台,向目标标签tag,通知标题title,内容为 content。

 public static PushPayload buildPushObject_android_tag_alertWithTitle (String alias,String title,String content) {

    return PushPayload.newBuilder().setPlatform(Platform.android())
                .setAudience(Audience.tag(tag))//向指定的组推送
                .setNotification(Notification.android(message, title, null)).build();        
}

3.推送消息

PushResult result=jpushClient.sendPush(payload);

  应用客户推送消息

    参数:payload 即第二步创建的推送对象

    返回值:PushResult 表示服务端推送结果

  包含下列数据:

       msg_id:返回推送消息的id

三服务端代码实现

1.编写MessagePush组件封装消息推送方法

public class MessagePush {

    private static final String appKey = "d1c241706d82996e1fcdc2b2";
    private static final String masterSecret = "7ee1df1a631aee5a6a5a1129";
    private JPushClient jpushClient ;
    private String title;
    private String content;
public MessagePush(String message) {
         this.message = message;    
         jpushClient = new JPushClient(masterSecret, appKey,3);
     }
     public MessagePush(String message,String title) {
this(message);        
         this.title=title;    
     }
/**
      * 向所有人发送消息
      * @return 消息id
      */
public long sendPushAll(){
        PushPayload payload=buildPushObject_all_all_alert();
        long msgId=0;
        try {
            PushResult result=jpushClient.sendPush(payload);
            msgId=result.msg_id;
        } catch (APIConnectionException e) {
            // TODO Auto-generated catch block
                LOG.error("Connection error. Should retry later. ", e);
        } catch (APIRequestException e) {
            LOG.info("HTTP Status: " + e.getStatus());
            msgId=e.getMsgId();
        }
        return msgId;
    }
/**
     * 向指定别名的客户端发送消息
     * @param alias 所有别名信息集合,这里表示发送所有学生编号
     * @return 消息id
     */
    public long sendPushAlias(Set<String> alias){
    PushPayload payloadAlias=buildPushObject_android_alias_alertWithTitle(alias);
    long msgId=0;
        try {
            PushResult result=jpushClient.sendPush(payloadAlias);
            msgId=result.msg_id;
            
        } catch (APIConnectionException e) {
            LOG.error("Connection error. Should retry later. ", e);
        } catch (APIRequestException e) {
            LOG.info("HTTP Status: " + e.getStatus());
            LOG.info("Error Code: " + e.getErrorCode());
            LOG.info("Error Message: " + e.getErrorMessage());
            LOG.info("Msg ID: " + e.getMsgId());
            msgId=e.getMsgId();
        }
        return msgId;
    }
    /**
     * 向指定组发送消息
     * @param tag 组名称
* @return 消息id     
*/
public  long sendPushTag(String tag) {
        PushPayload payloadtag = buildPushObject_android_tag_alertWithTitle(tag);
        long msgId=0;
        try {
            PushResult result = jpushClient.sendPush(payloadtag);
            msgId=result.msg_id;
            LOG.info("Got result - " + result);
        } catch (APIConnectionException e) {
            LOG.error("Connection error. Should retry later. ", e);
            
        } catch (APIRequestException e) {
            LOG.info("HTTP Status: " + e.getStatus());
            LOG.info("Error Code: " + e.getErrorCode());
            LOG.info("Error Message: " + e.getErrorMessage());
            LOG.info("Msg ID: " + e.getMsgId());
            msgId=e.getMsgId();
        }
        return msgId;
    }


/**
* 下列封装了三种获得消息推送对象(PushPayload)的方法
*  buildPushObject_android_alias_alertWithTitle、
*  buildPushObject_android_tag_alertWithTitle、
*  buildPushObject_all_all_alert
*/
public  PushPayload buildPushObject_android_alias_alertWithTitle(Set<String> alias) {
        return PushPayload.newBuilder().setPlatform(Platform.android())
                .setAudience(Audience.alias(alias))        .setNotification(Notification.android(message,title,null)).build();
}

public  PushPayload buildPushObject_android_tag_alertWithTitle(String tag){
    return PushPayload.newBuilder().setPlatform(Platform.android())
                .setAudience(Audience.tag(tag))
                .setNotification(Notification.android(message, title, null)).build();}
    
public  PushPayload buildPushObject_all_all_alert() {
        return PushPayload.alertAll(message);
    }

2.JSP页面

    主要是设计表单将数据提交给MessagePushServlet,这里页面代码就不附加了

3.编写Servlet

public class MessagePushServlet extends HttpServlet {

public void doPost(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {
        int ret=0;
        request.setCharacterEncoding("utf-8");
        response.setCharacterEncoding("utf-8");
        String title=request.getParameter("title");
        String msg=request.getParameter("msg");
        String major=request.getParameter("majorName");
        String stuClass=request.getParameter("className");
        String alias=request.getParameter("selStuNos");
        MessagePush push=new MessagePush(msg,title);
        long msgId=0;
        if(alias!=null&&!alias.equals("")){
            String[] aliasArr=alias.split(",");
            Set<String> aliasSet=new HashSet<String>();
            for(String item:aliasArr){
                aliasSet.add(item);
            }
            msgId=push.sendPushAlias(aliasSet);
        }else if(stuClass!=null&&!stuClass.equals("")){
            msgId=push.sendPushTag(stuClass);
        }else if(major!=null&&!major.equals("")){
            msgId=push.sendPushTag(major);
        } else{
            msgId=push.sendPushAll();            
        }
        request.getRequestDispatcher("/push.jsp").forward(request, response);            
        }    
    }
 
 转自:http://blog.csdn.net/gebitan505/article/details/46812841

以上是关于gRPC 实现服务端消息推送的主要内容,如果未能解决你的问题,请参考以下文章

Java 消息推送------GoEasy实现服务端推送和web端推送

grpc的简单用例 (golang实现)

如何自己搭建一个xmpp,实现推送消息

WebSocket 实现服务端给客户端推送消息

基于 SSE 实现服务端消息主动推送解决方案

基于 SSE 实现服务端消息主动推送解决方案