多图详解Go中的Channel源码

chan介绍

package mainimport "fmt"

func main() {

c := make(chan int)

go func() {

c <- 1 // send to channel

}()

x := <-c // recv from channel

fmt.Println(x)

}

我们可以这样查看汇编结果:

go tool compile -N -l -S hello.go-N表示禁用优化-l禁用内联-S打印结果

通过上面这样的方流量交易式,我们可以直到chan是调用的哪些函数:

源码分析

结构体与创建

type hchan struct {

qcount   uint           // 循环列表元素个数

dataqsiz uint           // 循环队列的大小

buf      unsafe.Pointer // 循环队列的指针

elemsize uint16// chan中元素的大小

closed   uint32// 是否已close

elemtype *_type // chan中元素类型

sendx    uint   // send在buffer中的索引

recvx    uint   // recv在buffer中的索引

recvq    waitq // receiver的等待队列

sendq    waitq  // sender的等待队列

// 互拆锁

lock mutex

}

qcount代表chan 中已经接收但还没被取走的元素的个数,函数 len 可以返回这个字段的值;

dataqsiz和buf分别代表队列buffer的大小,cap函数可以返回这个字段的值以及队列buffer的指针,是一个定长的环形数组;

elemtype 和 elemsiz表示chan 中元素的类型和 元素的大小;

sendx:发送数据的指针在 buffer中的位置;

recvx:接收请求时的指针在 buffer 中的位置;

recvq和sendq分别表示等待接收数据的 goroutine 与等待发送数据的 goroutine;

sendq和recvq的类型是waitq的结构体:

type waitq struct {

first *sudog

last  *sudog

}

waitq里面连接的是一个sudog双向链表,保存的是等待的goroutine 。整个chan的图例大概是这样:下面看一下创建chan,我们通过汇编结果也可以查看到make(chan int)这句代码会调用到runtime的makechan函数中:

const (

maxAlign  = 8

hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

)

func makechan(t *chantype, size int) *hchan {

elem := t.elem

// 略去检查代码

...

//计算需要分配的buf空间

mem, overflow := math.MulUintptr(elem.size, uintptr(size))

if overflow || mem > maxAlloc-hchanSize || size < 0 {

panic(plainError("makechan: size out of range"))

}

var c *hchan

switch {

case mem == 0:

// chan的size或者元素的size是0,不必创建buf

c = (*hchan)(mallocgc(hchanSize, nil, true))

// Race detector

c.buf = c.raceaddr()

case elem.ptrdata == 0:

// 元素不是指针,分配一块连续的内存给hchan数据结构和buf

c = (*hchan)(mallocgc(hchanSize+mem, nil, true))

// 表示hchan后面在内存里紧跟着就是buf

c.buf = add(unsafe.Pointer(c), hchanSize)

default:

// 元素包含指针,那么单独分配buf

c = new(hchan)

c.buf = mallocgc(mem, elem, true)

}

c.elemsize = uint16(elem.size)

c.elemtype = elem

c.dataqsiz = uint(size)

return c

}

首先我们可以看到计算hchanSize:

maxAlign  = 8

hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

maxAlign是8,那么maxAlign-1的二进制就是111,然后和int(unsafe.Sizeof(hchan{}))取与就是取它的低三位,hchanSize就得到的是8的整数倍,做对齐使用。

这里switch有三种情况,第一种情况是缓冲区所需大小为 0,那么在为 hchan 分配内存时,只需要分配 sizeof(hchan) 大小的内存;

第二种情况是缓冲区所需大小不为 0,而且数据类型不包含指针,那么就分配连续的内存。注意的是,我们在创建channel的时候可以指定类型为指针类型:

//chan里存入的是int的指针

c := make(chan *int)//chan里存入的是int的值

c := make(chan int)

第三种情况是缓冲区所需大小不为 0,而且数据类型包含指针,那么就不使用add的方式让hchan和buf放在一起了,而是单独的为buf申请一块内存。

发送数据

channel的阻塞非阻塞

在看发送数据的代码之前,我们先看一下什么是channel的阻塞和非阻塞。

一般情况下,传入的参数都是 block=true,即阻塞调用,一个往 channel 中插入数据的 goroutine 会阻塞到插入成功为止。

非阻塞是只这种情况:

select {case c <- v:

... foodefault:

... bar}

编译器会将其改为:

if selectnbsend(c, v) {

... foo

} else {

... bar

}

selectnbsend方法传入的block就是false:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {

return chansend(c, elem, false, getcallerpc())

}

chansend方法

向通道发送数据我们通过汇编结果可以发现是在runtime 中通过 chansend 实现的,方法比较长下面我们分段来进行理解:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

if c == nil {

// 对于非阻塞的发送,直接返回

if !block {

return false

}

// 对于阻塞的通道,将 goroutine 挂起

gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)

