fzy-blog

java异步计算场景应用

2019-05-24

最近项目中遇到一个业务场景:
将当期数据库中的表迁移到另外一个数据库中,为满足迁移效率需要进行并发数据迁移。对每一数据表可以启动不同的线程同时迁移数据。迁移完成后,同步更新对应该迁移任务的状态字段。
最先想到的是使用 java 中并发工具类:同步屏障 CyclicBarrier。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
一、通过 CyclicBarrier 实现迁移任务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.future.test;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VerticaTransfer extends DataTransfer<DataInfo>{
int threadCount = 10;
//线程调度
ExecutorService executor = null;
CyclicBarrier barrier;
//计算结果集
protected void doBefore(DataInfo entity){
//线程池
executor = Executors.newFixedThreadPool(threadCount);
//CyclicBarrier可以用于多线程计算数据,最后处理结果的场景
barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));
}
protected void doJob(DataInfo entity){
//并行计算
List<Product> ps = entity.getProducts();
for (Product product : ps) {
executor.execute(new VerticaTransferTask(barrier,product));
}

}
@Override
protected void doAfter(DataInfo entity) {
}

}
/**
* 合并计算处理
* @author Administrator
*
*/
class DoAfter implements Runnable {
private VerticaTransfer verticaTransfer;
private DataInfo entity;
DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {
this.verticaTransfer = verticaTransfer;
this.entity = entity;
}
public void run() {
System.out.println("迁移完成。共迁移:" + entity.getProducts().size());
}
}

业务处理代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.future.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* 数据迁移执行任务
* @author
*
*/
public class VerticaTransferTask implements Runnable{

private CyclicBarrier barrier;
private Product product;
VerticaTransferTask(Product product){
this.product = product;
}
VerticaTransferTask(CyclicBarrier barrier,Product product){
this.barrier = barrier;
this.product = product;
}


@Override
public void run() {
// TODO Auto-generated method stub
try {
System.out.println("进行迁移 :" + product.getId());
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

测试入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

package com.future.test;

import java.util.ArrayList;
import java.util.List;

public class VerticaTransferTest{
public static void main(String[] args) {
VerticaTransfer transfer = new VerticaTransfer(); //
DataInfo data = new DataInfo();
List<Product> ps = new ArrayList<Product>();
int tmp = 0;
for(int i = 0; i < 10;i++){
Product p = new Product();
p.setId(i + "");
p.setPurchase_price(10);
p.setSalse_price(10 + i);
ps.add(p);
tmp += i;
}

data.setProducts(ps);
transfer.execute(data);
}

}

通过上述实现步骤,完全可以实现业务场景。
增强业务场景:在上述场景基础上,对每次迁移的结果进行最终的汇总。多少迁移成功,多少迁移失败。也就是对每个线程处理结果进行汇总。
这个就涉及到线程间通信的问题。在现有处理的基础上,添加一个公共 List 变量,在迁移 VerticaTransferTask run()方法中将迁移结果 synchronized 放在 List
中即可。
但是,有没有更好的实现方式呢?
Future 接口
描述:从 Java 1.5 开始,提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。
这就表示我们可以通过 Future 获取每个线程的执行结果。我以下通过并行计算产品利润的方式简单实现需求。
二、通过 Future 实现并行处理任务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VerticaTransfer extends DataTransfer<DataInfo>{
int threadCount = 10;
//线程调度
ExecutorService executor = null;
//计算结果集
List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
protected void doBefore(DataInfo entity){
//线程池
executor = Executors.newFixedThreadPool(threadCount);

}
protected void doJob(DataInfo entity){
//并行计算
List<Product> ps = entity.getProducts();
for (Product product : ps) {
Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
results.add(res);
}

}
@Override
protected void doAfter(DataInfo entity) {
double total = 0;
List<Future<ResultInfo>> rs = this.results;
for (Future<ResultInfo> future : rs) {
try {
ResultInfo info = future.get();
total += info.getPrice();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("产品总利润:" + total);
}

}

业务处理代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

package com.test;

import java.util.concurrent.Callable;

/**
* 数据迁移执行任务
* @author
*
*/
public class VerticaTransferTask implements Callable<ResultInfo>{

private Product product;
VerticaTransferTask(Product product){
this.product = product;
}


@Override
public ResultInfo call() throws Exception {
// TODO Auto-generated method stub
ResultInfo res = null;
try {
double money = product.getSalse_price() - product.getPurchase_price();
res = new ResultInfo();
res.setPrice(money);
res.setProductId(product.getId());
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
return res;
}
}

很简单我们就实现了并行计算并合并结果集。
那我能不能两者一起使用呢,我在 CyclicBarrier 处理结果 DoAfter 类中获取 Future 结果进行统计。
这样不就可以满足需求了吗。设想处理如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

public class VerticaTransfer extends DataTransfer<DataInfo>{
int threadCount = 10;
//线程调度
ExecutorService executor = null;
CyclicBarrier barrier;
//计算结果集
List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>();
protected void doBefore(DataInfo entity){
//线程池
executor = Executors.newFixedThreadPool(threadCount);
//CyclicBarrier可以用于多线程计算数据,最后处理结果的场景
barrier = new CyclicBarrier(threadCount,new DoAfter(this,entity));
}
protected void doJob(DataInfo entity){
//并行计算
List<Product> ps = entity.getProducts();
for (Product product : ps) {
Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product));
results.add(res);
}

}
@Override
protected void doAfter(DataInfo entity) {

}

}
/**
* 合并计算处理
* @author Administrator
*
*/
class DoAfter implements Runnable {
private VerticaTransfer verticaTransfer;
private DataInfo entity;
DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) {
this.verticaTransfer = verticaTransfer;
this.entity = entity;
}
public void run() {
double total = 0;
List<Future<ResultInfo>> rs = verticaTransfer.results;
for (Future<ResultInfo> future : rs) {
try {
ResultInfo info = future.get();
total += info.getPrice();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("产品总利润:" + total);
}
}

业务处理 VerticaTransferTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

public class VerticaTransferTask implements Callable<ResultInfo>{

private CyclicBarrier barrier;
private Product product;
VerticaTransferTask(Product product){
this.product = product;
}
VerticaTransferTask(CyclicBarrier barrier,Product product){
this.barrier = barrier;
this.product = product;
}


@Override
public ResultInfo call() {
// TODO Auto-generated method stub
ResultInfo res = null;
try {
double money = product.getSalse_price() - product.getPurchase_price();
res = new ResultInfo();
res.setPrice(money);
res.setProductId(product.getId());
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return res;
}
}

运行后发现死锁啦,原因是什么呢?
查了一下 CyclicBarrier 资料,注意这一点:
CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行 barrierAction。
也就是在 barrier.await()执行之后会优先执行 DoAfter 类中的 run, 而这时 run 中的 future.get()阻塞等待 VerticaTransferTask call 运行结果,形成了资源相互
抢占,造成了死锁。
这样我们就大概了解了在 java 中有两种实现并行计算的方式,那么具体遇到问题的时候如何选择呢?
我们还是要清楚两者的概念:
CyclicBarrier 在到达屏障之后线程并没有处理结束,而是被阻塞等待,等有优先处理 barrierAction 完成后,被 signalAll 唤醒继续运行。
CyclicBarrier 中的源代码:

1
2
3
4
5
6
7
8

private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

而 Future 是等待线程运行完成之后才获取结果,否则一直阻塞等待。

http://richard-lee.iteye.com/blog/2276319

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章