Java资源网

| JAVA基础 | 环境配置 | JDBC | 线程技术 | Socket编程 | JavaMail | JAVA与XML | 设计模式 | 技术新闻 | Java认证 | 程序人生 软件下载
| JSP&Servlet | Spring | Struts | Hibernate | JBuilder | Eclipse | WebService | EJB技术 | J2ME开发 | 应用服务器 | JXTA | Ajax
Articles search文章搜索
   关键字:
   类 别:
       
New download 最新下载
· [组件]HTML Parser 1.5
· [教程]WebSphere Studio应用教程
· [组件]JDom 1.0
· [工具]Junit3.8.1
· [教程]EJB编程及J2EE系统架构和设计
· [教程]EJB教程
· [教程]J2EE Tutorial中文版
· [教程]Java编程思想2(英文)
· [教程]java编程思想(完整版)
· [教程]Java网络编程
New articles 最新文章
· 设计移动 Web 服务
· 解析XML的时候完全忽略DTD
· 理解XML Schema XML Schema 初步
· 标签库的深入研究
· 提升JSP应用程序的七大绝招
· 如何使用JDOM对XML文件进行操作
· 处理XML字符串中特殊字符
· 利用Digester把XML转换成为Java对象
· 使用WebService 和RMI远程协作
· 使用Axis开发Web Service程序
Articles top 热门文章
· Eclipse基础--plugin插件安装(6644)
· eclipse+tomcat+lomboz的安装配置说明(4774)
· Java程序员就业前景(4584)
· Windows下JAVA环境变量的设置祥解(3788)
· Tomcat下JSP、Servlet和JavaBean环境的配置(3716)
· 使用links方式安装Eclipse插件(3698)
· 一个老程序员的心理话(3533)
· linux下jdk的安装与配置(3459)
· 初学者入门:Structs中基本配置入门(3334)
· Eclipse 运行命令行参数大全(3084)
您的位置:首页>>Socket编程>>Java数据报之失序和丢包
Java数据报之失序和丢包
2005-08-29   来源:CSDN  作者:keepeye
 

习惯了TCP编程,认为UDP可以包办这些问题是错误的。一个UDP应用程序要承担可靠性方面的全部工作,包括报文的丢失、重复、时延、乱序以及连接失效等问题。

通常我们在可靠性好,传输时延小的局域网上开发测试,一些问题不容易暴露,但在大型互联网上却会出现错误。

UDP协议把递送的可靠性责任推到了上层即应用层,下面简单编写了几个类来专门处理两个问题:乱序和丢包。

四个类:DataPacket 类,PacketHeader类,PacketBody类 ,DataEntry类,位于同一个文件DataPacket .java中。

DataPacket 类相当于一个门面模式,提供给外部使用,通信数据也在这个类中处理。

package com.skysoft.pcks;

import java.io.*;
import java.net.*;
import java.util.*;

public class  DataPacket {
  InputStream is;
  OutputStream os;
  PacketHeader header;
  PacketBody body;
  ArrayList al;
  public static final int DataSwapSize = 64532;

