Skip to content

Commit a86818e

Browse files
author
Kent Rancourt
committed
fix(logger): improve drain efficiency
1 parent 386ff9e commit a86818e

3 files changed

Lines changed: 86 additions & 26 deletions

File tree

logger/drain/tcp/drain.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import (
44
"fmt"
55
"net"
66
"net/url"
7+
"sync"
78
)
89

10+
const maxConnUses = 100
11+
912
type logDrain struct {
10-
uri string
13+
uri string
14+
conn *net.Conn
15+
useCount int
16+
mutex sync.RWMutex
1117
}
1218

1319
// NewDrain returns a pointer to a new instance of a TCP-based drain.LogDrain.
@@ -24,11 +30,21 @@ func NewDrain(drainURL string) (*logDrain, error) {
2430

2531
// Send forwards the provided log message to an external destination using TCP for transport.
2632
func (d *logDrain) Send(message string) error {
27-
conn, err := net.Dial("tcp", d.uri)
28-
if err != nil {
29-
return fmt.Errorf("Error dialing log drain at %s over tcp", d.uri)
33+
d.mutex.Lock()
34+
defer d.mutex.Unlock()
35+
if d.useCount == maxConnUses {
36+
(*d.conn).Close()
37+
d.conn = nil
38+
d.useCount = 0
39+
}
40+
if d.conn == nil {
41+
conn, err := net.Dial("tcp", d.uri)
42+
if err != nil {
43+
return fmt.Errorf("Error dialing log drain at %s over tcp: %s", d.uri, err)
44+
}
45+
d.conn = &conn
3046
}
31-
defer conn.Close()
32-
fmt.Fprintln(conn, message)
47+
fmt.Fprintln(*d.conn, message)
48+
d.useCount++
3349
return nil
3450
}

logger/drain/udp/drain.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import (
44
"fmt"
55
"net"
66
"net/url"
7+
"sync"
78
)
89

10+
const maxConnUses = 100
11+
912
type logDrain struct {
10-
uri string
13+
uri string
14+
conn *net.Conn
15+
useCount int
16+
mutex sync.RWMutex
1117
}
1218

1319
// NewDrain returns a pointer to a new instance of a UDP-based drain.LogDrain.
@@ -24,11 +30,21 @@ func NewDrain(drainURL string) (*logDrain, error) {
2430

2531
// Send forwards the provided log message to an external destination using UDP for transport.
2632
func (d *logDrain) Send(message string) error {
27-
conn, err := net.Dial("udp", d.uri)
28-
if err != nil {
29-
return fmt.Errorf("Error dialing log drain at %s over udp", d.uri)
33+
d.mutex.Lock()
34+
defer d.mutex.Unlock()
35+
if d.useCount == maxConnUses {
36+
(*d.conn).Close()
37+
d.conn = nil
38+
d.useCount = 0
39+
}
40+
if d.conn == nil {
41+
conn, err := net.Dial("udp", d.uri)
42+
if err != nil {
43+
return fmt.Errorf("Error dialing log drain at %s over udp: %s", d.uri, err)
44+
}
45+
d.conn = &conn
3046
}
31-
defer conn.Close()
32-
fmt.Fprintln(conn, message)
47+
fmt.Fprintln(*d.conn, message)
48+
d.useCount++
3349
return nil
3450
}

logger/syslogish/server.go

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/deis/deis/logger/storage"
1414
)
1515

16+
const queueSize = 500
17+
1618
var appRegex *regexp.Regexp
1719

1820
func init() {
@@ -26,8 +28,9 @@ func init() {
2628
type Server struct {
2729
conn net.PacketConn
2830
listening bool
29-
queue chan string
31+
storageQueue chan string
3032
storageAdapter storage.Adapter
33+
drainageQueue chan string
3134
drain drain.LogDrain
3235
adapterMutex sync.RWMutex
3336
drainMutex sync.RWMutex
@@ -43,7 +46,11 @@ func NewServer(bindHost string, bindPort int) (*Server, error) {
4346
if err != nil {
4447
return nil, err
4548
}
46-
return &Server{conn: c, queue: make(chan string, 1000)}, nil
49+
return &Server{
50+
conn: c,
51+
storageQueue: make(chan string, queueSize),
52+
drainageQueue: make(chan string, queueSize),
53+
}, nil
4754
}
4855

4956
// SetStorageAdapter permits a server's underlying storage.Adapter to be reconfigured (replaced)
@@ -71,7 +78,8 @@ func (s *Server) Listen() {
7178
if !s.listening {
7279
s.listening = true
7380
go s.receive()
74-
go s.process()
81+
go s.processStorage()
82+
go s.processDrainage()
7583
log.Println("syslogish server running")
7684
}
7785
}
@@ -86,22 +94,21 @@ func (s *Server) receive() {
8694
}
8795
message := strings.TrimSuffix(string(buf[:n]), "\n")
8896
select {
89-
case s.queue <- message:
97+
case s.storageQueue <- message:
9098
default:
9199
}
92100
}
93101
}
94102

95-
func (s *Server) process() {
96-
for message := range s.queue {
103+
func (s *Server) processStorage() {
104+
for message := range s.storageQueue {
97105
app, err := getAppName(message)
98106
if err != nil {
99107
log.Println(err)
100108
return
101109
}
102-
// Get a read lock to ensure the storage adapater pointer can't be updated by another
103-
// goroutine in the time between we check if it's nil and the time we invoke .Write() upon
104-
// it.
110+
// Get a read lock to ensure the storage adapter pointer can't be nilled by the configurer
111+
// in the time between we check if it's nil and the time we invoke .Write() upon it.
105112
s.adapterMutex.RLock()
106113
// DON'T defer unlocking... deferred statements are executed when the function returns, but
107114
// we are inside an infinite loop here. If we defer, we would never release the lock.
@@ -118,16 +125,37 @@ func (s *Server) process() {
118125
// Treating this as a fatal event would cause the deis-logger unit to restart-- sending
119126
// even more log messages to STDOUT. The overall effect would be the same as described
120127
// above with the added disadvantages of flapping.
121-
//
122-
// But, do not return preemptively. It's possible the message can still be sent to
123-
// the drain.
124128
}
125129
s.adapterMutex.RUnlock()
126-
// Same story as above for the lock on the drain
130+
// Add the message to the drainage queue. This allows the storage loop to continue right
131+
// away instead of waiting while the message is sent to an external service-- since that
132+
// could be a bottleneck and error prone depending on rate limiting, network congestion, etc.
133+
select {
134+
case s.drainageQueue <- message:
135+
default:
136+
}
137+
}
138+
}
139+
140+
func (s *Server) processDrainage() {
141+
for message := range s.drainageQueue {
142+
// Get a read lock to ensure the drain pointer can't be nilled by the configurer in the time
143+
// between we check if it's nil and the time we invoke .Send() upon it.
127144
s.drainMutex.RLock()
145+
// DON'T defer unlocking... deferred statements are executed when the function returns, but
146+
// we are inside an infinite loop here. If we defer, we would never release the lock.
147+
// Instead, release it manually below.
128148
if s.drain != nil {
129149
s.drain.Send(message)
130-
// We don't bother trapping errors here. The rationale is the same as above.
150+
// We don't bother trapping errors here, so failed sends to the drain are silent. This is
151+
// by design. If we sent a log message to STDOUT in response to the failure, deis-logspout
152+
// would read it and forward it back to deis-logger, which would fail again to send to the
153+
// drain and spawn ANOTHER log message. The effect would be an infinite loop of undrainable
154+
// log messages that would nevertheless fill up journal logs and eventually overtake the disk.
155+
//
156+
// Treating this as a fatal event would cause the deis-logger unit to restart-- sending
157+
// even more log messages to STDOUT. The overall effect would be the same as described
158+
// above with the added disadvantages of flapping.
131159
}
132160
s.drainMutex.RUnlock()
133161
}

0 commit comments

Comments
 (0)