本文主要介绍了 Go 语言(golang)中的 chan,并从源码层面分析其具体实现,包括创建 channel,发送数据,接收数据以及相关调度等。

以下分析基于 Go 1.17.5

1. 概述

官方对 chan 的描述如下:

A channel provides a mechanism for concurrently executing functions to communicate by sending and receivingvalues of a specified element type. The value of an uninitialized channel is nil.

chan 提供了一种并发通信机制,用于生产和消费某一指定类型数据,未初始化的 chan 的值是nil。

2. 特性与实现

Chan 是 Go 里面的一种数据结构,具有以下特性:

  • goroutine-safe,多个 goroutine 可以同时访问一个 channel 而不会出现并发问题
  • 可以用于在 goroutine 之间存储和传递值
  • 其语义是先入先出(FIFO)
  • 可以导致 goroutine 的 block 和 unblock

内部结构

chan 内部结构如下图:

chan_struct

主要包含以下几个部分:

  • 1)circular queue:循环队列,用于存储数据
  • 2)send index 记录发送的位置
  • 3)receive index 记录接收的位置
  • 4)mutex 锁,用于实现 goroutine safe。

buf 的具体实现很简单,就是一个环形队列,使用 sendx 和 recvx 分别用来记录发送、接收的 offset,然后通过 mutex 互斥锁来保证并发安全。

创建 chan

chan 使用 make 进行初始化,第一个参数指定 chan 中的元素类型,第二个参数用于指定 chan 的缓冲区大小。

1
ch := make(chan string, 3)

上述代码中 make 返回的 ch 实际上是一个指向 heap 中真正的 chan 对象的指针。

chan(即 hchan 结构体) 默认会被分配在堆上,make 返回的只是一个指向该对象的指针

这也是为什么我们可以在函数之间传递 chan,而不是 chan 的指针。

发送、接收与关闭

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func main() {
    ch := make(chan Task, 3)
    for _,task := range hellaTasks {
    	taskCh <- task // 发送
    }
    close(tashCh) // 关闭
}

func worker(ch) {
    for {
        task:= <-taskCh // 接收
        process(task)
    }
}

main goroutine 发送 task 到 chan,然后 worker goroutine 从 chan 中接收 task 并处理,最后 main goroutine 发送完成后关闭 chan。

具体发送过程如下:

  • 1)acquire 加锁
  • 2)enqueue,将 task 对象拷贝到数组里
  • 3)release 释放锁

对于 chan 的关闭,最佳实践是由发送方进行关闭。

接收过程

  • 1)acquire 加锁
  • 2)dequeue
    • 将 task 对象从 数组 中 拷贝出来赋值给用户用于接收的对象
    • task:= <-taskCh,比如这里就是拷贝出来赋值给 task
  • 3)release 释放锁

整个过程中没有任何共享内存,数据都是通过 copy 进行传递,这遵循了 Go 并发设计中很核心的一个理念:

Do not communicate by sharing memory; instead, share memory by communicating.

阻塞与唤醒

hchan 中的 buf 数组大小就是 make chan 时指定的大小。

当 buf 满之后再往 chan 中发送值就会阻塞。

复习一下 goroutine 调度:G 阻塞之后并不会阻塞 M。M 会先把这个 G 暂停(gopark),然后把执行栈切换到 g0,g0 会执行 schedule() 函数,从当前 M 绑定的 P 中查找有没有可以执行的G,有就捞出来继续执行。

先发后收

假设 chan 中已经有 3 个 task 了,然后我们再试着往里面发送一个

1
 taskCh <- task 

runtime 会调用 gopark 将这个 goroutine(姑且称作G1) 切换到 wait 状态。

什么时候会被唤醒呢?

hchan 结构体中还有 sendq、recvq 两个列表,分别记录了等待发送或者接收的 goroutine,如下图所示:

1
2
3
4
5
6
7
8
type hchan struct {
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters
}
type waitq struct {
	first *sudog
	last  *sudog
}

比如前面被阻塞的 G1 就会存入 sendq

