Android 一个异步SocketHelper
Posted 星辰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android 一个异步SocketHelper相关的知识,希望对你有一定的参考价值。
发送流程:首先定义一个缓冲池,发送数据时仅仅是将待发送的数据加入到缓冲池中,再由后台的工作线程从缓冲池中取得待发送数据进行发送。可能某些情况下在数据发送完成时需要做一些处理(比如写日志),便定义了一个发送完成监听,在数据发送完成时触发此事件。
接收流程:同样定义了一个接收缓冲池,由接收数据线程将接收到的数据加入到接收缓冲池中(由于这个SocketHelper只是为JT/T 794服务,因此分包逻辑我直接加入到了接收线程中),为防止阻塞接收数据线程,并未在接收线程中触发完成接收事件,而是移到了检查线程中触发。
package com.van.dev; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import android.os.AsyncTask; import android.util.Log; import com.van.jt794.MyBytesList; public class SocketHelper { /** * @ClassName: SendDataModel * @Description: TODO(发送数据实体) * @author WANJJ * @date 2011-11-17 下午02:24:27 * */ class SendDataModel { public SendDataModel(byte[] bts, MySendListener sendEndListener, ActiveSendData data) { this.bts = bts; this.data = data; this.sendEndListener = sendEndListener; } byte[] bts; ActiveSendData data; MySendListener sendEndListener; public void OnMyAction(Exception ex) { if (sendEndListener != null) sendEndListener.EndSend(data, ex); } } /** * @ClassName: ReConnIn * @Description: TODO(重连数据监听接口) * @author WANJJ * @date 2011-11-17 下午02:24:47 * */ public interface ReConnIn { void ReConned(); } /** * @ClassName: MySendListener * @Description: TODO(发送数据监听接口) * @author WANJJ * @date 2011-11-17 下午02:26:21 * */ public interface MySendListener extends java.util.EventListener { void EndSend(ActiveSendData sendData, Exception ex); } /** * @ClassName: MyReceiveListener * @Description: TODO(接收数据监听接口) * @author WANJJ * @date 2011-11-17 下午02:26:42 * */ public interface MyReceiveListener extends java.util.EventListener { void EndReceive(byte[] bts, Exception ex); } /** * @ClassName: SendThread * @Description: TODO(发送线程) * @author WANJJ * @date 2011-11-17 下午02:22:40 * */ class SendThread extends AsyncTask<Integer, Void, Void> { /** * <p> * Title: doInBackground * </p> * <p> * Description: * </p> * * @param params * @return * @see android.os.AsyncTask#doInBackground(Params[]) */ @Override protected Void doInBackground(Integer... params) { int nowId = params[0]; while (RunId == nowId) { if (ConnedState > 1 && sBuff.size() > 0) { SendDataModel send = sBuff.get(0); try { out.write(send.bts); sBuff.remove(send); send.OnMyAction(null); continue; } catch (IOException e) { e.printStackTrace(); send.OnMyAction(e); } } mySleep(1000); } return null; } } /** * @ClassName: RecThread * @Description: TODO(接受数据线程) * @author WANJJ * @date 2011-11-17 下午02:22:56 * */ class RecThread extends AsyncTask<Integer, Void, Void> { /** * <p> * Title: doInBackground * </p> * <p> * Description: * </p> * * @param params * @return * @see android.os.AsyncTask#doInBackground(Params[]) */ @Override protected Void doInBackground(Integer... params) { int nowId = params[0]; int dlen = 0; byte[] tmp = new byte[50000]; while (RunId == nowId) { if (ConnedState > 1) { try { dlen = in.read(tmp); if (dlen == -1) { changeState(0); } else if (dlen > 0) { int i = 0; boolean isend;// 当前为结束标记位 boolean flag7d;// 上一个字节是否为0x7d MyBytesList lst; /* * 问题描述:上传数据频率过快时,接收数据时可能断在某包数据内,这样造成了被断数据包无法解析。 * 解决方法: 1.定义一个全局变量PGps记录下上次未解析数据; * 2.解析完后判断出现7E的是否为结束符 * ,如果为结束符则将PGps赋值为NULL,反之记录下未解析部分数据 * 3.开始解析时判断首字节是否为7E,如果为7E则不处理断包问题,反之继续解析 */ if (PGps != null && ((dlen > 0 && tmp[0] != 0x7e)// 第一个字符不为7E || (dlen > 1 && tmp[0] == 0x7e && tmp[1] == 0x7e) // 首字母为7E接下来一个也为7E ))// 处理接收断包问题 { lst = PGps; isend = false; flag7d = PGps.get(PGps.size() - 1) == 0x7d; } else { lst = new MyBytesList(); isend = true; flag7d = false; } while (i < dlen) { if (tmp[i] == 0x7e)// 开始结束标记 { isend = !isend; if (isend)// 结束标记时解析数据 { rBuff.add(lst); } else { // 清空暂存区 lst.clear(); } } else if (flag7d)// 转义还原 { if (tmp[i] == 0x01)// 0x7d 0x01 => 0x7d lst.set(lst.size() - 1, (byte) 0x7d); else if (tmp[i] == 0x02)// 0x7d 0x02 => 0x7e lst.set(lst.size() - 1, (byte) 0x7e); else// 出错 { StringBuilder sb = new StringBuilder(); for (Byte byte1 : tmp) { sb.append(Integer .toHexString(byte1)); } Log.d("HVT300", sb.toString()); break; } } else lst.add(tmp[i]); flag7d = tmp[i] == 0x7d; i++; } PGps = isend ? null : lst;// 处理接收断包问题 } } catch (IOException e) { e.printStackTrace(); } } else { mySleep(1000); } } return null; } } /** * @ClassName: checkThread * @Description: TODO(检查线程) * @author WANJJ * @date 2011-11-17 下午02:23:28 * */ class checkThread extends AsyncTask<Integer, Void, Void> { /** * <p> * Title: doInBackground * </p> * <p> * Description: * </p> * * @param params * @return * @see android.os.AsyncTask#doInBackground(Params[]) */ @Override protected Void doInBackground(Integer... params) { int nowId = params[0]; while (RunId == nowId) { if (ConnedState == 0) { disConn(); conn(); } else if (receiveListener != null) { MyBytesList data = getData(); if (data != null) receiveListener.EndReceive(data.toBytes(), null); } mySleep(1000); } return null; } } /** * @Fields receiveListener : TODO(接收数据监听) */ private MyReceiveListener receiveListener; private Socket socket; private OutputStream out; private InputStream in; private InetSocketAddress isa = null; /** * @Fields HOST : TODO(服务器地址 支持域名) */ private String HOST; /** * @Fields PORT : TODO(服务器端口) */ private int PORT ; /** * @Fields TIMEOUT : TODO(连接超时时间) */ public int TIMEOUT = 5000; /** * @Fields 0 未连接 <br/> * 1 已连接初始化中<br/> * 2 已连接初始化完成 */ private int ConnedState = 0; /** * @Fields RunId : TODO(线程执行ID) */ private int RunId = 0; /** * @Fields AllSendData : TODO(主动发送的数据) */ HashMap<Integer, ActiveSendData> AllSendData = new HashMap<Integer, ActiveSendData>(); /** * @Fields sBuff : TODO(发送缓存数据) */ private ArrayList<SendDataModel> sBuff = new ArrayList<SendDataModel>(); /** * @Fields rBuff : TODO(接受缓存数据) */ private ArrayList<MyBytesList> rBuff = new ArrayList<MyBytesList>(); /** * @Fields PGps : TODO(未解析数据) */ private MyBytesList PGps; /** * @Fields reConn : TODO(重连监听) */ ReConnIn reConn; /** * <p>Title: </p> * <p>Description: </p> * @param HOST 服务器地址(支持域名) * @param PORT 服务器端口 */ public SocketHelper(String HOST, int PORT) { this.HOST = HOST; this.PORT = PORT; } /** * @Title: getConnedState * @Description: TODO(获取连接状态) * * @return 0 未连接 <br/> * 1 已连接初始化中<br/> * 2 已连接初始化完成 * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-12 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-12 wanjj v1.0.0 修改原因 */ public int getConnedState() { return ConnedState; } /** * @Title: setReceiveListener * @Description: TODO(设置接收数据监听) * * @param receiveListener * @return:void * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-17 * * Modification History: * Date Author Version Description * ---------------------------------------------------------* * 2011-11-17 wanjj v1.0.0 修改原因 */ public void setReceiveListener(MyReceiveListener receiveListener) { this.receiveListener = receiveListener; } public void start() { isa = new InetSocketAddress(HOST, PORT); conn(); new SendThread().execute(RunId); new RecThread().execute(RunId); new checkThread().execute(RunId); } public void stop() { RunId++; disConn(); } /** * @Title: conn * @Description: TODO(连接服务器) * * @return:void * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-9 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-9 wanjj v1.0.0 修改原因 */ void conn() { try { socket = null; socket = new Socket(); socket.setReuseAddress(true); socket.connect(isa, TIMEOUT); in = socket.getInputStream(); out = socket.getOutputStream(); changeState(1); if (reConn != null) reConn.ReConned(); changeState(2); } catch (IOException e) { e.printStackTrace(); changeState(0); } } /** * @Title: disConn * @Description: TODO(关闭连接) * * @return:void * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-9 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-9 wanjj v1.0.0 修改原因 */ boolean disConn() { boolean flag = true; try { if (socket != null) { socket.shutdownInput(); socket.shutdownOutput(); try { in.close(); out.close(); } catch (IOException e) { } // 关闭socket socket.close(); } } catch (IOException e) { e.printStackTrace(); flag = false; } changeState(0); return flag; } void changeState(int state) { if (ConnedState != state) { ConnedState = state; // RunId++; } } /** * @Title: beginSend * @Description: TODO(开始异步发送数据) * * @param bts * @param sendEndListener * @param data * @return:void * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-11 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-11 wanjj v1.0.0 修改原因 */ public void beginSend(byte[] bts, MySendListener sendEndListener, ActiveSendData data) { sBuff.add(new SendDataModel(bts, sendEndListener, data)); } /** * @Title: SendData * @Description: TODO(同步发送数据) * * @param bts * @return * @return:boolean * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-11 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-11 wanjj v1.0.0 修改原因 */ public boolean SendData(byte[] bts) { if (ConnedState > 0) { try { out.write(bts); return true; } catch (IOException e) { e.printStackTrace(); } } return false; } /** * @Title: getData * @Description: TODO(获取第一包数据) * * @return * @return:ArrayList<Byte> * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-9 * * Modification History: Date Author Version Description * ---------------------------------------------------------* * 2011-11-9 wanjj v1.0.0 修改原因 */ public MyBytesList getData() { if (rBuff.size() > 0) { MyBytesList tmp = rBuff.get(0); rBuff.remove(0); return tmp; } else { return null; } } /** * @Title: mySleep * @Description: TODO(当前执行线程暂停一段时间) * * @param time * @return:void * * @version: v1.0.0 * @author: WANJJ * @date: 2011-11-17 * * Modification History: * Date Author Version Description * ---------------------------------------------------------* * 2011-11-17 wanjj v1.0.0 修改原因 */ public static void mySleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
以上是关于Android 一个异步SocketHelper的主要内容,如果未能解决你的问题,请参考以下文章
我的Android进阶之旅关于Android使用bindService()绑定服务,onServiceConnected()方法是异步回调的问题以及借鉴NotificationManager来优化(代