Skip to content

Commit bddd2c0

Browse files
author
Kent Rancourt
committed
chore(logger): improve drains
1 parent e8ef0ee commit bddd2c0

8 files changed

Lines changed: 142 additions & 129 deletions

File tree

logger/Makefile

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ repo_path = github.com/deis/deis/logger
66
GO_FILES = $(wildcard *.go)
77
GO_PACKAGES = configurer drain publisher storage syslogish tests weblog
88
GO_PACKAGES_REPO_PATH = $(addprefix $(repo_path)/,$(GO_PACKAGES))
9-
GO_TESTABLE_PACKAGES_REPO_PATH = $(addprefix $(repo_path)/,drain drain/udp drain/tcp storage storage/file storage/ringbuffer)
9+
GO_TESTABLE_PACKAGES_REPO_PATH = $(addprefix $(repo_path)/,drain drain/simple storage storage/file storage/ringbuffer)
1010

1111
COMPONENT = $(notdir $(repo_path))
1212
IMAGE = $(IMAGE_PREFIX)$(COMPONENT):$(BUILD_TAG)

logger/drain/factory.go

Lines changed: 5 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,7 @@ import (
44
"fmt"
55
"strings"
66

7-
"github.com/deis/deis/logger/drain/tcp"
8-
"github.com/deis/deis/logger/drain/udp"
7+
"github.com/deis/deis/logger/drain/simple"
98
)
109

1110
// NewDrain returns a pointer to an appropriate implementation of the LogDrain interface, as
@@ -15,19 +14,14 @@ func NewDrain(drainURL string) (LogDrain, error) {
1514
// nil means no drain-- which is valid
1615
return nil, nil
1716
}
18-
if strings.HasPrefix(drainURL, "udp://") || strings.HasPrefix(drainURL, "syslog://") {
19-
drain, err := udp.NewDrain(drainURL)
20-
if err != nil {
21-
return nil, err
22-
}
23-
return drain, nil
24-
}
25-
if strings.HasPrefix(drainURL, "tcp://") {
26-
drain, err := tcp.NewDrain(drainURL)
17+
// Any of these three can use the same drain implementation
18+
if strings.HasPrefix(drainURL, "udp://") || strings.HasPrefix(drainURL, "syslog://") || strings.HasPrefix(drainURL, "tcp://") {
19+
drain, err := simple.NewDrain(drainURL)
2720
if err != nil {
2821
return nil, err
2922
}
3023
return drain, nil
3124
}
25+
// TODO: Add more drain implementations-- TLS over TCP and HTTP/S
3226
return nil, fmt.Errorf("Cannot construct a drain for URL: '%s'", drainURL)
3327
}