throw("unreachable")

}

...

}

这里会对chan做一个判断,如果它是空的,那么对于非阻塞的发送,直接返回 false;对于阻塞的通道,将 goroutine 挂起,并且永远不会返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

...

// 非阻塞的情况下,如果通道没有关闭,满足以下一条:

// 1.没有缓冲区并且当前没有接收者

// 2.缓冲区不为0,并且已满

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||

(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {

return false

}

...

}

需要注意的是这里是没有加锁的,go虽然在使用指针读取单个值的时候原子性的,但是读取多个值并不能保证,所以在判断完closed虽然是没有关闭的,那么在读取完之后依然可能在这一瞬间从未关闭状态转变成关闭状态。那么就有两种可能:

通道没有关闭,而且已经满了,那么需要返回false,没有问题;

通道关闭,而且已经满了,但是在非阻塞的发送中返回false,也没有问题;

有关go的一致性原语

上面的这些判断被称为 fast path,因为加锁的操作是一个很重的操作,所以能够在加锁之前返回的判断就在加锁之前做好是最好的。

下面接着看看加锁部分的代码:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

...

//加锁

lock(&c.lock)

// 是否关闭的判断

if c.closed != 0 {

unlock(&c.lock)

panic(plainError("send on closed channel"))

}

// 从 recvq 中取出一个接收者

if sg := c.recvq.dequeue(); sg != nil {

// 如果接收者存在,直接向该接收者发送数据,绕过buffer

send(c, sg, ep, func() { unlock(&c.lock) }, 3)

return true

}

...

}

进入了lock区域之后还需要再判断以下close的状态,然后从recvq 中取出一个接收者,如果已经有接收者,那么就向第一个接收者发送当前enqueue的消息。这里需要注意的是如果有接收者在队列中等待,则说明此时的缓冲区是空的。

既然是一行行分析代码,那么我们再进入到send看一下实现:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

...

if sg.elem != nil {

// 直接把要发送的数据copy到reciever的栈空间

sendDirect(c.elemtype, sg, ep)

sg.elem = nil

gp := sg.g

unlockf()

gp.param = unsafe.Pointer(sg)

if sg.releasetime != 0 {

sg.releasetime = cputicks()

}

// 唤醒对应的 goroutine

goready(gp, skip+1)

}

在send方法里,sg就是goroutine打包好的对象,ep是对应要发送数据的指针,sendDirect方法会调用memmove进行数据的内存拷贝。然后goready函数会唤醒对应的 goroutine进行调度。

回到chansend方法,继续往下看:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

...

// 如果缓冲区没有满,直接将要发送的数据复制到缓冲区

if c.qcount < c.dataqsiz {

// 找到buf要填充数据的索引位置

qp := chanbuf(c, c.sendx)

...

// 将数据拷贝到 buffer 中

typedmemmove(c.elemtype, qp, ep)

// 数据索引前移,如果到了末尾,又从0开始

c.sendx++

if c.sendx == c.dataqsiz {

c.sendx = 0

}

// 元素个数加1,释放锁并返回

c.qcount++

unlock(&c.lock)

return true

}

...

}

这里会判断buf缓冲区有没有满,如果没有满,那么就找到buf要填充数据的索引位置,调用typedmemmove方法将数据拷贝到buf中,然后重新设值sendx偏移量。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

...

// 缓冲区没有空间了,所以对于非阻塞调用直接返回

if !block {

unlock(&c.lock)

return false

}

// 创建 sudog 对象

gp := getg()

mysg := acquireSudog()

mysg.releasetime = 0

if t0 != 0 {

mysg.releasetime = -1

}

mysg.elem = ep

mysg.waitlink = nil

mysg.g = gp

mysg.isSelect = false

mysg.c = c

gp.waiting = mysg

gp.param = nil

// 将sudog 对象入队

c.sendq.enqueue(mysg)

// 进入等待状态

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

...

}

这里会做两部分的操作,对于非阻塞的调用会直接返回;对于阻塞的调用会创建sudog 对象,然后将sudog对象入队之后gopark将 goroutine 转入 waiting 状态,并解锁。调用gopark之后,在使用者看来该向 channel 发送数据的代码语句会进行阻塞。

这里也需要注意一下,如果缓冲区为0,那么也会进入到这里,会调用到gopark立马阻塞,所以在使用的时候需要记得接收数据,防止向chan发送数据的那一端永远阻塞,如:

func process(timeout time.Duration) bool {

ch := make(chan bool)

go func() {

// 模拟处理耗时的业务

time.Sleep((timeout + time.Second))

ch <- true // block

fmt.Println("exit goroutine")

}()

select {

case result := <-ch:

return result

case <-time.After(timeout):

return false

}

}

