More robust and straight forward pool abstraction
This commit is contained in:
parent
737a24befb
commit
33b922f181
98
rdb.go
98
rdb.go
|
@ -4,13 +4,13 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.bit5.ru/backend/colog"
|
||||
"git.bit5.ru/backend/redigo/redis"
|
||||
"git.bit5.ru/backend/redsync"
|
||||
"git.bit5.ru/backend/res_tracker"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
|
@ -18,25 +18,53 @@ const (
|
|||
ExpireRDMutexSec = 30
|
||||
)
|
||||
|
||||
var pools sync.Map
|
||||
|
||||
type RdSettings struct {
|
||||
type Settings struct {
|
||||
Host, Prefix string
|
||||
//will be shown in CLIENT LIST
|
||||
ClientName string
|
||||
Port int
|
||||
Db int
|
||||
LogLevel int
|
||||
MaxIdle int
|
||||
IdleTimeoutSec int
|
||||
}
|
||||
|
||||
func (s *RdSettings) ConnStr() string {
|
||||
func (s *Settings) ConnStr() string {
|
||||
return s.Host + ":" + strconv.Itoa(s.Port)
|
||||
}
|
||||
|
||||
type Pool struct {
|
||||
S Settings
|
||||
RP *redis.Pool
|
||||
}
|
||||
|
||||
func OpenPool(s Settings, logger *colog.CoLog) *Pool {
|
||||
p := &Pool{S: s, RP: newRedisPool(s, logger)}
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *Pool) Close() {
|
||||
p.RP.Close()
|
||||
}
|
||||
|
||||
func (p *Pool) Get() redis.Conn {
|
||||
conn := p.RP.Get()
|
||||
|
||||
if res_tracker.IsOn() {
|
||||
//let's wrap our connection with the special tracked wrapper
|
||||
conn = &tracked{conn}
|
||||
res_tracker.Track(conn)
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
// NOTE: redis connection logs all operations
|
||||
type rdb struct {
|
||||
orig redis.Conn
|
||||
name string
|
||||
logger *colog.CoLog
|
||||
s RdSettings
|
||||
s Settings
|
||||
}
|
||||
|
||||
func (rd *rdb) Close() error {
|
||||
|
@ -93,28 +121,22 @@ func (rd *rdb) Receive() (interface{}, error) {
|
|||
return rd.orig.Receive()
|
||||
}
|
||||
|
||||
func redisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool {
|
||||
//TODO: why not simply using s as a key for pools.Load(..)?
|
||||
key := s.Host + ":" + strconv.Itoa(s.Port) + ":" + strconv.Itoa(s.Db) + ":" + s.Prefix
|
||||
v, ok := pools.Load(key)
|
||||
if ok {
|
||||
return v.(*redis.Pool)
|
||||
} else {
|
||||
rdpool := NewRedisPool(logger, s, clientName)
|
||||
pools.Store(key, rdpool)
|
||||
return rdpool
|
||||
}
|
||||
func newRedisPool(s Settings, logger *colog.CoLog) *redis.Pool {
|
||||
|
||||
maxIdle := s.MaxIdle
|
||||
if maxIdle == 0 {
|
||||
maxIdle = 16
|
||||
}
|
||||
|
||||
func NewRedisPool(logger *colog.CoLog, s RdSettings, clientName string) *redis.Pool {
|
||||
return NewRedisPoolEx(logger, s, 16, 240*time.Second, clientName)
|
||||
idleTimeoutSec := s.IdleTimeoutSec
|
||||
if idleTimeoutSec == 0 {
|
||||
idleTimeoutSec = 240
|
||||
}
|
||||
|
||||
func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout time.Duration, clientName string) *redis.Pool {
|
||||
return &redis.Pool{
|
||||
Wait: false,
|
||||
MaxIdle: maxIdle,
|
||||
IdleTimeout: idleTimeout,
|
||||
IdleTimeout: time.Second * time.Duration(idleTimeoutSec),
|
||||
Dial: func() (redis.Conn, error) {
|
||||
orig, err := redis.Dial("tcp", s.ConnStr())
|
||||
if err != nil {
|
||||
|
@ -129,8 +151,8 @@ func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout
|
|||
|
||||
// Сохраняем название клиента для диагностики соединений с помощью команды CLIENT LIST.
|
||||
// https://redis.io/commands/client-list/
|
||||
if len(clientName) > 0 {
|
||||
_, err = c.Do("CLIENT", "SETNAME", clientName)
|
||||
if len(s.ClientName) > 0 {
|
||||
_, err = c.Do("CLIENT", "SETNAME", s.ClientName)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
@ -150,37 +172,27 @@ func NewRedisPoolEx(logger *colog.CoLog, s RdSettings, maxIdle int, idleTimeout
|
|||
}
|
||||
}
|
||||
|
||||
func GetRedisConn(logger *colog.CoLog, s RdSettings, clientName string) redis.Conn {
|
||||
conn := redisPool(logger, s, clientName).Get()
|
||||
|
||||
if res_tracker.IsOn() {
|
||||
//let's wrap our connection with the special tracked wrapper
|
||||
conn = tracked{conn}
|
||||
res_tracker.Track(conn)
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
func GetRedisMutex(logger *colog.CoLog, s RdSettings, name string, ttlSec int) *redsync.Mutex {
|
||||
mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{redisPool(logger, s, "")})
|
||||
func GetRedisMutex(pool *Pool, name string, ttlSec int) *redsync.Mutex {
|
||||
mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{pool.RP})
|
||||
mx.Expiry = time.Duration(ttlSec) * time.Second
|
||||
return mx
|
||||
}
|
||||
|
||||
func GetRedisMutexAutoTTL(logger *colog.CoLog, s RdSettings, name string) *redsync.Mutex {
|
||||
return GetRedisMutex(logger, s, name, ExpireRDMutexSec)
|
||||
func GetRedisMutexAutoTTL(pool *Pool, name string) *redsync.Mutex {
|
||||
return GetRedisMutex(pool, name, ExpireRDMutexSec)
|
||||
}
|
||||
|
||||
func RemoveRedisMutex(logger *colog.CoLog, s RdSettings, name string) error {
|
||||
rd := redisPool(logger, s, "").Get()
|
||||
func RemoveRedisMutex(pool *Pool, name string) error {
|
||||
rd := pool.Get()
|
||||
defer rd.Close()
|
||||
|
||||
_, err := rd.Do("DEL", name)
|
||||
return err
|
||||
}
|
||||
|
||||
func ExistsRedisMutex(logger *colog.CoLog, s RdSettings, name string) (bool, error) {
|
||||
rd := redisPool(logger, s, "").Get()
|
||||
func ExistsRedisMutex(pool *Pool, name string) (bool, error) {
|
||||
rd := pool.Get()
|
||||
defer rd.Close()
|
||||
|
||||
exists, err := redis.Bool(rd.Do("EXISTS", name))
|
||||
if err != nil {
|
||||
|
|
20
rdb_test.go
20
rdb_test.go
|
@ -6,12 +6,11 @@ import (
|
|||
|
||||
"git.bit5.ru/backend/colog"
|
||||
"git.bit5.ru/backend/rdb"
|
||||
"git.bit5.ru/backend/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func getPool() *redis.Pool {
|
||||
pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "localhost", Port: 6379, Db: 10}, "test")
|
||||
func getPool() *rdb.Pool {
|
||||
pool := rdb.OpenPool(rdb.Settings{Host: "localhost", Port: 6379, Db: 10, ClientName: "test"}, getLogger())
|
||||
return pool
|
||||
}
|
||||
|
||||
|
@ -24,7 +23,7 @@ func getLogger() *colog.CoLog {
|
|||
}
|
||||
|
||||
func TestBadConnection(t *testing.T) {
|
||||
pool := rdb.NewRedisPool(getLogger(), rdb.RdSettings{Host: "dummy", Port: 80}, "test")
|
||||
pool := rdb.OpenPool(rdb.Settings{Host: "dummy", Port: 80, ClientName: "test"}, getLogger())
|
||||
conn := pool.Get()
|
||||
assert.NotNil(t, conn)
|
||||
_, err := conn.Do("PING")
|
||||
|
@ -33,6 +32,7 @@ func TestBadConnection(t *testing.T) {
|
|||
|
||||
func TestGetConn(t *testing.T) {
|
||||
pool := getPool()
|
||||
defer pool.Close()
|
||||
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
@ -42,9 +42,10 @@ func TestGetConn(t *testing.T) {
|
|||
|
||||
func TestCloseConn(t *testing.T) {
|
||||
pool := getPool()
|
||||
defer pool.Close()
|
||||
|
||||
conn := pool.Get()
|
||||
assert.EqualValues(t, 0, pool.IdleCount())
|
||||
assert.EqualValues(t, 0, pool.RP.IdleCount())
|
||||
|
||||
_, err := conn.Do("PING")
|
||||
assert.Nil(t, err)
|
||||
|
@ -52,19 +53,20 @@ func TestCloseConn(t *testing.T) {
|
|||
conn.Close()
|
||||
_, err = conn.Do("PING")
|
||||
assert.NotNil(t, err)
|
||||
assert.EqualValues(t, 1, pool.IdleCount())
|
||||
assert.EqualValues(t, 1, pool.RP.IdleCount())
|
||||
}
|
||||
|
||||
func TestDoubleCloseConnIsOk(t *testing.T) {
|
||||
pool := getPool()
|
||||
defer pool.Close()
|
||||
|
||||
conn := pool.Get()
|
||||
_, err := conn.Do("PING")
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.EqualValues(t, 0, pool.IdleCount())
|
||||
assert.EqualValues(t, 0, pool.RP.IdleCount())
|
||||
conn.Close()
|
||||
assert.EqualValues(t, 1, pool.IdleCount())
|
||||
assert.EqualValues(t, 1, pool.RP.IdleCount())
|
||||
conn.Close()
|
||||
assert.EqualValues(t, 1, pool.IdleCount())
|
||||
assert.EqualValues(t, 1, pool.RP.IdleCount())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue