当前位置: 代码迷 >> 综合 >> 从零开始阅读gitlab-runner源代码 003 worker 和 runner
  详细解决方案

从零开始阅读gitlab-runner源代码 003 worker 和 runner

热度:11   发布时间:2024-02-19 21:36:44.0

前言

gitlab-runner是当前炙手可热的一个CICD工具,和Gitlab集成非常好。之前在工作中应用了一下,效果相当理想。

为了进一步了解gitlab-runner的运行原理,打算学习一下gitlab-runner源码。

由于是零基础,所以打算把整个过程记录下来,包括每一步的思路,以便复盘;贻笑大方之处,欢迎评论区打脸。

gitlab联调

上一章大致看了一下gitlab-runner如何将runner注册到gitlab。因为比较简单,而且逻辑重点都在gitlab中,所以讲得比较粗糙。

本章验证一下gitlab-runner使用docker executor工作的原理,在接到gitlab的开始job通知后,如何开始启动docker container,并完成job。

本地安装一个gitlab

我们创建一个gitlab,配置一个runner,然后运行pipeline,跟踪gitlab-runner源代码。

参考资料:gitlab 安装指南 docker版

本地注册一个runner

参考资料:gitlab-runner使用docker executor

找到commands/register.go,使用logrus输出配置文件地址:

func (s *RegisterCommand) Execute(context *cli.Context) {
    ......err = s.saveConfig()......
}
//修改s.saveConfig,输出地址
func (c *configOptions) saveConfig() error {
    logrus.Printf("saveConfig at" + c.ConfigFile)return c.config.SaveConfig(c.ConfigFile)
}

跟踪后,发现config地址:~/.gitlab-runner/config.toml,如下图:

config location

如果我们要配置runner属性,就不用运行代码了,直接改文件就可以了。

启动pipeline

  • 为项目配个.gitlab-ci.yml,随意配下即可,后面根据具体的测试内容再调整。
  • 运行项目,等待pipeline接入。下面是debug用到的launch.json
        {
    "name": "gitlab run","type": "go","request": "launch","mode": "debug","program": "${workspaceFolder}","args": ["run"]},
  • 断点打在commands/multi.gofunc (mr *RunCommand) run() {}
  • 运行pipeline,可以看到断点被点亮,三个红框分别代表:监工(调度器),工人,等待结束。

go routine

  • 跟踪代码,可以看到multi.go的运行过程大致如下:
// Run命令被执行
func (mr *RunCommand) Execute(_ *cli.Context) {
    ......// 起了一个服务err = svc.Run() //跳转下面......
}
func (mr *RunCommand) Start(_ service.Service) error {
    ......// 运行命令go mr.run() //跳转下面......
}
// RunCommand运行
func (mr *RunCommand) run() {
    // 建立一些监控方面的服务mr.setupMetricsAndDebugServer()mr.setupSessionServer()// 按照config.toml创建通道runnersrunners := make(chan *common.RunnerConfig)// 不断把配置好的runner放入(feed)通道go mr.feedRunners(runners)// 系统信号接收signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)signal.Notify(mr.reloadSignal, syscall.SIGHUP)// 开启一个管理工人的协程startWorker := make(chan int)stopWorker := make(chan bool)// 每次 startWorker 收到编号,就会启动一个 processRunnersgo mr.startWorkers(startWorker, stopWorker, runners)workerIndex := 0for mr.stopSignal == nil {
    // 控制worker数量,多退少补// 注意方法中,对 startWorker 的处理signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)if signaled != nil {
    break}// 更新config文件signaled = mr.updateConfig()if signaled != nil {
    break}}// 系统关闭处理for mr.currentWorkers > 0 {
    stopWorker <- truemr.currentWorkers--}mr.log().Info("All workers stopped. Can exit now")close(mr.runFinished)
}
  • 由以上代码可以知道,系统主要依靠以下不间断的方法或协程:
    • feedRunners
    • startWorkers
    • updateWorkers
    • updateConfig

下面我们对这几部分代码进行一些简单的阅读和分析。

feedRunners

主要功能,将config.runners放到chan *common.RunnerConfig

