当前位置: 代码迷 >> 综合 >> Go 并发基础:Goroutines 和 Channels 的声明与使用
  详细解决方案

Go 并发基础:Goroutines 和 Channels 的声明与使用

热度:76   发布时间:2023-09-30 12:35:19.0

go语言可以通过轻量级线程,也就是协程来完成几百万,上百万的并发。 

 

什么是并发


前面的课程中,我所写的代码都按照顺序执行,也就是上一句代码执行完,才会执行下一句,这样的代码逻辑简单,也符合我们的阅读习惯。

但这样是不够的,因为计算机很强大,如果只让它干完一件事情再干另外一件事情就太浪费了。比如一款音乐软件,使用它听音乐的时候还想让它下载歌曲,同一时刻做了两件事,在编程中,这就是并发,并发可以让你编写的程序在同一时刻做多几件事情

 

并发和并行


Go 并发基础:Goroutines 和 Channels 的声明与使用

并发就是将多个线程,放到一个cpu上面,那么一段时间给线程1,一段时间给线程2,这样交替运行,从人的感受来说是所有线程是一起执行的,这种场景叫做并发,它并不是真正的同时运行,只不过cpu将时间切片了,分给了不同的程序。

并行就是计算机有多个cpu,每个cpu在处理不同的线程,这种情况就叫做并行。

go语言上面两种情况都包含了。

 

