前言
gitlab-runner是当前炙手可热的一个CICD工具,和Gitlab集成非常好。之前在工作中应用了一下,效果相当理想。
为了进一步了解gitlab-runner的运行原理,打算学习一下gitlab-runner源码。
由于是零基础,所以打算把整个过程记录下来,包括每一步的思路,以便复盘;贻笑大方之处,欢迎评论区打脸。
gitlab联调
上一章大致看了一下gitlab-runner
如何将runner
注册到gitlab
。因为比较简单,而且逻辑重点都在gitlab
中,所以讲得比较粗糙。
本章验证一下gitlab-runner
使用docker executor
工作的原理,在接到gitlab
的开始job通知后,如何开始启动docker container
,并完成job。
本地安装一个gitlab
我们创建一个gitlab
,配置一个runner
,然后运行pipeline,跟踪gitlab-runner
源代码。
参考资料:gitlab 安装指南 docker版
本地注册一个runner
参考资料:gitlab-runner使用docker executor
找到commands/register.go
,使用logrus
输出配置文件地址:
func (s *RegisterCommand) Execute(context *cli.Context) {
......err = s.saveConfig()......
}
//修改s.saveConfig,输出地址
func (c *configOptions) saveConfig() error {
logrus.Printf("saveConfig at" + c.ConfigFile)return c.config.SaveConfig(c.ConfigFile)
}
跟踪后,发现config地址:~/.gitlab-runner/config.toml
,如下图:
如果我们要配置runner
属性,就不用运行代码了,直接改文件就可以了。
启动pipeline
- 为项目配个
.gitlab-ci.yml
,随意配下即可,后面根据具体的测试内容再调整。 - 运行项目,等待pipeline接入。下面是debug用到的launch.json
{
"name": "gitlab run","type": "go","request": "launch","mode": "debug","program": "${workspaceFolder}","args": ["run"]},
- 断点打在
commands/multi.go
的func (mr *RunCommand) run() {}
。 - 运行pipeline,可以看到断点被点亮,三个红框分别代表:监工(调度器),工人,等待结束。
- 跟踪代码,可以看到
multi.go
的运行过程大致如下:
// Run命令被执行
func (mr *RunCommand) Execute(_ *cli.Context) {
......// 起了一个服务err = svc.Run() //跳转下面......
}
func (mr *RunCommand) Start(_ service.Service) error {
......// 运行命令go mr.run() //跳转下面......
}
// RunCommand运行
func (mr *RunCommand) run() {
// 建立一些监控方面的服务mr.setupMetricsAndDebugServer()mr.setupSessionServer()// 按照config.toml创建通道runnersrunners := make(chan *common.RunnerConfig)// 不断把配置好的runner放入(feed)通道go mr.feedRunners(runners)// 系统信号接收signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt)signal.Notify(mr.reloadSignal, syscall.SIGHUP)// 开启一个管理工人的协程startWorker := make(chan int)stopWorker := make(chan bool)// 每次 startWorker 收到编号,就会启动一个 processRunnersgo mr.startWorkers(startWorker, stopWorker, runners)workerIndex := 0for mr.stopSignal == nil {
// 控制worker数量,多退少补// 注意方法中,对 startWorker 的处理signaled := mr.updateWorkers(&workerIndex, startWorker, stopWorker)if signaled != nil {
break}// 更新config文件signaled = mr.updateConfig()if signaled != nil {
break}}// 系统关闭处理for mr.currentWorkers > 0 {
stopWorker <- truemr.currentWorkers--}mr.log().Info("All workers stopped. Can exit now")close(mr.runFinished)
}
- 由以上代码可以知道,系统主要依靠以下不间断的方法或协程:
- feedRunners
- startWorkers
- updateWorkers
- updateConfig
下面我们对这几部分代码进行一些简单的阅读和分析。
feedRunners
主要功能,将config.runners
放到chan *common.RunnerConfig
。
func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
......//wait//将配置好的runner,依次放入通道,放一个,歇一会for _, runner := range config.Runners {
// 该方法只是给runner做了个健康检查mr.feedRunner(runner, runners)time.Sleep(interval)}}......
}
startWorkers
代码不多,大致意思就是拿到通道中的startworker
(工号),启动这个工人。
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
for mr.stopSignal == nil {
id := <-startWorker// 这是个非常关键的方法,后面有重点介绍go mr.processRunners(id, stopWorker, runners)}
}
updateWorkers
这个功能也比较简单,和系统设置的并发数比较一下,多退少补。增加的方式是使用通道(channel
)传递值workerIndex
。减少的模式是stopWorker=true
。
func (mr *RunCommand) updateWorkers(workerIndex *int, startWorker chan int, stopWorker chan bool) os.Signal {
concurrentLimit := mr.config.Concurrentif concurrentLimit < 1 {
mr.log().Fatalln("Concurrent is less than 1 - no jobs will be processed")}// 太多for mr.currentWorkers > concurrentLimit {
select {
case stopWorker <- true:case signaled := <-mr.runSignal:return signaled}mr.currentWorkers--}// 太少for mr.currentWorkers < concurrentLimit {
select {
case startWorker <- *workerIndex:case signaled := <-mr.runSignal:return signaled}mr.currentWorkers++*workerIndex++}return nil
}
updateConfig
比较简单,不赘述,就是两点:
- 根据指令读取配置
- 定时读取配置
func (mr *RunCommand) updateConfig() os.Signal {
select {
case <-time.After(common.ReloadConfigInterval * time.Second):err := mr.checkConfig()if err != nil {
mr.log().Errorln("Failed to load config", err)}case <-mr.reloadSignal:err := mr.loadConfig()if err != nil {
mr.log().Errorln("Failed to load config", err)}case signaled := <-mr.runSignal:return signaled}return nil
}
processRunners
如果之前的feedRunners
将一个runner
配置,放进了通道;那么下面的代码会把runner
放进worker
。
如果之前的updateWorkers
要关闭一个worker
;那么也是这里处理。
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
mr.log().WithField("worker", id).Debugln("Starting worker")for mr.stopSignal == nil {
select {
case runner := <-runners:// 将runner放进worker,后面再细看err := mr.processRunner(id, runner, runners)if err != nil {
mr.log().WithFields(logrus.Fields{
"runner": runner.ShortDescription(),"executor": runner.Executor,}).WithError(err).Warn("Failed to process runner")}// force GC cycle after processing buildruntime.GC()case <-stopWorker:// 啥都不干,关闭mr.log().WithField("worker", id).Debugln("Stopping worker")return}}<-stopWorker
}
这里把processRunner
拿出来再细看一下,有点复杂,下一章跟踪调试一下,?
func (mr *RunCommand) processRunner( id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig,) (err error) {
provider := common.GetExecutorProvider(runner.Executor)if provider == nil {
return}executorData, err := provider.Acquire(runner)if err != nil {
return fmt.Errorf("failed to update executor: %w", err)}defer provider.Release(runner, executorData)if !mr.buildsHelper.acquireBuild(runner) {
logrus.WithFields(logrus.Fields{
"runner": runner.ShortDescription(),"worker": id,}).Debug("Failed to request job, runner limit met")return}defer mr.buildsHelper.releaseBuild(runner)buildSession, sessionInfo, err := mr.createSession(provider)if err != nil {
return}// Receive a new buildtrace, jobData, err := mr.requestJob(runner, sessionInfo)if err != nil || jobData == nil {
return}defer func() {
mr.traceOutcome(trace, err) }()// Create a new buildbuild, err := common.NewBuild(*jobData, runner, mr.abortBuilds, executorData)if err != nil {
return}build.Session = buildSessionbuild.ArtifactUploader = mr.network.UploadRawArtifacts// Add build to list of builds to assign numbersmr.buildsHelper.addBuild(build)defer mr.buildsHelper.removeBuild(build)// Process the same runner by different worker again// to speed up taking the buildsmr.requeueRunner(runner, runners)// Process a buildreturn build.Run(mr.config, trace)
}
完毕。