Skip to content
This repository was archived by the owner on Jun 25, 2025. It is now read-only.

Commit 7123b6a

Browse files
committed
chore(transport): Refactor to use pluggable Aggregator component...
1 parent 3bf21b8 commit 7123b6a

19 files changed

Lines changed: 392 additions & 300 deletions

Makefile

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ GOLINT = golint
55
GOTEST = $(GO) test --cover --race -v
66
GOVET = $(GO) vet
77
GO_FILES = $(wildcard *.go)
8-
GO_PACKAGES = storage syslogish weblog
8+
GO_PACKAGES = storage log weblog
99
GO_PACKAGES_REPO_PATH = $(addprefix $(REPO_PATH)/,$(GO_PACKAGES))
10-
GO_TESTABLE_PACKAGES_REPO_PATH = $(addprefix $(REPO_PATH)/,storage storage/file storage/ringbuffer)
1110

1211
# the filepath to this repository, relative to $GOPATH/src
1312
REPO_PATH = github.com/deis/logger
@@ -50,21 +49,21 @@ bootstrap: check-docker
5049
build-binary:
5150
GOOS=linux GOARCH=amd64 go build -ldflags ${LDFLAGS} -o $(BINARY_DEST_DIR)/logger .
5251

53-
build: build-with-container docker-build
52+
build: docker-build
53+
build-without-container: build-binary build-image
54+
push: docker-push
55+
upgrade: kube-update
56+
install: kube-install
57+
uninstall: kube-delete
5458

5559
# Containerized build of the binary
5660
build-with-container: check-docker
5761
mkdir -p ${BINARY_DEST_DIR}
5862
${DEV_ENV_CMD} make build-binary
59-
docker build --rm -t ${IMAGE} rootfs
6063

61-
build-without-container: build-binary
62-
docker build -t ${IMAGE} rootfs
63-
docker tag ${IMAGE} ${MUTABLE_IMAGE}
64-
65-
push: docker-push
64+
docker-build: build-with-container build-image
6665

67-
docker-build: build-with-container
66+
build-image:
6867
docker build -t ${IMAGE} rootfs
6968
docker tag ${IMAGE} ${MUTABLE_IMAGE}
7069

@@ -91,22 +90,19 @@ style-check:
9190
shellcheck $(SHELL_SCRIPTS)
9291

9392
test-unit:
94-
${DEV_ENV_CMD} $(GOTEST) $(GO_TESTABLE_PACKAGES_REPO_PATH)
93+
${DEV_ENV_CMD} $(GOTEST) $$(glide nv)
9594

9695
kube-install:
9796
kubectl create -f manifests/deis-logger-svc.yaml
9897
kubectl create -f manifests/deis-logger-rc.yaml
99-
kubectl create -f manifests/deis-logger-fluentd-daemon.yaml
10098

10199
kube-delete:
102100
-kubectl delete -f manifests/deis-logger-svc.yaml
103101
-kubectl delete -f manifests/deis-logger-rc.tmp.yaml
104-
-kubectl delete -f manifests/deis-logger-fluentd-daemon.yaml
105102

106103
kube-create: update-manifests
107104
kubectl create -f manifests/deis-logger-svc.yaml
108105
kubectl create -f manifests/deis-logger-rc.tmp.yaml
109-
kubectl create -f manifests/deis-logger-fluentd-daemon.yaml
110106

111107
kube-replace: build push update-manifests
112108
kubectl replace --force -f manifests/deis-logger-rc.tmp.yaml

config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ const (
99
)
1010

1111
type config struct {
12-
StorageType string `envconfig:"STORAGE_ADAPTER" default:"memory"`
13-
NumLines int `envconfig:"NUMBER_OF_LINES" default:"1000"`
12+
StorageType string `envconfig:"STORAGE_ADAPTER" default:"memory"`
13+
NumLines int `envconfig:"NUMBER_OF_LINES" default:"1000"`
14+
AggregatorType string `envconfig:"AGGREGATOR_TYPE" default:"nsq"`
1415
}
1516

1617
func parseConfig(appName string) (*config, error) {

glide.lock

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glide.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ import:
33
- package: github.com/gorilla/mux
44
- package: github.com/gorilla/context
55
- package: github.com/kelseyhightower/envconfig
6+
- package: github.com/nsqio/go-nsq

log/aggregator.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package log
2+
3+
// Aggregator is an interface for pluggable components that collect log messages delivered via
4+
// some transport mechanism
5+
type Aggregator interface {
6+
// Listen causes an aggregator to begin consuming messages via its underlying transport
7+
// mechanism. Implementations of this must be non-blocking. Stop() can be called to stop the
8+
// aggregator.
9+
Listen() error
10+
// Stop stops the consumer and blocks until it stops or the configured duration passes. In the
11+
// latter case, returns an error of type ErrStopTimedOut
12+
Stop() error
13+
// Stopped returns a channel that will receive when the consumer has stopped. The error received
14+
// may be nil, which means it was stopped cleanly. A non-nil error means it stopped because it
15+
// errored.
16+
Stopped() <-chan error
17+
}

log/aggregator_factory.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/deis/logger/storage"
7+
)
8+
9+
// NewAggregator returns a pointer to an appropriate implementation of the Aggregator interface, as
10+
// determined by the aggregatorType string it is passed.
11+
func NewAggregator(aggregatorType string, storageAdapter storage.Adapter) (Aggregator, error) {
12+
if aggregatorType == "nsq" {
13+
return newNSQAggregator(storageAdapter), nil
14+
}
15+
return nil, fmt.Errorf("Unrecognized aggregator type: '%s'", aggregatorType)
16+
}

