@@ -31,6 +31,7 @@ import (
31
31
"google.golang.org/grpc"
32
32
"google.golang.org/grpc/credentials/insecure"
33
33
corev1 "k8s.io/api/core/v1"
34
+ "k8s.io/apimachinery/pkg/api/errors"
34
35
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35
36
"k8s.io/apimachinery/pkg/types"
36
37
"k8s.io/apimachinery/pkg/util/wait"
@@ -52,7 +53,7 @@ const (
52
53
)
53
54
54
55
type podEvaluator struct {
55
- requestCh chan * clientConnRequest
56
+ requestCh chan <- * clientConnRequest
56
57
57
58
podCacheManager * podCacheManager
58
59
}
@@ -80,7 +81,7 @@ func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Du
80
81
requestCh : reqCh ,
81
82
podReadyCh : readyCh ,
82
83
cache : map [string ]* podAndGRPCClient {},
83
- waitlists : map [string ][]chan * clientConnAndError {},
84
+ waitlists : map [string ][]chan <- * clientConnAndError {},
84
85
85
86
podManager : & podManager {
86
87
kubeClient : cl ,
@@ -103,8 +104,7 @@ func (pe *podEvaluator) EvaluateFunction(ctx context.Context, req *evaluator.Eva
103
104
// make a buffer for the channel to prevent unnecessary blocking when the pod cache manager sends it to multiple waiting gorouthine in batch.
104
105
ccChan := make (chan * clientConnAndError , 1 )
105
106
// Send a request to request a grpc client.
106
- pe .podCacheManager .requestCh <- & clientConnRequest {
107
- ctx : ctx ,
107
+ pe .requestCh <- & clientConnRequest {
108
108
image : req .Image ,
109
109
grpcClientCh : ccChan ,
110
110
}
@@ -126,26 +126,25 @@ type podCacheManager struct {
126
126
gcScanInternal time.Duration
127
127
podTTL time.Duration
128
128
129
- // requestCh is a channel to send the request to cache manager. The cache manager will send back the grpc client in the embedded channel.
130
- requestCh chan * clientConnRequest
129
+ // requestCh is a receive-only channel on which the cache manager receives grpc client requests; the grpc client (or an error) is sent back on the request's embedded channel.
130
+ requestCh <- chan * clientConnRequest
131
131
// podReadyCh is a channel to receive the information when a pod is ready.
132
- podReadyCh chan * imagePodAndGRPCClient
132
+ podReadyCh <- chan * imagePodAndGRPCClient
133
133
134
134
cache map [string ]* podAndGRPCClient
135
- waitlists map [string ][]chan * clientConnAndError
135
+ waitlists map [string ][]chan <- * clientConnAndError
136
136
137
137
podManager * podManager
138
138
}
139
139
140
140
type clientConnRequest struct {
141
- ctx context.Context
142
141
image string
143
142
144
143
unavavilable bool
145
144
currClientTarget string
146
145
147
146
// grpcConn is a channel that a grpc client should be sent back.
148
- grpcClientCh chan * clientConnAndError
147
+ grpcClientCh chan <- * clientConnAndError
149
148
}
150
149
151
150
type clientConnAndError struct {
@@ -172,37 +171,49 @@ func (pcm *podCacheManager) podCacheManager() {
172
171
podAndCl , found := pcm .cache [req .image ]
173
172
if found && podAndCl != nil {
174
173
// Ensure the pod still exists and is not being deleted before sending the gprc client back to the channel.
175
- // We can't simplly return grpc client from the cache and let evaluator try to connect to the pod.
174
+ // We can't simply return grpc client from the cache and let evaluator try to connect to the pod.
176
175
// If the pod is deleted by others, it will take ~10 seconds for the evaluator to fail.
177
176
// Wasting 10 second is so much, so we check if the pod still exist first.
178
177
pod := & corev1.Pod {}
179
- err := pcm .podManager .kubeClient .Get (req .ctx , podAndCl .pod , pod )
180
- if err == nil && pod .DeletionTimestamp == nil {
181
- klog .Infof ("reusing the connection to pod %v/%v to evaluate %v" , pod .Namespace , pod .Name , req .image )
182
- req .grpcClientCh <- & clientConnAndError {grpcClient : podAndCl .grpcClient }
183
- go patchPodWithUnixTimeAnnotation (pcm .podManager .kubeClient , podAndCl .pod )
184
- break
178
+ err := pcm .podManager .kubeClient .Get (context .Background (), podAndCl .pod , pod )
179
+ deleteCacheEntry := false
180
+ if err == nil {
181
+ if pod .DeletionTimestamp == nil {
182
+ klog .Infof ("reusing the connection to pod %v/%v to evaluate %v" , pod .Namespace , pod .Name , req .image )
183
+ req .grpcClientCh <- & clientConnAndError {grpcClient : podAndCl .grpcClient }
184
+ go patchPodWithUnixTimeAnnotation (pcm .podManager .kubeClient , podAndCl .pod )
185
+ break
186
+ } else {
187
+ deleteCacheEntry = true
188
+ }
189
+ } else if errors .IsNotFound (err ) {
190
+ deleteCacheEntry = true
191
+ }
192
+ // We delete the cache entry if the pod has been deleted or is being deleted.
193
+ if deleteCacheEntry {
194
+ delete (pcm .cache , req .image )
185
195
}
186
196
}
187
197
_ , found = pcm .waitlists [req .image ]
188
198
if ! found {
189
- pcm .waitlists [req .image ] = []chan * clientConnAndError {}
199
+ pcm .waitlists [req .image ] = []chan <- * clientConnAndError {}
190
200
}
191
201
list := pcm .waitlists [req .image ]
192
- list = append (list , req .grpcClientCh )
193
- pcm .waitlists [req .image ] = list
194
- go pcm .podManager .getFuncEvalPodClient (req .ctx , req .image )
202
+ pcm .waitlists [req .image ] = append (list , req .grpcClientCh )
203
+ go pcm .podManager .getFuncEvalPodClient (context .Background (), req .image )
195
204
case resp := <- pcm .podReadyCh :
196
- if resp .err == nil {
205
+ if resp .err != nil {
206
+ klog .Warningf ("received error from the pod manager: %v" , resp .err )
197
207
pcm .cache [resp .image ] = resp .podAndGRPCClient
198
- channels := pcm .waitlists [resp .image ]
199
- delete (pcm .waitlists , resp .image )
200
- for i := range channels {
201
- // The channel has one buffer size, nothing will be blocking.
202
- channels [i ] <- & clientConnAndError {grpcClient : resp .grpcClient }
208
+ }
209
+ channels := pcm .waitlists [resp .image ]
210
+ delete (pcm .waitlists , resp .image )
211
+ for i := range channels {
212
+ // The channel has one buffer size, nothing will be blocking.
213
+ channels [i ] <- & clientConnAndError {
214
+ grpcClient : resp .grpcClient ,
215
+ err : resp .err ,
203
216
}
204
- } else {
205
- klog .Warningf ("received error from the pod manager: %v" , resp .err )
206
217
}
207
218
case <- tick :
208
219
// synchronous GC
@@ -227,40 +238,44 @@ func (pcm *podCacheManager) garbageCollector() {
227
238
continue
228
239
}
229
240
lastUse , found := pod .Annotations [lastUseTimeAnnotation ]
230
- // If a pod doesn't have a last-use annotation, we patch it.
241
+ // If a pod doesn't have a last-use annotation, we patch it. This should not happen, but if it happens,
242
+ // we give another TTL before deleting it.
231
243
if ! found {
232
244
go patchPodWithUnixTimeAnnotation (pcm .podManager .kubeClient , client .ObjectKeyFromObject (& pod ))
233
245
continue
234
246
} else {
235
247
lu , err := strconv .ParseInt (lastUse , 10 , 64 )
236
248
// If the annotation is ill-formatted, we patch it with the current time and will try to GC it later.
249
+ // This should not happen, but if it happens, we give another TTL before deleting it.
237
250
if err != nil {
238
251
klog .Warningf ("unable to convert the Unix time string to int64: %w" , err )
239
252
go patchPodWithUnixTimeAnnotation (pcm .podManager .kubeClient , client .ObjectKeyFromObject (& pod ))
240
253
continue
241
254
}
242
255
if time .Unix (lu , 0 ).Add (pcm .podTTL ).Before (time .Now ()) {
256
+ podIP := pod .Status .PodIP
257
+ go func (po corev1.Pod ) {
258
+ klog .Infof ("deleting pod %v/%v" , po .Namespace , po .Name )
259
+ err := pcm .podManager .kubeClient .Delete (context .Background (), & po )
260
+ if err != nil {
261
+ klog .Warningf ("unable to delete pod %v/%v: %w" , po .Namespace , po .Name , err )
262
+ }
263
+ }(podList .Items [i ])
264
+
243
265
image := pod .Spec .Containers [0 ].Image
244
266
podAndCl , found := pcm .cache [image ]
245
267
if found {
246
- // We delete the cache entry when its grpc client points to the old pod IP.
247
268
host , _ , err := net .SplitHostPort (podAndCl .grpcClient .Target ())
248
- if err != nil {
249
- klog .Warningf ("unable to split the GRPC dialer target to host and port : %w" , err )
269
+ // If the client target in the cache points to a different pod IP, it means the matching pod is not the current pod.
270
+ // We will keep this cache entry.
271
+ if err == nil && host != podIP {
250
272
continue
251
273
}
252
- if host == pod . Status . PodIP {
253
- delete ( pcm . cache , image )
254
- }
274
+ // We delete the cache entry when the IP of the old pod matches the client target in the cache
275
+ // or we can't split the host and port in the client target.
276
+ delete ( pcm . cache , image )
255
277
}
256
278
257
- go func (po corev1.Pod ) {
258
- klog .Infof ("deleting pod %v/%v" , po .Namespace , po .Name )
259
- err := pcm .podManager .kubeClient .Delete (context .Background (), & po )
260
- if err != nil {
261
- klog .Warningf ("unable to delete pod %v/%v: %w" , po .Namespace , po .Name , err )
262
- }
263
- }(podList .Items [i ])
264
279
}
265
280
}
266
281
}
@@ -274,9 +289,8 @@ type podManager struct {
274
289
// wrapperServerImage is the image name of the wrapper server
275
290
wrapperServerImage string
276
291
277
- // podReadyCh is a channel to send the grpc client information.
278
- // podCacheManager receives from this channel.
279
- podReadyCh chan * imagePodAndGRPCClient
292
+ // podReadyCh is a send-only channel on which podManager sends the grpc client information once a pod is ready; podCacheManager receives from it.
293
+ podReadyCh chan <- * imagePodAndGRPCClient
280
294
281
295
// entrypointCache is a cache of image name to entrypoint.
282
296
// Only podManager is allowed to touch this cache.
@@ -314,8 +328,8 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string) {
314
328
}
315
329
}
316
330
331
+ // imageEntrypoint gets the entrypoint of a container image by looking at its metadata.
317
332
func (pm * podManager ) imageEntrypoint (image string ) ([]string , error ) {
318
- // Create pod otherwise.
319
333
var entrypoint []string
320
334
ref , err := name .ParseReference (image )
321
335
if err != nil {
@@ -342,8 +356,7 @@ func (pm *podManager) imageEntrypoint(image string) ([]string, error) {
342
356
}
343
357
344
358
func (pm * podManager ) retrieveOrCreatePod (ctx context.Context , image string ) (client.ObjectKey , error ) {
345
- // Try to retrieve the pod first.
346
- // Lookup the pod by label to see if there is a pod that can be reused.
359
+ // Try to retrieve the pod. Lookup the pod by label to see if there is a pod that can be reused.
347
360
// Looking it up locally may not work if there are more than one instance of the function runner,
348
361
// since the pod may be created by one the other instance and the current instance is not aware of it.
349
362
// TODO: It's possible to set up a Watch in the fn runner namespace, and always try to maintain a up-to-date local cache.
0 commit comments