-
Notifications
You must be signed in to change notification settings - Fork 112
Expand file tree
/
Copy pathserver.go
More file actions
185 lines (170 loc) · 6.39 KB
/
server.go
File metadata and controls
185 lines (170 loc) · 6.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package syslogish
import (
"errors"
"fmt"
"log"
"net"
"regexp"
"strings"
"sync"
"github.com/deis/deis/logger/drain"
"github.com/deis/deis/logger/storage"
)
// appRegex extracts the application name from a log message: it captures the token that
// directly precedes a bracketed tag, e.g. "myapp" in "... myapp[web.1]: ...". It is compiled
// once, at package initialization.
var appRegex = regexp.MustCompile(`^.* ([-_a-z0-9]+)\[[a-z0-9-_\.]+\].*`)
// Server is a UDP-based, "syslog-like" log receiver. As with syslog (RFC 3164), every packet is
// expected to carry exactly one log message, and every log message is expected to fit entirely
// within one packet. Unlike a real syslog server, it neither parses incoming messages nor
// validates that they conform to the specification.
type Server struct {
	// conn is the packet connection that log messages are read from.
	conn net.PacketConn
	// listening records whether Listen has already started the worker goroutines.
	listening bool
	// queue hands received messages from the receive goroutine to the process goroutine.
	queue chan string

	// adapterMutex guards storageAdapter, which may be replaced at runtime.
	adapterMutex   sync.RWMutex
	storageAdapter storage.Adapter

	// drainMutex guards drain, which may be replaced at runtime.
	drainMutex sync.RWMutex
	drain      drain.LogDrain
}
// NewServer returns a pointer to a new Server instance listening for UDP packets on the address
// formed from bindHost and bindPort. bindHost may be a host name, an IPv4 address, or an IPv6
// literal (with or without surrounding brackets). An error is returned if the address cannot be
// resolved or the socket cannot be opened.
func NewServer(bindHost string, bindPort int) (*Server, error) {
	// Build the address with net.JoinHostPort rather than "%s:%d": IPv6 literals contain colons
	// themselves and must be bracketed to be parseable. Brackets the caller already supplied are
	// stripped first so the host is never bracketed twice.
	host := strings.TrimSuffix(strings.TrimPrefix(bindHost, "["), "]")
	addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, fmt.Sprint(bindPort)))
	if err != nil {
		return nil, err
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		return nil, err
	}
	// The queue is buffered so that up to 1000 received messages can await processing.
	return &Server{conn: conn, queue: make(chan string, 1000)}, nil
}
// SetStorageAdapter replaces the server's underlying storage.Adapter. It may be called at
// runtime, while the server is processing messages.
func (s *Server) SetStorageAdapter(storageAdapter storage.Adapter) {
	// The swap happens under the exclusive lock: goroutines holding a read lock may be relying on
	// the adapter that is currently installed, so wait for them to finish first.
	s.adapterMutex.Lock()
	s.storageAdapter = storageAdapter
	s.adapterMutex.Unlock()
}
// SetDrain replaces the server's underlying drain.LogDrain. It may be called at runtime, while
// the server is processing messages.
func (s *Server) SetDrain(drain drain.LogDrain) {
	// The swap happens under the exclusive lock: goroutines holding a read lock may be relying on
	// the drain that is currently installed, so wait for them to finish first.
	s.drainMutex.Lock()
	s.drain = drain
	s.drainMutex.Unlock()
}
// Listen starts the server's main loop: one goroutine that receives packets and one that
// processes the queued messages. Calls made after the server has started are no-ops.
func (s *Server) Listen() {
	// The workers should only ever be started once.
	if s.listening {
		return
	}
	s.listening = true
	go s.receive()
	go s.process()
	log.Println("syslogish server running")
}
// receive reads packets from the server's connection forever, treating each packet as a single
// log message, and places the messages on the queue for process to consume. A read error is
// fatal and terminates the program.
func (s *Server) receive() {
	// Make buffer the same size as the max for a UDP packet
	buf := make([]byte, 65535)
	for {
		n, _, err := s.conn.ReadFrom(buf)
		if err != nil {
			// Use Fatalf with an explicit separator: log.Fatal would print the message and the
			// error run together, with no space between them.
			log.Fatalf("syslogish server read error: %v", err)
		}
		// Copy the payload out of the reusable buffer and strip one trailing newline, if any.
		message := strings.TrimSuffix(string(buf[:n]), "\n")
		select {
		case s.queue <- message:
		default:
			// The queue is full. Drop the message instead of blocking, so the read loop keeps
			// draining the socket.
		}
	}
}
// process consumes messages from the queue until it is closed, writing each one to the storage
// adapter and forwarding it to the drain (whichever of the two are configured).
func (s *Server) process() {
	for message := range s.queue {
		app, err := getAppName(message)
		if err != nil {
			// Skip only this message. Returning here would end the sole consumer of the queue;
			// the queue would then fill up and every subsequent message would be dropped.
			log.Println(err)
			continue
		}
		s.writeToStorage(app, message)
		s.sendToDrain(message)
	}
}

