A Deep Dive into BufferedInputStream

Published: 2023-12-21 16:41:53

1. How BufferedInputStream works

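BufferedInputStream exists to provide an internal buffer: when a read is requested, it first pulls a block of bytes from the underlying stream into that buffer in one go, and subsequent read(byte[]) calls simply copy data out of the buffer. The buffer size can be set through the BufferedInputStream(InputStream in, int size) constructor; the default is 8192 bytes.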
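The effect of the buffer can be observed by counting how often the underlying stream is actually asked for data. The following is a minimal sketch, not part of the JDK: CountingInputStream, BufferingDemo and countUnderlyingReads are hypothetical names introduced only for illustration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class BufferingDemo {

    /** Hypothetical helper: counts bulk read requests that reach the wrapped stream. */
    static class CountingInputStream extends FilterInputStream {
        int bulkReads = 0;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            bulkReads++;
            return super.read(b, off, len);
        }
    }

    /** Reads dataSize bytes one at a time through a buffer of bufferSize bytes. */
    static int countUnderlyingReads(int dataSize, int bufferSize) {
        CountingInputStream counting =
                new CountingInputStream(new ByteArrayInputStream(new byte[dataSize]));
        try (BufferedInputStream in = new BufferedInputStream(counting, bufferSize)) {
            while (in.read() != -1) {
                // Most calls are served from the internal buffer without touching the source.
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting.bulkReads;
    }

    public static void main(String[] args) {
        // 100 single-byte reads should reach the source far fewer than 100 times.
        System.out.println("underlying bulk reads: " + countUnderlyingReads(100, 16));
    }
}
```

With a 16-byte buffer, the source is consulted once per refill (plus one final call that reports end of stream) rather than once per byte.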

 

2. The implementation of BufferedInputStream

 

Note that the read method is synchronized:

    public synchronized int read(byte b[], int off, int len)
        throws IOException
    {
        getBufIfOpen(); // Check for closed stream
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0)
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0)
                return n;
        }
    }
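The loop in read() keeps calling read1() until the request is satisfied, the stream ends, or the underlying stream reports that nothing more is available. A small sketch of that last exit condition follows; TrickleInputStream and ShortReadDemo are hypothetical classes written for this illustration, and the behaviour shown assumes the implementation quoted above.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class ShortReadDemo {

    /** Hypothetical source that hands out at most two bytes per bulk read. */
    static class TrickleInputStream extends InputStream {
        private final byte[] data;
        private final boolean reportAvailable;
        private int pos = 0;

        TrickleInputStream(byte[] data, boolean reportAvailable) {
            this.data = data;
            this.reportAvailable = reportAvailable;
        }

        @Override
        public int read() {
            return pos < data.length ? (data[pos++] & 0xff) : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(2, Math.min(len, data.length - pos));
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override
        public int available() {
            // When reportAvailable is false, pretend nothing can be read without blocking.
            return reportAvailable ? data.length - pos : 0;
        }
    }

    /** Asks for 4 bytes through an 8-byte buffer and returns how many were delivered. */
    static int firstReadSize(boolean reportAvailable) {
        InputStream source = new TrickleInputStream(new byte[10], reportAvailable);
        try (BufferedInputStream in = new BufferedInputStream(source, 8)) {
            return in.read(new byte[4], 0, 4);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("available() == 0: " + firstReadSize(false));
        System.out.println("available() >  0: " + firstReadSize(true));
    }
}
```

The practical consequence is that a caller should never assume read(byte[]) fills the whole array; the return value is the only reliable count.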

The actual work is delegated to read1():

    private int read1(byte[] b, int off, int len) throws IOException {
        int avail = count - pos;
        if (avail <= 0) {
            /* If the requested length is at least as large as the buffer, and
               if there is no mark/reset activity, do not bother to copy the
               bytes into the local buffer.  In this way buffered streams will
               cascade harmlessly. */
            if (len >= getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            fill();
            avail = count - pos;
            if (avail <= 0) return -1;
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
        pos += cnt;
        return cnt;
    }
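The shortcut in read1() means that a large request on an empty buffer with no active mark goes straight to the underlying stream. This can be made visible by recording the length that the source is asked for. The sketch below assumes the implementation quoted above; RecordingInputStream, BypassDemo and requestedFromSource are hypothetical names.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class BypassDemo {

    /** Hypothetical helper: remembers the length of the most recent bulk read request. */
    static class RecordingInputStream extends FilterInputStream {
        int lastRequestedLength = -1;

        RecordingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            lastRequestedLength = len;
            return super.read(b, off, len);
        }
    }

    /** Performs one read(byte[]) and returns the length requested from the source. */
    static int requestedFromSource(int bufferSize, int callerArraySize) {
        RecordingInputStream recording =
                new RecordingInputStream(new ByteArrayInputStream(new byte[100]));
        try (BufferedInputStream in = new BufferedInputStream(recording, bufferSize)) {
            in.read(new byte[callerArraySize]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return recording.lastRequestedLength;
    }

    public static void main(String[] args) {
        // Caller array larger than the buffer: the request is passed through unchanged.
        System.out.println("large request: " + requestedFromSource(8, 32));
        // Caller array smaller than the buffer: the buffer is filled first.
        System.out.println("small request: " + requestedFromSource(8, 4));
    }
}
```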

fill():

    /**
     * Fills the buffer with more data, taking into account
     * shuffling and other tricks for dealing with marks.
     * Assumes that it is being called by a synchronized method.
     * This method also assumes that all data has already been read in,
     * hence pos > count.
     */
    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        else if (pos >= buffer.length)  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else {            /* grow buffer */
                int nsz = pos * 2;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }
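The mark-related branches of fill() decide whether a later reset() can still succeed: the buffer grows while it is smaller than marklimit, and the mark is invalidated once the buffer is full and already at least marklimit bytes long. A rough sketch that exercises both branches is given below. MarkResetDemo and canReset are hypothetical names, and the second result reflects the implementation quoted above rather than a guarantee of the API contract.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class MarkResetDemo {

    /**
     * Marks the stream, reads bytesToRead bytes one at a time,
     * then reports whether reset() still succeeds.
     */
    static boolean canReset(int bufferSize, int markLimit, int bytesToRead) {
        try (BufferedInputStream in = new BufferedInputStream(
                new ByteArrayInputStream(new byte[64]), bufferSize)) {
            in.mark(markLimit);
            for (int i = 0; i < bytesToRead; i++) {
                in.read();
            }
            try {
                in.reset();
                return true;
            } catch (IOException invalidMark) {
                // reset() throws once fill() has invalidated the mark.
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Buffer of 4 bytes, mark limit 8: the buffer can grow, so the mark survives 6 reads.
        System.out.println(canReset(4, 8, 6));
        // Buffer of 4 bytes, mark limit 4: the fifth read forces fill() to drop the mark.
        System.out.println(canReset(4, 4, 5));
    }
}
```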

3. How BufferedInputStream protects the buffer under concurrency

 

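BufferedInputStream is stateful, so thread safety has to be considered. Let's look at how it protects the buffer. First, buf is declared volatile, and it is replaced only through an AtomicReferenceFieldUpdater.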

 

    /**
     * The internal buffer array where the data is stored. When necessary,
     * it may be replaced by another array of
     * a different size.
     */
    protected volatile byte buf[];

    /**
     * Atomic updater to provide compareAndSet for buf. This is
     * necessary because closes can be asynchronous. We use nullness
     * of buf[] as primary indicator that this stream is closed. (The
     * "in" field is also nulled out on close.)
     */
    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class,  byte[].class, "buf");

AtomicReferenceFieldUpdater is a reflection-based utility that performs atomic updates on a designated volatile field of a designated class. It is used as follows.

 

In fill():
    System.arraycopy(buffer, 0, nbuf, 0, pos);
    if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
        // Can't replace buf if there was an async close.
        // Note: This would need to be changed if fill()
        // is ever made accessible to multiple threads.
        // But for now, the only way CAS can fail is via close.
        // assert buf == null;
        throw new IOException("Stream closed");
    }
In close():
    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close();
                return;
            }
            // Else retry in case a new buf was CASed in fill()
        }
    }

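The same pattern can be reproduced outside the JDK. The following is a simplified sketch, not the JDK code: CasBufferDemo and its methods are hypothetical, and it only shows how a compareAndSet on a volatile array field lets close() and a buffer-growing method coordinate without sharing a lock.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class CasBufferDemo {

    // The field must be volatile and non-static for AtomicReferenceFieldUpdater to accept it.
    private volatile byte[] buf = new byte[4];

    private static final AtomicReferenceFieldUpdater<CasBufferDemo, byte[]> BUF_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(CasBufferDemo.class, byte[].class, "buf");

    /** Doubles the buffer; returns false if close() has nulled it out. */
    boolean grow() {
        byte[] current = buf;
        if (current == null) {
            return false;
        }
        byte[] bigger = Arrays.copyOf(current, current.length * 2);
        // Succeeds only if buf still refers to the array that was copied.
        return BUF_UPDATER.compareAndSet(this, current, bigger);
    }

    /** Marks the holder as closed by swapping the buffer for null, retrying if grow() won a race. */
    void close() {
        byte[] current;
        while ((current = buf) != null) {
            if (BUF_UPDATER.compareAndSet(this, current, null)) {
                return;
            }
        }
    }

    /** Returns the current buffer length, or -1 once closed. */
    int capacity() {
        byte[] current = buf;
        return current == null ? -1 : current.length;
    }

    public static void main(String[] args) {
        CasBufferDemo demo = new CasBufferDemo();
        System.out.println(demo.grow() + " " + demo.capacity());
        demo.close();
        System.out.println(demo.grow() + " " + demo.capacity());
    }
}
```

Using a field updater instead of an AtomicReference keeps buf an ordinary array field, which matters in BufferedInputStream because buf is a protected field that subclasses may read directly.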
4. A pitfall when using BufferedInputStream
The following code contains a subtle bug, and it is a common mistake when reading from an InputStream.
    String str = "hellohellohelloh123";
    ByteArrayInputStream input = new ByteArrayInputStream(str.getBytes());
    BufferedInputStream buffer = new BufferedInputStream(input);
    ByteArrayOutputStream out = new ByteArrayOutputStream(8192);
    byte[] chunk = new byte[3];
    int readByte = -1;
    while ((readByte = buffer.read(chunk)) != -1) {
        out.write(chunk);
    }
    System.out.println(str);
    System.out.println(out.toString());
Output of this code:
hellohellohelloh123
hellohellohelloh12312
Adding System.out.println(readByte + ":" + new String(chunk)); inside the while loop prints:
3:hel
3:loh
3:ell
3:ohe
3:llo
3:h12
1:312
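The last read returned only 1 byte, so chunk still held two stale bytes ("12") from the previous read, and out.write(chunk) wrote them out again. out.write(chunk) should therefore be changed to: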
out.write(chunk, 0, readByte);
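Put together, the corrected loop might look like the sketch below. CopyFixDemo and copy are hypothetical names, and an explicit charset is used here as an assumption so that the result does not depend on the platform default.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class CopyFixDemo {

    /** Copies text through a BufferedInputStream in 3-byte chunks and returns the result. */
    static String copy(String text) {
        byte[] source = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream(8192);
        try (BufferedInputStream in =
                     new BufferedInputStream(new ByteArrayInputStream(source))) {
            byte[] chunk = new byte[3];
            int readBytes;
            while ((readBytes = in.read(chunk)) != -1) {
                // Write only the bytes that this read actually produced.
                out.write(chunk, 0, readBytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String str = "hellohellohelloh123";
        System.out.println(str);
        System.out.println(copy(str));
    }
}
```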