6.Kafka发布和显示系统通知

Posted REXmama

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了6.Kafka发布和显示系统通知相关的知识,希望对你有一定的参考价值。

1.阻塞队列

  1. 生产者线程

  1. 线程需要实现 Runnable 接口

  1. 重写接口的run方法

  1. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列

  1. 创建有参构造器

  1. 实现示例逻辑,生产100个数据,put进阻塞队列,每生产一个数据停顿20毫秒,输出信息

class Producer implements Runnable 

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) 
        this.queue = queue;
    

    @Override
    public void run() 
        try 
            for (int i = 0; i < 100; i++) 
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            
         catch (Exception e) 
            e.printStackTrace();
        
    
  1. 消费者线程

  1. 线程需要实现 Runnable 接口

  1. 重写接口的run方法

  1. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列

  1. 创建有参构造器

  1. 实现示例逻辑,不停的从队列中take,每生产一个数据停顿0-1000随机毫秒,输出信息

class Consumer implements Runnable 

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) 
        this.queue = queue;
    

    @Override
    public void run() 
        try 
            while (true) 
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            
         catch (Exception e) 
            e.printStackTrace();
        
    
  1. main函数

  1. 实例化阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);

  1. 实例化一个生产者线程

  1. 实例化三个消费者线程

public static void main(String[] args) 
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    

2.Kafka入门

  • Kafka简介

早先只是消息队列,慢慢扩展功能不止消息队列

消息系统:消息队列的功能,核心功能

通过日志可以分析很多内容,用户追踪等

  • Kfaka特点

高吞吐量:可以处理TB级别数据

消息持久化:把数据永久保存到类似硬盘的某一介质。硬盘空间大,价格低。误解,读取硬盘速率高与低取决于对硬盘使用,对硬盘的顺序读取效率甚至高于对内存的随机读取,Kafka利用这一点保证能处理海量数据

高可靠性:分布式的服务,可以做集群部署,有容错能力

高扩展性:集群服务器不够用了简单的加一个服务器就可以

  • Kafka术语

Broker:Kafka的服务器,集群中每一台服务器成为一个Broker

Zookeeper:管理集群软件,Kafka内置了Zookeeper

Topic:消息队列实现的方式两种,一种点对点,如上面的BlockingQueue,生产者把消息放到一个队列里,消费者就从这里面取值,消费者可能有多个,如果A消费者取到了这个数据这数据就出队了,每个数据只被一个消费者消费;还有一种方式发布订阅方式,生产者把消息队列放到某一个位置,消息可以被多个消费者读到。生产者把消息发布到的位置(空间)就叫Topic

Partition:分区,对主题位置的分区,增强了并发能力

Offsrt:消息在分区内存放的索引

Leader Replica:主副本,从分区读数据时,主副本做响应

Follower Replica:从副本只是备份,不负责响应

  • 开启kafka环境:

在此文件夹中打开终端,先开启的是zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

然后再打开一个终端,这次开启的是kafka服务:

bin/kafka-server-start.sh config/server.properties

如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了

3. Spring整合kafka

  1. 引入以来

  1. application.properties配置

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092 服务器端口
spring.kafka.consumer.group-id=community-consumer-group 消费者分组id
spring.kafka.consumer.enable-auto-commit=true 是否自动提交消费者的偏移量
spring.kafka.consumer.auto-commit-interval=3000 自动提交频率
  1. 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTests 

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() 
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try 
            Thread.sleep(1000 * 10);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    



@Component
class KafkaProducer 

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) 
        kafkaTemplate.send(topic, content);
    



@Component
class KafkaConsumer 

    @KafkaListener(topics = "test")
    public void handleMessage(ConsumerRecord record) 
        System.out.println(record.value());
    



4.发布系统通知

在评论点赞关注以后,就不用管他,扔进队列,并发异步。

解决方式:基于事件驱动的方式

  1. 开发事件实体Event

ps:创建get,set方法时

set方法改一下,有返回值为Event,可以连续.set进行处理,比全参构造器更灵活。

setData改一下只传key和value,调用更方便

2.开发事件生产者

建立新包 event,包下实现 EventProducer

@Component
public class EventProducer 

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event) 
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    

3.开发事件消费者

@Component
public class EventConsumer implements CommunityConstant 
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private EventProducer eventProducer;

    @Autowired
    private MessageService messageService;

    //处理 消息事件
    @KafkaListener(topics = TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE)
    private void handleCommentMessage(ConsumerRecord record)
        //判空 并记录日志
        if(record == null || record.value() == null)
            logger.error("消息的内容为空!");
            return;
        
        //将(json)event 回复成字符串
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        //再判断event是否为空
        if(event == null)
            logger.error("消息格式错误!");
            return;
        
        //发送站内通知
        //构建 message 实体
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        //填充 message.content   将 event.data(map) 填入
        Map<String,Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty() ) //event里的 map 有内容
            for(Map.Entry<String,Object> entry:event.getData().entrySet())
                content.put(entry.getKey(),entry.getValue());
            
        
        //将content 转成json格式
        message.setContent(JSONObject.toJSONString(content));
        //存入message
        messageService.addMessage(message);
        //!!!!!!!!!!!!!!! message里的 to_id 存错了
        System.out.println(message);
    

