Adding convenience autoConn which is closed automatically once it's finished
This commit is contained in:
parent
33b922f181
commit
a9f5855503
71
rdb.go
71
rdb.go
|
@ -18,7 +18,7 @@ const (
|
||||||
ExpireRDMutexSec = 30
|
ExpireRDMutexSec = 30
|
||||||
)
|
)
|
||||||
|
|
||||||
type Settings struct {
|
type RdSettings struct {
|
||||||
Host, Prefix string
|
Host, Prefix string
|
||||||
//will be shown in CLIENT LIST
|
//will be shown in CLIENT LIST
|
||||||
ClientName string
|
ClientName string
|
||||||
|
@ -29,16 +29,16 @@ type Settings struct {
|
||||||
IdleTimeoutSec int
|
IdleTimeoutSec int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Settings) ConnStr() string {
|
func (s *RdSettings) ConnStr() string {
|
||||||
return s.Host + ":" + strconv.Itoa(s.Port)
|
return s.Host + ":" + strconv.Itoa(s.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
S Settings
|
S RdSettings
|
||||||
RP *redis.Pool
|
RP *redis.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func OpenPool(s Settings, logger *colog.CoLog) *Pool {
|
func OpenPool(s RdSettings, logger *colog.CoLog) *Pool {
|
||||||
p := &Pool{S: s, RP: newRedisPool(s, logger)}
|
p := &Pool{S: s, RP: newRedisPool(s, logger)}
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
@ -59,12 +59,16 @@ func (p *Pool) Get() redis.Conn {
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Pool) Auto() redis.Conn {
|
||||||
|
return &autoConn{p, nil}
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: redis connection logs all operations
|
// NOTE: redis connection logs all operations
|
||||||
type rdb struct {
|
type rdb struct {
|
||||||
orig redis.Conn
|
orig redis.Conn
|
||||||
name string
|
name string
|
||||||
logger *colog.CoLog
|
logger *colog.CoLog
|
||||||
s Settings
|
s RdSettings
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rd *rdb) Close() error {
|
func (rd *rdb) Close() error {
|
||||||
|
@ -121,7 +125,7 @@ func (rd *rdb) Receive() (interface{}, error) {
|
||||||
return rd.orig.Receive()
|
return rd.orig.Receive()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRedisPool(s Settings, logger *colog.CoLog) *redis.Pool {
|
func newRedisPool(s RdSettings, logger *colog.CoLog) *redis.Pool {
|
||||||
|
|
||||||
maxIdle := s.MaxIdle
|
maxIdle := s.MaxIdle
|
||||||
if maxIdle == 0 {
|
if maxIdle == 0 {
|
||||||
|
@ -396,14 +400,57 @@ type tracked struct {
|
||||||
subj redis.Conn
|
subj redis.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t tracked) Close() error {
|
func (t *tracked) Close() error {
|
||||||
res_tracker.Untrack(t)
|
res_tracker.Untrack(t)
|
||||||
return t.subj.Close()
|
return t.subj.Close()
|
||||||
}
|
}
|
||||||
func (t tracked) Do(cmd string, args ...interface{}) (interface{}, error) {
|
func (t *tracked) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||||
return t.subj.Do(cmd, args...)
|
return t.subj.Do(cmd, args...)
|
||||||
}
|
}
|
||||||
func (t tracked) Send(cmd string, args ...interface{}) error { return t.subj.Send(cmd, args...) }
|
func (t *tracked) Send(cmd string, args ...interface{}) error { return t.subj.Send(cmd, args...) }
|
||||||
func (t tracked) Err() error { return t.subj.Err() }
|
func (t *tracked) Err() error { return t.subj.Err() }
|
||||||
func (t tracked) Flush() error { return t.subj.Flush() }
|
func (t *tracked) Flush() error { return t.subj.Flush() }
|
||||||
func (t tracked) Receive() (interface{}, error) { return t.subj.Receive() }
|
func (t *tracked) Receive() (interface{}, error) { return t.subj.Receive() }
|
||||||
|
|
||||||
|
type autoConn struct {
|
||||||
|
p *Pool
|
||||||
|
pipe redis.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *autoConn) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (c *autoConn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||||
|
if c.pipe != nil {
|
||||||
|
return nil, errors.New("There is an active pipeline")
|
||||||
|
}
|
||||||
|
|
||||||
|
rc := c.p.Get()
|
||||||
|
defer rc.Close()
|
||||||
|
return rc.Do(cmd, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *autoConn) Send(cmd string, args ...interface{}) error {
|
||||||
|
if c.pipe == nil {
|
||||||
|
c.pipe = c.p.Get()
|
||||||
|
}
|
||||||
|
return c.pipe.Send(cmd, args...)
|
||||||
|
}
|
||||||
|
func (c *autoConn) Err() error {
|
||||||
|
if c.pipe != nil {
|
||||||
|
return c.pipe.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (c *autoConn) Flush() error {
|
||||||
|
if c.pipe != nil {
|
||||||
|
return c.pipe.Flush()
|
||||||
|
}
|
||||||
|
return errors.New("There is no active pipeline")
|
||||||
|
}
|
||||||
|
func (c *autoConn) Receive() (interface{}, error) {
|
||||||
|
if c.pipe != nil {
|
||||||
|
return c.pipe.Receive()
|
||||||
|
}
|
||||||
|
return nil, errors.New("There is no active pipeline")
|
||||||
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func getPool() *rdb.Pool {
|
func getPool() *rdb.Pool {
|
||||||
pool := rdb.OpenPool(rdb.Settings{Host: "localhost", Port: 6379, Db: 10, ClientName: "test"}, getLogger())
|
pool := rdb.OpenPool(rdb.RdSettings{Host: "localhost", Port: 6379, Db: 10, ClientName: "test"}, getLogger())
|
||||||
return pool
|
return pool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ func getLogger() *colog.CoLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBadConnection(t *testing.T) {
|
func TestBadConnection(t *testing.T) {
|
||||||
pool := rdb.OpenPool(rdb.Settings{Host: "dummy", Port: 80, ClientName: "test"}, getLogger())
|
pool := rdb.OpenPool(rdb.RdSettings{Host: "dummy", Port: 80, ClientName: "test"}, getLogger())
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
assert.NotNil(t, conn)
|
assert.NotNil(t, conn)
|
||||||
_, err := conn.Do("PING")
|
_, err := conn.Do("PING")
|
||||||
|
|
Loading…
Reference in New Issue