From 33b922f1810d7ac6e8c162e7a5fadd103da780b6 Mon Sep 17 00:00:00 2001 From: Pavel Shevaev Date: Fri, 28 Oct 2022 14:03:40 +0300 Subject: [PATCH] More robust and straight forward pool abstraction --- rdb.go | 106 +++++++++++++++++++++++++++++----------------------- rdb_test.go | 20 +++++----- 2 files changed, 70 insertions(+), 56 deletions(-) diff --git a/rdb.go b/rdb.go index 8800cee..048f676 100644 --- a/rdb.go +++ b/rdb.go @@ -4,13 +4,13 @@ import ( "fmt" "math" "strconv" - "sync" "time" "git.bit5.ru/backend/colog" "git.bit5.ru/backend/redigo/redis" "git.bit5.ru/backend/redsync" "git.bit5.ru/backend/res_tracker" + "github.com/pkg/errors" ) @@ -18,25 +18,53 @@ const ( ExpireRDMutexSec = 30 ) -var pools sync.Map - -type RdSettings struct { +type Settings struct { Host, Prefix string - Port int - Db int - LogLevel int + //will be shown in CLIENT LIST + ClientName string + Port int + Db int + LogLevel int + MaxIdle int + IdleTimeoutSec int } -func (s *RdSettings) ConnStr() string { +func (s *Settings) ConnStr() string { return s.Host + ":" + strconv.Itoa(s.Port) } +type Pool struct { + S Settings + RP *redis.Pool +} + +func OpenPool(s Settings, logger *colog.CoLog) *Pool { + p := &Pool{S: s, RP: newRedisPool(s, logger)} + return p +} + +func (p *Pool) Close() { + p.RP.Close() +} + +func (p *Pool) Get() redis.Conn { + conn := p.RP.Get() + + if res_tracker.IsOn() { + //let's wrap our connection with the special tracked wrapper + conn = &tracked{conn} + res_tracker.Track(conn) + } + + return conn +} + // NOTE: redis connection logs all operations type rdb struct { orig redis.Conn name string logger *colog.CoLog - s RdSettings + s Settings } func (rd *rdb) Close() error { @@ -93,28 +121,22 @@ func (rd *rdb) Receive() (interface{}, error) { return rd.orig.Receive() } -func redisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool { - //TODO: why not simply using s as a key for pools.Load(..)? - key := s.Host + ":" + strconv.Itoa(s.Port) + ":" + strconv.Itoa(s.Db) + ":" + s.Prefix - v, ok := pools.Load(key) - if ok { - return v.(*redis.Pool) - } else { - rdpool := NewRedisPool(logger, s, clientName) - pools.Store(key, rdpool) - return rdpool +func newRedisPool(s Settings, logger *colog.CoLog) *redis.Pool { + + maxIdle := s.MaxIdle + if maxIdle == 0 { + maxIdle = 16 } -} -func NewRedisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool { - return NewRedisPoolEx(logger, s, 16, 240*time.Second, clientName) -} + idleTimeoutSec := s.IdleTimeoutSec + if idleTimeoutSec == 0 { + idleTimeoutSec = 240 + } -func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout time.Duration, clientName string) *redis.Pool { return &redis.Pool{ Wait: false, MaxIdle: maxIdle, - IdleTimeout: idleTimeout, + IdleTimeout: time.Second * time.Duration(idleTimeoutSec), Dial: func() (redis.Conn, error) { orig, err := redis.Dial("tcp", s.ConnStr()) if err != nil { @@ -129,8 +151,8 @@ func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout // Сохраняем название клиента для диагностики соединений с помощью команды CLIENT LIST. // https://redis.io/commands/client-list/ - if len(clientName) > 0 { - _, err = c.Do("CLIENT", "SETNAME", clientName) + if len(s.ClientName) > 0 { + _, err = c.Do("CLIENT", "SETNAME", s.ClientName) if err != nil { return nil, errors.WithStack(err) } @@ -150,37 +172,27 @@ func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout } } -func GetRedisConn(logger *colog.CoLog, s RdSettings, clientName string) redis.Conn { - conn := redisPool(logger, s, clientName).Get() - - if res_tracker.IsOn() { - //let's wrap our connection with the special tracked wrapper - conn = tracked{conn} - res_tracker.Track(conn) - } - - return conn -} - -func GetRedisMutex(logger *colog.CoLog, s RdSettings, name string, ttlSec int) *redsync.Mutex { - mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{redisPool(logger, s, "")}) +func GetRedisMutex(pool *Pool, name string, ttlSec int) *redsync.Mutex { + mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{pool.RP}) mx.Expiry = time.Duration(ttlSec) * time.Second return mx } -func GetRedisMutexAutoTTL(logger *colog.CoLog, s RdSettings, name string) *redsync.Mutex { - return GetRedisMutex(logger, s, name, ExpireRDMutexSec) +func GetRedisMutexAutoTTL(pool *Pool, name string) *redsync.Mutex { + return GetRedisMutex(pool, name, ExpireRDMutexSec) } -func RemoveRedisMutex(logger *colog.CoLog, s RdSettings, name string) error { - rd := redisPool(logger, s, "").Get() +func RemoveRedisMutex(pool *Pool, name string) error { + rd := pool.Get() + defer rd.Close() _, err := rd.Do("DEL", name) return err } -func ExistsRedisMutex(logger *colog.CoLog, s RdSettings, name string) (bool, error) { - rd := redisPool(logger, s, "").Get() +func ExistsRedisMutex(pool *Pool, name string) (bool, error) { + rd := pool.Get() + defer rd.Close() exists, err := redis.Bool(rd.Do("EXISTS", name)) if err != nil { diff --git a/rdb_test.go b/rdb_test.go index c8c6e50..e19dba0 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -6,12 +6,11 @@ import ( "git.bit5.ru/backend/colog" "git.bit5.ru/backend/rdb" - "git.bit5.ru/backend/redigo/redis" "github.com/stretchr/testify/assert" ) -func getPool() *redis.Pool { - pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "localhost", Port: 6379, Db: 10}, "test") +func getPool() *rdb.Pool { + pool := rdb.OpenPool(rdb.Settings{Host: "localhost", Port: 6379, Db: 10, ClientName: "test"}, getLogger()) return pool } @@ -24,7 +23,7 @@ func getLogger() *colog.CoLog { } func TestBadConnection(t *testing.T) { - pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "dummy", Port: 80}, "test") + pool := rdb.OpenPool(rdb.Settings{Host: "dummy", Port: 80, ClientName: "test"}, getLogger()) conn := pool.Get() assert.NotNil(t, conn) _, err := conn.Do("PING") @@ -33,6 +32,7 @@ func TestBadConnection(t *testing.T) { func TestGetConn(t *testing.T) { pool := getPool() + defer pool.Close() conn := pool.Get() defer conn.Close() @@ -42,9 +42,10 @@ func TestGetConn(t *testing.T) { func TestCloseConn(t *testing.T) { pool := getPool() + defer pool.Close() conn := pool.Get() - assert.EqualValues(t, 0, pool.IdleCount()) + assert.EqualValues(t, 0, pool.RP.IdleCount()) _, err := conn.Do("PING") assert.Nil(t, err) @@ -52,19 +53,20 @@ func TestCloseConn(t *testing.T) { conn.Close() _, err = conn.Do("PING") assert.NotNil(t, err) - assert.EqualValues(t, 1, pool.IdleCount()) + assert.EqualValues(t, 1, pool.RP.IdleCount()) } func TestDoubleCloseConnIsOk(t *testing.T) { pool := getPool() + defer pool.Close() conn := pool.Get() _, err := conn.Do("PING") assert.Nil(t, err) - assert.EqualValues(t, 0, pool.IdleCount()) + assert.EqualValues(t, 0, pool.RP.IdleCount()) conn.Close() - assert.EqualValues(t, 1, pool.IdleCount()) + assert.EqualValues(t, 1, pool.RP.IdleCount()) conn.Close() - assert.EqualValues(t, 1, pool.IdleCount()) + assert.EqualValues(t, 1, pool.RP.IdleCount()) }