  /**
   * 在接收数据报使用
   */
  public DataPacket() {
    header = new PacketHeader();
    body = new PacketBody();
    al = new ArrayList();
  }
  /**
   * 在发送数据报时使用,它调用报文分割操作.
   * @param file String  硬盘文件
   */
  public DataPacket(String file) {
    this();
    try {
      is = new FileInputStream(file);
      header.CalcHeaderInfo(is.available());
      this.madeBody();
      is.close();
      //this.Gereratedata();
    }
    catch (FileNotFoundException ex) {
      ex.printStackTrace();
    }
    catch (IOException ex1) {
      ex1.printStackTrace();
    }
  }
  /**
   * 在发送数据报时使用,它调用报文分割操作.
   * @param url URL url地址
   */
  public DataPacket(URL url) {
    this();
    try {
      //is = url.openStream();
      URLConnection conn=url.openConnection();
      is=conn.getInputStream();
      int total=conn.getContentLength();
      header.CalcHeaderInfo(total);
      this.madeBody();
      //System.out.println(total+":"+total);
      is.close();
    }
    catch (IOException ex) {
      ex.printStackTrace();
    }
  }
  /**
   * 为发送构造分组,使用PackageHeader处理了报头格式,并为分组编序号.
   */
  private void madeBody() {
    al.clear();
    byte[] buffer;
    DataEntry de;
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        //is.skip(i * body.BODY_BUFFER_SIZE);
        header.ArrageSort(i);
        de = new DataEntry(PacketBody.BODY_BUFFER_SIZE);
        de.setSn(i);
        de.setStreamsize(header.getStreamsize());
        de.setFragmentcounter(header.getFragmentcounter());
        if (header.isWTailFragment(i)) {
          buffer = new byte[header.getMinfragment()];
          is.read(buffer, 0, buffer.length);
          header.setActByteSize(header.getMinfragment());
          de.setActByteSize(header.getMinfragment());
        }
        else {
          buffer = new byte[body.BODY_BUFFER_SIZE];
          is.read(buffer, 0, buffer.length);
        }
        //System.out.println("length-------"+i+" "+body.getBody().length+" "+header.getMinfragment());
        body.setBody(buffer);
        //System.out.println("length:" + i + " " + header.toString());
        bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
        bos.write(body.getBody(), 0, body.getBody().length);
        de.setBytes(bos.toByteArray());
        al.add(de);
      }
      catch (IOException ex) {
        ex.printStackTrace();
      }
    }
  }
  /**
   * 为发送构造分组,没有考虑报头格式,也没有为分组编序号.
   */
  private void madeBody1() {
    al.clear();
    for (int i = 0; i < header.fragmentcounter; i++) {
      try {
        if (header.isWTailFragment(i))
          is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                  header.getMinfragment());
        else
          is.read(body.getBody(), i * body.BODY_BUFFER_SIZE,
                  body.BODY_BUFFER_SIZE);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bos.write(header.getByte(), 0, header.HEADER_BUFFER_SIZE);
        bos.write(body.getBody(), header.HEADER_BUFFER_SIZE,
                  body.getBody().length);
        al.add(bos);
      }
      catch (IOException ex) {
        ex.printStackTrace();
      }
    }
  }
  /**
   * 在接收到报文后,对此报文执行组装,并处理报文丢失和乱序情况.
   * @param b1 byte[]
   */
  public void Add(byte[] b1) {
    byte[] buffer = (byte[]) b1.clone();
    handlerText(buffer);
    DataEntry de = new DataEntry(buffer, header.getActByteSize());
    de.setSn(header.getSn());
    de.setStreamsize(header.getStreamsize());
    de.setFragmentcounter(header.getFragmentcounter());
    al.add(de);
  }
  private void handlerText(byte[] buffer) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    baos.write(buffer, 0, header.HEADER_BUFFER_SIZE);
    byte[] b=new byte[header.HEADER_BUFFER_SIZE];
    System.arraycopy(buffer,0,b,0,b.length);
    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    InputStreamReader isr = new InputStreamReader(bais);
    BufferedReader br = new BufferedReader(isr);
    try {
      header = new PacketHeader(br.readLine());
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
  }
 
  private String calFileSize(int size) {
    return size / 1024 + "K";
  }

  public ArrayList getDataPackets() {
    return al;
  }
/**
 * 是否接收完毕,通过序号是否等于最大段数来判断,这也许有问题,比如,正好是最后一个段丢失了,这样
 * 这个包整个就丢失了.
 * @return
 */
  public boolean isFull() {
    return this.header.getSn() == this.header.getFragmentcounter() - 1 ? true : false;
  }
/**
 * 判断是否只有一个段.
 * @return
 */
  public boolean isZero() {
    return this.header.getSn() == 0 ? true : false;
  }