func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
    for mr.stopSignal == nil {
    ......//wait//将配置好的runner,依次放入通道,放一个,歇一会for _, runner := range config.Runners {
    // 该方法只是给runner做了个健康检查mr.feedRunner(runner, runners)time.Sleep(interval)}}......
}

startWorkers

代码不多,大致意思就是拿到通道中的startworker(工号),启动这个工人。

func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
    for mr.stopSignal == nil {
    id := <-startWorker// 这是个非常关键的方法,后面有重点介绍go mr.processRunners(id, stopWorker, runners)}
}

updateWorkers

这个功能也比较简单,和系统设置的并发数比较一下,多退少补。增加的方式是使用通道(channel)传递值workerIndex。减少的模式是stopWorker=true

func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
    concurrentLimit := mr.config.Concurrentif concurrentLimit < 1 {
    mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")}// 太多for mr.currentWorkers > concurrentLimit {
    select {
    case stopWorker <- true:case signaled := <-mr.runSignal:return signaled}mr.currentWorkers--}// 太少for mr.currentWorkers < concurrentLimit {
    select {
    case startWorker <- *workerIndex:case signaled := <-mr.runSignal:return signaled}mr.currentWorkers++*workerIndex++}return nil
}

updateConfig

比较简单,不赘述,就是两点:

  1. 根据指令读取配置
  2. 定时读取配置
func (mr *RunCommand) updateConfig() os.Signal {
    select {
    case <-time.After(common.ReloadConfigInterval * time.Second):err := mr.checkConfig()if err != nil {
    mr.log().Errorln("Failed to load config", err)}case <-mr.reloadSignal:err := mr.loadConfig()if err != nil {
    mr.log().Errorln("Failed to load config", err)}case signaled := <-mr.runSignal:return signaled}return nil
}

processRunners

如果之前的feedRunners将一个runner配置,放进了通道;那么下面的代码会把runner放进worker
如果之前的updateWorkers要关闭一个worker;那么也是这里处理。

func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
    mr.log().WithField("worker", id).Debugln("Starting worker")for mr.stopSignal == nil {
    select {
    case runner := <-runners:// 将runner放进worker,后面再细看err := mr.processRunner(id, runner, runners)if err != nil {
    mr.log().WithFields(logrus.Fields{
    "runner":   runner.ShortDescription(),"executor": runner.Executor,}).WithError(err).Warn("Failed to process runner")}// force GC cycle after processing buildruntime.GC()case <-stopWorker:// 啥都不干,关闭mr.log().WithField("worker", id).Debugln("Stopping worker")return}}<-stopWorker
}

这里把processRunner拿出来再细看一下,有点复杂,下一章跟踪调试一下,?

func (mr *RunCommand) processRunner(    id int,    runner *common.RunnerConfig,    runners chan *common.RunnerConfig,) (err error) {
    provider := common.GetExecutorProvider(runner.Executor)if provider == nil {
    return}executorData, err := provider.Acquire(runner)if err != nil {
    return fmt.Errorf("failed to update executor: %w", err)}defer provider.Release(runner, executorData)if !mr.buildsHelper.acquireBuild(runner) {
    logrus.WithFields(logrus.Fields{
    "runner": runner.ShortDescription(),"worker": id,}).Debug("Failed to request job, runner limit met")return}defer mr.buildsHelper.releaseBuild(runner)buildSession, sessionInfo, err := mr.createSession(provider)if err != nil {
    return}// Receive a new buildtrace, jobData, err := mr.requestJob(runner, sessionInfo)if err != nil || jobData == nil {
    return}defer func() {
     mr.traceOutcome(trace, err) }()// Create a new buildbuild, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)if err != nil {
    return}build.Session = buildSessionbuild.ArtifactUploader = mr.network.UploadRawArtifacts// Add build to list of builds to assign numbersmr.buildsHelper.addBuild(build)defer mr.buildsHelper.removeBuild(build)// Process the same runner by different worker again// to speed up taking the buildsmr.requeueRunner(runner, runners)// Process a buildreturn build.Run(mr.config, trace)
}

完毕。

  相关解决方案