如果这里在select的时候直接timeout返回了,而没有调用 result := <-ch,那么goroutine 就会永远阻塞。

到这里发送的代码就讲解完了,整个流程大致如下:

比如我要执行:ch<-10

检查 recvq 是否为空,如果不为空,则从 recvq 头部取一个 goroutine,将数据发送过去;

如果 recvq 为空,,并且buf没有满,则将数据放入到 buf中;

如果 buf已满,则将要发送的数据和当前 goroutine 打包成sudog,然后入队到sendq队列中,并将当前 goroutine 置为 waiting 状态进行阻塞。

接收数据

从chan获取数据实现函数为 chanrecv。下面我们看一下代码实现:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

...

if c == nil {

// 如果 c 为空且是非阻塞调用,那么直接返回 (false,false)

if !block {

return

}

// 阻塞调用直接等待

gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)

throw("unreachable")

}

// 对于非阻塞的情况,并且没有关闭的情况

// 如果是无缓冲chan或者是chan中没有数据,那么直接返回 (false,false)

if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||

c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&

atomic.Load(&c.closed) == 0 {

return

}

// 上锁

lock(&c.lock)

// 如果已经关闭,并且chan中没有数据,返回 (true,false)

if c.closed != 0 && c.qcount == 0 {

if raceenabled {

raceacquire(c.raceaddr())

}

unlock(&c.lock)

if ep != nil {

typedmemclr(c.elemtype, ep)

}

return true, false

}

...

}

chanrecv方法和chansend方法是一样的,首先也是做非空判断,如果chan没有初始化,那么如果是非阻塞调用,那么直接返回 (false,false),阻塞调用会直接等待;

下面的两个if判断我放在一起来进行讲解,因为这里和chansend是不一样的,chanrecv要根据不同条件需要返回不同的结果。

在上锁之前的判断是边界条件的判断:如果是非阻塞调用会判断chan没有发送方(dataqsiz为空且发送队列为空),或chan的缓冲为空(dataqsiz>0 并且qcount==0)并且chan是没有close,那么需要返回 (false,false);而chan已经关闭了,并且buf中没有数据,需要返回 (true,false);

为了实现这个需求,所以在chanrecv方法里面边界条件的判断都使用atomic方法进行了获取。

因为需要正确的得到chan已关闭,并且 buf 空会返回 (true, false),而不是 (false,false),所以在lock上锁之前需要使用atomic来获取参数防止重排序(Happens Before),因此必须使此处的 qcount 和 closed 的读取操作的顺序通过原子操作得到顺序保障。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

...

// 从发送者队列获取数据

if sg := c.sendq.dequeue(); sg != nil {

// 发送者队列不为空,直接从发送者那里提取数据

recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

return true, true

}

...

}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

// 如果是无缓冲区chan

if c.dataqsiz == 0 {

...

if ep != nil {

// 直接从发送者拷贝数据

recvDirect(c.elemtype, sg, ep)

}

// 有缓冲区chan

} else {

// 获取buf的存放数据指针

qp := chanbuf(c, c.recvx)

...

// 直接从缓冲区拷贝数据给接收者

if ep != nil {

typedmemmove(c.elemtype, ep, qp)

}

// 从发送者拷贝数据到缓冲区

typedmemmove(c.elemtype, qp, sg.elem)

c.recvx++

if c.recvx == c.dataqsiz {

c.recvx = 0

}

c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

}

sg.elem = nil

gp := sg.g

unlockf()

gp.param = unsafe.Pointer(sg)

if sg.releasetime != 0 {

sg.releasetime = cputicks()

}

// 将发送者唤醒

goready(gp, skip+1)

}

在这里如果有发送者在队列等待,那么直接从发送者那里提取数据,并且唤醒这个发送者。需要注意的是由于有发送者在等待,所以如果有缓冲区,那么缓冲区一定是满的。

在唤醒发送者之前需要对缓冲区做判断,如果是无缓冲区,那么直接从发送者那里提取数据;如果有缓冲区首先会获取recvx的指针,然后将从缓冲区拷贝数据给接收者,再将发送者数据拷贝到缓冲区。

然后将recvx加1,相当于将新的数据移到了队尾,再将recvx的值赋值给sendx,最后调用goready将发送者唤醒,这里有些绕,我们通过图片来展示:

这里展示的是在chansend中将数据拷贝到缓冲区中,当数据满的时候会将sendx的指针置为0,所以当buf环形队列是满的时候sendx等于recvx。

然后再来看看chanrecv中发送者队列有数据的时候移交缓冲区的数据是怎么做这里会将recvx为0处的数据直接从缓存区拷贝数据给接收者,然后将发送者拷贝数据到缓冲区recvx指针处,然后将recvx指针加1并将recvx赋值给sendx,由于是满的所以用recvx加1的效果实现了将新加入的数据入库到队尾的操作。

