Skip to content

Commit bdea672

Browse files
author
Matthew Fisher
committed
Merge pull request #3660 from kalbasit/issue_3657
fix(logger): avoid hitting etcd for each log line
2 parents 7830ade + 4cf9cab commit bdea672

3 files changed

Lines changed: 59 additions & 42 deletions

File tree

logger/drain/drain.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,8 @@ import (
66
"net"
77
"net/url"
88
"os"
9-
10-
"github.com/coreos/go-etcd/etcd"
119
)
1210

13-
func GetDrain() string {
14-
host := getopt("HOST", "127.0.0.1")
15-
16-
etcdPort := getopt("ETCD_PORT", "4001")
17-
etcdPath := getopt("ETCD_PATH", "/deis/logs")
18-
19-
client := etcd.NewClient([]string{"http://" + host + ":" + etcdPort})
20-
21-
s, err := client.Get(etcdPath+"/drain", true, false)
22-
if err != nil {
23-
return ""
24-
}
25-
26-
return s.Node.Value
27-
}
28-
2911
func SendToDrain(m string, drain string) error {
3012
u, err := url.Parse(drain)
3113
if err != nil {

logger/main.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
var (
1919
logAddr string
2020
logPort int
21+
drainURI string
2122
enablePublish bool
2223
publishHost string
2324
publishPath string
@@ -29,6 +30,7 @@ var (
2930
func init() {
3031
flag.StringVar(&logAddr, "log-addr", "0.0.0.0", "bind address for the logger")
3132
flag.IntVar(&logPort, "log-port", 514, "bind port for the logger")
33+
flag.StringVar(&drainURI, "drain-uri", "", "default drainURI, once set in etcd, this has no effect.")
3234
flag.StringVar(&syslogd.LogRoot, "log-root", "/data/logs", "log path to store logs")
3335
flag.BoolVar(&enablePublish, "enable-publish", false, "enable publishing to service discovery")
3436
flag.StringVar(&publishHost, "publish-host", getopt("HOST", "127.0.0.1"), "service discovery hostname")
@@ -43,26 +45,49 @@ func main() {
4345

4446
client := etcd.NewClient([]string{"http://" + publishHost + ":" + publishPort})
4547

46-
// Wait for terminating signal
47-
exitChan := make(chan os.Signal, 2)
48+
signalChan := make(chan os.Signal, 1)
49+
drainRespChan := make(chan *etcd.Response)
50+
drainChan := make(chan string)
51+
stopChan := make(chan bool)
52+
exitChan := make(chan bool)
4853
cleanupChan := make(chan bool)
49-
signal.Notify(exitChan, syscall.SIGTERM, syscall.SIGINT)
54+
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
5055

51-
go syslogd.Listen(exitChan, cleanupChan, fmt.Sprintf("%s:%d", logAddr, logPort))
56+
// ensure the drain key exists in etcd.
57+
if _, err := client.Get(publishPath+"/drain", false, false); err != nil {
58+
setEtcd(client, publishPath+"/drain", drainURI, 0)
59+
}
5260

61+
go client.Watch(publishPath+"/drain", 0, false, drainRespChan, stopChan)
62+
go syslogd.Listen(exitChan, cleanupChan, drainChan, fmt.Sprintf("%s:%d", logAddr, logPort))
5363
if enablePublish {
54-
go publishService(client, publishHost, publishPath, strconv.Itoa(logPort), uint64(time.Duration(publishTTL).Seconds()))
64+
go publishService(exitChan, client, publishHost, publishPath, strconv.Itoa(logPort), uint64(time.Duration(publishTTL).Seconds()))
5565
}
5666

57-
// Wait for the proper shutdown of the syslog server before exit
58-
<-cleanupChan
67+
for {
68+
select {
69+
case er := <-drainRespChan:
70+
drainChan <- er.Node.Value
71+
case <-signalChan:
72+
close(exitChan)
73+
stopChan <- true
74+
case <-cleanupChan:
75+
return
76+
}
77+
}
5978
}
6079

61-
func publishService(client *etcd.Client, host string, etcdPath string, port string, ttl uint64) {
80+
func publishService(exitChan chan bool, client *etcd.Client, host string, etcdPath string, port string, ttl uint64) {
81+
t := time.NewTicker(time.Duration(publishInterval))
82+
6283
for {
63-
setEtcd(client, etcdPath+"/host", host, ttl)
64-
setEtcd(client, etcdPath+"/port", port, ttl)
65-
time.Sleep(time.Duration(publishInterval))
84+
select {
85+
case <-t.C:
86+
setEtcd(client, etcdPath+"/host", host, ttl)
87+
setEtcd(client, etcdPath+"/port", port, ttl)
88+
case <-exitChan:
89+
return
90+
}
6691
}
6792
}
6893

logger/syslogd/syslogd.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ import (
1313
"github.com/deis/deis/logger/drain"
1414
)
1515

16+
// LogRoot is the log path to store logs.
1617
var LogRoot string
1718

1819
type handler struct {
1920
// To simplify implementation of our handler we embed helper
2021
// syslog.BaseHandler struct.
2122
*syslog.BaseHandler
23+
drainURI string
2224
}
2325

2426
// Simple fiter for named/bind messages which can be used with BaseHandler
@@ -27,8 +29,11 @@ func filter(m syslog.SyslogMessage) bool {
2729
}
2830

2931
func newHandler() *handler {
30-
h := handler{syslog.NewBaseHandler(5, filter, false)}
31-
go h.mainLoop() // BaseHandler needs some gorutine that reads from its queue
32+
h := handler{
33+
BaseHandler: syslog.NewBaseHandler(5, filter, false),
34+
}
35+
36+
go h.mainLoop() // BaseHandler needs some goroutine that reads from its queue
3237
return &h
3338
}
3439

@@ -84,9 +89,8 @@ func (h *handler) mainLoop() {
8489
if m == nil {
8590
break
8691
}
87-
d := drain.GetDrain()
88-
if d != "" {
89-
drain.SendToDrain(m.String(), d)
92+
if h.drainURI != "" {
93+
drain.SendToDrain(m.String(), h.drainURI)
9094
}
9195
err := writeToDisk(m)
9296
if err != nil {
@@ -97,7 +101,7 @@ func (h *handler) mainLoop() {
97101
}
98102

99103
// Listen starts a new syslog server which runs until it receives a signal.
100-
func Listen(signalChan chan os.Signal, cleanupDone chan bool, bindAddr string) {
104+
func Listen(exitChan, cleanupDone chan bool, drainChan chan string, bindAddr string) {
101105
fmt.Println("Starting syslog...")
102106
// If LogRoot doesn't exist, create it
103107
// equivalent to Python's `if not os.path.exists(filename)`
@@ -106,18 +110,24 @@ func Listen(signalChan chan os.Signal, cleanupDone chan bool, bindAddr string) {
106110
log.Fatalf("unable to create LogRoot at %s: %v", LogRoot, err)
107111
}
108112
}
109-
// Create a server with one handler and run one listen gorutine
113+
// Create a server with one handler and run one listen goroutine
110114
s := syslog.NewServer()
111-
s.AddHandler(newHandler())
115+
h := newHandler()
116+
s.AddHandler(h)
112117
s.Listen(bindAddr)
113118
fmt.Println("Syslog server started...")
114119
fmt.Println("deis-logger running")
115120

116121
// Wait for terminating signal
117-
for _ = range signalChan {
118-
// Shutdown the server
119-
fmt.Println("Shutting down...")
120-
s.Shutdown()
121-
cleanupDone <- true
122+
for {
123+
select {
124+
case <-exitChan:
125+
// Shutdown the server
126+
fmt.Println("Shutting down...")
127+
s.Shutdown()
128+
cleanupDone <- true
129+
case d := <-drainChan:
130+
h.drainURI = d
131+
}
122132
}
123133
}

0 commit comments

Comments
 (0)