介绍
controller中的传感器主要由Reflector, Informer, Indexer组成
- Reflector通过List&Watch kube-apiserver来获取k8s资源数据,获取到资源数据后,会在Delta队列放入一个包括资源对象信息本身以及资源对象事件类型的Delta记录
- Informer不断从Delta队列中弹出Delta记录,一方面把事件交给事件回调函数,另一方面把资源对象交给Indexer。
- Indexer把资源记录在一个缓存中
controller中的控制器主要由事件处理函数及worker组成
- 事件处理函数会监听Informer中资源的新增、更新、删除事件,并根据控制器的逻辑决定是否需要处理
- 对于需要处理的事件,会把相关信息放到工作队列中,并由后续worker池中的worker来处理
- worker在处理资源对象时一般需要用资源对象的名字去缓存中重新获取最新的资源数据
源码阅读
1、kube-controller-manager.Run方法会调用sharedInformerFactory.Start;
/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go
controllerContext.InformerFactory.Start(controllerContext.Stop)
/k8s.io/kubernetes/staging/src/k8s.io/client-go/informers/factory.go
go informer.Run(stopCh) 这句话表明调用了sharedInformerFactory.Start
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {f.lock.Lock()defer f.lock.Unlock()
?for informerType, informer := range f.informers {if !f.startedInformers[informerType] {//informer的Run方法。此方法根据informer的配置实例化一个client-go包中的controller对象(此controller非controller manager的controller),//然后调用controller的Run方法。见k8s.io/client-go/tools/cache/controller.go 的Run方法//~重点~启动Informer本质上是调用了controller的reflector的Run方法。go informer.Run(stopCh)f.startedInformers[informerType] = true}}
}
可以看出sharedInformerFactory中包含若干个SharedIndexInformer
type sharedInformerFactory struct {client kubernetes.Interfacenamespace stringtweakListOptions internalinterfaces.TweakListOptionsFunclock sync.MutexdefaultResync time.DurationcustomResync map[reflect.Type]time.Duration
?informers map[reflect.Type]cache.SharedIndexInformer// startedInformers is used for tracking which informers have been started.// This allows Start() to be called multiple times safely.startedInformers map[reflect.Type]bool
}
2、sharedInformerFactory.Start会调用SharedIndexInformer.Run;
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
首先看一下结构体,SharedIndexInformer中包含controller,processor,listerWatcher等
type sharedIndexInformer struct {indexer Indexercontroller Controller
?processor *sharedProcessorcacheMutationDetector MutationDetector
?// This block is tracked to handle late initialization of the controllerlisterWatcher ListerWatcherobjectType runtime.Object
?// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call// shouldResync to check if any of our listeners need a resync.resyncCheckPeriod time.Duration// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default// value).defaultEventHandlerResyncPeriod time.Duration// clock allows for testabilityclock clock.Clock
?started, stopped boolstartedLock sync.Mutex
?// blockDeltas gives a way to stop all event distribution so that a late event handler// can safely join the shared informer.blockDeltas sync.Mutex
}
其次重点看一下Run方法
- 调用NewDeltaFIFO,创建queue;
- 定义Deltas处理函数s.HandleDeltas;
- 调用New(cfg),构建sharedIndexInformer的controller;
- 调用s.cacheMutationDetector.Run,检查缓存对象是否变化;
- 调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;
- 调用s.controller.Run,构建Reflector,进行对etcd的缓存;这一步会详细分析
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()
?//调用NewDeltaFIFO,创建queue;fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
?cfg := &Config{Queue: fifo,ListerWatcher: s.listerWatcher,ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,
?//定义Deltas处理函数s.HandleDeltasProcess: s.HandleDeltas,}
?func() {s.startedLock.Lock()defer s.startedLock.Unlock()
?//调用New(cfg),构建sharedIndexInformer的controller;s.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()
?// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stop
?//调用s.cacheMutationDetector.Run,检查缓存对象是否变化;wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)//调用s.processor.run,将调用sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数;wg.StartWithChannel(processorStopCh, s.processor.run)
?defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()//调用s.controller.Run,构建Reflector,进行对etcd的缓存;s.controller.Run(stopCh)
}
看一下sharedProcessor的结构,负责管理listener
//sharedProcessor负责管理listener
type sharedProcessor struct {listenersStarted boollistenersLock sync.RWMutexlisteners []*processorListenersyncingListeners []*processorListenerclock clock.Clockwg wait.Group
}
sharedProcessor.run,会调用Listener.run和Listener.pop,执行处理queue的函数
func (p *sharedProcessor) run(stopCh <-chan struct{}) {func() {p.listenersLock.RLock()defer p.listenersLock.RUnlock()//遍历p.listeners,调用listener.run 以及 listener.pop方法for _, listener := range p.listeners {p.wg.Start(listener.run)p.wg.Start(listener.pop)}p.listenersStarted = true}()<-stopChp.listenersLock.RLock()defer p.listenersLock.RUnlock()for _, listener := range p.listeners {close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop}p.wg.Wait() // Wait for all .pop() and .run() to stop
}
3、sharedIndexedInformer.controller.Run
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/controller.go
先看一下controller的结构
// Controller is a generic controller framework.
type controller struct {config Configreflector *ReflectorreflectorMutex sync.RWMutexclock clock.Clock
}
再看一下run方法的步骤:
- 调用NewReflector,构建Reflector;
- Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;
- 调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;
- 调用c.processLoop(controller.processLoop),reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;这一步会详细分析
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()//实例化一个Reflector对象// Reflector也是k8s中的一个概念,作用在于通过List-Watch机制,与API Server连接,及时获取监听的k8s资源的变化。// 这一步通过调用reflector的Run方法来实现。Informer正是通过这一机制,在自身被动传达API Server发送的通知的同时,也会主动向API Server获取资源变化。//Reflector对象,包括ListerWatcher、ObjectType、Queue、FullResyncPeriod;r := NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clock
?c.reflectorMutex.Lock()// 将自己的reflector字段设置为这个对象c.reflector = rc.reflectorMutex.Unlock()
?var wg wait.Groupdefer wg.Wait()
?//调用对象的Run方法//调用r.run,将调用reflector.ListAndWatch,执行r.List、r.watch、r.watchHandler,进行对etcd的缓存;wg.StartWithChannel(stopCh, r.Run)
?//调用c.processLoop,reflector向queue里面添加数据,processLoop会不停去消费这里这些数据;wait.Until(c.processLoop, time.Second, stopCh)
}
4、controller.processLoop
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/controller.go
先看一下config的结构
type Config struct {// The queue for your objects - has to be a DeltaFIFO due to// assumptions in the implementation. Your Process() function// should accept the output of this Queue's Pop() method.Queue
?// Something that can list and watch your objects.ListerWatcher
?// Something that can process your objects.Process ProcessFunc
?// The type of your objects.ObjectType runtime.Object
?// Reprocess everything at least this often.// Note that if it takes longer for you to clear the queue than this// period, you will end up processing items in the order determined// by FIFO.Replace(). Currently, this is random. If this is a// problem, we can change that replacement policy to append new// things to the end of the queue instead of replacing the entire// queue.FullResyncPeriod time.Duration
?// ShouldResync, if specified, is invoked when the controller's reflector determines the next// periodic sync should occur. If this returns true, it means the reflector should proceed with// the resync.ShouldResync ShouldResyncFunc
?// If true, when Process() returns an error, re-enqueue the object.// TODO: add interface to let you inject a delay/backoff or drop// the object completely if desired. Pass the object in// question to this interface as a parameter.RetryOnError bool
}
PopProcessFunc(c.config.Process)将前面HandleDeltas方法传递进去;
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}
DeltaFIFO.Pop/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go主要从DeltaFIFO取出object,然后调用HandleDeltas方法进行处理;func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {return nil, ErrFIFOClosed}
?f.cond.Wait()}id := f.queue[0]f.queue = f.queue[1:]if f.initialPopulationCount > 0 {f.initialPopulationCount--}//从DeltaFIFO取出objectitem, ok := f.items[id]if !ok {// Item may have been deleted subsequently.continue}delete(f.items, id)//调用HandleDeltas方法进行处理err := process(item)if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err = e.Err}// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err}
}
处理DeltaFIFO
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
HandleDeltas方法
//处理DeltaFIFO的方法
?
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()
?// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}//调用s.processor.distribute方法,将调用Listener.add,负责将watch的资源传到listener;s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {p.listenersLock.RLock()defer p.listenersLock.RUnlock()
?if sync {for _, listener := range p.syncingListeners {//调用Listener.add,负责将watch的资源传到listenerlistener.add(obj)}} else {for _, listener := range p.listeners {listener.add(obj)}}
}
Listener.add/pop/run
/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
listenser的add方法负责将notify装进pendingNotifications;
pop函数取出pendingNotifications的第一个notify,输出到nextCh channel;
run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet
//listenser的add方法负责将notify装进pendingNotifications;
func (p *processorListener) add(notification interface{}) {p.addCh <- notification
}
?
//pop函数取出pendingNotifications的第一个notify,输出到nextCh channel;
func (p *processorListener) pop() {defer utilruntime.HandleCrash()defer close(p.nextCh) // Tell .run() to stop
?var nextCh chan<- interface{}var notification interface{}for {select {case nextCh <- notification:// Notification dispatchedvar ok boolnotification, ok = p.pendingNotifications.ReadOne()if !ok { // Nothing to popnextCh = nil // Disable this select case}case notificationToAdd, ok := <-p.addCh:if !ok {return}if notification == nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotificationsnotification = notificationToAddnextCh = p.nextCh} else { // There is already a notification waiting to be dispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}
}
?
?
//run函数则负责取出notify,然后根据notify的类型(增加、删除、更新)触发相应的处理函数,这些函数在ReplicaSetController注册,
// 分别是:rsc.addPod、rsc.updatePod、rsc.deletePod、rsc.enqueueReplicaSet、rsc.updateRS、rsc.enqueueReplicaSet
func (p *processorListener) run() {// this call blocks until the channel is closed. When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted. This is usually better than the alternative of never// delivering again.stopCh := make(chan struct{})wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retrieserr := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next := range p.nextCh {switch notification := next.(type) {case updateNotification:p.handler.OnUpdate(notification.oldObj, notification.newObj)case addNotification:p.handler.OnAdd(notification.newObj)case deleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))}}// the only way to get here is if the p.nextCh is empty and closedreturn true, nil})
?// the only way to get here is if the p.nextCh is empty and closedif err == nil {close(stopCh)}}, 1*time.Minute, stopCh)
}
rsc.addPod
/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go
首先会根据pod返回rc,当pod不属于任何rc时,则返回。找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。然后将这个rc放入队列
调用rsc.enqueueReplicaSet,将调用rsc.queue.Add;
func (rsc *ReplicaSetController) addPod(obj interface{}) {pod := obj.(*v1.Pod)
?if pod.DeletionTimestamp != nil {// on a restart of the controller manager, it's possible a new pod shows up in a state that// is already pending deletion. Prevent the pod from being a creation observation.rsc.deletePod(pod)return}
?// If it has a ControllerRef, that's all that matters.if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {//首先会根据pod返回rc,当pod不属于任何rc时,则返回。rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)if rs == nil {return}rsKey, err := controller.KeyFunc(rs)if err != nil {return}klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)//找到rc以后,更新rm.expectations.CreationObserved这个rc的期望值,也就是假如一个rc有4个pod,现在检测到创建了一个pod,则会将这个rc的期望值减少,变为3。rsc.expectations.CreationObserved(rsKey)//然后将这个rc放入队列;rsc.enqueueReplicaSet(rs)return}
?// Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync// them to see if anyone wants to adopt it.// DO NOT observe creation because no controller should be waiting for an// orphan.rss := rsc.getPodReplicaSets(pod)if len(rss) == 0 {return}klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)for _, rs := range rss {rsc.enqueueReplicaSet(rs)}
}
rsc.worker
/k8s.io/kubernetes/pkg/controller/replicaset/replica_set.go
调用rsc.syncHandler,这里会调用rsc.syncReplicaSet,syncReplicaSet负责pod与rc的同步,确保Pod副本数与rc规定的相同;
func (rsc *ReplicaSetController) worker() {for rsc.processNextWorkItem() {}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {key, quit := rsc.queue.Get()if quit {return false}defer rsc.queue.Done(key)
?err := rsc.syncHandler(key.(string))if err == nil {rsc.queue.Forget(key)return true}
?utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))rsc.queue.AddRateLimited(key)
?return true
}