Goroutine 与 Channel
Goroutine生命周期
不会引发
pannic
所有的
defer()
正常执行
exit := make(chan struct{})
go func() {
defer close(exit)
func() {
defer func() {
println("b", recover() == nil)
}()
func() {
runtime.Goexit() // 主动退出
}()
}()
}()
<-exit
停掉了服务器内部一个失败的协程而不影响其他协程的工作。因为加入了恢复模式,函数 do
(以及它调用的任何东西)可以通过调用 panic
来摆脱不好的情况。但是恢复是在 panicking
的协程内部的:不能被另外一个协程恢复。
func server(workChan <-chan *Work) {
for work := range workChan {
go safelyDo(work) // start the goroutine for that work
}
}
func safelyDo(work *Work) {
defer func() {
if err := recover(); err != nil {
log.Printf("Work failed with %s in %v", err, work)
}
}()
do(work)
}
Channel
Channel
是用于发送类型化数据的管道,由其负责协程之间的通信,从而避开所有由共享内存导致的陷阱;这种通过通道进行通信的方式保证了同步性。数据在通道中进行传递:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。数据的所有权(可以读写数据的能力)也因此被传递。
Channel
的实质是类型化消息的队列:使数据得以传输。它是 FIFO 结构所以可以保证发送给他们的元素的顺序。
Channel
是第一类对象:可以存储在变量中,作为函数的参数传递,从函数返回以及通过通道发送它们自身。另外它们是类型化的,允许类型检查。
ch := make(chan float64) // 创建通道
defer close(ch) // 关闭通道
ch <- int1 // 流向通道(发送)
int2 = <- ch // 从通道流出(接收)
<- ch // 单独调用获取通道的(下一个)值,当前值会被丢弃
if <- ch != 1000 { // 取出值,然后和 1000 比较
}
// 带判断通道是否关闭的循环读取
for {
v, ok := <-ch
if !ok {
break
}
process(v)
}
// 使用 range 则会自动检测通道是否关闭
for input := range ch {
process(input)
}
只有在当需要告诉接收者不会再提供新的值的时候,才需要关闭通道。只有发送者需要关闭通道,接收者永远不会需要。
将通道限定为单向的
一般用这个限定通道方向来获得更严谨的逻辑,并且这个限定是不可逆的。
c := make(chan int)
var send chan<- int = c // 限定为只写
var recv <-chan int = c // 限定为只读
close(send) // 只能close发送端
无缓冲Channel
一个无缓冲的通道在没有空间来保存数据的时候:必须要一个接收者准备好接收通道的数据然后发送者可以直接把数据发送给接收者。所以通道的发送/接收操作在对方准备好之前是阻塞的:
对于同一个通道,发送操作(协程或者函数中的),在接收者准备好之前是阻塞的:如果ch中的数据无人接收,就无法再给通道传入其他数据:新的输入无法在通道非空的情况下传入。所以发送操作会等待 ch 再次变为可用状态:就是通道值被接收时(可以传入变量)。
对于同一个通道,接收操作是阻塞的(协程或函数中的),直到发送者可用:如果通道中没有数据,接收者就阻塞了。
func main() {
var wg sync.WaitGroup
wg.Add(3)
go doSomething(1, &wg)
go doSomething(2, &wg)
go doSomething(3, &wg)
wg.Wait()
}
func doSomething(id int, wg *sync.WaitGroup) {
defer wg.Done()
log.Printf("before do job:(%d) \n", id)
time.Sleep(3 * time.Second)
log.Printf("after do job:(%d) \n", id)
}
Select
select
可以随机收
,也可以随机发
,在任何一个 case 中执行 break
或者 return
,select 就结束了。
select
做的就是:选择处理列出的多个通信情况中的一个。
如果都阻塞了,会等待直到其中一个可以处理
如果多个可以处理,随机选择一个
如果没有通道操作可以处理并且写了
default
语句,它就会执行:default
永远是可运行的(这就是准备好了,可以执行)。
设置为
nil
的管道,将永远堵塞,不会再参与select
随机选取执行。
a, b := make(chan int), make(chan int)
go func() { // 接收端
for {
var name string
var x int
var ok bool
select { // 从准备好数据的通道中,随机选取一条进行读取
case x, ok = <-a:
name = "from a : "
case x, ok = <-b:
name = "from b : "
if !ok {
b = nil
println("b closed")
break // 跳出这次select
}
// defalt 的加入,可以使 select 避免陷入阻塞,但注意不要引起没必要的空转
default:
println("default : no channel ready")
time.Sleep(time.Second / 5)
}
if !ok {
return
}
println(name, x)
}
}()
// 发送端
go func() {
for i := 0; i < 10; i++ {
select { // 随机写入 a 或 b,每次selec只能写入一次
case a <- i:
time.Sleep(time.Second)
case b <- i * 10:
time.Sleep(time.Second)
}
}
}()
超时器与间时器
tick := time.Tick(3 * time.Second) // 每隔 3s,向通道中发送数据
boom := time.After(10 * time.Second) // 在10s后,向通道中发送数据
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(time.Second)
}
}
取消耗时很长的同步调用
ch := make(chan error, 1) // 缓冲大小设置为 1 是必要的,可以避免协程死锁以及确保超时的通道可以被垃圾回收
go func() { ch <- client.Call("Service.Method", args, &reply) } ()
select {
case resp := <-ch
// use resp and reply
case <-time.After(timeoutNs):
// call timed out
break
}
并行请求每一个数据库并返回收到的第一个响应
func Query(conns []Conn, query string) Result {
ch := make(chan Result, 1) // 必须是带缓冲的:以保证第一个发送进来的数据有地方可以存放
for _, conn := range conns {
go func(c Conn) {
select {
case ch <- c.DoQuery(query):
default:
}
}(conn)
}
return <- ch
}
优雅的关闭Channel
The Channel Closing Principle
不要从接收端关闭 channel
不要关闭有多个并发发送者的channel
sender是唯一 或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知 receivers 已经没有值可以读了
维持这些原则保证了:
永远不会发生向一个已经关闭的channel发送值(会导致panic)
关闭一个已经关闭的channel(会导致panic)
Golang甚至禁止关闭 receive-only 类型的channel
一发多收
这种情况下,直接在 sender 处,关闭就好
多发一收
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
const MaxRandomNumber = 1000
const NumSenders = 10
dataCh := make(chan int, 100)
stopCh := make(chan struct{}) // 额外的一个Channel
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <-stopCh:
fmt.Println("got from stop, returned")
return
case dataCh <- value:
}
}
}()
}
// one receiver
go func() {
for value := range dataCh {
if value == MaxRandomNumber-1 {
fmt.Println("close stopCh")
close(stopCh) // 这个命令让所有的sender都 return 了
fmt.Println("receiver returned")
return // 自己再退出,这样就没有引用 dataCh 和 stopCh 的 Goroutine 了
}
}
}()
time.Sleep(1000 * time.Second)
}
多发多收
通过一个主持人moderator
携程,可以优雅的退出所有使用到 dataChan 的协程,从而将Channel回收
func main() {
const (
MaxRandomNumber = 100000
NumSenders = 20
NumReceivers = 10
)
dataCh := make(chan int, 100) // senders 写入 receviers 接收
stopCh := make(chan struct{}) // senders 与 receviers 监听, 收到后退出协程
// moderator 监听,senders 与 receviers 都可以写入
// 收到后,关闭所有 senders 和 receviers
toStop := make(chan string, 1)
// moderator
go func() {
<-toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
select {
case toStop <- "sender#" + id:
default:
}
return
}
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
for {
select {
case <-stopCh:
return
default:
}
select {
case <-stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
select {
case toStop <- "receiver#" + id:
default:
}
return
}
}
}
}(strconv.Itoa(i))
}
time.Sleep(1000 * time.Second)
}
通道工厂模式
不将通道作为参数传递给协程,而用函数来生成一个通道并返回(工厂角色);函数内有个匿名函数被协程调用。
func main() {
stream := pump()
go suck(stream)
time.Sleep(1e9)
}
func pump() chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
}()
return ch
}
func suck(ch chan int) {
for {
fmt.Println(<-ch)
}
}
Feature 模式
func main() {
retCh := AsyncService() // 下面两句 并行执行
otherTask()
select { // 等待 第一句的结果,并且有超时机制
case ret := <-retCh:
fmt.Println("get from retCh : ", ret)
case <-time.After(time.Millisecond * 200): // 200 ms 超时
fmt.Println("time out")
}
}
func service() string {
time.Sleep(time.Millisecond * 50) // 50 ms 执行完的一个Service
return "Done"
}
func AsyncService() chan string { // 使用协程将它包装成一个异步操作
retCh := make(chan string, 1) // 思考下,有 buffer 和 无buffer 的区别 ?
go func() {
ret := service()
retCh <- ret // 如果是无 buffer ,则主调函数未取数据之前,回阻塞在此
}()
return retCh
}
func otherTask() {
time.Sleep(time.Millisecond * 100) // 100ms 执行完的一个 Task
}
使用锁还是通道?
对于任何可以建模为 Master-Worker 范例的问题,一个类似于 worker 使用通道进行通信和交互、Master进行整体协调的方案都能完美解决。如果系统部署在多台机器上,各个机器上执行 Worker 协程,Master 和 Worker 之间使用 netchan 或者 RPC 进行通信。
使用锁的情景:
访问共享数据结构中的缓存信息
保存应用程序上下文和状态信息数据
使用通道的情景:
与异步操作的结果进行交互
分发任务
传递数据所有权
当你发现你的锁使用规则变得很复杂时,可以反省使用通道会不会使问题变得简单些。
惰性求值器
type Any interface{}
type EvalFunc func(Any) (Any, Any)
func main() {
evenFunc := func(state Any) (Any, Any) {
os := state.(int)
ns := os + 2
return os, ns
}
even := BuildLazyIntEvaluator(evenFunc, 0)
for i := 0; i < 10; i++ {
fmt.Printf("%vth even: %v\n", i, even())
}
}
func BuildLazyEvaluator(evalFunc EvalFunc, initState Any) func() Any {
retValChan := make(chan Any)
loopFunc := func() {
var actState Any = initState
var retVal Any
for {
retVal, actState = evalFunc(actState)
retValChan <- retVal
}
}
retFunc := func() Any {
return <- retValChan
}
go loopFunc()
return retFunc
}
func BuildLazyIntEvaluator(evalFunc EvalFunc, initState Any) func() int {
ef := BuildLazyEvaluator(evalFunc, initState)
return func() int {
return ef().(int)
}
}