commit c8376b3ef85997a3638cef9cc4054e1ee6668f93 Author: Pavel Shevaev Date: Tue Oct 25 18:38:58 2022 +0300 First commit diff --git a/README.markdown b/README.markdown new file mode 100644 index 0000000..ab42c31 --- /dev/null +++ b/README.markdown @@ -0,0 +1,44 @@ +Redigo +====== + +Redigo is a [Go](http://golang.org/) client for the [Redis](http://redis.io/) database. + +Features +------- + +* A [Print-like](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Executing_Commands) API with support for all Redis commands. +* [Pipelining](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Pipelining), including pipelined transactions. +* [Publish/Subscribe](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Publish_and_Subscribe). +* [Connection pooling](http://godoc.org/github.com/garyburd/redigo/redis#Pool). +* [Script helper type](http://godoc.org/github.com/garyburd/redigo/redis#Script) with optimistic use of EVALSHA. +* [Helper functions](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Reply_Helpers) for working with command replies. + +Documentation +------------- + +- [API Reference](http://godoc.org/github.com/garyburd/redigo/redis) +- [FAQ](https://github.com/garyburd/redigo/wiki/FAQ) + +Installation +------------ + +Install Redigo using the "go get" command: + + go get github.com/garyburd/redigo/redis + +The Go distribution is Redigo's only dependency. + +Contributing +------------ + +Contributions are welcome. + +Before writing code, send mail to gary@beagledreams.com to discuss what you +plan to do. This gives me a chance to validate the design, avoid duplication of +effort and ensure that the changes fit the goals of the project. Do not start +the discussion with a pull request. + +License +------- + +Redigo is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html). diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..72e0046 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.bit5.ru/backend/redigo + +go 1.13 diff --git a/internal/commandinfo.go b/internal/commandinfo.go new file mode 100644 index 0000000..7ad1ade --- /dev/null +++ b/internal/commandinfo.go @@ -0,0 +1,45 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package internal // import "github.com/garyburd/redigo/internal" + +import ( + "strings" +) + +const ( + WatchState = 1 << iota + MultiState + SubscribeState + MonitorState +) + +type CommandInfo struct { + Set, Clear int +} + +var commandInfos = map[string]CommandInfo{ + "WATCH": {Set: WatchState}, + "UNWATCH": {Clear: WatchState}, + "MULTI": {Set: MultiState}, + "EXEC": {Clear: WatchState | MultiState}, + "DISCARD": {Clear: WatchState | MultiState}, + "PSUBSCRIBE": {Set: SubscribeState}, + "SUBSCRIBE": {Set: SubscribeState}, + "MONITOR": {Set: MonitorState}, +} + +func LookupCommandInfo(commandName string) CommandInfo { + return commandInfos[strings.ToUpper(commandName)] +} diff --git a/internal/redistest/testdb.go b/internal/redistest/testdb.go new file mode 100644 index 0000000..5f955c4 --- /dev/null +++ b/internal/redistest/testdb.go @@ -0,0 +1,65 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// Package redistest contains utilities for writing Redigo tests. +package redistest + +import ( + "errors" + "time" + + "github.com/garyburd/redigo/redis" +) + +type testConn struct { + redis.Conn +} + +func (t testConn) Close() error { + _, err := t.Conn.Do("SELECT", "9") + if err != nil { + return nil + } + _, err = t.Conn.Do("FLUSHDB") + if err != nil { + return err + } + return t.Conn.Close() +} + +// Dial dials the local Redis server and selects database 9. To prevent +// stomping on real data, DialTestDB fails if database 9 contains data. The +// returned connection flushes database 9 on close. +func Dial() (redis.Conn, error) { + c, err := redis.DialTimeout("tcp", ":6379", 0, 1*time.Second, 1*time.Second) + if err != nil { + return nil, err + } + + _, err = c.Do("SELECT", "9") + if err != nil { + return nil, err + } + + n, err := redis.Int(c.Do("DBSIZE")) + if err != nil { + return nil, err + } + + if n != 0 { + return nil, errors.New("database #9 is not empty, test can not continue") + } + + return testConn{c}, nil +} diff --git a/redis/conn.go b/redis/conn.go new file mode 100644 index 0000000..ac0e971 --- /dev/null +++ b/redis/conn.go @@ -0,0 +1,455 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "net" + "strconv" + "sync" + "time" +) + +// conn is the low-level implementation of Conn +type conn struct { + + // Shared + mu sync.Mutex + pending int + err error + conn net.Conn + + // Read + readTimeout time.Duration + br *bufio.Reader + + // Write + writeTimeout time.Duration + bw *bufio.Writer + + // Scratch space for formatting argument length. + // '*' or '$', length, "\r\n" + lenScratch [32]byte + + // Scratch space for formatting integers and floats. + numScratch [40]byte +} + +// Dial connects to the Redis server at the given network and address. +func Dial(network, address string) (Conn, error) { + dialer := xDialer{} + return dialer.Dial(network, address) +} + +// DialTimeout acts like Dial but takes timeouts for establishing the +// connection to the server, writing a command and reading a reply. +func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) { + netDialer := net.Dialer{Timeout: connectTimeout} + dialer := xDialer{ + NetDial: netDialer.Dial, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + } + return dialer.Dial(network, address) +} + +// A Dialer specifies options for connecting to a Redis server. +type xDialer struct { + // NetDial specifies the dial function for creating TCP connections. If + // NetDial is nil, then net.Dial is used. + NetDial func(network, addr string) (net.Conn, error) + + // ReadTimeout specifies the timeout for reading a single command + // reply. If ReadTimeout is zero, then no timeout is used. + ReadTimeout time.Duration + + // WriteTimeout specifies the timeout for writing a single command. If + // WriteTimeout is zero, then no timeout is used. + WriteTimeout time.Duration +} + +// Dial connects to the Redis server at address on the named network. +func (d *xDialer) Dial(network, address string) (Conn, error) { + dial := d.NetDial + if dial == nil { + dial = net.Dial + } + netConn, err := dial(network, address) + if err != nil { + return nil, err + } + return &conn{ + conn: netConn, + bw: bufio.NewWriter(netConn), + br: bufio.NewReader(netConn), + readTimeout: d.ReadTimeout, + writeTimeout: d.WriteTimeout, + }, nil +} + +// NewConn returns a new Redigo connection for the given net connection. +func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn { + return &conn{ + conn: netConn, + bw: bufio.NewWriter(netConn), + br: bufio.NewReader(netConn), + readTimeout: readTimeout, + writeTimeout: writeTimeout, + } +} + +func (c *conn) Close() error { + c.mu.Lock() + err := c.err + if c.err == nil { + c.err = errors.New("redigo: closed") + err = c.conn.Close() + } + c.mu.Unlock() + return err +} + +func (c *conn) fatal(err error) error { + c.mu.Lock() + if c.err == nil { + c.err = err + // Close connection to force errors on subsequent calls and to unblock + // other reader or writer. + c.conn.Close() + } + c.mu.Unlock() + return err +} + +func (c *conn) Err() error { + c.mu.Lock() + err := c.err + c.mu.Unlock() + return err +} + +func (c *conn) writeLen(prefix byte, n int) error { + c.lenScratch[len(c.lenScratch)-1] = '\n' + c.lenScratch[len(c.lenScratch)-2] = '\r' + i := len(c.lenScratch) - 3 + for { + c.lenScratch[i] = byte('0' + n%10) + i -= 1 + n = n / 10 + if n == 0 { + break + } + } + c.lenScratch[i] = prefix + _, err := c.bw.Write(c.lenScratch[i:]) + return err +} + +func (c *conn) writeString(s string) error { + c.writeLen('$', len(s)) + c.bw.WriteString(s) + _, err := c.bw.WriteString("\r\n") + return err +} + +func (c *conn) writeBytes(p []byte) error { + c.writeLen('$', len(p)) + c.bw.Write(p) + _, err := c.bw.WriteString("\r\n") + return err +} + +func (c *conn) writeInt64(n int64) error { + return c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10)) +} + +func (c *conn) writeFloat64(n float64) error { + return c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)) +} + +func (c *conn) writeCommand(cmd string, args []interface{}) (err error) { + c.writeLen('*', 1+len(args)) + err = c.writeString(cmd) + for _, arg := range args { + if err != nil { + break + } + switch arg := arg.(type) { + case string: + err = c.writeString(arg) + case []byte: + err = c.writeBytes(arg) + case int: + err = c.writeInt64(int64(arg)) + case int64: + err = c.writeInt64(arg) + case float64: + err = c.writeFloat64(arg) + case bool: + if arg { + err = c.writeString("1") + } else { + err = c.writeString("0") + } + case nil: + err = c.writeString("") + default: + var buf bytes.Buffer + fmt.Fprint(&buf, arg) + err = c.writeBytes(buf.Bytes()) + } + } + return err +} + +type protocolError string + +func (pe protocolError) Error() string { + return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe)) +} + +func (c *conn) readLine() ([]byte, error) { + p, err := c.br.ReadSlice('\n') + if err == bufio.ErrBufferFull { + return nil, protocolError("long response line") + } + if err != nil { + return nil, err + } + i := len(p) - 2 + if i < 0 || p[i] != '\r' { + return nil, protocolError("bad response line terminator") + } + return p[:i], nil +} + +// parseLen parses bulk string and array lengths. +func parseLen(p []byte) (int, error) { + if len(p) == 0 { + return -1, protocolError("malformed length") + } + + if p[0] == '-' && len(p) == 2 && p[1] == '1' { + // handle $-1 and $-1 null replies. + return -1, nil + } + + var n int + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return -1, protocolError("illegal bytes in length") + } + n += int(b - '0') + } + + return n, nil +} + +// parseInt parses an integer reply. +func parseInt(p []byte) (interface{}, error) { + if len(p) == 0 { + return 0, protocolError("malformed integer") + } + + var negate bool + if p[0] == '-' { + negate = true + p = p[1:] + if len(p) == 0 { + return 0, protocolError("malformed integer") + } + } + + var n int64 + for _, b := range p { + n *= 10 + if b < '0' || b > '9' { + return 0, protocolError("illegal bytes in length") + } + n += int64(b - '0') + } + + if negate { + n = -n + } + return n, nil +} + +var ( + okReply interface{} = "OK" + pongReply interface{} = "PONG" +) + +func (c *conn) readReply() (interface{}, error) { + line, err := c.readLine() + if err != nil { + return nil, err + } + if len(line) == 0 { + return nil, protocolError("short response line") + } + switch line[0] { + case '+': + switch { + case len(line) == 3 && line[1] == 'O' && line[2] == 'K': + // Avoid allocation for frequent "+OK" response. + return okReply, nil + case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G': + // Avoid allocation in PING command benchmarks :) + return pongReply, nil + default: + return string(line[1:]), nil + } + case '-': + return Error(string(line[1:])), nil + case ':': + return parseInt(line[1:]) + case '$': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + p := make([]byte, n) + _, err = io.ReadFull(c.br, p) + if err != nil { + return nil, err + } + if line, err := c.readLine(); err != nil { + return nil, err + } else if len(line) != 0 { + return nil, protocolError("bad bulk string format") + } + return p, nil + case '*': + n, err := parseLen(line[1:]) + if n < 0 || err != nil { + return nil, err + } + r := make([]interface{}, n) + for i := range r { + r[i], err = c.readReply() + if err != nil { + return nil, err + } + } + return r, nil + } + return nil, protocolError("unexpected response line") +} + +func (c *conn) Send(cmd string, args ...interface{}) error { + c.mu.Lock() + c.pending += 1 + c.mu.Unlock() + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + if err := c.writeCommand(cmd, args); err != nil { + return c.fatal(err) + } + return nil +} + +func (c *conn) Flush() error { + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + if err := c.bw.Flush(); err != nil { + return c.fatal(err) + } + return nil +} + +func (c *conn) Receive() (reply interface{}, err error) { + if c.readTimeout != 0 { + c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) + } + if reply, err = c.readReply(); err != nil { + return nil, c.fatal(err) + } + // When using pub/sub, the number of receives can be greater than the + // number of sends. To enable normal use of the connection after + // unsubscribing from all channels, we do not decrement pending to a + // negative value. + // + // The pending field is decremented after the reply is read to handle the + // case where Receive is called before Send. + c.mu.Lock() + if c.pending > 0 { + c.pending -= 1 + } + c.mu.Unlock() + if err, ok := reply.(Error); ok { + return nil, err + } + return +} + +func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) { + c.mu.Lock() + pending := c.pending + c.pending = 0 + c.mu.Unlock() + + if cmd == "" && pending == 0 { + return nil, nil + } + + if c.writeTimeout != 0 { + c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout)) + } + + if cmd != "" { + c.writeCommand(cmd, args) + } + + if err := c.bw.Flush(); err != nil { + return nil, c.fatal(err) + } + + if c.readTimeout != 0 { + c.conn.SetReadDeadline(time.Now().Add(c.readTimeout)) + } + + if cmd == "" { + reply := make([]interface{}, pending) + for i := range reply { + r, e := c.readReply() + if e != nil { + return nil, c.fatal(e) + } + reply[i] = r + } + return reply, nil + } + + var err error + var reply interface{} + for i := 0; i <= pending; i++ { + var e error + if reply, e = c.readReply(); e != nil { + return nil, c.fatal(e) + } + if e, ok := reply.(Error); ok && err == nil { + err = e + } + } + return reply, err +} diff --git a/redis/conn_test.go b/redis/conn_test.go new file mode 100644 index 0000000..8003701 --- /dev/null +++ b/redis/conn_test.go @@ -0,0 +1,542 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "bufio" + "bytes" + "math" + "net" + "reflect" + "strings" + "testing" + "time" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" +) + +var writeTests = []struct { + args []interface{} + expected string +}{ + { + []interface{}{"SET", "key", "value"}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + }, + { + []interface{}{"SET", "key", "value"}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + }, + { + []interface{}{"SET", "key", byte(100)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n", + }, + { + []interface{}{"SET", "key", 100}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n", + }, + { + []interface{}{"SET", "key", int64(math.MinInt64)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$20\r\n-9223372036854775808\r\n", + }, + { + []interface{}{"SET", "key", float64(1349673917.939762)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$21\r\n1.349673917939762e+09\r\n", + }, + { + []interface{}{"SET", "key", ""}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n", + }, + { + []interface{}{"SET", "key", nil}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n", + }, + { + []interface{}{"ECHO", true, false}, + "*3\r\n$4\r\nECHO\r\n$1\r\n1\r\n$1\r\n0\r\n", + }, +} + +func TestWrite(t *testing.T) { + for _, tt := range writeTests { + var buf bytes.Buffer + rw := bufio.ReadWriter{Writer: bufio.NewWriter(&buf)} + c := redis.NewConnBufio(rw) + err := c.Send(tt.args[0].(string), tt.args[1:]...) + if err != nil { + t.Errorf("Send(%v) returned error %v", tt.args, err) + continue + } + rw.Flush() + actual := buf.String() + if actual != tt.expected { + t.Errorf("Send(%v) = %q, want %q", tt.args, actual, tt.expected) + } + } +} + +var errorSentinel = &struct{}{} + +var readTests = []struct { + reply string + expected interface{} +}{ + { + "+OK\r\n", + "OK", + }, + { + "+PONG\r\n", + "PONG", + }, + { + "@OK\r\n", + errorSentinel, + }, + { + "$6\r\nfoobar\r\n", + []byte("foobar"), + }, + { + "$-1\r\n", + nil, + }, + { + ":1\r\n", + int64(1), + }, + { + ":-2\r\n", + int64(-2), + }, + { + "*0\r\n", + []interface{}{}, + }, + { + "*-1\r\n", + nil, + }, + { + "*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nHello\r\n$5\r\nWorld\r\n", + []interface{}{[]byte("foo"), []byte("bar"), []byte("Hello"), []byte("World")}, + }, + { + "*3\r\n$3\r\nfoo\r\n$-1\r\n$3\r\nbar\r\n", + []interface{}{[]byte("foo"), nil, []byte("bar")}, + }, + + { + // "x" is not a valid length + "$x\r\nfoobar\r\n", + errorSentinel, + }, + { + // -2 is not a valid length + "$-2\r\n", + errorSentinel, + }, + { + // "x" is not a valid integer + ":x\r\n", + errorSentinel, + }, + { + // missing \r\n following value + "$6\r\nfoobar", + errorSentinel, + }, + { + // short value + "$6\r\nxx", + errorSentinel, + }, + { + // long value + "$6\r\nfoobarx\r\n", + errorSentinel, + }, +} + +func TestRead(t *testing.T) { + for _, tt := range readTests { + rw := bufio.ReadWriter{ + Reader: bufio.NewReader(strings.NewReader(tt.reply)), + Writer: bufio.NewWriter(nil), // writer need to support Flush + } + c := redis.NewConnBufio(rw) + actual, err := c.Receive() + if tt.expected == errorSentinel { + if err == nil { + t.Errorf("Receive(%q) did not return expected error", tt.reply) + } + } else { + if err != nil { + t.Errorf("Receive(%q) returned error %v", tt.reply, err) + continue + } + if !reflect.DeepEqual(actual, tt.expected) { + t.Errorf("Receive(%q) = %v, want %v", tt.reply, actual, tt.expected) + } + } + } +} + +var testCommands = []struct { + args []interface{} + expected interface{} +}{ + { + []interface{}{"PING"}, + "PONG", + }, + { + []interface{}{"SET", "foo", "bar"}, + "OK", + }, + { + []interface{}{"GET", "foo"}, + []byte("bar"), + }, + { + []interface{}{"GET", "nokey"}, + nil, + }, + { + []interface{}{"MGET", "nokey", "foo"}, + []interface{}{nil, []byte("bar")}, + }, + { + []interface{}{"INCR", "mycounter"}, + int64(1), + }, + { + []interface{}{"LPUSH", "mylist", "foo"}, + int64(1), + }, + { + []interface{}{"LPUSH", "mylist", "bar"}, + int64(2), + }, + { + []interface{}{"LRANGE", "mylist", 0, -1}, + []interface{}{[]byte("bar"), []byte("foo")}, + }, + { + []interface{}{"MULTI"}, + "OK", + }, + { + []interface{}{"LRANGE", "mylist", 0, -1}, + "QUEUED", + }, + { + []interface{}{"PING"}, + "QUEUED", + }, + { + []interface{}{"EXEC"}, + []interface{}{ + []interface{}{[]byte("bar"), []byte("foo")}, + "PONG", + }, + }, +} + +func TestDoCommands(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + actual, err := c.Do(cmd.args[0].(string), cmd.args[1:]...) + if err != nil { + t.Errorf("Do(%v) returned error %v", cmd.args, err) + continue + } + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Do(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestPipelineCommands(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil { + t.Fatalf("Send(%v) returned error %v", cmd.args, err) + } + } + if err := c.Flush(); err != nil { + t.Errorf("Flush() returned error %v", err) + } + for _, cmd := range testCommands { + actual, err := c.Receive() + if err != nil { + t.Fatalf("Receive(%v) returned error %v", cmd.args, err) + } + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestBlankCommmand(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil { + t.Fatalf("Send(%v) returned error %v", cmd.args, err) + } + } + reply, err := redis.Values(c.Do("")) + if err != nil { + t.Fatalf("Do() returned error %v", err) + } + if len(reply) != len(testCommands) { + t.Fatalf("len(reply)=%d, want %d", len(reply), len(testCommands)) + } + for i, cmd := range testCommands { + actual := reply[i] + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestRecvBeforeSend(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + done := make(chan struct{}) + go func() { + c.Receive() + close(done) + }() + time.Sleep(time.Millisecond) + c.Send("PING") + c.Flush() + <-done + _, err = c.Do("") + if err != nil { + t.Fatalf("error=%v", err) + } +} + +func TestError(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + c.Do("SET", "key", "val") + _, err = c.Do("HSET", "key", "fld", "val") + if err == nil { + t.Errorf("Expected err for HSET on string key.") + } + if c.Err() != nil { + t.Errorf("Conn has Err()=%v, expect nil", c.Err()) + } + _, err = c.Do("SET", "key", "val") + if err != nil { + t.Errorf("Do(SET, key, val) returned error %v, expected nil.", err) + } +} + +func TestReadDeadline(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("net.Listen returned %v", err) + } + defer l.Close() + + go func() { + for { + c, err := l.Accept() + if err != nil { + return + } + go func() { + time.Sleep(time.Second) + c.Write([]byte("+OK\r\n")) + c.Close() + }() + } + }() + + c1, err := redis.DialTimeout(l.Addr().Network(), l.Addr().String(), 0, time.Millisecond, 0) + if err != nil { + t.Fatalf("redis.Dial returned %v", err) + } + defer c1.Close() + + _, err = c1.Do("PING") + if err == nil { + t.Fatalf("c1.Do() returned nil, expect error") + } + if c1.Err() == nil { + t.Fatalf("c1.Err() = nil, expect error") + } + + c2, err := redis.DialTimeout(l.Addr().Network(), l.Addr().String(), 0, time.Millisecond, 0) + if err != nil { + t.Fatalf("redis.Dial returned %v", err) + } + defer c2.Close() + + c2.Send("PING") + c2.Flush() + _, err = c2.Receive() + if err == nil { + t.Fatalf("c2.Receive() returned nil, expect error") + } + if c2.Err() == nil { + t.Fatalf("c2.Err() = nil, expect error") + } +} + +// Connect to local instance of Redis running on the default port. +func ExampleDial(x int) { + c, err := redis.Dial("tcp", ":6379") + if err != nil { + // handle error + } + defer c.Close() +} + +// TextExecError tests handling of errors in a transaction. See +// http://redis.io/topics/transactions for information on how Redis handles +// errors in a transaction. +func TestExecError(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + // Execute commands that fail before EXEC is called. + + c.Do("ZADD", "k0", 0, 0) + c.Send("MULTI") + c.Send("NOTACOMMAND", "k0", 0, 0) + c.Send("ZINCRBY", "k0", 0, 0) + v, err := c.Do("EXEC") + if err == nil { + t.Fatalf("EXEC returned values %v, expected error", v) + } + + // Execute commands that fail after EXEC is called. The first command + // returns an error. + + c.Do("ZADD", "k1", 0, 0) + c.Send("MULTI") + c.Send("HSET", "k1", 0, 0) + c.Send("ZINCRBY", "k1", 0, 0) + v, err = c.Do("EXEC") + if err != nil { + t.Fatalf("EXEC returned error %v", err) + } + + vs, err := redis.Values(v, nil) + if err != nil { + t.Fatalf("Values(v) returned error %v", err) + } + + if len(vs) != 2 { + t.Fatalf("len(vs) == %d, want 2", len(vs)) + } + + if _, ok := vs[0].(error); !ok { + t.Fatalf("first result is type %T, expected error", vs[0]) + } + + if _, ok := vs[1].([]byte); !ok { + t.Fatalf("second result is type %T, expected []byte", vs[2]) + } + + // Execute commands that fail after EXEC is called. The second command + // returns an error. + + c.Do("ZADD", "k2", 0, 0) + c.Send("MULTI") + c.Send("ZINCRBY", "k2", 0, 0) + c.Send("HSET", "k2", 0, 0) + v, err = c.Do("EXEC") + if err != nil { + t.Fatalf("EXEC returned error %v", err) + } + + vs, err = redis.Values(v, nil) + if err != nil { + t.Fatalf("Values(v) returned error %v", err) + } + + if len(vs) != 2 { + t.Fatalf("len(vs) == %d, want 2", len(vs)) + } + + if _, ok := vs[0].([]byte); !ok { + t.Fatalf("first result is type %T, expected []byte", vs[0]) + } + + if _, ok := vs[1].(error); !ok { + t.Fatalf("second result is type %T, expected error", vs[2]) + } +} + +func BenchmarkDoEmpty(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatal(err) + } + defer c.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + if _, err := c.Do(""); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkDoPing(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatal(err) + } + defer c.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + } +} diff --git a/redis/doc.go b/redis/doc.go new file mode 100644 index 0000000..a5cd454 --- /dev/null +++ b/redis/doc.go @@ -0,0 +1,169 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// Package redis is a client for the Redis database. +// +// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more +// documentation about this package. +// +// Connections +// +// The Conn interface is the primary interface for working with Redis. +// Applications create connections by calling the Dial, DialWithTimeout or +// NewConn functions. In the future, functions will be added for creating +// sharded and other types of connections. +// +// The application must call the connection Close method when the application +// is done with the connection. +// +// Executing Commands +// +// The Conn interface has a generic method for executing Redis commands: +// +// Do(commandName string, args ...interface{}) (reply interface{}, err error) +// +// The Redis command reference (http://redis.io/commands) lists the available +// commands. An example of using the Redis APPEND command is: +// +// n, err := conn.Do("APPEND", "key", "value") +// +// The Do method converts command arguments to binary strings for transmission +// to the server as follows: +// +// Go Type Conversion +// []byte Sent as is +// string Sent as is +// int, int64 strconv.FormatInt(v) +// float64 strconv.FormatFloat(v, 'g', -1, 64) +// bool true -> "1", false -> "0" +// nil "" +// all other types fmt.Print(v) +// +// Redis command reply types are represented using the following Go types: +// +// Redis type Go type +// error redis.Error +// integer int64 +// simple string string +// bulk string []byte or nil if value not present. +// array []interface{} or nil if value not present. +// +// Use type assertions or the reply helper functions to convert from +// interface{} to the specific Go type for the command result. +// +// Pipelining +// +// Connections support pipelining using the Send, Flush and Receive methods. +// +// Send(commandName string, args ...interface{}) error +// Flush() error +// Receive() (reply interface{}, err error) +// +// Send writes the command to the connection's output buffer. Flush flushes the +// connection's output buffer to the server. Receive reads a single reply from +// the server. The following example shows a simple pipeline. +// +// c.Send("SET", "foo", "bar") +// c.Send("GET", "foo") +// c.Flush() +// c.Receive() // reply from SET +// v, err = c.Receive() // reply from GET +// +// The Do method combines the functionality of the Send, Flush and Receive +// methods. The Do method starts by writing the command and flushing the output +// buffer. Next, the Do method receives all pending replies including the reply +// for the command just sent by Do. If any of the received replies is an error, +// then Do returns the error. If there are no errors, then Do returns the last +// reply. If the command argument to the Do method is "", then the Do method +// will flush the output buffer and receive pending replies without sending a +// command. +// +// Use the Send and Do methods to implement pipelined transactions. +// +// c.Send("MULTI") +// c.Send("INCR", "foo") +// c.Send("INCR", "bar") +// r, err := c.Do("EXEC") +// fmt.Println(r) // prints [1, 1] +// +// Concurrency +// +// Connections do not support concurrent calls to the write methods (Send, +// Flush) or concurrent calls to the read method (Receive). Connections do +// allow a concurrent reader and writer. +// +// Because the Do method combines the functionality of Send, Flush and Receive, +// the Do method cannot be called concurrently with the other methods. +// +// For full concurrent access to Redis, use the thread-safe Pool to get and +// release connections from within a goroutine. +// +// Publish and Subscribe +// +// Use the Send, Flush and Receive methods to implement Pub/Sub subscribers. +// +// c.Send("SUBSCRIBE", "example") +// c.Flush() +// for { +// reply, err := c.Receive() +// if err != nil { +// return err +// } +// // process pushed message +// } +// +// The PubSubConn type wraps a Conn with convenience methods for implementing +// subscribers. The Subscribe, PSubscribe, Unsubscribe and PUnsubscribe methods +// send and flush a subscription management command. The receive method +// converts a pushed message to convenient types for use in a type switch. +// +// psc := redis.PubSubConn{c} +// psc.Subscribe("example") +// for { +// switch v := psc.Receive().(type) { +// case redis.Message: +// fmt.Printf("%s: message: %s\n", v.Channel, v.Data) +// case redis.Subscription: +// fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count) +// case error: +// return v +// } +// } +// +// Reply Helpers +// +// The Bool, Int, Bytes, String, Strings and Values functions convert a reply +// to a value of a specific type. To allow convenient wrapping of calls to the +// connection Do and Receive methods, the functions take a second argument of +// type error. If the error is non-nil, then the helper function returns the +// error. If the error is nil, the function converts the reply to the specified +// type: +// +// exists, err := redis.Bool(c.Do("EXISTS", "foo")) +// if err != nil { +// // handle error return from c.Do or type conversion error. +// } +// +// The Scan function converts elements of a array reply to Go types: +// +// var value1 int +// var value2 string +// reply, err := redis.Values(c.Do("MGET", "key1", "key2")) +// if err != nil { +// // handle error +// } +// if _, err := redis.Scan(reply, &value1, &value2); err != nil { +// // handle error +// } +package redis // import "github.com/garyburd/redigo/redis" diff --git a/redis/log.go b/redis/log.go new file mode 100644 index 0000000..129b86d --- /dev/null +++ b/redis/log.go @@ -0,0 +1,117 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bytes" + "fmt" + "log" +) + +// NewLoggingConn returns a logging wrapper around a connection. +func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn { + if prefix != "" { + prefix = prefix + "." + } + return &loggingConn{conn, logger, prefix} +} + +type loggingConn struct { + Conn + logger *log.Logger + prefix string +} + +func (c *loggingConn) Close() error { + err := c.Conn.Close() + var buf bytes.Buffer + fmt.Fprintf(&buf, "%sClose() -> (%v)", c.prefix, err) + c.logger.Output(2, buf.String()) + return err +} + +func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) { + const chop = 32 + switch v := v.(type) { + case []byte: + if len(v) > chop { + fmt.Fprintf(buf, "%q...", v[:chop]) + } else { + fmt.Fprintf(buf, "%q", v) + } + case string: + if len(v) > chop { + fmt.Fprintf(buf, "%q...", v[:chop]) + } else { + fmt.Fprintf(buf, "%q", v) + } + case []interface{}: + if len(v) == 0 { + buf.WriteString("[]") + } else { + sep := "[" + fin := "]" + if len(v) > chop { + v = v[:chop] + fin = "...]" + } + for _, vv := range v { + buf.WriteString(sep) + c.printValue(buf, vv) + sep = ", " + } + buf.WriteString(fin) + } + default: + fmt.Fprint(buf, v) + } +} + +func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%s%s(", c.prefix, method) + if method != "Receive" { + buf.WriteString(commandName) + for _, arg := range args { + buf.WriteString(", ") + c.printValue(&buf, arg) + } + } + buf.WriteString(") -> (") + if method != "Send" { + c.printValue(&buf, reply) + buf.WriteString(", ") + } + fmt.Fprintf(&buf, "%v)", err) + c.logger.Output(3, buf.String()) +} + +func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) { + reply, err := c.Conn.Do(commandName, args...) + c.print("Do", commandName, args, reply, err) + return reply, err +} + +func (c *loggingConn) Send(commandName string, args ...interface{}) error { + err := c.Conn.Send(commandName, args...) + c.print("Send", commandName, args, nil, err) + return err +} + +func (c *loggingConn) Receive() (interface{}, error) { + reply, err := c.Conn.Receive() + c.print("Receive", "", nil, reply, err) + return reply, err +} diff --git a/redis/pool.go b/redis/pool.go new file mode 100644 index 0000000..f4293d7 --- /dev/null +++ b/redis/pool.go @@ -0,0 +1,397 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bytes" + "container/list" + "crypto/rand" + "crypto/sha1" + "errors" + "io" + "strconv" + "sync" + "time" + + "github.com/garyburd/redigo/internal" +) + +var nowFunc = time.Now // for testing + +// ErrPoolExhausted is returned from a pool connection method (Do, Send, +// Receive, Flush, Err) when the maximum number of database connections in the +// pool has been reached. +var ErrPoolExhausted = errors.New("redigo: connection pool exhausted") + +var ( + errPoolClosed = errors.New("redigo: connection pool closed") + errConnClosed = errors.New("redigo: connection closed") +) + +// Pool maintains a pool of connections. The application calls the Get method +// to get a connection from the pool and the connection's Close method to +// return the connection's resources to the pool. +// +// The following example shows how to use a pool in a web application. The +// application creates a pool at application startup and makes it available to +// request handlers using a global variable. +// +// func newPool(server, password string) *redis.Pool { +// return &redis.Pool{ +// MaxIdle: 3, +// IdleTimeout: 240 * time.Second, +// Dial: func () (redis.Conn, error) { +// c, err := redis.Dial("tcp", server) +// if err != nil { +// return nil, err +// } +// if _, err := c.Do("AUTH", password); err != nil { +// c.Close() +// return nil, err +// } +// return c, err +// }, +// TestOnBorrow: func(c redis.Conn, t time.Time) error { +// _, err := c.Do("PING") +// return err +// }, +// } +// } +// +// var ( +// pool *redis.Pool +// redisServer = flag.String("redisServer", ":6379", "") +// redisPassword = flag.String("redisPassword", "", "") +// ) +// +// func main() { +// flag.Parse() +// pool = newPool(*redisServer, *redisPassword) +// ... +// } +// +// A request handler gets a connection from the pool and closes the connection +// when the handler is done: +// +// func serveHome(w http.ResponseWriter, r *http.Request) { +// conn := pool.Get() +// defer conn.Close() +// .... +// } +// +type Pool struct { + + // Dial is an application supplied function for creating and configuring a + // connection + Dial func() (Conn, error) + + // TestOnBorrow is an optional application supplied function for checking + // the health of an idle connection before the connection is used again by + // the application. Argument t is the time that the connection was returned + // to the pool. If the function returns an error, then the connection is + // closed. + TestOnBorrow func(c Conn, t time.Time) error + + // Maximum number of idle connections in the pool. + MaxIdle int + + // Maximum number of connections allocated by the pool at a given time. + // When zero, there is no limit on the number of connections in the pool. + MaxActive int + + // Close connections after remaining idle for this duration. If the value + // is zero, then idle connections are not closed. Applications should set + // the timeout to a value less than the server's timeout. + IdleTimeout time.Duration + + // If Wait is true and the pool is at the MaxIdle limit, then Get() waits + // for a connection to be returned to the pool before returning. + Wait bool + + // mu protects fields defined below. + mu sync.Mutex + cond *sync.Cond + closed bool + active int + + // Stack of idleConn with most recently used at the front. + idle list.List +} + +type idleConn struct { + c Conn + t time.Time +} + +// NewPool creates a new pool. This function is deprecated. Applications should +// initialize the Pool fields directly as shown in example. +func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { + return &Pool{Dial: newFn, MaxIdle: maxIdle} +} + +// Get gets a connection. The application must close the returned connection. +// This method always returns a valid connection so that applications can defer +// error handling to the first use of the connection. If there is an error +// getting an underlying connection, then the connection Err, Do, Send, Flush +// and Receive methods return that error. +func (p *Pool) Get() Conn { + c, err := p.get() + if err != nil { + return errorConnection{err} + } + return &pooledConnection{p: p, c: c} +} + +// ActiveCount returns the number of active connections in the pool. +func (p *Pool) ActiveCount() int { + p.mu.Lock() + active := p.active + p.mu.Unlock() + return active +} + +// IdleCount returns the number of idle connections in the pool. +func (p *Pool) IdleCount() int { + p.mu.Lock() + idle := p.idle.Len() + p.mu.Unlock() + return idle +} + +// Close releases the resources used by the pool. +func (p *Pool) Close() error { + p.mu.Lock() + idle := p.idle + p.idle.Init() + p.closed = true + p.active -= idle.Len() + if p.cond != nil { + p.cond.Broadcast() + } + p.mu.Unlock() + for e := idle.Front(); e != nil; e = e.Next() { + e.Value.(idleConn).c.Close() + } + return nil +} + +// release decrements the active count and signals waiters. The caller must +// hold p.mu during the call. +func (p *Pool) release() { + p.active -= 1 + if p.cond != nil { + p.cond.Signal() + } +} + +// get prunes stale connections and returns a connection from the idle list or +// creates a new connection. +func (p *Pool) get() (Conn, error) { + p.mu.Lock() + + // Prune stale connections. + + if timeout := p.IdleTimeout; timeout > 0 { + for i, n := 0, p.idle.Len(); i < n; i++ { + e := p.idle.Back() + if e == nil { + break + } + ic := e.Value.(idleConn) + if ic.t.Add(timeout).After(nowFunc()) { + break + } + p.idle.Remove(e) + p.release() + p.mu.Unlock() + ic.c.Close() + p.mu.Lock() + } + } + + for { + + // Get idle connection. + + for i, n := 0, p.idle.Len(); i < n; i++ { + e := p.idle.Front() + if e == nil { + break + } + ic := e.Value.(idleConn) + p.idle.Remove(e) + test := p.TestOnBorrow + p.mu.Unlock() + if test == nil || test(ic.c, ic.t) == nil { + return ic.c, nil + } + ic.c.Close() + p.mu.Lock() + p.release() + } + + // Check for pool closed before dialing a new connection. + + if p.closed { + p.mu.Unlock() + return nil, errors.New("redigo: get on closed pool") + } + + // Dial new connection if under limit. + + if p.MaxActive == 0 || p.active < p.MaxActive { + dial := p.Dial + p.active += 1 + p.mu.Unlock() + c, err := dial() + if err != nil { + p.mu.Lock() + p.release() + p.mu.Unlock() + c = nil + } + return c, err + } + + if !p.Wait { + p.mu.Unlock() + return nil, ErrPoolExhausted + } + + if p.cond == nil { + p.cond = sync.NewCond(&p.mu) + } + p.cond.Wait() + } +} + +func (p *Pool) put(c Conn, forceClose bool) error { + err := c.Err() + p.mu.Lock() + if !p.closed && err == nil && !forceClose { + p.idle.PushFront(idleConn{t: nowFunc(), c: c}) + if p.idle.Len() > p.MaxIdle { + c = p.idle.Remove(p.idle.Back()).(idleConn).c + } else { + c = nil + } + } + + if c == nil { + if p.cond != nil { + p.cond.Signal() + } + p.mu.Unlock() + return nil + } + + p.release() + p.mu.Unlock() + return c.Close() +} + +type pooledConnection struct { + p *Pool + c Conn + state int +} + +var ( + sentinel []byte + sentinelOnce sync.Once +) + +func initSentinel() { + p := make([]byte, 64) + if _, err := rand.Read(p); err == nil { + sentinel = p + } else { + h := sha1.New() + io.WriteString(h, "Oops, rand failed. Use time instead.") + io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10)) + sentinel = h.Sum(nil) + } +} + +func (pc *pooledConnection) Close() error { + c := pc.c + if _, ok := c.(errorConnection); ok { + return nil + } + pc.c = errorConnection{errConnClosed} + + if pc.state&internal.MultiState != 0 { + c.Send("DISCARD") + pc.state &^= (internal.MultiState | internal.WatchState) + } else if pc.state&internal.WatchState != 0 { + c.Send("UNWATCH") + pc.state &^= internal.WatchState + } + if pc.state&internal.SubscribeState != 0 { + c.Send("UNSUBSCRIBE") + c.Send("PUNSUBSCRIBE") + // To detect the end of the message stream, ask the server to echo + // a sentinel value and read until we see that value. + sentinelOnce.Do(initSentinel) + c.Send("ECHO", sentinel) + c.Flush() + for { + p, err := c.Receive() + if err != nil { + break + } + if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { + pc.state &^= internal.SubscribeState + break + } + } + } + c.Do("") + pc.p.put(c, pc.state != 0) + return nil +} + +func (pc *pooledConnection) Err() error { + return pc.c.Err() +} + +func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { + ci := internal.LookupCommandInfo(commandName) + pc.state = (pc.state | ci.Set) &^ ci.Clear + return pc.c.Do(commandName, args...) +} + +func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { + ci := internal.LookupCommandInfo(commandName) + pc.state = (pc.state | ci.Set) &^ ci.Clear + return pc.c.Send(commandName, args...) +} + +func (pc *pooledConnection) Flush() error { + return pc.c.Flush() +} + +func (pc *pooledConnection) Receive() (reply interface{}, err error) { + return pc.c.Receive() +} + +type errorConnection struct{ err error } + +func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } +func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } +func (ec errorConnection) Err() error { return ec.err } +func (ec errorConnection) Close() error { return ec.err } +func (ec errorConnection) Flush() error { return ec.err } +func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err } diff --git a/redis/pool_test.go b/redis/pool_test.go new file mode 100644 index 0000000..1fe305f --- /dev/null +++ b/redis/pool_test.go @@ -0,0 +1,674 @@ +// Copyright 2011 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "errors" + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" +) + +type poolTestConn struct { + d *poolDialer + err error + redis.Conn +} + +func (c *poolTestConn) Close() error { c.d.open -= 1; return nil } +func (c *poolTestConn) Err() error { return c.err } + +func (c *poolTestConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { + if commandName == "ERR" { + c.err = args[0].(error) + commandName = "PING" + } + if commandName != "" { + c.d.commands = append(c.d.commands, commandName) + } + return c.Conn.Do(commandName, args...) +} + +func (c *poolTestConn) Send(commandName string, args ...interface{}) error { + c.d.commands = append(c.d.commands, commandName) + return c.Conn.Send(commandName, args...) +} + +type poolDialer struct { + t *testing.T + dialed int + open int + commands []string + dialErr error +} + +func (d *poolDialer) dial() (redis.Conn, error) { + d.dialed += 1 + if d.dialErr != nil { + return nil, d.dialErr + } + c, err := redistest.Dial() + if err != nil { + return nil, err + } + d.open += 1 + return &poolTestConn{d: d, Conn: c}, nil +} + +func (d *poolDialer) check(message string, p *redis.Pool, dialed, open int) { + if d.dialed != dialed { + d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed) + } + if d.open != open { + d.t.Errorf("%s: open=%d, want %d", message, d.open, open) + } + if active := p.ActiveCount(); active != open { + d.t.Errorf("%s: active=%d, want %d", message, active, open) + } +} + +func TestPoolReuse(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + + for i := 0; i < 10; i++ { + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c1.Close() + c2.Close() + } + + d.check("before close", p, 2, 2) + p.Close() + d.check("after close", p, 2, 0) +} + +func TestPoolMaxIdle(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + for i := 0; i < 10; i++ { + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c3 := p.Get() + c3.Do("PING") + c1.Close() + c2.Close() + c3.Close() + } + d.check("before close", p, 12, 2) + p.Close() + d.check("after close", p, 12, 0) +} + +func TestPoolError(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + + c := p.Get() + c.Do("ERR", io.EOF) + if c.Err() == nil { + t.Errorf("expected c.Err() != nil") + } + c.Close() + + c = p.Get() + c.Do("ERR", io.EOF) + c.Close() + + d.check(".", p, 2, 0) +} + +func TestPoolClose(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c3 := p.Get() + c3.Do("PING") + + c1.Close() + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after connection closed") + } + + c2.Close() + c2.Close() + + p.Close() + + d.check("after pool close", p, 3, 1) + + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after connection and pool closed") + } + + c3.Close() + + d.check("after conn close", p, 3, 0) + + c1 = p.Get() + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after pool closed") + } +} + +func TestPoolTimeout(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + IdleTimeout: 300 * time.Second, + Dial: d.dial, + } + + now := time.Now() + redis.SetNowFunc(func() time.Time { return now }) + defer redis.SetNowFunc(time.Now) + + c := p.Get() + c.Do("PING") + c.Close() + + d.check("1", p, 1, 1) + + now = now.Add(p.IdleTimeout) + + c = p.Get() + c.Do("PING") + c.Close() + + d.check("2", p, 2, 1) + + p.Close() +} + +func TestPoolConcurrenSendReceive(t *testing.T) { + p := &redis.Pool{ + Dial: redistest.Dial, + } + c := p.Get() + done := make(chan error, 1) + go func() { + _, err := c.Receive() + done <- err + }() + c.Send("PING") + c.Flush() + err := <-done + if err != nil { + t.Fatalf("Receive() returned error %v", err) + } + _, err = c.Do("") + if err != nil { + t.Fatalf("Do() returned error %v", err) + } + c.Close() + p.Close() +} + +func TestPoolBorrowCheck(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + TestOnBorrow: func(redis.Conn, time.Time) error { return redis.Error("BLAH") }, + } + + for i := 0; i < 10; i++ { + c := p.Get() + c.Do("PING") + c.Close() + } + d.check("1", p, 10, 1) + p.Close() +} + +func TestPoolMaxActive(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + + d.check("1", p, 2, 2) + + c3 := p.Get() + if _, err := c3.Do("PING"); err != redis.ErrPoolExhausted { + t.Errorf("expected pool exhausted") + } + + c3.Close() + d.check("2", p, 2, 2) + c2.Close() + d.check("3", p, 2, 2) + + c3 = p.Get() + if _, err := c3.Do("PING"); err != nil { + t.Errorf("expected good channel, err=%v", err) + } + c3.Close() + + d.check("4", p, 2, 2) + p.Close() +} + +func TestPoolMonitorCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + c := p.Get() + c.Send("MONITOR") + c.Close() + + d.check("", p, 1, 0) + p.Close() +} + +func TestPoolPubSubCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + + c := p.Get() + c.Send("SUBSCRIBE", "x") + c.Close() + + want := []string{"SUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Send("PSUBSCRIBE", "x*") + c.Close() + + want = []string{"PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + p.Close() +} + +func TestPoolTransactionCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + + c := p.Get() + c.Do("WATCH", "key") + c.Do("PING") + c.Close() + + want := []string{"WATCH", "PING", "UNWATCH"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("UNWATCH") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "UNWATCH", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "PING", "DISCARD"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("DISCARD") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "DISCARD", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("EXEC") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "EXEC", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + p.Close() +} + +func startGoroutines(p *redis.Pool, cmd string, args ...interface{}) chan error { + errs := make(chan error, 10) + for i := 0; i < cap(errs); i++ { + go func() { + c := p.Get() + _, err := c.Do(cmd, args...) + errs <- err + c.Close() + }() + } + + // Wait for goroutines to block. + time.Sleep(time.Second / 4) + + return errs +} + +func TestWaitPool(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + c := p.Get() + errs := startGoroutines(p, "PING") + d.check("before close", p, 1, 1) + c.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + if err != nil { + t.Fatal(err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + d.check("done", p, 1, 1) +} + +func TestWaitPoolClose(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + c := p.Get() + if _, err := c.Do("PING"); err != nil { + t.Fatal(err) + } + errs := startGoroutines(p, "PING") + d.check("before close", p, 1, 1) + p.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + switch err { + case nil: + t.Fatal("blocked goroutine did not get error") + case redis.ErrPoolExhausted: + t.Fatal("blocked goroutine got pool exhausted error") + } + case <-timeout: + t.Fatal("timeout waiting for blocked goroutine") + } + } + c.Close() + d.check("done", p, 1, 0) +} + +func TestWaitPoolCommandError(t *testing.T) { + testErr := errors.New("test") + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + c := p.Get() + errs := startGoroutines(p, "ERR", testErr) + d.check("before close", p, 1, 1) + c.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + if err != nil { + t.Fatal(err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + d.check("done", p, cap(errs), 0) +} + +func TestWaitPoolDialError(t *testing.T) { + testErr := errors.New("test") + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + c := p.Get() + errs := startGoroutines(p, "ERR", testErr) + d.check("before close", p, 1, 1) + + d.dialErr = errors.New("dial") + c.Close() + + nilCount := 0 + errCount := 0 + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + switch err { + case nil: + nilCount++ + case d.dialErr: + errCount++ + default: + t.Fatalf("expected dial error or nil, got %v", err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + if nilCount != 1 { + t.Errorf("expected one nil error, got %d", nilCount) + } + if errCount != cap(errs)-1 { + t.Errorf("expected %d dial erors, got %d", cap(errs)-1, errCount) + } + d.check("done", p, cap(errs), 0) +} + +// Borrowing requires us to iterate over the idle connections, unlock the pool, +// and perform a blocking operation to check the connection still works. If +// TestOnBorrow fails, we must reacquire the lock and continue iteration. This +// test ensures that iteration will work correctly if multiple threads are +// iterating simultaneously. +func TestLocking_TestOnBorrowFails_PoolDoesntCrash(t *testing.T) { + count := 100 + + // First we'll Create a pool where the pilfering of idle connections fails. + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: count, + MaxActive: count, + Dial: d.dial, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + return errors.New("No way back into the real world.") + }, + } + defer p.Close() + + // Fill the pool with idle connections. + b1 := sync.WaitGroup{} + b1.Add(count) + b2 := sync.WaitGroup{} + b2.Add(count) + for i := 0; i < count; i++ { + go func() { + c := p.Get() + if c.Err() != nil { + t.Errorf("pool get failed: %v", c.Err()) + } + b1.Done() + b1.Wait() + c.Close() + b2.Done() + }() + } + b2.Wait() + if d.dialed != count { + t.Errorf("Expected %d dials, got %d", count, d.dialed) + } + + // Spawn a bunch of goroutines to thrash the pool. + b2.Add(count) + for i := 0; i < count; i++ { + go func() { + c := p.Get() + if c.Err() != nil { + t.Errorf("pool get failed: %v", c.Err()) + } + c.Close() + b2.Done() + }() + } + b2.Wait() + if d.dialed != count*2 { + t.Errorf("Expected %d dials, got %d", count*2, d.dialed) + } +} + +func BenchmarkPoolGet(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redistest.Dial, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + c.Close() + } +} + +func BenchmarkPoolGetErr(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redistest.Dial, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +func BenchmarkPoolGetPing(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redistest.Dial, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} diff --git a/redis/pubsub.go b/redis/pubsub.go new file mode 100644 index 0000000..f079042 --- /dev/null +++ b/redis/pubsub.go @@ -0,0 +1,129 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "errors" +) + +// Subscription represents a subscribe or unsubscribe notification. +type Subscription struct { + + // Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe" + Kind string + + // The channel that was changed. + Channel string + + // The current number of subscriptions for connection. + Count int +} + +// Message represents a message notification. +type Message struct { + + // The originating channel. + Channel string + + // The message data. + Data []byte +} + +// PMessage represents a pmessage notification. +type PMessage struct { + + // The matched pattern. + Pattern string + + // The originating channel. + Channel string + + // The message data. + Data []byte +} + +// PubSubConn wraps a Conn with convenience methods for subscribers. +type PubSubConn struct { + Conn Conn +} + +// Close closes the connection. +func (c PubSubConn) Close() error { + return c.Conn.Close() +} + +// Subscribe subscribes the connection to the specified channels. +func (c PubSubConn) Subscribe(channel ...interface{}) error { + c.Conn.Send("SUBSCRIBE", channel...) + return c.Conn.Flush() +} + +// PSubscribe subscribes the connection to the given patterns. +func (c PubSubConn) PSubscribe(channel ...interface{}) error { + c.Conn.Send("PSUBSCRIBE", channel...) + return c.Conn.Flush() +} + +// Unsubscribe unsubscribes the connection from the given channels, or from all +// of them if none is given. +func (c PubSubConn) Unsubscribe(channel ...interface{}) error { + c.Conn.Send("UNSUBSCRIBE", channel...) + return c.Conn.Flush() +} + +// PUnsubscribe unsubscribes the connection from the given patterns, or from all +// of them if none is given. +func (c PubSubConn) PUnsubscribe(channel ...interface{}) error { + c.Conn.Send("PUNSUBSCRIBE", channel...) + return c.Conn.Flush() +} + +// Receive returns a pushed message as a Subscription, Message, PMessage or +// error. The return value is intended to be used directly in a type switch as +// illustrated in the PubSubConn example. +func (c PubSubConn) Receive() interface{} { + reply, err := Values(c.Conn.Receive()) + if err != nil { + return err + } + + var kind string + reply, err = Scan(reply, &kind) + if err != nil { + return err + } + + switch kind { + case "message": + var m Message + if _, err := Scan(reply, &m.Channel, &m.Data); err != nil { + return err + } + return m + case "pmessage": + var pm PMessage + if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil { + return err + } + return pm + case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": + s := Subscription{Kind: kind} + if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { + return err + } + return s + } + return errors.New("redigo: unknown pubsub notification") +} diff --git a/redis/pubsub_test.go b/redis/pubsub_test.go new file mode 100644 index 0000000..707f5a4 --- /dev/null +++ b/redis/pubsub_test.go @@ -0,0 +1,143 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "net" + "reflect" + "sync" + "testing" + "time" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" +) + +func publish(channel, value interface{}) { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + c.Do("PUBLISH", channel, value) +} + +// Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine. +func ExamplePubSubConn() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + var wg sync.WaitGroup + wg.Add(2) + + psc := redis.PubSubConn{Conn: c} + + // This goroutine receives and prints pushed notifications from the server. + // The goroutine exits when the connection is unsubscribed from all + // channels or there is an error. + go func() { + defer wg.Done() + for { + switch n := psc.Receive().(type) { + case redis.Message: + fmt.Printf("Message: %s %s\n", n.Channel, n.Data) + case redis.PMessage: + fmt.Printf("PMessage: %s %s %s\n", n.Pattern, n.Channel, n.Data) + case redis.Subscription: + fmt.Printf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count) + if n.Count == 0 { + return + } + case error: + fmt.Printf("error: %v\n", n) + return + } + } + }() + + // This goroutine manages subscriptions for the connection. + go func() { + defer wg.Done() + + psc.Subscribe("example") + psc.PSubscribe("p*") + + // The following function calls publish a message using another + // connection to the Redis server. + publish("example", "hello") + publish("example", "world") + publish("pexample", "foo") + publish("pexample", "bar") + + // Unsubscribe from all connections. This will cause the receiving + // goroutine to exit. + psc.Unsubscribe() + psc.PUnsubscribe() + }() + + wg.Wait() + + // Output: + // Subscription: subscribe example 1 + // Subscription: psubscribe p* 2 + // Message: example hello + // Message: example world + // PMessage: p* pexample foo + // PMessage: p* pexample bar + // Subscription: unsubscribe example 1 + // Subscription: punsubscribe p* 0 +} + +func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) { + actual := c.Receive() + if !reflect.DeepEqual(actual, expected) { + t.Errorf("%s = %v, want %v", message, actual, expected) + } +} + +func TestPushed(t *testing.T) { + pc, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer pc.Close() + + nc, err := net.Dial("tcp", ":6379") + if err != nil { + t.Fatal(err) + } + defer nc.Close() + nc.SetReadDeadline(time.Now().Add(4 * time.Second)) + + c := redis.PubSubConn{Conn: redis.NewConn(nc, 0, 0)} + + c.Subscribe("c1") + expectPushed(t, c, "Subscribe(c1)", redis.Subscription{Kind: "subscribe", Channel: "c1", Count: 1}) + c.Subscribe("c2") + expectPushed(t, c, "Subscribe(c2)", redis.Subscription{Kind: "subscribe", Channel: "c2", Count: 2}) + c.PSubscribe("p1") + expectPushed(t, c, "PSubscribe(p1)", redis.Subscription{Kind: "psubscribe", Channel: "p1", Count: 3}) + c.PSubscribe("p2") + expectPushed(t, c, "PSubscribe(p2)", redis.Subscription{Kind: "psubscribe", Channel: "p2", Count: 4}) + c.PUnsubscribe() + expectPushed(t, c, "Punsubscribe(p1)", redis.Subscription{Kind: "punsubscribe", Channel: "p1", Count: 3}) + expectPushed(t, c, "Punsubscribe()", redis.Subscription{Kind: "punsubscribe", Channel: "p2", Count: 2}) + + pc.Do("PUBLISH", "c1", "hello") + expectPushed(t, c, "PUBLISH c1 hello", redis.Message{Channel: "c1", Data: []byte("hello")}) +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..c90a48e --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,44 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +// Error represents an error returned in a command reply. +type Error string + +func (err Error) Error() string { return string(err) } + +// Conn represents a connection to a Redis server. +type Conn interface { + // Close closes the connection. + Close() error + + // Err returns a non-nil value if the connection is broken. The returned + // value is either the first non-nil value returned from the underlying + // network connection or a protocol parsing error. Applications should + // close broken connections. + Err() error + + // Do sends a command to the server and returns the received reply. + Do(commandName string, args ...interface{}) (reply interface{}, err error) + + // Send writes the command to the client's output buffer. + Send(commandName string, args ...interface{}) error + + // Flush flushes the output buffer to the Redis server. + Flush() error + + // Receive receives a single reply from the Redis server + Receive() (reply interface{}, err error) +} diff --git a/redis/reply.go b/redis/reply.go new file mode 100644 index 0000000..5648f93 --- /dev/null +++ b/redis/reply.go @@ -0,0 +1,312 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "errors" + "fmt" + "strconv" +) + +// ErrNil indicates that a reply value is nil. +var ErrNil = errors.New("redigo: nil returned") + +// Int is a helper that converts a command reply to an integer. If err is not +// equal to nil, then Int returns 0, err. Otherwise, Int converts the +// reply to an int as follows: +// +// Reply type Result +// integer int(reply), nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Int(reply interface{}, err error) (int, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + x := int(reply) + if int64(x) != reply { + return 0, strconv.ErrRange + } + return x, nil + case []byte: + n, err := strconv.ParseInt(string(reply), 10, 0) + return int(n), err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Int, got type %T", reply) +} + +// Int64 is a helper that converts a command reply to 64 bit integer. If err is +// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the +// reply to an int64 as follows: +// +// Reply type Result +// integer reply, nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Int64(reply interface{}, err error) (int64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + return reply, nil + case []byte: + n, err := strconv.ParseInt(string(reply), 10, 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Int64, got type %T", reply) +} + +var errNegativeInt = errors.New("redigo: unexpected value for Uint64") + +// Uint64 is a helper that converts a command reply to 64 bit integer. If err is +// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the +// reply to an int64 as follows: +// +// Reply type Result +// integer reply, nil +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Uint64(reply interface{}, err error) (uint64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case int64: + if reply < 0 { + return 0, errNegativeInt + } + return uint64(reply), nil + case []byte: + n, err := strconv.ParseUint(string(reply), 10, 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Uint64, got type %T", reply) +} + +// Float64 is a helper that converts a command reply to 64 bit float. If err is +// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts +// the reply to an int as follows: +// +// Reply type Result +// bulk string parsed reply, nil +// nil 0, ErrNil +// other 0, error +func Float64(reply interface{}, err error) (float64, error) { + if err != nil { + return 0, err + } + switch reply := reply.(type) { + case []byte: + n, err := strconv.ParseFloat(string(reply), 64) + return n, err + case nil: + return 0, ErrNil + case Error: + return 0, reply + } + return 0, fmt.Errorf("redigo: unexpected type for Float64, got type %T", reply) +} + +// String is a helper that converts a command reply to a string. If err is not +// equal to nil, then String returns "", err. Otherwise String converts the +// reply to a string as follows: +// +// Reply type Result +// bulk string string(reply), nil +// simple string reply, nil +// nil "", ErrNil +// other "", error +func String(reply interface{}, err error) (string, error) { + if err != nil { + return "", err + } + switch reply := reply.(type) { + case []byte: + return string(reply), nil + case string: + return reply, nil + case nil: + return "", ErrNil + case Error: + return "", reply + } + return "", fmt.Errorf("redigo: unexpected type for String, got type %T", reply) +} + +// Bytes is a helper that converts a command reply to a slice of bytes. If err +// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts +// the reply to a slice of bytes as follows: +// +// Reply type Result +// bulk string reply, nil +// simple string []byte(reply), nil +// nil nil, ErrNil +// other nil, error +func Bytes(reply interface{}, err error) ([]byte, error) { + if err != nil { + return nil, err + } + switch reply := reply.(type) { + case []byte: + return reply, nil + case string: + return []byte(reply), nil + case nil: + return nil, ErrNil + case Error: + return nil, reply + } + return nil, fmt.Errorf("redigo: unexpected type for Bytes, got type %T", reply) +} + +// Bool is a helper that converts a command reply to a boolean. If err is not +// equal to nil, then Bool returns false, err. Otherwise Bool converts the +// reply to boolean as follows: +// +// Reply type Result +// integer value != 0, nil +// bulk string strconv.ParseBool(reply) +// nil false, ErrNil +// other false, error +func Bool(reply interface{}, err error) (bool, error) { + if err != nil { + return false, err + } + switch reply := reply.(type) { + case int64: + return reply != 0, nil + case []byte: + return strconv.ParseBool(string(reply)) + case nil: + return false, ErrNil + case Error: + return false, reply + } + return false, fmt.Errorf("redigo: unexpected type for Bool, got type %T", reply) +} + +// MultiBulk is deprecated. Use Values. +func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) } + +// Values is a helper that converts an array command reply to a []interface{}. +// If err is not equal to nil, then Values returns nil, err. Otherwise, Values +// converts the reply as follows: +// +// Reply type Result +// array reply, nil +// nil nil, ErrNil +// other nil, error +func Values(reply interface{}, err error) ([]interface{}, error) { + if err != nil { + return nil, err + } + switch reply := reply.(type) { + case []interface{}: + return reply, nil + case nil: + return nil, ErrNil + case Error: + return nil, reply + } + return nil, fmt.Errorf("redigo: unexpected type for Values, got type %T", reply) +} + +// Strings is a helper that converts an array command reply to a []string. If +// err is not equal to nil, then Strings returns nil, err. Nil array items are +// converted to "" in the output slice. Strings returns an error if an array +// item is not a bulk string or nil. +func Strings(reply interface{}, err error) ([]string, error) { + if err != nil { + return nil, err + } + switch reply := reply.(type) { + case []interface{}: + result := make([]string, len(reply)) + for i := range reply { + if reply[i] == nil { + continue + } + p, ok := reply[i].([]byte) + if !ok { + return nil, fmt.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i]) + } + result[i] = string(p) + } + return result, nil + case nil: + return nil, ErrNil + case Error: + return nil, reply + } + return nil, fmt.Errorf("redigo: unexpected type for Strings, got type %T", reply) +} + +// Ints is a helper that converts an array command reply to a []int. If +// err is not equal to nil, then Ints returns nil, err. +func Ints(reply interface{}, err error) ([]int, error) { + var ints []int + if reply == nil { + return ints, ErrNil + } + values, err := Values(reply, err) + if err != nil { + return ints, err + } + if err := ScanSlice(values, &ints); err != nil { + return ints, err + } + return ints, nil +} + +// StringMap is a helper that converts an array of strings (alternating key, value) +// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format. +// Requires an even number of values in result. +func StringMap(result interface{}, err error) (map[string]string, error) { + values, err := Values(result, err) + if err != nil { + return nil, err + } + if len(values)%2 != 0 { + return nil, errors.New("redigo: StringMap expects even number of values result") + } + m := make(map[string]string, len(values)/2) + for i := 0; i < len(values); i += 2 { + key, okKey := values[i].([]byte) + value, okValue := values[i+1].([]byte) + if !okKey || !okValue { + return nil, errors.New("redigo: ScanMap key not a bulk string value") + } + m[string(key)] = string(value) + } + return m, nil +} diff --git a/redis/reply_test.go b/redis/reply_test.go new file mode 100644 index 0000000..92744c5 --- /dev/null +++ b/redis/reply_test.go @@ -0,0 +1,166 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "reflect" + "testing" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" +) + +type valueError struct { + v interface{} + err error +} + +func ve(v interface{}, err error) valueError { + return valueError{v, err} +} + +var replyTests = []struct { + name interface{} + actual valueError + expected valueError +}{ + { + "ints([v1, v2])", + ve(redis.Ints([]interface{}{[]byte("4"), []byte("5")}, nil)), + ve([]int{4, 5}, nil), + }, + { + "ints(nil)", + ve(redis.Ints(nil, nil)), + ve([]int(nil), redis.ErrNil), + }, + { + "strings([v1, v2])", + ve(redis.Strings([]interface{}{[]byte("v1"), []byte("v2")}, nil)), + ve([]string{"v1", "v2"}, nil), + }, + { + "strings(nil)", + ve(redis.Strings(nil, nil)), + ve([]string(nil), redis.ErrNil), + }, + { + "values([v1, v2])", + ve(redis.Values([]interface{}{[]byte("v1"), []byte("v2")}, nil)), + ve([]interface{}{[]byte("v1"), []byte("v2")}, nil), + }, + { + "values(nil)", + ve(redis.Values(nil, nil)), + ve([]interface{}(nil), redis.ErrNil), + }, + { + "float64(1.0)", + ve(redis.Float64([]byte("1.0"), nil)), + ve(float64(1.0), nil), + }, + { + "float64(nil)", + ve(redis.Float64(nil, nil)), + ve(float64(0.0), redis.ErrNil), + }, + { + "uint64(1)", + ve(redis.Uint64(int64(1), nil)), + ve(uint64(1), nil), + }, + { + "uint64(-1)", + ve(redis.Uint64(int64(-1), nil)), + ve(uint64(0), redis.ErrNegativeInt), + }, +} + +func TestReply(t *testing.T) { + for _, rt := range replyTests { + if rt.actual.err != rt.expected.err { + t.Errorf("%s returned err %v, want %v", rt.name, rt.actual.err, rt.expected.err) + continue + } + if !reflect.DeepEqual(rt.actual.v, rt.expected.v) { + t.Errorf("%s=%+v, want %+v", rt.name, rt.actual.v, rt.expected.v) + } + } +} + +// dial wraps DialTestDB() with a more suitable function name for examples. +func dial() (redis.Conn, error) { + return redistest.Dial() +} + +func ExampleBool() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Do("SET", "foo", 1) + exists, _ := redis.Bool(c.Do("EXISTS", "foo")) + fmt.Printf("%#v\n", exists) + // Output: + // true +} + +func ExampleInt() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Do("SET", "k1", 1) + n, _ := redis.Int(c.Do("GET", "k1")) + fmt.Printf("%#v\n", n) + n, _ = redis.Int(c.Do("INCR", "k1")) + fmt.Printf("%#v\n", n) + // Output: + // 1 + // 2 +} + +func ExampleInts() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Do("SADD", "set_with_integers", 4, 5, 6) + ints, _ := redis.Ints(c.Do("SMEMBERS", "set_with_integers")) + fmt.Printf("%#v\n", ints) + // Output: + // []int{4, 5, 6} +} + +func ExampleString() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Do("SET", "hello", "world") + s, err := redis.String(c.Do("GET", "hello")) + fmt.Printf("%#v\n", s) + // Output: + // "world" +} diff --git a/redis/scan.go b/redis/scan.go new file mode 100644 index 0000000..8c9cfa1 --- /dev/null +++ b/redis/scan.go @@ -0,0 +1,513 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "errors" + "fmt" + "reflect" + "strconv" + "strings" + "sync" +) + +func ensureLen(d reflect.Value, n int) { + if n > d.Cap() { + d.Set(reflect.MakeSlice(d.Type(), n, n)) + } else { + d.SetLen(n) + } +} + +func cannotConvert(d reflect.Value, s interface{}) error { + return fmt.Errorf("redigo: Scan cannot convert from %s to %s", + reflect.TypeOf(s), d.Type()) +} + +func convertAssignBytes(d reflect.Value, s []byte) (err error) { + switch d.Type().Kind() { + case reflect.Float32, reflect.Float64: + var x float64 + x, err = strconv.ParseFloat(string(s), d.Type().Bits()) + d.SetFloat(x) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + var x int64 + x, err = strconv.ParseInt(string(s), 10, d.Type().Bits()) + d.SetInt(x) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + var x uint64 + x, err = strconv.ParseUint(string(s), 10, d.Type().Bits()) + d.SetUint(x) + case reflect.Bool: + var x bool + x, err = strconv.ParseBool(string(s)) + d.SetBool(x) + case reflect.String: + d.SetString(string(s)) + case reflect.Slice: + if d.Type().Elem().Kind() != reflect.Uint8 { + err = cannotConvert(d, s) + } else { + d.SetBytes(s) + } + default: + err = cannotConvert(d, s) + } + return +} + +func convertAssignInt(d reflect.Value, s int64) (err error) { + switch d.Type().Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + d.SetInt(s) + if d.Int() != s { + err = strconv.ErrRange + d.SetInt(0) + } + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + if s < 0 { + err = strconv.ErrRange + } else { + x := uint64(s) + d.SetUint(x) + if d.Uint() != x { + err = strconv.ErrRange + d.SetUint(0) + } + } + case reflect.Bool: + d.SetBool(s != 0) + default: + err = cannotConvert(d, s) + } + return +} + +func convertAssignValue(d reflect.Value, s interface{}) (err error) { + switch s := s.(type) { + case []byte: + err = convertAssignBytes(d, s) + case int64: + err = convertAssignInt(d, s) + default: + err = cannotConvert(d, s) + } + return err +} + +func convertAssignValues(d reflect.Value, s []interface{}) error { + if d.Type().Kind() != reflect.Slice { + return cannotConvert(d, s) + } + ensureLen(d, len(s)) + for i := 0; i < len(s); i++ { + if err := convertAssignValue(d.Index(i), s[i]); err != nil { + return err + } + } + return nil +} + +func convertAssign(d interface{}, s interface{}) (err error) { + // Handle the most common destination types using type switches and + // fall back to reflection for all other types. + switch s := s.(type) { + case nil: + // ingore + case []byte: + switch d := d.(type) { + case *string: + *d = string(s) + case *int: + *d, err = strconv.Atoi(string(s)) + case *bool: + *d, err = strconv.ParseBool(string(s)) + case *[]byte: + *d = s + case *interface{}: + *d = s + case nil: + // skip value + default: + if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { + err = cannotConvert(d, s) + } else { + err = convertAssignBytes(d.Elem(), s) + } + } + case int64: + switch d := d.(type) { + case *int: + x := int(s) + if int64(x) != s { + err = strconv.ErrRange + x = 0 + } + *d = x + case *bool: + *d = s != 0 + case *interface{}: + *d = s + case nil: + // skip value + default: + if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { + err = cannotConvert(d, s) + } else { + err = convertAssignInt(d.Elem(), s) + } + } + case []interface{}: + switch d := d.(type) { + case *[]interface{}: + *d = s + case *interface{}: + *d = s + case nil: + // skip value + default: + if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr { + err = cannotConvert(d, s) + } else { + err = convertAssignValues(d.Elem(), s) + } + } + case Error: + err = s + default: + err = cannotConvert(reflect.ValueOf(d), s) + } + return +} + +// Scan copies from src to the values pointed at by dest. +// +// The values pointed at by dest must be an integer, float, boolean, string, +// []byte, interface{} or slices of these types. Scan uses the standard strconv +// package to convert bulk strings to numeric and boolean types. +// +// If a dest value is nil, then the corresponding src value is skipped. +// +// If a src element is nil, then the corresponding dest value is not modified. +// +// To enable easy use of Scan in a loop, Scan returns the slice of src +// following the copied values. +func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) { + if len(src) < len(dest) { + return nil, errors.New("redigo: Scan array short") + } + var err error + for i, d := range dest { + err = convertAssign(d, src[i]) + if err != nil { + break + } + } + return src[len(dest):], err +} + +type fieldSpec struct { + name string + index []int + //omitEmpty bool +} + +type structSpec struct { + m map[string]*fieldSpec + l []*fieldSpec +} + +func (ss *structSpec) fieldSpec(name []byte) *fieldSpec { + return ss.m[string(name)] +} + +func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *structSpec) { + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + switch { + case f.PkgPath != "": + // Ignore unexported fields. + case f.Anonymous: + // TODO: Handle pointers. Requires change to decoder and + // protection against infinite recursion. + if f.Type.Kind() == reflect.Struct { + compileStructSpec(f.Type, depth, append(index, i), ss) + } + default: + fs := &fieldSpec{name: f.Name} + tag := f.Tag.Get("redis") + p := strings.Split(tag, ",") + if len(p) > 0 { + if p[0] == "-" { + continue + } + if len(p[0]) > 0 { + fs.name = p[0] + } + for _, s := range p[1:] { + switch s { + //case "omitempty": + // fs.omitempty = true + default: + panic(errors.New("redigo: unknown field flag " + s + " for type " + t.Name())) + } + } + } + d, found := depth[fs.name] + if !found { + d = 1 << 30 + } + switch { + case len(index) == d: + // At same depth, remove from result. + delete(ss.m, fs.name) + j := 0 + for i := 0; i < len(ss.l); i++ { + if fs.name != ss.l[i].name { + ss.l[j] = ss.l[i] + j += 1 + } + } + ss.l = ss.l[:j] + case len(index) < d: + fs.index = make([]int, len(index)+1) + copy(fs.index, index) + fs.index[len(index)] = i + depth[fs.name] = len(index) + ss.m[fs.name] = fs + ss.l = append(ss.l, fs) + } + } + } +} + +var ( + structSpecMutex sync.RWMutex + structSpecCache = make(map[reflect.Type]*structSpec) + defaultFieldSpec = &fieldSpec{} +) + +func structSpecForType(t reflect.Type) *structSpec { + + structSpecMutex.RLock() + ss, found := structSpecCache[t] + structSpecMutex.RUnlock() + if found { + return ss + } + + structSpecMutex.Lock() + defer structSpecMutex.Unlock() + ss, found = structSpecCache[t] + if found { + return ss + } + + ss = &structSpec{m: make(map[string]*fieldSpec)} + compileStructSpec(t, make(map[string]int), nil, ss) + structSpecCache[t] = ss + return ss +} + +var errScanStructValue = errors.New("redigo: ScanStruct value must be non-nil pointer to a struct") + +// ScanStruct scans alternating names and values from src to a struct. The +// HGETALL and CONFIG GET commands return replies in this format. +// +// ScanStruct uses exported field names to match values in the response. Use +// 'redis' field tag to override the name: +// +// Field int `redis:"myName"` +// +// Fields with the tag redis:"-" are ignored. +// +// Integer, float, boolean, string and []byte fields are supported. Scan uses the +// standard strconv package to convert bulk string values to numeric and +// boolean types. +// +// If a src element is nil, then the corresponding field is not modified. +func ScanStruct(src []interface{}, dest interface{}) error { + d := reflect.ValueOf(dest) + if d.Kind() != reflect.Ptr || d.IsNil() { + return errScanStructValue + } + d = d.Elem() + if d.Kind() != reflect.Struct { + return errScanStructValue + } + ss := structSpecForType(d.Type()) + + if len(src)%2 != 0 { + return errors.New("redigo: ScanStruct expects even number of values in values") + } + + for i := 0; i < len(src); i += 2 { + s := src[i+1] + if s == nil { + continue + } + name, ok := src[i].([]byte) + if !ok { + return errors.New("redigo: ScanStruct key not a bulk string value") + } + fs := ss.fieldSpec(name) + if fs == nil { + continue + } + if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { + return err + } + } + return nil +} + +var ( + errScanSliceValue = errors.New("redigo: ScanSlice dest must be non-nil pointer to a struct") +) + +// ScanSlice scans src to the slice pointed to by dest. The elements the dest +// slice must be integer, float, boolean, string, struct or pointer to struct +// values. +// +// Struct fields must be integer, float, boolean or string values. All struct +// fields are used unless a subset is specified using fieldNames. +func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error { + d := reflect.ValueOf(dest) + if d.Kind() != reflect.Ptr || d.IsNil() { + return errScanSliceValue + } + d = d.Elem() + if d.Kind() != reflect.Slice { + return errScanSliceValue + } + + isPtr := false + t := d.Type().Elem() + if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct { + isPtr = true + t = t.Elem() + } + + if t.Kind() != reflect.Struct { + ensureLen(d, len(src)) + for i, s := range src { + if s == nil { + continue + } + if err := convertAssignValue(d.Index(i), s); err != nil { + return err + } + } + return nil + } + + ss := structSpecForType(t) + fss := ss.l + if len(fieldNames) > 0 { + fss = make([]*fieldSpec, len(fieldNames)) + for i, name := range fieldNames { + fss[i] = ss.m[name] + if fss[i] == nil { + return errors.New("redigo: ScanSlice bad field name " + name) + } + } + } + + if len(fss) == 0 { + return errors.New("redigo: ScanSlice no struct fields") + } + + n := len(src) / len(fss) + if n*len(fss) != len(src) { + return errors.New("redigo: ScanSlice length not a multiple of struct field count") + } + + ensureLen(d, n) + for i := 0; i < n; i++ { + d := d.Index(i) + if isPtr { + if d.IsNil() { + d.Set(reflect.New(t)) + } + d = d.Elem() + } + for j, fs := range fss { + s := src[i*len(fss)+j] + if s == nil { + continue + } + if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil { + return err + } + } + } + return nil +} + +// Args is a helper for constructing command arguments from structured values. +type Args []interface{} + +// Add returns the result of appending value to args. +func (args Args) Add(value ...interface{}) Args { + return append(args, value...) +} + +// AddFlat returns the result of appending the flattened value of v to args. +// +// Maps are flattened by appending the alternating keys and map values to args. +// +// Slices are flattened by appending the slice elements to args. +// +// Structs are flattened by appending the alternating names and values of +// exported fields to args. If v is a nil struct pointer, then nothing is +// appended. The 'redis' field tag overrides struct field names. See ScanStruct +// for more information on the use of the 'redis' field tag. +// +// Other types are appended to args as is. +func (args Args) AddFlat(v interface{}) Args { + rv := reflect.ValueOf(v) + switch rv.Kind() { + case reflect.Struct: + args = flattenStruct(args, rv) + case reflect.Slice: + for i := 0; i < rv.Len(); i++ { + args = append(args, rv.Index(i).Interface()) + } + case reflect.Map: + for _, k := range rv.MapKeys() { + args = append(args, k.Interface(), rv.MapIndex(k).Interface()) + } + case reflect.Ptr: + if rv.Type().Elem().Kind() == reflect.Struct { + if !rv.IsNil() { + args = flattenStruct(args, rv.Elem()) + } + } else { + args = append(args, v) + } + default: + args = append(args, v) + } + return args +} + +func flattenStruct(args Args, v reflect.Value) Args { + ss := structSpecForType(v.Type()) + for _, fs := range ss.l { + fv := v.FieldByIndex(fs.index) + args = append(args, fs.name, fv.Interface()) + } + return args +} diff --git a/redis/scan_test.go b/redis/scan_test.go new file mode 100644 index 0000000..b57dd89 --- /dev/null +++ b/redis/scan_test.go @@ -0,0 +1,412 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "github.com/garyburd/redigo/redis" + "math" + "reflect" + "testing" +) + +var scanConversionTests = []struct { + src interface{} + dest interface{} +}{ + {[]byte("-inf"), math.Inf(-1)}, + {[]byte("+inf"), math.Inf(1)}, + {[]byte("0"), float64(0)}, + {[]byte("3.14159"), float64(3.14159)}, + {[]byte("3.14"), float32(3.14)}, + {[]byte("-100"), int(-100)}, + {[]byte("101"), int(101)}, + {int64(102), int(102)}, + {[]byte("103"), uint(103)}, + {int64(104), uint(104)}, + {[]byte("105"), int8(105)}, + {int64(106), int8(106)}, + {[]byte("107"), uint8(107)}, + {int64(108), uint8(108)}, + {[]byte("0"), false}, + {int64(0), false}, + {[]byte("f"), false}, + {[]byte("1"), true}, + {int64(1), true}, + {[]byte("t"), true}, + {[]byte("hello"), "hello"}, + {[]byte("world"), []byte("world")}, + {[]interface{}{[]byte("foo")}, []interface{}{[]byte("foo")}}, + {[]interface{}{[]byte("foo")}, []string{"foo"}}, + {[]interface{}{[]byte("hello"), []byte("world")}, []string{"hello", "world"}}, + {[]interface{}{[]byte("bar")}, [][]byte{[]byte("bar")}}, + {[]interface{}{[]byte("1")}, []int{1}}, + {[]interface{}{[]byte("1"), []byte("2")}, []int{1, 2}}, + {[]interface{}{[]byte("1"), []byte("2")}, []float64{1, 2}}, + {[]interface{}{[]byte("1")}, []byte{1}}, + {[]interface{}{[]byte("1")}, []bool{true}}, +} + +func TestScanConversion(t *testing.T) { + for _, tt := range scanConversionTests { + values := []interface{}{tt.src} + dest := reflect.New(reflect.TypeOf(tt.dest)) + values, err := redis.Scan(values, dest.Interface()) + if err != nil { + t.Errorf("Scan(%v) returned error %v", tt, err) + continue + } + if !reflect.DeepEqual(tt.dest, dest.Elem().Interface()) { + t.Errorf("Scan(%v) returned %v, want %v", tt, dest.Elem().Interface(), tt.dest) + } + } +} + +var scanConversionErrorTests = []struct { + src interface{} + dest interface{} +}{ + {[]byte("1234"), byte(0)}, + {int64(1234), byte(0)}, + {[]byte("-1"), byte(0)}, + {int64(-1), byte(0)}, + {[]byte("junk"), false}, + {redis.Error("blah"), false}, +} + +func TestScanConversionError(t *testing.T) { + for _, tt := range scanConversionErrorTests { + values := []interface{}{tt.src} + dest := reflect.New(reflect.TypeOf(tt.dest)) + values, err := redis.Scan(values, dest.Interface()) + if err == nil { + t.Errorf("Scan(%v) did not return error", tt) + } + } +} + +func ExampleScan() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Send("HMSET", "album:1", "title", "Red", "rating", 5) + c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1) + c.Send("HMSET", "album:3", "title", "Beat") + c.Send("LPUSH", "albums", "1") + c.Send("LPUSH", "albums", "2") + c.Send("LPUSH", "albums", "3") + values, err := redis.Values(c.Do("SORT", "albums", + "BY", "album:*->rating", + "GET", "album:*->title", + "GET", "album:*->rating")) + if err != nil { + panic(err) + } + + for len(values) > 0 { + var title string + rating := -1 // initialize to illegal value to detect nil. + values, err = redis.Scan(values, &title, &rating) + if err != nil { + panic(err) + } + if rating == -1 { + fmt.Println(title, "not-rated") + } else { + fmt.Println(title, rating) + } + } + // Output: + // Beat not-rated + // Earthbound 1 + // Red 5 +} + +type s0 struct { + X int + Y int `redis:"y"` + Bt bool +} + +type s1 struct { + X int `redis:"-"` + I int `redis:"i"` + U uint `redis:"u"` + S string `redis:"s"` + P []byte `redis:"p"` + B bool `redis:"b"` + Bt bool + Bf bool + s0 +} + +var scanStructTests = []struct { + title string + reply []string + value interface{} +}{ + {"basic", + []string{"i", "-1234", "u", "5678", "s", "hello", "p", "world", "b", "t", "Bt", "1", "Bf", "0", "X", "123", "y", "456"}, + &s1{I: -1234, U: 5678, S: "hello", P: []byte("world"), B: true, Bt: true, Bf: false, s0: s0{X: 123, Y: 456}}, + }, +} + +func TestScanStruct(t *testing.T) { + for _, tt := range scanStructTests { + + var reply []interface{} + for _, v := range tt.reply { + reply = append(reply, []byte(v)) + } + + value := reflect.New(reflect.ValueOf(tt.value).Type().Elem()) + + if err := redis.ScanStruct(reply, value.Interface()); err != nil { + t.Fatalf("ScanStruct(%s) returned error %v", tt.title, err) + } + + if !reflect.DeepEqual(value.Interface(), tt.value) { + t.Fatalf("ScanStruct(%s) returned %v, want %v", tt.title, value.Interface(), tt.value) + } + } +} + +func TestBadScanStructArgs(t *testing.T) { + x := []interface{}{"A", "b"} + test := func(v interface{}) { + if err := redis.ScanStruct(x, v); err == nil { + t.Errorf("Expect error for ScanStruct(%T, %T)", x, v) + } + } + + test(nil) + + var v0 *struct{} + test(v0) + + var v1 int + test(&v1) + + x = x[:1] + v2 := struct{ A string }{} + test(&v2) +} + +var scanSliceTests = []struct { + src []interface{} + fieldNames []string + ok bool + dest interface{} +}{ + { + []interface{}{[]byte("1"), nil, []byte("-1")}, + nil, + true, + []int{1, 0, -1}, + }, + { + []interface{}{[]byte("1"), nil, []byte("2")}, + nil, + true, + []uint{1, 0, 2}, + }, + { + []interface{}{[]byte("-1")}, + nil, + false, + []uint{1}, + }, + { + []interface{}{[]byte("hello"), nil, []byte("world")}, + nil, + true, + [][]byte{[]byte("hello"), nil, []byte("world")}, + }, + { + []interface{}{[]byte("hello"), nil, []byte("world")}, + nil, + true, + []string{"hello", "", "world"}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + true, + []struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1")}, + nil, + false, + []struct{ A, B, C string }{{"a1", "b1", ""}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + true, + []*struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + []string{"A", "B"}, + true, + []struct{ A, C, B string }{{"a1", "", "b1"}, {"a2", "", "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + false, + []struct{}{}, + }, +} + +func TestScanSlice(t *testing.T) { + for _, tt := range scanSliceTests { + + typ := reflect.ValueOf(tt.dest).Type() + dest := reflect.New(typ) + + err := redis.ScanSlice(tt.src, dest.Interface(), tt.fieldNames...) + if tt.ok != (err == nil) { + t.Errorf("ScanSlice(%v, []%s, %v) returned error %v", tt.src, typ, tt.fieldNames, err) + continue + } + if tt.ok && !reflect.DeepEqual(dest.Elem().Interface(), tt.dest) { + t.Errorf("ScanSlice(src, []%s) returned %#v, want %#v", typ, dest.Elem().Interface(), tt.dest) + } + } +} + +func ExampleScanSlice() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + c.Send("HMSET", "album:1", "title", "Red", "rating", 5) + c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1) + c.Send("HMSET", "album:3", "title", "Beat", "rating", 4) + c.Send("LPUSH", "albums", "1") + c.Send("LPUSH", "albums", "2") + c.Send("LPUSH", "albums", "3") + values, err := redis.Values(c.Do("SORT", "albums", + "BY", "album:*->rating", + "GET", "album:*->title", + "GET", "album:*->rating")) + if err != nil { + panic(err) + } + + var albums []struct { + Title string + Rating int + } + if err := redis.ScanSlice(values, &albums); err != nil { + panic(err) + } + fmt.Printf("%v\n", albums) + // Output: + // [{Earthbound 1} {Beat 4} {Red 5}] +} + +var argsTests = []struct { + title string + actual redis.Args + expected redis.Args +}{ + {"struct ptr", + redis.Args{}.AddFlat(&struct { + I int `redis:"i"` + U uint `redis:"u"` + S string `redis:"s"` + P []byte `redis:"p"` + Bt bool + Bf bool + }{ + -1234, 5678, "hello", []byte("world"), true, false, + }), + redis.Args{"i", int(-1234), "u", uint(5678), "s", "hello", "p", []byte("world"), "Bt", true, "Bf", false}, + }, + {"struct", + redis.Args{}.AddFlat(struct{ I int }{123}), + redis.Args{"I", 123}, + }, + {"slice", + redis.Args{}.Add(1).AddFlat([]string{"a", "b", "c"}).Add(2), + redis.Args{1, "a", "b", "c", 2}, + }, +} + +func TestArgs(t *testing.T) { + for _, tt := range argsTests { + if !reflect.DeepEqual(tt.actual, tt.expected) { + t.Fatalf("%s is %v, want %v", tt.title, tt.actual, tt.expected) + } + } +} + +func ExampleArgs() { + c, err := dial() + if err != nil { + panic(err) + } + defer c.Close() + + var p1, p2 struct { + Title string `redis:"title"` + Author string `redis:"author"` + Body string `redis:"body"` + } + + p1.Title = "Example" + p1.Author = "Gary" + p1.Body = "Hello" + + if _, err := c.Do("HMSET", redis.Args{}.Add("id1").AddFlat(&p1)...); err != nil { + panic(err) + } + + m := map[string]string{ + "title": "Example2", + "author": "Steve", + "body": "Map", + } + + if _, err := c.Do("HMSET", redis.Args{}.Add("id2").AddFlat(m)...); err != nil { + panic(err) + } + + for _, id := range []string{"id1", "id2"} { + + v, err := redis.Values(c.Do("HGETALL", id)) + if err != nil { + panic(err) + } + + if err := redis.ScanStruct(v, &p2); err != nil { + panic(err) + } + + fmt.Printf("%+v\n", p2) + } + + // Output: + // {Title:Example Author:Gary Body:Hello} + // {Title:Example2 Author:Steve Body:Map} +} diff --git a/redis/script.go b/redis/script.go new file mode 100644 index 0000000..78605a9 --- /dev/null +++ b/redis/script.go @@ -0,0 +1,86 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "crypto/sha1" + "encoding/hex" + "io" + "strings" +) + +// Script encapsulates the source, hash and key count for a Lua script. See +// http://redis.io/commands/eval for information on scripts in Redis. +type Script struct { + keyCount int + src string + hash string +} + +// NewScript returns a new script object. If keyCount is greater than or equal +// to zero, then the count is automatically inserted in the EVAL command +// argument list. If keyCount is less than zero, then the application supplies +// the count as the first value in the keysAndArgs argument to the Do, Send and +// SendHash methods. +func NewScript(keyCount int, src string) *Script { + h := sha1.New() + io.WriteString(h, src) + return &Script{keyCount, src, hex.EncodeToString(h.Sum(nil))} +} + +func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} { + var args []interface{} + if s.keyCount < 0 { + args = make([]interface{}, 1+len(keysAndArgs)) + args[0] = spec + copy(args[1:], keysAndArgs) + } else { + args = make([]interface{}, 2+len(keysAndArgs)) + args[0] = spec + args[1] = s.keyCount + copy(args[2:], keysAndArgs) + } + return args +} + +// Do evaluates the script. Under the covers, Do optimistically evaluates the +// script using the EVALSHA command. If the command fails because the script is +// not loaded, then Do evaluates the script using the EVAL command (thus +// causing the script to load). +func (s *Script) Do(c Conn, keysAndArgs ...interface{}) (interface{}, error) { + v, err := c.Do("EVALSHA", s.args(s.hash, keysAndArgs)...) + if e, ok := err.(Error); ok && strings.HasPrefix(string(e), "NOSCRIPT ") { + v, err = c.Do("EVAL", s.args(s.src, keysAndArgs)...) + } + return v, err +} + +// SendHash evaluates the script without waiting for the reply. The script is +// evaluated with the EVALSHA command. The application must ensure that the +// script is loaded by a previous call to Send, Do or Load methods. +func (s *Script) SendHash(c Conn, keysAndArgs ...interface{}) error { + return c.Send("EVALSHA", s.args(s.hash, keysAndArgs)...) +} + +// Send evaluates the script without waiting for the reply. +func (s *Script) Send(c Conn, keysAndArgs ...interface{}) error { + return c.Send("EVAL", s.args(s.src, keysAndArgs)...) +} + +// Load loads the script without evaluating it. +func (s *Script) Load(c Conn) error { + _, err := c.Do("SCRIPT", "LOAD", s.src) + return err +} diff --git a/redis/script_test.go b/redis/script_test.go new file mode 100644 index 0000000..c9635bf --- /dev/null +++ b/redis/script_test.go @@ -0,0 +1,93 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" +) + +func ExampleScript(c redis.Conn, reply interface{}, err error) { + // Initialize a package-level variable with a script. + var getScript = redis.NewScript(1, `return redis.call('get', KEYS[1])`) + + // In a function, use the script Do method to evaluate the script. The Do + // method optimistically uses the EVALSHA command. If the script is not + // loaded, then the Do method falls back to the EVAL command. + reply, err = getScript.Do(c, "foo") +} + +func TestScript(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + // To test fall back in Do, we make script unique by adding comment with current time. + script := fmt.Sprintf("--%d\nreturn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", time.Now().UnixNano()) + s := redis.NewScript(2, script) + reply := []interface{}{[]byte("key1"), []byte("key2"), []byte("arg1"), []byte("arg2")} + + v, err := s.Do(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.Do(c, ...) returned %v", err) + } + + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.Do(c, ..); = %v, want %v", v, reply) + } + + err = s.Load(c) + if err != nil { + t.Errorf("s.Load(c) returned %v", err) + } + + err = s.SendHash(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.SendHash(c, ...) returned %v", err) + } + + err = c.Flush() + if err != nil { + t.Errorf("c.Flush() returned %v", err) + } + + v, err = c.Receive() + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.SendHash(c, ..); c.Receive() = %v, want %v", v, reply) + } + + err = s.Send(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.Send(c, ...) returned %v", err) + } + + err = c.Flush() + if err != nil { + t.Errorf("c.Flush() returned %v", err) + } + + v, err = c.Receive() + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.Send(c, ..); c.Receive() = %v, want %v", v, reply) + } + +} diff --git a/redis/test_test.go b/redis/test_test.go new file mode 100644 index 0000000..b959a11 --- /dev/null +++ b/redis/test_test.go @@ -0,0 +1,38 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bufio" + "net" + "time" +) + +func SetNowFunc(f func() time.Time) { + nowFunc = f +} + +type nopCloser struct{ net.Conn } + +func (nopCloser) Close() error { return nil } + +// NewConnBufio is a hook for tests. +func NewConnBufio(rw bufio.ReadWriter) Conn { + return &conn{br: rw.Reader, bw: rw.Writer, conn: nopCloser{}} +} + +var ( + ErrNegativeInt = errNegativeInt +) diff --git a/redis/zpop_example_test.go b/redis/zpop_example_test.go new file mode 100644 index 0000000..1d86ee6 --- /dev/null +++ b/redis/zpop_example_test.go @@ -0,0 +1,113 @@ +// Copyright 2013 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "github.com/garyburd/redigo/redis" +) + +// zpop pops a value from the ZSET key using WATCH/MULTI/EXEC commands. +func zpop(c redis.Conn, key string) (result string, err error) { + + defer func() { + // Return connection to normal state on error. + if err != nil { + c.Do("DISCARD") + } + }() + + // Loop until transaction is successful. + for { + if _, err := c.Do("WATCH", key); err != nil { + return "", err + } + + members, err := redis.Strings(c.Do("ZRANGE", key, 0, 0)) + if err != nil { + return "", err + } + if len(members) != 1 { + return "", redis.ErrNil + } + + c.Send("MULTI") + c.Send("ZREM", key, members[0]) + queued, err := c.Do("EXEC") + if err != nil { + return "", err + } + + if queued != nil { + result = members[0] + break + } + } + + return result, nil +} + +// zpopScript pops a value from a ZSET. +var zpopScript = redis.NewScript(1, ` + local r = redis.call('ZRANGE', KEYS[1], 0, 0) + if r ~= nil then + r = r[1] + redis.call('ZREM', KEYS[1], r) + end + return r +`) + +// This example implements ZPOP as described at +// http://redis.io/topics/transactions using WATCH/MULTI/EXEC and scripting. +func Example_zpop() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + // Add test data using a pipeline. + + for i, member := range []string{"red", "blue", "green"} { + c.Send("ZADD", "zset", i, member) + } + if _, err := c.Do(""); err != nil { + fmt.Println(err) + return + } + + // Pop using WATCH/MULTI/EXEC + + v, err := zpop(c, "zset") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(v) + + // Pop using a script. + + v, err = redis.String(zpopScript.Do(c, "zset")) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(v) + + // Output: + // red + // blue +} diff --git a/redisx/connmux.go b/redisx/connmux.go new file mode 100644 index 0000000..af2cced --- /dev/null +++ b/redisx/connmux.go @@ -0,0 +1,152 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redisx + +import ( + "errors" + "sync" + + "github.com/garyburd/redigo/internal" + "github.com/garyburd/redigo/redis" +) + +// ConnMux multiplexes one or more connections to a single underlying +// connection. The ConnMux connections do not support concurrency, commands +// that associate server side state with the connection or commands that put +// the connection in a special mode. +type ConnMux struct { + c redis.Conn + + sendMu sync.Mutex + sendID uint + + recvMu sync.Mutex + recvID uint + recvWait map[uint]chan struct{} +} + +func NewConnMux(c redis.Conn) *ConnMux { + return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})} +} + +// Get gets a connection. The application must close the returned connection. +func (p *ConnMux) Get() redis.Conn { + c := &muxConn{p: p} + c.ids = c.buf[:0] + return c +} + +// Close closes the underlying connection. +func (p *ConnMux) Close() error { + return p.c.Close() +} + +type muxConn struct { + p *ConnMux + ids []uint + buf [8]uint +} + +func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error { + if internal.LookupCommandInfo(cmd).Set != 0 { + return errors.New("command not supported by mux pool") + } + p := c.p + p.sendMu.Lock() + id := p.sendID + c.ids = append(c.ids, id) + p.sendID++ + err := p.c.Send(cmd, args...) + if flush { + err = p.c.Flush() + } + p.sendMu.Unlock() + return err +} + +func (c *muxConn) Send(cmd string, args ...interface{}) error { + return c.send(false, cmd, args...) +} + +func (c *muxConn) Flush() error { + p := c.p + p.sendMu.Lock() + err := p.c.Flush() + p.sendMu.Unlock() + return err +} + +func (c *muxConn) Receive() (interface{}, error) { + if len(c.ids) == 0 { + return nil, errors.New("mux pool underflow") + } + + id := c.ids[0] + c.ids = c.ids[1:] + if len(c.ids) == 0 { + c.ids = c.buf[:0] + } + + p := c.p + p.recvMu.Lock() + if p.recvID != id { + ch := make(chan struct{}) + p.recvWait[id] = ch + p.recvMu.Unlock() + <-ch + p.recvMu.Lock() + if p.recvID != id { + panic("out of sync") + } + } + + v, err := p.c.Receive() + + id++ + p.recvID = id + ch, ok := p.recvWait[id] + if ok { + delete(p.recvWait, id) + } + p.recvMu.Unlock() + if ok { + ch <- struct{}{} + } + + return v, err +} + +func (c *muxConn) Close() error { + var err error + if len(c.ids) == 0 { + return nil + } + c.Flush() + for _ = range c.ids { + _, err = c.Receive() + } + return err +} + +func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) { + if err := c.send(true, cmd, args...); err != nil { + return nil, err + } + return c.Receive() +} + +func (c *muxConn) Err() error { + return c.p.c.Err() +} diff --git a/redisx/connmux_test.go b/redisx/connmux_test.go new file mode 100644 index 0000000..9c3c8b1 --- /dev/null +++ b/redisx/connmux_test.go @@ -0,0 +1,259 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redisx_test + +import ( + "net/textproto" + "sync" + "testing" + + "github.com/garyburd/redigo/internal/redistest" + "github.com/garyburd/redigo/redis" + "github.com/garyburd/redigo/redisx" +) + +func TestConnMux(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + c1 := m.Get() + c2 := m.Get() + c1.Send("ECHO", "hello") + c2.Send("ECHO", "world") + c1.Flush() + c2.Flush() + s, err := redis.String(c1.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "hello" { + t.Fatalf("echo returned %q, want %q", s, "hello") + } + s, err = redis.String(c2.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "world" { + t.Fatalf("echo returned %q, want %q", s, "world") + } + c1.Close() + c2.Close() +} + +func TestConnMuxClose(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + c1 := m.Get() + c2 := m.Get() + + if err := c1.Send("ECHO", "hello"); err != nil { + t.Fatal(err) + } + if err := c1.Close(); err != nil { + t.Fatal(err) + } + + if err := c2.Send("ECHO", "world"); err != nil { + t.Fatal(err) + } + if err := c2.Flush(); err != nil { + t.Fatal(err) + } + + s, err := redis.String(c2.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "world" { + t.Fatalf("echo returned %q, want %q", s, "world") + } + c2.Close() +} + +func BenchmarkConn(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + b.StartTimer() + + for i := 0; i < b.N; i++ { + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkConnMux(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + b.StartTimer() + + for i := 0; i < b.N; i++ { + c := m.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +func BenchmarkPool(b *testing.B) { + b.StopTimer() + + p := redis.Pool{Dial: redistest.Dial, MaxIdle: 1} + defer p.Close() + + // Fill the pool. + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + + b.StartTimer() + + for i := 0; i < b.N; i++ { + c := p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +const numConcurrent = 10 + +func BenchmarkConnMuxConcurrent(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + m := redisx.NewConnMux(c) + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + c := m.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } + }() + } + wg.Wait() +} + +func BenchmarkPoolConcurrent(b *testing.B) { + b.StopTimer() + + p := redis.Pool{Dial: redistest.Dial, MaxIdle: numConcurrent} + defer p.Close() + + // Fill the pool. + conns := make([]redis.Conn, numConcurrent) + for i := range conns { + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + conns[i] = c + } + for _, c := range conns { + c.Close() + } + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + c := p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } + }() + } + wg.Wait() +} + +func BenchmarkPipelineConcurrency(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + var pipeline textproto.Pipeline + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + id := pipeline.Next() + pipeline.StartRequest(id) + c.Send("PING") + c.Flush() + pipeline.EndRequest(id) + pipeline.StartResponse(id) + _, err := c.Receive() + if err != nil { + b.Fatal(err) + } + pipeline.EndResponse(id) + } + }() + } + wg.Wait() +} diff --git a/redisx/doc.go b/redisx/doc.go new file mode 100644 index 0000000..91653db --- /dev/null +++ b/redisx/doc.go @@ -0,0 +1,17 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// Package redisx contains experimental features for Redigo. Features in this +// package may be modified or deleted at any time. +package redisx // import "github.com/garyburd/redigo/redisx"