6.Kafka发布和显示系统通知
Posted REXmama
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了6.Kafka发布和显示系统通知相关的知识,希望对你有一定的参考价值。
1.阻塞队列
生产者线程
线程需要实现 Runnable 接口
重写接口的run方法
声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列
创建有参构造器
实现示例逻辑,生产100个数据,put进阻塞队列,每生产一个数据停顿20毫秒,输出信息
class Producer implements Runnable
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue)
this.queue = queue;
@Override
public void run()
try
for (int i = 0; i < 100; i++)
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
catch (Exception e)
e.printStackTrace();
消费者线程
线程需要实现 Runnable 接口
重写接口的run方法
声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列
创建有参构造器
实现示例逻辑,不停的从队列中take,每生产一个数据停顿0-1000随机毫秒,输出信息
class Consumer implements Runnable
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue)
this.queue = queue;
@Override
public void run()
try
while (true)
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
catch (Exception e)
e.printStackTrace();
main函数
实例化阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);
实例化一个生产者线程
实例化三个消费者线程
public static void main(String[] args)
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
2.Kafka入门
Kafka简介
早先只是消息队列,慢慢扩展功能不止消息队列
消息系统:消息队列的功能,核心功能
通过日志可以分析很多内容,用户追踪等
Kfaka特点
高吞吐量:可以处理TB级别数据
消息持久化:把数据永久保存到类似硬盘的某一介质。硬盘空间大,价格低。误解,读取硬盘速率高与低取决于对硬盘使用,对硬盘的顺序读取效率甚至高于对内存的随机读取,Kafka利用这一点保证能处理海量数据
高可靠性:分布式的服务,可以做集群部署,有容错能力
高扩展性:集群服务器不够用了简单的加一个服务器就可以
Kafka术语
Broker:Kafka的服务器,集群中每一台服务器成为一个Broker
Zookeeper:管理集群软件,Kafka内置了Zookeeper
Topic:消息队列实现的方式两种,一种点对点,如上面的BlockingQueue,生产者把消息放到一个队列里,消费者就从这里面取值,消费者可能有多个,如果A消费者取到了这个数据这数据就出队了,每个数据只被一个消费者消费;还有一种方式发布订阅方式,生产者把消息队列放到某一个位置,消息可以被多个消费者读到。生产者把消息发布到的位置(空间)就叫Topic
Partition:分区,对主题位置的分区,增强了并发能力
Offsrt:消息在分区内存放的索引
Leader Replica:主副本,从分区读数据时,主副本做响应
Follower Replica:从副本只是备份,不负责响应
开启kafka环境:
在此文件夹中打开终端,先开启的是zookeeper服务:
bin/zookeeper-server-start.sh config/zookeeper.properties
然后再打开一个终端,这次开启的是kafka服务:
bin/kafka-server-start.sh config/server.properties
如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了
3. Spring整合kafka
引入以来
application.properties配置
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092 服务器端口
spring.kafka.consumer.group-id=community-consumer-group 消费者分组id
spring.kafka.consumer.enable-auto-commit=true 是否自动提交消费者的偏移量
spring.kafka.consumer.auto-commit-interval=3000 自动提交频率
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTests
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka()
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try
Thread.sleep(1000 * 10);
catch (InterruptedException e)
e.printStackTrace();
@Component
class KafkaProducer
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content)
kafkaTemplate.send(topic, content);
@Component
class KafkaConsumer
@KafkaListener(topics = "test")
public void handleMessage(ConsumerRecord record)
System.out.println(record.value());
4.发布系统通知
在评论点赞关注以后,就不用管他,扔进队列,并发异步。
解决方式:基于事件驱动的方式
开发事件实体Event
ps:创建get,set方法时
set方法改一下,有返回值为Event,可以连续.set进行处理,比全参构造器更灵活。
setData改一下只传key和value,调用更方便
2.开发事件生产者
建立新包 event,包下实现 EventProducer
@Component
public class EventProducer
@Autowired
private KafkaTemplate kafkaTemplate;
// 处理事件
public void fireEvent(Event event)
// 将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
3.开发事件消费者
@Component
public class EventConsumer implements CommunityConstant
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private EventProducer eventProducer;
@Autowired
private MessageService messageService;
//处理 消息事件
@KafkaListener(topics = TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE)
private void handleCommentMessage(ConsumerRecord record)
//判空 并记录日志
if(record == null || record.value() == null)
logger.error("消息的内容为空!");
return;
//将(json)event 回复成字符串
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
//再判断event是否为空
if(event == null)
logger.error("消息格式错误!");
return;
//发送站内通知
//构建 message 实体
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
//填充 message.content 将 event.data(map) 填入
Map<String,Object> content = new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty() ) //event里的 map 有内容
for(Map.Entry<String,Object> entry:event.getData().entrySet())
content.put(entry.getKey(),entry.getValue());
//将content 转成json格式
message.setContent(JSONObject.toJSONString(content));
//存入message
messageService.addMessage(message);
//!!!!!!!!!!!!!!! message里的 to_id 存错了
System.out.println(message);
4.修改对应的Controller
CommentController like follow
//增加评论
@RequestMapping(path = "/add/discussPostId",method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment)
comment.setUserId(hostHolder.getUser().getId());
comment.setStatus(0);
comment.setCreateTime(new Date());
commentService.addComment(comment);
//出发 评论事件 kafka发消息
//构造事件 event 实体
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId);
//设置 跳转链接时 检查跳转类型 是 帖子 评论 还是关注 即 找到target
if(comment.getEntityType() == ENTITY_TYPE_POST) // 帖子类型
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) //评论类型
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
//触发
eventProducer.fireEvent(event);
return "redirect:/discuss/detail/"+discussPostId;
5.显示系统通知
通知列表
MessageController
获取当前用户
查询评论类通知
把查到的message放入map
转义字符反转htmlUtils.htmlUnescape
查询点赞类通知(方法类似同上)
查询关注类通知(方法类似同上)
查询未读消息数量
@RequestMapping(path = "/notice/list", method = RequestMethod.GET)
public String getNoticeList(Model model)
User user = hostHolder.getUser();
// 查询评论类通知
Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);
Map<String, Object> messageVO = new HashMap<>();
if (message != null)
messageVO.put("message", message);
String content = HtmlUtils.htmlUnescape(message.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
messageVO.put("entityType", data.get("entityType"));
messageVO.put("entityId", data.get("entityId"));
messageVO.put("postId", data.get("postId"));
int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);
messageVO.put("count", count);
int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);
messageVO.put("unread", unread);
model.addAttribute("commentNotice", messageVO);
// 查询点赞类通知
message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);
messageVO = new HashMap<>();
if (message != null)
messageVO.put("message", message);
String content = HtmlUtils.htmlUnescape(message.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
messageVO.put("entityType", data.get("entityType"));
messageVO.put("entityId", data.get("entityId"));
messageVO.put("postId", data.get("postId"));
int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);
messageVO.put("count", count);
int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);
messageVO.put("unread", unread);
model.addAttribute("likeNotice", messageVO);
// 查询关注类通知
message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);
messageVO = new HashMap<>();
if (message != null)
messageVO.put("message", message);
String content = HtmlUtils.htmlUnescape(message.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
messageVO.put("entityType", data.get("entityType"));
messageVO.put("entityId", data.get("entityId"));
int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);
messageVO.put("count", count);
int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);
messageVO.put("unread", unread);
model.addAttribute("followNotice", messageVO);
// 查询未读消息数量
int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
model.addAttribute("letterUnreadCount", letterUnreadCount);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
model.addAttribute("noticeUnreadCount", noticeUnreadCount);
return "/site/notice";
2.通知详情
MessageConctroller
获得user信息
分页信息
存入map
设置已读
@RequestMapping(path = "/notice/detail/topic", method = RequestMethod.GET)
public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model)
User user = hostHolder.getUser();
page.setLimit(5);
page.setPath("/notice/detail/" + topic);
page.setRows(messageService.findNoticeCount(user.getId(), topic));
List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());
List<Map<String, Object>> noticeVoList = new ArrayList<>();
if (noticeList != null)
for (Message notice : noticeList)
Map<String, Object> map = new HashMap<>();
// 通知
map.put("notice", notice);
// 内容
String content = HtmlUtils.htmlUnescape(notice.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
map.put("user", userService.findUserById((Integer) data.get("userId")));
map.put("entityType", data.get("entityType"));
map.put("entityId", data.get("entityId"));
map.put("postId", data.get("postId"));
// 通知作者
map.put("fromUser", userService.findUserById(notice.getFromId()));
noticeVoList.add(map);
model.addAttribute("notices", noticeVoList);
// 设置已读
List<Integer> ids = getLetterIds(noticeList);
if (!ids.isEmpty())
messageService.readMessage(ids);
return "/site/notice-detail";
3.未读消息
拦截器处理 MessageInterceptor
@Component
public class MessageInterceptor implements HandlerInterceptor
@Autowired
private HostHolder hostHolder;
@Autowired
private MessageService messageService;
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception
User user = hostHolder.getUser();
if (user != null && modelAndView != null)
int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
WebMvcConfig
@Autowired
private MessageInterceptor messageInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry)
registry.addInterceptor(messageInterceptor)
.excludePathPatterns("/**/*.css", "/**/*.js", "/**/*.png", "/**/*.jpg", "/**/*.jpeg");
以上是关于6.Kafka发布和显示系统通知的主要内容,如果未能解决你的问题,请参考以下文章
使用 React Native 前端和 Java Spring 后端创建推送通知系统?
404-not-found-while-running-spring-boot-rest-api
easy-rules-centraldogma-spring-boot-starter 引入外部rule
自定义spring-boot-autocofigure使用maven打包的时候报错了:Failed to execute goal org.springframework.boot:spring-bo