sendq

假设此时 G2 从 chan 中取走一个消息

1
 task:= <-taskCh 

G2 取走一个消息后就会找到 sendq 中的第一个对象,把待发送的 elem 直接写入 buf 数组。然后 调用 goready 把对应的 goroutine G1 设置为 runnable 状态。

先收后发

之前是先发送,后接收。现在看一下先接收后发送的情况。

1
 task:= <-taskCh 

G2 直接从空的 chan 中取消息,同样会被阻塞,然后被写入到 hchan 的 recqv 中。

注意:elem 这里的 t 存的是 G2 栈里的地址。

recvq

然后 G1 往 chan 中发送一条消息。

1
 taskCh <- task 

按照上面的逻辑应该是,将 task 写入 buf 数组后,,再把 recvq 中的第一个 goroutine G2 唤醒。

但是 Go 官方这里进行了优化,可以说是一个骚操作。因为 recvq 里的 elem 对象 t 存的就是接收者的内存地址

所以我们可以直接把 G1 发送来的 task 写入 elem 对应的 t 里,即在 G1 里修改 G2 的栈对象。

因为这个时候 G2 还是 gopark,处于 waiting 状态,所以不会出问题。

正常情况下因为不知道两个线程谁先谁后,这样改肯定会出问题。但是在 go runtime 这里,肯定是 G2 先执行,满足 happen-before 所以不存在问题。

通过这样一个骚操作省去了发送和接收时的两次加解锁和内存拷贝。

特性实现原理

到此我们应该明白了 chan 的这些特性的实现原理

  • goroutine-safe.
    • hchan mutex,通过加锁来避免数据竞争。
  • 可以用于在 goroutine 之间存储和传递值,以及先入先出(FIFO)语义。
    • copying into and out of hchan buffer
  • 可以导致 goroutine 的 block 和 unblock
    • 通过 sudog queues 来记录阻塞的 goroutine。
    • 通过 runtime scheduler(gopark, goready)来实现阻塞与唤醒。

3. 源码分析

chan 的所有相关代码都在runtime/chan.go中,还是比较好找的。

内部结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

其中的 sendx/recvx 、sendq/recvq、buf以及 lock是核心字段,前面都有介绍过,应该比较熟悉了。

创建

在源码中通道的创建由 makechan 方法实现:

1
func makechan(t *chantype, size int) *hchan {}

然后还有两个包装方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}

func makechan64(t *chantype, size int64) *hchan {
	if int64(int(size)) != size {
		panic(plainError("makechan: size out of range"))
	}

	return makechan(t, int(size))
}

内部都是调用的 makechan 方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// 编译器检查 typesize 和 align
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	// 计算存放数据元素的内存大小以及是否溢出
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	var c *hchan
	switch {
	case mem == 0:
        // chan的size为0,或者每个元素占用的大小为0(比如struct{}大小就是0,不占空间)
        // 这种情况就不需要单独为buf分配空间
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 如果队列中不存在指针,那么每个元素都需要被存储并占用空间,占用大小为前面乘法算出来的mem
        // 同时还要加上hchan本身占用的空间大小,加起来就是整个hchan占用的空间大小
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // 把buf指针指向空的hchan占用空间大小的末尾
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		 // 如果chan中的元素是指针类型的数据,为buf单独开辟mem大小的空间,用来保存所有的数据
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
     // 元素大小、类型以及缓冲区大小赋值
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
     // 初始化锁
	lockInit(&c.lock, lockRankHchan)

	return c
}

具体流程如下:

  • 1)首先是编译器检查,包括通道元素类型的size以及通道和元素的对齐,然后计算存放数据元素的内存大小以及是否溢出

  • 2)然后根据不同条件进行内存分配

    • 总体的原则是:总内存大小 = hchan需要的内存大小 + 元素需要的内存大小
    • 队列为空或元素大小为0:只需要开辟的内存空间为hchan本身的大小
    • 元素不是指针类型:需要开辟的内存空间=hchan本身大小+每个元素的大小*申请的队列长度
    • 元素是指针类型:这种情况下buf需要单独开辟空间,buf占用内存大小为每个元素的大小*申请的队列长度