logger/drain/factory_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ func TestGetUdpDrain(t *testing.T) {
3333
if err != nil {
3434
t.Error(err)
3535
}
36-
if want, got := "*udp.logDrain", reflect.TypeOf(d).String(); want != got {
36+
if want, got := "*simple.logDrain", reflect.TypeOf(d).String(); want != got {
3737
t.Errorf("Expected a %s, but got a %s", want, got)
3838
}
3939
}
@@ -43,7 +43,7 @@ func TestGetSyslogDrain(t *testing.T) {
4343
if err != nil {
4444
t.Error(err)
4545
}
46-
if want, got := "*udp.logDrain", reflect.TypeOf(d).String(); want != got {
46+
if want, got := "*simple.logDrain", reflect.TypeOf(d).String(); want != got {
4747
t.Errorf("Expected a %s, but got a %s", want, got)
4848
}
4949
}
@@ -53,7 +53,7 @@ func TestGetTcpDrain(t *testing.T) {
5353
if err != nil {
5454
t.Error(err)
5555
}
56-
if want, got := "*tcp.logDrain", reflect.TypeOf(d).String(); want != got {
56+
if want, got := "*simple.logDrain", reflect.TypeOf(d).String(); want != got {
5757
t.Errorf("Expected a %s, but got a %s", want, got)
5858
}
5959
}

logger/drain/simple/drain.go

Lines changed: 132 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,132 @@
1+
package simple
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net"
7+
"net/url"
8+
"sync"
9+
"time"
10+
)
11+
12+
// Tunables controlling connection reuse, dialing, and muting of the drain.
const (
	// connRefreshInterval is how long a dialed connection stays usable before it has to be
	// replaced. Reusing a connection is cheaper than dialing for every message, but redialing
	// periodically is still worthwhile for two reasons:
	//
	// 1. DNS changes on the remote end of the drain should not go unnoticed for too long.
	//
	// 2. With TCP, the underlying stack can spend a very long time waiting for acks and
	// retransmitting unacknowledged packets. That leaves a large window in which messages are
	// lost without any warning. A periodic redial creates the opportunity for a failed TCP
	// handshake, which reveals the problem sooner.
	//
	// A high value favors efficiency and a low value favors resiliency. One minute was chosen
	// arbitrarily as a sensible balance between the two.
	connRefreshInterval = 1 * time.Minute

	// maxFailedConns is the number of failed dial attempts required before the drain is muted.
	maxFailedConns = 5

	// dialTimeout is the most time we are willing to spend on a single dial attempt.
	dialTimeout = 10 * time.Second

	// mutePeriod is how long the drain stays muted after repeated connection failures.
	mutePeriod = 5 * time.Minute
)
35+
36+
// logDrain forwards log messages to one remote endpoint over a cached network connection.
type logDrain struct {
	// proto is the network name handed to the dialer ("udp" or "tcp"); uri is the dial
	// address, assembled from the host and path of the drain URL.
	proto, uri string
	// conn is the connection currently in use, or nil when none has been dialed yet.
	conn net.Conn
	// muted is set after repeated dial failures; while it is set, Send drops messages.
	muted bool
	// mutex is held by Send for the duration of each send.
	mutex sync.Mutex
}
43+
44+
// NewDrain returns a pointer to a new instance of a drain.LogDrain
45+
func NewDrain(drainURL string) (*logDrain, error) {
46+
u, err := url.Parse(drainURL)
47+
if err != nil {
48+
return nil, err
49+
}
50+
var proto string
51+
if u.Scheme == "udp" || u.Scheme == "syslog" {
52+
proto = "udp"
53+
} else if u.Scheme == "tcp" {
54+
proto = "tcp"
55+
} else {
56+
return nil, fmt.Errorf("Invalid drain url scheme: %s", u.Scheme)
57+
}
58+
return &logDrain{proto: proto, uri: u.Host + u.Path}, nil
59+
}
60+
61+
// Send forwards the provided log message to an external destination
62+
func (d *logDrain) Send(message string) error {
63+
if d.muted {
64+
return nil
65+
}
66+
d.mutex.Lock()
67+
defer d.mutex.Unlock()
68+
conn, err := d.getConnection(false)
69+
if err != nil {
70+
return err
71+
}
72+
_, err = fmt.Fprintln(conn, message)
73+
if err != nil {
74+
// Try again with a new connection in case the issue was a broken pipe
75+
conn, err = d.getConnection(true)
76+
if err != nil {
77+
return err
78+
}
79+
_, err = fmt.Fprintln(conn, message)
80+
if err != nil {
81+
return err
82+
}
83+
}
84+
return nil
85+
}
86+
87+
// getConnection returns a usable connection, often without needing to redial, but still
88+
// redialing when advised.
89+
func (d *logDrain) getConnection(forceNew bool) (net.Conn, error) {
90+
// If we have a connection, it's not old, and we're not focing a new one...
91+
if d.conn != nil && !forceNew {
92+
// then return the existing connection
93+
return d.conn, nil
94+
}
95+
// If ANY of those conditions weren't met, it's time for a new connection.
96+
// If we have an existing one, close it and nil it out, too for good measure.
97+
if d.conn != nil {
98+
if err := d.conn.Close(); err != nil {
99+
log.Println("drain: Error closing connection. Drain may be leaking connections.", err)
100+
}
101+
d.conn = nil
102+
}
103+
// Try a few times...
104+
var err error
105+
for attempt := 1; attempt <= maxFailedConns; attempt++ {
106+
d.conn, err = net.DialTimeout(d.proto, d.uri, dialTimeout)
107+
if err == nil {
108+
// We got our connection...
109+
// Make it good for only so long. See comment above on connRefreshInterval.
110+
err = d.conn.SetWriteDeadline(time.Now().Add(connRefreshInterval))
111+
if err != nil {
112+
return nil, err
113+
}
114+
// Break out of the loop and return
115+
return d.conn, nil
116+
}
117+
}
118+
// Multiple attempts to dial have failed. Whatever the problem is, we shouldn't expect that
119+
// it will resolve itself quickly.
120+
log.Printf("drain: Experienced %d consecutive failed connection attempts; muting drain for %s.", maxFailedConns, mutePeriod)
121+
// Immediately "mute" the drain. This will prevent us from wasting resources repeatedly dialing
122+
// and failing while the message queue gets backed up. This will give the network a break and
123+
// allow us to empty the queue.
124+
d.muted = true
125+
// Unmute the drain when the mute interval has elapsed
126+
go func() {
127+
time.Sleep(mutePeriod)
128+
d.muted = false
129+
}()
130+
// Return the error from the last failed connection attempt
131+
return nil, err
132+
}
Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
package tcp
1+
package simple
22

33
import (
44
"fmt"

logger/drain/tcp/drain.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

logger/drain/udp/drain.go

Lines changed: 0 additions & 50 deletions
This file was deleted.

logger/drain/udp/drain_test.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

0 commit comments

Comments
 (0)