fzy-blog

Java多线程写同一个文件实现

2019-05-24

最近项目中需要从网站上抓取大量数据,因此采用了多线程技术;每个线程抓取到的数据都需要及时保存到同一个文件中,以避免在内存中积压大量数据。

思路:多个访问线程先将需要写入文件的数据保存到一个队列里,然后由专门的写出线程负责从队列中取出数据并写入文件。

WriterQueue.java 存放待输出数据的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.yulore.write;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Process-wide bounded buffer between the producer threads and the single writer thread.
 *
 * <p>Producers call {@link #put(String)}; the writer calls {@link #takeAll()} to drain every
 * pending line in one batch. All mutable state is guarded by {@code lock}. Lines are handed
 * out in the order they were put (FIFO).
 */
public class WriterQueue {

    /** Capacity; {@link #put(String)} blocks while this many lines are pending. */
    private static final int MAX_QUEUE_SIZE = 5000;

    /** Pending lines, oldest first. Replaced (not cleared) by takeAll(); guarded by lock. */
    private LinkedList<String> queue = new LinkedList<String>();
    private final Lock lock = new ReentrantLock();
    /** Producers wait on this while the queue is full. */
    private final Condition notFull = lock.newCondition();
    /** The writer waits on this while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();

    private static final WriterQueue manager = new WriterQueue();

    private WriterQueue() {
    }

    /**
     * Returns the shared singleton instance.
     *
     * @return the one {@code WriterQueue} of this class loader
     */
    public static WriterQueue getQueue() {
        return manager;
    }

    /**
     * Appends one line at the tail, blocking while the queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the line is dropped, the stack
     * trace is printed, and the thread's interrupt status is restored so the caller can react.
     *
     * @param phone the line to enqueue
     */
    public void put(String phone) {
        lock.lock();
        try {
            while (queue.size() >= MAX_QUEUE_SIZE) {
                System.out.println("warning: data queue is full!");
                notFull.await();
            }
            // Append at the tail so takeAll() yields lines in arrival order.
            queue.addLast(phone);
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Do not swallow the interrupt: let the producer thread see it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns every pending line, oldest first, blocking while the queue is empty.
     *
     * <p>If interrupted while waiting, returns an empty list and restores the interrupt status.
     *
     * @return the drained lines; never {@code null}
     */
    public LinkedList<String> takeAll() {
        LinkedList<String> drained = new LinkedList<String>();
        lock.lock();
        try {
            while (queue.isEmpty()) {
                System.out.println("warning: data queue is empty!");
                notEmpty.await();
            }
            // Hand the whole list to the caller and start a fresh one: O(1) instead of copy + clear.
            drained = queue;
            queue = new LinkedList<String>();
            // The queue is now completely empty, so every blocked producer may proceed.
            notFull.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return drained;
    }
}

WriteTask_New.java 模拟产生数据的线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.yulore.write;

/**
 * Simulated producer: each thread running this task enqueues {@code count} lines of the form
 * {@code "for:<i> thread:<threadName>"} into the shared {@link WriterQueue}.
 *
 * <p>One instance may be shared by several threads; its configuration is immutable.
 */
public class WriteTask_New implements Runnable {

    /** Lines enqueued per thread by the no-arg constructor. */
    private static final int DEFAULT_COUNT = 20;

    private final int count;
    private final int delayMillis;

    /** Enqueues 20 lines per thread with no pause between them. */
    public WriteTask_New() {
        this(DEFAULT_COUNT, 0);
    }

    /**
     * @param count       lines each running thread enqueues; values {@code <= 0} enqueue nothing
     * @param delayMillis pause before each line to throttle the producer; {@code <= 0} disables it
     */
    public WriteTask_New(int count, int delayMillis) {
        this.count = count;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        for (int i = 0; i < count; i++) {
            if (delayMillis > 0) {
                try {
                    sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a request to stop producing; keep the flag set.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            WriterQueue.getQueue().put("for:" + i + " thread:" + threadName);
        }
    }

    private void sleep(int millis) throws InterruptedException {
        Thread.sleep(millis);
    }
}

OutputTask.java 负责将数据写入到文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.yulore.write;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.LinkedList;

/**
 * Writer thread: every {@code FLUSH_INTERVAL_MILLIS} it drains {@link WriterQueue} and appends
 * the drained lines, one per line, to {@code dir/fileName}.
 *
 * <p>The loop runs until the thread is interrupted; a batch already taken from the queue is
 * still written before the thread exits.
 */
public class OutputTask implements Runnable {

    /** Directory used by the single-argument constructor. */
    private static final String DEFAULT_DIR = "D:/fbb/myWorkSpace_DW07/";

    /** Pause between drains so each file write covers a batch of lines. */
    private static final int FLUSH_INTERVAL_MILLIS = 5000;

    private final String dir;
    private final String fileName;

    /**
     * @param fileName name of the output file inside {@code D:/fbb/myWorkSpace_DW07/}
     */
    public OutputTask(String fileName) {
        this(DEFAULT_DIR, fileName);
    }

    /**
     * @param dir      directory holding the output file; created if missing
     * @param fileName name of the output file; lines are appended to it
     */
    public OutputTask(String dir, String fileName) {
        this.dir = dir;
        this.fileName = fileName;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                sleep(FLUSH_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Interrupt means "stop": restore the flag and leave instead of looping forever.
                Thread.currentThread().interrupt();
                break;
            }
            LinkedList<String> list = WriterQueue.getQueue().takeAll();
            write2Disk(list);
        }
    }

    /**
     * Appends each element of {@code list} as one line to the output file.
     * I/O failures are reported via {@code printStackTrace} and the batch is skipped.
     */
    private void write2Disk(LinkedList<String> list) {
        if (list == null || list.isEmpty()) {
            System.out.println("no data...");
            return;
        }

        System.out.println("开始序列化数据 " + fileName);

        File outputFile = new File(dir, fileName);
        File parent = outputFile.getParentFile();
        if (parent != null) {
            // Best effort; if this fails, opening the stream below reports the error.
            parent.mkdirs();
        }

        FileOutputStream out = null;
        BufferedWriter bw = null;
        try {
            // Append mode creates the file when it does not exist yet.
            out = new FileOutputStream(outputFile, true);
            bw = new BufferedWriter(new OutputStreamWriter(out));
            for (String content : list) {
                bw.write(content);
                bw.newLine();
            }
            // One flush per batch instead of one per line.
            bw.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bw != null) {
                    bw.close(); // closes the whole chain, including out
                } else if (out != null) {
                    out.close(); // writer chain was never built; still release the file handle
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void sleep(int millis) throws InterruptedException {
        Thread.sleep(millis);
    }
}

TestWrite.java 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.yulore.write;

/**
 * Manual demo driver for the queue-based file writer.
 *
 * <p>Nothing here stops the threads it starts, so the JVM keeps running until it is killed.
 */
public class TestWrite {

    /**
     * Runs the queue-based demo ({@code test02}). The older demo ({@code test}) is kept
     * but not invoked.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        test02();
    }

    /** Four producer threads share one WriteTask_New; then one writer thread targets abc.txt. */
    private static void test02() {
        launch(new WriteTask_New(), 4);
        launch(new OutputTask("abc.txt"), 1);
    }

    /**
     * Older variant: five threads share one {@code WriteTask("abc.txt")}.
     * NOTE(review): WriteTask is not shown in this post; presumably a direct-writing task — confirm.
     */
    private static void test() {
        launch(new WriteTask("abc.txt"), 5);
    }

    /** Starts {@code copies} new threads, all running the same {@code task} instance. */
    private static void launch(Runnable task, int copies) {
        int started = 0;
        while (started < copies) {
            new Thread(task).start();
            started++;
        }
    }
}

来源: http://blog.csdn.net/top_code/article/details/8896047

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章