3)最后则对chan的其他字段赋值

发送

发送数据到channel时,直观的理解是将数据放到chan的环形队列中,不过go做了一些优化:

  • 先判断是否有等待接收数据的groutine,如果有,直接将数据发给Groutine,唤醒groutine,就不放入队列中了。
    • 这样省去了两次内存拷贝和加锁的开销
  • 当然还有另外一种情况就是:队列如果满了,那就只能放到队列中等待,直到有数据被取走才能发送。

调用链

chan 的发送逻辑涉及到5个方法:

1
2
3
4
5
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {}
func chansend1(c *hchan, elem unsafe.Pointer) {}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {}

chansend1 方法是 go编译代码中c <- x这种写法的入口点,即当我们编写代码 c <- x其实就是调用此方法。 这四个方法的调用关系:chansend1 -> chansend -> send -> sendDirect

具体发送逻辑在chansend这个方法里,然后真正使用的方法其实是对该方法的一层包装。

1
2
3
4
5
6
7
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}

chansend

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 判断 channel 是否为 nil
	if c == nil {
		if !block {// 如果非阻塞,直接返回 false
			return false
		}
        // 当向 nil channel 发送数据时,会调用 gopark
		// 而 gopark 会将当前的 goroutine 休眠,并用过第一个参数的 unlockf 来回调唤醒
		// 但此处传递的参数为 nil,因此向 channel 发送数据的 goroutine 和接收数据的 goroutine 都会阻塞,进而死锁
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// 对于不阻塞的 send,快速检测失败场景
	// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
	// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
	// 2. channel 是缓冲型的,但循环数组已经装满了元素
	// 主要用于 select 语句中,涉及到指令重排队+可观测性
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 加锁,避免竞争
	lock(&c.lock)
	// 检查 channel 是否已关闭,不允许向关闭的 channel 发送数据
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel")) // 直接panic
	}
	// 从 recvq 队首取出一个接收者,如果存在接收者,就绕过环形队列(buf)直接把 ep 拷贝给 sg,并释放锁
    // 这就是前面提到的,官方做的一个优化,如果有goroutine在等待就直接把数据给该goroutine,没必要在写到buf,然后接收者又从buf中拷贝出来
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    // 到这里说明当前没有等待状态的接收者
	// 如果环形队列还未满
	if c.qcount < c.dataqsiz {
        // 拿到 sendx 索引的位置
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
        // 直接把数据从 qp 拷贝到 qp,就是把数据拷贝到环形队列中
		typedmemmove(c.elemtype, qp, ep)
        // 维护 snedx 的值,因为是环形队列,所以到最大值时就重置为0
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
        //qcount即当前chan中的元素个数
		c.qcount++
		unlock(&c.lock)
		return true
	}
    // 到这里说明环形队列已经满了
	// 如果还是要非阻塞的方式发送,就只能返回错误了
	if !block {
		unlock(&c.lock)
		return false
	}
	// 到这里说明缓存队列满了,然后调用法指定是阻塞方式进行发送
	// channel 满了,发送方会被阻塞。接下来会构造一个 sudog
	gp := getg()	// 获取当前 goroutine
	mysg := acquireSudog()// 从对象池获取 sudog
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
    // 把发送的数据(ep)、当前g(gp)、已经当前这个chan(c)都存到sudog中
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
    // 保存当前 sudog,下面要用到做校验
	gp.waiting = mysg
	gp.param = nil
     // 把这个sudog存入sendq队列
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
    // 调用gopark,挂起当前的 g,将当前的 g 移出调度器的队列
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // 等到有接收者从chan中取值的时候,这个发送的g又会被重新调度,然后从这里开始继续执行
	KeepAlive(ep)

	// 检验是否为当前的 sudog
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
    // 这里sudog中的success表示的是当前这个通道上是否进行过通信
    // 为 true 则说明是真正的唤醒,chan上有活动(有数据写进来,或者有数据被读取出去)
    // 为 false 则说明是假的唤醒,即当前唤醒是否关闭chan导致的
    // 这里主要根据这个值判断chan是否被关闭了
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
    // 将 sudog 放回对象池
	releaseSudog(mysg)
	if closed {
        // 如果chan被关闭了也是直接panic
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

核心逻辑

  • 如果recvq不为空,从recvq中取出一个等待接收数据的Groutine,直接将数据发送给该Groutine
  • 如果recvq为空,才将数据放入buf中
  • 如果buf已满,则将要发送的数据和当前的Groutine打包成Sudog对象放入sendq,并将groutine置为等待状态
  • 等goroutine再次被调度时程序继续执行

send

然后追踪一下 send 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// 忽略 race 检查..
	if sg.elem != nil {
        // 直接拷贝到接受者内存,使用写屏障
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g // 取出sudog中记录的g,这里的g就是被阻塞接收者
	unlockf()
	gp.param = unsafe.Pointer(sg) // 更新接收者g的param字段,在recv方法中会用到
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
    // 最后把被阻塞的接收者g唤醒
	goready(gp, skip+1)
}

sendDirect

继续看sendDirect 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src 在当前 goroutine 的栈上,dst 是另一个 goroutine 的栈
    // 直接进行内存"搬迁"
	// 如果目标地址的栈发生了栈收缩,当我们读出了 sg.elem 后
	// 就不能修改真正的 dst 位置的值了
	// 因此需要在读和写之前加上一个屏障
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
 	// 拷贝内存
	memmove(dst, src, t.size)
}

