Серия Redis реализует распределенные блокировки на основе Jedis.
В автономной среде мы чаще всего используем автономную блокировку в пакете juc. Однако с учетом популярности распределенных проектов микросервисов блокировка в juc не может контролировать потокобезопасность распределенной среды блокировки, поскольку стенд. -alone lock может контролировать только один и тот же поток. Безопасность потоков в процессе не может контролировать безопасность потоков нескольких узлов, поэтому необходимо использовать распределенные блокировки.
Прежде чем учиться, сначала разберитесь с командами Redis.,setnx
иexpire
команда setnx
SETNX — это сокращение от SET, если оно не существует. Если значение ключа не существует, его можно установить, в противном случае его нельзя установить. Это немного похоже на принцип блокировки cas в juc.
# команда setnx, что эквивалентно совместному использованию команд set и nx.
setnx tkey aaa
EX: устанавливает указанное время истечения срока действия в секундах. PX: установите указанное время истечения срока действия (в миллисекундах). NX: устанавливает ключ, только если он не существует. XX: Устанавливается только в том случае, если ключ уже существует.
истечь команда
Если вы используете толькоsetnx
Срок годности не добавлен,Исключение возникает при снятии блокировки вручную.,Это приведет к тому, что замок невозможно будет разблокировать.,Так что мне все равно придется это добавитьexpire
команда для установки времени истечения срока действия。
Но есть еще одна проблема: если при установке времени истечения сообщается об ошибке, блокировка не будет снята. Поэтому, чтобы обеспечить атомарность, эти две команды нужно выполнять вместе.
# set время истечения срока действия ключа 10 секунд, nx: устанавливается, если ключ не существует.
set tkey aaa ex 10 nx
Основываясь на приведенном выше принципе, мы можем просто написать распределенную блокировку.
Среда проекта:
- IntelliJ IDEA
- smartGit
Сначала создайте пример проекта Jedis, интегрированного в Springboot.,Обратитесь к моему предыдущемублог,Общая диаграмма классов представлена на рисунке:
Напишите общий интерфейс для распределенных блокировок, поскольку в будущем распределенные блокировки могут быть реализованы с помощью другого промежуточного программного обеспечения.
package com.example.jedis.common;
public interface DistributedLock {
default boolean acquire(String lockKey, String requestId) {
return acquire(lockKey, requestId, RedisConstant.DEFAULT_EXPIRE);
}
default boolean acquire(String lockKey, String requestId, int expireTime) {
return acquire(lockKey, requestId, expireTime, RedisConstant.DEFAULT_TIMEOUT);
}
boolean acquire(String lockKey, String requestId, int expireTime, int timeout);
boolean release(String lockKey, String requestId);
}
Напишите абстрактный класс распределенной блокировки для реализации некоторой общей логики и оставьте реализацию других задач подклассам.
package com.example.jedis.common;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;
import static com.example.jedis.common.RedisConstant.DEFAULT_EXPIRE;
import static com.example.jedis.common.RedisConstant.DEFAULT_TIMEOUT;
@Slf4j
public abstract class AbstractDistributedLock implements DistributedLock {
@Override
public boolean acquire(String lockKey, String requestId, int expireTime, int timeout) {
expireTime = expireTime <= 0 ? DEFAULT_EXPIRE : expireTime;
timeout = timeout < 0 ? DEFAULT_TIMEOUT : timeout * 1000;
long start = System.currentTimeMillis();
try {
do {
if (doAcquire(lockKey, requestId, expireTime)) {
watchDog(lockKey, requestId, expireTime);
return true;
}
TimeUnit.MILLISECONDS.sleep(100);
} while (System.currentTimeMillis() - start < timeout);
} catch (Exception e) {
Throwable cause = e.getCause();
if (cause instanceof SocketTimeoutException) {
// ignore exception
log.error("sockTimeout exception:{}", e);
}
else if (cause instanceof InterruptedException) {
// ignore exception
log.error("Interrupted exception:{}", e);
}
else {
log.error("lock acquire exception:{}", e);
}
throw new LockException(e.getMessage(), e);
}
return false;
}
@Override
public boolean release(String lockKey, String requestId) {
try {
return doRelease(lockKey, requestId);
} catch (Exception e) {
log.error("lock release exception:{}", e);
throw new LockException(e.getMessage(), e);
}
}
protected abstract boolean doAcquire(String lockKey, String requestId, int expireTime);
protected abstract boolean doRelease(String lockKey, String requestId);
protected abstract void watchDog(String lockKey, String requestId, int expireTime);
}
Абстрактный класс распределенной блокировки Redis
package com.example.jedis.common;
public abstract class AbstractRedisLock extends AbstractDistributedLock{
}
Класс реализации распределенной блокировки, основанный на Jedis, в основном контролирует атомарность разблокировки с помощью сценариев Lua, а также добавляет регулярное обновление сторожевого таймера, чтобы избежать ситуации, когда какое-то длительное время выполнения бизнеса является длительным и блокировка снимается.
package com.example.jedis.common;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class JedisLockTemplate extends AbstractRedisLock implements InitializingBean {
private String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private String WATCH_DOG_LUA = "local lock_key=KEYS[1]\n" +
"local lock_value=ARGV[1]\n" +
"local lock_ttl=ARGV[2]\n" +
"local current_value=redis.call('get',lock_key)\n" +
"local result=0\n" +
"if lock_value==current_value then\n" +
" redis.call('expire',lock_key,lock_ttl)\n" +
" result=1\n" +
"end\n" +
"return result";
private static final Long UNLOCK_SUCCESS = 1L;
private static final Long RENEWAL_SUCCESS = 1L;
@Autowired
private JedisTemplate jedisTemplate;
private ScheduledThreadPoolExecutor scheduledExecutorService;
@Override
public void afterPropertiesSet() throws Exception {
this.UNLOCK_LUA = jedisTemplate.scriptLoad(UNLOCK_LUA);
this.WATCH_DOG_LUA = jedisTemplate.scriptLoad(WATCH_DOG_LUA);
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
}
@Override
public boolean doAcquire(String lockKey, String requestId, int expire) {
return jedisTemplate.setnxex(lockKey, requestId, expire);
}
@Override
public boolean doRelease(String lockKey, String requestId) {
Object eval = jedisTemplate.evalsha(UNLOCK_LUA, CollUtil.newArrayList(lockKey), CollUtil.newArrayList(requestId));
if (UNLOCK_SUCCESS.equals(eval)) {
scheduledExecutorService.shutdown();
return true;
}
return false;
}
@Override
public void watchDog(String lockKey, String requestId, int expire) {
int period = getPeriod(expire);
if (scheduledExecutorService.isShutdown()) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
}
scheduledExecutorService.scheduleAtFixedRate(
new WatchDogTask(scheduledExecutorService, CollUtil.newArrayList(lockKey), CollUtil.newArrayList(requestId, Convert.toStr(expire))),
1,
period,
TimeUnit.SECONDS
);
}
class WatchDogTask implements Runnable {
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private List<String> keys;
private List<String> args;
public WatchDogTask(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, List<String> keys, List<String> args) {
this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
this.keys = keys;
this.args = args;
}
@Override
public void run() {
log.info("watch dog for renewal...");
Object evalsha = jedisTemplate.evalsha(WATCH_DOG_LUA, keys, args);
if (!evalsha.equals(RENEWAL_SUCCESS)) {
scheduledThreadPoolExecutor.shutdown();
}
log.info("renewal result:{}, keys:{}, args:{}", evalsha, keys, args);
}
}
private int getPeriod(int expire) {
if (expire < 1)
throw new LockException("Срок действия не может быть меньше 1");
return expire - 1;
}
}
Написание общего джедая часто включает класс инкапсуляции API.,setnxexплюсsynchronized
,Потому что Redis однопоточный,Добавить блокировку синхронизации,Избегайте одновременных запросов,Ситуация, когда jedispool не может загрузиться
package com.example.jedis.common;
import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.SetParams;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@Slf4j
@Component
public class JedisTemplate implements InitializingBean {
@Resource
private JedisPool jedisPool;
private Jedis jedis;
public JedisTemplate() {
}
@Override
public void afterPropertiesSet() {
jedis = jedisPool.getResource();
}
public <T> T execute(Function<Jedis, T> action) {
T apply = null;
try {
jedis = jedisPool.getResource();
apply = action.apply(jedis);
} catch (JedisException e) {
handleException(e);
throw e;
} finally {
jedis.close();
}
return apply;
}
public void execute(Consumer<Jedis> action) {
try {
jedis = jedisPool.getResource();
action.accept(jedis);
} catch (JedisException e) {
handleException(e);
throw e;
} finally {
jedis.close();
}
}
public JedisPool getJedisPool() {
return this.jedisPool;
}
public synchronized Boolean setnxex(final String key, final String value, int seconds) {
return execute(e -> {
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(seconds);
return isStatusOk(jedis.set(key, value, setParams));
});
}
public Object eval(final String script,final Integer keyCount,final String... params) {
return execute(e -> {
return jedis.eval(script, keyCount, params);
});
}
public Object eval(final String script, final List<String> keys, final List<String> params) {
return execute(e -> {
return jedis.eval(script, keys, params);
});
}
public Object evalsha(final String script, final List<String> keys, final List<String> params) {
return execute(e -> {
return jedis.evalsha(script, keys, params);
});
}
public String scriptLoad(final String script) {
return execute(e -> {
return jedis.scriptLoad(script);
});
}
protected void handleException(JedisException e) {
if (e instanceof JedisConnectionException) {
log.error("redis connection exception:{}", e);
} else if (e instanceof JedisDataException) {
log.error("jedis data exception:{}", e);
} else {
log.error("jedis exception:{}", e);
}
}
protected synchronized static boolean isStatusOk(String status) {
return status != null && ("OK".equals(status) || "+OK".equals(status));
}
}
постоянный класс
package com.example.jedis.common;
public class RedisConstant {
public static final Integer DEFAULT_EXPIRE = 30;
public static final Integer DEFAULT_TIMEOUT = 1;
}
Пользовательский класс исключений:
package com.example.jedis.common;
public class LockException extends RuntimeException{
public LockException(String message) {
super(message);
}
public LockException(String message, Throwable t) {
super(message, t);
}
}
Класс приложения, запущенный SpringBoot
package com.example.jedis;
import cn.hutool.core.date.StopWatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@SpringBootApplication
@EnableScheduling
@EnableAsync
@Slf4j
public class SpringbootJedisApplication {
@Resource
RedisConnectionFactory factory;
public static void main(String[] args) {
StopWatch stopWatch = new StopWatch();
stopWatch.start("springbootJedis");
SpringApplication.run(SpringbootJedisApplication.class, args);
stopWatch.stop();
log.info("Время успешного запуска проекта Springboot: {}ms \n", stopWatch.getTotalTimeMillis());
log.info(stopWatch.prettyPrint());
}
@PreDestroy
public void flushDB() {
factory.getConnection().flushDb();
}
}
Приведенная выше логика в основном реализует распределенную блокировку, и вы также можете добавить собственную аннотацию для ее реализации.
package com.example.jedis.common;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Lock {
String lockKey();
String requestId();
int expire() default 30;
int timeout() default 1;
}
Настройка класса аспекта для реализации бизнес-обработки
package com.example.jedis.common;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.Future;
@Component
@Aspect
@Slf4j
public class WatchDog {
@Resource
private JedisLockTemplate jedisLockTemplate;
@Resource
private ThreadPoolTaskExecutor executor;
@Around("@annotation(Lock)")
public Object proxy (ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = methodSignature.getMethod();
Lock lock = method.getAnnotation(Lock.class);
boolean acquire = jedisLockTemplate.acquire(lock.lockKey(), lock.requestId(), lock.expire(), lock.timeout());
if (!acquire)
throw new LockException("Не удалось получить блокировку!");
Future<Object> future = executor.submit(() -> {
try {
return joinPoint.proceed();
} catch (Throwable e) {
log.error("Ошибка выполнения задачи: {}", e);
jedisLockTemplate.release(lock.lockKey(), lock.requestId());
throw new RuntimeException("Ошибка выполнения задачи");
} finally {
jedisLockTemplate.release(lock.lockKey(), lock.requestId());
}
});
return future.get();
}
}
Я написал тестовый класс контроллера и начал использовать SpringBoot для тестирования этого класса, но обнаружил, что иногда время ожидания соединения все еще происходит. Это может быть ошибка совместимости платформы.
package com.example.jedis.controller;
import com.example.jedis.common.JedisLockTemplate;
import com.example.jedis.common.Lock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
@RestController
@Slf4j
public class TestController {
private static final String REDIS_KEY = "test:lock";
@Autowired
private JedisLockTemplate jedisLockTemplate;
@GetMapping("test")
public void test(@RequestParam("threadNum")Integer threadNum) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(threadNum);
IntStream.range(0, threadNum).forEach(e->{
new Thread(new RunnableTask(countDownLatch)).start();
});
countDownLatch.await();
}
@GetMapping("testLock")
@Lock(lockKey = "test:api", requestId = "123", expire = 5, timeout = 3)
public void testLock() throws InterruptedException {
doSomeThing();
}
class RunnableTask implements Runnable {
CountDownLatch countDownLatch;
public RunnableTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
redisLock();
countDownLatch.countDown();
}
}
private void redisLock() {
String requestId = getRequestId();
Boolean lock = jedisLockTemplate.acquire(REDIS_KEY, requestId, 5, 3);
if (lock) {
try {
doSomeThing();
} catch (Exception e) {
jedisLockTemplate.release(REDIS_KEY, requestId);
} finally {
jedisLockTemplate.release(REDIS_KEY, requestId);
}
} else {
log.warn("Не удалось получить блокировку!");
}
}
private void doSomeThing() throws InterruptedException {
log.info("do some thing");
Thread.sleep(15 * 1000);
}
private String getRequestId() {
String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random random=new Random();
StringBuffer sb=new StringBuffer();
for(int i=0;i<32;i++){
int number=random.nextInt(62);
sb.append(str.charAt(number));
}
return sb.toString();
}
}
# Имитация 100 одновременных запросов
curl http://127.0.0.1:8080/springboot-jedis/test?threadNum=100
Эта проблема возникает при запуске проекта,Это может быть класс junittest SpringBoot.,setnxex方法плюсsynchronize
блокировка синхронизации
java.net.SocketTimeoutException: Read timed out Could not get a resource from the pool
Краткое описание: В этой статье реализована распределенная блокировка на основе сценариев jedis и jua. Распределенная блокировка Redis основана на режиме AP, поэтому эффективность является относительно быстрой, но режим распределенного CP не может быть гарантирован. Если вы хотите обеспечить высокую согласованность, вы можете это сделать. Вы можете выбрать другое решение для распределенной блокировки. В этой статье также учитываются ситуации с длинными транзакциями и используется сторожевой таймер для обновления ключа.