rdb/rdb.go

423 lines
9.2 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rdb
import (
"fmt"
"math"
"strconv"
"time"
"git.bit5.ru/backend/redigo/redis"
"git.bit5.ru/backend/redsync"
"git.bit5.ru/backend/res_tracker"
"github.com/go-logr/logr"
"github.com/pkg/errors"
)
const (
ExpireRDMutexSec = 30
)
type RdSettings struct {
Host, Prefix string
//will be shown in CLIENT LIST
ClientName string
Port int
Db int
LogLevel int
MaxIdle int
IdleTimeoutSec int
}
func (s *RdSettings) ConnStr() string {
return s.Host + ":" + strconv.Itoa(s.Port)
}
type Pool struct {
S RdSettings
RP *redis.Pool
}
func OpenPool(s RdSettings, logger logr.Logger) *Pool {
p := &Pool{S: s, RP: newRedisPool(s, logger)}
return p
}
func (p *Pool) Close() {
p.RP.Close()
}
func (p *Pool) Get() redis.Conn {
conn := p.RP.Get()
if res_tracker.IsOn() {
//let's wrap our connection with the special tracked wrapper
conn = &tracked{conn}
res_tracker.Track(conn)
}
return conn
}
//NOTE: convenience method for 'one-shot' commands
func (p *Pool) Do(cmd string, args ...interface{}) (interface{}, error) {
rc := p.Get()
defer rc.Close()
return rc.Do(cmd, args...)
}
//NOTE: redis connection logs all operations
type rdb struct {
orig redis.Conn
name string
logger logr.Logger
s RdSettings
}
func (rd *rdb) Close() error {
return rd.orig.Close()
}
func (rd *rdb) Err() error {
return rd.orig.Err()
}
func (rd *rdb) Flush() error {
return rd.orig.Flush()
}
func (rd *rdb) Do(command string, args ...interface{}) (interface{}, error) {
if rd.s.LogLevel > 1 {
if len(command) != 0 {
var s string
for i := 0; i < len(args); i++ {
s += fmt.Sprintf("%v ", args[i])
}
rd.logger.WithCallDepth(2).V(1).Info(command + " " + s)
}
} else if rd.s.LogLevel > 0 {
//for this level not logging too verbose commands
if len(command) != 0 && command != "PING" {
var s string
for i := 0; i < len(args) && i < 5; i++ {
s += fmt.Sprintf("%v ", args[i])
}
if len(args) > 5 {
s += " ..."
}
rd.logger.WithCallDepth(2).V(1).Info(command + " " + s)
}
}
return rd.orig.Do(command, args...)
}
func (rd *rdb) Send(command string, args ...interface{}) error {
if rd.s.LogLevel > 0 {
if len(command) != 0 {
var s string
for i := 0; i < len(args); i++ {
s += fmt.Sprintf("%v ", args[i])
}
rd.logger.WithCallDepth(2).V(1).Info(command + " " + s)
}
}
return rd.orig.Send(command, args...)
}
func (rd *rdb) Receive() (interface{}, error) {
return rd.orig.Receive()
}
func newRedisPool(s RdSettings, logger logr.Logger) *redis.Pool {
maxIdle := s.MaxIdle
if maxIdle == 0 {
maxIdle = 16
}
idleTimeoutSec := s.IdleTimeoutSec
if idleTimeoutSec == 0 {
idleTimeoutSec = 240
}
connStr := s.ConnStr()
if len(s.Prefix) > 0 {
logger = logger.WithValues("rd", s.Prefix)
}
return &redis.Pool{
Wait: false,
MaxIdle: maxIdle,
IdleTimeout: time.Second * time.Duration(idleTimeoutSec),
Dial: func() (redis.Conn, error) {
orig, err := redis.Dial("tcp", connStr)
if err != nil {
return nil, errors.WithStack(err)
}
c := &rdb{
orig: orig,
name: s.Prefix,
logger: logger,
s: s,
}
// Сохраняем название клиента для диагностики соединений с помощью команды CLIENT LIST.
// https://redis.io/commands/client-list/
if len(s.ClientName) > 0 {
_, err = c.Do("CLIENT", "SETNAME", s.ClientName)
if err != nil {
return nil, errors.WithStack(err)
}
}
_, err = c.Do("SELECT", s.Db)
if err != nil {
return nil, errors.WithStack(err)
}
return c, err
},
TestOnBorrow: func(c redis.Conn, _ time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
func GetRedisMutex(pool *Pool, name string, ttlSec int) *redsync.Mutex {
mx, _ := redsync.NewMutexWithPool(name, []*redis.Pool{pool.RP})
mx.Expiry = time.Duration(ttlSec) * time.Second
return mx
}
func GetRedisMutexAutoTTL(pool *Pool, name string) *redsync.Mutex {
return GetRedisMutex(pool, name, ExpireRDMutexSec)
}
func RemoveRedisMutex(pool *Pool, name string) error {
rd := pool.Get()
defer rd.Close()
_, err := rd.Do("DEL", name)
return err
}
func ExistsRedisMutex(pool *Pool, name string) (bool, error) {
rd := pool.Get()
defer rd.Close()
exists, err := redis.Bool(rd.Do("EXISTS", name))
if err != nil {
if err == redis.ErrNil {
return false, nil
}
return false, err
}
return exists, nil
}
func GetUint32(rd redis.Conn, key string) (uint32, error) {
nUint64, err := redis.Uint64(rd.Do("GET", key))
if err != nil {
return 0, err
}
if rdErr := rd.Err(); rdErr != nil {
return 0, rdErr
}
nUint32, err := uint64ToUint32(nUint64)
if err != nil {
return 0, err
}
return nUint32, nil
}
func GetListLen(rd redis.Conn, key string) (uint32, error) {
nUint64, err := redis.Uint64(rd.Do("LLEN", key))
if err != nil {
return 0, err
}
if rdErr := rd.Err(); rdErr != nil {
return 0, rdErr
}
nUint32, err := uint64ToUint32(nUint64)
if err != nil {
return 0, err
}
return nUint32, nil
}
func SendRpushSliceUint32(rd redis.Conn, key string, nums []uint32) {
if len(nums) == 0 {
return
}
params := make([]interface{}, 0, 1+len(nums))
params = append(params, key)
for _, n := range nums {
params = append(params, n)
}
rd.Send("RPUSH", params...)
}
func SendRpushSliceString(rd redis.Conn, key string, strs []string) {
if len(strs) == 0 {
return
}
params := make([]interface{}, 0, 1+len(strs))
params = append(params, key)
for _, s := range strs {
params = append(params, s)
}
rd.Send("RPUSH", params...)
}
func uint64ToUint32(n64 uint64) (uint32, error) {
if n64 > math.MaxUint32 {
return 0, errors.Errorf("Can not convert uint64 %d to uint32.", n64)
}
return uint32(n64), nil
}
// Uint32s is a helper that converts an array command reply to a []uint32.
// If err is not equal to nil, then Uint32s returns nil, err.
func Uint32s(reply interface{}, err error) ([]uint32, error) {
var nums []uint32
if reply == nil {
return nums, redis.ErrNil
}
values, err := redis.Values(reply, err)
if err != nil {
return nums, err
}
if err := redis.ScanSlice(values, &nums); err != nil {
return nums, err
}
return nums, nil
}
func ReceiveUint32s(rdconn redis.Conn) ([]uint32, error) {
nums, err := Uint32s(rdconn.Receive())
if e := getErr(rdconn, err); e != nil {
return nil, e
}
return nums, nil
}
func Uint32(reply interface{}, err error) (uint32, error) {
nUint64, err := redis.Uint64(reply, err)
if err != nil {
return 0, errors.Wrap(err, "Rdb can not convert reply to uint64.")
}
nUint32, err := uint64ToUint32(nUint64)
if err != nil {
return 0, err
}
return nUint32, nil
}
func ReceiveInt(rdConn redis.Conn) (int, error) {
n, err := redis.Int(rdConn.Receive())
if err != nil {
return 0, errors.Wrap(err, "Can not execute ReceiveInt(). Got error from redis.Int().")
}
if rdErr := rdConn.Err(); rdErr != nil {
return 0, errors.Wrap(rdErr, "Can not execute ReceiveInt(). Got error from rdConn.Err().")
}
return n, nil
}
func ReceiveUint32(rdConn redis.Conn) (uint32, error) {
n, err := Uint32(rdConn.Receive())
if err != nil {
return 0, errors.Wrap(err, "Can not execute ReceiveUint32(). Got error from Uint32().")
}
if rdErr := rdConn.Err(); rdErr != nil {
return 0, errors.Wrap(rdErr, "Can not execute ReceiveUint32(). Got error from rdConn.Err().")
}
return n, nil
}
func GetUint32s(conn redis.Conn, commandName string, args ...interface{}) ([]uint32, error) {
return Uint32s(Do(conn, commandName, args...))
}
func Do(conn redis.Conn, commandName string, args ...interface{}) (reply interface{}, err error) {
result, err := conn.Do(commandName, args...)
if e := getErr(conn, err); e != nil {
return nil, e
}
return result, nil
}
// Deprecated. Use func Flush2.
func Flush(conn redis.Conn) error {
err := conn.Flush()
return getErr(conn, err)
}
// TODO: Delete func Flush.
// TODO: Rename func Flush2 to Flush.
func Flush2(conn redis.Conn) error {
if err := conn.Flush(); err != nil {
return errors.Wrap(err, "Can not execute function Flush2. Got error from redis.Conn Flush.")
}
if err := conn.Err(); err != nil {
return errors.Wrap(err, "Can not execute function Flush2. Got redis connection error.")
}
return nil
}
func Err(conn redis.Conn, err error) error {
return getErr(conn, err)
}
// NOTE: workaround for redigo not returning original connection error
func getErr(conn redis.Conn, err error) error {
if err != nil {
if err == redis.ErrNil {
return nil
}
return errors.Wrap(err, "Got redis error.")
}
if connErr := conn.Err(); connErr != nil {
return errors.Wrap(connErr, "Got redis connection error.")
}
return nil
}
type tracked struct {
subj redis.Conn
}
func (t *tracked) Close() error {
res_tracker.Untrack(t)
return t.subj.Close()
}
func (t *tracked) Do(cmd string, args ...interface{}) (interface{}, error) {
return t.subj.Do(cmd, args...)
}
func (t *tracked) Send(cmd string, args ...interface{}) error { return t.subj.Send(cmd, args...) }
func (t *tracked) Err() error { return t.subj.Err() }
func (t *tracked) Flush() error { return t.subj.Flush() }
func (t *tracked) Receive() (interface{}, error) { return t.subj.Receive() }