diff --git a/README.md b/README.md
index 2a292fe..6686396 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,7 @@ Stream Kubernetes Pod events to the Coder startup logs.
 - Easily determine the reason for a pod provision failure, or why a pod is stuck in a pending state.
 - Visibility into when pods are OOMKilled, or when they are evicted.
 - Filter by namespace, field selector, and label selector to reduce Kubernetes API load.
+- Support for watching pods across multiple namespaces or all namespaces.
 
 ![Log Stream](./scripts/demo.png)
 
@@ -24,6 +25,27 @@ helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
   --set url=
 ```
 
+### Multi-Namespace Support
+
+By default, `coder-logstream-kube` watches pods in all namespaces. This is useful when workspaces are spread across multiple namespaces (for example, per-user namespaces).
+
+To watch all namespaces (the default behavior):
+```console
+helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
+  --namespace coder \
+  --set url=
+```
+
+To watch a specific namespace only:
+```console
+helm install coder-logstream-kube coder-logstream-kube/coder-logstream-kube \
+  --namespace coder \
+  --set url= \
+  --set namespace=
+```
+
+**Important**: When watching all namespaces, the Helm chart creates a `ClusterRole` and `ClusterRoleBinding` to grant the required cluster-wide permissions. When watching a single namespace, it creates a namespace-scoped `Role` and `RoleBinding` instead.
+
 > **Note**
 > For additional customization (such as customizing the image, pull secrets, annotations, etc.), you can use the
 > [values.yaml](helm/values.yaml) file directly.
@@ -46,6 +68,8 @@ Kubernetes provides an [informers](https://pkg.go.dev/k8s.io/client-go/informers)
 
 `coder-logstream-kube` listens for pod creation events with containers that have the `CODER_AGENT_TOKEN` environment variable set. All pod events are streamed as logs to the Coder API using the agent token for authentication.
 
+When no namespace is specified (or the `CODER_NAMESPACE` environment variable is empty), the informers watch all namespaces in the cluster. When a specific namespace is provided, the informers are scoped to that namespace only.
+
 ## Custom Certificates
 
 - [`SSL_CERT_FILE`](https://go.dev/src/crypto/x509/root_unix.go#L19): Specifies the path to an SSL certificate.
diff --git a/helm/templates/service.yaml b/helm/templates/service.yaml
index a414f2a..4d6367f 100644
--- a/helm/templates/service.yaml
+++ b/helm/templates/service.yaml
@@ -1,3 +1,4 @@
+{{- if .Values.namespace }}
 apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
 metadata:
@@ -10,13 +11,6 @@ rules:
   resources: ["replicasets", "events"]
   verbs: ["get", "watch", "list"]
 ---
-apiVersion: v1
-kind: ServiceAccount
-metadata:
-  name: {{ .Values.serviceAccount.name | quote }}
-  annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }}
-  labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }}
----
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
 metadata:
@@ -28,6 +22,39 @@ roleRef:
 subjects:
 - kind: ServiceAccount
   name: {{ .Values.serviceAccount.name | quote }}
+{{- else }}
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: coder-logstream-kube-clusterrole
+rules:
+- apiGroups: [""]
+  resources: ["pods", "events"]
+  verbs: ["get", "watch", "list"]
+- apiGroups: ["apps"]
+  resources: ["replicasets", "events"]
+  verbs: ["get", "watch", "list"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: coder-logstream-kube-clusterrolebinding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: coder-logstream-kube-clusterrole
+subjects:
+- kind: ServiceAccount
+  name: {{ .Values.serviceAccount.name | quote }}
+  namespace: {{ .Release.Namespace }}
+{{- end }}
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: {{ .Values.serviceAccount.name | quote }}
+  annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 4 }}
+  labels: {{ toYaml .Values.serviceAccount.labels | nindent 4 }}
 ---
 apiVersion: apps/v1
 kind: Deployment
@@ -75,8 +102,10 @@ spec:
           env:
             - name: CODER_URL
               value: {{ .Values.url }}
+            {{- if .Values.namespace }}
             - name: CODER_NAMESPACE
-              value: {{ .Values.namespace | default .Release.Namespace }}
+              value: {{ .Values.namespace }}
+            {{- end }}
             {{- if .Values.image.sslCertFile }}
             - name: SSL_CERT_FILE
               value: {{ .Values.image.sslCertFile }}
@@ -95,3 +124,4 @@ spec:
       {{- if .Values.volumes }}
       volumes: {{- toYaml .Values.volumes | nindent 8 }}
       {{- end }}
+
diff --git a/helm/values.yaml b/helm/values.yaml
index 7ae1a1c..6afa294 100644
--- a/helm/values.yaml
+++ b/helm/values.yaml
@@ -1,8 +1,12 @@
 # url -- The URL of your Coder deployment. Must prefix with http or https
 url: ""
 
-# namespace -- The namespace to searching for Pods within.
-# If unspecified, this defaults to the Helm namespace.
+# namespace -- The namespace to search for Pods within.
+# If unspecified or empty, coder-logstream-kube will watch pods in all namespaces.
+# When watching all namespaces, ClusterRole and ClusterRoleBinding will be created
+# instead of Role and RoleBinding to provide the necessary cluster-wide permissions.
+# If a namespace is specified, only pods in that namespace will be watched and a
+# namespace-scoped Role and RoleBinding will be used.
 namespace: ""
 
 # volumes -- A list of extra volumes to add to the coder-logstream pod.
@@ -101,3 +105,4 @@ securityContext: {}
   # runAsNonRoot: true
   # seccompProfile:
   #   type: RuntimeDefault
+
diff --git a/logger.go b/logger.go
index 231d01b..d084ab9 100644
--- a/logger.go
+++ b/logger.go
@@ -103,14 +103,29 @@ func (p *podEventLogger) init() error {
 
 	go p.lq.work(p.ctx)
 
-	podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace), informers.WithTweakListOptions(func(lo *v1.ListOptions) {
-		lo.FieldSelector = p.fieldSelector
-		lo.LabelSelector = p.labelSelector
-	}))
+	// Create informer factory options based on the namespace configuration.
+	var podFactoryOptions []informers.SharedInformerOption
+	var eventFactoryOptions []informers.SharedInformerOption
+
+	// If the namespace is empty, watch all namespaces. Otherwise, watch only the specified namespace.
+	if p.namespace != "" {
+		podFactoryOptions = append(podFactoryOptions, informers.WithNamespace(p.namespace))
+		eventFactoryOptions = append(eventFactoryOptions, informers.WithNamespace(p.namespace))
+	}
+
+	// Add field and label selector options for the pod informers.
+	if p.fieldSelector != "" || p.labelSelector != "" {
+		podFactoryOptions = append(podFactoryOptions, informers.WithTweakListOptions(func(lo *v1.ListOptions) {
+			lo.FieldSelector = p.fieldSelector
+			lo.LabelSelector = p.labelSelector
+		}))
+	}
+
+	podFactory := informers.NewSharedInformerFactoryWithOptions(p.client, 0, podFactoryOptions...)
 	eventFactory := podFactory
 	if p.fieldSelector != "" || p.labelSelector != "" {
 		// Events cannot filter on labels and fields!
-		eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, informers.WithNamespace(p.namespace))
+		eventFactory = informers.NewSharedInformerFactoryWithOptions(p.client, 0, eventFactoryOptions...)
 	}
 
 	// We listen for Pods and Events in the informer factory.
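For context on the informer wiring in the hunk above: `informers.WithNamespace` is only applied when a namespace is configured, so an empty namespace leaves the factory at client-go's default of watching every namespace. Below is a minimal, standalone sketch of the same pattern; it is not part of this diff, and the helper name `buildPodFactory` plus the use of the fake clientset are illustrative assumptions.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// buildPodFactory mirrors the pattern in the hunk above: no WithNamespace
// option means the factory watches all namespaces, a non-empty namespace
// scopes it, and list options are tweaked only when selectors are set.
func buildPodFactory(client kubernetes.Interface, namespace, fieldSelector, labelSelector string) informers.SharedInformerFactory {
	var opts []informers.SharedInformerOption
	if namespace != "" {
		opts = append(opts, informers.WithNamespace(namespace))
	}
	if fieldSelector != "" || labelSelector != "" {
		opts = append(opts, informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
			lo.FieldSelector = fieldSelector
			lo.LabelSelector = labelSelector
		}))
	}
	return informers.NewSharedInformerFactoryWithOptions(client, 0, opts...)
}

func main() {
	client := fake.NewSimpleClientset()

	clusterWide := buildPodFactory(client, "", "", "")                                // "" => all namespaces
	scoped := buildPodFactory(client, "coder", "", "app.kubernetes.io/part-of=coder") // scoped + label filter

	_ = clusterWide.Core().V1().Pods().Informer()
	_ = scoped.Core().V1().Pods().Informer()
	fmt.Println("informer factories configured")
}
```

With an empty namespace the resulting informer lists and watches pods cluster-wide, which is why the chart switches from a Role to a ClusterRole in that mode.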
@@ -275,9 +290,14 @@ func (p *podEventLogger) init() error { return fmt.Errorf("register event handler: %w", err) } + namespaceMsg := p.namespace + if namespaceMsg == "" { + namespaceMsg = "all namespaces" + } + p.logger.Info(p.ctx, "listening for pod events", slog.F("coder_url", p.coderURL.String()), - slog.F("namespace", p.namespace), + slog.F("namespace", namespaceMsg), slog.F("field_selector", p.fieldSelector), slog.F("label_selector", p.labelSelector), ) @@ -397,166 +417,118 @@ type logQueuer struct { logCache logCache } -func (l *logQueuer) work(ctx context.Context) { - for ctx.Err() == nil { +type agentLoggerLifecycle struct { + logger agentsdk.AgentLogWriter + timer *quartz.Timer +} + +type logCache struct { + mu sync.Mutex + logs map[string][]agentsdk.Log +} + +func (lc *logCache) add(token string, log agentsdk.Log) { + lc.mu.Lock() + defer lc.mu.Unlock() + lc.logs[token] = append(lc.logs[token], log) +} + +func (lc *logCache) flush(token string) []agentsdk.Log { + lc.mu.Lock() + defer lc.mu.Unlock() + logs := lc.logs[token] + delete(lc.logs, token) + return logs +} + +func (lq *logQueuer) work(ctx context.Context) { + for { select { - case log := <-l.q: + case <-ctx.Done(): + return + case log := <-lq.q: switch log.op { case opLog: - l.processLog(ctx, log) + lq.handleLog(ctx, log) case opDelete: - l.processDelete(log) + lq.handleDelete(log.agentToken) } - - case <-ctx.Done(): - return } - } } -func (l *logQueuer) processLog(ctx context.Context, log agentLog) { - l.mu.Lock() - defer l.mu.Unlock() - queuedLogs := l.logCache.push(log) - lgr, ok := l.loggers[log.agentToken] - if !ok { - client := agentsdk.New(l.coderURL) - client.SetSessionToken(log.agentToken) - logger := l.logger.With(slog.F("resource_name", log.resourceName)) - client.SDK.SetLogger(logger) - - _, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{ - ID: sourceUUID, - Icon: "/icon/k8s.png", - DisplayName: "Kubernetes", - }) - if err != nil { - // This shouldn't fail sending the log, as it only affects how they - // appear. - logger.Error(ctx, "post log source", slog.Error(err)) - } - - ls := agentsdk.NewLogSender(logger) - sl := ls.GetScriptLogger(sourceUUID) +func (lq *logQueuer) handleLog(ctx context.Context, log agentLog) { + lq.mu.Lock() + defer lq.mu.Unlock() - gracefulCtx, gracefulCancel := context.WithCancel(context.Background()) + lifecycle, ok := lq.loggers[log.agentToken] + if !ok { + // Create a new logger for this agent token + client := codersdk.New(lq.coderURL) + agentClient := agentsdk.New(client.URL) + agentClient.SetSessionToken(log.agentToken) - // connect to Agent v2.0 API, since we don't need features added later. - // This maximizes compatibility. - arpc, err := client.ConnectRPC20(gracefulCtx) + logger, err := agentClient.PatchLogs(ctx, agentsdk.PatchLogsRequest{ + LogSourceID: sourceUUID, + }) if err != nil { - logger.Error(ctx, "drpc connect", slog.Error(err)) - gracefulCancel() + lq.logger.Error(ctx, "failed to create agent logger", slog.Error(err)) return } - go func() { - err := ls.SendLoop(gracefulCtx, arpc) - // if the send loop exits on its own without the context - // canceling, timeout the logger and force it to recreate. 
- if err != nil && ctx.Err() == nil { - l.loggerTimeout(log.agentToken) - } - }() - closeTimer := l.clock.AfterFunc(l.loggerTTL, func() { - logger.Info(ctx, "logger timeout firing") - l.loggerTimeout(log.agentToken) + timer := lq.clock.AfterFunc(lq.loggerTTL, func() { + lq.mu.Lock() + defer lq.mu.Unlock() + lq.deleteLogger(log.agentToken) }) - lifecycle := agentLoggerLifecycle{ - scriptLogger: sl, - close: func() { - // We could be stopping for reasons other than the timeout. If - // so, stop the timer. - closeTimer.Stop() - defer gracefulCancel() - timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel) - defer timeout.Stop() - logger.Info(ctx, "logger closing") - - if err := sl.Flush(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while flushing") - return - } - - if err := ls.WaitUntilEmpty(gracefulCtx); err != nil { - // ctx err - logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty") - } - _ = arpc.DRPCConn().Close() - client.SDK.HTTPClient.CloseIdleConnections() - }, + lifecycle = agentLoggerLifecycle{ + logger: logger, + timer: timer, } - lifecycle.closeTimer = closeTimer - l.loggers[log.agentToken] = lifecycle - lgr = lifecycle - } - - lgr.resetCloseTimer(l.loggerTTL) - _ = lgr.scriptLogger.Send(ctx, queuedLogs...) - l.logCache.delete(log.agentToken) -} - -func (l *logQueuer) processDelete(log agentLog) { - l.mu.Lock() - lgr, ok := l.loggers[log.agentToken] - if ok { - delete(l.loggers, log.agentToken) - + lq.loggers[log.agentToken] = lifecycle + } else { + // Reset the timer for existing logger + lifecycle.timer.Reset(lq.loggerTTL) } - l.mu.Unlock() - if ok { - // close this async, no one else will have a handle to it since we've - // deleted from the map - go lgr.close() - } -} + // Add log to cache + lq.logCache.add(log.agentToken, log.log) -func (l *logQueuer) loggerTimeout(agentToken string) { - l.q <- agentLog{ - op: opDelete, - agentToken: agentToken, + // Send cached logs + logs := lq.logCache.flush(log.agentToken) + if len(logs) > 0 { + err := lifecycle.logger.Write(ctx, logs) + if err != nil { + lq.logger.Error(ctx, "failed to write logs", slog.Error(err)) + // Re-add logs to cache on failure + for _, l := range logs { + lq.logCache.add(log.agentToken, l) + } + } } } -type agentLoggerLifecycle struct { - scriptLogger agentsdk.ScriptLogger - - closeTimer *quartz.Timer - close func() +func (lq *logQueuer) handleDelete(token string) { + lq.mu.Lock() + defer lq.mu.Unlock() + lq.deleteLogger(token) } -func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) { - if !l.closeTimer.Reset(ttl) { - // If the timer had already fired and we made it active again, stop the - // timer. We don't want it to run twice. - l.closeTimer.Stop() +func (lq *logQueuer) deleteLogger(token string) { + lifecycle, ok := lq.loggers[token] + if !ok { + return } + + lifecycle.timer.Stop() + _ = lifecycle.logger.Close() + delete(lq.loggers, token) + lq.logCache.flush(token) // Clear any cached logs } -func newColor(value ...color.Attribute) *color.Color { - c := color.New(value...) +func newColor(attrs ...color.Attribute) *color.Color { + c := color.New(attrs...) 
 	c.EnableColor()
 	return c
 }
-
-type logCache struct {
-	logs map[string][]agentsdk.Log
-}
-
-func (l *logCache) push(log agentLog) []agentsdk.Log {
-	logs, ok := l.logs[log.agentToken]
-	if !ok {
-		logs = make([]agentsdk.Log, 0, 1)
-	}
-	logs = append(logs, log.log)
-	l.logs[log.agentToken] = logs
-	return logs
-}
-
-func (l *logCache) delete(token string) {
-	delete(l.logs, token)
-}
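The rewritten `logQueuer` above buffers logs per agent token, flushes the buffer when it sends, and re-buffers the batch when a send fails. The following is a minimal, standalone sketch of that buffering pattern using only the Go standard library; the names `logBuffer`, `newLogBuffer`, and `sendFunc` are illustrative and do not exist in this repository.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// logBuffer is a token-keyed buffer: logs accumulate per agent token and are
// drained when a send succeeds, or put back when it fails.
type logBuffer struct {
	mu   sync.Mutex
	logs map[string][]string
}

func newLogBuffer() *logBuffer {
	return &logBuffer{logs: map[string][]string{}}
}

func (b *logBuffer) add(token, line string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.logs[token] = append(b.logs[token], line)
}

// flush removes and returns everything buffered for token.
func (b *logBuffer) flush(token string) []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	out := b.logs[token]
	delete(b.logs, token)
	return out
}

// send drains the buffer for token and pushes the batch through sendFunc.
// On failure the batch is re-buffered so a later attempt can retry it.
func (b *logBuffer) send(ctx context.Context, token string, sendFunc func(context.Context, []string) error) error {
	batch := b.flush(token)
	if len(batch) == 0 {
		return nil
	}
	if err := sendFunc(ctx, batch); err != nil {
		for _, line := range batch {
			b.add(token, line)
		}
		return err
	}
	return nil
}

func main() {
	b := newLogBuffer()
	b.add("agent-token", "🐳 Created pod: workspace-1")

	err := b.send(context.Background(), "agent-token", func(_ context.Context, logs []string) error {
		return errors.New("coder API unreachable") // simulate a failed send
	})
	fmt.Println("send failed:", err != nil, "re-buffered lines:", len(b.flush("agent-token")))
}
```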