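1. How BufferedInputStream works
BufferedInputStream exists to provide an internal buffer. On a read, it first pulls a block of bytes from the underlying stream into that buffer in one go; subsequent read(byte[]) calls then simply copy data out of the buffer. The buffer size can be set through the BufferedInputStream(InputStream in, int size) constructor; the default is 8192 bytes.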
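To make the effect of the buffer visible, here is a minimal sketch that counts how often the underlying stream is actually asked for data. CountingInputStream and underlyingReads are hypothetical helpers written for this illustration, not part of the JDK.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class BufferDemo {
    /** Hypothetical helper: counts read calls that reach the wrapped stream. */
    static final class CountingInputStream extends InputStream {
        private final InputStream delegate;
        int calls;

        CountingInputStream(InputStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public int read() throws IOException {
            calls++;
            return delegate.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            calls++;
            return delegate.read(b, off, len);
        }

        @Override
        public int available() throws IOException {
            return delegate.available();
        }
    }

    /**
     * Drains dataSize bytes through a BufferedInputStream using small
     * chunk-sized reads and returns how many reads hit the source.
     */
    static int underlyingReads(int dataSize, int bufferSize, int chunk) throws IOException {
        CountingInputStream source =
                new CountingInputStream(new ByteArrayInputStream(new byte[dataSize]));
        try (InputStream in = new BufferedInputStream(source, bufferSize)) {
            byte[] b = new byte[chunk];
            while (in.read(b) != -1) {
                // discard the data; only the call count matters here
            }
        }
        return source.calls;
    }

    public static void main(String[] args) throws IOException {
        // 4096 bytes read 16 at a time through a 1024-byte buffer
        System.out.println("underlying reads: " + underlyingReads(4096, 1024, 16));
    }
}
```

The caller issues a few hundred 16-byte reads, yet the source should see only a handful of calls, roughly one per buffer refill plus the final end-of-stream probe.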
2. Implementation of BufferedInputStream
Note that the read method is synchronized:
    public synchronized int read(byte b[], int off, int len)
        throws IOException
    {
        getBufIfOpen(); // Check for closed stream
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0)
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0)
                return n;
        }
    }
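The loop above keeps calling read1() until the request is satisfied or the underlying stream reports nothing available. A minimal sketch of that behaviour follows; TrickleStream is a hypothetical source that hands out at most 3 bytes per call, and firstRead is an illustrative helper.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class ReadLoopDemo {
    /** Hypothetical source: returns at most 3 bytes per bulk read. */
    static final class TrickleStream extends InputStream {
        private int remaining;
        private final boolean reportAvailable;

        TrickleStream(int total, boolean reportAvailable) {
            this.remaining = total;
            this.reportAvailable = reportAvailable;
        }

        @Override
        public int read() {
            if (remaining <= 0) return -1;
            remaining--;
            return 1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (len == 0) return 0;
            if (remaining <= 0) return -1;
            int n = Math.min(Math.min(len, 3), remaining);
            Arrays.fill(b, off, off + n, (byte) 1);
            remaining -= n;
            return n;
        }

        @Override
        public int available() {
            // When false, mimic a stream that cannot tell how much is ready.
            return reportAvailable ? remaining : 0;
        }
    }

    /** Asks for 6 bytes through an 8-byte buffer and returns how many arrived. */
    static int firstRead(boolean reportAvailable) throws IOException {
        try (InputStream in =
                     new BufferedInputStream(new TrickleStream(100, reportAvailable), 8)) {
            return in.read(new byte[6], 0, 6);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("available() reported: " + firstRead(true));
        System.out.println("available() is zero:  " + firstRead(false));
    }
}
```

When available() reports more data, the buffered read loops and fills the whole 6-byte request; when it reports zero, the call returns after the first partial read rather than risk blocking.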
The actual work is delegated to read1():
    private int read1(byte[] b, int off, int len) throws IOException {
        int avail = count - pos;
        if (avail <= 0) {
            /* If the requested length is at least as large as the buffer, and
               if there is no mark/reset activity, do not bother to copy the
               bytes into the local buffer.  In this way buffered streams will
               cascade harmlessly. */
            if (len >= getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            fill();
            avail = count - pos;
            if (avail <= 0) return -1;
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
        pos += cnt;
        return cnt;
    }
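The bypass branch in read1() can be observed from the outside. The sketch below assumes a 16-byte buffer and records the length of the first bulk request that reaches the source; RecordingStream and requestSeenBySource are hypothetical names used only for this illustration.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class BypassDemo {
    /** Hypothetical wrapper that remembers the first bulk read length it sees. */
    static final class RecordingStream extends InputStream {
        private final InputStream delegate = new ByteArrayInputStream(new byte[256]);
        int firstRequestedLen = -1;

        @Override
        public int read() throws IOException {
            return delegate.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (firstRequestedLen < 0) {
                firstRequestedLen = len;
            }
            return delegate.read(b, off, len);
        }

        @Override
        public int available() throws IOException {
            return delegate.available();
        }
    }

    /**
     * Reads callerLen bytes through a 16-byte BufferedInputStream and returns
     * the length of the first request the source received.
     */
    static int requestSeenBySource(int callerLen, boolean markFirst) throws IOException {
        RecordingStream source = new RecordingStream();
        try (BufferedInputStream in = new BufferedInputStream(source, 16)) {
            if (markFirst) {
                in.mark(128); // an active mark disables the bypass
            }
            in.read(new byte[callerLen], 0, callerLen);
        }
        return source.firstRequestedLen;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("large read, no mark: " + requestSeenBySource(64, false));
        System.out.println("small read, no mark: " + requestSeenBySource(4, false));
        System.out.println("large read, marked:  " + requestSeenBySource(64, true));
    }
}
```

A 64-byte request with no mark is passed straight through, so the source sees a 64-byte request. A 4-byte request, or any request while a mark is active, goes through fill(), so the source sees a buffer-sized request instead.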
fill():
    /**
     * Fills the buffer with more data, taking into account
     * shuffling and other tricks for dealing with marks.
     * Assumes that it is being called by a synchronized method.
     * This method also assumes that all data has already been read in,
     * hence pos > count.
     */
    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        else if (pos >= buffer.length)  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else {            /* grow buffer */
                int nsz = pos * 2;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }
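Two of the branches in fill(), growing the buffer while a mark is live and invalidating the mark once the buffer has reached marklimit, can be exercised with a deliberately tiny buffer. This is a sketch under that assumption; readThenReset is a hypothetical helper, and -2 is an arbitrary sentinel chosen here to signal an invalidated mark.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;

final class MarkResetDemo {
    /**
     * Marks the stream, reads toRead bytes one at a time, then calls reset().
     * Returns the next byte after a successful reset, or -2 (sentinel chosen
     * for this sketch) if reset() fails because the mark was invalidated.
     */
    static int readThenReset(int bufferSize, int readLimit, int toRead) throws IOException {
        byte[] data = new byte[32];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i; // byte value equals its position
        }
        try (BufferedInputStream in =
                     new BufferedInputStream(new ByteArrayInputStream(data), bufferSize)) {
            in.mark(readLimit);
            for (int i = 0; i < toRead; i++) {
                in.read();
            }
            try {
                in.reset();
            } catch (IOException invalidMark) {
                return -2;
            }
            return in.read();
        }
    }

    public static void main(String[] args) throws IOException {
        // Buffer of 4, mark limit 8, read 6: the buffer grows and the mark survives.
        System.out.println(readThenReset(4, 8, 6));
        // Buffer of 4, mark limit 4, read 5: the buffer is full and already at
        // the limit, so fill() drops the mark.
        System.out.println(readThenReset(4, 4, 5));
    }
}
```

In the first call the stream rewinds to position 0 after reset(); in the second, reset() throws because fill() set markpos back to -1. With the default 8192-byte buffer the mark would survive far longer than the stated limit, which is why the sketch uses a 4-byte buffer.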
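3. How BufferedInputStream protects its buffer under concurrency
BufferedInputStream is stateful, so thread safety has to be considered. Let's look at how it protects its buffer. First, buf is declared volatile, and updates to it go through an AtomicReferenceFieldUpdater.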
    /**
     * The internal buffer array where the data is stored. When necessary,
     * it may be replaced by another array of
     * a different size.
     */
    protected volatile byte buf[];

    /**
     * Atomic updater to provide compareAndSet for buf. This is
     * necessary because closes can be asynchronous. We use nullness
     * of buf[] as primary indicator that this stream is closed. (The
     * "in" field is also nulled out on close.)
     */
    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class, byte[].class, "buf");
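The same volatile-plus-compareAndSet pattern can be sketched in a small standalone class. GuardedBuffer below is a hypothetical illustration, not JDK code: grow() plays the role of the buffer replacement in fill(), and close() swaps in null so that a concurrent grow() fails instead of resurrecting a closed buffer.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class GuardedBuffer {
    // Must be volatile and non-static for the field updater to accept it.
    private volatile byte[] buf = new byte[4];

    private static final AtomicReferenceFieldUpdater<GuardedBuffer, byte[]> BUF_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(GuardedBuffer.class, byte[].class, "buf");

    /**
     * Replaces the buffer with one twice the size and returns the new length.
     * Fails if the buffer was closed before or during the swap.
     */
    int grow() {
        byte[] current = buf;
        if (current == null) {
            throw new IllegalStateException("closed");
        }
        byte[] bigger = Arrays.copyOf(current, current.length * 2);
        // The swap succeeds only if buf is still the array we copied from.
        if (!BUF_UPDATER.compareAndSet(this, current, bigger)) {
            throw new IllegalStateException("closed");
        }
        return bigger.length;
    }

    /** Marks the buffer as closed by atomically replacing it with null. */
    void close() {
        byte[] current;
        while ((current = buf) != null) {
            if (BUF_UPDATER.compareAndSet(this, current, null)) {
                return;
            }
            // Otherwise another thread swapped in a new array; retry.
        }
    }

    boolean isClosed() {
        return buf == null;
    }

    public static void main(String[] args) {
        GuardedBuffer g = new GuardedBuffer();
        System.out.println("grown to " + g.grow());
        g.close();
        System.out.println("closed: " + g.isClosed());
    }
}
```

Using null as the closed marker means a single field carries both the data and the open/closed state, so one atomic swap is enough to keep the two consistent.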
AtomicReferenceFieldUpdater is a reflection-based utility that enables atomic updates to a designated volatile field of a designated class. Usage: