您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

分布式锁,原来这么简单!

时间:2023-09-22 11:26:56  来源:51CTO  作者:

作者 | 蔡柱梁

审校 | 重楼

目录

  1. 分布式锁介绍
  2. 如何实现分布式锁
  3. 实现分布式锁

1 分布式锁介绍

现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题,比如扣减库存,送完即止活动,中台的批量导入(有唯一校验要求)等等。这时,我们可以通过分布式锁解决这些问题。

2 如何实现分布式锁

实现的方式有很多种,如:

  • 基于 MySQL 等数据库实现
  • 基于 ZooKeeper 实现
  • 基于 redis 实现不管采用什么技术栈实现,但是逻辑流程都是大体不差的。下面是笔者自己在工作中基于Redis 实践过的流程图:

3 实现分布式锁

其实可以不用自己手写,现在有一个中间件Redisson 相当好用,十分推荐。这里的实现更多是用于学习。

3.1 Redis 是单节点的情况下实现的分布式锁

需要使用分布式锁的业务代码如下:

package com.example.demo.test.utils;

import com.example.demo.utils.RedisLockUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * @author CAIZhuliang
 * @date 2023/8/31
 */
@Slf4j
@SpringBootTest
public class RedisLockUtilTest {
 @Autowired
 private RedisLockUtil redisLockUtil;

 @Test
 public void simpleLockTest() {
 String key = "redis:lock:" + System.currentTimeMillis();
 boolean result = redisLockUtil.lock(key, 8_000L);
 if (result) {
 try {
 // do something
 } catch (Exception e) {
 log.error("simpleLockTest - 系统异常!", e);
 } finally {
 boolean unlock = redisLockUtil.unlock(key);
 if (!unlock) {
 log.error("simpleLockTest - 释放锁失败,key : {}", key);
 }
 }
 }
 }
}

分布式锁工具类代码如下:

package com.example.demo.utils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.Apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;

import JAVA.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和RedisLockUtil#simpleLock是同一个线程</p>
 * @param key 需要释放锁的key
 * @return true-成功 false-失败
 */
 public boolean releaseSimpleLock(String key) {
 String token = THREAD_LOCAL.get();
 try {
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (!token.equals(remoteToken)) {
 // 当前线程不再持有锁
 return false;
 }
 // 是自己持有锁才能释放
 return Boolean.TRUE.equals(redisTemplate.delete(key));
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
 return false;
 } finally {
 THREAD_LOCAL.remove();
 }
 }

 /**
 * 这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return true-成功 false-失败
 */
 public boolean simpleLock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return false;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 String token = UUID.randomUUID().toString();
 // 续约周期,单位纳秒
 long renewPeriod = expireTime / 2 * 1000_000;
 try {
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return false;
 }
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL.set(token);
 if (renewPeriod > 0) {
 // 续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 上锁失败。", e);
 THREAD_LOCAL.remove();
 return false;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 EXECUTOR_SERVICE.schedule(() -> {
 ValueOperations<String, String> valueOperator = redisTemplate.opsForValue();
 String val = valueOperator.get(key);
 if (token.equals(val)) {
 // 是自己持有锁才能续约
 try {
 Boolean result = valueOperator.setIfPresent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.TRUE.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 } else {
 log.error("非cluster模式简单分布式锁 - 锁续约失败,key : {}", key);
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
 }
 } else {
 log.error("非cluster模式简单分布式锁 - 锁续约失败,不再持有token,key : {}", key);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 }
}

这就是一个最简单的实现方式。不过这里存在着许多问题:

  • 续约任务

这里判断是否持有令牌和续约这两个动作不在同一个事务里,可能发生覆盖现象。假设A线程判断自己持有令牌,但是一直没有请求 Redis 导致锁过期。B线程成功获锁,这时A线程往下执行 Redis 请求,结果A线程抢了B线程的锁。

  • 释放锁

这里判断是否持有令牌和删除key这两个动作不在同一个事务里,可能出现误删现象。假设A线程现在要释放锁,通过了令牌判断,准备删除 key 但是还没执行。这时 key 过期了,B线程成功获锁。接着A线程执行删除 key 导致了 B 线程的锁被删除。

因此,判断持有令牌与续约/删除key这两个动作是需要原子性的,我们可以通过 lua 来实现。

扩展,了解管道与 lua 的区别

  • pipeline(多用于命令简单高效,无关联的场景)

优点:使用简单,有效减少网络IO

缺点:本质还是发送命令请求Redis 服务,如果效率过低,就会阻塞 Redis,导致 Redis 无法处理其他请求

  • lua(多用于命令复杂,命令间有关联的场景)

优点:

  1. Redis 支持 lua 脚本,Redis 服务执行 lua 的同时是可以处理别的请求的,不会产生阻塞
  2. 命令都在脚本中,有效减少网络IO
  3. 具有原子性

缺点:

有一定的学习成本

3.1.1 使用 lua 进行优化

RedisLockUtil 代码如下:

package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁,这个方法与 com.example.demo.utils.RedisLockUtil#simpleLock(java.lang.String, java.lang.Long) 配对。
 * <p>必须和RedisLockUtil#simpleLock是同一个线程</p>
 * @param key 需要释放锁的key
 * @return true-成功 false-失败
 */
 public boolean releaseSimpleLock(String key) {
 String token = THREAD_LOCAL.get();
 if (null != token) {
 TOKEN_STATUS.put(token, 1);
 }
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 return SUCCESS.equals(result);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
 return false;
 } finally {
 THREAD_LOCAL.remove();
 if (null != token) {
 TOKEN_STATUS.remove(token);
 }
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个 Redis来考虑。</p>
 * <p>这个方法使用 com.example.demo.utils.RedisLockUtil#releaseSimpleLock(java.lang.String) 来释放锁</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return true-成功 false-失败
 */
 public boolean simpleLock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return false;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime / 2 * 1000_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return false;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}", key, e);
 String token = THREAD_LOCAL.get();
 if (StringUtils.isNotBlank(token)) {
 if (!releaseSimpleLock(key)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁发生失败,key : {}, token : {}", key, token);
 }
 }
 return false;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 EXECUTOR_SERVICE.schedule(() -> {
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}", key);
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 } else {
 // 打印下 result,看下是否因为不再持有令牌导致的续约失败
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}", key, e);
 }
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 }
 }
}
这里还有一个问题:如果redis.call('get', KEYS[1]) == ARGV[1] 成立,但是执行redis.call('expire', KEYS[1], 0) 失败,怎么办?我这里已经执行了THREAD_LOCAL.remove(),想重复释放是不可能的了,但是我这里不能不 remove 或者仅当 Redis 释放锁成功才 remove,这样存在内存泄漏的风险。要怎么处理呢?

这是优化后的代码:

package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisSimpleLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和 RedisSimpleLockUtil#lock 是同一个线程</p>
 * @param key key 需要释放锁的key
 * @param token 持有的令牌
 * @return true-成功 false-失败
 */
 public boolean releaseLock(String key, String token) {
 if (StringUtils.isBlank(token)) {
 return false;
 }
 TOKEN_STATUS.put(token, 1);
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 if (SUCCESS.equals(result)) {
 return true;
 }
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (token.equals(remoteToken)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
 return false;
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
 return false;
 } finally {
 THREAD_LOCAL_TOKEN.remove();
 TOKEN_STATUS.remove(token);
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return 上锁成功返回令牌,失败则返回空串
 */
 public String lock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return StringUtils.EMPTY;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime * 500_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return StringUtils.EMPTY;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL_TOKEN.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
 renewTask(key, token, expireTime, renewPeriod);
 }
 return token;
 } catch (Exception e) {
 String token = THREAD_LOCAL_TOKEN.get();
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
 return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 try {
 EXECUTOR_SERVICE.schedule(() -> {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
 // 这里加判断是为了减少定时任务
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 } else {
 // 这里加判断是为了防止误打印warn日志
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
 }
 }
}
 
package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
 private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
 new BasicThreadFactory.Builder()
 .namingPattern("redisSimpleLockUtil-schedule-pool-%d")
 .daemon(true)
 .build());

 private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
 private static final String SUCCESS = "1";
 /**
 * 允许当前token续约
 */
 private static final Integer CAN_RENEW = 0;
 /**
 * 记录token的状态,0-可以续约,其他情况均不能续约
 */
 private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

 private final RedisTemplate<String, String> redisTemplate;

 /**
 * 释放锁
 * <p>必须和 RedisSimpleLockUtil#lock 是同一个线程</p>
 * @param key key 需要释放锁的key
 * @param token 持有的令牌
 * @return true-成功 false-失败
 */
 public boolean releaseLock(String key, String token) {
 if (StringUtils.isBlank(token)) {
 return false;
 }
 TOKEN_STATUS.put(token, 1);
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then redis.call('expire', KEYS[1], 0) return '1' end " +
 "return '0'";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
 log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
 if (SUCCESS.equals(result)) {
 return true;
 }
 String remoteToken = redisTemplate.opsForValue().get(key);
 if (token.equals(remoteToken)) {
 log.warn("非cluster模式简单分布式锁 - 释放锁失败,key : {}, token : {}", key, token);
 return false;
 }
 return true;
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}, token : {}", key, token, e);
 return false;
 } finally {
 THREAD_LOCAL_TOKEN.remove();
 TOKEN_STATUS.remove(token);
 }
 }

 /**
 * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
 * <p>这个方法不考虑Redis的集群架构,不考虑脑裂问题,当只有一个Redis来考虑。</p>
 * @param key 需要上锁的key
 * @param expireTime 过期时间,单位:毫秒
 * @return 上锁成功返回令牌,失败则返回空串
 */
 public String lock(String key, Long expireTime) {
 if (StringUtils.isBlank(key)) {
 log.warn("非cluster模式简单分布式锁 - key is blank");
 return StringUtils.EMPTY;
 }
 if (null == expireTime || expireTime <= 0) {
 expireTime = 0L;
 }
 // 续约周期,单位纳秒
 long renewPeriod = expireTime * 500_000;
 try {
 String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
 // 设置锁
 Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
 if (Boolean.FALSE.equals(result)) {
 return StringUtils.EMPTY;
 }
 log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
 // 上锁成功后将令牌绑定当前线程
 THREAD_LOCAL_TOKEN.set(token);
 TOKEN_STATUS.put(token, 0);
 if (renewPeriod > 0) {
 // 续约任务
 log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
 renewTask(key, token, expireTime, renewPeriod);
 }
 return token;
 } catch (Exception e) {
 String token = THREAD_LOCAL_TOKEN.get();
 log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
 return StringUtils.isBlank(token) ? StringUtils.EMPTY : token;
 }
 }

 /**
 * 锁续约任务
 * @param key 需要续命的key
 * @param token 成功获锁的线程持有的令牌
 * @param expireTime 过期时间,单位:毫秒
 * @param renewPeriod 续约周期,单位:纳秒
 */
 private void renewTask(String key, String token, long expireTime, long renewPeriod) {
 try {
 EXECUTOR_SERVICE.schedule(() -> {
 try {
 String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
 "then " +
 " if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
 " then return '1' else return redis.call('get', KEYS[1]) end " +
 "end " +
 "return redis.call('get', KEYS[1])";
 DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
 String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
 if (SUCCESS.equals(result)) {
 // 续约成功
 log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
 // 这里加判断是为了减少定时任务
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 // 开启下一次续约任务
 renewTask(key, token, expireTime, renewPeriod);
 }
 } else {
 // 这里加判断是为了防止误打印warn日志
 if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
 log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
 }
 }
 } catch (Exception e) {
 // 这里异常是抛不出去的,所以需要 catch 打印
 log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
 }
 }, renewPeriod, TimeUnit.NANOSECONDS);
 } catch (Exception e) {
 log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
 }
 }
}

下面是并发单元测试代码:

@Test
 public void concurrencyTest() {
 String[] nums = {"1", "2", "3", "4", "5"};
 List<CompletableFuture<Void>> list = Lists.newArrayListWithExpectedSize(100);
 for (int i = 0; i < 50; i++) {
 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
 for (int count = 0; count < 10; count++) {
 int random = new Random().nextInt(100) % 5;
 String key = "test_" + nums[random];
 while (true) {
 String token = redisSimpleLockUtil.lock(key, 3_000L);
 if (StringUtils.isNotBlank(token)) {
 log.info("concurrencyTest - key : {}", key);
 try {
 Thread.sleep(new Random().nextInt(1500));
 } catch (Exception e) {
 log.error("concurrencyTest - 发生异常, key : {}", key, e);
 } finally {
 boolean unlock = redisSimpleLockUtil.releaseLock(key, token);
 if (!unlock) {
 log.error("concurrencyTest - 释放锁失败,key : {}", key);
 }
 }
 break;
 }
 }
 }
 });
 list.add(future);
 }
 CompletableFuture<?>[] futures = new CompletableFuture[list.size()];
 list.toArray(futures);
 CompletableFuture.allOf(futures).join();
 }

3.2 红锁

一般公司使用Redis 时都不可能是单节点的,要么主从+哨兵架构,要么就是 cluster 架构。面对集群,我们不得不思考如何应对脑裂这个问题。而 Redlock 是Redis官方网站给出的解决方案。

