package ringBuffer;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedHashMap;
import java.util.Map;
public class MsgWriter {
/**
* @param args
*/
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
MsgWriter mc = new MsgWriter() ;
Consumer cr = mc.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
LinkedHashMap<Integer, String> msgs = new LinkedHashMap<Integer, String>() ;
for (Integer i = 0; i < 100000; i++) {
msgs.put(i,j.toString()) ;
}
cr.addToMsgBuf(msgs);
}
cr.interrupt() ;
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
RandomAccessFile raf;
LinkedHashMap<Integer, String> cMsgs = new LinkedHashMap<Integer, String>() ; ;
public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (cMsgs){
while (cMsgs.isEmpty()) {
cMsgs.wait();
}
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
byte[] buf = sb.toString().getBytes();
int length = buf.length ;
pos = prePos + length ;
int bufPos = 0;
if (pos > bufferSize && prePos < bufferSize) {
bufPos = bufferSize - prePos;
System.arraycopy(buf, 0, buffer, prePos, bufPos); // 写满剩余的buf
prePos = pos = 0;
long fileLength = raf.length();
raf.seek(fileLength);
raf.write(buffer);
}else
System.arraycopy(buf, 0, buffer, prePos, length); // 写满剩余的buf
if(bufPos==0){ //说明整段buf被复制
prePos = pos;
}else{ //此时 缓冲区应清空,存入后续buf
System.arraycopy(buf, bufPos, buffer, prePos, length-bufPos); // 写满剩余的buf
prePos = prePos + length-bufPos ;
pos =prePos ;
}
cMsgs.clear() ;
}
}
long fileLength = raf.length(); // 文件长度,字节数
raf.seek(fileLength); // 将写文件指针移到文件尾
long start = System.currentTimeMillis() ;
raf.write(buffer, 0, prePos); //溢写剩余数据
raf.close();
}catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void addToMsgBuf(LinkedHashMap<Integer, String> currentStepMsgs) {
synchronized (cMsgs) {
cMsgs.putAll(currentStepMsgs);
cMsgs.notifyAll();
}
}
}
}
package ringBuffer;
import java.io.IOException;
import java.io.RandomAccessFile;
public class MsgWriter2 {
public static void main(String[] args) {
long start = System.currentTimeMillis() ;
MsgWriter2 mw = new MsgWriter2() ;
Consumer cr = mw.new Consumer() ;
cr.start() ;
for (Integer j = 0; j < 5; j++) {
StringBuffer sb = new StringBuffer();
for (Integer i = 0; i < 100000; i++) {
sb.append(i.toString()+ j.toString() + "V");
}
sb.append("S");
byte[] msgs = sb.toString().getBytes();
System.out.println(msgs.length);
cr.addToMsgBuf(msgs);
}
cr.interrupt() ;
System.out.println(" last : "+ ( System.currentTimeMillis() -start));
}
class Consumer extends Thread {
int bufferSize = 2 * 1024 * 1024;
private byte[] buffer = new byte[bufferSize];
private Integer pos = 0;
private Integer prePos = 0;
String outputFile = "F:\\test\\ringBuf.txt";
private int totalLength = 0;
RandomAccessFile raf;
public Consumer(){
try {
raf = new RandomAccessFile(outputFile, "rw");
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (buffer) {
if (pos > bufferSize ) {
totalLength = totalLength + prePos ;
System.out.println("write Begin Time: " + System.currentTimeMillis() + " bytes:" + totalLength);
long fileLength = raf.length(); // 文件长度,字节数
raf.seek(fileLength); // 将写文件指针移到文件尾
raf.write(buffer) ;
System.out.println("write End Time: " + System.currentTimeMillis());
prePos = prePos % bufferSize ;
pos = prePos; //理论上的环形缓冲实际应把后面剩下的写满
buffer.notifyAll() ; //清空buffer后通知等待的线程
}
}
}
totalLength = totalLength + prePos ;
long fileLength = raf.length(); // 文件长度,字节数
raf.seek(fileLength); // 将写文件指针移到文件尾
System.out.println("write Begin Time: " + System.currentTimeMillis()) ;
raf.write(buffer, 0, prePos) ;
System.out.println("write End Time: " + System.currentTimeMillis());
raf.close() ;
} catch (IOException e) {
e.printStackTrace();
}
}
public void addToMsgBuf(byte[] buf) {
if (buf == null || buf.length == 0)
return;
try {
synchronized (buffer) {
pos = prePos + buf.length ;
int bufPos = 0 ;
while (pos > bufferSize && prePos<bufferSize) {
bufPos = bufferSize-prePos ;
System.arraycopy(buf, 0, buffer, prePos, bufPos); //写满剩余的buf
prePos = bufferSize;
System.out.println("Wait Until there is enough space! ");
buffer.wait();
}
System.arraycopy(buf,bufPos, buffer, prePos, buf.length - bufPos);
if(pos==0&&pos==prePos){
pos = prePos + buf.length ;
}
prePos = pos ;
buffer.notifyAll() ;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
以上两段代码,实现的功能相同,但是性能差异很大,理论上说,MsgWriter把这个步骤:
StringBuffer sb = new StringBuffer();
for (Map.Entry<Integer, String> v : cMsgs.entrySet()) {
sb.append(v.getKey()).append(v.getValue()).append("V") ;
}
sb.append("S");
放到另外一个并行线程中,应该更快才对,但是明显结果是MsgWriter慢,且内存消耗大(这个很容易理解),但是性能上差异很大,是内存的原因么?MsgWriter2?无论是时间还是空间都比MsgWriter2好,为啥呢? 求大牛指点下
------解决思路----------------------
http://blog.csdn.net/it_man/article/details/8225477