目录
前言
1.goroutine
1.1 交互体验
1.2 服务端
2.Channels
2.0 最简易用法:
2.1 无缓存channel & 同步
2.2 串联的Channels(pipeline)
2.3 单向channel
2.4 带缓存buffer的Channel
2.5 并发的循环
2.6 join等待
2.6.1 计数sync.WaitGroup
2.6.2 chennel信号量
2.7 select多路复用
2.8 goroutine退出
前言
应用程序通常需要
- 图形化界面异步执行一部分代码,以提升交互体验
- 需要并发地执行以提升效率
- 服务端并发处理连接,提升响应速度和qps
这些功能都是通过并发实现地,在Go语言中,每一个并发的执行单元叫作一个goroutine,可以暂时理解为Java地线程Thread。语法非常简单,就是在调用需要异步执行的方法前加上go关键字:
f() // call f(); 同步调用,等待返回
go f() // 创建 goroutine调用f(); 不等待结果,当前goroutine继续执行下面地代码
1.goroutine
1.1 交互体验
当程序在做复杂性高的工作,用户等待在那里得不到响应,用户体验非常差,所以我们通常看到有很多应用会展示进度条或者一朵菊花似的圈圈在那转(如图),以提示用户等待??。
下面是一个最简单的样例,在执行fib斐波那契数列计算时,并发地打印等待的圈圈spinner。两个独立执行单元分别在独立的函数中,但两个函数会并发地执行(cpu多核的情况可以同时执行,非多核是两个单元轮流执行一个时间片)。
func main() {go spinner(100 * time.Millisecond) // 并发执行函数spinnerconst n = 45fibN := fib(n) // slow 递归深度大,执行效率低fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
执行结果:一直旋转,直到fib返回 主函数main也执行结束,主函数返回时,所有的goroutine都会被直接打断,程序退出。除了从主函数退出或者直接终止程序之外,没有其它的编程方法能够让一个goroutine来打断另一个的执行,但可以通过goroutine之间的通信来让一个goroutine A 请求goroutine B,并让goroutine B自行结束执行。
1.2 服务端
一个TCP服务端如果只能同步执行,那一次只能处理一个请求,执行结果如下,第一个客户端连接,能后拿到响应;第二个客户端连接了
// TCP 服务端
func main() {listener, err := net.Listen("tcp", "localhost:8000")if err != nil {log.Fatal(err)}for {conn, err := listener.Accept() // 监听连接,有连接进来返回一个连接if err != nil {log.Print(err) // e.g., connection abortedcontinue}handleConn(conn) // 一次只能处理一个连接,一直等到这个函数返回,才能下个循环响应下一个连接}
}
// 返回给客户端当前时间,死循环,一直不能退出
func handleConn(c net.Conn) {defer c.Close()for {_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))if err != nil {return // e.g., client disconnected}time.Sleep(1 * time.Second)}
}
client1 | client2 |
---|---|
服务端程序只需要做一点小改动,就能使其支持并发:在handleConn函数调用的地方增加go关键字,让每一次handleConn的调用都进入一个独立的goroutine。
for {conn, err := listener.Accept()// 省略错误处理go handleConn(conn) // 并发地处理每个连接
}
client1 | client2 |
---|---|
2.Channels
channel是一个通信机制,它可以让goroutine A给另一个goroutine B发送信息。类似于Java的BlockQueue,通过线程间共享阻塞队列叨叨通过和传递数据的目的。
每个channel都有一个类型,也就是channel可发送数据的类型,类似于Java ArrayList<Integer>容器中的类型,如发送int类型数据的channel一般写为chan int。
channel有发送和接收两个主要操作,都是通信行为。
// 创建
ch := make(chan int) // 定义一个传输int数据的channel,返回底层数据结构的引用
ch = make(chan int) // 无缓冲区
ch = make(chan int, 0) // 无缓冲区
ch = make(chan int, 3) // 带大小为3的缓冲区
//
ch <- x // 发送x
x = <-ch // 接收并赋值给x
<-ch // 接收并丢弃close(ch)
goroutine A 将一个值通过channel发送到另一个执行接收操作的goroutine B。
发送和接收都使用<-
运算符,方向左,ch在左表示发送,ch在右表示接收。
操作 | 操作数顺序 |
---|---|
发送 | ch在左,箭头方向是把x发给ch中 |
接收 | x在左,类似于赋值 range接收 |
关闭 | 关闭channel,随后对该channel的任何发送操作都将导致panic异常 ,但仍然可以接收 试图重复关闭一个channel将导致panic异常,试图关闭一个nil值的channel也将导致panic异常。 |
2.0 最简易用法:
var ch chan string = make(chan string) // ch has type 'chan int'func main() {fmt.Printf("%T\n", ch)go receive()ch <- "send from main"close(ch)
}
// receive from channel
func receive() {x := <-chfmt.Println("rev:"+x)
}
2.1 无缓存channel & 同步
一个基于无缓存Channels的发送操作将导致发送者goroutine阻塞,直到接收者goroutine在相同的Channels上执行接收操作,当发送的值通过Channels成功传输之后,两个goroutine可以继续执行后面的语句。类似于Java的1大小的BlockQueue。
由于发送和接收是阻塞的操作,可以通过channel进行同步。
func main() {conn, err := net.Dial("tcp", "localhost:8000")if err != nil {log.Fatal(err)}done := make(chan struct{}) // 空类型的通道go func() { // 异步routineio.Copy(os.Stdout, conn) // 打印服务端响应log.Println("done")done <- struct{}{} // 发送 }()mustCopy(conn, os.Stdin) // 发送请求到服务端conn.Close()<-done // 主线程阻塞在通道上等待异步routine发送完
}
2.2 串联的Channels(pipeline)
对于pipeline管道可能很熟悉,shell中用于串联多个进程,而go中就是串联goroutine,
当一个channel被关闭后,再向该channel发送数据将导致panic异常。当一个被关闭的channel中已经发送的数据都被成功接收后,后续的接收操作将不再阻塞,它们会立即返回一个零值。
没有办法直接测试一个channel是否被关闭,但是接收操作有一个变体形式:它多接收一个结果,多接收的第二个结果是一个布尔值ok,ture表示成功从channels接收到值,false表示channels已经被关闭并且里面没有值可接收。
x, ok := <-naturals
if !ok {break //通道已经关闭了,接收不到值了
}
但上面的方式有点不方便,可以用range直接遍历
for x := range naturals { // 依次从channel接收数据,当channel被关闭并且没有值可接收时跳出循环squares <- x * x
}
与网络连接或者文件不同,不一定要关闭channel,只有需要signal接收者goroutine发送完毕时才需要关闭channel。
或者不管一个channel是否被关闭,当它没有被引用时将会被Go语言的垃圾自动回收器回收。
2.3 单向channel
out chan<- int // 发送
in <-chan int // 接收
隐式转换:任何双向channel向单向channel变量的赋值操作都将导致该隐式转换,可以隐式地从chan int转换成chan<- int,也可以转换为<-chan int
。
这里并没有反向转换的语法:不能将一个chan<- int或者 <-channel int类型
转换为chan int
类型的双向型的channel。
2.4 带缓存buffer的Channel
带缓存的Channel持有一个元素队列,类似于Java的BlockQueue,队列容量cap在调用make创建channel时第二个参数指定。
下面的语句创建了一个可以持有三个字符串元素的带缓存Channel。
ch = make(chan string, 3)cap(ch) // 获取容量
len(ch) // 发送了多少个元素
发送就是入队,接收操作则是出队。
如果内部缓存队列是满的,发送将阻塞直到因另一个goroutine执行接收操作而释放了新空间。
相反,如果channel是空的,接收操作将阻塞直到有另一个goroutine执行发送操作而向队列插入元素。
那么channel的缓存队列将不是满的也不是空的,因此发送或接收操作都不会发生阻塞。这样就解耦了接收和发送的goroutine。
channel可以是多对多的,不一定是一对一的。
goroutines泄漏:当goroutines阻塞在channel上,泄漏的goroutines不会被自动回收,应确保不再需要的goroutine能正常退出是重要的。
2.5 并发的循环
并发处理通常在循环中,并行处理类似的事情。与前面将函数闭包时一样,如果在循环中引用外部变量,存在快照问题,最终使用的是同一个变量,而不是枚举过程中不断迭代的变量。所以需要显式地定义局部变量,而不是直接使用外部变量。
for _, f := range filenames {// 异步处理go thumbnail.ImageFile(f) // 闭包引用 外部变量 错误!!!
}for _, f := range filenames {go func(f string) { // 显式变量f,非闭包引用,否则变量快照问题thumbnail.ImageFile(f) }(f)
}
2.6 join等待
通常会需要主线程等待自线程执行完成后再退出,否则整个程序退出了,自线程没有处理完成。但是没有提供显式的方法完成这个功能,通常通过channel发送事件。
func makeThumbnails3(filenames []string) {ch := make(chan struct{})for _, f := range filenames {go func(f string) {thumbnail.ImageFile(f) ch <- struct{}{} // 每个线程处理完才会发送信号}(f)}// 主线程等待所有子线程处理完成后,接收事件信号for range filenames {<-ch}
}
2.6.1 计数sync.WaitGroup
类似于countDownLatch的功能,WaitGroup通常用于等待一组线程执行完成,就是上一个例子中主线程等待子线程完成后退出的功能。
var wg sync.WaitGroup // number of working goroutines 工作线程数 类似于countdownlatch
for f := range filenames {wg.Add(1)// workergo func(f string) {defer wg.Done()...}(f)
}
go func() {wg.Wait()close(sizes)
}()
在启动一个线程时Add,一个线程退出时Done,最终用Wait等待其状态到0。
2.6.2 chennel信号量
无限制地新增线程并发处理,不一定可以提高效率,与计算机硬件有关;而且可能会耗尽更有限地资源,所以需要控制并发数量。就像Java地线程池指定大小来控制并发,go可以通过信号量来控制这一点,信号量也是通过channel来实现的。
channel也可以实现信号量,类似Java中的Samphore信号量,用一个有容量限制的buffered channel来控制并发,channel里的n个空槽代表n个可以处理内容的token(通行证),只有先占了一个槽,线程才能继续执行。
var tokens = make(chan struct{}, 20)func crawl(url string) []string {tokens <- struct{}{} // 获取信号量,发送到有缓冲的channel,如果满了,则阻塞等待list, err := links.Extract(url)<-tokens // 处理完成,释放信号量return list
}
2.7 select多路复用
类似于IO的select多路复用,用于线程监听多个io信号,处理多个io事件,减少io阻塞等待的事件,go中channel也有阻塞的情况,所以也就有了select多路复用的场景,以优化性能。
select {
case <-ch1:// ...
case x := <-ch2:// ...use x...
case ch3 <- y:// ...
default:// ...
}
select会等待case中有能够执行的case时去执行。如case <-ch1指ch1中可以接收数据,就会执行这个case 下的语句,如果没有的话会阻塞。当条件满足时,select才会去通信并执行case之后的语句;这时候其它case是不会执行的。当有多个case都满足时,随机选择一个case进行。
一个没有任何case的select语句写作select{},会永远地等待下去。select还可以指定default子句,没有满足的case会执行default。
channel的零值是nil,对一个nil的channel发送和接收会永远阻塞,永远都不会被select到。
select {
case <-abort:fmt.Printf("Launch aborted!\n")return
default:// do nothing
}
2.8 goroutine退出
类似于Java中线程A调用B的中断方法将其中断位置位,线程B处理逻辑中检查其中断位并退出这样的设计才能实现一个线程停止另一个线程。go中可以定一个channel,线程A向channel发送中断信号,线程B在多路复用select从channel中接收中断信号,如果接收到了退出。
但是如果我们想要退出两个或者任意多个goroutine怎么办呢?一对多广播
关闭了一个channel并且被消费掉了所有已发送的值,接收操作会产生零值。我们可以将这个机制扩展一下,来作为我们的广播机制:不要向channel发送值,而是用关闭一个channel来进行广播。
var done = make(chan struct{})func cancelled() bool {select {case <-done:return truedefault:return false}
}go func() {os.Stdin.Read(make([]byte, 1)) // read a single byteclose(done)// 关闭通道,实现广播,所有cancel方法进入第一个case,不再阻塞
}()