进程与线程


  • 任何语言的并行,到操作系统层面,都是内核线程的并行。(比如线程1执行某个函数,线程2执行某个函数)(不管你开辟了多少线程,不是说开辟多少线程就有多少并发,关键看你有多少核数,四核只能有4个并发,8核的有八个并发,多余核数,剩下的都处于等待状态,真正的其实就是有多少核,就有多少并发
  • 同一个进程内的多个线程共享系统资源,进程的创建、销毁、切换比线程大很多。(在进程里面打开了一个文件,这样一个文件可以在多个线程里面共用了,只需要打开一次即可)
  • 从进程到线程再到协程,其实是一个不断共享,不断减少切换成本的过程。(开辟一个进程的开销会很高)Go 并发基础:Goroutines 和 Channels 的声明与使用

并发模型

Go 并发基础:Goroutines 和 Channels 的声明与使用

在go中内核线程和用户,也就是代码里面写的,绑定关系是动态变化的。某个时刻内核线程绑定到了线程1,下个时刻绑定到线程2。

进程


在操作系统中,进程是一个非常重要的概念。当你启动一个软件(比如浏览器)的时候,操作系统会为这个软件创建一个进程,这个进程是该软件的工作空间,它包含了软件运行所需的所有资源,比如内存空间、文件句柄,还有下面要讲的线程等。下面的图片就是我的电脑上运行的进程:

Go 并发基础:Goroutines 和 Channels 的声明与使用                                                              (电脑运行的进程)

那么线程是什么呢?

线程


线程是进程的执行空间,一个进程可以有多个线程,线程被操作系统调度执行,比如下载一个文件,发送一个消息等。这种多个线程被操作系统同时调度执行的情况,就是多线程的并发。

一个程序启动,就会有对应的进程被创建,同时进程也会启动一个线程,这个线程叫作主线程。如果主线程结束,那么整个程序就退出了有了主线程,就可以从主线里启动很多其他线程,也就有了多线程的并发。

 

协程


进程:
         ? 分配系统资源(CPU 时间、内存等)基本单位(你要运行一个程序,这个程序需要cpu,内存,所有的这些分配的基本单位最后形成了一个进程)
         ? 有独立的内存空间,切换开销大(进程都有自己独立的内存开销,进程都有自己的虚拟内存空间,当你切换的时候,开销是比较大的)
线程:(进程一般都是一个个的执行程序,一个可运行文件基本上就是一个进程,线程可以在进程里面开启一个新的执行流)
         ? 进程的一个执行流,是 CPU 调度并能独立运行的的基本单位
         ? 同一进程中的多线程共享内存空间,线程切换代价小
         ? 多线程通信方便
         ? 从内核层面来看线程其实也是一种特殊的进程,它跟父进程共享了打开的文件和文件系统信息,共享了地址空间和信号处理函数
协程
? Go 语言中的轻量级线程实现(协程是在用户态,进程和线程是在内核感知的,它们都是linux里面的一个task struct。但是协程不一样,协程是用户态维护的一些轻量级线程的实现,它的所有调度都是通过go语言来完成)
? Golang 在 runtime、系统调用等多方面对 goroutine 调度进行了封装和处理,当遇到长时间执行或者进行系统调用时,会主动把当前 goroutine 的 CPU (P) 转让出去,让其他 goroutine 能被调度并执行,也就是 Golang 从语言层面支持了协程。(一旦go语言的程序获得了cpu的时间片,它会抓住cpu时间不放出去,只在用户态通过自己的调度器将时间片分出去给不同协程)

 

Communicating Sequential Process
CSP
       ? 描述两个独立的并发实体通过共享的通讯 channel 进行通信的并发模型。
Go 协程 goroutine
      ? 是一种轻量线程,用户态的线程,它不是操作系统的线程,而是将一个操作系统线程分段使用,通过调度器实现协作式调度。
      ? 是一种绿色线程,微线程,它与 Coroutine 协程也有区别,能够在发现堵塞后启动新的微线程。
通道 channel
? 类似 Unix 的 Pipe,用于协程之间通讯和同步。协程之间虽然解耦,但是它们和 Channel 有着耦合。

 channel类似于两个协程之间的管道,一面进,一面出。

java在做多线程通信的时候,只能通过共享变量,两个线程都要去访问同一块内存地址,任何一个线程要去访问这个变量的时候都要加锁来保证线程安全。这种加锁的机制是程序员可能会出现错误,忘记解锁,出现死锁,一个是其本身的效率会比较低。

通过channel就将复杂的逻辑隐藏起来了,程序员不再需要关心加锁或者解锁的问题。

 

 

线程和协程的差异
每个 goroutine (协程) 默认占用内存远比 Java 、C 的线程少
       ? goroutine: 2KB
       ? 线程:8MB
线程 goroutine 切换开销方面,goroutine 远比线程小
      ? 线程: 涉及模式切换(从用户态切换到内核态)、16个寄存器、PC、SP...等寄存器的刷新
      ? goroutine: 只有三个寄存器的值修改 - PC / SP / DX.
GOMAXPROCS
      ? 控制并行线程数量

 如果print是开销比较重的操作,那么是顺序执行了,程序效率就不高了。

 print("a")print("b")print("c")

 那如何利用多核的优势呢?通过协程来解决

 go print("a")go print("b")go print("c")time.Sleep(time.Second)cab

 

 

 协程vs线程


Go 并发基础:Goroutines 和 Channels 的声明与使用随着协程运行,如果需要开辟很大的slice,或者开辟很多的map,这样就动态的添加一点内存,内存大小是可以随着协程的运行,而动态进行增长的。

线程不一样,不管你将来执行什么代码,在最开始的时候分配固定大小的内存空间,为了防止将来内存空间不够用,会倾向于分配的多一点。

可以看到协程是按需分配,线程是在一开始就写死了,造成一个很大的内存浪费。

在写java的时候,比如Spring boot这种框架,随便搞个简单的系统项目,随随便便启动就是几个g内存,但是go程序可能启动就几百M。

Go 并发基础:Goroutines 和 Channels 的声明与使用

Go 并发基础:Goroutines 和 Channels 的声明与使用协程创建和销毁是用户级别的,go runtime用来创建销毁调度协程的。

而线程的创建和销毁不是java自己控制的,是内核线程去控制的,涉及到用户态和内核态来回切换的话,成本自然就高了。

查看逻辑核心数 Go 并发基础:Goroutines 和 Channels 的声明与使用

查看当前的go进程可用的cpu核数是几个,一般是机器上面cpu的总数。也可以控制使用某几个核。

func main()  {fmt.Println(runtime.NumCPU())
}
8

MPG并发模型


Go 并发基础:Goroutines 和 Channels 的声明与使用

协程(Goroutine)


Go 语言中没有线程的概念,只有协程,也称为 goroutine。相比线程来说,协程更加轻量,一个程序可以随意启动成千上万个 goroutine。

goroutine 被 Go runtime 所调度,这一点和线程不一样。也就是说,Go 语言的并发是由 Go 自己所调度的,自己决定同时执行多少个 goroutine,什么时候执行哪几个。这些对于我们开发者来说完全透明,只需要在编码的时候告诉 Go 语言要启动几个 goroutine,至于如何调度执行,我们不用关心。

要启动一个 goroutine 非常简单,Go 语言为我们提供了 go 关键字,相比其他编程语言简化了很多,如下面的代码所示:

func main() {go fmt.Println("飞雪无情")fmt.Println("我是 main goroutine")time.Sleep(time.Second)
}

这样就启动了一个 goroutine,用来调用 fmt.Println 函数,打印“飞雪无情”。所以这段代码里有两个 goroutine,一个是 main 函数启动的 main goroutine,一个是我自己通过 go 关键字启动的 goroutine。

从示例中可以总结出 go 关键字的语法,如下所示:

go function()

go 关键字后跟一个方法或者函数的调用,就可以启动一个 goroutine,让方法在这个新启动的 goroutine 中运行。运行以上示例,可以看到如下输出:

我是 main goroutine
飞雪无情

从输出结果也可以看出,程序是并发的,go 关键字启动的 goroutine 并不阻塞 main goroutine 的执行,所以我们才会看到如上打印结果。

小提示:示例中的 time.Sleep(time.Second) 表示等待一秒,这里是让 main goroutine 等一秒,不然 main goroutine 执行完毕程序就退出了,也就看不到启动的新 goroutine 中“飞雪无情”的打印结果了。

  • 如果阻塞发生在main协程里,并且没有其他子协程可以执行,那就可以确定"希望永远等不来",自已把自己杀掉,报一个fata error∶deadlock出来。
  • 如果阻塞发生在子协程里,就不会发生死锁,因为至少main协程是一个值得等待的"希望",会一直等(阻塞))下去。

Channel


那么如果启动了多个 goroutine,它们之间该如何通信呢?这就是 Go 语言提供的 channel(通道)要解决的问题。

共享内存


很多语言通过共享内存来实现线程间的通信,通过加锁来访问共享数据,如数组、map或结构体。go语言也实现了这种并发模型。

Go 并发基础:Goroutines 和 Channels 的声明与使用

CSP(communicating sequential processes)讲究的是“以通信的方式来共享内存”,在go语言里channel是这种模式的具体实现。

Go 并发基础:Goroutines 和 Channels 的声明与使用

声明一个 channel  同步channel


在 Go 语言中,声明一个 channel 非常简单,使用内置的 make 函数即可,如下所示:

ch:=make(chan string)

其中 chan 是一个关键字,表示是 channel 类型。后面的 string 表示 channel 里的数据是 string 类型。通过 channel 的声明也可以看到,chan 是一个集合类型。

定义好 chan 后就可以使用了,一个 chan 的操作只有两种:发送和接收。

  1. 接收:获取 chan 中的值,操作符为 <- chan  消费者

  2. 发送:向 chan 发送值,把值放在 chan 中,操作符为 chan <-  生产者

小技巧:这里注意发送和接收的操作符,都是 <- ,只不过位置不同。接收的 <- 操作符在 chan 的左侧,发送的 <- 操作符在 chan 的右侧。 

现在我把上个示例改造下,使用 chan 来代替 time.Sleep 函数的等待工作,如下面的代码所示:

func main() {ch:=make(chan string)go func() {fmt.Println("飞雪无情")ch <- "goroutine 完成"}()fmt.Println("我是 main goroutine")v:=<-chfmt.Println("接收到的chan中的值为:",v)}

运行这个示例,可以发现程序并没有退出,可以看到"飞雪无情"的输出结果,达到了 time.Sleep 函数的效果,如下所示:

我是 main goroutine飞雪无情接收到的chan中的值为: goroutine 完成

可以这样理解:在上面的示例中,我们在新启动的 goroutine 中向 chan 类型的变量 ch 发送值;在 main goroutine 中,从变量 ch 接收值;如果 ch 中没有值,则阻塞等待到 ch 中有值可以接收为止。

相信你应该明白为什么程序不会在新的 goroutine 完成之前退出了,因为通过 make 创建的 chan 中没有值,而 main goroutine 又想从 chan 中获取值,获取不到就一直等待,等到另一个 goroutine 向 chan 发送值为止。

channel 有点像在两个 goroutine 之间架设的管道,一个 goroutine 可以往这个管道里发送数据,另外一个可以从这个管道里取数据,有点类似于我们说的队列。

异步管道


asynChann := make(chan int, 8)

channel底层维护一个环形队列(先进先出),make初始化时指定队列的长度。队列满时,写阻塞;队列空时,读阻塞。

sendx指向下一次写入的位置, recvx指向下一次读取的位置。 recvq维护因读管道而被阻塞的协程,sendq维护因写管道而被阻塞的协程。

Go 并发基础:Goroutines 和 Channels 的声明与使用同步管道可以认为队列容量为0,当读协程和写协程同时就绪时它们才会彼此帮对方解除阻塞。

syncChann := make(chan int)

 
 

chan struct{}


  •  channel仅作为协程间同步的工具,不需要传递具体的数据,管道类型可以用struct{}
  • sc := make(chan struct{})
  • sc<- struct{}{}
  • 空结构体变量的内存占用为0,因此struct{}类型的管道比bool类型的管道还要省内存

关于channel的死锁与阻塞


  1. Channel满了,就阻塞写;Channel空了,就阻塞读。
  2. 阻塞之后会交出cpu,去执行其他协程,希望其他协程能帮自己解除阻塞。
  3. 如果阻塞发生在main协程里,并且没有其他子协程可以执行,那就可以确定“希望永远等不来”,自已把自己杀掉,报一个fatal error:deadlock出来。
  4. 如果阻塞发生在子协程里,就不会发生死锁,因为至少main协程是一个值得等待的“希望”,会一直等(阻塞)下去。
package mainimport ("fmt""time"
)func main() {ch := make(chan struct{}, 1)ch <- struct{}{} //有1个缓冲可以用,无需阻塞,可以立即执行go func() {      //子协程1time.Sleep(5 * time.Second) //sleep一个很长的时间<-ch                        //如果把本行代码注释掉,main协程5秒钟后会报fatal errorfmt.Println("sub routine 1 over")}()ch <- struct{}{} //由于子协程1已经启动,寄希望于子协程1帮自己解除阻塞,所以会一直等子协程1执行结束。如果子协程1执行结束后没帮自己解除阻塞,则希望完全破灭,报出deadlockfmt.Println("send to channel in main routine")go func() { //子协程2ch <- struct{}{} //channel已满,子协程2会一直阻塞在这一行fmt.Println("sub routine 2 over")}()time.Sleep(3 * time.Second)fmt.Println("main routine exit")
}

ch := make(chan struct{}, 1). 如果不让管道存储有实际意义的信息,可以将其设置为空的结构体。

如果想让routine2可以执行完,不让其阻塞,可以使用如下:

	<-chtime.Sleep(2 * time.Second)fmt.Println("main routine exit")

无缓冲 channel


上面的示例中,使用 make 创建的 chan 就是一个无缓冲 channel,它的容量是 0,不能存储任何数据。所以无缓冲 channel 只起到传输数据的作用,数据并不会在 channel 中做任何停留。这也意味着,无缓冲 channel 的发送和接收操作是同时进行的,它也可以称为同步 channel。

有缓冲 channel


有缓冲 channel 类似一个可阻塞的队列,内部的元素先进先出。通过 make 函数的第二个参数可以指定 channel 容量的大小,进而创建一个有缓冲 channel,如下面的代码所示:

cacheCh:=make(chan int,5)