这里涉及到一个 goroutine 直接写另一个 goroutine 栈的操作,一般而言,不同 goroutine 的栈是各自独有的。而这也违反了 GC 的一些假设。为了不出问题,写的过程中增加了写屏障,保证正确地完成写操作。这样做的好处是减少了一次内存 copy:不用先拷贝到 channel 的 buf,直接由发送者到接收者,没有中间商赚差价,效率得以提高,完美。

接收

从channel读取数据的流程和发送的类似,基本是发送操作的逆操作。

这里同样存在和send一样的优化:从channel读取数据时,不是直接去环形队列中去数据,而是先判断是否有等待发送数据的groutine。如果有,直接将groutine出队列,取出数据返回,并唤醒groutine。如果没有等待发送数据的groutine,再从环形队列中取数据。

调用链

chan的接收涉及到7个方法:

1
2
3
4
5
6
7
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {}
func chanrecv1(c *hchan, elem unsafe.Pointer) {}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {}

按照发送时的套路可知,只有 chanrecv 是具体逻辑,上面几个都是包装方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

接收操作有两种写法,一种带 “ok”,反应 channel 是否关闭;

一种不带 “ok”,这种写法,当接收到相应类型的零值时无法知道是真实的发送者发送过来的值,还是 channel 被关闭后,返回给接收者的默认类型的零值。

两种写法,都有各自的应用场景。

经过编译器的处理后,这两种写法最后对应源码里的就是不带okchanrecv1和带okchanrecv2这两个函数。