4.修改对应的Controller

CommentController like follow

//增加评论
    @RequestMapping(path = "/add/discussPostId",method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment)
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        //出发 评论事件 kafka发消息
        //构造事件 event 实体

        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId);

        //设置 跳转链接时 检查跳转类型 是 帖子 评论 还是关注  即 找到target
        if(comment.getEntityType() == ENTITY_TYPE_POST) // 帖子类型
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
         else if (comment.getEntityType() == ENTITY_TYPE_COMMENT)  //评论类型
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        
        //触发
        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/"+discussPostId;
    

5.显示系统通知

  1. 通知列表

MessageController

获取当前用户

查询评论类通知

  1. 把查到的message放入map

  1. 转义字符反转htmlUtils.htmlUnescape

查询点赞类通知(方法类似同上)

查询关注类通知(方法类似同上)

查询未读消息数量

@RequestMapping(path = "/notice/list", method = RequestMethod.GET)
public String getNoticeList(Model model) 
    User user = hostHolder.getUser();

    // 查询评论类通知
    Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);
    Map<String, Object> messageVO = new HashMap<>();
    if (message != null) 
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));
        messageVO.put("postId", data.get("postId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);
        messageVO.put("unread", unread);
    
    model.addAttribute("commentNotice", messageVO);

    // 查询点赞类通知
    message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);
    messageVO = new HashMap<>();
    if (message != null) 
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));
        messageVO.put("postId", data.get("postId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);
        messageVO.put("unread", unread);
    
    model.addAttribute("likeNotice", messageVO);

    // 查询关注类通知
    message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);
    messageVO = new HashMap<>();
    if (message != null) 
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);
        messageVO.put("unread", unread);
    
    model.addAttribute("followNotice", messageVO);

    // 查询未读消息数量
    int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
    model.addAttribute("letterUnreadCount", letterUnreadCount);
    int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
    model.addAttribute("noticeUnreadCount", noticeUnreadCount);

    return "/site/notice";

2.通知详情

MessageConctroller

获得user信息

分页信息

存入map

设置已读

@RequestMapping(path = "/notice/detail/topic", method = RequestMethod.GET)
public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) 
    User user = hostHolder.getUser();

    page.setLimit(5);
    page.setPath("/notice/detail/" + topic);
    page.setRows(messageService.findNoticeCount(user.getId(), topic));

    List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());
    List<Map<String, Object>> noticeVoList = new ArrayList<>();
    if (noticeList != null) 
        for (Message notice : noticeList) 
            Map<String, Object> map = new HashMap<>();
            // 通知
            map.put("notice", notice);
            // 内容
            String content = HtmlUtils.htmlUnescape(notice.getContent());
            Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
            map.put("user", userService.findUserById((Integer) data.get("userId")));
            map.put("entityType", data.get("entityType"));
            map.put("entityId", data.get("entityId"));
            map.put("postId", data.get("postId"));
            // 通知作者
            map.put("fromUser", userService.findUserById(notice.getFromId()));

            noticeVoList.add(map);
        
    
    model.addAttribute("notices", noticeVoList);

    // 设置已读
    List<Integer> ids = getLetterIds(noticeList);
    if (!ids.isEmpty()) 
        messageService.readMessage(ids);
    

    return "/site/notice-detail";

3.未读消息

拦截器处理 MessageInterceptor

@Component
public class MessageInterceptor implements HandlerInterceptor 

    @Autowired
    private HostHolder hostHolder;

    @Autowired
    private MessageService messageService;

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception 
        User user = hostHolder.getUser();
        if (user != null && modelAndView != null) 
            int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
            int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
            modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
        
    

WebMvcConfig

@Autowired
private MessageInterceptor messageInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) 
    registry.addInterceptor(messageInterceptor)
            .excludePathPatterns("/**/*.css", "/**/*.js", "/**/*.png", "/**/*.jpg", "/**/*.jpeg");

以上是关于6.Kafka发布和显示系统通知的主要内容,如果未能解决你的问题,请参考以下文章

使用 React Native 前端和 Java Spring 后端创建推送通知系统?

404-not-found-while-running-spring-boot-rest-api

easy-rules-centraldogma-spring-boot-starter 引入外部rule

自定义spring-boot-autocofigure使用maven打包的时候报错了:Failed to execute goal org.springframework.boot:spring-bo

本地通知 swift 3

Firebase (FCM):打开活动并在通知点击时传递数据。安卓