最近项目中需要从网站上抓取大量数据，采用了多线程技术。每个线程抓取到的数据都需要及时保存到文件中，以避免占用大量内存。
思路：多个访问线程先把需要写入文件的数据放入一个共享队列，再由一个专门的写出线程定期从队列中取出数据并写入文件。
WriterQueue.java：存放待输出数据的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package com.yulore.write;
import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
/**
 * Process-wide bounded buffer that decouples producer threads from the writer thread.
 *
 * <p>Producers call {@link #put(String)}; the writer calls {@link #takeAll()} to drain every
 * pending line at once. All access to the buffer is guarded by a single {@link ReentrantLock},
 * so the shared instance may be used from any number of threads. Lines are handed out in the
 * same order in which they were put (FIFO).
 */
public class WriterQueue {

    /** Maximum number of pending lines; {@link #put(String)} blocks while the buffer is this full. */
    private static final int MAX_QUEUE_SIZE = 5000;

    private static final WriterQueue INSTANCE = new WriterQueue();

    private final LinkedList<String> queue = new LinkedList<String>();
    private final Lock lock = new ReentrantLock();
    /** Signalled when buffer space becomes available for producers. */
    private final Condition notFull = lock.newCondition();
    /** Signalled when a line has been added for the writer. */
    private final Condition notEmpty = lock.newCondition();

    private WriterQueue() {
    }

    /**
     * Returns the shared singleton instance.
     *
     * @return the process-wide queue
     */
    public static WriterQueue getQueue() {
        return INSTANCE;
    }

    /**
     * Appends a line to the tail of the buffer, blocking while the buffer is full.
     *
     * <p>If the calling thread is interrupted while waiting for space, the line is NOT enqueued,
     * the stack trace is printed, and the thread's interrupt status is restored so the caller can
     * observe the cancellation.
     *
     * @param phone the line to enqueue
     */
    public void put(String phone) {
        lock.lock();
        try {
            while (queue.size() >= MAX_QUEUE_SIZE) {
                System.out.println("warning: data queue is full!");
                notFull.await();
            }
            // Append at the tail so takeAll() returns lines oldest-first.
            queue.addLast(phone);
            notEmpty.signal();
        } catch (InterruptedException e) {
            // Re-assert the interrupt; swallowing it would make the thread impossible to stop.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns every pending line, oldest first, blocking while the buffer is empty.
     *
     * <p>If the calling thread is interrupted while waiting, an empty list is returned, the stack
     * trace is printed, and the thread's interrupt status is restored.
     *
     * @return a new list owned by the caller; never {@code null}
     */
    public LinkedList<String> takeAll() {
        LinkedList<String> retVal = new LinkedList<String>();
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("warning: data queue is empty!");
                notEmpty.await();
            }
            retVal.addAll(queue);
            queue.clear();
            // The entire buffer was freed, so every blocked producer may proceed, not just one.
            notFull.signalAll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return retVal;
    }
}
|
WriteTask_New.java：模拟产生数据的线程类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.yulore.write;
/**
 * Demo producer: each {@link #run()} enqueues a fixed number of lines into the shared
 * {@link WriterQueue}, each line tagged with its index and the running thread's name.
 *
 * <p>The only state is an immutable count, so one instance may be shared by several threads.
 */
public class WriteTask_New implements Runnable {

    /** Lines produced per run when the no-arg constructor is used (the original behaviour). */
    private static final int DEFAULT_COUNT = 20;

    private final int count;

    /** Creates a task that enqueues {@value #DEFAULT_COUNT} lines per run. */
    public WriteTask_New() {
        this(DEFAULT_COUNT);
    }