log/aggregator_factory_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"testing"
7+
)
8+
9+
type stubStorageAdapter struct {
10+
}
11+
12+
func (a *stubStorageAdapter) Write(app string, message string) error {
13+
return nil
14+
}
15+
16+
func (a *stubStorageAdapter) Read(app string, lines int) ([]string, error) {
17+
return []string{}, nil
18+
}
19+
20+
func (a *stubStorageAdapter) Destroy(app string) error {
21+
return nil
22+
}
23+
24+
func (a *stubStorageAdapter) Reopen() error {
25+
return nil
26+
}
27+
28+
func TestGetUsingInvalidValues(t *testing.T) {
29+
_, err := NewAggregator("bogus", &stubStorageAdapter{})
30+
if err == nil || err.Error() != fmt.Sprintf("Unrecognized aggregator type: '%s'", "bogus") {
31+
t.Error("Did not receive expected error message")
32+
}
33+
}
34+
35+
func TestNSQBasedAggregator(t *testing.T) {
36+
a, err := NewAggregator("nsq", &stubStorageAdapter{})
37+
if err != nil {
38+
t.Error(err)
39+
}
40+
expected := "*log.nsqAggregator"
41+
aType := reflect.TypeOf(a).String()
42+
if aType != expected {
43+
t.Errorf("Expected a %s, but got a %s", expected, aType)
44+
}
45+
}

log/config.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/kelseyhightower/envconfig"
8+
)
9+
10+
const (
11+
appName = "logger"
12+
)
13+
14+
type config struct {
15+
NSQHost string `envconfig:"DEIS_NSQD_SERVICE_HOST" default:""`
16+
NSQPort int `envconfig:"DEIS_NSQD_SERVICE_PORT_TRANSPORT" default:"4150"`
17+
NSQTopic string `envconfig:"NSQ_TOPIC" default:"logs"`
18+
NSQChannel string `envconfig:"NSQ_CHANNEL" default:"consume"`
19+
NSQHandlerCount int `envconfig:"NSQ_HANDLER_COUNT" default:"30"`
20+
StopTimeoutSeconds int `envconfig:"AGGREGATOR_STOP_TIMEOUT_SEC" default:"1"`
21+
}
22+
23+
func (c config) nsqURL() string {
24+
return fmt.Sprintf("%s:%d", c.NSQHost, c.NSQPort)
25+
}
26+
27+
func (c config) stopTimeoutDuration() time.Duration {
28+
return time.Duration(c.StopTimeoutSeconds) * time.Second
29+
}
30+
31+
func parseConfig(appName string) (*config, error) {
32+
ret := new(config)
33+
if err := envconfig.Process(appName, ret); err != nil {
34+
return nil, err
35+
}
36+
return ret, nil
37+
}

log/errors.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package log
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
// ErrStopTimedOut is the error returned if a (Aggregator).Stop call times out before the stop is
9+
// complete
10+
type ErrStopTimedOut struct {
11+
Timeout time.Duration
12+
}
13+
14+
func newErrStopTimedOut(timeout time.Duration) ErrStopTimedOut {
15+
return ErrStopTimedOut{Timeout: timeout}
16+
}
17+
18+
func (e ErrStopTimedOut) Error() string {
19+
return fmt.Sprintf("stopping a consumer timed out after %s", e.Timeout)
20+
}

log/message_handler.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package log
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"regexp"
7+
8+
"github.com/deis/logger/storage"
9+
)
10+
11+
const (
12+
controllerPattern = `^(INFO|WARN|DEBUG|ERROR)\s+(\[(\S+)\])+:(.*)`
13+
controllerContainerName = "deis-controller"
14+
)
15+
16+
var (
17+
regex = regexp.MustCompile(controllerPattern)
18+
)
19+
20+
func handle(rawMessage []byte, storageAdapter storage.Adapter) error {
21+
message := new(Message)
22+
if err := json.Unmarshal(rawMessage, message); err != nil {
23+
return err
24+
}
25+
if fromController(message) {
26+
storageAdapter.Write(getApplicationFromControllerMessage(message), message.Log)
27+
} else {
28+
labels := message.Kubernetes.Labels
29+
storageAdapter.Write(labels["app"], buildApplicationLogMessage(message))
30+
}
31+
return nil
32+
}
33+
34+
func fromController(message *Message) bool {
35+
matched, _ := regexp.MatchString(controllerContainerName, message.Kubernetes.ContainerName)
36+
return matched
37+
}
38+
39+
func getApplicationFromControllerMessage(message *Message) string {
40+
return regex.FindStringSubmatch(message.Log)[3]
41+
}
42+
43+
func buildApplicationLogMessage(message *Message) string {
44+
body := message.Log
45+
podName := message.Kubernetes.PodName
46+
return fmt.Sprintf("%s -- %s", podName, body)
47+
}

0 commit comments

Comments
 (0)