From 71b380d31eaaacaad4a17227fed3926c17e321c1 Mon Sep 17 00:00:00 2001 From: Abirdcfly Date: Fri, 28 Feb 2025 11:44:40 +0800 Subject: [PATCH] fix: correct non-inherited context Signed-off-by: Abirdcfly --- .../modeladapter/modeladapter_controller.go | 8 ++++---- .../podautoscaler/podautoscaler_controller.go | 2 +- pkg/controller/rayclusterfleet/progress.go | 2 +- .../rayclusterfleet_controller.go | 6 +++--- .../rayclusterreplicaset_controller.go | 6 +++--- pkg/metadata/users.go | 14 +++++++------- pkg/plugins/gateway/gateway.go | 4 ++-- pkg/utils/users.go | 16 ++++++++-------- 8 files changed, 29 insertions(+), 29 deletions(-) diff --git a/pkg/controller/modeladapter/modeladapter_controller.go b/pkg/controller/modeladapter/modeladapter_controller.go index 87d6621e7..bd0ac70c0 100644 --- a/pkg/controller/modeladapter/modeladapter_controller.go +++ b/pkg/controller/modeladapter/modeladapter_controller.go @@ -302,7 +302,7 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request // the finalizer is present, so let's unload lora from those inference engines // note: the base model pod could be deleted as well, so here we do best effort offloading // we do not need to reconcile the object if it encounters the unloading error. - if err := r.unloadModelAdapter(modelAdapter); err != nil { + if err := r.unloadModelAdapter(ctx, modelAdapter); err != nil { return ctrl.Result{}, err } if ok := controllerutil.RemoveFinalizer(modelAdapter, ModelAdapterFinalizer); !ok { @@ -682,7 +682,7 @@ func (r *ModelAdapterReconciler) loadModelAdapter(url string, instance *modelv1a // unloadModelAdapter unloads the loras from inference engines // base model pod could be deleted, in this case, we just do optimistic unloading. It only returns some necessary errors and http errors should not be returned. -func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error { +func (r *ModelAdapterReconciler) unloadModelAdapter(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error { if len(instance.Status.Instances) == 0 { klog.Warningf("model adapter %s/%s has not been deployed to any pods yet, skip unloading", instance.GetNamespace(), instance.GetName()) return nil @@ -692,7 +692,7 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode podName := instance.Status.Instances[0] targetPod := &corev1.Pod{} - if err := r.Get(context.TODO(), types.NamespacedName{ + if err := r.Get(ctx, types.NamespacedName{ Namespace: instance.Namespace, Name: podName, }, targetPod); err != nil { @@ -803,7 +803,7 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins // TODO: do necessary refactor to support multiple lora instance podName := instance.Status.Instances[0] pod := &corev1.Pod{} - if err := r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: podName}, pod); err != nil { + if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: podName}, pod); err != nil { if !apierrors.IsNotFound(err) { klog.Warning("Error getting Pod from lora instance list", err) return ctrl.Result{}, err diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go index 2c1e24d73..60cf72d07 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller.go @@ -516,7 +516,7 @@ func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context, scale.SetNamespace(namespace) scale.SetName(name) - err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: name}, scale) + err := r.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, scale) if err == nil { return scale, targetGR, nil } diff --git a/pkg/controller/rayclusterfleet/progress.go b/pkg/controller/rayclusterfleet/progress.go index a3aaf9ffb..b07bc6b3d 100644 --- a/pkg/controller/rayclusterfleet/progress.go +++ b/pkg/controller/rayclusterfleet/progress.go @@ -112,7 +112,7 @@ func (r *RayClusterFleetReconciler) syncRolloutStatus(ctx context.Context, allRS newDeployment := d newDeployment.Status = newStatus - err := r.Status().Update(context.Background(), newDeployment) + err := r.Status().Update(ctx, newDeployment) return err } diff --git a/pkg/controller/rayclusterfleet/rayclusterfleet_controller.go b/pkg/controller/rayclusterfleet/rayclusterfleet_controller.go index 6359e6f5f..52dd046f1 100644 --- a/pkg/controller/rayclusterfleet/rayclusterfleet_controller.go +++ b/pkg/controller/rayclusterfleet/rayclusterfleet_controller.go @@ -140,7 +140,7 @@ func (r *RayClusterFleetReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } - clusterMap, err := r.getRayClusterMapForFleet(f, rsList) + clusterMap, err := r.getRayClusterMapForFleet(ctx, f, rsList) if err != nil { return ctrl.Result{}, err } @@ -209,7 +209,7 @@ func (r *RayClusterFleetReconciler) getReplicaSetsForFleet(ctx context.Context, return ownedReplicaSets, nil } -func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(f *orchestrationv1alpha1.RayClusterFleet, rsList []*orchestrationv1alpha1.RayClusterReplicaSet) (map[types.UID][]*rayclusterv1.RayCluster, error) { +func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(ctx context.Context, f *orchestrationv1alpha1.RayClusterFleet, rsList []*orchestrationv1alpha1.RayClusterReplicaSet) (map[types.UID][]*rayclusterv1.RayCluster, error) { clusterList := &rayclusterv1.RayClusterList{} // Get all RayClusters matches the fleet @@ -218,7 +218,7 @@ func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(f *orchestrationv1a return nil, err } - err = r.List(context.TODO(), clusterList, client.InNamespace(f.Namespace), client.MatchingLabelsSelector{Selector: selector}) + err = r.List(ctx, clusterList, client.InNamespace(f.Namespace), client.MatchingLabelsSelector{Selector: selector}) if err != nil { return nil, err } diff --git a/pkg/controller/rayclusterreplicaset/rayclusterreplicaset_controller.go b/pkg/controller/rayclusterreplicaset/rayclusterreplicaset_controller.go index eaa7c543d..0f55cb6eb 100644 --- a/pkg/controller/rayclusterreplicaset/rayclusterreplicaset_controller.go +++ b/pkg/controller/rayclusterreplicaset/rayclusterreplicaset_controller.go @@ -153,7 +153,7 @@ func (r *RayClusterReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl // status update if necessary newStatus := calculateStatus(replicaset, filteredClusters, scaleError) - if err := r.updateReplicaSetStatus(replicaset, newStatus, rsKey); err != nil { + if err := r.updateReplicaSetStatus(ctx, replicaset, newStatus, rsKey); err != nil { return reconcile.Result{}, err } @@ -202,7 +202,7 @@ func (r *RayClusterReplicaSetReconciler) scaleDown(ctx context.Context, replicas } // updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry. -func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error { +func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(ctx context.Context, rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error { // Check if the expectations have been fulfilled for this ReplicaSet if !r.Expectations.SatisfiedExpectations(rsKey) { klog.V(4).Info("Expectations not yet fulfilled for ReplicaSet, delaying status update", "replicaSet", rsKey) @@ -227,7 +227,7 @@ func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestratio // Update ReplicaSet status if necessary newInstance := rs.DeepCopy() newInstance.Status = newStatus - if err := r.Status().Update(context.Background(), newInstance); err != nil { + if err := r.Status().Update(ctx, newInstance); err != nil { klog.ErrorS(err, "unable to update ReplicaSet status") return err } diff --git a/pkg/metadata/users.go b/pkg/metadata/users.go index 09b9ac93d..24490eacb 100644 --- a/pkg/metadata/users.go +++ b/pkg/metadata/users.go @@ -62,12 +62,12 @@ func (s *httpServer) createUser(w http.ResponseWriter, r *http.Request) { return } - if utils.CheckUser(u, s.redisClient) { + if utils.CheckUser(r.Context(), u, s.redisClient) { fmt.Fprintf(w, "User: %+v exists", u.Name) return } - if err := utils.SetUser(u, s.redisClient); err != nil { + if err := utils.SetUser(r.Context(), u, s.redisClient); err != nil { http.Error(w, fmt.Sprintf("error occurred on creating user: %+v", err), http.StatusInternalServerError) return } @@ -90,7 +90,7 @@ func (s *httpServer) readUser(w http.ResponseWriter, r *http.Request) { return } - user, err := utils.GetUser(u, s.redisClient) + user, err := utils.GetUser(r.Context(), u, s.redisClient) if err != nil { fmt.Fprint(w, "user does not exists") return @@ -114,12 +114,12 @@ func (s *httpServer) updateUser(w http.ResponseWriter, r *http.Request) { return } - if !utils.CheckUser(u, s.redisClient) { + if !utils.CheckUser(r.Context(), u, s.redisClient) { fmt.Fprintf(w, "User: %+v does not exists", u.Name) return } - if err := utils.SetUser(u, s.redisClient); err != nil { + if err := utils.SetUser(r.Context(), u, s.redisClient); err != nil { http.Error(w, fmt.Sprintf("error occurred on updating user: %+v", err), http.StatusInternalServerError) return } @@ -142,12 +142,12 @@ func (s *httpServer) deleteUser(w http.ResponseWriter, r *http.Request) { return } - if !utils.CheckUser(u, s.redisClient) { + if !utils.CheckUser(r.Context(), u, s.redisClient) { fmt.Fprintf(w, "User: %+v does not exists", u.Name) return } - if err := utils.DelUser(u, s.redisClient); err != nil { + if err := utils.DelUser(r.Context(), u, s.redisClient); err != nil { http.Error(w, fmt.Sprintf("error occurred on deleting user: %+v", err), http.StatusInternalServerError) return } diff --git a/pkg/plugins/gateway/gateway.go b/pkg/plugins/gateway/gateway.go index 7230c8afd..e61f1e883 100644 --- a/pkg/plugins/gateway/gateway.go +++ b/pkg/plugins/gateway/gateway.go @@ -45,7 +45,7 @@ import ( envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/vllm-project/aibrix/pkg/cache" routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms" - ratelimiter "github.com/vllm-project/aibrix/pkg/plugins/gateway/ratelimiter" + "github.com/vllm-project/aibrix/pkg/plugins/gateway/ratelimiter" "github.com/vllm-project/aibrix/pkg/utils" healthPb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -254,7 +254,7 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, requestID string, req } if username != "" { - user, err = utils.GetUser(utils.User{Name: username}, s.redisClient) + user, err = utils.GetUser(ctx, utils.User{Name: username}, s.redisClient) if err != nil { klog.ErrorS(err, "unable to process user info", "requestID", requestID, "username", username) return generateErrorResponse( diff --git a/pkg/utils/users.go b/pkg/utils/users.go index 9478a7c5d..b70391b39 100644 --- a/pkg/utils/users.go +++ b/pkg/utils/users.go @@ -30,8 +30,8 @@ type User struct { Tpm int64 `json:"tpm"` } -func CheckUser(u User, redisClient *redis.Client) bool { - val, err := redisClient.Exists(context.Background(), genKey(u.Name)).Result() +func CheckUser(ctx context.Context, u User, redisClient *redis.Client) bool { + val, err := redisClient.Exists(ctx, genKey(u.Name)).Result() if err != nil { return false } @@ -39,8 +39,8 @@ func CheckUser(u User, redisClient *redis.Client) bool { return val != 0 } -func GetUser(u User, redisClient *redis.Client) (User, error) { - val, err := redisClient.Get(context.Background(), genKey(u.Name)).Result() +func GetUser(ctx context.Context, u User, redisClient *redis.Client) (User, error) { + val, err := redisClient.Get(ctx, genKey(u.Name)).Result() if err != nil { return User{}, err } @@ -53,7 +53,7 @@ func GetUser(u User, redisClient *redis.Client) (User, error) { return *user, nil } -func SetUser(u User, redisClient *redis.Client) error { +func SetUser(ctx context.Context, u User, redisClient *redis.Client) error { if u.Rpm < 0 || u.Tpm < 0 { return fmt.Errorf("rpm or tpm can not negative") } @@ -63,11 +63,11 @@ func SetUser(u User, redisClient *redis.Client) error { return err } - return redisClient.Set(context.Background(), genKey(u.Name), string(b), 0).Err() + return redisClient.Set(ctx, genKey(u.Name), string(b), 0).Err() } -func DelUser(u User, redisClient *redis.Client) error { - return redisClient.Del(context.Background(), genKey(u.Name)).Err() +func DelUser(ctx context.Context, u User, redisClient *redis.Client) error { + return redisClient.Del(ctx, genKey(u.Name)).Err() } func genKey(s string) string {