Skip to content

Commit ef8e07e

Browse files
author
Matthew Fisher
committed
Merge pull request #1992 from bacongobbler/logspout
feat(logspout): introduce deis/logspout
2 parents df245aa + cbb3b0f commit ef8e07e

27 files changed

Lines changed: 1057 additions & 244 deletions

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
include includes.mk
66

7-
COMPONENTS=builder cache controller database logger publisher registry router
8-
START_ORDER=publisher logger database cache registry controller builder router
7+
COMPONENTS=builder cache controller database logger logspout publisher registry router
8+
START_ORDER=publisher logger logspout database cache registry controller builder router
99
CLIENTS=client deisctl
1010

1111
all: build run

controller/scheduler/coreos.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import cStringIO
22
import base64
33
import copy
4-
import functools
54
import json
65
import httplib
76
import paramiko
@@ -105,7 +104,6 @@ def create(self, name, image, command='', template=None, **kwargs):
105104
"""Create a container"""
106105
self._create_container(name, image, command,
107106
template or copy.deepcopy(CONTAINER_TEMPLATE), **kwargs)
108-
self._create_log(name, image, command, copy.deepcopy(LOG_TEMPLATE))
109107

110108
def _create_container(self, name, image, command, unit, **kwargs):
111109
l = locals().copy()
@@ -138,15 +136,6 @@ def _create_container(self, name, image, command, unit, **kwargs):
138136
# post unit to fleet
139137
self._put_unit(name, {"desiredState": "launched", "options": unit})
140138

141-
def _create_log(self, name, image, command, unit):
142-
l = locals().copy()
143-
l.update(re.match(MATCH, name).groupdict())
144-
# construct unit from template
145-
for f in unit:
146-
f['value'] = f['value'].format(**l)
147-
# post unit to fleet
148-
self._put_unit(name+'-log', {"desiredState": "launched", "options": unit})
149-
150139
def start(self, name):
151140
"""Start a container"""
152141
self._wait_for_container(name)
@@ -181,23 +170,16 @@ def stop(self, name):
181170

182171
def destroy(self, name):
183172
"""Destroy a container"""
184-
funcs = []
185-
funcs.append(functools.partial(self._destroy_container, name))
186-
funcs.append(functools.partial(self._destroy_log, name))
187173
# call all destroy functions, ignoring any errors
188-
for f in funcs:
189-
try:
190-
f()
191-
except:
192-
pass
174+
try:
175+
self._destroy_container(name)
176+
except:
177+
pass
193178
self._wait_for_destroy(name)
194179

195180
def _destroy_container(self, name):
196181
return self._delete_unit(name)
197182

198-
def _destroy_log(self, name):
199-
return self._delete_unit(name+'-log')
200-
201183
def run(self, name, image, entrypoint, command): # noqa
202184
"""Run a one-off command"""
203185
self._create_container(name, image, command, copy.deepcopy(RUN_TEMPLATE),
@@ -309,16 +291,6 @@ def attach(self, name):
309291
]
310292

311293

