Серия SpringBoot реализует распределенные блокировки на основе Jedis.
Серия SpringBoot реализует распределенные блокировки на основе Jedis.

Серия Redis реализует распределенные блокировки на основе Jedis.

1. Зачем нужны распределенные блокировки

В автономной среде мы чаще всего используем автономную блокировку в пакете juc. Однако с учетом популярности распределенных проектов микросервисов блокировка в juc не может контролировать потокобезопасность распределенной среды блокировки, поскольку стенд. -alone lock может контролировать только один и тот же поток. Безопасность потоков в процессе не может контролировать безопасность потоков нескольких узлов, поэтому необходимо использовать распределенные блокировки.

2. Принцип распределенной блокировки Redis

Прежде чем учиться, сначала разберитесь с командами Redis.,setnxиexpire

команда setnx

SETNX — это сокращение от SET, если оно не существует. Если значение ключа не существует, его можно установить, в противном случае его нельзя установить. Это немного похоже на принцип блокировки cas в juc.

Язык кода:javascript
копировать
# команда setnx, что эквивалентно совместному использованию команд set и nx.
setnx tkey aaa

EX: устанавливает указанное время истечения срока действия в секундах. PX: установите указанное время истечения срока действия (в миллисекундах). NX: устанавливает ключ, только если он не существует. XX: Устанавливается только в том случае, если ключ уже существует.

истечь команда

Если вы используете толькоsetnxСрок годности не добавлен,Исключение возникает при снятии блокировки вручную.,Это приведет к тому, что замок невозможно будет разблокировать.,Так что мне все равно придется это добавитьexpireкоманда для установки времени истечения срока действия。

  • Гарантированная атомарность

Но есть еще одна проблема: если при установке времени истечения сообщается об ошибке, блокировка не будет снята. Поэтому, чтобы обеспечить атомарность, эти две команды нужно выполнять вместе.

Язык кода:javascript
копировать
# set время истечения срока действия ключа 10 секунд, nx: устанавливается, если ключ не существует.
set tkey aaa ex 10 nx

3. Распределить блокировку на основе почерка джедая.

Основываясь на приведенном выше принципе, мы можем просто написать распределенную блокировку.

Среда проекта:

  • JDK 1.8
  • SpringBoot 2.2.1
  • Maven 3.2+
  • Mysql 8.0.26
  • spring-boot-starter-data-redis 2.2.1
  • jedis3.1.0
  • инструменты разработки
Язык кода:txt
копировать
-  IntelliJ IDEA
Язык кода:txt
копировать
-  smartGit

Сначала создайте пример проекта Jedis, интегрированного в Springboot.,Обратитесь к моему предыдущемублог,Общая диаграмма классов представлена ​​на рисунке:

Напишите общий интерфейс для распределенных блокировок, поскольку в будущем распределенные блокировки могут быть реализованы с помощью другого промежуточного программного обеспечения.

Язык кода:javascript
копировать
package com.example.jedis.common;

public interface DistributedLock {

    default boolean acquire(String lockKey, String requestId) {
        return acquire(lockKey, requestId, RedisConstant.DEFAULT_EXPIRE);
    }

    default boolean acquire(String lockKey, String requestId, int expireTime) {
        return acquire(lockKey, requestId, expireTime, RedisConstant.DEFAULT_TIMEOUT);
    }

    boolean acquire(String lockKey, String requestId, int expireTime, int timeout);

    boolean release(String lockKey, String requestId);

}

Напишите абстрактный класс распределенной блокировки для реализации некоторой общей логики и оставьте реализацию других задач подклассам.

Язык кода:javascript
копировать
package com.example.jedis.common;

import lombok.extern.slf4j.Slf4j;

import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

import static com.example.jedis.common.RedisConstant.DEFAULT_EXPIRE;
import static com.example.jedis.common.RedisConstant.DEFAULT_TIMEOUT;

@Slf4j
public abstract class AbstractDistributedLock implements DistributedLock {

