Skip to content
This repository was archived by the owner on Jun 25, 2025. It is now read-only.

Commit 2da72a5

Browse files
committed
feat(redis): Optimize with more aggresive pipelining
1 parent dcd4077 commit 2da72a5

7 files changed

Lines changed: 152 additions & 24 deletions

File tree

log/aggregator_factory_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
type stubStorageAdapter struct {
1010
}
1111

12+
func (a *stubStorageAdapter) Start() {
13+
}
14+
1215
func (a *stubStorageAdapter) Write(app string, message string) error {
1316
return nil
1417
}
@@ -25,6 +28,9 @@ func (a *stubStorageAdapter) Reopen() error {
2528
return nil
2629
}
2730

31+
func (a *stubStorageAdapter) Stop() {
32+
}
33+
2834
func TestGetUsingInvalidValues(t *testing.T) {
2935
_, err := NewAggregator("bogus", &stubStorageAdapter{})
3036
if err == nil || err.Error() != fmt.Sprintf("Unrecognized aggregator type: '%s'", "bogus") {

main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ func main() {
1818
if err != nil {
1919
l.Fatal("Error creating storage adapter: ", err)
2020
}
21+
storageAdapter.Start()
22+
defer storageAdapter.Stop()
2123

2224
aggregator, err := log.NewAggregator(cfg.AggregatorType, storageAdapter)
2325
if err != nil {

storage/adapter.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package storage
22

33
// Adapter is an interface for pluggable components that store log messages.
44
type Adapter interface {
5+
Start()
56
Write(string, string) error
67
Read(string, int) ([]string, error)
78
Destroy(string) error
89
Reopen() error
10+
Stop()
911
}

storage/file_adapter.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ func NewFileAdapter() (Adapter, error) {
2222
return &fileAdapter{files: make(map[string]*os.File)}, nil
2323
}
2424

25+
// Start the storage adapter-- in the case of this implementation, a no-op
26+
func (a *fileAdapter) Start() {
27+
}
28+
2529
// Write adds a log message to to an app-specific log file
2630
func (a *fileAdapter) Write(app string, message string) error {
2731
// Check first if we might actually have to add to the map of file pointers so we can avoid
@@ -93,6 +97,7 @@ func (a *fileAdapter) Destroy(app string) error {
9397
return nil
9498
}
9599

100+
// Reopen every file referenced by this storage adapter
96101
func (a *fileAdapter) Reopen() error {
97102
// Ensure no other goroutine is trying to add a file pointer to the map of file pointers while
98103
// we're trying to clear it out
@@ -102,6 +107,10 @@ func (a *fileAdapter) Reopen() error {
102107
return nil
103108
}
104109

110+
// Stop the storage adapter-- in the case of this implementation, a no-op
111+
func (a *fileAdapter) Stop() {
112+
}
113+
105114
func (a *fileAdapter) getFile(app string) (*os.File, error) {
106115
filePath := a.getFilePath(app)
107116
exists, err := fileExists(filePath)

storage/redis_adapter.go

Lines changed: 110 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,71 @@ package storage
33
import (
44
"fmt"
55
"log"
6+
"time"
67

78
r "gopkg.in/redis.v3"
89
)
910

11+
type message struct {
12+
app string
13+
messageBody string
14+
}
15+
16+
func newMessage(app string, messageBody string) *message {
17+
return &message{
18+
app: app,
19+
messageBody: messageBody,
20+
}
21+
}
22+
23+
type messagePipeliner struct {
24+
bufferSize int
25+
messageCount int
26+
pipeline *r.Pipeline
27+
timeoutTicker *time.Ticker
28+
queuedApps map[string]bool
29+
errCh chan error
30+
}
31+
32+
func newMessagePipeliner(bufferSize int, redisClient *r.Client, errCh chan error) *messagePipeliner {
33+
return &messagePipeliner{
34+
bufferSize: bufferSize,
35+
pipeline: redisClient.Pipeline(),
36+
timeoutTicker: time.NewTicker(time.Second),
37+
queuedApps: map[string]bool{},
38+
errCh: errCh,
39+
}
40+
}
41+
42+
func (mp *messagePipeliner) addMessage(message *message) {
43+
if err := mp.pipeline.RPush(message.app, message.messageBody).Err(); err == nil {
44+
mp.queuedApps[message.app] = true
45+
mp.messageCount++
46+
} else {
47+
mp.errCh <- fmt.Errorf("Error adding rpush to %s to the pipeline: %s", message.app, err)
48+
}
49+
}
50+
51+
func (mp messagePipeliner) execPipeline() {
52+
for app := range mp.queuedApps {
53+
if err := mp.pipeline.LTrim(app, int64(-1*mp.bufferSize), -1).Err(); err != nil {
54+
mp.errCh <- fmt.Errorf("Error adding ltrim of %s to the pipeline: %s", app, err)
55+
}
56+
}
57+
go func() {
58+
defer mp.pipeline.Close()
59+
if _, err := mp.pipeline.Exec(); err != nil {
60+
mp.errCh <- fmt.Errorf("Error executing pipeline: %s", err)
61+
}
62+
}()
63+
}
64+
1065
type redisAdapter struct {
11-
bufferSize int
12-
redisClient *r.Client
66+
started bool
67+
bufferSize int
68+
redisClient *r.Client
69+
messageChannel chan *message
70+
stopCh chan struct{}
1371
}
1472

1573
// NewRedisStorageAdapter returns a pointer to a new instance of a redis-based storage.Adapter.
@@ -24,35 +82,59 @@ func NewRedisStorageAdapter(bufferSize int) (*redisAdapter, error) {
2482
if err != nil {
2583
return nil, err
2684
}
27-
return &redisAdapter{
85+
rsa := &redisAdapter{
2886
bufferSize: bufferSize,
2987
redisClient: r.NewClient(&r.Options{
3088
Addr: fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
3189
Password: cfg.RedisPassword, // "" == no password
3290
DB: int64(cfg.RedisDB),
3391
}),
34-
}, nil
92+
messageChannel: make(chan *message),
93+
stopCh: make(chan struct{}),
94+
}
95+
return rsa, nil
3596
}
3697

37-
// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics
38-
func (a *redisAdapter) Write(app string, message string) error {
39-
// Note: Deliberately NOT using MULTI / transactions here since in this implementation of the
40-
// redis client, MULTI is not safe for concurrent use by multiple goroutines. It's been advised
41-
// by the authors of the gopkg.in/redis.v3 package to just use pipelining when possible...
42-
// and here that is technically possible. In the WORST case scenario, not having transactions
43-
// means we may momentarily have more than the desired number of log entries in the list /
44-
// buffer, but an LTRIM will eventually correct that, bringing the list / buffer back down to
45-
// its desired max size.
46-
pipeline := a.redisClient.Pipeline()
47-
if err := pipeline.RPush(app, message).Err(); err != nil {
48-
return err
49-
}
50-
if err := pipeline.LTrim(app, int64(-1*a.bufferSize), -1).Err(); err != nil {
51-
return err
52-
}
53-
if _, err := pipeline.Exec(); err != nil {
54-
return err
98+
// Start the storage adapter. Invocations of this function are not concurrency safe and multiple
99+
// serialized invocations have no effect.
100+
func (a *redisAdapter) Start() {
101+
if !a.started {
102+
a.started = true
103+
errCh := make(chan error)
104+
mp := newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
105+
go func() {
106+
for {
107+
select {
108+
case err := <-errCh:
109+
log.Println(err)
110+
case <-a.stopCh:
111+
return
112+
}
113+
}
114+
}()
115+
go func() {
116+
for {
117+
select {
118+
case message := <-a.messageChannel:
119+
mp.addMessage(message)
120+
if mp.messageCount == 50 {
121+
mp.execPipeline()
122+
mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
123+
}
124+
case <-mp.timeoutTicker.C:
125+
mp.execPipeline()
126+
mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
127+
case <-a.stopCh:
128+
return
129+
}
130+
}
131+
}()
55132
}
133+
}
134+
135+
// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics
136+
func (a *redisAdapter) Write(app string, messageBody string) error {
137+
a.messageChannel <- newMessage(app, messageBody)
56138
return nil
57139
}
58140

@@ -77,7 +159,12 @@ func (a *redisAdapter) Destroy(app string) error {
77159
return nil
78160
}
79161

162+
// Reopen the storage adapter-- in the case of this implementation, a no-op
80163
func (a *redisAdapter) Reopen() error {
81-
// No-op
82164
return nil
83165
}
166+
167+
// Stop the storage adapter. Additional writes may not be performed after stopping.
168+
func (a *redisAdapter) Stop() {
169+
close(a.stopCh)
170+
}

storage/redis_adapter_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package storage
55
import (
66
"fmt"
77
"testing"
8+
"time"
89
)
910

1011
func TestRedisReadFromNonExistingApp(t *testing.T) {
@@ -42,12 +43,17 @@ func TestRedisLogs(t *testing.T) {
4243
if err != nil {
4344
t.Error(err)
4445
}
46+
a.Start()
47+
defer a.Stop()
4548
// And write a few logs to it, but do NOT fill it up
4649
for i := 0; i < 5; i++ {
4750
if err := a.Write(app, fmt.Sprintf("message %d", i)); err != nil {
4851
t.Error(err)
4952
}
5053
}
54+
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
55+
// there are 50 queued up OR a 1 second timeout has been reached.
56+
time.Sleep(time.Second * 2)
5157
// Read more logs than there are
5258
messages, err := a.Read(app, 8)
5359
if err != nil {
@@ -78,6 +84,9 @@ func TestRedisLogs(t *testing.T) {
7884
t.Error(err)
7985
}
8086
}
87+
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
88+
// there are 50 queued up OR a 1 second timeout has been reached.
89+
time.Sleep(time.Second * 2)
8190
// Read more logs than the buffer can hold
8291
messages, err = a.Read(app, 20)
8392
if err != nil {
@@ -101,10 +110,15 @@ func TestRedisDestroy(t *testing.T) {
101110
if err != nil {
102111
t.Error(err)
103112
}
113+
a.Start()
114+
defer a.Stop()
104115
// Write a log to create the file
105116
if err := a.Write(app, "Hello, log!"); err != nil {
106117
t.Error(err)
107118
}
119+
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
120+
// there are 50 queued up OR a 1 second timeout has been reached.
121+
time.Sleep(time.Second * 2)
108122
// A redis list should exist for the app
109123
exists, err := a.redisClient.Exists(app).Result()
110124
if err != nil {

storage/ring_buffer_adapter.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func NewRingBufferAdapter(bufferSize int) (Adapter, error) {
6363
return &ringBufferAdapter{bufferSize: bufferSize, ringBuffers: make(map[string]*ringBuffer)}, nil
6464
}
6565

66+
// Start the storage adapter-- in the case of this implementation, a no-op
67+
func (a *ringBufferAdapter) Start() {
68+
}
69+
6670
// Write adds a log message to to an app-specific ringBuffer
6771
func (a *ringBufferAdapter) Write(app string, message string) error {
6872
// Check first if we might actually have to add to the map of ringBuffer pointers so we can avoid
@@ -110,7 +114,11 @@ func (a *ringBufferAdapter) Destroy(app string) error {
110114
return nil
111115
}
112116

117+
// Reopen the storage adapter-- in the case of this implementation, a no-op
113118
func (a *ringBufferAdapter) Reopen() error {
114-
// No-op
115119
return nil
116120
}
121+
122+
// Stop the storage adapter-- in the case of this implementation, a no-op
123+
func (a *ringBufferAdapter) Stop() {
124+
}

0 commit comments

Comments
 (0)