commit 62afe859d8300fd4f0878b5d440ab9f29a11e3b2
Author: Pavel Shevaev
Date:   Tue Oct 25 18:33:12 2022 +0300

    First commit

diff --git a/example_test.go b/example_test.go
new file mode 100644
index 0000000..a8901ec
--- /dev/null
+++ b/example_test.go
@@ -0,0 +1,18 @@
+package redsync_test
+
+import "github.com/hjr265/redsync"
+
+func ExampleMutex() {
+    m, err := redsync.NewMutexWithGenericPool("FlyingSquirrels", pools)
+    if err != nil {
+        panic(err)
+    }
+
+    err = m.Lock()
+    if err != nil {
+        panic(err)
+    }
+    defer m.Unlock()
+
+    // Output:
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..9c77b9b
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,5 @@
+module github.com/hjr265/redsync
+
+go 1.13
+
+require github.com/garyburd/redigo v1.6.0
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..126c663
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,2 @@
+github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc=
+github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
diff --git a/mutex.go b/mutex.go
new file mode 100644
index 0000000..cd29b16
--- /dev/null
+++ b/mutex.go
@@ -0,0 +1,340 @@
+// Package redsync provides a Redis-based distributed mutual exclusion lock
+// implementation as described in the blog post http://antirez.com/news/77.
+//
+// Values containing the types defined in this package should not be copied.
+package redsync
+
+import (
+    "crypto/rand"
+    "encoding/base64"
+    "errors"
+    "net"
+    "sync"
+    "time"
+
+    "github.com/garyburd/redigo/redis"
+)
+
+const (
+    // DefaultExpiry is used when Mutex.Expiry is 0.
+    DefaultExpiry = 8 * time.Second
+    // DefaultTries is used when Mutex.Tries is 0.
+    DefaultTries = 16
+    // DefaultDelay is used when Mutex.Delay is 0.
+    DefaultDelay = 512 * time.Millisecond
+    // DefaultFactor is used when Mutex.Factor is 0.
+    DefaultFactor = 0.01
+)
+
+var (
+    // ErrFailed is returned when the lock cannot be acquired.
+    ErrFailed = errors.New("failed to acquire lock")
+)
+
+// Locker is the interface implemented by Mutex; Lock returns an error when
+// the lock cannot be acquired.
+type Locker interface {
+    Lock() error
+    Touch() bool
+    Unlock() bool
+}
+
+// Pool is a generic connection pool.
+type Pool interface {
+    Get() redis.Conn
+}
+
+var _ = Pool(&redis.Pool{})
+
+// A Mutex is a mutual exclusion lock.
+//
+// Fields of a Mutex must not be changed after first use.
+type Mutex struct {
+    Name   string        // Resource name
+    Expiry time.Duration // Duration for which the lock is valid, DefaultExpiry if 0
+
+    Tries int           // Number of attempts to acquire lock before admitting failure, DefaultTries if 0
+    Delay time.Duration // Delay between two attempts to acquire lock, DefaultDelay if 0
+
+    Factor float64 // Drift factor, DefaultFactor if 0
+
+    Quorum int // Quorum for the lock, set to len(addrs)/2+1 by NewMutex()
+
+    value string
+    until time.Time
+
+    nodes []Pool
+    nodem sync.Mutex
+}
+
+var _ = Locker(&Mutex{})
+
+// NewMutex returns a new Mutex on a named resource connected to the Redis
+// instances at the given addresses.
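+//
+// A minimal usage sketch (the address below is illustrative only):
+//
+//    addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:6379")
+//    if err != nil {
+//        panic(err)
+//    }
+//    m, err := NewMutex("my-resource", []net.Addr{addr})
+//    if err != nil {
+//        panic(err)
+//    }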
+func NewMutex(name string, addrs []net.Addr) (*Mutex, error) {
+    if len(addrs) == 0 {
+        panic("redsync: addrs is empty")
+    }
+
+    nodes := make([]Pool, len(addrs))
+    for i, addr := range addrs {
+        dialTo := addr // capture per iteration for the Dial closure below
+        node := &redis.Pool{
+            MaxActive: 1,
+            Wait:      true,
+            Dial: func() (redis.Conn, error) {
+                return redis.Dial("tcp", dialTo.String())
+            },
+        }
+        nodes[i] = Pool(node)
+    }
+
+    return NewMutexWithGenericPool(name, nodes)
+}
+
+// NewMutexWithPool returns a new Mutex on a named resource connected to the
+// Redis instances behind the given redigo pools.
+func NewMutexWithPool(name string, nodes []*redis.Pool) (*Mutex, error) {
+    if len(nodes) == 0 {
+        panic("redsync: nodes is empty")
+    }
+
+    genericNodes := make([]Pool, len(nodes))
+    for i, node := range nodes {
+        genericNodes[i] = Pool(node)
+    }
+
+    return &Mutex{
+        Name:   name,
+        Quorum: len(genericNodes)/2 + 1,
+        nodes:  genericNodes,
+    }, nil
+}
+
+// NewMutexWithGenericPool returns a new Mutex on a named resource connected
+// to the Redis instances behind the given generic Pools. It is kept separate
+// from NewMutexWithPool to maintain backwards compatibility.
+func NewMutexWithGenericPool(name string, genericNodes []Pool) (*Mutex, error) {
+    if len(genericNodes) == 0 {
+        panic("redsync: genericNodes is empty")
+    }
+
+    return &Mutex{
+        Name:   name,
+        Quorum: len(genericNodes)/2 + 1,
+        nodes:  genericNodes,
+    }, nil
+}
+
+// RedSync provides mutex handling via multiple Redis connection pools.
+type RedSync struct {
+    pools []Pool
+}
+
+// New creates and returns a new RedSync instance from the given network
+// addresses.
+func New(addrs []net.Addr) *RedSync {
+    pools := make([]Pool, len(addrs))
+    for i, addr := range addrs {
+        dialTo := addr // capture per iteration for the Dial closure below
+        node := &redis.Pool{
+            MaxActive: 1,
+            Wait:      true,
+            Dial: func() (redis.Conn, error) {
+                return redis.Dial("tcp", dialTo.String())
+            },
+        }
+        pools[i] = Pool(node)
+    }
+
+    return &RedSync{pools}
+}
+
+// NewWithGenericPool creates and returns a new RedSync instance from the
+// given generic Pools.
+func NewWithGenericPool(genericNodes []Pool) *RedSync {
+    if len(genericNodes) == 0 {
+        panic("redsync: genericNodes is empty")
+    }
+
+    return &RedSync{
+        pools: genericNodes,
+    }
+}
+
+// NewMutex returns a new Mutex with the given name.
+func (r *RedSync) NewMutex(name string) *Mutex {
+    return &Mutex{
+        Name:   name,
+        Quorum: len(r.pools)/2 + 1,
+        nodes:  r.pools,
+    }
+}
+
+// Lock locks m. If Lock returns an error, the lock was not acquired; the
+// caller may retry by calling Lock again.
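+//
+// A typical call site looks like this sketch:
+//
+//    if err := m.Lock(); err != nil {
+//        // ErrFailed means no quorum was reached; anything else is an I/O error.
+//        return err
+//    }
+//    defer m.Unlock()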
+func (m *Mutex) Lock() error {
+    m.nodem.Lock()
+    defer m.nodem.Unlock()
+
+    // Generate a random value that identifies this holder of the lock.
+    b := make([]byte, 16)
+    _, err := rand.Read(b)
+    if err != nil {
+        return err
+    }
+    value := base64.StdEncoding.EncodeToString(b)
+
+    expiry := m.Expiry
+    if expiry == 0 {
+        expiry = DefaultExpiry
+    }
+
+    retries := m.Tries
+    if retries == 0 {
+        retries = DefaultTries
+    }
+
+    for i := 0; i < retries; i++ {
+        // Try to acquire the lock on every node, counting successes.
+        n := 0
+        start := time.Now()
+        for _, node := range m.nodes {
+            if node == nil {
+                continue
+            }
+
+            conn := node.Get()
+            reply, err := redis.String(conn.Do("set", m.Name, value, "nx", "px", int(expiry/time.Millisecond)))
+            conn.Close()
+            if err != nil {
+                continue
+            }
+            if reply != "OK" {
+                continue
+            }
+            n++
+        }
+
+        factor := m.Factor
+        if factor == 0 {
+            factor = DefaultFactor
+        }
+
+        // The lock is valid for the expiry, minus the time spent acquiring
+        // it, minus the drift allowance (see http://antirez.com/news/77).
+        until := time.Now().Add(expiry - time.Since(start) - time.Duration(int64(float64(expiry)*factor)) + 2*time.Millisecond)
+        if n >= m.Quorum && time.Now().Before(until) {
+            m.value = value
+            m.until = until
+            return nil
+        }
+        // Quorum was not reached (or the validity window already passed),
+        // so release whatever was acquired.
+        for _, node := range m.nodes {
+            if node == nil {
+                continue
+            }
+
+            conn := node.Get()
+            _, err := delScript.Do(conn, m.Name, value)
+            conn.Close()
+            if err != nil {
+                continue
+            }
+        }
+
+        // Skip the delay on the last try so ErrFailed is returned sooner.
+        if i == retries-1 {
+            continue
+        }
+
+        delay := m.Delay
+        if delay == 0 {
+            delay = DefaultDelay
+        }
+        time.Sleep(delay)
+    }
+
+    return ErrFailed
+}
+
+// Touch resets m's expiry to the Expiry value.
+// It is a run-time error if m is not locked on entry to Touch.
+// It reports whether the expiry was reset on a quorum of nodes.
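+//
+// A sketch of keeping a lock alive across long-running work (working and
+// doWork are hypothetical helpers, not part of this package):
+//
+//    for working() {
+//        doWork()
+//        if !m.Touch() {
+//            break // quorum lost; stop assuming mutual exclusion
+//        }
+//    }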
+func (m *Mutex) Touch() bool {
+    m.nodem.Lock()
+    defer m.nodem.Unlock()
+
+    value := m.value
+    if value == "" {
+        panic("redsync: touch of unlocked mutex")
+    }
+
+    expiry := m.Expiry
+    if expiry == 0 {
+        expiry = DefaultExpiry
+    }
+    reset := int(expiry / time.Millisecond)
+
+    n := 0
+    for _, node := range m.nodes {
+        if node == nil {
+            continue
+        }
+
+        conn := node.Get()
+        reply, err := redis.String(touchScript.Do(conn, m.Name, value, reset))
+        conn.Close()
+        if err != nil {
+            continue
+        }
+        if reply != "OK" {
+            continue
+        }
+        n++
+    }
+    return n >= m.Quorum
+}
+
+// Unlock unlocks m.
+// It is a run-time error if m is not locked on entry to Unlock.
+// It reports whether the lock was released on a quorum of nodes.
+func (m *Mutex) Unlock() bool {
+    m.nodem.Lock()
+    defer m.nodem.Unlock()
+
+    value := m.value
+    if value == "" {
+        panic("redsync: unlock of unlocked mutex")
+    }
+
+    m.value = ""
+    m.until = time.Unix(0, 0)
+
+    n := 0
+    for _, node := range m.nodes {
+        if node == nil {
+            continue
+        }
+
+        conn := node.Get()
+        status, err := redis.Int64(delScript.Do(conn, m.Name, value))
+        conn.Close()
+        if err != nil {
+            continue
+        }
+        if status == 0 {
+            continue
+        }
+        n++
+    }
+    return n >= m.Quorum
+}
+
+// delScript deletes the key only while it still holds this holder's value,
+// so an expired client cannot release a lock now held by someone else.
+var delScript = redis.NewScript(1, `
+if redis.call("get", KEYS[1]) == ARGV[1] then
+    return redis.call("del", KEYS[1])
+else
+    return 0
+end`)
+
+// touchScript extends the expiry only while the key still holds this
+// holder's value.
+var touchScript = redis.NewScript(1, `
+if redis.call("get", KEYS[1]) == ARGV[1] then
+    return redis.call("set", KEYS[1], ARGV[1], "xx", "px", ARGV[2])
+else
+    return "ERR"
+end`)
diff --git a/mutex_test.go b/mutex_test.go
new file mode 100644
index 0000000..16f455d
--- /dev/null
+++ b/mutex_test.go
@@ -0,0 +1,140 @@
+package redsync_test
+
+import (
+    "math/rand"
+    "os"
+    "testing"
+    "time"
+
+    "github.com/garyburd/redigo/redis"
+    "github.com/hjr265/redsync"
+    "github.com/stvp/tempredis"
+)
+
+var servers []*tempredis.Server
+var pools []redsync.Pool
+
+func TestMain(m *testing.M) {
+    for i := 0; i < 4; i++ {
+        server, err := tempredis.Start(tempredis.Config{})
+        if err != nil {
+            panic(err)
+        }
+        servers = append(servers, server)
+    }
+    pools = makeTestPools()
+    result := m.Run()
+    // Terminate the servers explicitly: os.Exit below would skip any defers.
+    for _, server := range servers {
+        server.Term()
+    }
+    os.Exit(result)
+}
+
+func TestMutex(t *testing.T) {
+    done := make(chan bool)
+    chErr := make(chan error)
+
+    for i := 0; i < 4; i++ {
+        go func() {
+            m, err := redsync.NewMutexWithGenericPool("RedsyncMutex", pools)
+            if err != nil {
+                chErr <- err
+                return
+            }
+
+            f := 0
+            for j := 0; j < 32; j++ {
+                err := m.Lock()
+                if err == redsync.ErrFailed {
+                    f++
+                    if f > 2 {
+                        chErr <- err
+                        return
+                    }
+                    continue
+                }
+                if err != nil {
+                    chErr <- err
+                    return
+                }
+
+                time.Sleep(1 * time.Millisecond)
+
+                m.Unlock()
+
+                time.Sleep(time.Duration(rand.Int31n(128)) * time.Millisecond)
+            }
+            done <- true
+        }()
+    }
+    for i := 0; i < 4; i++ {
+        select {
+        case <-done:
+        case err := <-chErr:
+            t.Fatal(err)
+        }
+    }
+}
+
+func TestMutexWithRedSync(t *testing.T) {
+    done := make(chan bool)
+    chErr := make(chan error)
+
+    rs := redsync.NewWithGenericPool(pools)
+    for i := 0; i < 4; i++ {
+        go func() {
+            m := rs.NewMutex("RedsyncMutex2")
+            f := 0
+            for j := 0; j < 32; j++ {
+                err := m.Lock()
+                if err == redsync.ErrFailed {
+                    f++
+                    if f > 2 {
+                        chErr <- err
+                        return
+                    }
+                    continue
+                }
+                if err != nil {
+                    chErr <- err
+                    return
+                }
+
+                time.Sleep(1 * time.Millisecond)
+
+                m.Unlock()
+
+                time.Sleep(time.Duration(rand.Int31n(128)) * time.Millisecond)
+            }
+            done <- true
+        }()
+    }
+    for i := 0; i < 4; i++ {
+        select {
+        case <-done:
+        case err := <-chErr:
+            t.Fatal(err)
+        }
+    }
+}
+
+func makeTestPools() []redsync.Pool {
+    pools := []redsync.Pool{}
+    for _, server := range servers {
+        // The immediately-invoked closure captures server per iteration for
+        // the Dial closure below.
+        func(server *tempredis.Server) {
+            pools = append(pools, &redis.Pool{
+                MaxIdle:     3,
+                IdleTimeout: 240 * time.Second,
+                Dial: func() (redis.Conn, error) {
+                    return redis.Dial("unix", server.Socket())
+                },
+                TestOnBorrow: func(c redis.Conn, t time.Time) error {
+                    _, err := c.Do("PING")
+                    return err
+                },
+            })
+        }(server)
+    }
+    return pools
+}
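+
+// With the four tempredis servers started in TestMain, Quorum works out to
+// len(pools)/2 + 1 = 4/2 + 1 = 3, so each Lock call above must succeed on at
+// least three of the four servers within the validity window to be held.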