312-
LOG_TEMPLATE = [
313-
{"section": "Unit", "name": "Description", "value": "{name} log"},
314-
{"section": "Unit", "name": "BindsTo", "value": "{name}.service"},
315-
{"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "until docker inspect {name} >/dev/null 2>&1; do sleep 1; done"'''}, # noqa
316-
{"section": "Service", "name": "ExecStart", "value": '''/bin/sh -c "docker logs -f {name} 2>&1 | logger -p local0.info -t {app}[{c_type}.{c_num}] --udp --server $(etcdctl get /deis/logs/host) --port $(etcdctl get /deis/logs/port)"'''}, # noqa
317-
{"section": "Service", "name": "TimeoutStartSec", "value": "20m"},
318-
{"section": "X-Fleet", "name": "MachineOf", "value": "{name}.service"},
319-
]
320-
321-
322294
RUN_TEMPLATE = [
323295
{"section": "Unit", "name": "Description", "value": "{name} admin command"},
324296
{"section": "Service", "name": "ExecStartPre", "value": '''/bin/sh -c "IMAGE=$(etcdctl get /deis/registry/host 2>&1):$(etcdctl get /deis/registry/port 2>&1)/{image}; docker pull $IMAGE"'''}, # noqa

deisctl/cmd/cmd.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func startDefaultServices(b backend.Backend) error {
9292
if err := Start(b, []string{"logger@1"}); err != nil {
9393
return err
9494
}
95-
if err := Start(b, []string{"publisher", "cache@1", "router@1", "database@1", "controller@1", "registry@1", "builder@1"}); err != nil {
95+
if err := Start(b, []string{"publisher", "logspout", "cache@1", "router@1", "database@1", "controller@1", "registry@1", "builder@1"}); err != nil {
9696
return err
9797
}
9898
fmt.Println("Service containers launched.")
@@ -118,7 +118,7 @@ func StopPlatform(b backend.Backend) error {
118118

119119
func stopDefaultServices(b backend.Backend) error {
120120
fmt.Println("Stopping service containers...")
121-
if err := Stop(b, []string{"publisher", "builder@1", "registry@1", "controller@1", "database@1", "cache@1", "router@1", "logger@1"}); err != nil {
121+
if err := Stop(b, []string{"publisher", "logspout", "builder@1", "registry@1", "controller@1", "database@1", "cache@1", "router@1", "logger@1"}); err != nil {
122122
return err
123123
}
124124
fmt.Println("Service containers stopped.")
@@ -196,7 +196,7 @@ func installDefaultServices(b backend.Backend) error {
196196
if err := Scale(b, targets); err != nil {
197197
return err
198198
}
199-
if err := b.Create([]string{"publisher"}); err != nil {
199+
if err := b.Create([]string{"publisher", "logspout"}); err != nil {
200200
return err
201201
}
202202
fmt.Println("Service containers scheduled.")
@@ -226,7 +226,7 @@ func uninstallAllServices(b backend.Backend) error {
226226
if err := Scale(b, targets); err != nil {
227227
return err
228228
}
229-
if err := b.Destroy([]string{"publisher"}); err != nil {
229+
if err := b.Destroy([]string{"publisher", "logspout"}); err != nil {
230230
return err
231231
}
232232
fmt.Println("Service containers destroyed.")
@@ -309,6 +309,7 @@ Options:
309309
"deis-database-data.service",
310310
"deis-logger.service",
311311
"deis-logger-data.service",
312+
"deis-logspout.service",
312313
"deis-publisher.service",
313314
"deis-registry.service",
314315
"deis-registry-data.service",
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[Unit]
2+
Description=deis-logspout
3+
Requires=docker.socket etcd.service
4+
After=docker.socket etcd.service
5+
6+
[Service]
7+
EnvironmentFile=/etc/environment
8+
TimeoutStartSec=20m
9+
ExecStartPre=/bin/sh -c "IMAGE=`/run/deis/bin/get_image /deis/logspout`; docker history $IMAGE >/dev/null || docker pull $IMAGE"
10+
ExecStartPre=/bin/sh -c "docker inspect deis-logspout >/dev/null && docker rm -f deis-logspout || true"
11+
ExecStart=/bin/sh -c "IMAGE=`/run/deis/bin/get_image /deis/logspout` && docker run --name deis-logspout --rm -v /var/run/docker.sock:/tmp/docker.sock -e ETCD_HOST=$COREOS_PRIVATE_IPV4 -e HOST=$COREOS_PRIVATE_IPV4 -e DEBUG=1 $IMAGE"
12+
13+
[Install]
14+
WantedBy=multi-user.target
15+
16+
[X-Fleet]
17+
Global=true

deisctl/units/deis-publisher.service

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ TimeoutStartSec=20m
99
ExecStartPre=/bin/sh -c "IMAGE=`/run/deis/bin/get_image /deis/publisher`; docker history $IMAGE >/dev/null || docker pull $IMAGE"
1010
ExecStartPre=/bin/sh -c "docker inspect deis-publisher >/dev/null && docker rm -f deis-publisher || true"
1111
ExecStart=/bin/sh -c "IMAGE=`/run/deis/bin/get_image /deis/publisher` && docker run --name deis-publisher --rm -e HOST=$COREOS_PRIVATE_IPV4 -e ETCD_HOST=$COREOS_PRIVATE_IPV4 -v /var/run/docker.sock:/var/run/docker.sock $IMAGE"
12-
ExecStopPost=/usr/bin/docker stop deis-publisher
1312

1413
[Install]
1514
WantedBy=multi-user.target

logger/syslog/filehandler.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type FileHandler struct {
1919

2020
// NewFileHandler accepts all arguments expected by NewBaseHandler plus
2121
// filename which is the path to the log file.
22-
func NewFileHandler(filename string, qlen int, filter func(*Message) bool,
22+
func NewFileHandler(filename string, qlen int, filter func(SyslogMessage) bool,
2323
ft bool) *FileHandler {
2424

2525
h := &FileHandler{
@@ -61,7 +61,7 @@ func (h *FileHandler) mainLoop() {
6161
}
6262
}
6363

64-
func (h *FileHandler) saveMessage(m *Message) {
64+
func (h *FileHandler) saveMessage(m SyslogMessage) {
6565
var err error
6666
if h.f == nil {
6767
h.f, err = os.OpenFile(
@@ -90,6 +90,6 @@ func (h *FileHandler) checkErr(err error) bool {
9090
}
9191

9292
// Handle queues and dispatches a message. See BaseHandler.Handle
93-
func (h *FileHandler) Handle(m *Message) *Message {
93+
func (h *FileHandler) Handle(m SyslogMessage) SyslogMessage {
9494
return h.bh.Handle(m)
9595
}

logger/syslog/filehandler_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package syslog
22

33
import (
44
"testing"
5-
"time"
65
)
76

87
type TestLogger struct{}
@@ -12,24 +11,24 @@ func (tl TestLogger) Printf(format string, v ...interface{}) {}
1211
func (tl TestLogger) Println(...interface{}) {}
1312

1413
func TestNewFileHandler(t *testing.T) {
15-
fh := NewFileHandler("", 1, func(m *Message) bool { return true }, true)
14+
fh := NewFileHandler("", 1, func(m SyslogMessage) bool { return true }, true)
1615
if fh == nil {
1716
t.Errorf("expected filehandler, got nil")
1817
}
1918
}
2019

2120
func TestSetLogger(t *testing.T) {
2221
tl := TestLogger{}
23-
fh := NewFileHandler("", 1, func(m *Message) bool { return true }, true)
22+
fh := NewFileHandler("", 1, func(m SyslogMessage) bool { return true }, true)
2423
fh.SetLogger(tl)
2524
if fh.l != tl {
2625
t.Errorf("expected the logger to be set")
2726
}
2827
}
2928

3029
func TestHandle(t *testing.T) {
31-
fh := NewFileHandler("/tmp/test", 1, func(m *Message) bool { return true }, true)
32-
handle := fh.Handle(&Message{time.Now(), nil, 0, 0, time.Now(), "localhost", "test", "message", "", ""})
30+
fh := NewFileHandler("/tmp/test", 1, func(m SyslogMessage) bool { return true }, true)
31+
handle := fh.Handle(&Message{"localhost test message"})
3332
if handle == nil {
3433
t.Errorf("expected a handle, got nil")
3534
}

logger/syslog/handler.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,26 @@ type Handler interface {
55
// Handle should return Message (maybe modified) for future processing by
66
// other handlers or return nil. If Handle is called with nil message it
77
// should complete all remaining work and properly shutdown before return.
8-
Handle(*Message) *Message
8+
Handle(SyslogMessage) SyslogMessage
99
}
1010

1111
// BaseHandler is designed to simplify the creation of real handlers. It
1212
// implements Handler interface using nonblocking queuing of messages and
1313
// simple message filtering.
1414
type BaseHandler struct {
15-
queue chan *Message
15+
queue chan SyslogMessage
1616
end chan struct{}
17-
filter func(*Message) bool
17+
filter func(SyslogMessage) bool
1818
ft bool
1919
}
2020

2121
// NewBaseHandler creates BaseHandler using a specified filter. If filter is nil
2222
// or if it returns true messages are passed to BaseHandler internal queue
2323
// (of qlen length). If filter returns false or ft is true messages are returned
2424
// to server for future processing by other handlers.
25-
func NewBaseHandler(qlen int, filter func(*Message) bool, ft bool) *BaseHandler {
25+
func NewBaseHandler(qlen int, filter func(SyslogMessage) bool, ft bool) *BaseHandler {
2626
return &BaseHandler{
27-
queue: make(chan *Message, qlen),
27+
queue: make(chan SyslogMessage, qlen),
2828
end: make(chan struct{}),
2929
filter: filter,
3030
ft: ft,
@@ -34,7 +34,7 @@ func NewBaseHandler(qlen int, filter func(*Message) bool, ft bool) *BaseHandler
3434
// Handle inserts m in an internal queue. It immediately returns even if
3535
// queue is full. If m == nil it closes queue and waits for End method call
3636
// before return.
37-
func (h *BaseHandler) Handle(m *Message) *Message {
37+
func (h *BaseHandler) Handle(m SyslogMessage) SyslogMessage {
3838
if m == nil {
3939
close(h.queue) // signal that there is no more messages for processing
4040
<-h.end // wait for handler shutdown
@@ -58,7 +58,7 @@ func (h *BaseHandler) Handle(m *Message) *Message {
5858
// Get returns first message from internal queue. It waits for message if queue
5959
// is empty. It returns nil if there is no more messages to process and handler
6060
// should shutdown.
61-
func (h *BaseHandler) Get() *Message {
61+
func (h *BaseHandler) Get() SyslogMessage {
6262
m, ok := <-h.queue
6363
if ok {
6464
return m
@@ -70,7 +70,7 @@ func (h *BaseHandler) Get() *Message {
7070
// it directly, especially if your handler needs to select from multiple channels
7171
// or have to work without blocking. You need to check if this channel is closed by
7272
// sender and properly shutdown in this case.
73-
func (h *BaseHandler) Queue() <-chan *Message {
73+
func (h *BaseHandler) Queue() <-chan SyslogMessage {
7474
return h.queue
7575
}
7676

logger/syslog/message.go

Lines changed: 7 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,18 @@ package syslog
22

33
import (
44
"fmt"
5-
"net"
6-
"time"
5+
"strings"
76
)
87

9-
// Message defines an RFC 3164 syslog message.
10-
type Message struct {
11-
Time time.Time // time the message was logged
12-
Source net.Addr // source address of the log message
13-
Facility // facility tag (see type Facility)
14-
Severity // severity tag (see type Severity)
15-
Timestamp time.Time // optional
16-
Hostname string // optional
17-
Tag string // message tag as defined in RFC 3164
18-
Content string // message content as defined in RFC 3164
19-
Tag1 string // alternate message tag (white rune as separator)
20-
Content1 string // alternate message content (white rune as separator)
8+
type SyslogMessage interface {
9+
fmt.Stringer
2110
}
2211

23-
// NetSrc only network part of Source as string (IP for UDP or Name for UDS)
24-
func (m *Message) NetSrc() string {
25-
switch a := m.Source.(type) {
26-
case *net.UDPAddr:
27-
return a.IP.String()
28-
case *net.UnixAddr:
29-
return a.Name
30-
case *net.TCPAddr:
31-
return a.IP.String()
32-
}
33-
// Unknown type
34-
return m.Source.String()
12+
// Message defines a syslog message.
13+
type Message struct {
14+
Msg string
3515
}
3616

37-
// String returns the Message in a string format. This satisfies the fmt.Stringer
38-
// interface.
3917
func (m *Message) String() string {
40-
timeLayout := "2006-01-02 15:04:05"
41-
return fmt.Sprintf(
42-
"%s %s %s%s",
43-
m.Time.Format(timeLayout),
44-
m.Hostname,
45-
m.Tag,
46-
m.Content,
47-
)
18+
return strings.TrimSuffix(m.Msg, "\n")
4819
}

0 commit comments

Comments
 (0)