最近项目中遇到一个业务场景: 将当期数据库中的表迁移到另外一个数据库中,为满足迁移效率需要进行并发数据迁移。对每一数据表可以启动不同的线程同时迁移数据。迁移完成后,同步更新对应该迁移任务的状态字段。 最先想到的是使用 java 中并发工具类:同步屏障 CyclicBarrier。 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。 CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。 一、通过 CyclicBarrier 实现迁移任务代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.future.test;import java.util.List;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class VerticaTransfer extends DataTransfer <DataInfo > { int threadCount = 10 ; ExecutorService executor = null ; CyclicBarrier barrier; protected void doBefore (DataInfo entity) { executor = Executors.newFixedThreadPool(threadCount); barrier = new CyclicBarrier(threadCount,new DoAfter(this ,entity)); } protected void doJob (DataInfo entity) { List<Product> ps = entity.getProducts(); for (Product product : ps) { executor.execute(new VerticaTransferTask(barrier,product)); } } @Override protected void doAfter (DataInfo entity) { } } class DoAfter implements Runnable { private VerticaTransfer verticaTransfer; private DataInfo entity; DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) { this .verticaTransfer = verticaTransfer; this .entity = entity; } public void run () { System.out.println("迁移完成。共迁移:" + entity.getProducts().size()); } }
业务处理代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.future.test;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class VerticaTransferTask implements Runnable { private CyclicBarrier barrier; private Product product; VerticaTransferTask(Product product){ this .product = product; } VerticaTransferTask(CyclicBarrier barrier,Product product){ this .barrier = barrier; this .product = product; } @Override public void run () { try { System.out.println("进行迁移 :" + product.getId()); Thread.sleep(1000 ); }catch (Exception e){ e.printStackTrace(); } finally { try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
测试入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.future.test;import java.util.ArrayList;import java.util.List;public class VerticaTransferTest { public static void main (String[] args) { VerticaTransfer transfer = new VerticaTransfer(); DataInfo data = new DataInfo(); List<Product> ps = new ArrayList<Product>(); int tmp = 0 ; for (int i = 0 ; i < 10 ;i++){ Product p = new Product(); p.setId(i + "" ); p.setPurchase_price(10 ); p.setSalse_price(10 + i); ps.add(p); tmp += i; } data.setProducts(ps); transfer.execute(data); } }
通过上述实现步骤,完全可以实现业务场景。 增强业务场景:在上述场景基础上,对每次迁移的结果进行最终的汇总。多少迁移成功,多少迁移失败。也就是对每个线程处理结果进行汇总。 这个就涉及到线程间通信的问题。在现有处理的基础上,添加一个公共 List 变量,在迁移 VerticaTransferTask run()方法中将迁移结果 synchronized 放在 List 中即可。 但是,有没有更好的实现方式呢? Future 接口 描述:从 Java 1.5 开始,提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。 这就表示我们可以通过 Future 获取每个线程的执行结果。我以下通过并行计算产品利润的方式简单实现需求。 二、通过 Future 实现并行处理任务代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package com.test;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class VerticaTransfer extends DataTransfer <DataInfo > { int threadCount = 10 ; ExecutorService executor = null ; List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>(); protected void doBefore (DataInfo entity) { executor = Executors.newFixedThreadPool(threadCount); } protected void doJob (DataInfo entity) { List<Product> ps = entity.getProducts(); for (Product product : ps) { Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product)); results.add(res); } } @Override protected void doAfter (DataInfo entity) { double total = 0 ; List<Future<ResultInfo>> rs = this .results; for (Future<ResultInfo> future : rs) { try { ResultInfo info = future.get(); total += info.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("产品总利润:" + total); } }
业务处理代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.test;import java.util.concurrent.Callable;public class VerticaTransferTask implements Callable <ResultInfo > { private Product product; VerticaTransferTask(Product product){ this .product = product; } @Override public ResultInfo call () throws Exception { ResultInfo res = null ; try { double money = product.getSalse_price() - product.getPurchase_price(); res = new ResultInfo(); res.setPrice(money); res.setProductId(product.getId()); Thread.sleep(1000 ); }catch (Exception e){ e.printStackTrace(); } return res; } }
很简单我们就实现了并行计算并合并结果集。 那我能不能两者一起使用呢,我在 CyclicBarrier 处理结果 DoAfter 类中获取 Future 结果进行统计。 这样不就可以满足需求了吗。设想处理如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class VerticaTransfer extends DataTransfer <DataInfo > { int threadCount = 10 ; ExecutorService executor = null ; CyclicBarrier barrier; List<Future<ResultInfo>> results = new ArrayList<Future<ResultInfo>>(); protected void doBefore (DataInfo entity) { executor = Executors.newFixedThreadPool(threadCount); barrier = new CyclicBarrier(threadCount,new DoAfter(this ,entity)); } protected void doJob (DataInfo entity) { List<Product> ps = entity.getProducts(); for (Product product : ps) { Future<ResultInfo> res = executor.submit(new VerticaTransferTask(product)); results.add(res); } } @Override protected void doAfter (DataInfo entity) { } } class DoAfter implements Runnable { private VerticaTransfer verticaTransfer; private DataInfo entity; DoAfter(VerticaTransfer verticaTransfer,DataInfo entity) { this .verticaTransfer = verticaTransfer; this .entity = entity; } public void run () { double total = 0 ; List<Future<ResultInfo>> rs = verticaTransfer.results; for (Future<ResultInfo> future : rs) { try { ResultInfo info = future.get(); total += info.getPrice(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("产品总利润:" + total); } }
业务处理 VerticaTransferTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class VerticaTransferTask implements Callable <ResultInfo > { private CyclicBarrier barrier; private Product product; VerticaTransferTask(Product product){ this .product = product; } VerticaTransferTask(CyclicBarrier barrier,Product product){ this .barrier = barrier; this .product = product; } @Override public ResultInfo call () { ResultInfo res = null ; try { double money = product.getSalse_price() - product.getPurchase_price(); res = new ResultInfo(); res.setPrice(money); res.setProductId(product.getId()); Thread.sleep(1000 ); }catch (Exception e){ e.printStackTrace(); } finally { try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } return res; } }
运行后发现死锁啦,原因是什么呢? 查了一下 CyclicBarrier 资料,注意这一点: CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties,Runnable barrier-Action),用于在线程到达屏障时,优先执行 barrierAction。 也就是在 barrier.await()执行之后会优先执行 DoAfter 类中的 run, 而这时 run 中的 future.get()阻塞等待 VerticaTransferTask call 运行结果,形成了资源相互 抢占,造成了死锁。 这样我们就大概了解了在 java 中有两种实现并行计算的方式,那么具体遇到问题的时候如何选择呢? 我们还是要清楚两者的概念: CyclicBarrier 在到达屏障之后线程并没有处理结束,而是被阻塞等待,等有优先处理 barrierAction 完成后,被 signalAll 唤醒继续运行。 CyclicBarrier 中的源代码:
1 2 3 4 5 6 7 8 private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation(); }
而 Future 是等待线程运行完成之后才获取结果,否则一直阻塞等待。
http://richard-lee.iteye.com/blog/2276319