commit 99e2d31d05e391341668f668205346db9a48a8fe Author: Pavel Shevaev Date: Tue Oct 25 18:24:01 2022 +0300 First commit diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c4446f3 --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/bit.games/rdb + +go 1.13 + +require ( + git.bit5.ru/backend/res_tracker v1.0.0 + git.bit5.ru/backend/colog v1.0.0 + github.com/pkg/errors v0.9.1 + github.com/garyburd/redigo v1.6.4 + github.com/gocraft/dbr v0.0.0-20190714181702-8114670a83bd // indirect + github.com/google/uuid v1.1.1 // indirect + github.com/gorilla/websocket v1.4.2 // indirect + github.com/hjr265/redsync v4.6.0 + github.com/stretchr/testify v1.4.0 +) + diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5a3f07c --- /dev/null +++ b/go.sum @@ -0,0 +1,32 @@ +github.com/DATA-DOG/go-sqlmock v1.3.3 h1:CWUqKXe0s8A2z6qCgkP4Kru7wC11YoAnoupUKFDnH08= +github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/garyburd/redigo v1.6.0 h1:0VruCpn7yAIIu7pWVClQC8wxCJEcG3nyzpMSHKi1PQc= +github.com/garyburd/redigo v1.6.0/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= +github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/gocraft/dbr v0.0.0-20190714181702-8114670a83bd h1:GlmMPhEpMWrNOyUaAMpRGy4zkb03eXuTb8TKXr3j0dQ= +github.com/gocraft/dbr v0.0.0-20190714181702-8114670a83bd/go.mod h1:BK1nFI5Pp8XJg1sE7oMBzyW32LBuS2r25HlZPa6tXXs= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= +github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4= +github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o= +github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/rdb.go b/rdb.go new file mode 100644 index 0000000..baf7cd4 --- /dev/null +++ b/rdb.go @@ -0,0 +1,397 @@ +package rdb + +import ( + "fmt" + "math" + "strconv" + "sync" + "time" + + "git.bit5.ru/backend/colog" + "git.bit5.ru/backend/res_tracker" + "github.com/garyburd/redigo/redis" + "github.com/hjr265/redsync" + "github.com/pkg/errors" +) + +const ( + ExpireRDMutexSec = 30 +) + +var pools sync.Map + +type RdSettings struct { + Host, Prefix string + Port int + Db int + LogLevel int +} + +func (s *RdSettings) ConnStr() string { + return s.Host + ":" + strconv.Itoa(s.Port) +} + +// NOTE: redis connection logs all operations +type rdb struct { + orig redis.Conn + name string + logger *colog.CoLog + s RdSettings +} + +func (rd *rdb) Close() error { + return rd.orig.Close() +} + +func (rd *rdb) Err() error { + return rd.orig.Err() +} + +func (rd *rdb) Flush() error { + return rd.orig.Flush() +} + +func (rd *rdb) Do(command string, args ...interface{}) (interface{}, error) { + if rd.s.LogLevel > 1 { + if len(command) != 0 { + var s string + for i := 0; i < len(args); i++ { + s += fmt.Sprintf("%v ", args[i]) + } + rd.logger.Output(colog.LDebug, 5, fmt.Sprintf("%s %s", command, s)) + } + } else if rd.s.LogLevel > 0 { + //for this level not logging too verbose commands + if len(command) != 0 && command != "PING" { + var s string + for i := 0; i < len(args) && i < 5; i++ { + s += fmt.Sprintf("%v ", args[i]) + } + if len(args) > 5 { + s += " ..." + } + rd.logger.Output(colog.LDebug, 5, fmt.Sprintf("%s %s", command, s)) + } + } + return rd.orig.Do(command, args...) +} + +func (rd *rdb) Send(command string, args ...interface{}) error { + if rd.s.LogLevel > 0 { + if len(command) != 0 { + var s string + for i := 0; i < len(args); i++ { + s += fmt.Sprintf("%v ", args[i]) + } + rd.logger.Output(colog.LDebug, 5, fmt.Sprintf("%s %s", command, s)) + } + } + return rd.orig.Send(command, args...) +} + +func (rd *rdb) Receive() (interface{}, error) { + return rd.orig.Receive() +} + +func redisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool { + //TODO: why not simply using s as a key for pools.Load(..)? + key := s.Host + ":" + strconv.Itoa(s.Port) + ":" + strconv.Itoa(s.Db) + ":" + s.Prefix + v, ok := pools.Load(key) + if ok { + return v.(*redis.Pool) + } else { + rdpool := NewRedisPool(logger, s, clientName) + pools.Store(key, rdpool) + return rdpool + } +} + +func NewRedisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool { + return NewRedisPoolEx(logger, s, 16, 240*time.Second, clientName) +} + +func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout time.Duration, clientName string) *redis.Pool { + return &redis.Pool{ + Wait: false, + MaxIdle: maxIdle, + IdleTimeout: idleTimeout, + Dial: func() (redis.Conn, error) { + orig, err := redis.Dial("tcp", s.ConnStr()) + if err != nil { + return nil, errors.WithStack(err) + } + c := &rdb{ + orig: orig, + name: s.Prefix, + logger: logger.Clone().AddPrefix("[" + s.Prefix + "] "), + s: s, + } + + // Сохраняем название клиента для диагностики соединений с помощью команды CLIENT LIST. + // https://redis.io/commands/client-list/ + if len(clientName) > 0 { + _, err = c.Do("CLIENT", "SETNAME", clientName) + if err != nil { + return nil, errors.WithStack(err) + } + } + + _, err = c.Do("SELECT", s.Db) + if err != nil { + return nil, errors.WithStack(err) + } + + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err + }, + } +} + +func GetRedisConn(logger *colog.CoLog, s RdSettings, clientName string) redis.Conn { + conn := redisPool(logger, s, clientName).Get() + + if res_tracker.IsOn() { + //let's wrap our connection with the special tracked wrapper + conn = tracked{conn} + res_tracker.Track(conn) + } + + return conn +} + +func GetRedisMutex(logger *colog.CoLog, s RdSettings, name string, ttlSec int) *redsync.Mutex { + mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{redisPool(logger, s, "")}) + mx.Expiry = time.Duration(ttlSec) * time.Second + return mx +} + +func GetRedisMutexAutoTTL(logger *colog.CoLog, s RdSettings, name string) *redsync.Mutex { + return GetRedisMutex(logger, s, name, ExpireRDMutexSec) +} + +func RemoveRedisMutex(logger *colog.CoLog, s RdSettings, name string) error { + rd := redisPool(logger, s, "").Get() + + _, err := rd.Do("DEL", name) + return err +} + +func ExistsRedisMutex(logger *colog.CoLog, s RdSettings, name string) (bool, error) { + rd := redisPool(logger, s, "").Get() + + exists, err := redis.Bool(rd.Do("EXISTS", name)) + if err != nil { + if err == redis.ErrNil { + return false, nil + } + return false, err + } + return exists, nil +} + +func GetUint32(rd redis.Conn, key string) (uint32, error) { + nUint64, err := redis.Uint64(rd.Do("GET", key)) + if err != nil { + return 0, err + } + if rdErr := rd.Err(); rdErr != nil { + return 0, rdErr + } + + nUint32, err := uint64ToUint32(nUint64) + if err != nil { + return 0, err + } + + return nUint32, nil +} + +func GetListLen(rd redis.Conn, key string) (uint32, error) { + nUint64, err := redis.Uint64(rd.Do("LLEN", key)) + if err != nil { + return 0, err + } + if rdErr := rd.Err(); rdErr != nil { + return 0, rdErr + } + + nUint32, err := uint64ToUint32(nUint64) + if err != nil { + return 0, err + } + + return nUint32, nil +} + +func SendRpushSliceUint32(rd redis.Conn, key string, nums []uint32) { + if len(nums) == 0 { + return + } + + params := make([]interface{}, 0, 1+len(nums)) + params = append(params, key) + for _, n := range nums { + params = append(params, n) + } + + rd.Send("RPUSH", params...) +} + +func SendRpushSliceString(rd redis.Conn, key string, strs []string) { + if len(strs) == 0 { + return + } + + params := make([]interface{}, 0, 1+len(strs)) + params = append(params, key) + for _, s := range strs { + params = append(params, s) + } + + rd.Send("RPUSH", params...) +} + +func uint64ToUint32(n64 uint64) (uint32, error) { + if n64 > math.MaxUint32 { + return 0, errors.Errorf("Can not convert uint64 %d to uint32.", n64) + } + return uint32(n64), nil +} + +// Uint32s is a helper that converts an array command reply to a []uint32. +// If err is not equal to nil, then Uint32s returns nil, err. +func Uint32s(reply interface{}, err error) ([]uint32, error) { + var nums []uint32 + if reply == nil { + return nums, redis.ErrNil + } + values, err := redis.Values(reply, err) + if err != nil { + return nums, err + } + if err := redis.ScanSlice(values, &nums); err != nil { + return nums, err + } + return nums, nil +} + +func ReceiveUint32s(rdconn redis.Conn) ([]uint32, error) { + nums, err := Uint32s(rdconn.Receive()) + if e := getErr(rdconn, err); e != nil { + return nil, e + } + + return nums, nil +} + +func Uint32(reply interface{}, err error) (uint32, error) { + nUint64, err := redis.Uint64(reply, err) + if err != nil { + return 0, errors.Wrap(err, "Rdb can not convert reply to uint64.") + } + + nUint32, err := uint64ToUint32(nUint64) + if err != nil { + return 0, err + } + + return nUint32, nil +} + +func ReceiveInt(rdConn redis.Conn) (int, error) { + n, err := redis.Int(rdConn.Receive()) + if err != nil { + return 0, errors.Wrap(err, "Can not execute ReceiveInt(). Got error from redis.Int().") + } + + if rdErr := rdConn.Err(); rdErr != nil { + return 0, errors.Wrap(rdErr, "Can not execute ReceiveInt(). Got error from rdConn.Err().") + } + + return n, nil +} + +func ReceiveUint32(rdConn redis.Conn) (uint32, error) { + n, err := Uint32(rdConn.Receive()) + if err != nil { + return 0, errors.Wrap(err, "Can not execute ReceiveUint32(). Got error from Uint32().") + } + + if rdErr := rdConn.Err(); rdErr != nil { + return 0, errors.Wrap(rdErr, "Can not execute ReceiveUint32(). Got error from rdConn.Err().") + } + + return n, nil +} + +func GetUint32s(conn redis.Conn, commandName string, args ...interface{}) ([]uint32, error) { + return Uint32s(Do(conn, commandName, args...)) +} + +func Do(conn redis.Conn, commandName string, args ...interface{}) (reply interface{}, err error) { + result, err := conn.Do(commandName, args...) + if e := getErr(conn, err); e != nil { + return nil, e + } + + return result, nil +} + +// Deprecated. Use func Flush2. +func Flush(conn redis.Conn) error { + err := conn.Flush() + return getErr(conn, err) +} + +// TODO: Delete func Flush. +// TODO: Rename func Flush2 to Flush. +func Flush2(conn redis.Conn) error { + if err := conn.Flush(); err != nil { + return errors.Wrap(err, "Can not execute function Flush2. Got error from redis.Conn Flush.") + } + + if err := conn.Err(); err != nil { + return errors.Wrap(err, "Can not execute function Flush2. Got redis connection error.") + } + + return nil +} + +func Err(conn redis.Conn, err error) error { + return getErr(conn, err) +} + +// NOTE: workaround for redigo not returning original connection error +func getErr(conn redis.Conn, err error) error { + if err != nil { + if err == redis.ErrNil { + return nil + } + return errors.Wrap(err, "Got redis error.") + } + + if connErr := conn.Err(); connErr != nil { + return errors.Wrap(connErr, "Got redis connection error.") + } + + return nil +} + +type tracked struct { + subj redis.Conn +} + +func (t tracked) Close() error { + res_tracker.Untrack(t) + return t.subj.Close() +} +func (t tracked) Do(cmd string, args ...interface{}) (interface{}, error) { + return t.subj.Do(cmd, args...) +} +func (t tracked) Send(cmd string, args ...interface{}) error { return t.subj.Send(cmd, args...) } +func (t tracked) Err() error { return t.subj.Err() } +func (t tracked) Flush() error { return t.subj.Flush() } +func (t tracked) Receive() (interface{}, error) { return t.subj.Receive() } diff --git a/rdb_test.go b/rdb_test.go new file mode 100644 index 0000000..cf604d7 --- /dev/null +++ b/rdb_test.go @@ -0,0 +1,70 @@ +package rdb_test + +import ( + "os" + "testing" + + "git.bit5.ru/backend/colog" + "git.bit5.ru/backend/rdb" + "github.com/garyburd/redigo/redis" + "github.com/stretchr/testify/assert" +) + +func getPool() *redis.Pool { + pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "localhost", Port: 6379, Db: 10}, "test") + return pool +} + +func getLogger() *colog.CoLog { + tlog := colog.NewCoLog(os.Stderr, "", 0) + ft := &colog.StdFormatter{Flag: 0} + ft.ColorSupported(true) + tlog.SetFormatter(ft) + return tlog +} + +func TestBadConnection(t *testing.T) { + pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "dummy", Port: 80}, "test") + conn := pool.Get() + assert.NotNil(t, conn) + _, err := conn.Do("PING") + assert.NotNil(t, err) +} + +func TestGetConn(t *testing.T) { + pool := getPool() + + conn := pool.Get() + defer conn.Close() + _, err := conn.Do("PING") + assert.Nil(t, err) +} + +func TestCloseConn(t *testing.T) { + pool := getPool() + + conn := pool.Get() + assert.EqualValues(t, 0, pool.IdleCount()) + + _, err := conn.Do("PING") + assert.Nil(t, err) + + conn.Close() + _, err = conn.Do("PING") + assert.NotNil(t, err) + assert.EqualValues(t, 1, pool.IdleCount()) +} + +func TestDoubleCloseConnIsOk(t *testing.T) { + pool := getPool() + + conn := pool.Get() + _, err := conn.Do("PING") + assert.Nil(t, err) + + assert.EqualValues(t, 0, pool.IdleCount()) + conn.Close() + assert.EqualValues(t, 1, pool.IdleCount()) + conn.Close() + assert.EqualValues(t, 1, pool.IdleCount()) +}