@@ -44,10 +44,9 @@ func main() {
4444 flag .Parse ()
4545
4646 client := etcd .NewClient ([]string {"http://" + publishHost + ":" + publishPort })
47-
47+ ticker := time . NewTicker ( time . Duration ( publishInterval ) * time . Second )
4848 signalChan := make (chan os.Signal , 1 )
4949 drainChan := make (chan string )
50- stopChan := make (chan bool )
5150 exitChan := make (chan bool )
5251 cleanupChan := make (chan bool )
5352 signal .Notify (signalChan , syscall .SIGTERM , syscall .SIGINT )
@@ -59,15 +58,19 @@ func main() {
5958
6059 go syslogd .Listen (exitChan , cleanupChan , drainChan , fmt .Sprintf ("%s:%d" , logAddr , logPort ))
6160 if enablePublish {
62- go publishService ( exitChan , client , publishHost , publishPath , strconv .Itoa (logPort ), uint64 (time .Duration (publishTTL )* time .Second ))
61+ publishKeys ( client , publishHost , publishPath , strconv .Itoa (logPort ), uint64 (time .Duration (publishTTL )* time .Second ))
6362 }
6463
65- // HACK (bacongobbler): poll etcd for changes in the log drain value
66- // etcd's .Watch() implementation is broken when you use TTLs
67- //
68- // https://github.com/coreos/etcd/issues/2679
69- go func () {
70- for {
64+ for {
65+ select {
66+ case <- ticker .C :
67+ if enablePublish {
68+ publishKeys (client , publishHost , publishPath , strconv .Itoa (logPort ), uint64 (time .Duration (publishTTL )* time .Second ))
69+ }
70+ // HACK (bacongobbler): poll etcd every publishInterval for changes in the log drain value.
71+ // etcd's .Watch() implementation is broken when you use TTLs
72+ //
73+ // https://github.com/coreos/etcd/issues/2679
7174 resp , err := client .Get (publishPath + "/drain" , false , false )
7275 if err != nil {
7376 log .Printf ("warning: could not retrieve drain URI from etcd: %v\n " , err )
@@ -76,16 +79,10 @@ func main() {
7679 if resp != nil && resp .Node != nil {
7780 drainChan <- resp .Node .Value
7881 }
79- time .Sleep (time .Duration (publishInterval ) * time .Second )
80- }
81- }()
82-
83- for {
84- select {
8582 case <- signalChan :
8683 close (exitChan )
87- stopChan <- true
8884 case <- cleanupChan :
85+ ticker .Stop ()
8986 return
9087 }
9188 }
@@ -97,21 +94,6 @@ func publishKeys(client *etcd.Client, host, etcdPath, port string, ttl uint64) {
9794 setEtcd (client , etcdPath + "/port" , port , ttl )
9895}
9996
100- // publishServices publishes keys immediately, then every publishInterval seconds until it receives
101- // something on exitChan.
102- func publishService (exitChan chan bool , client * etcd.Client , host string , etcdPath string , port string , ttl uint64 ) {
103- publishKeys (client , host , etcdPath , port , ttl )
104- t := time .NewTicker (time .Duration (publishInterval ) * time .Second )
105- for {
106- select {
107- case <- t .C :
108- publishKeys (client , host , etcdPath , port , ttl )
109- case <- exitChan :
110- return
111- }
112- }
113- }
114-
11597func setEtcd (client * etcd.Client , key , value string , ttl uint64 ) {
11698 _ , err := client .Set (key , value , ttl )
11799 if err != nil && ! strings .Contains (err .Error (), "Key already exists" ) {
0 commit comments