逐步优化实现redis分布式锁

已有 884人阅读此文 - - 服务器配置

前言

什么是分布式锁

        首先对于单进程而言,锁可以看成是多线程情况下访问共享资源的一种线程同步机制,所有线程都在同一个JVM进程里的时候,使用Java语言提供的锁机制可以起到对共享资源进行同步的作用。

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

    分布式锁实现方式一般有数据库唯一索引,redis原子性,zookeeper等实现方式.

今天我们采用redis实现分布式锁,并根据每一次实现提出可能出现的问题并优化实现一个新的方案.为了保证文章尽可能的短,忽略更多的废话,就不从最差的实现一步步的实现了.

方案一最简单的实现:

使用redis原子性操作increment来实现分布式锁

/**
 * 做一些业务执行的操作,后面几个实现都会调用这个方法
 */
private void doSomething() {

    System.out.println("业务执行代码开始");
    try {
        System.out.println("业务代码执行花费3秒时间");
        Thread.sleep(3000);
        //制造一个业务异常
        int i = 0;
        int a = 1 / i;
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("业务执行代码结束");
}


/**
 * 简单分布锁改进1 保证即使业务执行部分发生异常,锁也会被释放
 */
@Test
public void redisLock() {

    //如果等于1时,说明获取到了锁,大于1时说明锁还被别的线程占用  //1
   if (1 == redisTemplate.opsForValue().increment(lockKey, 1)) { //2
        try {
            doSomething();//3
        } finally {//4
            //执行完成删除这个键(释放锁),业务代码无论发执行情况怎么样,都删除lockKey
            //是不是很完美呢?
            redisTemplate.delete(lockKey);//5
        }
    }
}

        这种实现有没有问题呢,在大多正常情况下都是可以使用的,原因是一获取锁的时候是原子性的 ,二是即使业务连加发生了异常,finally里也把锁释放掉了,是不是很完美呢?有没有听过再完美的设计有可能也抵不过蓝翔的挖掘机^_^,开玩笑.还有运维的kill 9.

     有这样一个场景 已获取到lockKey,正在执行业务代码,这时候你要升级服务了,运维kill 9 ....lockKey值为1,服务启动后发会生什么?    //2 再次执行时,永远会大于1,因为没有设置过期时间,也没有执行//5的操作,那下面的业务就不会再执行,就死锁了....尴不尴尬... 那再次完善一下,解决如上的问题.


方案二,使用setNX操作

注意:以下基于springboot 2.1.7实现,如果你是低于2.1的版本的可能会没有 default Boolean setIfAbsent(K key, V value, Duration timeout) 方法,那可以使用如下实现.2.1以上忽略这段代码

/**
 * springboot 2.1以下版本 使用以下实现 保证setnx的原子性
 *
 * @param key
 * @param value
 * @param exptime
 * @return
 */
public boolean setIfAbsent(final String key, final Serializable value, final long exptime) {
    Boolean result = (Boolean) redisTemplate.execute((RedisCallback) connection -> {
        RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
        RedisSerializer keySerializer = redisTemplate.getKeySerializer();
        Object obj = connection.execute("set", keySerializer.serialize(key),
                valueSerializer.serialize(value),
                "NX".getBytes(StandardCharsets.UTF_8),
                "EX".getBytes(StandardCharsets.UTF_8),
                String.valueOf(exptime).getBytes(StandardCharsets.UTF_8));
        return obj != null;
    });
    return result;
}
/**
 * 使用 SetNX  
 */
@Test
public void redisLock() {
    //使用setnx操作
    //这里根据业务代码
    String taskId = UUID.randomUUID().toString();
    if (redisTemplate.opsForValue().setIfAbsent(lockKey, taskId, 3000, TimeUnit.SECONDS)) {
        try {
            doSomething();
        } finally {
            //方式2先判断当前锁持有者,以免误删了别的线程持有的锁
            if (taskId.equalsIgnoreCase(String.valueOf(redisTemplate.opsForValue().get(lockKey)))) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}


此方案以完全解决了上面方案的问题,获取锁后,即使重启,异常,都会在3秒后正常释放锁.但新的问题又出现了,当前doSomething业务确定时间是3秒时间,真正的业务里,谁能确定业务代码实际能执行多久呢?设置过期时间长了或再发生如kill 9这样的操作,虽然不会死锁,但仍然会造成锁被长久占用.如果设置的过期时间太短,比如1秒,那么doSomething还在执行,这时另一个线程就可以获取到锁.显然也有很大的问题,就达不到锁定的目的.

所以现在思考,是不是可以加一个监听器之类的做如下判断

if(doSomethingIsFinish){业务如果完成(锁还在被持有)
    释放锁
    releaseLock()
} else {
    增加一些过期时间
    incrementLockExpire()
}

以上是伪代码.

方案三,使用redisson实现分布式锁

public class RedissonManager {
    private static Config config = new Config();
    //声明redisso对象
    private static Redisson redisson = null;

    //实例化redisson
    static {
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        //得到redisson对象
        redisson = (Redisson) Redisson.create(config);
    }

    //获取redisson对象的方法
    public static Redisson getRedisson() {
        return redisson;
    }
}


@Test
public void redisLock3() {
    Redisson redisson = RedissonManager.getRedisson();
    //获取锁对象
    RLock lock = redisson.getLock(lockKey);
    boolean locked = false;
    try {
        //尝试加锁 并设置10秒过期,这个值很重要
        locked = lock.tryLock(1000, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (locked) {
        doSomething();
        // 释放锁
        lock.unlock();
    }
}


方案一和二,除了已说的问题,还有就是要用一个很大很在的try finally,而使用redisson,完全不需要这样,调用lock.unlock()就行了,根据方案一和二的经验,这个方法也不一定能执行到,但是没有关系.

让我们看一下redisson实现的原理图

22852215091600347

如图:只要加锁成功,就会有一个监听器,一直在查看锁是否还被持有,如果在被持有,监视器会默认每次增加30秒的锁生成时间,可以在配置Config中使用setLockWatchdogTimeout更改

好了,以上就是分布式锁的实现,有了redisson实现分布式锁是不是很简单了呢?这很完美吗?至少大多情况下,这是一个很好的解决方案,但有没有考虑过这个监视器会发生什么问题呢? 后面我们会继续尝试使用zookeeper来实现分布式锁



来源:自成e家 出处:逐步优化实现redis分布式锁
本文由 自成e家 原创 ,转载请注明出处,你的支持是我继续写作、分享的最大动力!
期待你一针见血的评论,Come on!