k8s中的Informer

本篇文章由 AI 辅助完成

Informer 就是一个带“自动更新”功能的本地缓存(Local Cache)。

直观理解:订阅模式 vs. 轮询模式

想象你要==监控集群里的 Pod 状态==:

  1. 笨办法(直接调用 Client): 你每隔 5 秒问一次 API Server:“现在的 Pod 列表是什么?”

    • 后果:集群里有 1 万个 Pod,你每秒问一次,API Server 的 QPS 瞬间爆炸,你会被限流(Rate Limit)。
    • 术语:这叫 Polling(轮询),效率低,延迟高。
  2. 聪明办法(使用 Informer): 你告诉 API Server:“我要订阅 Pod 的变化”。

    • 过程
      1. 先全量拿一次列表(List),存到你本地内存里。
      2. 建立长连接(Watch),只要有变化,API Server 主动推给你。
      3. 你更新本地内存,并触发你的回调函数(Callback)。
    • 后果:大部分查询直接读本地内存,零网络开销。只有变化时才通信。
    • 术语:这叫 Watch 机制 + Client-side Caching

代码极简版

核心就三步:创建 -> 注册回调 -> 启动

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog/v2"
)

func main() {
    // 假设 clientset 已经初始化好
    var clientset *kubernetes.Clientset 

    // 1. 创建 Informer:监听所有 Namespace 的 Pod
    // 参数 30*time.Minute 是 Resync 周期,意思是即使没变化,每隔这么久也重新同步一次(防丢失)
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Minute)
    podInformer := factory.Core().V1().Pods().Informer()

    // 2. 注册回调:告诉 Informer 有事了叫我
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*corev1.Pod)
            klog.Infof("捕获新增 Pod: %s", pod.Name)
            // 这里写你的业务逻辑,比如加入工作队列
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            // 只有真正变化了才处理,避免噪音
            oldPod := oldObj.(*corev1.Pod)
            newPod := newObj.(*corev1.Pod)
            if oldPod.ResourceVersion == newPod.ResourceVersion {
                return 
            }
            klog.Infof("捕获更新 Pod: %s", newPod.Name)
        },
        DeleteFunc: func(obj interface{}) {
            klog.Infof("捕获删除 Pod")
        },
    })

    // 3. 启动
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // 异步启动 Informer
    go factory.Start(ctx.Done())

    // 等待缓存同步完成(这步很重要,否则查不到数据)
    factory.WaitForCacheSync(ctx.Done())

    klog.Info("Informer 已就绪,正在监听...")
    
    // 阻塞主线程
    <-ctx.Done()
}

为什么要用 Informer?

如果你只是写个脚本一次性查个数据,用 clientset.CoreV1().Pods().Get() 没问题。

但如果在写 Controller长期运行的服务

方案读数据来源API Server 压力实时性推荐场景
直接 Client远程 API Server高 (每次请求都打过去)高 (强一致)一次性脚本、Admission Webhook
Informer本地内存 Cache低 (长连接复用)中 (最终一致)Controller、Operator、后台服务

除非你有强一致性要求(比如必须确保读到的是毫秒前的状态),否则永远优先选 Informer**。保护 API Server 就是保护集群的稳定性。

初学者最容易踩的坑

  1. 在回调里写死循环或耗时操作AddFunc 是阻塞的。如果你在这里 sleep 10s 或者调外部 API,整个 Informer 就卡住了,后续事件全堆积。正确做法是:回调里只把对象 key 丢进一个 WorkQueue,另起一个 worker 去处理
  2. 忽略 WaitForCacheSync: 启动 Informer 后,本地缓存是空的。如果不等同步完成就直接查缓存,会得到空结果。

下一步思考

你现在明白了 Informer 是“本地缓存 + 事件通知”。

那么,如果你的程序重启了,重启期间发生的 Pod 变化事件,Informer 是怎么保证不丢失的?是靠 Resync 机制,还是靠 ResourceVersion 的断点续传?

试着去想一下这个问题,这能帮你理解 K8s 的 最终一致性(Eventually Consistent) 设计哲学。

Licensed under CC BY-NC-SA 4.0
使用 Hugo 构建
主题 StackJimmy 设计