Java牛客项目课_仿牛客网讨论区_第五章
Posted 夜中听雪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java牛客项目课_仿牛客网讨论区_第五章相关的知识,希望对你有一定的参考价值。
文章目录
第五章:Kafka,构建TB级异步消息系统(Kafka端口9092)(Zookeeper端口2181)
5.1、阻塞队列
生产者和消费者线程直接接触,如果生产者速度快于消费者,那么生产者生产的数据不会被消费者消费,那么生产者占用着CPU生产就浪费了系统资源。一方面,生产的数据被浪费,另一方面,白白占用CPU资源。如果在生产者消费者之间加个阻塞队列,生产者把队列生产满了,那么生产者会阻塞,阻塞不会占用CPU,所以可以避免系统资源被白白浪费掉。
BlockingQueueTests
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
5.5、Kafka入门
牛客课程助教1#
关于Kafka使用的重要提示
现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。
方案:将kafka的日志文件全部删除,再次启动即可。
建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。
其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题。
发表于 2019-08-06 12:11:43
牛友2#
老师关闭kafka的方式可以说的再具体一点吗,zookeeper也要用命令行的模式关闭吗,然后每次写写项目的时候要重新把kafka打开吗?
牛客课程助教 V 助教 : 在命令行中调用zookeeper-server-stop.bat关闭zookeeper,在命令行中调用kafka-server-stop.bat关闭kafka。每次运行项目之前,都要保证他们是启动的状态。
2019-08-22 19:06:45回复赞(0)
牛客课程助教 V 助教 : 直接关闭命令行的话,这属于暴力关闭,kafka没有走正常的销毁流程,可能会导致某些文件被锁定,下次启动报错。
2019-08-23 10:16:08
对硬盘的顺序读写的速度要高于对内存的随机读写。Kafka利用这一点,可以保证吞吐量,硬盘容量大,于是它能处理海量数据。Kafka是分布式的服务器,一台挂了还有另一台,因此它能保证高可靠性。有高扩展性,想加一台服务器很方便,简单配置下就好。
Zookeeper是用来管理集群的,其他需要集群的中间件,都可以用到zookeeper。
消息队列有两种实现方式:
点对点模式:每个数据只被一个消费者消费,比如上节课那种,一个生产者,多个消费者,一个阻塞队列。
发布订阅模式:消息可以被很多消费者同时/先后读取。Kafka是这种。
先启动zookeeper(第一个命令行窗口):
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties
启动成功,就显示:
[2021-04-14 08:31:02,245] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NioserverCnxnFactory)
再启动Kafka(第二个命令行窗口):
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-server-start.bat ..\\..\\config\\server.properties
启动成功,就显示:
[2021-04-14 08:32:46,162] INFO [GroupCoordinator 0]: Loading group metadata for community-consumer-group with generation 16 (kafka.coordinator.group.GroupCoordinator)
[2021-04-14 08:32:46,162] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 259 milliseconds, of which 190 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
再建立topic,然后查看是否建立成功,然后这边控制台作为生产者生产两条消息:
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
test
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
>hello
>world
>
然后再开一个控制台作为消费者来接收消息:
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world
后面继续在生产者那边的控制台窗口输入消息,消费者这边的控制台窗口可以很快接收到消息。
1、
问题:【 输入行太长。 命令语法不正确。】windows10下 Kafka环境搭建。
解决方法:使用较短的文件夹和文件的名称。减少文件夹树的深度。
2、
问题:
F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partition 1 --topic test
Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:688)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
解决:报错原因是命令参数有问题,可以看到上面的--partitions
少了个s。
3、
问题:
WARN Stopping serving logs in dir F:\\Kafka\\data\\kafka-logs (kafka.log.LogManager)
[2021-04-22 10:46:35,895] ERROR Shutdown broker because all log dirs in F:\\Kafka\\data\\kafka-logs have failed (kafka.log.LogManager)
解决:把上方列出的目录"F:\\Kafka\\data\\kafka-logs"删除,然后再重启Kafka。
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
# KafkaProperties
# 3000毫秒是3秒
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
5.9、Spring整合Kafka
问题:idea用SpringBoot整合报错:
java.lang.IllegalStateException: Failed to load ApplicationContext
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:127)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
原因:没启动windows上的kafka。
KafkaTests:测试Spring整合Kafka
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
5.11、发送系统通知
评论是指对帖子的评论,回复是指对评论的回复。
• 触发事件
- 评论后,发布通知 (评论、回复 后,都发通知)
- 点赞后,发布通知 (点赞评论、回复 后,都发通知。评论/回复获得的赞,都算作用户个人信息页面的赞的个数。)
- 关注后,发布通知
通知格式
用户 aaa 评论了你的帖子/回复, 点击查看 ! 链接:帖子详情页面
用户 aaa 点赞了你的帖子/回复, 点击查看 ! 链接:帖子详情页面
用户 lhh 关注了你, 点击查看 ! 链接:lhh的个人信息页面
生产者和消费者,可以同时进行操作,它们是并发的,这种叫做异步。
就是生产者把点赞评论关注的消息包装下扔到消息队列,就不用管了,消费者一条条消息来处理。
从技术角度来说,用Kafka消息队列来解决问题,不同的操作用不同的主题。
从业务角度,解决问题的方式是事件驱动方式。
评论是一个事件、点赞是一个事件、关注是一个事件。
可以一个方法消费多个主题,也可以一个主题被多个方法消费,是多对多的关系。
消费者:
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
@Autowired
private MessageService messageService;
@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record) {
if (record == null || record.value() == null) {
logger.error("消息的内容为空!");
return;
}
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null) {
logger.error("消息格式错误!");
return;
}
// 发送站内通知
Message message = new Message();
message.setFromId(SYSTEM_USER_ID);//1存为常量SYSTEM_USER_ID,方便看程序的人一眼就能看出这个1是什么意思
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
//status不用设置,因为不设置就默认是0,就代表未读
Map<String, Object> content = new HashMap<>();//用这个拼出一条发送的通知
content.put("userId", event.getUserId());
content.put("entityType", event.getEntityType());
content.put("entityId", event.getEntityId());
//不方便存到message表其他字段的数据,通通存到message表的content字段
if (!event.getData().isEmpty()) {
for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
content.put(entry.getKey(), entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));//Map<String, Object> content
messageService.addMessage(message);
}
}
生产者:
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
// 处理事件
public void fireEvent(Event event) {
// 将事件发布到指定的主题
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));//传一个对象的JSON字符串过去,消费者再把字符串转为对象再做处理
}
}
事件对象Event(用这样一个类,存点赞、评论、关注的信息,而不是直接存字符串:"用户xxx点赞了你的帖子,点击查看"这样,更加灵活,如果以后业务要传其他字符串,也可以修改。):
public class Event {
private String topic;
private int userId;
private int entityType;
private int entityId;
private int entityUserId;
private Map<String, Object> data = new HashMap<>();//如果日后这个对象还要加什么字段,方便扩展
public String getTopic() {
return topic;
}
public Event setTopic(String topic) {
this.topic = topic;
return this;//这种写法方便多次set。为什么不用构造器传参,因为可能某个参数不用传,这样要写很多构造器,而且可能参数非常多,传参混乱。这种方式很灵活。
}
。。。。。。
CommentController:
@RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
comment.setUserId(hostHolder.getUser().getId());
comment.setStatus(0);
comment.setCreateTime(new Date());
commentService.addComment(comment);
// 触发评论事件
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUser().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId", discussPostId);
if (comment.getEntityType() == ENTITY_TYPE_POST) {
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
} else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
//这里把消息发出去后,就立刻往下执行重定向页面了。消费者线程会慢慢从消息队列中取数据,来做处理。
eventProducer.fireEvent(event);
return "redirect:/discuss/detail/" + discussPostId;
}
关于Kafka使用的重要提示:
现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。
方案:将kafka的日志文件全部删除,再次启动即可。
建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。
其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题。
ServiceLogAspect类(用于AOP):
@Before("pointcut()")
public void before(JoinPoint joinPoint) {
// 用户[1.2.3.4],在[xxx],访问了[com.nowcoder.community.service.xxx()].
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
//如果不加下方的if判断,会报错:attributes空指针异常。
// 因为这里是AOP,拦截所有对service的调用。而前面我们所有对service的调用都是通过controller调用的,所以有attributes。
//但现在,我们有通过consumer消费者来访问service,于是attributes空指针异常。
if (attributes == null) {
return;
}
HttpServletRequest request = attributes.getRequest();
String ip = request.getRemoteHost();
String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
String target = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
logger.info(String.format("用户[%s],在[%s],访问了[%s].", ip, now, target));
}
导入的项目,可以根据idea的左侧的project目录,查看改了哪些文件,改过的文件是蓝色的,因为有git。
5.13、显示系统通知
org.springframework.web.util.htmlUtils 可以实现HTML标签及转义字符之间的转换。
测试:
String s = HtmlUtils.htmlEscape("<div>hello world</div><p> </p>");
System.out.println(s);
String s2 = HtmlUtils.htmlUnescape(s);
System.out.println(s2);
显示:
<div>hello world</div><p> </p>
<div>hello worldJava牛客项目课_仿牛客网讨论区_第二章