主要逻辑
nginx controller 入口函数
// file:k8s.io/ingress-nginx/nginx/main.go
func main() {// step1: 初始化日志组件klog.InitFlags(nil)......// step2:创建必要的目录err = file.CreateRequiredDirectories()......// step 3 :初始化ApiserverClientkubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)......// step4: 检查service配置if len(conf.DefaultService) > 0 {err := checkService(conf.DefaultService, kubeClient)......klog.Infof("Validated %v as the default backend.", conf.DefaultService)}if len(conf.PublishService) > 0 {err := checkService(conf.PublishService, kubeClient)......}// step5:获取namespaceif conf.Namespace != "" {_, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{})if err != nil {klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err)}}// step6: 创建默认证书conf.FakeCertificate = ssl.GetFakeSSLCert()klog.Infof("SSL fake certificate created %v", conf.FakeCertificate.PemFileName)// step7: 检查是否支持v1beta API 、k8s 版本是否高于1.18.0k8s.IsNetworkingIngressAvailable, k8s.IsIngressV1Ready = k8s.NetworkingIngressAvailable(kubeClient)if !k8s.IsNetworkingIngressAvailable {klog.Warningf("Using deprecated \"k8s.io/api/extensions/v1beta1\" package because Kubernetes version is < v1.14.0")}if k8s.IsIngressV1Ready {......}conf.Client = kubeClient// step8: 注册prometheusreg := prometheus.NewRegistry()reg.MustRegister(prometheus.NewGoCollector())reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{PidFn: func() (int, error) { return os.Getpid(), nil },ReportErrors: true,}))......// step9:启动profileif conf.EnableProfiling {go registerProfiler()}// step10: 实例化nginxcontroller (*)ngx := controller.NewNGINXController(conf, mc)// step11: 启动健康探测和metrics APImux := http.NewServeMux()registerHealthz(nginx.HealthPath, ngx, mux)registerMetrics(reg, mux)go startHTTPServer(conf.ListenPorts.Health, mux)// step12: 启动nginx master进程go ngx.Start()......
}
nginx controller 初始化
// NewNGINXController creates a new NGINX Ingress controller.
func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController {// 初始化 event broadcastereventBroadcaster := record.NewBroadcaster()eventBroadcaster.StartLogging(klog.Infof)eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: config.Client.CoreV1().Events(config.Namespace),})// 获取/etc/resolv.conf 中的nameserver 列表h, err := dns.GetSystemNameServers()if err != nil {klog.Warningf("Error reading system nameservers: %v", err)}// 实例化NGINXControllern := &NGINXController{isIPV6Enabled: ing_net.IsIPv6Enabled(),resolver: h,cfg: config,syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "nginx-ingress-controller",}),stopCh: make(chan struct{}),updateCh: channels.NewRingChannel(1024),ngxErrCh: make(chan error),stopLock: &sync.Mutex{},runningConfig: new(ingress.Configuration),Proxy: &TCPProxy{},metricCollector: mc,command: NewNginxCommand(),}// 启动webhook 服务if n.cfg.ValidationWebhook != "" {n.validationWebhookServer = &http.Server{Addr: config.ValidationWebhook,Handler: adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}),TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(),}}// 获取pod runtime信息pod, err := k8s.GetPodDetails(config.Client)if err != nil {klog.Fatalf("unexpected error obtaining pod information: %v", err)}n.podInfo = pod// 实例化store(本地缓存)n.store = store.New(config.Namespace,config.ConfigMapName,config.TCPConfigMapName,config.UDPConfigMapName,config.DefaultSSLCertificate,config.ResyncPeriod,config.Client,n.updateCh,pod,config.DisableCatchAll)// 创建同步队列n.syncQueue = task.NewTaskQueue(n.syncIngress)... ...// 格式化template配置模板onTemplateChange := func() {template, err := ngx_template.NewTemplate(nginx.TemplatePath)if err != nil {// this error is different from the rest because it must be clear why nginx is not workingklog.Errorf(`
-------------------------------------------------------------------------------
Error loading new template: %v
-------------------------------------------------------------------------------
`, err)return}// 若模板格式化正确,则更新到nginxcontroller 对象中,并往同步队列发送一个template-change事件n.t = templateklog.Info("New NGINX configuration template loaded.")n.syncQueue.EnqueueTask(task.GetDummyObject("template-change"))}// 首次启动加载配置模板文件ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath)......n.t = ngxTpl// 监听模板文件变化// 监听 /etc/nginx/template/nginx.tmpl 模板文件是否有变化,有变化则调用onTemplateChange_, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange)... ...// 监听/etc/nginx/geoip/ 目录下配置文件变化filesToWatch := []string{}err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {......filesToWatch = append(filesToWatch, path)......})......for _, f := range filesToWatch {_, err = watch.NewFileWatcher(f, func() {klog.Infof("File %v changed. Reloading NGINX", f)// 配置文件有变化则往同步队列发送一个file-change 事件n.syncQueue.EnqueueTask(task.GetDummyObject("file-change"))})......}return n
}
ingress controller 结构体
type NGINXController struct {// pod runtime 信息podInfo *k8s.PodInfo// 配置信息cfg *Configuration// 事件通知器recorder record.EventRecorder// 同步队列syncQueue *task.Queue// 同步状态syncStatus status.Syncer// 同步限流器syncRateLimiter flowcontrol.RateLimiterstopLock *sync.MutexstopCh chan struct{}// 更新环状channelupdateCh *channels.RingChannel// 接受nginx 错误信息channelngxErrCh chan error// 当前配置文件runningConfig *ingress.Configuration// nginx 配置模板文件t ngx_template.TemplateWriter// nameserver 列表resolver []net.IP// 是否启用ipv6isIPV6Enabled bool// 是否关闭isShuttingDown bool// TCP代理Proxy *TCPProxy// 本地缓存store store.Storer// metrics 收集器metricCollector metric.Collector// webhookvalidationWebhookServer *http.Server// nginx 二进制命令command NginxExecTester
}
ngx.Start()
ngx.Start() 主要做3个事情
启动store 协程
启动syncQueue协程
监听updateCh
当从 updateCh 监听到变化事件时,向 syncQueue 发送一个 task
// file:internal/ingress/controller/nginx.go

// Start starts a new NGINX master process running in the foreground.
//
// It runs the store, sets up leader election, starts nginx and the sync
// queue, and then loops until the stop channel fires or nginx reports an
// error. This is an excerpt: code elided from the upstream source is marked
// with "// ......" comments.
func (n *NGINXController) Start() {
	klog.Info("Starting NGINX Ingress controller")

	// start the informers and the initial synchronization of the secrets
	n.store.Run(n.stopCh)

	// we need to use the defined ingress class to allow multiple leaders
	// in order to update information about ingress status
	electionID := fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.DefaultClass)
	if class.IngressClass != "" {
		electionID = fmt.Sprintf("%v-%v", n.cfg.ElectionID, class.IngressClass)
	}

	// leader election
	setupLeaderElection(&leaderElectionConfig{
		// ......
	})

	cmd := n.command.ExecCommand()
	// ......

	if n.cfg.EnableSSLPassthrough {
		n.setupSSLProxy()
	}

	// start nginx
	klog.Info("Starting NGINX process")
	n.start(cmd)

	// start the sync queue
	go n.syncQueue.Run(time.Second, n.stopCh)
	// force initial sync by enqueuing an initial-sync task
	n.syncQueue.EnqueueTask(task.GetDummyObject("initial-sync"))

	// In case of error the temporal configuration file will
	// be available up to five minutes after the error
	// (temporary configuration files are cleaned every 5 minutes)
	go func() {
		for {
			time.Sleep(5 * time.Minute)
			err := cleanTempNginxCfg()
			// ......
		}
	}()
	// ......

	for {
		select {
		case err := <-n.ngxErrCh:
			if n.isShuttingDown {
				return
			}

			// if the nginx master process dies, the workers continue to process requests
			// until the failure of the configured livenessProbe and restart of the pod.
			if process.IsRespawnIfRequired(err) {
				return
			}

		// keep reading events from updateCh
		case event := <-n.updateCh.Out():
			if n.isShuttingDown {
				break
			}

			if evt, ok := event.(store.Event); ok {
				klog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj)
				if evt.Type == store.ConfigurationEvent {
					// TODO: is this necessary? Consider removing this special case
					n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
					continue
				}

				// enqueue as a skippable task
				n.syncQueue.EnqueueSkippableTask(evt.Obj)
			} else {
				klog.Warningf("Unexpected event type received %T", event)
			}
		case <-n.stopCh:
			return
		}
	}
}
事件类型
const (// CreateEvent event associated with new objects in an informerCreateEvent EventType = "CREATE"// UpdateEvent event associated with an object update in an informerUpdateEvent EventType = "UPDATE"// DeleteEvent event associated when an object is removed from an informerDeleteEvent EventType = "DELETE"// ConfigurationEvent event associated when a controller configuration object is created or updatedConfigurationEvent EventType = "CONFIGURATION"
)
同步队列
结构体
// Queue manages a time work queue through an independent worker that invokes the
// given sync function for every work item inserted.
// The queue uses an internal timestamp that allows the removal of certain elements
// which timestamp is older than the last successful get operation.
type Queue struct {// queue is the work queue the worker pollsqueue workqueue.RateLimitingInterface// sync is called for each item in the queuesync func(interface{}) error// workerDone is closed when the worker exitsworkerDone chan bool// fn makes a key for an API objectfn func(obj interface{}) (interface{}, error)// lastSync is the Unix epoch time of the last execution of 'sync'lastSync int64
}
队列类型
(1) 可忽略队列 EnqueueSkippableTask
(2) 不可忽略队列 EnqueueTask
// EnqueueTask enqueues ns/name of the given api object in the task queue.
func (t *Queue) EnqueueTask(obj interface{}) {t.enqueue(obj, false)
}// EnqueueSkippableTask enqueues ns/name of the given api object in
// the task queue that can be skipped
func (t *Queue) EnqueueSkippableTask(obj interface{}) {t.enqueue(obj, true)
}// 入队列
// enqueue enqueues ns/name of the given api object in the task queue.
func (t *Queue) enqueue(obj interface{}, skippable bool) {if t.IsShuttingDown() {klog.Errorf("queue has been shutdown, failed to enqueue: %v", obj)return}ts := time.Now().UnixNano()if !skippable {// make sure the timestamp is bigger than lastSyncts = time.Now().Add(24 * time.Hour).UnixNano()}klog.V(3).Infof("queuing item %v", obj)key, err := t.fn(obj)if err != nil {klog.Errorf("%v", err)return}t.queue.Add(Element{Key: key,Timestamp: ts,})
}
store 协程
// file : k8s.io/ingress-nginx/internal/ingress/controller/store/store.go

// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s *k8sStore) Run(stopCh chan struct{}) {
	// start informers
	s.informers.Run(stopCh)
}
调用了informers.Run()方法
起多个协程去监听ingress、secret、endpoint、service、configmap、pod 的变化
// Run initiates the synchronization of the informers against the API server.
//
// This is an excerpt: code elided from the upstream source is marked with
// "// ......" comments.
func (i *Informer) Run(stopCh chan struct{}) {
	// start the secret, endpoint, service, configmap and pod informers
	go i.Secret.Run(stopCh)
	go i.Endpoint.Run(stopCh)
	go i.Service.Run(stopCh)
	go i.ConfigMap.Run(stopCh)
	go i.Pod.Run(stopCh)
	// ......

	// the ingress informer is started one second after the others
	time.Sleep(1 * time.Second)
	go i.Ingress.Run(stopCh)
	// ......
}
这里以监听 ingress 变化为例,接着分析具体实现
// New creates a new object store to be used in the ingress controller
func New(namespace, configmap, tcp, udp, defaultSSLCertificate string,resyncPeriod time.Duration,client clientset.Interface,updateCh *channels.RingChannel,pod *k8s.PodInfo,disableCatchAll bool) Storer {store := &k8sStore{informers: &Informer{},listers: &Lister{},sslStore: NewSSLCertTracker(),updateCh: updateCh,backendConfig: ngx_config.NewDefault(),syncSecretMu: &sync.Mutex{},backendConfigMu: &sync.RWMutex{},secretIngressMap: NewObjectRefMap(),defaultSSLCertificate: defaultSSLCertificate,pod: pod,}......// k8sStore fulfills resolver.Resolver interface// 格式化annotationstore.annotations = annotations.NewAnnotationExtractor(store)store.listers.IngressWithAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)......// create informers factory, enable and assign required informers// informer 工厂函数infFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,informers.WithNamespace(namespace),informers.WithTweakListOptions(tweakListOptionsFunc))if k8s.IsNetworkingIngressAvailable {store.informers.Ingress = infFactory.Networking().V1beta1().Ingresses().Informer()} else {store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()}store.listers.Ingress.Store = store.informers.Ingress.GetStore()store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()store.informers.Secret = infFactory.Core().V1().Secrets().Informer()store.listers.Secret.Store = store.informers.Secret.GetStore()store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()store.informers.Service = infFactory.Core().V1().Services().Informer()store.listers.Service.Store = store.informers.Service.GetStore()labelSelector := labels.SelectorFromSet(store.pod.Labels)// list and watch 机制store.informers.Pod = cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) 
(k8sruntime.Object, error) {options.LabelSelector = labelSelector.String()return client.CoreV1().Pods(store.pod.Namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {options.LabelSelector = labelSelector.String()return client.CoreV1().Pods(store.pod.Namespace).Watch(context.TODO(), options)},},&corev1.Pod{},resyncPeriod,cache.Indexers{},)store.listers.Pod.Store = store.informers.Pod.GetStore()ingDeleteHandler := func(obj interface{}) {ing, ok := toIngress(obj)if !ok {// If we reached here it means the ingress was deleted but its final state is unrecorded.tombstone, ok := obj.(cache.DeletedFinalStateUnknown)if !ok {klog.Errorf("couldn't get object from tombstone %#v", obj)return}ing, ok = tombstone.Obj.(*networkingv1beta1.Ingress)if !ok {klog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)return}}if !class.IsValid(ing) {klog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)return}if isCatchAllIngress(ing.Spec) && disableCatchAll {klog.Infof("ignoring delete for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)return}recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))store.listers.IngressWithAnnotation.Delete(ing)key := k8s.MetaNamespaceKey(ing)store.secretIngressMap.Delete(key)updateCh.In() <- Event{Type: DeleteEvent,Obj: obj,}}ingEventHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {ing, _ := toIngress(obj)if !class.IsValid(ing) {a, _ := parser.GetStringAnnotation(class.IngressKey, ing)klog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)return}if isCatchAllIngress(ing.Spec) && disableCatchAll {klog.Infof("ignoring add for catch-all ingress %v/%v because of --disable-catch-all", ing.Namespace, ing.Name)return}recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", 
fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))store.syncIngress(ing)store.updateSecretIngressMap(ing)store.syncSecrets(ing)updateCh.In() <- Event{Type: CreateEvent,Obj: obj,}},DeleteFunc: ingDeleteHandler,UpdateFunc: func(old, cur interface{}) {oldIng, _ := toIngress(old)curIng, _ := toIngress(cur)validOld := class.IsValid(oldIng)validCur := class.IsValid(curIng)if !validOld && validCur {if isCatchAllIngress(curIng.Spec) && disableCatchAll {klog.Infof("ignoring update for catch-all ingress %v/%v because of --disable-catch-all", curIng.Namespace, curIng.Name)return}klog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))} else if validOld && !validCur {klog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)ingDeleteHandler(old)return} else if validCur && !reflect.DeepEqual(old, cur) {if isCatchAllIngress(curIng.Spec) && disableCatchAll {klog.Infof("ignoring update for catch-all ingress %v/%v and delete old one because of --disable-catch-all", curIng.Namespace, curIng.Name)ingDeleteHandler(old)return}recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))} else {klog.V(3).Infof("No changes on ingress %v/%v. 
Skipping update", curIng.Namespace, curIng.Name)return}store.syncIngress(curIng)store.updateSecretIngressMap(curIng)store.syncSecrets(curIng)updateCh.In() <- Event{Type: UpdateEvent,Obj: cur,}},}secrEventHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {...},UpdateFunc: func(old, cur interface{}) {...},DeleteFunc: func(obj interface{}) {...},}epEventHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {...},DeleteFunc: func(obj interface{}) {...},UpdateFunc: func(old, cur interface{}) {...},}......cmEventHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {...},UpdateFunc: func(old, cur interface{}) {...},}podEventHandler := cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {...},UpdateFunc: func(old, cur interface{}) {...},DeleteFunc: func(obj interface{}) {...},}serviceHandler := cache.ResourceEventHandlerFuncs{UpdateFunc: func(old, cur interface{}) {...},}// 注册各种类型的eventHandlerstore.informers.Ingress.AddEventHandler(ingEventHandler)store.informers.Endpoint.AddEventHandler(epEventHandler)store.informers.Secret.AddEventHandler(secrEventHandler)store.informers.ConfigMap.AddEventHandler(cmEventHandler)store.informers.Service.AddEventHandler(serviceHandler)store.informers.Pod.AddEventHandler(podEventHandler)// do not wait for informers to read the configmap configurationns, name, _ := k8s.ParseNameNS(configmap)cm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})if err != nil {klog.Warningf("Unexpected error reading configuration configmap: %v", err)}store.setConfig(cm)return store
}
可以看到,每种类型的informer 基本都有相关的回调方法,包括:
AddFunc: func(obj interface{}) {...},
UpdateFunc: func(old, cur interface{}) {...},
DeleteFunc: func(obj interface{}) {...},
每个方法里面都会往updateCh 写入不同类型的事件(CreateEvent、DeleteEvent、UpdateEvent)
这一步与 ngx.Start() 中的事件循环协同工作:informer 通过 list&watch 机制监听资源变化,一旦资源有变化则向 updateCh 里面写入事件;ngx.Start() 中的 for/select 循环监听 updateCh,一旦收到事件则往 syncQueue 写入一个 task
队列消费
// file : k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
// initialize the Queue
n.syncQueue = task.NewTaskQueue(n.syncIngress)

// NewTaskQueue creates a new task queue with the given sync function.
// The sync function is called for every element inserted into the queue.
func NewTaskQueue(syncFn func(interface{}) error) *Queue {
	return NewCustomTaskQueue(syncFn, nil)
}

// NewCustomTaskQueue creates a new task queue with the given sync function
// and key function. When fn is nil the queue's defaultKeyFunc is used.
func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue {
	// syncFn (syncIngress for the controller) is stored in Queue.sync
	q := &Queue{
		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		sync:       syncFn,
		workerDone: make(chan bool),
		fn:         fn,
	}

	if fn == nil {
		q.fn = q.defaultKeyFunc
	}

	return q
}
消费Queue队列
核心方法:
t.queue.Get() -> t.sync()
// file: k8s.io/ingress-nginx/internal/ingress/controller/nginx.go
func (n *NGINXController) Start() {......go n.syncQueue.Run(time.Second, n.stopCh)......
}// file: k8s.io/ingress-nginx/internal/task/queue.go
// Run starts processing elements in the queue
func (t *Queue) Run(period time.Duration, stopCh <-chan struct{}) {wait.Until(t.worker, period, stopCh)
}// worker processes work in the queue through sync.
// 消费Queue队列
func (t *Queue) worker() {for {key, quit := t.queue.Get()......ts := time.Now().UnixNano()item := key.(Element)// 比对最后一次同步的时间戳与Queue中取出item里面带的时间戳,如果小于最后一次同步时间戳则忽略改变更if t.lastSync > item.Timestamp {klog.V(3).Infof("skipping %v sync (%v > %v)", item.Key, t.lastSync, item.Timestamp)t.queue.Forget(key)t.queue.Done(key)continue}klog.V(3).Infof("syncing %v", item.Key)// 调用syncIngressif err := t.sync(key); err != nil {klog.Warningf("requeuing %v, err %v", item.Key, err)t.queue.AddRateLimited(Element{Key: item.Key,Timestamp: time.Now().UnixNano(),})} else {t.queue.Forget(key)t.lastSync = ts}t.queue.Done(key)}
}
syncIngress 工作原理
比对线上正在运行的配置与新生成的配置是否相同,并判断是否能够动态更新配置(仅 endpoint 变化时),以减少 nginx 频繁 reload 带来的性能损耗。
pcfg :当前格式化出来的配置
n.runningConfig : 当前线上环境运行的配置
比对pcfg 和 n.runningConfig 配置,判断是否可以动态更新配置(仅endpoint列表变化)
(1)支持动态更新配置:调用n.configureDynamically(pcfg)
将backend 列表以json格式post 到/configuration/backends 这个LUA Handler,动态更新endpoint 列表
(2)不支持动态更新配置,调用 n.OnUpdate(*pcfg)
生成临时配置文件
检测临时配置文件语法
diff 临时配置文件与当前线上配置文件
删除临时配置文件
将新生成的配置写入线上配置文件
执行nginx -s reload 重载配置
// file: k8s.io/ingress-nginx/internal/ingress/controller/controller.go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
// 组装nginx 配置文件
// 需要reload 时,调用OnUpdate
func (n *NGINXController) syncIngress(interface{}) error {......ings := n.store.ListIngresses(nil)// 格式化新配置hosts, servers, pcfg := n.getConfiguration(ings)......// 判断配置是否有变化if n.runningConfig.Equal(pcfg) {klog.V(3).Infof("No configuration change detected, skipping backend reload.")return nil}......// 配置有变化,则判断是否需要reload nginxif !n.IsDynamicConfigurationEnough(pcfg) {klog.Infof("Configuration changes detected, backend reload required.")// 生成checksum hash值hash, _ := hashstructure.Hash(pcfg, &hashstructure.HashOptions{TagName: "json",})pcfg.ConfigurationChecksum = fmt.Sprintf("%v", hash)//调用onUpdate 方法err := n.OnUpdate(*pcfg)......klog.Infof("Backend successfully reloaded.")......}// 是否首次同步(ingress.Configuration 结构体是否为空)isFirstSync := n.runningConfig.Equal(&ingress.Configuration{})if isFirstSync {// For the initial sync it always takes some time for NGINX to start listening// For large configurations it might take a while so we loop and back off// 首次初始化需要耗费一定的时间,睡眠1秒klog.Info("Initial sync, sleeping for 1 second.")time.Sleep(1 * time.Second)}// 重试机制retry := wait.Backoff{Steps: 15,Duration: 1 * time.Second,Factor: 0.8,Jitter: 0.1,}err := wait.ExponentialBackoff(retry, func() (bool, error) {// 动态更新nginx 配置err := n.configureDynamically(pcfg)if err == nil {klog.V(2).Infof("Dynamic reconfiguration succeeded.")return true, nil}klog.Warningf("Dynamic reconfiguration failed: %v", err)return false, err})......n.runningConfig = pcfgreturn nil
}
判断是否可以动态更新配置
不需要reload的场景
- endpoint 变化
需要reload的场景
- 新增ingress
- 新增证书配置
- ingress 增加/删除 PATH
- 删除ingress、service、secret
- Secret 更新
- 部分annotation变更,造成上述状态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// IsDynamicConfigurationEnough returns whether a Configuration can be
// dynamically applied, without reloading the backend.
// 判断是否nginx 可以动态重载,不需要执行reload
func (n *NGINXController) IsDynamicConfigurationEnough(pcfg *ingress.Configuration) bool {copyOfRunningConfig := *n.runningConfigcopyOfPcfg := *pcfgcopyOfRunningConfig.Backends = []*ingress.Backend{}copyOfPcfg.Backends = []*ingress.Backend{}clearL4serviceEndpoints(?OfRunningConfig)clearL4serviceEndpoints(?OfPcfg)copyOfRunningConfig.ControllerPodsCount = 0copyOfPcfg.ControllerPodsCount = 0clearCertificates(?OfRunningConfig)clearCertificates(?OfPcfg)return copyOfRunningConfig.Equal(?OfPcfg)
}
不能动态更新,调用nginx reload 重载配置
// OnUpdate is called by the synchronization loop whenever configuration
// changes were detected. The received backend Configuration is merged with the
// configuration ConfigMap before generating the final configuration file.
// Returns nil in case the backend was successfully reloaded.
// 当监听到配置发生变化,同步循环将调用OnUdate
// 接收到的backend 配置会跟当前配置的configmap 进行合并
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {cfg := n.store.GetBackendConfiguration()cfg.Resolver = n.resolver// 生成临时配置content, err := n.generateTemplate(cfg, ingressCfg)......// 检查配置是否正确err = n.testTemplate(content)......if klog.V(2) {src, _ := ioutil.ReadFile(cfgPath)if !bytes.Equal(src, content) {tmpfile, err := ioutil.TempFile("", "new-nginx-cfg")if err != nil {return err}defer tmpfile.Close()// 创建临时配置文件err = ioutil.WriteFile(tmpfile.Name(), content, file.ReadWriteByUser)......// diff 比对生成的临时配置跟当前生效配置diffOutput, err := exec.Command("diff", "-I", "'# Configuration.*'", "-u", cfgPath, tmpfile.Name()).CombinedOutput()......klog.Infof("NGINX configuration diff:\n%v", string(diffOutput))// 删除临时配置文件os.Remove(tmpfile.Name())}}// 将新配置写入cfgPatherr = ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)......// reload nginxo, err := n.command.ExecCommand("-s", "reload").CombinedOutput()......return nil
}
动态更新
// file: k8s.io/ingress-contoller/internal/ingress/controller/nginx.go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
// 以json 的格式封装backend 列表并post 到lua API
func (n *NGINXController) configureDynamically(pcfg *ingress.Configuration) error {backendsChanged := !reflect.DeepEqual(n.runningConfig.Backends, pcfg.Backends)if backendsChanged {// 更新endpoint 列表err := configureBackends(pcfg.Backends)......}// 比对TCP/UDP endpoint 列表streamConfigurationChanged := !reflect.DeepEqual(n.runningConfig.TCPEndpoints, pcfg.TCPEndpoints) || !reflect.DeepEqual(n.runningConfig.UDPEndpoints, pcfg.UDPEndpoints)if streamConfigurationChanged {err := updateStreamConfiguration(pcfg.TCPEndpoints, pcfg.UDPEndpoints)......}if n.runningConfig.ControllerPodsCount != pcfg.ControllerPodsCount {// post pod 数目statusCode, _, err := nginx.NewPostStatusRequest("/configuration/general", "application/json", ingress.GeneralConfig{ControllerPodsCount: pcfg.ControllerPodsCount,})......}// 比对servers 变化serversChanged := !reflect.DeepEqual(n.runningConfig.Servers, pcfg.Servers)if serversChanged {err := configureCertificates(pcfg.Servers)......}return nil
}
以JSON 格式 POST 调用LUA Handler /configuration/backends
// file: k8s.io/ingress-nginx/internal/controller/nginx.go
func configureBackends(rawBackends []*ingress.Backend) error {backends := make([]*ingress.Backend, len(rawBackends))for i, backend := range rawBackends {var service *apiv1.Serviceif backend.Service != nil {service = &apiv1.Service{Spec: backend.Service.Spec}}luaBackend := &ingress.Backend{Name: backend.Name,Port: backend.Port,SSLPassthrough: backend.SSLPassthrough,SessionAffinity: backend.SessionAffinity,UpstreamHashBy: backend.UpstreamHashBy,LoadBalancing: backend.LoadBalancing,Service: service,NoServer: backend.NoServer,TrafficShapingPolicy: backend.TrafficShapingPolicy,AlternativeBackends: backend.AlternativeBackends,}var endpoints []ingress.Endpointfor _, endpoint := range backend.Endpoints {endpoints = append(endpoints, ingress.Endpoint{Address: endpoint.Address,Port: endpoint.Port,})}luaBackend.Endpoints = endpointsbackends[i] = luaBackend}// 更新endpoint 列表statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)if err != nil {return err}if statusCode != http.StatusCreated {return fmt.Errorf("unexpected error code: %d", statusCode)}return nil
}