Skip to content
This repository was archived by the owner on Jun 25, 2025. It is now read-only.

Commit 9bd19a3

Browse files
author
Matthew Fisher
committed
fix(weblog): fix up CPU issues
The way we spawned and checked to ensure the weblog server was running was a bit backwards, causing the logger to run with massive CPU usage after a few hours because of the state loop. Changing the server logic to instead defer to net/http for serving requests and shutting down the connection at the net.Listener reduces CPU usage significantly. Additionally the Redis adapter had a similar issue; creating connections, writing every second then closing the connection. Re-using the connection will reduce CPU cycle usage significantly.
1 parent fc4fcbc commit 9bd19a3

5 files changed

Lines changed: 178 additions & 66 deletions

File tree

main.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package main
22

33
import (
44
l "log"
5+
"net/http"
6+
7+
_ "net/http/pprof"
58

69
"github.com/deis/logger/log"
710
"github.com/deis/logger/storage"
@@ -29,16 +32,19 @@ func main() {
2932
if err != nil {
3033
l.Fatal("Error starting log aggregator: ", err)
3134
}
35+
defer aggregator.Stop()
3236
l.Println("Log aggregator running")
3337

34-
weblogServer, err := weblog.NewServer(storageAdapter)
35-
if err != nil {
36-
l.Fatal("Error creating weblog server: ", err)
37-
}
38-
serverErrCh := weblogServer.Listen()
39-
l.Println("Weblog server running")
38+
weblogServer := weblog.NewServer(storageAdapter)
39+
weblogServer.Start()
40+
defer weblogServer.Close()
41+
l.Printf("Weblog server serving at %s\n", weblogServer.URL)
42+
43+
// start a Go Profiler
44+
go func() {
45+
l.Println(http.ListenAndServe("0.0.0.0:8099", nil))
46+
}()
4047

41-
defer aggregator.Stop()
4248
stoppedCh := aggregator.Stopped()
4349
select {
4450
case stopErr := <-stoppedCh:
@@ -47,7 +53,5 @@ func main() {
4753
} else {
4854
l.Fatal("Log aggregator has stopped with no error")
4955
}
50-
case serverErr := <-serverErrCh:
51-
l.Fatal("Weblog server failed: ", serverErr)
5256
}
5357
}

storage/redis_adapter.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,9 @@ func (mp messagePipeliner) execPipeline() {
5454
mp.errCh <- fmt.Errorf("Error adding ltrim of %s to the pipeline: %s", app, err)
5555
}
5656
}
57-
go func() {
58-
defer mp.pipeline.Close()
59-
if _, err := mp.pipeline.Exec(); err != nil {
60-
mp.errCh <- fmt.Errorf("Error executing pipeline: %s", err)
61-
}
62-
}()
57+
if _, err := mp.pipeline.Exec(); err != nil {
58+
mp.errCh <- fmt.Errorf("Error executing pipeline: %s", err)
59+
}
6360
}
6461