下面看下针对这两种集群架构的处理方式:

  1. 主从+哨兵

通过访问哨兵获取当前 master 节点,统计票数,超过半数的 master 节点就是真的 master。我们可以对比我们成功上锁的节点是否是真的 master node,从而避免脑裂问题。

  1. cluster
  2. 上锁需要在集群中半数以上的 master 操作成功了才算成功。

3.2.1 红锁的问题

红锁通过过半原则来规避脑裂,但是这就让我们不得不考虑访问节点的等待超时时间应该要多长。而且,也会降低Redis 分布式锁的吞吐量。如果有半数节点不可用,那么分布式锁也将变得不可用。因此,实际使用中,我们还要结合自己实际的业务场景来权衡要不要用红锁或者修改实现方案。

作者介绍

蔡柱梁,51CTO社区编辑,从事Java后端开发8年,做过传统项目广电BOSS系统,后投身互联网电商,负责过订单,TMS,中间件等。



Tags:分布式锁   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
在Redis中如何实现分布式锁的防死锁机制?
在Redis中实现分布式锁是一个常见的需求,可以通过使用Redlock算法来防止死锁。Redlock算法是一种基于多个独立Redis实例的分布式锁实现方案,它通过协调多个Redis实例之间的锁...【详细内容】
2024-02-20  Search: 分布式锁  点击:(47)  评论:(0)  加入收藏
手动撸一个 Redis 分布式锁
大家好呀,我是楼仔。今天第一天开工,收拾心情,又要开始好好学习,好好工作了。对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些...【详细内容】
2024-02-19  Search: 分布式锁  点击:(39)  评论:(0)  加入收藏
Redis分布式锁常见坑点分析
日常开发中,基于 Redis 天然支持分布式锁,大家在线上分布式项目中都使用过 Redis 锁。本文主要针对日常开发中加锁过程中某些异常场景进行讲解与分析。本文讲解示例代码都在 h...【详细内容】
2023-12-11  Search: 分布式锁  点击:(111)  评论:(0)  加入收藏
一文详解分布式锁的看门狗机制
我们今天来看看这个 Redis 的看门狗机制,毕竟现在还是有很多是会使用 Redis 来实现分布式锁的,我们现在看看这个 Redis 是怎么实现分布式锁的,然后我们再来分析这个 Redis 的看...【详细内容】
2023-11-29  Search: 分布式锁  点击:(216)  评论:(0)  加入收藏
全新的分布式锁,功能简单且强大
作者:donnie4w链接:https://my.oschina.net/donnie4w/blog/10114233前言:分布式锁是分布式系统中一个极为重要的工具。目前有多种分布式锁的设计方案,比如借助 redis,mq,数据库,zoo...【详细内容】
2023-10-30  Search: 分布式锁  点击:(268)  评论:(0)  加入收藏
Redis分布式锁失效,数据是否仍存在于内存中?
正文大家好,我是小米,欢迎来到小米的技术分享!今天,我要和大家一起探讨一个有趣而又深奥的话题:Redis分布式锁失效了,数据还存在Redis内存中吗?这个问题在面试中经常被提出,也是我们...【详细内容】
2023-10-11  Search: 分布式锁  点击:(320)  评论:(0)  加入收藏
Redis魔法:点燃分布式锁的奇妙实现
分布式锁是一种用于在分布式系统中控制对共享资源的访问的锁。它与传统的单机锁不同,因为它需要在多个节点之间协调以确保互斥访问。本文将介绍什么是分布式锁,以及使用Redis...【详细内容】
2023-10-11  Search: 分布式锁  点击:(246)  评论:(0)  加入收藏
分布式锁,原来这么简单!
作者 | 蔡柱梁审校 | 重楼目录 分布式锁介绍 如何实现分布式锁 实现分布式锁1 分布式锁介绍现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题,比如扣减库存,送完...【详细内容】
2023-09-22  Search: 分布式锁  点击:(232)  评论:(0)  加入收藏
lua+redis:分布式锁解决方案分享
介绍当我们涉及到多进程或多节点的分布式系统时,传统的单机锁机制不再足够应对并发控制的需求。这是因为在分布式环境中,多个进程或节点同时访问共享资源,传统锁无法有效地协调...【详细内容】
2023-09-13  Search: 分布式锁  点击:(342)  评论:(0)  加入收藏
分布式锁的3种实现!
分布式锁是一种用于保证分布式系统中多个进程或线程同步访问共享资源的技术。同时它又是面试中的常见问题,所以我们本文就重点来看分布式锁的具体实现(含实现代码)。在分布式系...【详细内容】
2023-09-13  Search: 分布式锁  点击:(309)  评论:(0)  加入收藏
▌简易百科推荐
对于微服务架构监控应该遵守的原则
随着软件交付方式的变革,微服务架构的兴起使得软件开发变得更加快速和灵活。在这种情况下,监控系统成为了微服务控制系统的核心组成部分。随着软件的复杂性不断增加,了解系统的...【详细内容】
2024-04-03  步步运维步步坑    Tags:架构   点击:(5)  评论:(0)  加入收藏
大模型应用的 10 种架构模式
作者 | 曹洪伟在塑造新领域的过程中,我们往往依赖于一些经过实践验证的策略、方法和模式。这种观念对于软件工程领域的专业人士来说,已经司空见惯,设计模式已成为程序员们的重...【详细内容】
2024-03-27    InfoQ  Tags:架构模式   点击:(13)  评论:(0)  加入收藏
哈啰云原生架构落地实践
一、弹性伸缩技术实践1.全网容器化后一线研发的使用问题全网容器化后一线研发会面临一系列使用问题,包括时机、容量、效率和成本问题,弹性伸缩是云原生容器化后的必然技术选择...【详细内容】
2024-03-27  哈啰技术  微信公众号  Tags:架构   点击:(10)  评论:(0)  加入收藏
DDD 与 CQRS 才是黄金组合
在日常工作中,你是否也遇到过下面几种情况: 使用一个已有接口进行业务开发,上线后出现严重的性能问题,被老板当众质疑:“你为什么不使用缓存接口,这个接口全部走数据库,这怎么能扛...【详细内容】
2024-03-27  dbaplus社群    Tags:DDD   点击:(11)  评论:(0)  加入收藏
高并发架构设计(三大利器:缓存、限流和降级)
软件系统有三个追求:高性能、高并发、高可用,俗称三高。本篇讨论高并发,从高并发是什么到高并发应对的策略、缓存、限流、降级等。引言1.高并发背景互联网行业迅速发展,用户量剧...【详细内容】
2024-03-13    阿里云开发者  Tags:高并发   点击:(6)  评论:(0)  加入收藏
如何判断架构设计的优劣?
架构设计的基本准则是非常重要的,它们指导着我们如何构建可靠、可维护、可测试的系统。下面是这些准则的转换表达方式:简单即美(KISS):KISS原则的核心思想是保持简单。在设计系统...【详细内容】
2024-02-20  二进制跳动  微信公众号  Tags:架构设计   点击:(36)  评论:(0)  加入收藏
详解基于SpringBoot的WebSocket应用开发
在现代Web应用中,实时交互和数据推送的需求日益增长。WebSocket协议作为一种全双工通信协议,允许服务端与客户端之间建立持久性的连接,实现实时、双向的数据传输,极大地提升了用...【详细内容】
2024-01-30  ijunfu  今日头条  Tags:SpringBoot   点击:(9)  评论:(0)  加入收藏
PHP+Go 开发仿简书,实战高并发高可用微服务架构
来百度APP畅享高清图片//下栽のke:chaoxingit.com/2105/PHP和Go语言结合,可以开发出高效且稳定的仿简书应用。在实现高并发和高可用微服务架构时,我们可以采用一些关键技术。首...【详细内容】
2024-01-14  547蓝色星球    Tags:架构   点击:(115)  评论:(0)  加入收藏
GraalVM与Spring Boot 3.0:加速应用性能的完美融合
在2023年,SpringBoot3.0的发布标志着Spring框架对GraalVM的全面支持,这一支持是对Spring技术栈的重要补充。GraalVM是一个高性能的多语言虚拟机,它提供了Ahead-of-Time(AOT)编...【详细内容】
2024-01-11    王建立  Tags:Spring Boot   点击:(124)  评论:(0)  加入收藏
Spring Boot虚拟线程的性能还不如Webflux?
早上看到一篇关于Spring Boot虚拟线程和Webflux性能对比的文章,觉得还不错。内容较长,抓重点给大家介绍一下这篇文章的核心内容,方便大家快速阅读。测试场景作者采用了一个尽可...【详细内容】
2024-01-10  互联网架构小马哥    Tags:Spring Boot   点击:(115)  评论:(0)  加入收藏
站内最新
站内热门
站内头条