TCP/IP协议 怎么用JAVA发送和接收二进制数据 要具体实例
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TCP/IP协议 怎么用JAVA发送和接收二进制数据 要具体实例相关的知识,希望对你有一定的参考价值。
1.TCP/IP协议要求信息必须在块(chunk)中发送和接收,而块的长度必须是8位的倍数,因此,我们可以认为TCP/IP协议中传输的信息是字节序列。如何发送和解析信息需要一定的应用程序协议。2.信息编码:
首先是Java里对基本整型的处理,发送时,要注意:1)每种数据类型的字节个数;2)这些字节的发送顺序是怎样的?(little-endian还是
big-endian);3)所传输的数值是有符号的(signed)还是无符号的(unsigned)。具体编码时采用位操作(移位和屏蔽)就可以了。
具体在Java里,可以采用DataOutputStream类和ByteArrayOutputStream来实现。恢复时可以采用
DataInputStream类和ByteArrayInputStream类。
其次,字符串和文本,在一组符号与一组整数之间的映射称为编码字符集(coded character
set)。发送者与接收者必须在符号与整数的映射方式上达成共识,才能使用文本信息进行通信,最简单的方法就是定义一个标准字符集。具体编码时采用
String的getBytes()方法。
最后,位操作。如果设置一个特定的设为1,先设置好掩码(mask),之后用或操作;要清空特定一位,用与操作。
3.成帧与解析
成帧(framing)技术解决了接收端如何定位消息的首位位置的问题。
如果接收者试图从套接字中读取比消息本身更多的字节,将可能发生以下两种情况之一:如果信道中没有其他消息,接收者将阻塞等待,同时无法处理接收
到的消息;如果发送者也在等待接收端的响应消息,则会形成死锁(dealock);另一方面,如果信道中还有其他消息,则接收者会将后面消息的一部分甚至
全部读到第一条消息中去,这将产生一些协议错误。因此,在使用TCP套接字时,成帧就是一个非常重要的考虑因素。
有两个技术:
1.基于定界符(Delimiter-based):消息的结束由一个唯一的标记(unique
marker)指出,即发送者在传输完数据后显式添加的一个特殊字节序列。这个特殊标记不能在传输的数据中出现。幸运的是,填充(stuffing)技术
能够对消息中出现的定界符进行修改,从而使接收者不将其识别为定界符。在接收者扫描定界符时,还能识别出修改过的数据,并在输出消息中对其进行还原,从而
使其与原始消息一致。
2.显式长度(Explicit length):在变长字段或消息前附加一个固定大小的字段,用来指示该字段或消息中包含了多少字节。这种方法要确定消息长度的上限,以确定保存这个长度需要的字节数。
接口:
Java代码 import java.io.IOException; import java.io.OutputStream; public interface Framer void frameMsg(byte [] message,OutputStream out) throws IOException; byte [] nextMsg() throws IOException;
定界符的方式:
Java代码 import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class DelimFramer implements Framer private InputStream in;//data source; private static final byte DELIMTER=(byte)'\n';//message delimiter public DelimFramer(InputStream in) this.in=in; @Override public void frameMsg(byte[] message, OutputStream out) throws IOException //ensure that the message dose not contain the delimiter for(byte b:message) if(b==DELIMTER) throw new IOException("Message contains delimiter"); out.write(message); out.write(DELIMTER); out.flush(); @Override public byte[] nextMsg() throws IOException ByteArrayOutputStream messageBuffer=new ByteArrayOutputStream(); int nextByte; while((nextByte=in.read())!=DELIMTER) if(nextByte==-1)//end of stream? if(messageBuffer.size()==0) return null; else throw new EOFException("Non-empty message without delimiter"); messageBuffer.write(nextByte); return messageBuffer.toByteArray();
显式长度方法:
Java代码 import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class LengthFramer implements Framer public static final int MAXMESSAGELENGTH=65535; public static final int BYTEMASK=0xff; public static final int SHOTMASK=0xffff; public static final int BYTESHIFT=8; private DataInputStream in;// wrapper for data I/O public LengthFramer(InputStream in) throws IOException this.in=new DataInputStream(in); @Override public void frameMsg(byte[] message, OutputStream out) throws IOException if(message.length>MAXMESSAGELENGTH) throw new IOException("message too long"); //write length prefix out.write((message.length>>BYTEMASK)&BYTEMASK); out.write(message.length&BYTEMASK); //write message out.write(message); out.flush(); @Override public byte[] nextMsg() throws IOException int length; try length=in.readUnsignedShort(); catch(EOFException e) //no (or 1 byte) message; return null; //0<=length<=65535; byte [] msg=new byte[length]; in.readFully(msg);//if exception,it's a framing error; return msg; 参考技术A 用java socket,这是封装好的tcp/ip API,然后用数据流,之间就可以相互传递字节数组了
TCP/IP传输层协议实现 - TCP报文接收/发送(lwip)
(tcp的收发与接收窗口/发送窗口/通告窗口关联比较紧密,接收/发送过程在《TCP/IP传输层协议实现 - TCP接收窗口/发送窗口/通告窗口(lwip)》https://blog.csdn.net/arm7star/article/details/117153533 都有介绍,本文对收发过程进行更详细一步介绍。)
1、滑动窗口
1.1、接收窗口(接收滑动窗口)
接收窗口是本地可以接收数据的窗口,接收端只接收窗口内的数据,窗口外的丢弃。
接收到数据,接收窗口左边沿右移,接收窗口减小。
(1)、接收到数据前(4、5、6、7、8、9可接收) | (2)、接收窗口左边沿右移,接收窗口减小(收到4、5、6数据,可接收7、8、9) |
数据被应用层接收,接收窗口大小恢复,接收窗口右边沿右移。
(1)、应用层接收数据前(4、5、6待应用层接收,可接收7、8、9) | (2)、接收窗口恢复,接收窗口右边沿右移(4、5、6被应用层接收,恢复3个字节数据,接收窗口大小恢复3个字节,右边沿右移) |
1.2、发送窗口(发送滑动窗口)
发送窗口与接收窗口类似,发送窗口内的数据才能发送。不考虑拥塞窗口的前提下,如下图所示,接收方通告接收窗口大小为6个字节大小(接收方还可以再接收6个字节的数据,发送方只能再发送6个字节的数据),发送方发送的1、2、3被接收方接收并被确认,发送方的4、5、6已经发送但未被确认,7、8、9为能够发送的数据,10、11不能发送(发送窗口外的数据需要发送时,协议栈会缓存在发送缓存里面,10、11虽然不能发送,但是如果发送缓存够的话,会缓存到发送缓存里面,当数据被确认时,发送窗口移动,缓存的数据如果在发送窗口内,那么就可以发送)。
2、报文接收
tcp报文输入调用tcp_input函数,tcp_input依次查找tcp_active_pcbs、tcp_tw_pcbs、tcp_listen_pcbs链表,已连接的tcp信息保存在tcp_active_pcbs里面(收发双方的IP地址及端口)。(连接建立与终止参考https://blog.csdn.net/arm7star/article/details/116560454 《TCP/IP传输层协议实现 - TCP连接的建立与终止(lwip)》)
2.1、报文解析接收
(释放已被确认接收的报文,更新发送缓存等,接收数据...)
获取tcp首部。
iphdr = p->payload; // 获取ip首部
tcphdr = (struct tcp_hdr *)((u8_t *)p->payload + IPH_HL(iphdr) * 4); // 获取tcp首部
解析tcp首部。(源地址,目的地址、seqno、ackno、flags、tcplen)
/* Convert fields in TCP header to host byte order. */
tcphdr->src = ntohs(tcphdr->src);
tcphdr->dest = ntohs(tcphdr->dest);
seqno = tcphdr->seqno = ntohl(tcphdr->seqno);
ackno = tcphdr->ackno = ntohl(tcphdr->ackno);
tcphdr->wnd = ntohs(tcphdr->wnd);
flags = TCPH_FLAGS(tcphdr);
tcplen = p->tot_len + ((flags & (TCP_FIN | TCP_SYN)) ? 1 : 0);
初始化接收报文的tcp_seg。(输入报文长度、报文数据指针、tcp首部,对于接收窗口内的可接收的报文,输入的报文会拷贝到inseg;如果数据可接受,那么recv_data会指向可接受的数据,recv_flags设置为可接受报文的flags)
/* Set up a tcp_seg structure. */
inseg.next = NULL;
inseg.len = p->tot_len;
inseg.dataptr = p->payload;
inseg.p = p;
inseg.tcphdr = tcphdr;
recv_data = NULL; // 可接受数据的指针(如果期待的下一个序号为4,那么序号为4的报文即为可接受的报文)
recv_flags = 0;
检查是否有被应用层拒绝接收的数据refused_data。(收到可接受的数据recv_data,协议栈会将数据推送到应用层,应用层因为某些原因没办法接收该数据,那么被拒绝接收的recv_data数据会保存在refused_data里面;处理报文前需要先检查是否有被拒绝接收的refused_data数据,如果有,那么需要再次将refused_data推送到应用层,推送成功才处理当前收到的报文(如果推送失败的话,应用层应该没办法处理更多数据,即使当前输入报文可能有数据要接收,但是应用层没办法接收,协议栈直接丢弃该报文即可,让发送方超时重传))
/* If there is data which was previously "refused" by upper layer */
if (pcb->refused_data != NULL) {
/* Notify again application with data previously received. */
LWIP_DEBUGF(TCP_INPUT_DEBUG, ("tcp_input: notify kept packet\\n"));
TCP_EVENT_RECV(pcb, pcb->refused_data, ERR_OK, err); // 发送数据到应用层
if (err == ERR_OK) { // 发送成功
pcb->refused_data = NULL; // 重置refused_data
} else {
/* drop incoming packets, because pcb is "full" */
LWIP_DEBUGF(TCP_INPUT_DEBUG, ("tcp_input: drop incoming packets, because pcb is \\"full\\"\\n"));
TCP_STATS_INC(tcp.drop);
snmp_inc_tcpinerrs();
pbuf_free(p); // 发送数据到应用层失败,释放报文,直接丢弃报,返回
return;
}
}
调用tcp_process处理输入报文(报文标志位、连接状态检查,tmr定时器复位、KEEPALIVE counter重置,处理tcp报文选项),ESTABLISHED状态下的连接调用tcp_receive处理输入报文。
发送窗口更新。(细节参考https://blog.csdn.net/arm7star/article/details/117153533)
if (flags & TCP_ACK) {
right_wnd_edge = pcb->snd_wnd + pcb->snd_wl2;
......
重复ack处理,3个以上重复ACK执行快速恢复算法(快速重传、拥塞避免)。(《TCP-IP详解卷 1:协议》21.7 快速重传与快速恢复算法)
if (pcb->lastack == ackno) { // 没有新的数据被确认接收
pcb->acked = 0; // 被确认的数据为0
if (pcb->snd_wl2 + pcb->snd_wnd == right_wnd_edge){ // 重复ACK判断
++pcb->dupacks;
if (pcb->dupacks >= 3 && pcb->unacked != NULL) { // 如果一连串收到3个或3个以上的重复ACK,就非常可能是发送的报文段丢失了
if (!(pcb->flags & TF_INFR)) {
/* This is fast retransmit. Retransmit the first unacked segment. */
LWIP_DEBUGF(TCP_FR_DEBUG, ("tcp_receive: dupacks %"U16_F" (%"U32_F"), fast retransmit %"U32_F"\\n",
(u16_t)pcb->dupacks, pcb->lastack,
ntohl(pcb->unacked->tcphdr->seqno)));
tcp_rexmit(pcb); // 重传丢失的数据报文段,而无需等待超时定时器溢出
/* Set ssthresh to max (FlightSize / 2, 2*SMSS) */
/*pcb->ssthresh = LWIP_MAX((pcb->snd_max -
pcb->lastack) / 2,
2 * pcb->mss);*/
/* Set ssthresh to half of the minimum of the current cwnd and the advertised window */
if (pcb->cwnd > pcb->snd_wnd)
pcb->ssthresh = pcb->snd_wnd / 2;
else
pcb->ssthresh = pcb->cwnd / 2; // 慢启动门限ssthresh设置为拥塞窗口的一半
/* The minimum value for ssthresh should be 2 MSS */
if (pcb->ssthresh < 2*pcb->mss) { // 慢启动门限小于2个mss
LWIP_DEBUGF(TCP_FR_DEBUG, ("tcp_receive: The minimum value for ssthresh %"U16_F" should be min 2 mss %"U16_F"...\\n", pcb->ssthresh, 2*pcb->mss));
pcb->ssthresh = 2*pcb->mss; // 设置慢启动门限为2个mss
}
pcb->cwnd = pcb->ssthresh + 3 * pcb->mss; // 拥塞窗口设置为pcb->ssthresh + 3 * pcb->mss(拥塞窗口大于慢启动门限ssthresh时,执行拥塞避免算法)
pcb->flags |= TF_INFR; // 快速恢复算法(In fast recovery)
} else {
/* Inflate the congestion window, but not if it means that
the value overflows. */
if ((u16_t)(pcb->cwnd + pcb->mss) > pcb->cwnd) {
pcb->cwnd += pcb->mss;
}
}
}
} else {
LWIP_DEBUGF(TCP_FR_DEBUG, ("tcp_receive: dupack averted %"U32_F" %"U32_F"\\n",
pcb->snd_wl2 + pcb->snd_wnd, right_wnd_edge));
}
} else if (TCP_SEQ_BETWEEN(ackno, pcb->lastack+1, pcb->snd_nxt)){
有数据被确认接收。(ackno >= pcb->lastack + 1 && ackno <= pcb->snd_nxt)
} else if (TCP_SEQ_BETWEEN(ackno, pcb->lastack+1, pcb->snd_nxt)){ // ackno确认(, ackno - 1]区间的数据被接收,[pcb->lastack, pcb->snd_nxt - 1]区间的数据等待被确认,因此有数据被确认的话,ackno - 1 >= pcb->lastack并且ackno - 1 <= pcb->snd_nxt - 1,即ackno >= pcb->lastack + 1 && ackno <= pcb->snd_nxt。
/* We come here when the ACK acknowledges new data. */
/* Reset the "IN Fast Retransmit" flag, since we are no longer
in fast retransmit. Also reset the congestion window to the
slow start threshold. */
if (pcb->flags & TF_INFR) {
pcb->flags &= ~TF_INFR;
pcb->cwnd = pcb->ssthresh;
}
有数据被确认接收后重置超时重传定时器,恢复发送缓存,更新lastack,更新拥塞窗口cwnd等(慢启动、拥塞避免)。
/* Reset the number of retransmissions. */
pcb->nrtx = 0;
/* Reset the retransmission time-out. */
pcb->rto = (pcb->sa >> 3) + pcb->sv;
/* Update the send buffer space. Diff between the two can never exceed 64K? */
pcb->acked = (u16_t)(ackno - pcb->lastack);
pcb->snd_buf += pcb->acked;
/* Reset the fast retransmit variables. */
pcb->dupacks = 0;
pcb->lastack = ackno;
/* Update the congestion control variables (cwnd and
ssthresh). */
if (pcb->state >= ESTABLISHED) {
if (pcb->cwnd < pcb->ssthresh) {
if ((u16_t)(pcb->cwnd + pcb->mss) > pcb->cwnd) {
pcb->cwnd += pcb->mss; // 慢启动算法
}
LWIP_DEBUGF(TCP_CWND_DEBUG, ("tcp_receive: slow start cwnd %"U16_F"\\n", pcb->cwnd));
} else {
u16_t new_cwnd = (pcb->cwnd + pcb->mss * pcb->mss / pcb->cwnd); // 拥塞避免算法
if (new_cwnd > pcb->cwnd) {
pcb->cwnd = new_cwnd;
}
LWIP_DEBUGF(TCP_CWND_DEBUG, ("tcp_receive: congestion avoidance cwnd %"U16_F"\\n", pcb->cwnd));
}
}
LWIP_DEBUGF(TCP_INPUT_DEBUG, ("tcp_receive: ACK for %"U32_F", unacked->seqno %"U32_F":%"U32_F"\\n",
ackno,
pcb->unacked != NULL?
ntohl(pcb->unacked->tcphdr->seqno): 0,
pcb->unacked != NULL?
ntohl(pcb->unacked->tcphdr->seqno) + TCP_TCPLEN(pcb->unacked): 0));
被确认接收的数据从unacked队列删除。
/* Remove segment from the unacknowledged list if the incoming
ACK acknowlegdes them. */
while (pcb->unacked != NULL &&
TCP_SEQ_LEQ(ntohl(pcb->unacked->tcphdr->seqno) +
TCP_TCPLEN(pcb->unacked), ackno)) {
LWIP_DEBUGF(TCP_INPUT_DEBUG, ("tcp_receive: removing %"U32_F":%"U32_F" from pcb->unacked\\n",
ntohl(pcb->unacked->tcphdr->seqno),
ntohl(pcb->unacked->tcphdr->seqno) +
TCP_TCPLEN(pcb->unacked)));
next = pcb->unacked;
pcb->unacked = pcb->unacked->next;
LWIP_DEBUGF(TCP_QLEN_DEBUG, ("tcp_receive: queuelen %"U16_F" ... ", (u16_t)pcb->snd_queuelen));
LWIP_ASSERT("pcb->snd_queuelen >= pbuf_clen(next->p)", (pcb->snd_queuelen >= pbuf_clen(next->p)));
pcb->snd_queuelen -= pbuf_clen(next->p);
tcp_seg_free(next);
LWIP_DEBUGF(TCP_QLEN_DEBUG, ("%"U16_F" (after freeing unacked)\\n", (u16_t)pcb->snd_queuelen));
if (pcb->snd_queuelen != 0) {
LWIP_ASSERT("tcp_receive: valid queue length", pcb->unacked != NULL ||
pcb->unsent != NULL);
}
}
接下来被确认接收的数据从unsent删除。(超时重传,unacked队列的数据会插入unsent表头,因此unsent里面可能包含已发送但unacked的数据)
如果报文有数据,那么处理报文里面的数据。报文处理参考https://blog.csdn.net/arm7star/article/details/117153533 《TCP/IP传输层协议实现 - TCP接收窗口/发送窗口/通告窗口(lwip)》"3.1、接收窗口的更新"。
2.2、报文接收后处理
数据被确认接收以及有可接受的数据时,需要通知应用层。
tcp_process处理报文后返回tcp_input,tcp_input对报文处理结果进行处理。
有数据被确认接收,需要调用sent函数(select(lwip_select)函数有监听sent事件的功能,应用层阻塞在lwip_select等待写事件时,需要调用TCP_EVENT_SENT唤醒应用层)
/* If the application has registered a "sent" function to be
called when new send buffer space is available, we call it
now. */
if (pcb->acked > 0) {
TCP_EVENT_SENT(pcb, pcb->acked, err); // sent事件处理
}
接收到数据recv_data,需要发送到应用层(报文如果有PSH标志,需要将PSH标志传递到应用层)。
if (recv_data != NULL) { // 有可接受的数据
if(flags & TCP_PSH) { // 报文含有PSH标志
recv_data->flags |= PBUF_FLAG_PUSH; // 传递PSH标志到应用层
}
/* Notify application that data has been received. */
TCP_EVENT_RECV(pcb, recv_data, ERR_OK, err); // 发送数据到应用层(lwip内部为mail box)
/* If the upper layer can't receive this data, store it */
if (err != ERR_OK) { // 应用层拒绝接收数据(mail box已经满了)
pcb->refused_data = recv_data; // 拒收的数据保存在refused_data里面,下次收到报文时再次尝试发送到应用层,如果下次仍发送失败,则丢弃下次收到的报文
LWIP_DEBUGF(TCP_INPUT_DEBUG, ("tcp_input: keep incoming packet, because pcb is \\"full\\"\\n"));
}
}
FIN报文处理。(应用层接读数据阻塞时,如果传输层收到FIN报文,需要让应用层返回,lwip通过发送一个NULL数据给应用层,应用层收到NULL的数据时,认为收到了FIN标记,读数据将返回)
/* If a FIN segment was received, we call the callback
function with a NULL buffer to indicate EOF. */
if (recv_flags & TF_GOT_FIN) { // 报文带有FIN标记
TCP_EVENT_RECV(pcb, NULL, ERR_OK, err); // 发送一个NULL数据包到应用层
}
发送更多报文。(有数据被确认接收了,发送窗口里面有更多数据可以发送(unsent发送窗口里面的数据可以发送),调用tcp_output发送数据;不考虑拥塞窗口的情况下,总有发送窗口大小的数据在发送,tcp_output会把发送窗口占满,如果发送窗口太大,导致网络拥塞,则拥塞窗口会限制同一时间能发送报文的数量)
/* If there were no errors, we try to send something out. */
if (err == ERR_OK) {
tcp_output(pcb);
}
3、报文发送
在数据被确认接收后,tcp_input会调用tcp_output发送更多unsent的数据,因此tcp_output可以认为是发送缓存里面的数据到发送窗口。发送unsent队列里面的报文比较简单,如果unsent里面的报文在发送窗口/拥塞窗口范围内,那么就发送;另外,tcp_output有调用tcp_do_output_nagle,tcp_do_output_nagle主要在发送端用于避免糊涂窗口综合症(《TCP-IP详解卷 1:协议》22.3 糊涂窗口综合症);unsent队列发送可以参考https://blog.csdn.net/arm7star/article/details/117153533《TCP/IP传输层协议实现 - TCP接收窗口/发送窗口/通告窗口(lwip)》"4.1、报文发送/加入发送队列"。
应用层数据发送到unsent队列是通过tcp_enqueue函数实现。tcp_enqueue函数原型为“err_t tcp_enqueue(struct tcp_pcb *pcb, void *arg, u16_t len, u8_t flags, u8_t apiflags, u8_t optflags)”,arg即为应用层write/sengmsg的数据,len即为应用层write/sengmsg数据的长度,socket封装了对pcb及其他参数的处理,应用层调用write/sengmsg函数,协议栈会找到socket对应的tcp_pcb并调用tcp_enqueue。
发送队列/发送缓存主要涉及snd_lbb、snd_buf、unsent,snd_lbb(Sequence number of next byte to be buffered)为下一个缓存报文的序号(unsent、unacked之外的第一个序号),snd_buf为发送缓存的大小(可用缓存的大小,数据加入发送队列时,发送缓存减小,数据被确认接收时,恢复发送缓存),unsent前面介绍过,如果unsent队列不为空、当前发送的数据比较少,unsent最末一个报文可能可以装下当前的数据,那么可能需要合并成一个报文。
加入发送缓存/发送队列的过程如下。
3.1、发送的数据组装成tcp报文队列
发送缓存、缓存报文序号等获取。
/* fail on too much data */
if (len > pcb->snd_buf) { // 本次要发送的数据长度大于可用的缓存
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 3, ("tcp_enqueue: too much data (len=%"U16_F" > snd_buf=%"U16_F")\\n", len, pcb->snd_buf));
pcb->flags |= TF_NAGLEMEMERR;
return ERR_MEM; // 返回内存错误(内存不够)
}
left = len; // 剩余需要缓存的数据left(一次发送的数据可能过大,需要缓存成多个报文)
ptr = arg; // 数据指针
optlen = LWIP_TCP_OPT_LENGTH(optflags);
/* seqno will be the sequence number of the first segment enqueued
* by the call to this function. */
seqno = pcb->snd_lbb; // 当前缓存报文的seqno
发送队列长度检查。(虽然发送缓存够大,但是发送队列有限制,因此也不能缓存超过发送队列的数据;发送队列包括unacked、unsent;发送队列长度是按pbuf节点来计算的,就是内存链表有多少个节点,简单理解就是发送队列(unacked、unsent)里面有多少不连续的内存数据块;pbuf_clen计算一个pbuf有多少内存节点)
/* If total number of pbufs on the unsent/unacked queues exceeds the
* configured maximum, return an error */
queuelen = pcb->snd_queuelen; // 发送队列长度
/* check for configured max queuelen and possible overflow */
if ((queuelen >= TCP_SND_QUEUELEN) || (queuelen > TCP_SNDQUEUELEN_OVERFLOW)) {
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 3, ("tcp_enqueue: too long queue %"U16_F" (max %"U16_F")\\n", queuelen, TCP_SND_QUEUELEN));
TCP_STATS_INC(tcp.memerr);
pcb->flags |= TF_NAGLEMEMERR;
return ERR_MEM;
}
应用层发送的数据组装成tcp报文,放入queue队列(queue队列保存的是当前发送的数据的tcp报文链表;发送的数据过大时,会将数据拆分保存在一个个的tcp报文里面,这些报文组成一个queue链表)。
while (queue == NULL || left > 0) {
/* The segment length (including options) should be at most the MSS */
seglen = left > (pcb->mss - optlen) ? (pcb->mss - optlen) : left; // 如果当前剩余数据加上选项数据长度大于一个mss报文,那么拆分剩余的数据,先组装一个mss大小的报文
/* Allocate memory for tcp_seg, and fill in fields. */
seg = memp_malloc(MEMP_TCP_SEG);
if (seg == NULL) {
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2,
("tcp_enqueue: could not allocate memory for tcp_seg\\n"));
goto memerr;
}
seg->next = NULL; // next为空
seg->p = NULL; // 负载为空(还没挂接数据)
/* first segment of to-be-queued data? */
if (queue == NULL) {
queue = seg; // 当前报文为本次缓存的第一个报文,queue指向第一个拆分的报文
}
/* subsequent segments of to-be-queued data */
else {
/* Attach the segment to the end of the queued segments */
LWIP_ASSERT("useg != NULL", useg != NULL);
useg->next = seg; // useg指向前一次循环拆分的报文,当前报文链接到前一个拆分报文的后面
}
/* remember last segment of to-be-queued data for next iteration */
useg = seg; // useg指向本次循环拆分的报文,下次循环时,useg就指向前一次循环拆分的报文
/* If copy is set, memory should be allocated
* and data copied into pbuf, otherwise data comes from
* ROM or other static memory, and need not be copied. */
if (apiflags & TCP_WRITE_FLAG_COPY) { // 写拷贝,应用层的数据需要拷贝到报文里面
if ((seg->p = pbuf_alloc(PBUF_TRANSPORT, seglen + optlen, PBUF_RAM)) == NULL) {
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2,
("tcp_enqueue : could not allocate memory for pbuf copy size %"U16_F"\\n", seglen));
goto memerr;
}
LWIP_ASSERT("check that first pbuf can hold the complete seglen",
(seg->p->len >= seglen + optlen));
queuelen += pbuf_clen(seg->p); // pbuf_clen计算Count number of pbufs in a chain,非内存大小,增加queuelen
if (arg != NULL) {
MEMCPY((char *)seg->p->payload + optlen, ptr, seglen); // 拷贝应用层数据到报文里面
}
seg->dataptr = seg->p->payload; // 更新dataptr
}
/* do not copy data */
else {
/* First, allocate a pbuf for the headers. */
if ((seg->p = pbuf_alloc(PBUF_TRANSPORT, optlen, PBUF_RAM)) == NULL) { // 不需要拷贝数据,只分配optlen大小的内存来保存选项数据
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2,
("tcp_enqueue: could not allocate memory for header pbuf\\n"));
goto memerr;
}
queuelen += pbuf_clen(seg->p); // 增加queuelen
/* Second, allocate a pbuf for holding the data.
* since the referenced data is available at least until it is sent out on the
* link (as it has to be ACKed by the remote party) we can safely use PBUF_ROM
* instead of PBUF_REF here.
*/
if (left > 0) {
if ((p = pbuf_alloc(PBUF_RAW, seglen, PBUF_ROM)) == NULL) { // 申请一个pbuf(没有申请保存数据的内存空间,应用层数据不拷贝到报文里面,pbuf的数据指针指向应用层数据地址)
/* If allocation fails, we have to deallocate the header pbuf as well. */
pbuf_free(seg->p);
seg->p = NULL;
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2,
("tcp_enqueue: could not allocate memory for zero-copy pbuf\\n"));
goto memerr;
}
++queuelen; // 应用层的数据是连续的,只占一个连续内存块,因此queuelen只需要加1即可
/* reference the non-volatile payload data */
p->payload = ptr; // payload指向应用层数据地址
seg->dataptr = ptr; // dataptr指向应用层数据地址
/* Concatenate the headers and data pbufs together. */
pbuf_cat(seg->p/*header*/, p/*data*/);
p = NULL;
}
}
/* Now that there are more segments queued, we check again if the
length of the queue exceeds the configured maximum or overflows. */
if ((queuelen > TCP_SND_QUEUELEN) || (queuelen > TCP_SNDQUEUELEN_OVERFLOW)) {
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_enqueue: queue too long %"U16_F" (%"U16_F")\\n", queuelen, TCP_SND_QUEUELEN));
goto memerr;
}
seg->len = seglen; // 设置报文长度
/* build TCP header */
if (pbuf_header(seg->p, TCP_HLEN)) { // 申请tcp首部空间
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | 2, ("tcp_enqueue: no room for TCP header in pbuf.\\n"));
TCP_STATS_INC(tcp.err);
goto memerr;
}
seg->tcphdr = seg->p->payload; // tcp首部设置
seg->tcphdr->src = htons(pcb->local_port);
seg->tcphdr->dest = htons(pcb->remote_port);
seg->tcphdr->seqno = htonl(seqno);
seg->tcphdr->urgp = 0;
TCPH_FLAGS_SET(seg->tcphdr, flags);
/* don't fill in tcphdr->ackno and tcphdr->wnd until later */
seg->flags = optflags;
/* Set the length of the header */
TCPH_HDRLEN_SET(seg->tcphdr, (5 + optlen / 4));
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | LWIP_DBG_TRACE, ("tcp_enqueue: queueing %"U32_F":%"U32_F" (0x%"X16_F")\\n",
ntohl(seg->tcphdr->seqno),
ntohl(seg->tcphdr->seqno) + TCP_TCPLEN(seg),
(u16_t)flags));
left -= seglen; // 剩余数长度减去已拆分成tcp报文的数据长度(剩余的left数据需要组装tcp报文)
seqno += seglen; // 更新下个报文的seqno
ptr = (void *)((u8_t *)ptr + seglen); // 下一个报文数据的指针
}
3.2、本次发送数据的报文队列插入unsent队列
获取queue报文插入的位置。(如果unsent队列不为空,那么需要插入到unsent队列末尾,否则queue就是未发送的队列)
/* Now that the data to be enqueued has been broken up into TCP
segments in the queue variable, we add them to the end of the
pcb->unsent queue. */
if (pcb->unsent == NULL) { // unsent队列为空
useg = NULL;
}
else {
for (useg = pcb->unsent; useg->next != NULL; useg = useg->next); // 找到unsent队列的尾节点(unsent不为空,待发送数据的报文插入unsent末尾)
}
unsent末尾节点报文与queue的第一个节点合并。(TCP_SYN|TCP_FIN相关的报文不能合并,合并的两个报文不能超过一个mss最大报文,合并的两个报文flags要相等;不能合并时,直接将queue插入到unsent末尾即可)
if (useg != NULL &&
TCP_TCPLEN(useg) != 0 &&
!(TCPH_FLAGS(useg->tcphdr) & (TCP_SYN | TCP_FIN)) &&
!(flags & (TCP_SYN | TCP_FIN)) &&
/* fit within max seg size */
(useg->len + queue->len <= pcb->mss) &&
/* only concatenate segments with the same options */
(useg->flags == queue->flags)) {
/* Remove TCP header from first segment of our to-be-queued list */
if(pbuf_header(queue->p, -(TCP_HLEN + optlen))) {
/* Can we cope with this failing? Just assert for now */
LWIP_ASSERT("pbuf_header failed\\n", 0);
TCP_STATS_INC(tcp.err);
goto memerr;
}
if (queue->p->len == 0) {
/* free the first (header-only) pbuf if it is now empty (contained only headers) */
struct pbuf *old_q = queue->p;
queue->p = queue->p->next;
old_q->next = NULL;
queuelen--;
pbuf_free(old_q);
}
LWIP_ASSERT("zero-length pbuf", (queue->p != NULL) && (queue->p->len > 0));
pbuf_cat(useg->p, queue->p);
useg->len += queue->len;
useg->next = queue->next;
LWIP_DEBUGF(TCP_OUTPUT_DEBUG | LWIP_DBG_TRACE | LWIP_DBG_STATE, ("tcp_enqueue: chaining segments, new len %"U16_F"\\n", useg->len));
if (seg == queue) {
seg = useg;
seglen = useg->len;
}
memp_free(MEMP_TCP_SEG, queue);
}
3.3、发送缓存更新
发送缓存、seqno更新。
if ((flags & TCP_SYN) || (flags & TCP_FIN)) { // TCP_SYN|TCP_FIN报文占一个字节
++len;
}
if (flags & TCP_FIN) {
pcb->flags |= TF_FIN;
}
pcb->snd_lbb += len; // 更新下一个缓存报文的seqno
pcb->snd_buf -= len; // 发送缓存大小减去len(非TCP_SYN|TCP_FIN报文,len就是数据的长度,TCP_SYN|TCP_FIN报文,len就需要在数据长度上加1个字节,TCP_SYN|TCP_FIN占一个数据)
3.4、PSH标志设置
应用层已经发送完一次数据,最后一次的sendmsg不带MORE标志,最后一个报文添加PSH标志。(对端收到PSH报文会立即返回,而不等待接收足够数据,PSH就是发送端期望接收端收完这些数据后应该尽快处理,不用等待更多数据,接下来收到的数据与当前没有关联(例如发送端希望发送两个消息“hello”和“world”,语义上“hello”和“world”是独立的,接收端收到“hello”即可返回,下次在接收"world",而不是一次接收“helloworld”);write操作理论上每个数据后面都有设置PSH标志,write没办法设置more参数;ceph异步消息就使用sendmsg发送消息,ceph消息是buffer list结构,而且一个完整的消息不一定在一个buffer list里面,因此buffer list前面的消息都设置一个more标志,消息的最后那部分数据才不设置more标志)
/* Set the PSH flag in the last segment that we enqueued, but only
if the segment has data (indicated by seglen > 0). */
if (seg != NULL && seglen > 0 && seg->tcphdr != NULL && ((apiflags & TCP_WRITE_FLAG_MORE)==0)) {
TCPH_SET_FLAG(seg->tcphdr, TCP_PSH);
}
报文加入未发送队列后,调用tcp_output发送报文。
以上是关于TCP/IP协议 怎么用JAVA发送和接收二进制数据 要具体实例的主要内容,如果未能解决你的问题,请参考以下文章
loadrunner 怎么测试tcp/ip 协议的GPS,是用socket协议录制吗