chanrecv

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。比如 <-ch 这样,没有接收取到的值
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// 如果是一个 nil 的 channel	
	if c == nil {
        // 如果不阻塞,直接返回 (false, false)
		if !block {
			return
		}
        // 否则,接收一个 nil 的 channel,调用gopark将goroutine 挂起
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable") // 被挂起之后不会执行到这一句
	}
	// 这块主要用在 select 语句中,先大概了解下,比较难懂。。。
	// 快速路径: 在不需要锁的情况下检查失败的非阻塞操作
	// 注意到 channel 不能由已关闭转换为未关闭,则失败的条件是:
	// 1. channel 是非缓冲型的,recvq 队列为空
	// 2. channel 是缓冲型的,buf 为空
	if !block && empty(c) {
		// 此处的 c.closed 必须在条件判断之后进行验证,
        // 因为指令重排后,如果先判断 c.closed,得出 channel 未关闭,无法判断失败条件中channel 是已关闭还是未关闭(从而需要 atomic 操作)
		if atomic.Load(&c.closed) == 0 {
			return
		}
		// 再次检查 channel 是否为空
		if empty(c) {
			// 接收者不为 nil 时返回该类型的零值
			if ep != nil {
				// typedmemclr 逻辑是根据类型清理相应地址的内存
				typedmemclr(c.elemtype, ep)
			}
            // 返回(true,fasle)
            // 返回值1--true:表示被 select case 选中,
            // 返回值2--fasle 表示是否正常收到数据
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 加锁,保证并发安全
	lock(&c.lock)
	// channel 已关闭,并且循环数组 buf 里没有元素
	// 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
	// 也就是说即使是关闭状态,但在缓冲型的 channel,
	// buf 里有元素的情况下还能接收到元素
	if c.closed != 0 && c.qcount == 0 {
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// 等待发送队列里有 goroutine 存在,说明 buf 是满的
	// 这有可能是:
	// 1. 非缓冲型的 channel
	// 2. 缓冲型的 channel,但 buf 满了
	// 针对 1,直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
	// 针对 2,接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// chan的buf 里有元素,可以正常接收
	if c.qcount > 0 {
		// 直接从循环数组里找到要接收的元素
		qp := chanbuf(c, c.recvx)
        // ep != nil表示代码里,没有忽略要接收的值
        // 即接收的代码不是 "<- ch",而是 "val <- ch"这种,ep 指向 val
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
        // 清理掉循环数组里相应位置的值
		typedmemclr(c.elemtype, qp)
        // 维护接收游标
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
        // buf 数组里的元素个数减 1
		c.qcount--
        // 处理完成,解锁返回
		unlock(&c.lock)
		return true, true
	}
   
	// 到这里说明chan的buf里没有数据了,如果是非阻塞接收就直接返回了
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 接下来就是要被阻塞的情况了
    // 和发送类似的,构造一个 sudog
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
    // 这里需要注意一下,ep就是我们用来接收值得对象
    // 这里把ep直接存到sudog.elem字段上
	mysg.elem = ep 
	mysg.waitlink = nil
	gp.waiting = mysg // 这个waiting同样是用来唤醒后做校验的
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
    // 加入到chan的recvq队列里
	c.recvq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
    // 将当前 goroutine 挂起
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// 唤醒后,继续往下执行
    
    // 同样是进行数据校验
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
    // 又是mysg.success,如果chan活动过就是true,否则是false
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)// 将 sudog 放回对象池
    // 到这里如果goroutine被正常唤醒肯定是可以取到数据的
    // 因为recvq的数据是由发送的时候直接copy过来了
	return true, success
}

recv

继续追踪一下 recv 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 非缓冲型的 channel
	if c.dataqsiz == 0 {
        // 并且需要接收值
		if ep != nil {
			// 直接进行内存拷贝
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
        // 需要注意:进入recv方法说明sendq队列里是有值的
		// 那么对缓冲型的 channel来说,sendq有值就意味着buf满了
        // 也就是 recvx和sendx重合了都
        // 这里要做的就是先从buf中读一个数据出来,然后再把发送者发送的数据写入buf
		qp := chanbuf(c, c.recvx)
		// 将接收游标处的数据拷贝给接收者
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 从发送者把数据写入 recvx
		typedmemmove(c.elemtype, qp, sg.elem)
        // 然后修改 recvx和sendx 的位置
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
    // 解锁
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
    // 最后唤醒发送的 goroutine
	goready(gp, skip+1)
}

recvDirect

再看一下 recvDirect:

1
2
3
4
5
6
7
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// 如果是非缓冲型的,就直接从发送者的栈拷贝到接收者的栈。
    // 和sendDirect一样的需要加内存屏障
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

小结

看了接收部分代码后,整个流程就更新清晰了。

根据前面的发送逻辑可以知道,不管是接收还是发送只要被阻塞了,加入到了 sendq 或者recvq 之后,那么后续的发送或者接收都是由对方进行处理了。

比如接收被阻塞了,当前g构成成一个sudog然后加入到recvq,接着调用了 gopark就已经阻塞了,啥也干不了了。

只能等到有发送者来的时候直接从recvq里把这个sudog取出来,并且直接把要他发送的值拷贝到这个sudog.elem字段上,也就是调用chan接收方法是传进来的哪个值. 最后发送方再调用goready把这个g给唤醒,这样再把剩下的逻辑走完,这个被阻塞了一会的接收者就可以拿着数据返回了。

核心逻辑:

  • 1)如果有等待发送数据的groutine,从sendq中取出一个等待发送数据的Groutine,取出数据
  • 2)如果没有等待的groutine,且环形队列中有数据,从队列中取出数据
  • 3)如果没有等待的groutine,且环形队列中也没有数据,则阻塞该Groutine,并将groutine打包为sudogo加入到recevq等待队列中

