redsync/mutex.go

341 lines
7.2 KiB
Go

// Package redsync provides a Redis-based distributed mutual exclusion lock implementation as described in the blog post http://antirez.com/news/77.
//
// Values containing the types defined in this package should not be copied.
package redsync
import (
"crypto/rand"
"encoding/base64"
"errors"
"net"
"sync"
"time"
"git.bit5.ru/backend/redigo/redis"
)
const (
// DefaultExpiry is used when Mutex Duration is 0
DefaultExpiry = 8 * time.Second
// DefaultTries is used when Mutex Duration is 0
DefaultTries = 16
// DefaultDelay is used when Mutex Delay is 0
DefaultDelay = 512 * time.Millisecond
// DefaultFactor is used when Mutex Factor is 0
DefaultFactor = 0.01
)
var (
// ErrFailed is returned when lock cannot be acquired
ErrFailed = errors.New("failed to acquire lock")
)
// Locker interface with Lock returning an error when lock cannot be aquired
type Locker interface {
Lock() error
Touch() bool
Unlock() bool
}
// Pool is a generic connection pool
type Pool interface {
Get() redis.Conn
}
var _ = Pool(&redis.Pool{})
// A Mutex is a mutual exclusion lock.
//
// Fields of a Mutex must not be changed after first use.
type Mutex struct {
Name string // Resouce name
Expiry time.Duration // Duration for which the lock is valid, DefaultExpiry if 0
Tries int // Number of attempts to acquire lock before admitting failure, DefaultTries if 0
Delay time.Duration // Delay between two attempts to acquire lock, DefaultDelay if 0
Factor float64 // Drift factor, DefaultFactor if 0
Quorum int // Quorum for the lock, set to len(addrs)/2+1 by NewMutex()
value string
until time.Time
nodes []Pool
nodem sync.Mutex
}
var _ = Locker(&Mutex{})
// NewMutex returns a new Mutex on a named resource connected to the Redis instances at given addresses.
func NewMutex(name string, addrs []net.Addr) (*Mutex, error) {
if len(addrs) == 0 {
panic("redsync: addrs is empty")
}
nodes := make([]Pool, len(addrs))
for i, addr := range addrs {
dialTo := addr
node := &redis.Pool{
MaxActive: 1,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", dialTo.String())
},
}
nodes[i] = Pool(node)
}
return NewMutexWithGenericPool(name, nodes)
}
// NewMutexWithPool returns a new Mutex on a named resource connected to the Redis instances at given redis Pools.
func NewMutexWithPool(name string, nodes []*redis.Pool) (*Mutex, error) {
if len(nodes) == 0 {
panic("redsync: nodes is empty")
}
genericNodes := make([]Pool, len(nodes))
for i, node := range nodes {
genericNodes[i] = Pool(node)
}
return &Mutex{
Name: name,
Quorum: len(genericNodes)/2 + 1,
nodes: genericNodes,
}, nil
}
// NewMutexWithGenericPool returns a new Mutex on a named resource connected to the Redis instances at given generic Pools.
// different from NewMutexWithPool to maintain backwards compatibility
func NewMutexWithGenericPool(name string, genericNodes []Pool) (*Mutex, error) {
if len(genericNodes) == 0 {
panic("redsync: genericNodes is empty")
}
return &Mutex{
Name: name,
Quorum: len(genericNodes)/2 + 1,
nodes: genericNodes,
}, nil
}
// RedSync provides mutex handling via a multiple Redis connection pools.
type RedSync struct {
pools []Pool
}
// New creates and returns a new RedSync instance from given network addresses.
func New(addrs []net.Addr) *RedSync {
pools := make([]Pool, len(addrs))
for i, addr := range addrs {
dialTo := addr
node := &redis.Pool{
MaxActive: 1,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", dialTo.String())
},
}
pools[i] = Pool(node)
}
return &RedSync{pools}
}
// NewWithGenericPool creates and returns a new RedSync instance from given generic Pools.
func NewWithGenericPool(genericNodes []Pool) *RedSync {
if len(genericNodes) == 0 {
panic("redsync: genericNodes is empty")
}
return &RedSync{
pools: genericNodes,
}
}
// NewMutex returns a new Mutex with the given name.
func (r *RedSync) NewMutex(name string) *Mutex {
return &Mutex{
Name: name,
Quorum: len(r.pools)/2 + 1,
nodes: r.pools,
}
}
// Lock locks m.
// In case it returns an error on failure, you may retry to acquire the lock by calling this method again.
func (m *Mutex) Lock() error {
m.nodem.Lock()
defer m.nodem.Unlock()
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
return err
}
value := base64.StdEncoding.EncodeToString(b)
expiry := m.Expiry
if expiry == 0 {
expiry = DefaultExpiry
}
retries := m.Tries
if retries == 0 {
retries = DefaultTries
}
for i := 0; i < retries; i++ {
n := 0
start := time.Now()
for _, node := range m.nodes {
if node == nil {
continue
}
conn := node.Get()
reply, err := redis.String(conn.Do("set", m.Name, value, "nx", "px", int(expiry/time.Millisecond)))
conn.Close()
if err != nil {
continue
}
if reply != "OK" {
continue
}
n++
}
factor := m.Factor
if factor == 0 {
factor = DefaultFactor
}
until := time.Now().Add(expiry - time.Now().Sub(start) - time.Duration(int64(float64(expiry)*factor)) + 2*time.Millisecond)
if n >= m.Quorum && time.Now().Before(until) {
m.value = value
m.until = until
return nil
}
for _, node := range m.nodes {
if node == nil {
continue
}
conn := node.Get()
_, err := delScript.Do(conn, m.Name, value)
conn.Close()
if err != nil {
continue
}
}
// Have no delay on the last try so we can return ErrFailed sooner.
if i == retries-1 {
continue
}
delay := m.Delay
if delay == 0 {
delay = DefaultDelay
}
time.Sleep(delay)
}
return ErrFailed
}
// Touch resets m's expiry to the expiry value.
// It is a run-time error if m is not locked on entry to Touch.
// It returns the status of the touch
func (m *Mutex) Touch() bool {
m.nodem.Lock()
defer m.nodem.Unlock()
value := m.value
if value == "" {
panic("redsync: touch of unlocked mutex")
}
expiry := m.Expiry
if expiry == 0 {
expiry = DefaultExpiry
}
reset := int(expiry / time.Millisecond)
n := 0
for _, node := range m.nodes {
if node == nil {
continue
}
conn := node.Get()
reply, err := touchScript.Do(conn, m.Name, value, reset)
conn.Close()
if err != nil {
continue
}
if reply != "OK" {
continue
}
n++
}
if n >= m.Quorum {
return true
}
return false
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
// It returns the status of the unlock
func (m *Mutex) Unlock() bool {
m.nodem.Lock()
defer m.nodem.Unlock()
value := m.value
if value == "" {
panic("redsync: unlock of unlocked mutex")
}
m.value = ""
m.until = time.Unix(0, 0)
n := 0
for _, node := range m.nodes {
if node == nil {
continue
}
conn := node.Get()
status, err := delScript.Do(conn, m.Name, value)
conn.Close()
if err != nil {
continue
}
if status == 0 {
continue
}
n++
}
if n >= m.Quorum {
return true
}
return false
}
var delScript = redis.NewScript(1, `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end`)
var touchScript = redis.NewScript(1, `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("set", KEYS[1], ARGV[1], "xx", "px", ARGV[2])
else
return "ERR"
end`)