/**
 * 该函数执行报文组装,不考虑丢失的报文.
 * @return
 */
  private ByteArrayOutputStream fetchDataPackets() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer = null;
    DataEntry de;
    for (int i = 0; i < al.size(); i++) {
      try {
        de = this.getSnData(i);
        buffer = de.getByte();
        if (header.getStreamsize() == de.getStreamsize()) {
          bos.write(de.getByte(), header.HEADER_BUFFER_SIZE, de.getActByteSize());
          System.out.println(de.toString() + " -- fetchDataPackets");
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
    return bos;
  }

  /**
   * 该函数执行报文组装,对于丢失的报文,写入空报文.
   * @return ByteArrayOutputStream
   */
  private ByteArrayOutputStream fetchDataPackets_sn() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer;
    DataEntry de;
    for (int i = 0; i < header.getFragmentcounter(); i++) {
      try {
        de = this.getSnData(i);
        if (de == null) {
          de = seachDeData(i);
        }
        buffer = de.getByte();
        //System.out.println(de.getSn() + ":" + i);
        //handlerText(buffer);
        //bos.write(buffer, header.HEADER_BUFFER_SIZE,
        //          buffer.length - header.HEADER_BUFFER_SIZE);
        if (header.getStreamsize() == de.getStreamsize()) {
          bos.write(de.getByte(), header.HEADER_BUFFER_SIZE,
                    de.getActByteSize());
          //System.out.println(de.toString());
        }
      }
      catch (Exception ex) {
        ex.printStackTrace();
      }
    }
    return bos;
  }

  /**
   * 对缓冲的数据包进行排序处理,即按顺序提取同一帧的数据,如果没有找到该序号的帧,则返回空值.
   * @param sn int 要找的帧序号.
   * @return DataEntry
   */
  private DataEntry getSnData(int sn) {
    DataEntry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      if (header.getStreamsize() == de.getStreamsize()) {
        if (sn == de.getSn())
          break;
        else
          de = null;
      }
    }
    return de;
  }

  /**
   * 按序号开始向前或者是向后寻找最近的帧片段,日后可以增加请求重发功能,通过开一个通信连接.
   * @param sn int
   * @return DataEntry
   */
  private DataEntry seachDeData(int sn) {
    DataEntry de = null;
    int initvalue, minvalue = 10000;
    DataEntry back, fore = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      if (header.getStreamsize() == de.getStreamsize()) {
        initvalue = Math.abs(de.getSn() - sn);
        if (de.getFragmentcounter() != de.getSn() && initvalue < minvalue) {
          minvalue = initvalue;
          fore = de;
        }
      }
    }
    return fore;
  }

  /**
   * 除去最后一帧外,随机抽取一帧.
   * @return DataEntry
   */
  private DataEntry seachDeData() {
    DataEntry de = null;
    for (int i = 0; i < al.size(); i++) {
      de = (DataEntry) al.get(i);
      System.out.println("sky ::::" + de.getFragmentcounter() + ":" + de.getSn() +
                         ":" + i);
      if (header.getStreamsize() == de.getStreamsize()) {
        if (de.getFragmentcounter() != de.getSn()) {
          break;
        }
      }
    }
    return de;
  }
  /**
   * 生成组装完的结果数据.因为用图像来做测试,所以令其返回图像.
   * @return Image
   */
  public java.awt.Image Gereratedata() {
     ByteArrayInputStream bis;
     java.awt.image.BufferedImage bimage = null;
     try {
       byte[] b = fetchDataPackets_sn().toByteArray();
       //fetchDataPackets_old1()
       bis = new ByteArrayInputStream(b);
       bimage = javax.imageio.ImageIO.read(bis);

     }
     catch (Exception ex1) {
       ex1.printStackTrace();
     }
     return bimage;
  }

  public static void main(String args[]) {
    DataPacket dp = new DataPacket("e:\\nature\\14.jpg");
  }
}
/**
 * 数据实体,充当临时处理场所.
 * @author Administrator
 *
 */
class DataEntry {
  byte[] bytes;
  int fragmentcounter, sn, actbytesize;
  long streamsize;
  int minfragment;

  public DataEntry() {

  }

  public DataEntry(int size) {
    this.actbytesize = size;
  }

  public DataEntry(byte[] b, int i) {
    this.bytes = b;
    this.actbytesize = i;
  }

  public byte[] getByte() {
    return this.bytes;
  }

  public void setBytes(byte[] b) {
    this.bytes = b;
  }

  public void setStreamsize(long size) {
    this.streamsize = size;
  }

  public long getStreamsize() {
    return this.streamsize;
  }

  public int getMinfragment() {
    return minfragment;
  }

  public synchronized void setSn(int i) {
    this.sn = i;
  }

  public synchronized int getSn() {
    return sn;
  }

  public synchronized int getFragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setFragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setActByteSize(int size) {
    actbytesize = size;
  }

  public int getActByteSize() {
    return actbytesize;
  }

  public String toString() {
    return this.streamsize + "::" + this.fragmentcounter + "::" + this.sn +
        "::" + this.actbytesize + " recv DataEntry";
  }
}
/**
 * 报头,处理报头格式
 * @author Administrator
 *
 */
class PacketHeader implements Serializable{
  public static final int HEADER_BUFFER_SIZE = 1024;
  int fragmentcounter, sn;
  int actbytesize = PacketBody.BODY_BUFFER_SIZE;
  byte[] header; //= new byte[HEADER_BUFFER_SIZE];
  long streamsize;
  int minfragment;

  public PacketHeader() {

  }

