<div class="iteye-blog-content-contain" style="font-size: 14px;">
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span style="font-family: Wingdings;" lang="EN-US">Ø<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><strong><span style="font-family: 宋体;">问题背景</span></strong></p>
<p class="MsoNormal"><span style="font-family: 宋体;">之前写过一个单线程程序, 从数据库查询卡号的, 今天要做批量导入, 发现单线程太慢, 于是修改为多线程方式, 只是在前段开了多线程,以为就OK了, 谁知问题这时就出现了, 由于有实务问题, 事物是在县城内开启的, 这样就导致了每一个线程就是一个事物, 就导致事物之内更新的数据在其他线程读不到。</span></p>
<p class="MsoNormal"> </p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<div class="quote_title">多线程开启 写道</div>
<div class="quote_div">/**<br> * 异步多线程导入<br> */<br> public void batchRelateAllExternalCardThread(<br> List<ExternalCard> externalCardList, int type)<br> throws InterruptedException {<br> long startTime = System.currentTimeMillis();<br> ConcurrentLinkedQueue<ExternalCard> clQueue = new ConcurrentLinkedQueue<ExternalCard>(<br> externalCardList);<br> ExecutorService es = Executors.newFixedThreadPool(THREAD_NUM);<br> for (int i = 0; i < THREAD_NUM; i++) {<br> es.execute(new RelateCardThread(clQueue, type));<br> }<br> boolean next = true;// 主线程退出标记<br> while (next) {<br> if (THREAD_NUM == threadNum.get()) {<br> es.shutdown(); // 这里是让 线程池推出使用的 ,但是主线程并没有退出<br> if (es.isShutdown()) {<br> next = false;<br> }<br> } else {<br> Thread.sleep(5000);<br> }<br> }<br> }</div>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<div class="quote_title">线程执行 写道</div>
<div class="quote_div">class RelateCardThread implements Runnable {<br> private ConcurrentLinkedQueue<ExternalCard> clQueue;<br> private int type;<br><br> public RelateCardThread(ConcurrentLinkedQueue<ExternalCard> clQueue,<br> int type) {<br> this.clQueue = clQueue;<br> this.type = type;<br> }<br><br> public void run() {<br> int count = 0;<br> while (!clQueue.isEmpty()) {<br> logger.info("============================================="<br> + Thread.currentThread().getId() + "执行:" + count);<br> ExternalCard ec = clQueue.poll();<br> if (null != ec) {<br> relateExternalCard(ec, type);<br> }<br> count++;<br> }<br> threadNum.getAndIncrement();<br> }<br> }</div>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<div class="quote_title">写道</div>
<div class="quote_div">private void relateExternalCard(ExternalCard ec, int type) {<br> resultInfo = cardCtlService.activeOtherCardAndRelateCard(<br> mbTemp, merchant, ec, ec.getCardType(), null,<br> WebCommonContents.ExternalCardRelateSource.CRM<br> .getCode());<br> }}}</div>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<pre name="code" class="java">@Transactional(value = "trade", readOnly = false, propagation = Propagation.REQUIRED)
@Override
public ResultInfo activeOtherCardAndRelateCard(){
//在这个方法中需要同步获取这个对象, 之前我们采用的 synchronized 的
cardRecord = cardRecordBizService.selectCardRecord(cardRecord);
}</pre>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"> </p>
<p class="MsoNormal"> </p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span style="font-family: 宋体;">通过上面伪代码,可以看出整个代码逻辑:</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">1.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">并行计算每个线程对部分卡进行开卡、激活。</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">2.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><span style="font-family: monospace; font-size: 1em; line-height: 1.5; background-color: #fafafa;">selectCardRecord</span><span style="">方法加了线程同步锁处理,意图解决卡号重复问题。</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">3.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">事务</span><span lang="EN-US">Transaction</span><span style="font-family: 宋体;">在线程内,因此每个线程开启一个事务,导入数据不在一个大事务中,效率较高也保证批量多个卡激活互不影响,每个卡激活一个独立事务处理。</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span style="font-family: Wingdings;" lang="EN-US">Ø<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><strong><span style="font-family: 宋体;">问题分析</span></strong></p>
<p class="MsoNormal"><span style="font-family: 宋体;">开发对</span><span style="font-family: monospace; font-size: 1em; line-height: 1.5; background-color: #fafafa;">selectCardRecord</span><span style="font-size: 12px; line-height: 1.5; font-family: 宋体;">加了线程同步锁,但仍然出现卡号重复。实际上,再仔细查看代码逻辑,会发现,<span style="background: yellow;">卡号流水并不通过代码维护,而是通过数据库的激活标识</span></span><span style="font-size: 12px; line-height: 1.5; background: yellow;" lang="EN-US">flag</span><span style="font-size: 12px; line-height: 1.5; font-family: 宋体; background: yellow;">维护</span><span style="font-size: 12px; line-height: 1.5; font-family: 宋体;">。每次</span><span style="font-size: 12px; line-height: 1.5;" lang="EN-US">SQL</span><span style="font-size: 12px; line-height: 1.5; font-family: 宋体;">取出一条没有激活的卡号,然后设置为激活。</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span style="font-family: 宋体;">问题出在并发和事务的关系上,虽然代码试图线程同步,但数据库事务提交先后顺序并不受代码控制。</span></p>
<p class="MsoNormal"><br><img src="http://dl2.iteye.com/upload/attachment/0101/9683/c54ef4e4-8298-3824-9469-022c91f0df6f.png" alt=""><br> <br><!--[endif]--><!--[endif]--></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">1.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span lang="EN-US">Time1</span><span style="font-family: 宋体;">时刻,线程</span><span lang="EN-US">1</span><span style="font-family: 宋体;">开启事务</span><span lang="EN-US">T1</span><span style="font-family: 宋体;">,获取第一个可用的卡号</span><span lang="EN-US">ID=1</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">2.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span lang="EN-US">Time1’</span><span style="font-family: 宋体;">时刻,线程</span><span lang="EN-US">2</span><span style="font-family: 宋体;">开启事务</span><span lang="EN-US">T2</span><span style="font-family: 宋体;">,由于</span><span lang="EN-US">T1</span><span style="font-family: 宋体;">未提交,所以仍然获取可用的卡号</span><span lang="EN-US">ID=1</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">3.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span lang="EN-US">Time2</span><span style="font-family: 宋体;">时刻,</span><span lang="EN-US">T1</span><span style="font-family: 宋体;">提交,将标识更改为</span><span lang="EN-US">true</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">4.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span lang="EN-US">Time2’</span><span style="font-family: 宋体;">时刻,</span><span lang="EN-US">T2</span><span style="font-family: 宋体;">提交,再次更改标识为</span><span lang="EN-US">true</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span lang="EN-US">T1</span><span style="font-family: 宋体;">,</span><span lang="EN-US">T2</span><span style="font-family: 宋体;">返回了相同的</span><span lang="EN-US">ID=1</span><span style="font-family: 宋体;">的卡号,导致卡号重复。这是典型的脏读问题。</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"> </p>
<p class="MsoNormal"><span lang="EN-US">尝试解决方案:</span></p>
<p class="MsoNormal"><span lang="EN-US">最开始 想到了使用脏的的问题, 于是我们叫事物级别修改为:isolation = Isolation.READ_UNCOMMITTED</span></p>
<p class="MsoNormal"><span lang="EN-US">这是虽然读到的卡号不重复了, 但是,我么在更新卡信息的时候,考虑到是藏读引起的, 么有起到隔离作用, 这是就在想到行锁, 使用行锁后, 发现解决了这个问题, 且由于所范围的缩小,加快了多线程的效率;</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span style="font-family: Wingdings;" lang="EN-US">Ø<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><strong><span style="font-family: 宋体;">解决方案</span></strong></p>
<p class="MsoNormal"><span style="font-family: 宋体;">分析出原因后,有多种可行的改法。</span><span lang="EN-US">Postgres</span><span style="font-family: 宋体;">默认隔离级别是读提交</span><span lang="EN-US">read committed</span><span style="font-family: 宋体;">,在代码改动较小的前提下,我们可以选择对读数据加行锁,阻塞其他读直到事务提交。</span></p>
<p class="MsoNormal"><span style="font-family: 宋体;">数据库的锁从粒度上简单说有两种</span><span lang="EN-US">(</span><span style="font-family: 宋体;">实际比这个要多,只说最简单情况</span><span lang="EN-US">)</span><span style="font-family: 宋体;">:</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">1.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">表级锁</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">2.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">行级锁</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span style="font-family: 宋体;">从锁行为上分:</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">1.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">共享锁</span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span lang="EN-US">2.<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><span style="font-family: 宋体;">排它锁</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span style="font-family: 宋体;">我们的需求是对数据的读取加锁,阻塞其他事务读取相同行,不同行的数据不阻塞。根据这个需求,代码最小改动的方案如下:</span></p>
<p class="MsoNormal"><span lang="EN-US"> String id = executeQuery(‘select id from trade.card_record where batch=xxx and flag=false limit 1 <span style="background: yellow;">for update</span>’);</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoNormal"><span style="font-family: 宋体; background: yellow;">对</span><span style="background: yellow;" lang="EN-US">select</span><span style="font-family: 宋体; background: yellow;">读加行级排它锁,这样就达到了我们的目的。</span> <span style="font-family: 宋体;">原有代码中的</span><span lang="EN-US">synchronized</span><span style="font-family: 宋体;">没有任何作用,去掉即可。</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span style="font-family: Wingdings;" lang="EN-US">Ø<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><strong><span style="font-family: 宋体;">测试结果</span></strong></p>
<p class="MsoNormal"><span style="font-family: 宋体;">并发冲突问题验证解决,改为并行计算后,</span><span lang="EN-US">40w</span><span style="font-family: 宋体;">数据导入测试结果预计</span><span lang="EN-US">2</span><span style="font-family: 宋体;">小时,提高</span><span lang="EN-US">4</span><span style="font-family: 宋体;">倍。根据硬件和线程数的不同,效率会有所波动。</span></p>
<p class="MsoNormal"><span lang="EN-US"> </span></p>
<p class="MsoListParagraph" style=""><!--[if !supportLists]--><span style="font-family: Wingdings;" lang="EN-US">Ø<span style="font-size: 7pt; line-height: normal; font-family: 'Times New Roman';"> </span></span><!--[endif]--><strong><span style="font-family: 宋体;">案例总结</span></strong></p>
<p class="MsoNormal"><span style="font-family: 宋体;">这个</span><span lang="EN-US">bug</span><span style="font-family: 宋体;">实际上是未考虑到并发情况而产生的,一直埋藏在软</span><span lang="EN-US">pos</span><span style="font-family: 宋体;">的代码中多年。并行计算加重了并发的程度,才会很容易暴露这个</span><span lang="EN-US">bug</span><span style="font-family: 宋体;">。</span></p>
<p> </p>
<p class="MsoNormal"><span style="font-family: 宋体;">在程序代码设计中,对于系统中“唯一流水号”这样的情况,我们要特别重视并发的访问,同时要考虑事务、线程之间的数据一致性。希望通过这个案例,让大家认识到事务的概念。</span></p>
</div>
相关推荐
Java 模拟线程并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发 Java, 模拟线程并发,线程,并发
基于Qt的多线程并发服务器 incomingConnection(qintptr socketDescriptor)检测
代码里面包含一个并发4个线程同时运行 全部开始 全部停止 单个停止还有点问题。 还有生产者消费者 里面的里面能帮助你理解多线程的运用!
WEB API 多线程并发测试工具; WEB API 多线程并发测试工具
Tesseract OCR多线程并发识别案例----只演示多线程并发识别,此工具不关注识别正确率,可通过训练tessdata来获得更高的识别正确率。
并发服务器-多线程服务器详解
模拟IPC,注册接收sip信令消息,模拟发送视频数据,多线程支持多路并发
java多线程并发的在新窗口
在开发过程中自己编写的多线程并发程序组件源代码共享给大家,里面有测试的例子,提供给大家学习,希望大家多提宝贵意见~
实现多线程的并发执行,能演示操作系统的时间转轮调度算法对多线程程序执行的影响效果,能控制一个或多个线程的执行情况。
JVM优化 并发调试和JDK8新特性,并发调试和JDK8新特性
JDK5中的多线程并发库.doc 描述了JDK多线程的并发
多线程并发从,学习笔记,代码+注释,从线程创建开始到多线程并发,相关锁以及一些设计模式等
JDK5中的多线程并发库
多线程,高并发
该资源是多线程并发下的单例模式-源码,几乎包含了所有方式实现的单例模式,并且能够确保在多线程并发下的线程安全性。 读者可结合本人博客 http://blog.csdn.net/cselmu9?viewmode=list 中的《线程并发之单例模式...
qtconcurrent 多线程并发处理应用demo。开发环境Qt5.9.4
C#(也称Csharp)在多线程下并发执行HTTP请求的实现,采用C#封装HttpWebRequest类开发的多线程并发采集程序源码文档,文档中详细说明了HttpWebRequest并发HTTP请求实现网站采集的方法,经过测试同时并发1000+不是问题,...
java多线程并发编程知识导图笔记.xmind
易语言websocket模块,多线程并发稳定模块,实测稳定,保证可用