Привет всем, я рыбак. В этом аккаунте недавно запущена серия «Go Toolbox», цель которой — поделиться со всеми практичными и интересными инструментами, написанными на языке Go.
Сегодня я рекомендую вам версию инструмента распределенной блокировки Go, основанную на Redis: redsync. Этот инструмент также рекомендуется на официальном сайте Redis. Пакет redsync обладает характеристиками высокой производительности, высокой доступности, защиты от взаимоблокировок и случайного удаления.
redsyncмаленький файл | |||
---|---|---|---|
star | 2k | used by | 276 |
contributors | 21 | Классификация инструментов | Распределенная блокировка |
Введение функции | Высокая доступность на основе Redis、высокая производительность、Анти-тупиковая ситуация、Распределенная блокировка реализации для предотвращения случайного удаления | ||
Адрес проекта | https://github.com/go-redsync/redsync | ||
автор | hjr265 | ||
Сопутствующие знания | Распределенная Основы блока, шаблон функциональных опций、атомарная операция redis+lua |
что такое Распределенная блокировка
Замок,В программированииязык это переменная,Эта переменная может одновременно принадлежать только одному потоку.,Чтобы защитить общие данные, одновременно может работать только один поток. И Распределенная блокировка - это тоже замок.,Прямо сейчасраспределенныйв системе Замок。Должен Замокиспользуется для решения проблемыраспределенный Проблемы с контролем доступа к общим ресурсам в системе.。
Распределенная блокировка Распространенные сценарии использования
1. Наиболее распространенный вычет запасов 2. Пробой кэша/лавина кэша (также можно использовать Распределенная блокировка) 3. В сценариях с высоким уровнем параллелизма предотвращайте передачу обратного трафика и т. д.
Установить
go get github.com/go-redsync/redsync/v4
Основное использование
Должен Сумкаиз Это также очень просто в использовании。первый Создайте клиентское соединение Redis。а потом Должен Клиентское соединение присоединяется кredisизPoolсередина。наконец,redsyncна основе ДолженredisPoolсоздавать экземпляр。тогда пройдиredsyncПримеризNewMutex就可以на основе一个具体изkeyСоздать новый Распределенная блокировка。Затем добавьте Замокпримирение Замокдействовать。
Этот пакет можно создать двумя способами: в автономном режиме и в кластерном режиме на основе Redis. Есть два основных различия в использовании:
Давайте рассмотрим основное использование в двух конкретных режимах. Следующий пример кода основан на использовании автономного режима Redis. Используйте NewClient для создания соединения при инициализации клиентского соединения. следующее:
package main
import (
goredislib "github.com/go-redis/redis/v8"
"github.com/go-redsync/redsync/v4"
"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)
func main() {
// Создайте клиентское соединение Redis
client := goredislib.NewClient(&goredislib.Options{
Addr: "localhost:6379",
})
// Создайте пул клиентских подключений redsync.
pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
// Создать экземпляр Redsync
rs := redsync.New(pool)
// Получите ту же блокировку мьютекса через то же имя значения ключа.
mutexname := "my-global-mutex"
//Создаем блокировку мьютекса на основе ключа
mutex := rs.NewMutex(mutexname)
// Исполнять под ключ
if err := mutex.Lock(); err != nil {
panic(err)
}
// Обработка бизнес-логики после получения блокировки.
// снять блокировку мьютекса
if ok, err := mutex.Unlock(); !ok || err != nil {
panic("unlock failed")
}
}
Если вам нужен режим кластера на основе Redis, используйте функцию NewClusterClient при создании клиентского подключения Redis следующим образом:
// Создайте клиентское соединение в режиме кластера Redis.
client := goredislib.NewClusterClient(&goredislib.ClusterOptions{
Addr: []string{"localhost:6379"},
})
анализ реализации
Как видно из приведенного выше примера кода, процесс использования этого пакета заключается в создании клиентского соединения Redis, создании экземпляра объекта redsync, создании мьютекса, блокировке и разблокировке. Далее мы шаг за шагом разберем процесс его реализации.
Создание объектов redsync в этом пакете достигается с помощью следующих функций:
redsync.NewPool(pool ...redis.Pool) *Redsync
первый,Мы видим, что функция NewPool получает несколько параметров redis.Pool.,Давайте еще раз посмотрим на структуру Redsync.,В структуре имеется только один атрибут пула,И это часть пула соединений Redis,Объясните, что может существовать несколько пулов клиентских подключений Redis. В то же время, вы можете узнать через комментарии,Redsync может создавать Распределенную блокировку, используя несколько пулов соединений Redis.
// Redsync provides a simple method for creating distributed mutexes using multiple Redis connection pools.
type Redsync struct {
pools []redis.Pool
}
Вопрос: Почему нам нужно использовать здесь фрагмент пула соединений Redis? На данный момент мы сначала думаем, что сюда передается только один пул клиентских подключений Redis. Продолжайте читать, помня об этом вопросе.
После создания экземпляра Redsync вы можете создать блокировку мьютекса с помощью метода NewMutex в этом экземпляре. Здесь создается экземпляр объекта Mutex. следующее:
// NewMutex returns a new distributed mutex with given name.
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
Здесь мы сначала сосредоточимся на имени, genValueFunc, кворуме и пулах. Остальные мы проанализируем позже.
Мьютекс создан Замокпосле объекта,Вы можете выполнять операции блокировки с помощью метода Lock объекта мьютекса. Суть блокировки заключается в использовании операции setnx. Потому что setnx сначала определит, существует ли уже ключ.,Если ключ не существует,Затем установите значение ключа на значение,и возвращает 1, если ключ уже существует;,Тогда значение ключа не будет обновляться.,Верните 0 напрямую. Используя эту возможность, мы можем реализовать простейшую Распределенную блокировку.
image.png
Этот пакет также использует setnx для использования имени в объекте мьютекса в качестве ключа, случайного значения, сгенерированного функцией genValueFunc, в качестве значения и атрибута expiry в объекте мьютекса в качестве времени истечения срока действия. следующее:
func (m *Mutex) acquire(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
reply, err := conn.SetNX(m.name, value, m.expiry)
if err != nil {
return false, err
}
return reply, nil
}
Установка срока действия для setnx здесь предназначена для того, чтобы местоположение никогда не было освобождено. Предполагая, что срок действия ключа не установлен, если программа выйдет из строя до отправки команды удаления для снятия блокировки, то ключ будет постоянно храниться в Redis, и другие клиенты никогда не смогут получить блокировку.
image.png
Значение в этом пакете генерируется случайным образом с помощью функции genValueFunc. Эта функция по умолчанию генерирует случайное значение, обеспечивая в определенной степени уникальность значения. Уникальность значения гарантированно предотвращает случайное удаление блокировки при ее снятии. Здесь при снятии блокировки для операции удаления будет оцениваться, является ли удаляемое значение значением, удерживаемым текущей блокировкой. Конечно, вы можете указать функцию, которая генерирует значение при использовании NewMutex, но уникальность значения должна быть гарантирована.
При инициализации Redsync мы упомянули, что существует срез пулов, в котором хранится пул соединений Redis. Один вопрос: зачем использовать срез? Ответ: высокая доступность. При выполнении операции блокировки пакет будет циклически перебирать пулы и разрешать каждому клиентскому соединению пытаться выполнить операцию setnx. Если количество успешных операций превышает половину всех подключений, то блокировка считается успешной. В противном случае блокировка выйдет из строя.
Мы упоминали, что для того, чтобы блокировка никогда не была снята, мы установили срок действия ключа. Тогда время обработки во время процесса блокировки будет близко к времени истечения срока действия. Даже если setnx увенчается успехом, время истечения срока действия вскоре будет достигнуто. Тогда оставшегося времени будет недостаточно для обработки бизнес-логики после блокировки, что приведет к автоматическому выпуску. . В это время блокировка может быть получена другими потоками, что вызовет проблемы параллелизма. Таким образом, чтобы определить, является ли блокировка успешной, здесь необходимо не только определить, сколько операций redis setnx было успешным, но также определить, может ли оставшееся время после успешной блокировки обрабатывать последующую бизнес-логику, чтобы предотвратить блокировку. блокируется сразу после успешного завершения ситуации блокировки.
Таким образом, для определения успешности блокировки в этом пакете необходимы следующие условия:
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
При расчете «до» используется текущее время плюс оставшееся время. Оставшееся время — это время действия m.expiry, минус время обработки блокировки now.Sub(start), а затем расчетное оставшееся значение (умножьте время действия на коэффициент DriftFactro, значение этого коэффициента по умолчанию — 0,01, конечно). его можно установить в зависимости от бизнеса).
Если во время процесса блокировки, учитывая проблемы с производительностью, блокировка однажды не удалась, вы можете попробовать еще раз. Однако во время повторной попытки необходимо учитывать временной интервал. Для обеспечения справедливости к минимальному времени ожидания будет добавлено случайное значение. Ниже представлена реализация этого пакета:
for i := 0; i < m.tries; i++ {
if i != 0 {
select {
case <-ctx.Done():
// Exit early if the context is done.
return ErrFailed
case <-time.After(m.delayFunc(i)):
// Fall-through when the delay timer completes.
}
}
// Другая логика блокировки
}
Здесь реализация функции m.delayFunc следующая:
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
Суть снятия блокировки заключается в удалении соответствующего ключа из redis. Просто используйте операцию удаления. Но при удалении обязательно не допускайте случайного удаления. Например, клиент1 начинает выполнять бизнес-обработку после получения блокировки, но бизнес-обработка занимает много времени и превышает время истечения срока действия блокировки. В результате до завершения бизнес-обработки срок действия блокировки истекает и автоматически удаляется (. эквивалентно освобождению блокировки, принадлежащей клиенту1), в этот момент клиент2 получит блокировку, а затем выполнит собственную бизнес-обработку. В это время бизнес-обработка клиента1 завершается, а затем отправляет удаление в Redis. key для снятия блокировки. После того, как Redis получает команду, он напрямую удаляет ключ, но в этот момент ключ принадлежит клиенту2, поэтому это эквивалентно снятию блокировки клиентом1:
image.png
Поэтому при блокировке мы задаем для ключа уникальное значение, а при его удалении будем судить, принадлежит ли значение текущему потоку. Когда бизнес-обработка не завершена, срок действия ключа автоматически истекает, и вы можете снять собственную блокировку в обычном режиме, не затрагивая другие потоки.
image.png
Другая проблема здесь заключается в том, что два шага: определение принадлежности блокировки текущему потоку и снятие блокировки не являются атомарными операциями. Обычно, если значение, полученное потоком 1 из Redis посредством операции get, равно 123, блокировка будет удалена. Однако, если система зависает на несколько секунд, прежде чем блокировка будет удалена, это происходит в течение этих нескольких секунд. Через несколько секунд срок действия ключа автоматически истекает, и Поток 2 успешно получает блокировку и начинает выполнять свою собственную логику. В это время Поток 1 восстанавливается после задержки и продолжает выполнять действие по удалению блокировки, и что при этом удаляется. время — это блокировка потока 2.
image.png
Решением здесь является использование скриптов Lua, чтобы гарантировать, что запросы и удаления являются атомарными операциями. Давайте посмотрим на реализацию пакета Redsync:
var deleteScript = redis.NewScript(1, `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`)
func (m *Mutex) release(ctx context.Context, pool redis.Pool, value string) (bool, error) {
conn, err := pool.Get(ctx)
if err != nil {
return false, err
}
defer conn.Close()
status, err := conn.Eval(deleteScript, m.name, value)
if err != nil {
return false, err
}
return status != int64(0), nil
}
image.png
Вернемся назад и посмотрим на функцию при создании блокировки мьютекса:
func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
m := &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
}
return m
}
Опция во второй сигнатуре функции представляет собой срез, который может задавать пользовательские значения для параметров переменной Mutex, таких как количество повторов, функция, генерирующая значение и т. д. Мы видим, что в реализации будет цикл:
for _, o := range options {
o.Apply(m)
}
type Option interface {
Apply(*Mutex)
}
// OptionFunc is a function that configures a mutex.
type OptionFunc func(*Mutex)
// Apply calls f(mutex)
func (f OptionFunc) Apply(mutex *Mutex) {
f(mutex)
}
Каждая опция реализует интерфейс Apply. Фактически здесь используется режим функциональной опции. Например, если мы хотим настроить количество повторов Mutex, мы можем использовать следующую функцию:
func WithTries(tries int) Option {
return OptionFunc(func(m *Mutex) {
m.tries = tries
})
}
При инициализации Mutex,проходить Долженфункция может быть установленаMutexизколичество попыток。Болееиз Шаблон функциональных опций Вы можете обратиться к тому, что я написал ранееиз Серия распространенных ошибокизстатья:Сборник типичных ошибок Го: Режим функциональных опций
---Особенно рекомендуется---
Особая рекомендация: «Академия Го» — это общедоступный аккаунт, посвященный практическим проектам Го, опыту работы с ловушками и руководствам по предотвращению ловушек в проектах, а также различным интересным инструментам Го. Он ориентирован на практичность и очень достоин всеобщего внимания. Нажмите на карточку официального аккаунта ниже, чтобы подписаться напрямую. Следуйте инструкциям и получите PDF-документ «100 распространенных ошибок в го».