Java牛客项目课_仿牛客网讨论区_第五章

Posted 夜中听雪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java牛客项目课_仿牛客网讨论区_第五章相关的知识,希望对你有一定的参考价值。

第五章:Kafka,构建TB级异步消息系统(Kafka端口9092)(Zookeeper端口2181)

5.1、阻塞队列

在这里插入图片描述
生产者和消费者线程直接接触,如果生产者速度快于消费者,那么生产者生产的数据不会被消费者消费,那么生产者占用着CPU生产就浪费了系统资源。一方面,生产的数据被浪费,另一方面,白白占用CPU资源。如果在生产者消费者之间加个阻塞队列,生产者把队列生产满了,那么生产者会阻塞,阻塞不会占用CPU,所以可以避免系统资源被白白浪费掉。


BlockingQueueTests

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTests {

    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }

}

class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

5.5、Kafka入门

牛客课程助教1#
关于Kafka使用的重要提示
现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。
方案:将kafka的日志文件全部删除,再次启动即可。
建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。
其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题。
发表于 2019-08-06 12:11:43

牛友2#
老师关闭kafka的方式可以说的再具体一点吗,zookeeper也要用命令行的模式关闭吗,然后每次写写项目的时候要重新把kafka打开吗?
牛客课程助教 V 助教 : 在命令行中调用zookeeper-server-stop.bat关闭zookeeper,在命令行中调用kafka-server-stop.bat关闭kafka。每次运行项目之前,都要保证他们是启动的状态。
2019-08-22 19:06:45回复赞(0)
牛客课程助教 V 助教 : 直接关闭命令行的话,这属于暴力关闭,kafka没有走正常的销毁流程,可能会导致某些文件被锁定,下次启动报错。
2019-08-23 10:16:08


对硬盘的顺序读写的速度要高于对内存的随机读写。Kafka利用这一点,可以保证吞吐量,硬盘容量大,于是它能处理海量数据。Kafka是分布式的服务器,一台挂了还有另一台,因此它能保证高可靠性。有高扩展性,想加一台服务器很方便,简单配置下就好。
Zookeeper是用来管理集群的,其他需要集群的中间件,都可以用到zookeeper。
消息队列有两种实现方式:
点对点模式:每个数据只被一个消费者消费,比如上节课那种,一个生产者,多个消费者,一个阻塞队列。
发布订阅模式:消息可以被很多消费者同时/先后读取。Kafka是这种。

先启动zookeeper(第一个命令行窗口):

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties

启动成功,就显示:

[2021-04-14 08:31:02,245] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NioserverCnxnFactory)

再启动Kafka(第二个命令行窗口):

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-server-start.bat ..\\..\\config\\server.properties

启动成功,就显示:

[2021-04-14 08:32:46,162] INFO [GroupCoordinator 0]: Loading group metadata for community-consumer-group with generation 16 (kafka.coordinator.group.GroupCoordinator)
[2021-04-14 08:32:46,162] INFO [GroupMetadataManager brokerId=0] Finished loading offsets and group metadata from __consumer_offsets-28 in 259 milliseconds, of which 190 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)

再建立topic,然后查看是否建立成功,然后这边控制台作为生产者生产两条消息:

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
test

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test
>hello
>world
>

然后再开一个控制台作为消费者来接收消息:

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world

后面继续在生产者那边的控制台窗口输入消息,消费者这边的控制台窗口可以很快接收到消息。
1、
问题:【 输入行太长。 命令语法不正确。】windows10下 Kafka环境搭建。
解决方法:使用较短的文件夹和文件的名称。减少文件夹树的深度。

2、
问题:

F:\\Kafka\\kafka_2.13-2.7.0\\bin\\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partition 1 --topic test
Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option
        at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
        at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
        at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
        at joptsimple.OptionParser.parse(OptionParser.java:396)
        at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:688)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

解决:报错原因是命令参数有问题,可以看到上面的--partitions少了个s。

3、
问题:

WARN Stopping serving logs in dir F:\\Kafka\\data\\kafka-logs (kafka.log.LogManager)
[2021-04-22 10:46:35,895] ERROR Shutdown broker because all log dirs in F:\\Kafka\\data\\kafka-logs have failed (kafka.log.LogManager)

解决:把上方列出的目录"F:\\Kafka\\data\\kafka-logs"删除,然后再重启Kafka。


pom.xml

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

application.properties

# KafkaProperties
# 3000毫秒是3秒
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

5.9、Spring整合Kafka

问题:idea用SpringBoot整合报错:

java.lang.IllegalStateException: Failed to load ApplicationContext

	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
	at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
	at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
	at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
	at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
	at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:127)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
	... 24 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

原因:没启动windows上的kafka。

KafkaTests:测试Spring整合Kafka

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }

}

5.11、发送系统通知

评论是指对帖子的评论,回复是指对评论的回复。

• 触发事件

  • 评论后,发布通知 (评论、回复 后,都发通知)
  • 点赞后,发布通知 (点赞评论、回复 后,都发通知。评论/回复获得的赞,都算作用户个人信息页面的赞的个数。)
  • 关注后,发布通知

通知格式

用户 aaa 评论了你的帖子/回复, 点击查看 ! 链接:帖子详情页面
用户 aaa 点赞了你的帖子/回复, 点击查看 ! 链接:帖子详情页面
用户 lhh 关注了你, 点击查看 ! 链接:lhh的个人信息页面

在这里插入图片描述


生产者和消费者,可以同时进行操作,它们是并发的,这种叫做异步。
就是生产者把点赞评论关注的消息包装下扔到消息队列,就不用管了,消费者一条条消息来处理。
从技术角度来说,用Kafka消息队列来解决问题,不同的操作用不同的主题。
从业务角度,解决问题的方式是事件驱动方式。
评论是一个事件、点赞是一个事件、关注是一个事件。
在这里插入图片描述
可以一个方法消费多个主题,也可以一个主题被多个方法消费,是多对多的关系。
消费者:

@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;

    @KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record) {
        if (record == null || record.value() == null) {
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null) {
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);//1存为常量SYSTEM_USER_ID,方便看程序的人一眼就能看出这个1是什么意思
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //status不用设置,因为不设置就默认是0,就代表未读

        Map<String, Object> content = new HashMap<>();//用这个拼出一条发送的通知
        content.put("userId", event.getUserId());
        content.put("entityType", event.getEntityType());
        content.put("entityId", event.getEntityId());

        //不方便存到message表其他字段的数据,通通存到message表的content字段
        if (!event.getData().isEmpty()) {
            for (Map.Entry<String, Object> entry : event.getData().entrySet()) {
                content.put(entry.getKey(), entry.getValue());
            }
        }

        message.setContent(JSONObject.toJSONString(content));//Map<String, Object> content
        messageService.addMessage(message);
    }
}

生产者:

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));//传一个对象的JSON字符串过去,消费者再把字符串转为对象再做处理
    }

}

事件对象Event(用这样一个类,存点赞、评论、关注的信息,而不是直接存字符串:"用户xxx点赞了你的帖子,点击查看"这样,更加灵活,如果以后业务要传其他字符串,也可以修改。):

public class Event {

    private String topic;
    private int userId;
    private int entityType;
    private int entityId;
    private int entityUserId;
    private Map<String, Object> data = new HashMap<>();//如果日后这个对象还要加什么字段,方便扩展

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;//这种写法方便多次set。为什么不用构造器传参,因为可能某个参数不用传,这样要写很多构造器,而且可能参数非常多,传参混乱。这种方式很灵活。
    }

	。。。。。。

CommentController:

    @RequestMapping(path = "/add/{discussPostId}", method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment) {
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        // 触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId", discussPostId);
        if (comment.getEntityType() == ENTITY_TYPE_POST) {
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) {
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
        //这里把消息发出去后,就立刻往下执行重定向页面了。消费者线程会慢慢从消息队列中取数据,来做处理。
        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/" + discussPostId;
    }

关于Kafka使用的重要提示:
现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。
方案:将kafka的日志文件全部删除,再次启动即可。
建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。
其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题。

ServiceLogAspect类(用于AOP):

    @Before("pointcut()")
    public void before(JoinPoint joinPoint) {
        // 用户[1.2.3.4],在[xxx],访问了[com.nowcoder.community.service.xxx()].
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        //如果不加下方的if判断,会报错:attributes空指针异常。
        // 因为这里是AOP,拦截所有对service的调用。而前面我们所有对service的调用都是通过controller调用的,所以有attributes。
        //但现在,我们有通过consumer消费者来访问service,于是attributes空指针异常。
        if (attributes == null) {
            return;
        }
        HttpServletRequest request = attributes.getRequest();
        String ip = request.getRemoteHost();
        String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        String target = joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature().getName();
        logger.info(String.format("用户[%s],在[%s],访问了[%s].", ip, now, target));
    }

导入的项目,可以根据idea的左侧的project目录,查看改了哪些文件,改过的文件是蓝色的,因为有git。

5.13、显示系统通知

org.springframework.web.util.htmlUtils 可以实现HTML标签及转义字符之间的转换。
测试:

        String s = HtmlUtils.htmlEscape("<div>hello world</div><p> </p>");
        System.out.println(s);
        String s2 = HtmlUtils.htmlUnescape(s);
        System.out.println(s2);

显示:

&lt;div&gt;hello world&lt;/div&gt;&lt;p&gt; &lt;/p&gt;
<div>hello worldJava牛客项目课_仿牛客网讨论区_第二章

Java牛客项目课_仿牛客网讨论区_第八章

Java牛客项目课_仿牛客网讨论区_第八章

Java牛客项目课_仿牛客网讨论区_第七章

Java牛客项目课_仿牛客网讨论区_第六章

Java牛客项目课_仿牛客网讨论区_html页面