V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Coody
V2EX  ›  Java

使用 ELock 实现高性能分布式锁(非轮询)

  •  
  •   Coody · 2019-01-21 16:56:24 +08:00 · 3019 次点击
    这是一个创建于 2125 天前的主题,其中的信息可能已经有所发展或是发生改变。

    前言:

    随着笔者的颜值不断提高,用户量的日益增长,传统的单机方案已经不能满足产品的需求。笔者在网上寻遍方案,发现均为人云亦云,一份以毫秒为精度的轮询分布式锁被转发转载上万次。然,该方案没法满足笔者性能要求。故此,笔者研发 ELock 插件,并发布本文章。

    其实集群也好,分布式服务也好。当我们不能保证团队成员的整体素质,那么在某些业务上,分布式锁自然没法避免。

    公认开发原则

    能不使用分布式锁的,尽可能不使用

    举个例子

    一个商品交易,需要检查库存、检查余额、扣库存、扣款、生成订单。可能很多人觉得,在分布式环境下一定要分布式锁才能安全。

    致此,笔者提供一种简单的方案:

    订单处理{
        if(库存不足){
            return 库存不足;
        }
        if(余额不足){
            return 余额不足;
        }
        事务管理(rollbackFor = Exception.class){
            //扣库存
            int changeLine = 执行语句(update 商品表 set 库存=库存-购买数量 where 库存>购买数量 and 商品 ID = ?);
            if(changeLine != 1){
                return 库存不足;
            }
            double 扣款金额= 商品价格 x 购买数量;
            //扣款
            changeLine = 执行语句(update 用户余额表 set 余额=余额-扣款金额 where 余额 > 扣款金额 and 扣款金额 > 0 and 用户 ID = ?);
            if(changeLine != 1){
                throw CustomRuntimeException("余额不足");
            }
            //生成订单
            changeLine = 执行语句( insert into 订单表 set ......);
            if(changeLine < 1){
                throw CustomRuntimeException("订单生成失败");
            }
        }
    }
    

    我们仔细来分析一下如上的整个逻辑

    1、当一个业务进入逻辑体,先检查余额和库存,不满足条件则返回错误(可阻挡非并发情况下的大部分业务流入事物)

    2、进入事物后,先扣取库存,当扣取失败,直接返回错误

    3、扣取库存后,则进行扣款,当扣款失败,则抛出异常(由于在业务体走到这里,已经扣取了库存,本处不能 return,需抛出异常,让事物回滚)

    4、扣款成功后,则生成订单,当订单生成失败,则抛出异常(理由同第三点)

    特别注意:语句中,通过 where 来进行余额不足和库存不足的条件判断。通过执行语句返回的影响行数,来判断是否扣取成功。 在以上流程中,我们发现,即便不使用分布式锁,也无并发问题。

    ===========================================================

    分布式锁

    友情提示:切勿觉得笔者以上理论是拆自己的台,笔者作为互联网技术人,希望各位技术人能够将产品质量做到最好,少加班,多回家陪陪家人

    ELock 介绍

     ELock 是笔者闲暇之余写的一套分布式锁插件,代码非常精简、并且以非轮询阻塞的方式进行加锁控制。适用于面向用户的互联网产品,目前用在一套用户量为 7 位数的直播系统中。  源码地址: https://gitee.com/coodyer/Coody-Framework/tree/original/coody-elock
    

    Maven 引用代码(可关注更新情况):

    <dependency>
      <groupId>org.coody.framework</groupId>
      <artifactId>coody-elock</artifactId>
      <!--更新于 2019-01-18 12:23:00 -->
      <version>alpha-1.2.2</version>
    </dependency>
    

    初始化 JedisPool

    //直接传入连接池初始化(注:无密码请传 null)
    ELockCache.initJedisPool(JediPool);
    //传入 ip、端口、密码、超时时间初始化
    ELockCache.initJedisPool(host, port, secretKey, timeOut);
    //传入 ip、端口、密码、超时时间、配置器初始化
    ELockCache.initJedisPool(host, port, secretKey, timeOut, jedisPoolConfig);
    

    加锁

    ELocker.lock(key, expireSecond);
    

    释放锁

    ELocker.unLock(key);
    

    注意: 加锁代码(ELocker.lock(key, expireSecond);)。需 try{}catch{}包围,并在 finally 释放锁(ELocker.unLock(key);)

    try {
       ELocker.lock(key, 100);
       for (int i = 0; i < 10; i++) {
          System.out.println(Thread.currentThread().getId() + ">>" + i);
          Thread.sleep(100l);
       }
    } catch (InterruptedException e) {
         e.printStackTrace();
    } finally {
         ELocker.unLock(key);
    }
    

    测试代码

    import java.util.ArrayList;
    import java.util.List;
    
    import org.coody.framework.elock.ELocker;
    import org.coody.framework.elock.redis.ELockCache;
    
    /**
     * 分布式锁测试
     * [@author]( https://my.oschina.net/arthor) Coody
     *
     * 2018 年 12 月 14 日
     * 
     * [@blog]( https://my.oschina.net/wangboxi) 54sb.org
     */
    public class ELockTest {
    
    	//要加锁的 key
    	static String key = "TESTLOCK_1";
    
    	static {
    		//初始化 jedis 连接
    		ELockCache.initJedisPool("127.0.0.1", 16379, "123456", 10000);
    	}
    
    	public static void main(String[] args) {
    		List<Thread> threads = new ArrayList<Thread>();
    		for (int i = 0; i < 10; i++) {
    			Thread thread = new Thread(new Runnable() {
    				[@Override]( https://my.oschina.net/u/1162528)
    				public void run() {
    					test();
    				}
    			});
    			threads.add(thread);
    		}
    		//启动十个线程
    		for (Thread thread : threads) {
    			thread.start();
    		}
    	}
    
    	//要锁的方法
    	private static void test() {
    		try {
    			ELocker.lock(key, 100);
    			for (int i = 0; i < 10; i++) {
    				System.out.println(Thread.currentThread().getId() + ">>" + i);
    				Thread.sleep(100l);
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			ELocker.unLock(key);
    		}
    	}
    }
    

    执行效果:

    输入图片说明

    ===========================================================

    这套锁在 Spring 下的花样玩法

    1、配置分布式锁中使用的缓存
    	<bean id="eLockCache" class="org.coody.framework.elock.redis.ELockCache" lazy-init="false">
    		<property name="jedisPool" ref="jedisPool" />
    	</bean>
    
    2、配置分布式锁切面
    <!-- 配置切面的 bean -->
    	<bean id="eLockInterceptor" class="org.coody.framework.elock.aspect.ELockAspect"></bean>
    	<!-- 配置 AOP -->
    	<aop:config>
    		<!-- 配置切面表达式 -->
    		<aop:pointcut
    			expression="@annotation(org.coody.framework.elock.annotation.ELock)"
    			id="eLockPointcut" />
    		<!-- 配置切面和通知 order:越小优先级越高 -->
    		<aop:aspect id="logAspect" ref="eLockInterceptor">
    			<aop:around method="rdLockForAspectj" pointcut-ref="eLockPointcut" />
    		</aop:aspect>
    	</aop:config>
    

    致此,分布式锁配置完成,开始进入我们的花样玩法。

    NO1. 使用注解添加分布式锁:
    	@ELock(name = "USER_MODIFY_LOCK", fields = "userId", waitTime = 20)
    	public void delUser(String userId) {
    		userDao.delUser(userId);
    	}
    

    在 ELock 注解中,name 代表 key 名字,field 代表拼接的字段。

    当 fields 所有字段长度超过 32 时,elock 将会对 key 进行 md5 获取摘要作为缓存的 key,即 name:key。

    本处 fields 支持选择对象的字段,即:方法参数名.字段值(如:userInfo.userId )

    本处 fields 支持多个字段,fields={"userInfo.userId","orderInfo.orderId"}

    当不指定 key 时,elock 将会根据包名、类名、方法名和方法参数生成 key

    当不指定 fields 时,elock 不会拼接任何多余参数,则该方法变成全局同步方法

    如图:

    输入图片说明

    NO2. 使用锁执行器添加分布式锁
    public void delUser(String userId) throws InterruptedException {
    		String key="USER_MODIFY_LOCK"+userId;
    		Integer code=new AbstractLockAble(key,20) {
    			
    			@Override
    			public Object doService() {
    				return userDao.delUser(userId);
    			}
    		}.invoke();
    	}
    

    通过 返回值=new AbstractLockAble(锁名称,超时时间){}.invoke()的方式,覆盖 doService 方法,将需要加锁的代码块放置 doService 方法里面执行。

    如图:

    输入图片说明

    ===========================================================

    如果在项目中觉得两种方式不可取,可采用上文中常规方式。

    笔者曾经百度搜索 Java 分布式锁实现,发现所提供方案都如出一辙(由于没有作图工具,就随便写下流程)。

    1、尝试获得锁

    2、死循环轮询获得锁

    3、执行业务

    4、释放锁

    在网上查到的方案,相信很多小朋友都知道,不知道是谁通过这种方式来做分布式锁,然后被一大堆网友转载。

    这种方案是可以实现锁,但是不适用于对外的互联网产品。

    重大问题地雷:当多个线程尝试获得锁,只有一个线程会执行,剩下的线程都在轮询获得锁。这里我们假设时间精度为 1ms,那就意味着每个线程每秒钟最多轮询 1000 次。然而在分布式锁中,我们需要借助中介容器去进行尝试获得锁的操作,如 redis zookeeper。故此,我们假设这个 key 有 100 个线程,第一个线程执行卡住,那么,1 个线程在执行业务,99 个线程在以每秒钟 1000 的频次对中间容器发起 ddos 攻击。故此,如上方案不适用于对外的互联网产品。

    笔者的方案:

    1、尝试获得锁

    2、线程入列并暂停

    3、执行业务

    4、发送消息释放锁,并唤醒下一个线程(轮询至第 1 步)

    我们知道,redis 也好,zookeeper 也好,都有消息订阅机制。当业务流入的时候,获取锁失败的线程,都进入了挂起的状态,那么此时有一个线程在执行。当这个线程执行完毕后,发送消息,这时候所有的应用程序都收到了这个消息,并尝试获得锁,以此往复,实现业务体执行权限

    版权说明:

    作者:Coody

    版权:©2014-2020 Test404 All right reserved. 版权所有

    反馈邮箱: [email protected]

    第 1 条附言  ·  2019-01-21 17:52:45 +08:00
    1.2.2 版本中,初始化锁内部 redis 实例采用本方案:

    new ELockCache().initJedisPool(......);
    14 条回复    2019-03-11 14:20:00 +08:00
    letitbesqzr
        1
    letitbesqzr  
       2019-01-21 17:48:19 +08:00
    赞,非常常见的场景,我们目前采用的就是 其他等待锁的线程去轮询获取锁,因为请求量小,所以没感觉出对系统有什么影响。有空学习下楼主的实现方式,感觉更加的优雅。
    vinsa
        2
    vinsa  
       2019-01-21 18:41:44 +08:00
    楼主讲的是 push vs pull,私以为按需选择即可,并非优劣替代关系。
    例子中 1ms 的轮询相当极端,有点不妥。
    xinyewdz
        3
    xinyewdz  
       2019-01-21 21:06:46 +08:00
    看了下代码,很多分布式锁遇到的问题都没有解决。精神可嘉,继续加油。
    Coody
        4
    Coody  
    OP
       2019-01-21 21:48:25 +08:00
    @xinyewdz 可以提 ISSUE,笔者会适时的完善。
    Coody
        5
    Coody  
    OP
       2019-01-21 21:49:54 +08:00
    @vinsa 可能笔者生涯中的产品比较在乎延时,一般 30ms 以上的接口我们都会列为待优化接口,50ms 以上就会即时去进行优化。故此时间精度通常较高。
    lhx2008
        6
    lhx2008  
       2019-01-21 21:59:48 +08:00
    有没可能和原来的自旋机制结合呢,频繁挂起解挂线程消耗应该还挺大的,像 AQS 就有自旋,多少毫秒不挂起。

    还有就是重复拿锁、交叉拿锁导致死锁的一些检测,我想应该是有必要加上的。

    代码很精简,可靠性未知,希望可以更加完善。
    lhx2008
        7
    lhx2008  
       2019-01-21 22:00:47 +08:00
    还有守护线程启动停 2s 好像随意了一点,有没有更优雅的方法?
    lhx2008
        8
    lhx2008  
       2019-01-21 22:03:33 +08:00
    @lhx2008
    #7 我说意思的 guard() 是 sleep 2ms 扫描
    Coody
        9
    Coody  
    OP
       2019-01-21 22:16:49 +08:00
    @lhx2008 谢谢支持啊,最近可能关注点在死锁和 redis 推送的可靠性方面。以及挂起线程的方式,也是可以优化的。
    dezhou9
        10
    dezhou9  
       2019-01-21 23:16:17 +08:00 via Android
    大神,献个丑。read-copy-update (RCU) is a synchronization mechanism implementing a kind of mutual exclusion。rcu 是不是比 cas 性能更优呢。
    holyghost
        11
    holyghost  
       2019-01-21 23:28:45 +08:00
    看到分布式锁就进来了
    想讨论的是:DDIA 上提到的几个问题,楼主是如何避免的呢?
    http://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
    IfEles
        12
    IfEles  
       2019-03-10 23:23:05 +08:00
    大佬有没有技术群可以交流一下的
    Coody
        13
    Coody  
    OP
       2019-03-11 13:41:43 +08:00
    @IfEles 218481849 不过大部分都是在开车
    IfEles
        14
    IfEles  
       2019-03-11 14:20:00 +08:00
    @Coody 哈哈,很多技术交流群慢慢都变成开车群了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2649 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 15:33 · PVG 23:33 · LAX 07:33 · JFK 10:33
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.