我创建了一个容量为 5 的 channel,内部的元素类型是 int,也就是说这个 channel 内部最多可以存放 5 个类型为 int 的元素,如下图所示:

Go 并发基础:Goroutines 和 Channels 的声明与使用

一个有缓冲 channel 具备以下特点:

  1. 有缓冲 channel 的内部有一个缓冲队列

  2. 发送操作是向队列的尾部插入元素,如果队列已满,则阻塞等待,直到另一个 goroutine 执行,接收操作释放队列的空间

  3. 接收操作是从队列的头部获取元素并把它从队列中删除,如果队列为空,则阻塞等待,直到另一个 goroutine 执行,发送操作插入新的元素

因为有缓冲 channel 类似一个队列,可以获取它的容量和里面元素的个数。如下面的代码所示:

cacheCh:=make(chan int,5)cacheCh <- 2
cacheCh <- 3fmt.Println("cacheCh容量为:",cap(cacheCh),",元素个数为:",len(cacheCh))

其中,通过内置函数 cap 可以获取 channel 的容量,也就是最大能存放多少个元素,通过内置函数 len 可以获取 channel 中元素的个数。

小提示:无缓冲 channel 其实就是一个容量大小为 0 的 channel。比如 make(chan int,0)。

关闭 channel(用于遍历管道和解除阻塞)


channel 还可以使用内置函数 close 关闭,如下面的代码所示:

???????close(cacheCh)

如果一个 channel 被关闭了,就不能向里面发送数据了,如果发送的话,会引起 painc 异常。但是还可以接收 channel 里的数据,如果 channel 里没有数据的话,接收的数据是元素类型的零值。???????

  • 只有当管道关闭时,才能通过range遍历管道里的数据,否则会发生fatal error。
  • 管道关闭后读操作会立即返回(即使之前阻塞的,但是关闭之后立刻返回,相当于解除了阻塞),如果缓冲已空会返回“0值”。(调用close之后,即使缓冲区为空也不会阻塞)
  • ele, ok := <-ch ok==true代表ele是管道里的真实数据。
  • 向已关闭的管道里send数据会发生panic。
  • 不能重复关闭管道,不能关闭值为nil的管道,否则都会panic。
	ch := make(chan int,5)ch <- 1ch <- 2close(ch)for ele:= range ch{fmt.Println(ele)}v,ok := <-ch //上面元素取走了,但是并没有阻塞,因为调用了close。channel里面没有元素,返回0值fmt.Println(v,ok)

单向 channel


有时候,我们有一些特殊的业务需求,比如限制一个 channel 只可以接收但是不能发送,或者限制一个 channel 只能发送但不能接收,这种 channel 称为单向 channel。

单向 channel 的声明也很简单,只需要在声明的时候带上 <- 操作符即可,如下面的代码所示:

onlySend := make(chan<- int)onlyReceive:=make(<-chan int)

注意,声明单向 channel <- 操作符的位置和上面讲到的发送和接收操作是一样的。

在函数或者方法的参数中,使用单向 channel 的较多,这样可以防止一些操作影响了 channel。

下面示例中的 counter 函数,它的参数 out 是一个只能发送的 channel,所以在 counter 函数体内使用参数 out 时,只能对其进行发送操作,如果执行接收操作,则程序不能编译通过。