    @Override
    public boolean acquire(String lockKey, String requestId, int expireTime, int timeout) {
        expireTime = expireTime <= 0 ? DEFAULT_EXPIRE : expireTime;
        timeout = timeout < 0 ? DEFAULT_TIMEOUT : timeout * 1000;

        long start = System.currentTimeMillis();
        try {
            do {
                if (doAcquire(lockKey, requestId, expireTime)) {
                    watchDog(lockKey, requestId, expireTime);
                    return true;
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } while (System.currentTimeMillis() - start < timeout);

        } catch (Exception e) {
            Throwable cause = e.getCause();
            if (cause instanceof SocketTimeoutException) {
                // ignore exception
                log.error("sockTimeout exception:{}", e);
            }
            else if (cause instanceof  InterruptedException) {
                // ignore exception
                log.error("Interrupted exception:{}", e);
            }
            else {
                log.error("lock acquire exception:{}", e);
            }
            throw new LockException(e.getMessage(), e);
        }
        return false;
    }

    @Override
    public boolean release(String lockKey, String requestId) {
        try {
            return doRelease(lockKey, requestId);
        } catch (Exception e) {
            log.error("lock release exception:{}", e);
            throw new LockException(e.getMessage(), e);
        }
    }

    protected abstract boolean doAcquire(String lockKey, String requestId, int expireTime);

    protected abstract boolean doRelease(String lockKey, String requestId);

    protected abstract void watchDog(String lockKey, String requestId, int expireTime);

}

Абстрактный класс распределенной блокировки Redis

Язык кода:javascript
копировать
package com.example.jedis.common;

public abstract class AbstractRedisLock extends AbstractDistributedLock{

}

Класс реализации распределенной блокировки, основанный на Jedis, в основном контролирует атомарность разблокировки с помощью сценариев Lua, а также добавляет регулярное обновление сторожевого таймера, чтобы избежать ситуации, когда какое-то длительное время выполнения бизнеса является длительным и блокировка снимается.

Язык кода:javascript
копировать
package com.example.jedis.common;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class JedisLockTemplate extends AbstractRedisLock implements InitializingBean {

    private String UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    private String WATCH_DOG_LUA = "local lock_key=KEYS[1]\n" +
            "local lock_value=ARGV[1]\n" +
            "local lock_ttl=ARGV[2]\n" +
            "local current_value=redis.call('get',lock_key)\n" +
            "local result=0\n" +
            "if lock_value==current_value then\n" +
            "    redis.call('expire',lock_key,lock_ttl)\n" +
            "    result=1\n" +
            "end\n" +
            "return result";

    private static final Long UNLOCK_SUCCESS = 1L;

    private static final Long RENEWAL_SUCCESS = 1L;

    @Autowired
    private JedisTemplate jedisTemplate;

    private ScheduledThreadPoolExecutor scheduledExecutorService;


    @Override
    public void afterPropertiesSet() throws Exception {
        this.UNLOCK_LUA = jedisTemplate.scriptLoad(UNLOCK_LUA);
        this.WATCH_DOG_LUA = jedisTemplate.scriptLoad(WATCH_DOG_LUA);
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    }


    @Override
    public boolean doAcquire(String lockKey, String requestId, int expire) {
        return jedisTemplate.setnxex(lockKey, requestId, expire);
    }

    @Override
    public boolean doRelease(String lockKey, String requestId) {
        Object eval = jedisTemplate.evalsha(UNLOCK_LUA, CollUtil.newArrayList(lockKey), CollUtil.newArrayList(requestId));
        if (UNLOCK_SUCCESS.equals(eval)) {
            scheduledExecutorService.shutdown();
            return true;
        }
        return false;
    }

    @Override
    public void watchDog(String lockKey, String requestId, int expire) {
        int period = getPeriod(expire);
        if (scheduledExecutorService.isShutdown()) {
            scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        }
        scheduledExecutorService.scheduleAtFixedRate(
                new WatchDogTask(scheduledExecutorService, CollUtil.newArrayList(lockKey), CollUtil.newArrayList(requestId, Convert.toStr(expire))),
                1,
                period,
                TimeUnit.SECONDS
                );
    }

    class WatchDogTask implements Runnable {

        private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
        private List<String> keys;
        private List<String> args;

        public WatchDogTask(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, List<String> keys, List<String> args) {
            this.scheduledThreadPoolExecutor = scheduledThreadPoolExecutor;
            this.keys = keys;
            this.args = args;
        }

        @Override
        public void run() {
            log.info("watch dog for renewal...");
            Object evalsha = jedisTemplate.evalsha(WATCH_DOG_LUA, keys, args);
            if (!evalsha.equals(RENEWAL_SUCCESS)) {
                scheduledThreadPoolExecutor.shutdown();
            }
            log.info("renewal result:{}, keys:{}, args:{}", evalsha, keys, args);
        }
    }

    private int getPeriod(int expire) {
        if (expire < 1)
            throw new LockException("Срок действия не может быть меньше 1");
        return expire - 1;
    }



}

Написание общего джедая часто включает класс инкапсуляции API.,setnxexплюсsynchronized,Потому что Redis однопоточный,Добавить блокировку синхронизации,Избегайте одновременных запросов,Ситуация, когда jedispool не может загрузиться

Язык кода:javascript
копировать
package com.example.jedis.common;

import cn.hutool.core.collection.ConcurrentHashSet;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.SetParams;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;


@Slf4j
@Component
public class JedisTemplate implements InitializingBean {



