Skip to content

Commit 79d61c0

Browse files
author
Keerthan Mala
committed
Use k8s cache indexer to look for pod status
1 parent aeb0a0e commit 79d61c0

3 files changed

Lines changed: 75 additions & 15 deletions

File tree

pkg/gitreceive/build.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,12 @@ func build(
183183
return fmt.Errorf("creating builder pod (%s)", err)
184184
}
185185

186-
if err := waitForPod(kubeClient, newPod.Namespace, newPod.Name, conf.SessionIdleInterval(), conf.BuilderPodTickDuration(), conf.BuilderPodWaitDuration()); err != nil {
186+
pw := k8s.NewPodWatcher(kubeClient, "deis")
187+
stopCh := make(chan struct{})
188+
defer close(stopCh)
189+
go pw.Controller.Run(stopCh)
190+
191+
if err := waitForPod(pw, newPod.Namespace, newPod.Name, conf.SessionIdleInterval(), conf.BuilderPodTickDuration(), conf.BuilderPodWaitDuration()); err != nil {
187192
return fmt.Errorf("watching events for builder pod startup (%s)", err)
188193
}
189194

@@ -213,7 +218,7 @@ func build(
213218
)
214219
// check the state and exit code of the build pod.
215220
// if the code is not 0 return error
216-
if err := waitForPodEnd(kubeClient, newPod.Namespace, newPod.Name, conf.BuilderPodTickDuration(), conf.BuilderPodWaitDuration()); err != nil {
221+
if err := waitForPodEnd(pw, newPod.Namespace, newPod.Name, conf.BuilderPodTickDuration(), conf.BuilderPodWaitDuration()); err != nil {
217222
return fmt.Errorf("error getting builder pod status (%s)", err)
218223
}
219224
log.Debug("Done")

pkg/gitreceive/k8s_util.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/deis/builder/pkg/k8s"
78
"github.com/pborman/uuid"
89
"k8s.io/kubernetes/pkg/api"
9-
apierrs "k8s.io/kubernetes/pkg/api/errors"
10-
client "k8s.io/kubernetes/pkg/client/unversioned"
10+
"k8s.io/kubernetes/pkg/labels"
1111
"k8s.io/kubernetes/pkg/util/wait"
1212
)
1313

@@ -171,7 +171,7 @@ func addEnvToPod(pod api.Pod, key, value string) {
171171
}
172172

173173
// waitForPod waits for a pod in state running, succeeded or failed
174-
func waitForPod(c *client.Client, ns, podName string, ticker, interval, timeout time.Duration) error {
174+
func waitForPod(pw *k8s.PodWatcher, ns, podName string, ticker, interval, timeout time.Duration) error {
175175
condition := func(pod *api.Pod) (bool, error) {
176176
if pod.Status.Phase == api.PodRunning {
177177
return true, nil
@@ -186,14 +186,14 @@ func waitForPod(c *client.Client, ns, podName string, ticker, interval, timeout
186186
}
187187

188188
quit := progress("...", ticker)
189-
err := waitForPodCondition(c, ns, podName, condition, interval, timeout)
189+
err := waitForPodCondition(pw, ns, podName, condition, interval, timeout)
190190
quit <- true
191191
<-quit
192192
return err
193193
}
194194

195195
// waitForPodEnd waits for a pod in state succeeded or failed
196-
func waitForPodEnd(c *client.Client, ns, podName string, interval, timeout time.Duration) error {
196+
func waitForPodEnd(pw *k8s.PodWatcher, ns, podName string, interval, timeout time.Duration) error {
197197
condition := func(pod *api.Pod) (bool, error) {
198198
if pod.Status.Phase == api.PodSucceeded {
199199
return true, nil
@@ -204,21 +204,19 @@ func waitForPodEnd(c *client.Client, ns, podName string, interval, timeout time.
204204
return false, nil
205205
}
206206

207-
return waitForPodCondition(c, ns, podName, condition, interval, timeout)
207+
return waitForPodCondition(pw, ns, podName, condition, interval, timeout)
208208
}
209209

210210
// waitForPodCondition waits for a pod in state defined by a condition (func)
211-
func waitForPodCondition(c *client.Client, ns, podName string, condition func(pod *api.Pod) (bool, error),
211+
func waitForPodCondition(pw *k8s.PodWatcher, ns, podName string, condition func(pod *api.Pod) (bool, error),
212212
interval, timeout time.Duration) error {
213213
return wait.PollImmediate(interval, timeout, func() (bool, error) {
214-
pod, err := c.Pods(ns).Get(podName)
215-
if err != nil {
216-
if apierrs.IsNotFound(err) {
217-
return false, nil
218-
}
214+
pods, err := pw.Store.List(labels.Set{"heritage": podName}.AsSelector())
215+
if err != nil || len(pods) == 0 {
216+
return false, nil
219217
}
220218

221-
done, err := condition(pod)
219+
done, err := condition(pods[0])
222220
if err != nil {
223221
return false, err
224222
}

pkg/k8s/watch.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package k8s
2+
3+
import (
4+
"time"
5+
6+
"k8s.io/kubernetes/pkg/api"
7+
"k8s.io/kubernetes/pkg/client/cache"
8+
client "k8s.io/kubernetes/pkg/client/unversioned"
9+
"k8s.io/kubernetes/pkg/controller/framework"
10+
"k8s.io/kubernetes/pkg/labels"
11+
"k8s.io/kubernetes/pkg/runtime"
12+
"k8s.io/kubernetes/pkg/watch"
13+
)
14+
15+
var (
16+
resyncPeriod = 30 * time.Second
17+
)
18+
19+
//PodWatcher is a struct which holds the return values of (k8s.io/kubernetes/pkg/controller/framework).NewIndexerInformer together.
20+
type PodWatcher struct {
21+
Store cache.StoreToPodLister
22+
Controller *framework.Controller
23+
}
24+
25+
//NewPodWatcher creates a new BuildPodWatcher useful to list the pods using a cache which gets updated based on the watch func.
26+
func NewPodWatcher(c *client.Client, ns string) *PodWatcher {
27+
pw := &PodWatcher{}
28+
29+
pw.Store.Store, pw.Controller = framework.NewIndexerInformer(
30+
&cache.ListWatch{
31+
ListFunc: podListFunc(c, ns),
32+
WatchFunc: podWatchFunc(c, ns),
33+
},
34+
&api.Pod{},
35+
resyncPeriod,
36+
framework.ResourceEventHandlerFuncs{},
37+
cache.Indexers{},
38+
)
39+
40+
return pw
41+
}
42+
43+
func podListFunc(c *client.Client, ns string) func(options api.ListOptions) (runtime.Object, error) {
44+
return func(opts api.ListOptions) (runtime.Object, error) {
45+
return c.Pods(ns).List(api.ListOptions{
46+
LabelSelector: labels.Everything(),
47+
})
48+
}
49+
}
50+
51+
func podWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
52+
return func(opts api.ListOptions) (watch.Interface, error) {
53+
return c.Pods(ns).Watch(api.ListOptions{
54+
LabelSelector: labels.Everything(),
55+
})
56+
}
57+
}

0 commit comments

Comments
 (0)