func counter(out chan<- int) {//函数内容使用变量out,只能进行发送操作}

select+channel 示例


假设要从网上下载一个文件,我启动了 3 个 goroutine 进行下载,并把结果发送到 3 个 channel 中。其中,哪个先下载好,就会使用哪个 channel 的结果。

在这种情况下,如果我们尝试获取第一个 channel 的结果,程序就会被阻塞,无法获取剩下两个 channel 的结果,也无法判断哪个先下载好。这个时候就需要用到多路复用操作了,在 Go 语言中,通过 select 语句可以实现多路复用,其语句格式如下:

select {
case i1 = <-c1://todo
case c2 <- i2://todo
default:// default todo
}

整体结构和 switch 非常像,都有 case 和 default,只不过 select 的 case 是一个个可以操作的 channel

小提示:多路复用可以简单地理解为,N 个 channel 中,任意一个 channel 有数据产生,select 都可以监听到,然后执行相应的分支,接收数据并处理。

有了 select 语句,就可以实现下载的例子了。如下面的代码所示:

func main() {//声明三个存放结果的channelfirstCh := make(chan string)secondCh := make(chan string)threeCh := make(chan string)//同时开启3个goroutine下载go func() {firstCh <- downloadFile("firstCh")}()go func() {secondCh <- downloadFile("secondCh")}()go func() {threeCh <- downloadFile("threeCh")}()//开始select多路复用,哪个channel能获取到值,就说明哪个最先下载好,就用哪个。select {case filePath := <-firstCh:fmt.Println(filePath)case filePath := <-secondCh:fmt.Println(filePath)case filePath := <-threeCh:fmt.Println(filePath)}
}func downloadFile(chanName string) string {//模拟下载文件,可以自己随机time.Sleep点时间试试time.Sleep(time.Second)return chanName+":filePath"
}

如果这些 case 中有一个可以执行,select 语句会选择该 case 执行,如果同时有多个 case 可以被执行,则随机选择一个,这样每个 case 都有平等的被执行的机会。如果一个 select 没有任何 case,那么它会一直等待下去。

channle的应用场景


数据传输 

Go 并发基础:Goroutines 和 Channels 的声明与使用

上游: channle可以当作缓冲区来使用,为每个文件开辟一个协程,每个协程负责读这个文件,每个协程读出一行一行的数据将一行放到管道里面去,三个文件对应三个协程,每个协程序往里面放数据,这样这个channel相当于包含了所有文件的所有内容,只不过数据不是按照顺序的,是被打乱的。

中游:开辟5个协程,这5个协程就负责去读这个管道,这个channel就相当于全局变量,大家都去读这个channel,每个channel从里面读取内容进行处理放到另外一个channel。相当于有5个协程并行的消费channel1,同时并行的往channel2里面写入数据。

下游:开辟了两个协程,负责消费channel2,从这个channel2拿到内容之后,写到文件里面去。

这样最开始有3个输入文件,最后有两个输出文件。

上面分为上,中,下游三个任务,它们的处理速度是不一样的,在慢的地方耗时的地方多开几个协程,这样相对于多分配几个CPU,加快处理速度,这样使得每一个阶段的速度比较均衡。

广播

Go 并发基础:Goroutines 和 Channels 的声明与使用

hub这维护了一个channel,把想说的话写到管道里面去,hub就不断的去监控这个管道,一旦发现里面有内容就将内容读取出来,然后分发给每个client。

上面管道都是协程安全点数据结构,天然的支持多个协程并发点读取,不需要关心去加锁

协调同步

Go 并发基础:Goroutines 和 Channels 的声明与使用

当上游的三个协程执行完之后,才去开启下游的三个协程序。

本质上就是管道满了写阻塞,管道空了读阻塞。

简单举例:

import ("fmt""strconv""sync"
)var(buffer chan stringwg sync.WaitGroup
)func init()  {buffer = make(chan string,1000)
}func put(){for i := 0; i < 10; i++{buffer <- "hello" + strconv.Itoa(i)}wg.Done()
}func take()  {for i := 0; i < 10; i++{str := <- bufferfmt.Println(str)}wg.Done()
}func main()  {
wg.Add(5)
go put()
go put()
go put()go take()
go take()
wg.Wait()
}

总结


介绍了如何通过 go 关键字启动一个 goroutine,以及如何通过 channel 实现 goroutine 间的数据传递,这些都是 Go 语言并发的基础,理解它们可以更好地掌握并发。

在 Go 语言中,提倡通过通信来共享内存,而不是通过共享内存来通信,其实就是提倡通过 channel 发送接收消息的方式进行数据传递,而不是通过修改同一个变量。所以在数据流动、传递的场景中要优先使用 channel,它是并发安全的,性能也不错。 

Go 并发基础:Goroutines 和 Channels 的声明与使用

  相关解决方案