  public PacketHeader(long l) {
    this.setStreamsize(l);

  }

  public PacketHeader(String s) {
    String[] tm = s.split("::");
    this.setActByteSize(Integer.parseInt(tm[3]));
    this.setSn(Integer.parseInt(tm[2]));
    this.setFragmentcounter(Integer.parseInt(tm[1]));
    this.setStreamsize(Long.parseLong(tm[0]));
  }

  /**
   * 根据文件的段的顺序生成数据头.
   * @param sn 文件序列
   */
  public void ArrageSort(int sn) {
    this.setSn(sn);
    this.setByte();
  }

  public void CalcHeaderInfo(long l) {
    this.setStreamsize(l);
    CalcHeaderInfo();
  }
  /**
   * 计算流要被分成的片段数量,并得出最小片段余量.
   */
  public void CalcHeaderInfo() {
    fragmentcounter = Math.round( (float) streamsize /
                                 PacketBody.BODY_BUFFER_SIZE);
    float critical = (float) streamsize / PacketBody.BODY_BUFFER_SIZE;
    if (critical - fragmentcounter < 0.5 && critical - fragmentcounter > 0)
      fragmentcounter++;
    minfragment = (int) (streamsize % PacketBody.BODY_BUFFER_SIZE);
  }

  public byte[] getHeader() {
    Long it = new Long(this.streamsize);
    return new byte[] {it.byteValue()};
  }

  public byte[] getByte() {
    return header; //this.toString().getBytes();
  }
  /**
   * 生成报头字节,首先取得数据包头 流尺寸::段片数::段顺序::段实际尺寸 的字节形式,
   * 然后加入回车换行符号,对于1024字节中剩余的部分一律写入元素为0的字节数组.
   */
  public void setByte() {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    byte[] buffer = this.toByte();
    try {
      bos.write(buffer);
      bos.write("\r\n".getBytes());
      bos.write(new byte[PacketHeader.HEADER_BUFFER_SIZE - buffer.length], 0,
                PacketHeader.HEADER_BUFFER_SIZE - buffer.length);
      header = bos.toByteArray();
    }
    catch (IOException ex) {
      ex.printStackTrace();
    }
  }

  public void setStreamsize(long size) {
    this.streamsize = size;
  }

  public long getStreamsize() {
    return this.streamsize;
  }

  public int getMinfragment() {
    return minfragment;
  }

  public synchronized void setSn(int i) {
    this.sn = i;
  }

  public int getSn() {
    return sn;
  }

  public int getFragmentcounter() {
    return fragmentcounter;
  }

  public synchronized void setFragmentcounter(int c) {
    this.fragmentcounter = c;
  }

  public void setActByteSize(int size) {
    actbytesize = size;
    setByte();
  }

  public int getActByteSize() {
    return actbytesize;
  }
  /**
   * 数据包头的格式为:流尺寸::段片数::段顺序::段实际尺寸
   * 报头字节长度是可变化的,比如,可以加入流的具体信息如:流所属文件的名称,文件类型以及一些其他信息.
   * @return String
   */
  public String toString() {
    return streamsize + "::" + this.fragmentcounter + "::" + this.getSn() +
        "::" + this.getActByteSize();
  }

  public byte[] toByte() {
    return this.toString().getBytes();
  }
  /**
   * 是否为尾段
   * @param i int
   * @return boolean
   */
  public boolean isWTailFragment(int i) {
    return (i == fragmentcounter - 1) ? true : false;
  }

}
/**
 * 用户数据区
 * @author Administrator
 *
 */
class PacketBody implements Serializable{
  public static final int BODY_BUFFER_SIZE = 63508; //65508
  byte[] body;

  public PacketBody() {
  }

  public void setBody(byte[] b) {
    this.body = b;
  }

  public byte[] getBody() {
    return body;
  }
}

这个数据处理类,将在接下来使用。

  --相关文章--
· 调整nbsp;Javanbsp;I Onbsp;性能 (2007-04-13)
· 用Java实现FTP服务器 (2007-04-13)
· 探讨对象次第读写(Serialization) (2007-04-13)
· 彻底明白Java的IO系统 (2007-04-13)
· 使用缓冲IO提高性能 (2007-04-13)
· 两远程客户端之间的通讯原理 (2007-04-13)

版权所有©2005-2006 JAVA资源网 渝ICP备05007591号 虚拟主机 | 关于我们 | 联系方式 | 广告业务 | 网站地图 | 友情链接