6562
type redisAdapter struct {
@@ -105,29 +102,20 @@ func (a *redisAdapter) Start() {
105102
errCh := make(chan error)
106103
mp := newMessagePipeliner(a.bufferSize, a.redisClient, a.config.PipelineTimeout, errCh)
107104
go func() {
105+
defer mp.pipeline.Close()
108106
for {
109107
select {
110108
case err := <-errCh:
111109
log.Println(err)
112110
case <-a.stopCh:
113111
return
114-
}
115-
}
116-
}()
117-
go func() {
118-
for {
119-
select {
120112
case message := <-a.messageChannel:
121113
mp.addMessage(message)
122114
if mp.messageCount == a.config.PipelineLength {
123-
mp.execPipeline()
124-
mp = newMessagePipeliner(a.bufferSize, a.redisClient, a.config.PipelineTimeout, errCh)
115+
go mp.execPipeline()
125116
}
126117
case <-mp.timeoutTicker.C:
127-
mp.execPipeline()
128-
mp = newMessagePipeliner(a.bufferSize, a.redisClient, a.config.PipelineTimeout, errCh)
129-
case <-a.stopCh:
130-
return
118+
go mp.execPipeline()
131119
}
132120
}
133121
}()

weblog/router.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,10 @@
11
package weblog
22

33
import (
4-
"log"
5-
"net/http"
6-
7-
_ "net/http/pprof"
8-
94
"github.com/gorilla/mux"
105
)
116

127
func newRouter(rh *requestHandler) *mux.Router {
13-
14-
go func() {
15-
log.Println(http.ListenAndServe("0.0.0.0:8099", nil))
16-
}()
17-
188
r := mux.NewRouter()
199
r.HandleFunc("/healthz", rh.getHealthz).Methods("GET")
2010
r.HandleFunc("/healthz/", rh.getHealthz).Methods("GET")

weblog/server.go

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,65 @@ package weblog
22

33
import (
44
"fmt"
5-
"log"
5+
"net"
66
"net/http"
77

8-
"github.com/gorilla/mux"
9-
108
"github.com/deis/logger/storage"
119
)
1210

1311
const (
14-
bindHost = "0.0.0.0"
15-
bindPort = 8088
12+
bindAddr = "0.0.0.0:8088"
1613
)
1714

18-
// Server implements a simple HTTP server that handles GET and DELETE requests for application
19-
// logs. These actions are accomplished by delegating to a storage.Adapter.
15+
// Server implements an HTTP server.
2016
type Server struct {
21-
listening bool
22-
router *mux.Router
23-
errCh chan error
17+
// The Listener used for incoming HTTP connections
18+
Listener net.Listener
19+
// Server may be changed before calling Start()
20+
Server *http.Server
21+
// base URL of form http://ipaddr:port with no trailing slash
22+
URL string
23+
// started defines whether the server has started or not.
24+
started bool
2425
}
2526

26-
// NewServer returns a pointer to a new Server instance.
27-
func NewServer(storageAdapter storage.Adapter) (*Server, error) {
28-
return &Server{
29-
router: newRouter(newRequestHandler(storageAdapter)),
30-
}, nil
27+
// New returns a new HTTP Server. The caller should call Start to start it and Close when finished
28+
// to shut it down.
29+
func NewServer(storageAdapter storage.Adapter) *Server {
30+
s := &Server{
31+
Listener: defaultListener(),
32+
Server: &http.Server{Handler: newRouter(newRequestHandler(storageAdapter))},
33+
}
34+
return s
3135
}
3236

33-
// Listen starts the server's main loop.
34-
func (s *Server) Listen() <-chan error {
35-
// Should only ever be called once
36-
if !s.listening {
37-
s.listening = true
38-
go func() {
39-
s.errCh <- s.listen()
40-
}()
41-
log.Printf("weblog server running on %s:%d", bindHost, bindPort)
37+
// Start starts an HTTP server.
38+
func (s *Server) Start() {
39+
if s.started {
40+
panic("weblog: server already started")
41+
}
42+
if s.URL == "" {
43+
s.URL = "http://" + s.Listener.Addr().String()
4244
}
43-
return s.errCh
45+
go func() {
46+
s.Server.Serve(s.Listener)
47+
}()
48+
s.started = true
4449
}
4550

46-
func (s *Server) listen() error {
47-
mux := http.NewServeMux()
48-
mux.Handle("/", s.router)
49-
return http.ListenAndServe(fmt.Sprintf("%s:%d", bindHost, bindPort), mux)
51+
// Close closes the HTTP Server from listening for the inbound requests.
52+
func (s *Server) Close() {
53+
s.Server.SetKeepAlivesEnabled(false)
54+
s.Listener.Close()
55+
s.started = false
56+
}
57+
58+
// defaultListener provides a net.Listener on bindAddr, panicking if it cannot listen on that
59+
// address.
60+
func defaultListener() net.Listener {
61+
l, err := net.Listen("tcp", bindAddr)
62+
if err != nil {
63+
panic(fmt.Sprintf("weblog: failed to listen on %v: %v", bindAddr, err))
64+
}
65+
return l
5066
}

weblog/server_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package weblog
2+
3+
import (
4+
"bufio"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
"strings"
9+
"testing"
10+
11+
"github.com/deis/logger/storage"
12+
)
13+
14+
// TODO(bacongobbler): stop relying that port 6666 is not in use
15+
var testBindAddr string = "127.0.0.1:6666"
16+
17+
// testListener provides a net.Listener for testing, panicking if it cannot listen on that
18+
// address.
19+
func newTestListener(t *testing.T) net.Listener {
20+
l, err := net.Listen("tcp", testBindAddr)
21+
if err != nil {
22+
t.Fatalf("failed to listen on %s: %v", testBindAddr, err)
23+
}
24+
return l
25+
}
26+
27+
func newTestStorageAdapter(t *testing.T) storage.Adapter {
28+
storageAdapter, err := storage.NewAdapter("memory", 1)
29+
if err != nil {
30+
t.Fatalf("Error creating storage adapter: %v", err)
31+
}
32+
return storageAdapter
33+
}
34+
35+
func TestServerStart(t *testing.T) {
36+
storageAdapter := newTestStorageAdapter(t)
37+
storageAdapter.Start()
38+
defer storageAdapter.Stop()
39+
40+
s := &Server{
41+
Listener: newTestListener(t),
42+
Server: &http.Server{Handler: newRouter(newRequestHandler(storageAdapter))},
43+
}
44+
45+
s.Start()
46+
47+
conn, err := net.Dial("tcp", testBindAddr)
48+
if err != nil {
49+
t.Fatalf("could not connect to test server: %v", err)
50+
}
51+
defer conn.Close()
52+
fmt.Fprintf(conn, "GET /healthz HTTP/1.0\r\n\r\n")
53+
status, err := bufio.NewReader(conn).ReadString('\n')
54+
if err != nil {
55+
t.Errorf("there was an error reading from the response: %v", err)
56+
}
57+
if !strings.Contains(status, "200 OK") {
58+
t.Errorf("Did not receive 200 OK, got '%s'", status)
59+
}
60+
61+
// explicitly close the connection so other tests can run
62+
s.Close()
63+
}
64+
65+
func TestServerClose(t *testing.T) {
66+
storageAdapter := newTestStorageAdapter(t)
67+
storageAdapter.Start()
68+
defer storageAdapter.Stop()
69+
70+
s := &Server{
71+
Listener: newTestListener(t),
72+
Server: &http.Server{Handler: newRouter(newRequestHandler(storageAdapter))},
73+
}
74+
75+
s.Start()
76+
s.Close()
77+
78+
// try reading from the server, expecting it to fail
79+
_, err := net.Dial("tcp", testBindAddr)
80+
if err == nil {
81+
t.Error("server returned nil. Calling s.Close() did not close the server connection!")
82+
}
83+
}
84+
85+
func TestServerURL(t *testing.T) {
86+
storageAdapter := newTestStorageAdapter(t)
87+
storageAdapter.Start()
88+
defer storageAdapter.Stop()
89+
90+
s := &Server{
91+
Listener: newTestListener(t),
92+
Server: &http.Server{Handler: newRouter(newRequestHandler(storageAdapter))},
93+
URL: "foo",
94+
}
95+
96+
s.Start()
97+
98+
if s.URL != "foo" {
99+
t.Errorf("URL is not 'foo', got '%s'", s.URL)
100+
}
101+
102+
s.Close()
103+
104+
s.URL = ""
105+
106+
s.Start()
107+
108+
if s.URL != "http://" + testBindAddr {
109+
t.Errorf("URL is not 'http://%s', got '%s'", testBindAddr, s.URL)
110+
}
111+
112+
// explicitly close the connection so other tests can run
113+
s.Close()
114+
}

0 commit comments

Comments
 (0)