    @Resource
    private JedisPool jedisPool;

    private Jedis jedis;

    public JedisTemplate() {

    }

    @Override
    public void afterPropertiesSet() {
        jedis = jedisPool.getResource();
    }


    public <T> T execute(Function<Jedis, T> action) {
        T apply = null;
        try {
            jedis = jedisPool.getResource();
            apply = action.apply(jedis);
        } catch (JedisException e) {
            handleException(e);
            throw e;
        } finally {
            jedis.close();
        }
        return apply;
    }

    public void execute(Consumer<Jedis> action) {
        try {
            jedis = jedisPool.getResource();
            action.accept(jedis);
        } catch (JedisException e) {
            handleException(e);
            throw e;
        } finally {
            jedis.close();
        }
    }

    public JedisPool getJedisPool() {
        return this.jedisPool;
    }

   
    public synchronized Boolean setnxex(final String key, final String value, int seconds) {
        return execute(e -> {
            SetParams setParams = new SetParams();
            setParams.nx();
            setParams.ex(seconds);
            return isStatusOk(jedis.set(key, value, setParams));
        });
    }
    

    public Object eval(final String script,final Integer keyCount,final String... params) {
        return execute(e -> {
            return jedis.eval(script, keyCount, params);
        });
    }

    public Object eval(final String script, final List<String> keys, final List<String> params) {
        return execute(e -> {
            return jedis.eval(script, keys, params);
        });
    }

    public Object evalsha(final String script, final List<String> keys, final List<String> params) {
        return execute(e -> {
            return jedis.evalsha(script, keys, params);
        });
    }

    public String scriptLoad(final String script) {
        return execute(e -> {
            return jedis.scriptLoad(script);
        });
    }
    

    protected void handleException(JedisException e) {
        if (e instanceof JedisConnectionException) {
            log.error("redis connection exception:{}", e);
        } else if (e instanceof JedisDataException) {
            log.error("jedis data exception:{}", e);
        } else {
            log.error("jedis exception:{}", e);
        }
    }

    protected synchronized static boolean isStatusOk(String status) {
        return status != null && ("OK".equals(status) || "+OK".equals(status));
    }

}

постоянный класс

Язык кода:javascript
копировать
package com.example.jedis.common;

public class RedisConstant {

    public static final Integer DEFAULT_EXPIRE = 30;
    public static final Integer DEFAULT_TIMEOUT = 1;


}

Пользовательский класс исключений:

Язык кода:javascript
копировать
package com.example.jedis.common;

public class LockException extends RuntimeException{

    public LockException(String message) {
        super(message);
    }

    public LockException(String message, Throwable t) {
        super(message, t);
    }

}

Класс приложения, запущенный SpringBoot

Язык кода:javascript
копировать
package com.example.jedis;

import cn.hutool.core.date.StopWatch;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;


@SpringBootApplication
@EnableScheduling
@EnableAsync
@Slf4j
public class SpringbootJedisApplication {

    @Resource
    RedisConnectionFactory factory;


    public static void main(String[] args) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start("springbootJedis");
        SpringApplication.run(SpringbootJedisApplication.class, args);
        stopWatch.stop();
        log.info("Время успешного запуска проекта Springboot: {}ms \n", stopWatch.getTotalTimeMillis());
        log.info(stopWatch.prettyPrint());
    }

    @PreDestroy
    public void flushDB() {
        factory.getConnection().flushDb();
    }

}

