看源码学编程系列之kafka
Posted pangjia
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了看源码学编程系列之kafka相关的知识,希望对你有一定的参考价值。
kafka 由于它自身的高性能发送与消费能力,而受到广大企业的喜欢,所以我们就先看看kafka 一些源码实现如下:
1 public void run() { 2 int messageNo = 1; 3 while (true) { 4 String messageStr = "Message_" + messageNo; 5 long startTime = System.currentTimeMillis(); 6 if (isAsync) { 7 producer.send(new ProducerRecord<>(topic, 8 messageNo, 9 messageStr), new DemoCallBack(startTime, messageNo, messageStr));// 异步发送 10 } else { 11 try { 12 producer.send(new ProducerRecord<>(topic, 13 messageNo, 14 messageStr)).get();// 同步发送 15 System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); 16 } catch (InterruptedException | ExecutionException e) { 17 e.printStackTrace(); 18 } 19 } 20 ++messageNo; 21 } 22 }
这段代码摘抄的是,kafka源码 生产者发送消息demo(kafka.examples.Producer) 里面的一个片段,主要是涉及到两个知识点,一个是异步发送消息,
回调函数的实现,另一个就是同步发送,多线程Future.get 模式的实现。现在分别阐述这两种实现方式。
异步回调方式
其实这种方式主要应用在调用多线程执行某个任务时,不用傻傻等到该线程完成后得到相应的反馈信息。举个例子Client端需要调用Server端来执行某个任务,并且希望Server端执行完成后
主动将相应的结果告诉Client端。这个过程就叫做回调了。如下代码:
1 public class Client implements CSCallBack { 2 3 private volatile boolean stopThread = false; 4 private Server server; 5 6 public Client(Server server) { 7 this.server = server; 8 } 9 10 public void sendMsg(final String msg){ 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:发送的消息为:" + msg); 12 new Thread(new Runnable() { 13 @Override 14 public void run() { 15 server.getClientMsg(Client.this,msg);// 核心代码1:将被调用方自己当作参数(client)传递到调用方(Server) 16 17 while(!stopThread) {// 模拟等待另服务器端代码完成 18 System.out.println("ThreadName="+Thread.currentThread().getName()+"客户端:模拟等待回调完成"); 19 20 try { 21 Thread.sleep(50); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 } 27 }).start(); 28 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:异步发送成功"); 29 } 30 31 @Override 32 public void process(String status) { 33 stopThread = true; 34 System.out.println("ThreadName="+Thread.currentThread().getName()+" 客户端:收到服务端回调状态为:" + status); 35 } 36 }
1 public class Server { 2 3 public void getClientMsg(CSCallBack csCallBack , String msg) { 4 5 6 // 模拟服务端需要对数据处理 7 try { 8 new Thread(new Runnable() { 9 @Override 10 public void run() { 11 System.out.println("ThreadName="+Thread.currentThread().getName()+" 服务端:服务端接收到客户端发送的消息为:" + msg); 12 while(true) { 13 int max=10,min=1; 14 int ranNum = (int) (Math.random()*(max-min)+min); 15 16 if(ranNum >6) {// 当随机数大于5时认为任务完成 17 System.out.println("ThreadName="+Thread.currentThread().getName()+" 服务端:数据处理成功,返回成功状态 200"); 18 String status = "200"; 19 csCallBack.process(status);// 核心代码2:调用方(Server)任务处理完成相应的任务后,调用被调用方(Client)的方法告知任务完成 20 break; 21 } 22 23 try { 24 Thread.sleep(80); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 } 29 } 30 }).start(); 31 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } 35 36 } 37 }
其实核心代码就两个:
client端:被调用方自己当作参数(client)传递到调用方(Server)。
Server端:调用方(Server)任务处理完成相应的任务后,调用被调用方(Client)的方法告知任务完成。
同步发送多线程 Future.get 模式实现
这种方式方式主要是用来等待某一项任务完成后,接着顺序执行某项任务。和上面的例子一样都是client 端 向server 端请求完成某项任务,并且期望server 端在完成任务后,返回结果
实例代码如下:
1 public class FutureDemo { 2 3 protected RealData realdata = null; 4 protected boolean isReady = false; 5 public synchronized void requestData(RealData realdata) {// client请求server完成某项任务 6 if (isReady) { 7 return; 8 } 9 this.realdata = realdata; 10 isReady = true; 11 notifyAll();//核心代码2:当请求的任务处理完成时,唤醒等待中的线程 12 } 13 14 public synchronized String getResult() {// client等待server完成任务后返回,此处就相当于 Future.get 15 while (!isReady) { 16 try { 17 wait();//核心代码1:发出请求后等待线程被激活 18 } catch (InterruptedException e) { 19 } 20 } 21 return realdata.result; 22 } 23 }
核心实现代码其实就是多线程里面的,wait 和 notify 实现方式。异步回调 和 同步 Future get 模式最大的区别,举个例子吧,
老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆都会等到老公回家然后给他做夜宵(同步 Future get 模式)
老婆(client 端)很爱老公,老公(服务器端)每天完成加班很晚,老婆觉得一直等太累了,就先睡觉,等老公回来后通知老婆(回调),然后老婆再给老公做夜宵(异步回调方式)。
所以大家都期望自己的老婆是, Future get 模式 还是 异步回调模式?
以上是关于看源码学编程系列之kafka的主要内容,如果未能解决你的问题,请参考以下文章
全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段