// writeToStorage writes message for app to the storage adapter, if one is configured.
func (s *Server) writeToStorage(app, message string) {
	// Hold a read lock so the storage adapter can't be replaced by another goroutine between the
	// nil check and the call to Write. The lock is released when this helper returns, i.e. once
	// per message.
	s.adapterMutex.RLock()
	defer s.adapterMutex.RUnlock()
	if s.storageAdapter == nil {
		return
	}
	// We don't bother trapping errors here, so failed writes to storage are silent. This is by
	// design. If we sent a log message to STDOUT in response to the failure, deis-logspout
	// would read it and forward it back to deis-logger, which would fail again to write to
	// storage and spawn ANOTHER log message. The effect would be an infinite loop of
	// unstorable log messages that would nevertheless fill up journal logs and eventually
	// overtake the disk.
	//
	// Treating this as a fatal event would cause the deis-logger unit to restart, sending
	// even more log messages to STDOUT. The overall effect would be the same as described
	// above, with the added disadvantage of flapping.
	//
	// A failed write also does not stop the message from being sent to the drain afterwards.
	s.storageAdapter.Write(app, message)
}

// sendToDrain forwards message to the log drain, if one is configured.
func (s *Server) sendToDrain(message string) {
	// Hold a read lock so the drain can't be replaced by another goroutine between the nil check
	// and the call to Send.
	s.drainMutex.RLock()
	defer s.drainMutex.RUnlock()
	if s.drain == nil {
		return
	}
	// Errors are deliberately not trapped here, for the same reason failed storage writes are
	// silent: reporting them would generate further log messages.
	s.drain.Send(message)
}
// getAppName returns the application name found in message, as captured by the first group of
// appRegex. It returns an error if the message does not match the expected format.
func getAppName(message string) (string, error) {
	if groups := appRegex.FindStringSubmatch(message); groups != nil {
		return groups[1], nil
	}
	return "", fmt.Errorf("Could not find app name in message: %s", message)
}
// ReadLogs fetches up to the requested number of log lines for the given app from the server's
// underlying storage.Adapter. It returns an error if no storage adapter has been configured.
func (s *Server) ReadLogs(app string, lines int) ([]string, error) {
	// Hold a read lock for the whole call so the storage adapter can't be replaced by another
	// goroutine between the nil check and the call to Read.
	s.adapterMutex.RLock()
	defer s.adapterMutex.RUnlock()
	if adapter := s.storageAdapter; adapter != nil {
		return adapter.Read(app, lines)
	}
	return nil, fmt.Errorf("Could not find logs for '%s'. No storage adapter specified.", app)
}
// DestroyLogs removes all logs for the given app by delegating to the server's underlying
// storage.Adapter. It returns an error if no storage adapter has been configured.
func (s *Server) DestroyLogs(app string) error {
	// Hold a read lock for the whole call so the storage adapter can't be replaced by another
	// goroutine between the nil check and the call to Destroy.
	s.adapterMutex.RLock()
	defer s.adapterMutex.RUnlock()
	if adapter := s.storageAdapter; adapter != nil {
		return adapter.Destroy(app)
	}
	return fmt.Errorf("Could not destroy logs for '%s'. No storage adapter specified.", app)
}
// ReopenLogs delegates to the server's underlying storage.Adapter to refresh, if applicable, its
// references to the underlying storage mechanisms. This is useful, for instance, to ensure
// logging continues smoothly after log rotation when file-based storage is in use. It returns
// an error if no storage adapter has been configured.
func (s *Server) ReopenLogs() error {
	// Hold a read lock for the whole call so the storage adapter can't be replaced by another
	// goroutine between the nil check and the call to Reopen.
	s.adapterMutex.RLock()
	defer s.adapterMutex.RUnlock()
	if adapter := s.storageAdapter; adapter != nil {
		return adapter.Reopen()
	}
	return errors.New("Could not reopen logs. No storage adapter specified.")
}