Skip to content

Commit df64a3d

Browse files
committed
fix(k8s): miss imagebuilder log
1 parent 6e88461 commit df64a3d

4 files changed

Lines changed: 173 additions & 31 deletions

File tree

pkg/controller/token/token.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ type tokenResponse struct {
6868
ExpiresIn int64 `json:"expires_in"`
6969
}
7070

71+
// introspectResponse models the passport /oauth/introspect/ response.
72+
type introspectResponse struct {
73+
Active bool `json:"active"`
74+
Scope string `json:"scope"`
75+
}
76+
7177
// Manager owns the Valkey client and the configuration needed to refresh
7278
// tokens. It is safe for concurrent use.
7379
type Manager struct {
@@ -237,6 +243,9 @@ func (m *Manager) readValid(ctx context.Context, buffer time.Duration) (*payload
237243
if p.ExpiresAt <= cutoff {
238244
return nil, nil
239245
}
246+
if !m.introspectToken(ctx, p.AccessToken) {
247+
return nil, nil
248+
}
240249
return &p, nil
241250
}
242251

@@ -269,6 +278,60 @@ func (m *Manager) fetchAndSave(ctx context.Context) (*payload, error) {
269278
return &p, nil
270279
}
271280

281+
func (m *Manager) introspectToken(ctx context.Context, token string) bool {
282+
if m.passportScopes == "" {
283+
return true
284+
}
285+
286+
endpoint := strings.TrimRight(m.passportURL, "/") + "/oauth/introspect/"
287+
form := url.Values{}
288+
form.Set("token", token)
289+
290+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, strings.NewReader(form.Encode()))
291+
if err != nil {
292+
return false
293+
}
294+
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
295+
req.SetBasicAuth(m.passportKey, m.passportSecret)
296+
297+
resp, err := m.httpClient.Do(req)
298+
if err != nil {
299+
return false
300+
}
301+
defer resp.Body.Close()
302+
303+
if resp.StatusCode != http.StatusOK {
304+
return false
305+
}
306+
307+
var ir introspectResponse
308+
if err := json.NewDecoder(resp.Body).Decode(&ir); err != nil {
309+
return false
310+
}
311+
312+
if !ir.Active {
313+
return false
314+
}
315+
316+
requiredScopes := strings.Fields(m.passportScopes)
317+
tokenScopes := strings.Fields(ir.Scope)
318+
319+
if len(requiredScopes) != len(tokenScopes) {
320+
return false
321+
}
322+
323+
reqMap := make(map[string]bool)
324+
for _, s := range requiredScopes {
325+
reqMap[s] = true
326+
}
327+
for _, s := range tokenScopes {
328+
if !reqMap[s] {
329+
return false
330+
}
331+
}
332+
return true
333+
}
334+
272335
func (m *Manager) requestToken(ctx context.Context) (*tokenResponse, error) {
273336
endpoint := strings.TrimRight(m.passportURL, "/") + "/oauth/token/"
274337
form := url.Values{}

pkg/controller/token/token_test.go

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func newTestManager(t *testing.T, passportHandler http.HandlerFunc) (*Manager, *
3636
t.Setenv("DRYCC_PASSPORT_URL", ts.URL)
3737
t.Setenv("DRYCC_PASSPORT_KEY", "test-key")
3838
t.Setenv("DRYCC_PASSPORT_SECRET", "test-secret")
39-
t.Setenv("DRYCC_PASSPORT_SCOPES", "controller:hook")
39+
t.Setenv("DRYCC_PASSPORT_SCOPES", "passport:message")
4040

4141
mgr, err := NewManager(client)
4242
require.NoError(t, err)
@@ -57,7 +57,20 @@ func passportJSON(t *testing.T, accessToken string, expiresIn int64) http.Handle
5757
}
5858

5959
func TestGet_FastPathReturnsCachedToken(t *testing.T) {
60-
mgr, mr, _ := newTestManager(t, passportJSON(t, "should-not-be-fetched", 2592000))
60+
// The introspect endpoint is required for the fast path validity check now.
61+
handler := func(w http.ResponseWriter, req *http.Request) {
62+
if req.URL.Path == "/oauth/introspect/" {
63+
w.Header().Set("Content-Type", "application/json")
64+
_ = json.NewEncoder(w).Encode(map[string]any{
65+
"active": true,
66+
"scope": "passport:message",
67+
})
68+
return
69+
}
70+
// Fallback for token requests
71+
passportJSON(t, "should-not-be-fetched", 2592000)(w, req)
72+
}
73+
mgr, mr, _ := newTestManager(t, handler)
6174

6275
// Pre-populate Valkey with a still-valid token.
6376
p := payload{AccessToken: "cached-token", ExpiresAt: time.Now().Add(24 * time.Hour).Unix()}
@@ -96,13 +109,30 @@ func TestGet_ColdStartFetchesFromPassport(t *testing.T) {
96109
}
97110

98111
func TestGet_ConcurrentCallsHitPassportOnce(t *testing.T) {
99-
var calls int32
100-
handler := func(w http.ResponseWriter, _ *http.Request) {
101-
atomic.AddInt32(&calls, 1)
112+
var activeCalls int32
113+
var tokenCalls int32
114+
var mu sync.Mutex
115+
116+
handler := func(w http.ResponseWriter, req *http.Request) {
117+
mu.Lock()
118+
if req.URL.Path == "/oauth/introspect/" {
119+
activeCalls++
120+
mu.Unlock()
121+
w.Header().Set("Content-Type", "application/json")
122+
_ = json.NewEncoder(w).Encode(map[string]any{
123+
"active": true,
124+
"scope": "passport:message",
125+
})
126+
return
127+
}
128+
129+
tokenCalls++
130+
mu.Unlock()
131+
102132
// Slow handler to widen the race window.
103133
time.Sleep(50 * time.Millisecond)
104134
w.Header().Set("Content-Type", "application/json")
105-
_, _ = w.Write([]byte(`{"access_token":"only-one","token_type":"Bearer","expires_in":2592000}`))
135+
_, _ = w.Write([]byte(`{"access_token":"only-one","token_type":"Bearer","expires_in":2592000,"scope":"passport:message"}`))
106136
}
107137
mgr, _, _ := newTestManager(t, handler)
108138

@@ -123,15 +153,24 @@ func TestGet_ConcurrentCallsHitPassportOnce(t *testing.T) {
123153
require.NoErrorf(t, errs[i], "goroutine %d", i)
124154
assert.Equal(t, "only-one", results[i])
125155
}
126-
assert.Equal(t, int32(1), atomic.LoadInt32(&calls), "passport should be called exactly once")
156+
assert.Equal(t, int32(1), atomic.LoadInt32(&tokenCalls), "passport should be called exactly once")
127157
}
128158

129159
func TestRefresh_SkipsWhenPlentyOfLifetimeRemains(t *testing.T) {
130-
var calls int32
131-
handler := func(w http.ResponseWriter, _ *http.Request) {
132-
atomic.AddInt32(&calls, 1)
160+
var tokenCalls int32
161+
handler := func(w http.ResponseWriter, req *http.Request) {
162+
if req.URL.Path == "/oauth/introspect/" {
163+
w.Header().Set("Content-Type", "application/json")
164+
_ = json.NewEncoder(w).Encode(map[string]any{
165+
"active": true,
166+
"scope": "passport:message",
167+
})
168+
return
169+
}
170+
171+
atomic.AddInt32(&tokenCalls, 1)
133172
w.WriteHeader(http.StatusOK)
134-
_, _ = w.Write([]byte(`{"access_token":"new","expires_in":2592000}`))
173+
_, _ = w.Write([]byte(`{"access_token":"new","expires_in":2592000,"scope":"passport:message"}`))
135174
}
136175
mgr, mr, _ := newTestManager(t, handler)
137176

@@ -141,7 +180,7 @@ func TestRefresh_SkipsWhenPlentyOfLifetimeRemains(t *testing.T) {
141180
require.NoError(t, mr.Set(TokenKey, string(raw)))
142181

143182
require.NoError(t, mgr.Refresh(context.Background(), false))
144-
assert.Equal(t, int32(0), atomic.LoadInt32(&calls))
183+
assert.Equal(t, int32(0), atomic.LoadInt32(&tokenCalls))
145184

146185
got, _ := mr.Get(TokenKey)
147186
assert.Equal(t, string(raw), got, "token must be untouched")
@@ -386,6 +425,15 @@ func TestNewClientFromEnv_MissingURL(t *testing.T) {
386425
func TestRequestToken_SendsControllerHookScope(t *testing.T) {
387426
var captured url.Values
388427
handler := func(w http.ResponseWriter, r *http.Request) {
428+
if r.URL.Path == "/oauth/introspect/" {
429+
w.Header().Set("Content-Type", "application/json")
430+
_ = json.NewEncoder(w).Encode(map[string]any{
431+
"active": true,
432+
"scope": "passport:message",
433+
})
434+
return
435+
}
436+
389437
body, _ := io.ReadAll(r.Body)
390438
captured, _ = url.ParseQuery(string(body))
391439
w.Header().Set("Content-Type", "application/json")
@@ -399,5 +447,5 @@ func TestRequestToken_SendsControllerHookScope(t *testing.T) {
399447
assert.Equal(t, "client_credentials", captured.Get("grant_type"))
400448
assert.Equal(t, "test-key", captured.Get("client_id"))
401449
assert.Equal(t, "test-secret", captured.Get("client_secret"))
402-
assert.Equal(t, "controller:hook", captured.Get("scope"))
450+
assert.Equal(t, "passport:message", captured.Get("scope"))
403451
}

pkg/gitreceive/build.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525
"gopkg.in/yaml.v3"
2626
corev1 "k8s.io/api/core/v1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/labels"
2829
"k8s.io/client-go/kubernetes"
2930
"k8s.io/client-go/kubernetes/scheme"
31+
"k8s.io/client-go/tools/cache"
3032
)
3133

3234
const (
@@ -186,24 +188,25 @@ func build(
186188
return fmt.Errorf("creating builder pod (%s)", err)
187189
}
188190

189-
pw := k8s.NewPodWatcher(*kubeClient, conf.PodNamespace)
191+
pw := k8s.NewPodWatcher(*kubeClient, conf.PodNamespace, builderPodLabelSelector(newJob.Name))
190192
stopCh := make(chan struct{})
191193
defer close(stopCh)
192194
go pw.Controller.Run(stopCh)
193195

196+
if !cache.WaitForCacheSync(stopCh, pw.Controller.HasSynced) {
197+
return fmt.Errorf("pod watcher cache failed to sync for job %s", newJob.Name)
198+
}
199+
194200
if err := waitForPod(pw, newJob.Name, conf.SessionIdleInterval(), conf.BuilderPodTickDuration(), conf.BuilderPodWaitDuration()); err != nil {
195201
return fmt.Errorf("watching events for builder pod startup (%s)", err)
196202
}
197203

198-
options := metav1.ListOptions{
199-
LabelSelector: fmt.Sprintf("job-name=%s", newJob.Name),
200-
}
201-
podList, err := kubeClient.CoreV1().Pods(newJob.Namespace).List(context.Background(), options)
204+
builderPod, err := resolveBuilderPod(pw, newJob.Name)
202205
if err != nil {
203-
return fmt.Errorf("list pods %s fail: (%s)", newJob.Name, err)
206+
return err
204207
}
205208

206-
req := kubeClient.CoreV1().RESTClient().Get().Namespace(newJob.Namespace).Name(podList.Items[0].Name).Resource("pods").SubResource("log").VersionedParams(
209+
req := kubeClient.CoreV1().RESTClient().Get().Namespace(newJob.Namespace).Name(builderPod.Name).Resource("pods").SubResource("log").VersionedParams(
207210
&corev1.PodLogOptions{
208211
Follow: true,
209212
}, scheme.ParameterCodec)
@@ -234,7 +237,7 @@ func build(
234237
}
235238
log.Debug("Done")
236239
log.Debug("Checking for builder pod exit code")
237-
buildPod, err := kubeClient.CoreV1().Pods(newJob.Namespace).Get(context.TODO(), podList.Items[0].Name, metav1.GetOptions{})
240+
buildPod, err := kubeClient.CoreV1().Pods(newJob.Namespace).Get(context.TODO(), builderPod.Name, metav1.GetOptions{})
238241
if err != nil {
239242
return fmt.Errorf("error getting builder pod status (%s)", err)
240243
}
@@ -331,3 +334,22 @@ func getDockerfile(dirName string, stack map[string]string) (string, error) {
331334
}
332335
return "", nil
333336
}
337+
338+
func builderPodLabelSelector(jobName string) string {
339+
return fmt.Sprintf("job-name=%s,heritage=drycc", jobName)
340+
}
341+
342+
func resolveBuilderPod(pw *k8s.PodWatcher, jobName string) (*corev1.Pod, error) {
343+
selector := labels.Set{
344+
"job-name": jobName,
345+
"heritage": "drycc",
346+
}.AsSelector()
347+
pods, err := pw.Store.List(selector)
348+
if err != nil {
349+
return nil, fmt.Errorf("listing builder pods for job %s: %s", jobName, err)
350+
}
351+
if len(pods) == 0 {
352+
return nil, fmt.Errorf("no builder pod found in cache for job %s", jobName)
353+
}
354+
return pods[0], nil
355+
}

pkg/k8s/watch.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,29 +38,38 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*v1.Pod, err e
3838
return pods, nil
3939
}
4040

41-
// NewPodWatcher creates a new BuildPodWatcher useful to list the pods using a cache which gets updated based on the watch func.
42-
func NewPodWatcher(c kubernetes.Clientset, ns string) *PodWatcher {
41+
// NewPodWatcher creates a new PodWatcher backed by a cache populated from a
42+
// label-scoped list/watch. A tight labelSelector keeps the WatchList initial
43+
// events stream small enough for the apiserver to deliver the terminating
44+
// bookmark within client-go's timeout; an empty string watches the namespace.
45+
func NewPodWatcher(c kubernetes.Clientset, ns, labelSelector string) *PodWatcher {
4346
pw := &PodWatcher{}
4447

4548
pw.Store.Store, pw.Controller = cache.NewInformerWithOptions(cache.InformerOptions{
4649
ListerWatcher: &cache.ListWatch{
47-
ListFunc: podListFunc(c, ns),
48-
WatchFunc: podWatchFunc(c, ns),
50+
ListFunc: podListFunc(c, ns, labelSelector),
51+
WatchFunc: podWatchFunc(c, ns, labelSelector),
4952
},
5053
ObjectType: &v1.Pod{},
5154
Handler: cache.ResourceEventHandlerFuncs{},
5255
})
5356
return pw
5457
}
5558

56-
func podListFunc(c kubernetes.Clientset, ns string) func(options metav1.ListOptions) (runtime.Object, error) {
57-
return func(metav1.ListOptions) (runtime.Object, error) {
58-
return c.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{})
59+
// podListFunc and podWatchFunc preserve any options injected by the reflector
60+
// (ResourceVersion, AllowWatchBookmarks, SendInitialEvents, ...) and only
61+
// override LabelSelector. Overwriting options with a fresh metav1.ListOptions{}
62+
// here would break the WatchList path used by client-go >= v0.35.
63+
func podListFunc(c kubernetes.Clientset, ns, labelSelector string) func(options metav1.ListOptions) (runtime.Object, error) {
64+
return func(options metav1.ListOptions) (runtime.Object, error) {
65+
options.LabelSelector = labelSelector
66+
return c.CoreV1().Pods(ns).List(context.TODO(), options)
5967
}
6068
}
6169

62-
func podWatchFunc(c kubernetes.Clientset, ns string) func(options metav1.ListOptions) (watch.Interface, error) {
63-
return func(metav1.ListOptions) (watch.Interface, error) {
64-
return c.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{})
70+
func podWatchFunc(c kubernetes.Clientset, ns, labelSelector string) func(options metav1.ListOptions) (watch.Interface, error) {
71+
return func(options metav1.ListOptions) (watch.Interface, error) {
72+
options.LabelSelector = labelSelector
73+
return c.CoreV1().Pods(ns).Watch(context.TODO(), options)
6574
}
6675
}

0 commit comments

Comments
 (0)