    /**
     * Creates a task that enqueues {@code count} lines per run.
     *
     * @param count number of lines to produce; must be non-negative
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public WriteTask_New(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be >= 0: " + count);
        }
        this.count = count;
    }

    @Override
    public void run() {
        for (int i = 0; i < count; i++) {
            WriterQueue.getQueue().put("for:" + i + " thread:" + Thread.currentThread().getName());
        }
    }
}
|
OutputTask.java：负责将数据写入文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| package com.yulore.write;
import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.LinkedList;
/**
 * Writer thread body: roughly every {@value #FLUSH_INTERVAL_MILLIS} ms it drains the shared
 * {@link WriterQueue} and appends the drained lines to a file, one per line.
 *
 * <p>The loop stops when the running thread is interrupted; lines still queued at that moment
 * are written in a final best-effort drain.
 */
public class OutputTask implements Runnable {

    /** Directory used by {@link #OutputTask(String)} (the location originally hard-coded here). */
    private static final String DEFAULT_DIRECTORY = "D:/fbb/myWorkSpace_DW07/";

    /** Pause between two drains of the queue, in milliseconds. */
    private static final int FLUSH_INTERVAL_MILLIS = 5000;

    private final String directory;
    private final String fileName;

    /**
     * Creates a task that appends to {@code fileName} inside the default directory.
     *
     * @param fileName name of the output file
     */
    public OutputTask(String fileName) {
        this(DEFAULT_DIRECTORY, fileName);
    }

    /**
     * Creates a task that appends to {@code fileName} inside {@code directory}.
     *
     * @param directory directory that holds the output file (must already exist)
     * @param fileName name of the output file
     */
    public OutputTask(String directory, String fileName) {
        this.directory = directory;
        this.fileName = fileName;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                sleep(FLUSH_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and leave the loop so the writer thread can be shut down.
                Thread.currentThread().interrupt();
                break;
            }
            write2Disk(WriterQueue.getQueue().takeAll());
        }
        // Best-effort final drain so lines queued before the interrupt are not lost. The
        // interrupt flag is set here, so an empty queue makes the await in takeAll() fail fast
        // instead of blocking.
        write2Disk(WriterQueue.getQueue().takeAll());
    }

    /**
     * Appends every element of {@code list} as its own line to the output file, creating the
     * file if needed. Uses the platform default charset, as the original implementation did.
     * I/O failures are printed and otherwise ignored (best effort).
     *
     * @param list lines to write; {@code null} or empty means nothing to do
     */
    private void write2Disk(LinkedList<String> list) {
        if (list == null || list.isEmpty()) {
            System.out.println("no data...");
            return;
        }

        System.out.println("开始序列化数据 " + fileName);
        File outputFile = new File(directory, fileName);

        // Append mode creates the file when missing; try-with-resources closes (and thereby
        // flushes) the whole writer chain even if a write fails.
        try (FileOutputStream out = new FileOutputStream(outputFile, true);
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(out))) {
            for (String content : list) {
                bw.write(content);
                bw.newLine();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void sleep(int millis) throws InterruptedException {
        Thread.sleep(millis);
    }
}
|
TestWrite.java：测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.yulore.write;
/** Manual driver for the producer/consumer file-writing demo. */
public class TestWrite {

    /** Entry point: runs the queue-based demo. */
    public static void main(String[] args) {
        test02();
    }

    /**
     * Starts four producer threads sharing one {@code WriteTask_New} instance, then one
     * {@code OutputTask} thread that appends the queued lines to "abc.txt".
     */
    private static void test02() {
        final int producerCount = 4;
        final WriteTask_New producer = new WriteTask_New();
        int started = 0;
        while (started < producerCount) {
            new Thread(producer).start();
            started++;
        }

        final OutputTask writer = new OutputTask("abc.txt");
        new Thread(writer).start();
    }

    /**
     * Earlier variant of the demo: five threads sharing one {@code WriteTask}.
     * NOTE(review): {@code WriteTask} is not defined in this snippet; this method only compiles
     * if that class exists elsewhere in the package — confirm, or delete this unused method.
     */
    private static void test() {
        final WriteTask legacy = new WriteTask("abc.txt");
        for (int n = 0; n < 5; n++) {
            new Thread(legacy).start();
        }
    }
}
|
来源：http://blog.csdn.net/top_code/article/details/8896047