3. 关闭

调用链

close 就比较简单了,相关方法就两个:

1
2
3
4
5
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}
func closechan(c *hchan){}

其中一个还是包装方法,真正逻辑就在 clsoechan 里。

每个逻辑都有一个 reflect_xxx 的方法,根据名字猜测是反射的时候用的。

closechan

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func closechan(c *hchan) {
    // 关闭一个nil的chan直接panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 同样是先加锁
	lock(&c.lock)
    // 判断一下是否被关闭过了,关闭一个已经关闭的chan也是直接panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	// 修改closed标记为,表示chan已经被关闭了
	c.closed = 1
	// gList 是通过 g.schedlink 链接 G 的列表,一个 G 只能是一次在一个 gQueue 或 gList 上
	// gList 模拟的是栈操作(FILO)
	// gQueue 模拟的是队列操作(FIFO)
	var glist gList

	// 释放所有的接收者
	for {
		sg := c.recvq.dequeue()
        // sg == nil,表示接收队列已为空,跳出循环
		if sg == nil {
			break
		}
        // 如果 elem 不为空说明未忽略接收值,赋值为该类型的零值
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 释放所有的发送者
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// 循环读取 glist 里面的数据,挨个唤醒
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

核心流程:

  • 设置关闭状态
  • 唤醒所有等待读取chanel的协程
  • 所有等待写入channel的协程,抛出异常

4. 小结

存储实现

chan 内部使用一个环形队列实现存储,使用 sendx或recvx进行发送或读取。

并发安全

使用 mutex 保证并发安全。

调度

使用 sendq 和 recvq来暂存由于发送或接收而被阻塞的goroutine。

send/recv的时候都会判断recvq/sendq是否有goroutine正在等待,有则优先处理。

发送

发送的时候发现recvq有goroutine正在等待,说明此时chan的buf是空的,或者chan是个非缓存chan,根本没有buf。

对于发送来说,不管是buf为空还是chan没有buf都是一样的处理逻辑。

此时会直接从recvq中取出第一个g,然后把本次要发送的数据直接写给这个接收者g,并调用goready把这个g唤醒。

接收

如果接收的时候发现sendq有goroutine正在等待,说明buf满了,或者chan是个非缓存chan,根本没有buf。

对于接收来说buf满了或者chan没有buf二者的处理逻辑就不太一样了。

因为需要保证顺序,buf满了就不能直接去读sender的数据了,只能从buf中去。

如果是buf满了:那么会先从buf中读一个值出来(腾一个位置出来),然后把sender发送的值写入buf,并唤醒这个sender g。

如果是没有buf的无缓存chan:那就直接把sender要发送的数据取出来,作为本次取到的数据,然后唤醒sender g。

通过研究底层的源码实现才发现,chan 其实没有那么复杂,底层实现逻辑很清晰。

如果有调度基础的话,看起来就比较简单。

5. 参考

understanding-channels-kavya-joshi

图解Golang channel源码

Go夜读-第 56 期 channel & select 源码分析

Go源码阅读 | channel 设计与实现