接着往下看:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

...

// 如果缓冲区中有数据

if c.qcount > 0 {

qp := chanbuf(c, c.recvx)

...

// 从缓冲区复制数据到 ep

if ep != nil {

typedmemmove(c.elemtype, ep, qp)

}

typedmemclr(c.elemtype, qp)

// 接收数据的指针前移

c.recvx++

// 环形队列,如果到了末尾,再从0开始

if c.recvx == c.dataqsiz {

c.recvx = 0

}

// 缓冲区中现存数据减一

c.qcount--

unlock(&c.lock)

return true, true

}

...

}

到了这里,说明缓冲区中有数据,但是发送者队列没有数据,那么将数据拷贝到接收数据的协程,然后将接收数据的指针前移,如果已经到了队尾,那么就从0开始,最后将缓冲区中现存数据减一并解锁。

下面就是缓冲区中没有数据的情况:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

...

// 非阻塞,直接返回

if !block {

unlock(&c.lock)

return false, false

}

// 创建sudog

gp := getg()

mysg := acquireSudog()

mysg.releasetime = 0

if t0 != 0 {

mysg.releasetime = -1

}

mysg.elem = ep

mysg.waitlink = nil

gp.waiting = mysg

mysg.g = gp

mysg.isSelect = false

mysg.c = c

gp.param = nil

// 将sudog添加到接收队列中

c.recvq.enqueue(mysg)

// 阻塞住goroutine,等待被唤醒

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

...

}

如果是非阻塞调用,直接返回;阻塞调用会将当前goroutine 封装成sudog,然后将sudog添加到接收队列中,调用gopark阻塞住goroutine,等待被唤醒。

(0)

相关推荐

  • recover.panic.defer.2021.03.03

    Defer, Panic, and Recover 在 Go 语言中,recover 和 panic 的关系是什么? 我们先看一个基础的例子,在 main 方法体中启动一个协程,在协程内部主动调用 p ...

  • Go - httpclient 常用操作

    httpclient 模块介绍 httpclient 是基于 net/http  封装的 Go HTTP 客户端请求包,支持常用的请求方式.常用设置,比如: 支持设置 Mock 信息 支持设置失败时告 ...

  • 详解并发编程基础之原子操作(atomic包)

    Go语言中文网 今天 以下文章来源于Golang梦工厂 ,作者AsongGo Golang梦工厂Asong是一名Golang开发工程师,专注于Golang相关技术:Golang面试.Beego.Gin ...

  • 手把手教姐姐写消息队列

    前言 这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧.因为要用go语言写,这可给姐姐愁坏了.赶紧来求助我,我这么坚贞不 ...

  • Go:如何优雅地实现并发编排任务

    Go语言中文网 昨天 以下文章来源于吴亲强的深夜食堂 ,作者吴亲库里 业务场景 在做任务开发的时候,你们一定会碰到以下场景: 场景1:调用第三方接口的时候, 一个需求你需要调用不同的接口,做数据组装. ...

  • Go并发处理

    写了一个web接口,想高并发的请求这个接口,进行压力测试,所以服务端就实现了一个线程池. 代码从网上理解了之后写的.代码实例 简单的介绍: 首先实现一个Job接口,只要有方法实现了Do方法即可 定义个 ...

  • Go 最细节篇 — chan 为啥没有判断 close 的接口 ?

    大纲 Go 为什么没有判断 close 的接口? Go 关闭 channel 究竟做了什么? `closechan` 一个判断 chan 是否 close 的函数 思考方法一:通过"写&qu ...

  • golang API开发过程的中的自动重启(基于gin框架)

    概要 实现方式 补充 syscall.Exec 概要 基于 golang Gin 框架开发 web 服务时, 需要时不时的 go build , 然后重启服务查看运行结果. go build 的过程集 ...

  • 更简的并发代码,更强的并发控制

    hxl Go语言中文网 昨天有没感觉 Go 的 sync 包不够用?有没遇到类型没有 sync/atomic 支持?我们一起看看 go-zero 的 syncx 包对标准库的一些增值补充.https: ...

  • go编程:说说channel哪些事

    channel是什么 channel中文翻译为通道,它是Go语言内置的数据类型,使用channel不需要导入任何包,像int/float一样直接使用.它主要用于goroutine之间的消息传递和事件通 ...

  • 学习channel设计:从入门到放弃

    前言 哈喽,大家好,我是asong.终于回归了,停更了两周了,这两周一直在搞留言号的事,经过漫长的等待,终于搞定了.兄弟们,以后就可以在留言区尽情开喷了,只要你敢喷,我就敢精选