Приведенная выше логика в основном реализует распределенную блокировку, и вы также можете добавить собственную аннотацию для ее реализации.

Язык кода:javascript
копировать
package com.example.jedis.common;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Lock {

    String lockKey();

    String requestId();

    int expire() default 30;

    int timeout() default  1;

}

Настройка класса аспекта для реализации бизнес-обработки

Язык кода:javascript
копировать
package com.example.jedis.common;


import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.concurrent.Future;

@Component
@Aspect
@Slf4j
public class WatchDog {

    @Resource
    private JedisLockTemplate jedisLockTemplate;

    @Resource
    private ThreadPoolTaskExecutor executor;


    @Around("@annotation(Lock)")
    public Object proxy (ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        Lock lock = method.getAnnotation(Lock.class);

        boolean acquire = jedisLockTemplate.acquire(lock.lockKey(), lock.requestId(), lock.expire(), lock.timeout());
        if (!acquire)
            throw new LockException("Не удалось получить блокировку!");

        Future<Object> future = executor.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable e) {
                log.error("Ошибка выполнения задачи: {}", e);
                jedisLockTemplate.release(lock.lockKey(), lock.requestId());
                throw new RuntimeException("Ошибка выполнения задачи");
            } finally {
                jedisLockTemplate.release(lock.lockKey(), lock.requestId());
            }
        });

        return future.get();
    }


}

Я написал тестовый класс контроллера и начал использовать SpringBoot для тестирования этого класса, но обнаружил, что иногда время ожидания соединения все еще происходит. Это может быть ошибка совместимости платформы.

Язык кода:javascript
копировать
package com.example.jedis.controller;

import com.example.jedis.common.JedisLockTemplate;
import com.example.jedis.common.Lock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;

@RestController
@Slf4j
public class TestController {


    private static final String REDIS_KEY = "test:lock";

    @Autowired
    private JedisLockTemplate jedisLockTemplate;

    @GetMapping("test")
    public void test(@RequestParam("threadNum")Integer threadNum) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        IntStream.range(0, threadNum).forEach(e->{
            new Thread(new RunnableTask(countDownLatch)).start();
        });
        countDownLatch.await();


    }

    @GetMapping("testLock")
    @Lock(lockKey = "test:api", requestId = "123", expire = 5, timeout = 3)
    public void testLock() throws InterruptedException {
        doSomeThing();
    }

    class RunnableTask implements Runnable {

        CountDownLatch countDownLatch;

        public RunnableTask(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            redisLock();
            countDownLatch.countDown();
        }


    }


    private void redisLock() {
        String requestId = getRequestId();
        Boolean lock = jedisLockTemplate.acquire(REDIS_KEY, requestId, 5, 3);
        if (lock) {
            try {
                doSomeThing();
            } catch (Exception e) {
                jedisLockTemplate.release(REDIS_KEY, requestId);
            } finally {
                jedisLockTemplate.release(REDIS_KEY, requestId);
            }
        } else {
            log.warn("Не удалось получить блокировку!");
        }
    }

    private void doSomeThing() throws InterruptedException {
        log.info("do some thing");
        Thread.sleep(15 * 1000);
    }

    private String getRequestId() {
        String str="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        Random random=new Random();
        StringBuffer sb=new StringBuffer();
        for(int i=0;i<32;i++){
            int number=random.nextInt(62);
            sb.append(str.charAt(number));
        }
        return sb.toString();

    }



}
Язык кода:javascript
копировать
# Имитация 100 одновременных запросов
curl http://127.0.0.1:8080/springboot-jedis/test?threadNum=100

Эта проблема возникает при запуске проекта,Это может быть класс junittest SpringBoot.,setnxex方法плюсsynchronizeблокировка синхронизации

java.net.SocketTimeoutException: Read timed out Could not get a resource from the pool

Краткое описание: В этой статье реализована распределенная блокировка на основе сценариев jedis и jua. Распределенная блокировка Redis основана на режиме AP, поэтому эффективность является относительно быстрой, но режим распределенного CP не может быть гарантирован. Если вы хотите обеспечить высокую согласованность, вы можете это сделать. Вы можете выбрать другое решение для распределенной блокировки. В этой статье также учитываются ситуации с длинными транзакциями и используется сторожевой таймер для обновления ключа.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose