面试官问我有没有java架构开发经验,java架构是啥?怎样才算是有架构开发经验?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了面试官问我有没有java架构开发经验,java架构是啥?怎样才算是有架构开发经验?相关的知识,希望对你有一定的参考价值。
这么问估计他自己都说不清楚什么是架构,架构这个定义范围很广。
从java技术体系上看有3中:
J2SE:标准版开发(目前都叫,JAVASE)
J2ME:小型版(目前都叫,JAVAME)
J2EE:企业级开发(目前都叫,JAVAEE)
现在me,和se基本看不到了!java ee 也是主流,为开发企业级环境应用程序提供解决的一套解决方案,这个技术体系会用到Servlet,Jsp等技术,主要针对Web开发。
个人理解,架构就是解决问题的一系列方案,方法。
一个小的管理系统分为:web 前段和 web 后端,这也是一个常见的软件架构。
当一个软件平台用户量慢慢增加,单机部署的方式可能就应对不了实际的访问压力。这是可能会部署2台2个一样的应用,来分流降压。这种群集的方式也是属于架构中的一种方案。
当平台访问页面访问量过高的时候,耗去的连接数过大,可能就会考虑页面的静态化。这也是架构的内容
当访问量过db无法承受的时候,就会考虑增加缓存,减少不必要的重复查库操作,再者做读写分离。这也是架构的内容
当软件内容功能越来越多的时候,可能就考虑根据业务拆分成不同的小服务(微服务)。
等等
架构是一系列的技术解决方案,其实开发过的人,基本都是在做架构内容,只是自己不知道而已。
架构是很范的,构架师涉猎的东西很多,比如多线程,io这些都是基本的,普通的开发这大都应该掌握的。
平常开发只有碰到问题,才能进步。
要解决问题,就必须要不断的学习,才能懂的更多。当你发现你不懂的东西越来越多的时候,后头看看,真的是懂的越多,问题就会越多。
参考技术A ava软件开发主要分三个方向:JAVASE、JAVAME、JAVAEE。开发的一般是:java Swing桌面应用程序,javaME(做嵌入式)不是很常用,JAVAEE(开发B/S系统)是主流。一般说的JAVA软件开发主要就是用JAVA开发一系列的B/S系统,包括各种办公系统,各种网站,电力的监控系统,报表系统等等。学java的话,一般都是要先学JAVASE(一切的基础),因为不管是要从事哪个方向开发,都必须先把基础掌握好。然后,就是一系列的东西需要学习,首先,必须了解servlet,JSP,然后就是要学习各种的框架,struts,struts2,hibernate,Spring,应为SSH2框架是现在的主流,所以,这个必须学。然后是数据库也要懂一点。总之,java需要学的东西特别多。不过,如果java基础特别特别好的话,那么公司也不会太在意其他的,因为,技术是学不完的,如果基础特别好,其他的也容易培养上手。测试,一个项目会很大,会出现各种问题,所以,必须每次完成一个功能,最好自己写测试的代码,测试一下。 参考技术B 你先说说你用过什么,架构就是比较高级的东西了。至少听说过spring cloud
多线程至少要熟知吧,synchronized 要知道底层是怎么实现的
说简单点就是奔着30W以上去的,自己去b站,搜马士兵的高级视频吧,好像叫马士兵说
对你会有帮助的,希望你不是新手,要不然我很无语追问
我学过ssm框架
多线程也学过
但是他和我说架构开发经验的时候我很懵
我就只知道dao层、selves层、web层等
追答作为新手,自己去找找,各个地区的新手工资吧
就此打住!!!
话说30w工资,可是我找工作他开的是5w的工资啊
不是!我说错了
是5000的工资
追答那你就怼他,说:“装什么逼,自己做过架构师吗?”
才5k,想上天啊。直接拜拜,搞得程序员没人要一样
JAVA框架可以分为三层:表示层,业务层和物理层。
框架又叫做开发中的半成品,它不能提供整个WEB应用程序的所有东西,但是有了框架,我们就可以集中精力进行业务逻辑的开发而不用去关心它的技术实现以及一些辅助的业务逻辑。
参考技术D 他说java架构应该就是指SSM之类的吧。Kafka从成神到升仙系列 五面试官问我 Kafka 生产者的网络架构,我直接开始从源码背起.......
- 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,Java领域新星创作者
- 📕系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙
- 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
- 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
- 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人
文章目录
- Kafka 生产者的网络架构
Kafka从成神到成仙系列
Kafka 生产者的网络架构
初学一个技术,怎么了解该技术的源码至关重要。
对我而言,最佳的阅读源码的方式,那就是:不求甚解,观其大略
你如果进到庐山里头,二话不说,蹲下头来,弯下腰,就对着某棵树某棵小草猛研究而不是说先把庐山的整体脉络研究清楚了,那么你的学习方法肯定效率巨低而且特别痛苦。
最重要的还是慢慢地打击你的积极性,说我的学习怎么那么不 happy 啊,怎么那么没劲那,因为你的学习方法错了,大体读明白,先拿来用,用着用着,很多道理你就明白了。
先从整体上把关源码,再去扣一些细节问题。
举个简单的例子:
如果你刚接触 HashMap,你刚有兴趣去看其源码,在看 HashMap 的时候,有一个知识:当链表长度达到 8 之后,就变为了红黑树,小于 6 就变成了链表,当然,还和当前的长度有关。
这个时候,如果你去深究红黑树、为什么是 8 不是别的,又去查 泊松分布,最终会慢慢的搞死自己。
所以,正确的做法,我们先把这一部分给略过去,知道这个概念即可,等后面我们把整个庐山看完之后,再回过头抠细节。
当然,本章我们讲述 Kafka 生产者的网络架构
一、引言
kafka生产端的组成主要由以下几方面构成:
- 生产端的初始化
- 元数据的更新
- 缓存池(BufferPool)机制
- 网络架构模型
- 消息发送
其中,我们 生产端的初始化、元数据的更新、缓存池(BufferPool)机制已经介绍完毕,今天我们来看看 网络架构模型
废话不多说,老司机开始发车
二、网络架构模型
从我们之前的讲解中,我们可以知道,生产端最重要的几个技术点:
- KafkaProducer:主要将消息发送至
RecordAccumulator
并唤醒Sender
- Sender:调用
NetworkClient
将RecordAccumulator
的消息发送至Broker
- NetworkClient:
Kafka
对Java NIO
的封装
而正是它们几个组成了 Kafka 生产者的网络架构,其网络模型如下:
不难看出,我们 Kafka
生产者最终的网络架构也是使用的 Java NIO
,和我们的 Netty
殊途同归。
至于 kafka
为什么不用 Netty
做通信组件,这个之间在 【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛 已经讲过,此处不再叙述,有兴趣的同学可以跳转阅读。
三、网络架构整体流程
上面我们了解了 Kafka
生产端的几个网络组件及其对应的关系
我们深入的看一下,这几个组件之间到底是如何进行数据的处理及业务的处理的
网络架构整体流程如下所示:
这里涉及的主要几个方法:
- KafkaProducer
- waitOnMetadata:等待更新元数据
- accumulator.append:消息发送到缓冲区
- sender.wakeup:唤醒
Sender
线程
- Sender
- accumulator.ready:得到符合发送规定的节点
- metadata.requestUpdate:是否更新元数据
- remove any nodes:删除尚未建立连接的节点
- accumulator.drain:得到每个节点需要发送的消息批次
- createProduceRequests:组装成客户端请求
- client.send:调用
NetworkClient
设置事件类型 - client.poll:调用
NetworkClient
发送消息
- NetworkClient
- send:调用
Selector
设置事件类型 - poll:调用
Selector
发送消息
- send:调用
- Kafka-Selector
- send:设置事件类型
- poll:发送消息
可能大多数的小伙伴这个时候已经有点晕了,没关系,我们本篇文章就是解决你晕的问题的
我们会从 Producer
的源码一直会讲到 Selector
的源码并最终通过打日志的方式验证我们的猜想
戴好安全带,我们发车了
四、网络架构源码剖析
1、KafkaProducer
对于 KafkaProducer
来说,其最重要的功能就是将 record
发送至我们的 RecordAccumulator
中去
1.1 waitOnMetadata
这个方法相信看过上篇博客:【Kafka从成神到升仙系列 三】你真的了解 Kafka 的元数据嘛,已经有印象
对,没错,这个就是我们 kafka
在发送消息时,会优先请求 Broker
获取元数据信息,然后再去发送消息
具体细节的话,这里也不叙述了
总之:第一次发送消息时,这里会判断当前是否拿到了元数据。如果没有拿到元数据信息,这里会堵塞循环并唤醒 Sender
线程,让其帮忙更新元数据。
1.2 accumulator.append
这个其实我们这篇博客中也讲过:【Kafka从成神到升仙系列 二】生产者如何将消息放入到内存缓冲区
具体的细节如上,更多的细节可以参考上面那篇博客
1.3 sender.wakeup
当我们 **首次获取元数据 **或者 当前的 batch 满了 或者 一个新的 batch 创建了,我们都可以去唤醒我们的 Sender
,让这个线程执行我们的业务。
- 首次获取元数据:让
Sender
去更新元数据信息 - 当前的
batch
满了 或者 一个新的batch
创建:让Sender
将batch
发送至Broker
那这个 sender.wakeup
到底执行了什么呢,我们一起来看看其执行流程与执行代码
// 类 = KafkaProducer
sender.wakeup();
// 类 = Sender
public void wakeup()
this.client.wakeup();
// 类 = NetworkClient
public void wakeup()
this.selector.wakeup();
// 类 = Selector
public void wakeup()
this.nioSelector.wakeup();
// 类 = WindowsSelectorImpl
public Selector wakeup()
// Java NIO 包里面的操作
这里可以看到,整体的调用流程和我们上面的 网络架构 是一样的,也侧面验证了我们上面的 网络架构 是正确的。
不难看出,sender.wakeup()
实际上是唤醒了 Java NIO
里面的 Selector
,让其能够接受所有的 keys
,从而完成通信的链接与发送。
2. Sender
Sender 线程的东西稍微有点多,但核心只有两个:
- 更新元数据消息
- 将消息发送至
Broker
当 Sender
线程启动时,会启动如下代码:
public void run()
while (running)
run(time.milliseconds());
void run(long now)
// 业务代码
从代码中不难看出,当我们启动 Sender
线程之后,Sender
线程会不断的轮询调用 run(long now)
该方法,执行其业务。
那 run(long now)
方法到底做了些什么呢,我们一起来看一下
2.1 accumulator.ready
- 遍历所有的
TopicPartition
,获取每一个TopicPartition
的Leader
节点 - 弹出每一个
TopicPartition
的第一个batch
,校验该batch
有没有符合发送的规定 - 如果该
batch
符合了发送的规定后,将节点放至readyNodes
中,标识该节点已经可以发送数据了
public ReadyCheckResult ready(Cluster cluster, long nowMs)
// 准备好的节点
Set<Node> readyNodes = new HashSet<>();
// 遍历所有的 TopicPartition
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet())
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
// 获取当前Partition的leader节点
Node leader = cluster.leaderFor(part);
if (leader == null)
unknownLeadersExist = true;
else if (!readyNodes.contains(leader) && !muted.contains(part))
synchronized (deque)
// 弹出每一个 TopicPartition 的第一个batch
RecordBatch batch = deque.peekFirst();
if (batch != null)
// bactch 满足 batch.size() 或者 时间达到 linger.ms、
boolean full = deque.size() > 1 || batch.records.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff)
// 将当前的节点添加至准备好的队列中
readyNodes.add(leader);
else
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
// 最终返回该节点(这里最重要的还是 Set<String> 也就是准备好的节点集合)
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist)
this.readyNodes = readyNodes;
this.nextReadyCheckDelayMs = nextReadyCheckDelayMs;
this.unknownLeadersExist = unknownLeadersExist;
2.2 metadata.requestUpdate
- 如果发现有
TopicPartition
没有 leader,那么这里就调用requestUpdate()
方法更新 metadata
// 如果这个地方是 True,说明我们上面有的 TopicPartition 的 leader 节点为 null
if (result.unknownLeadersExist)
// 更新元数据
this.metadata.requestUpdate();
// 设置标记位为true,后续进行更新
public synchronized int requestUpdate()
this.needUpdate = true;
return this.version;
2.3 remove any nodes
- 遍历所有准备好的节点,利用
NetworkClient
来判断改节点是不是已经准备完毕 - 如果该节点未准备完毕,则从
readyNodes
中剔除 - 节点未准备完毕,会初始化链接该节点,便于下一次的消息发送
PS:这里可能会有同学对上面已经准备好了,下面为什么还有准备好的逻辑筛选有疑问
- 第一步筛选的是
TopicPartition
对应的batch
已经满足了发送的必要 - 第二步筛选的是
TopicPartition
对应的Broker
是否建立了链接,如果不是则初始化链接
// 遍历所有准备好的节点
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext())
Node node = iter.next();
// 利用 NetworkClient 来判断改节点是不是已经准备完毕
// 如果还未准备好,从准备好的队列中剔除掉
if (!this.client.ready(node, now))
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
// 判断节是否准备好发送
// 如果没有准备好发送,则会与该节点初始化链接,便于下一次的消息发送
public boolean ready(Node node, long now)
// 已经准备好
if (isReady(node, now))
return true;
// 与该节点的初始化
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
return false;
2.4 accumulator.drain
- 遍历所有准备好的
readyNodes
,得到该Broker
上所有的PartitionInfo
信息,判断该Partition
是否被处理中,如果没有在处理中则获取其对应的Deque<RecordBatch>
- 弹出队列中的
First
,判断其是否在backoff (没有重试过,或者重试了但是间隔已经达到了retryBackoffMs)
且加上该 batch 的大小 < maxRequestSize
,该batch
符合规定 - 将该
batch
放进readyRecordBatchList
中,最终放进Map<node.id(), readyRecordBatchList>
,这样我们一个Broker
可以发送的batch
就已经整理完毕。 - 最终我们得到
Map<Integer, List<RecordBatch>>
,key
代表当前已经连接好的Broker
,value
代表当前需要发送的batch
// 生成节点对应的batch消息
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize, now);
public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes,int maxSize,long now)
Map<Integer, List<RecordBatch>> batches = new HashMap<>();
// 遍历所有准备好的node节点
for (Node node : nodes)
int size = 0;
// 通过node节点获取其所有的Partition
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
// 存储该节点需要发送的Batch
List<RecordBatch> ready = new ArrayList<>();
int start = drainIndex = drainIndex % parts.size();
do
// 取Partition
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
// 当分区没有正在进行的批处理时
if (!muted.contains(tp))
// 获取该分区的所有的RecordBatch
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null)
synchronized (deque)
// 查看队列第一个
RecordBatch first = deque.peekFirst();
if (first != null)
// 判断其重试与时间
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
if (!backoff)
// 判断是否超越最大发送限制
if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty())
break;
else
// 取出队列第一个
RecordBatch batch = deque.pollFirst();
batch.records.close();
// 当前发送的大小累积
size += batch.records.sizeInBytes();
// 放入准备好的列表中
ready.add(batch);
batch.drainedMs = now;
this.drainIndex = (this.drainIndex + 1) % parts.size();
while (start != drainIndex);
// 将节点与准备好的batch列表对应
batches.put(node.id(), ready);
// 最终返回:所有准备好的节点与对应的batch列表
return batches;
2.5 createProduceRequests
- 遍历刚刚我们得到的
Map<node.id(), readyRecordBatchList
,组装成客户端请求
List<ClientRequest> requests = createProduceRequests(batches, now);
// 组装客户端请求
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now)
List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
return requests;
2.6 client.send
- 遍历每一个客户端请求并进行发送
PS:这里的发送是通过 KafkaClient
提供的接口,具体由 NetworkClient
实现,我们后面会讲
for (ClientRequest request : requests)
client.send(request, now);
2.7 client.poll
- 发送消息
PS:这里也同样是通过 KafkaClient
提供的接口,具体由 NetworkClient
实现,我们后面会讲
this.client.poll(pollTimeout, now);
3. NetworkClient
我们的 Sender
将 Producer
发送的消息进行 校验、筛选、组装,让我们的 NetworkClient
进一步的将消息发送
3.1 send
- 拿到当前客户端请求的
node
,校验其是否有权限 - 如果有权限的话,我们设置下时间并添加到到
inFlightRequests
,调用selector
进行发送(这里提前剧透一下,send
方法虽然叫发送,实际上并没有发送,只是注册了写事件,后面会讲到)
inFlightRequests 的作用:
- 缓存已经发出去但还没有收到响应的请求,保存对象的具体形式为
Map<NodeId,Deque<Request>>
- 配置参数
max.in.flight.requests.per.connection
,默认值为5,即每个连接最多只能缓存5个未收到响应的请求,超过这个数值之后便不能再往这个连接发送更多的请求了
public void send(ClientRequest request, long now)
// 拿到当前客户端请求的node
String nodeId = request.request().destination();
// 是否可以发送请求(我们前面已经校验过,一般情况下都能够发送)
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
doSend(request, now);
private void doSend(ClientRequest request, long now)
// 设置时间
request.setSendTimeMs(now);
// 将当前请求添加到 inFlightRequests
this.inFlightRequests.add(request);
selector.send(request.request());
3.2 poll
- 判断当前需要更新元数据,如果需要则更新元数据
- 调用
selector
的poll
方法进行Socket IO
的操作(这里也在后面会讲到) - 处理完成之后的操作
- 处理已经完成的 send
- 处理从 server 端接收到 Receive
- 处理连接失败那些连接
- 处理新建立的那些连接
- 处理超时的连接
- 如果回调的话,处理回调的信息
public List<ClientResponse> poll(long timeout, long now)
// 判断当前需要更新元数据,如果需要则更新元数据
long metadataTimeout = metadataUpdater.maybeUpdate(now);
// 调用 selector 的 poll 方法进行 Socket IO 的操作
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
// 处理完成之后的操作
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
// 处理已经完成的 send(不需要 response 的 request,如 sendÿ以上是关于面试官问我有没有java架构开发经验,java架构是啥?怎样才算是有架构开发经验?的主要内容,如果未能解决你的问题,请参考以下文章
Java8新特性面试官问我:Java8中创建Stream流有哪几种方式?
Java8新特性面试官问我:Java8中创建Stream流有哪几种方式?