一个非阻塞的channel
可以被用来实现semaphore
。semaphore
可以看作可以由多个人同时拥有的互斥锁。如果channel
的缓冲区长度为N
,那它可以被看作一个最多被N
个人同时拥有的互斥锁。而互斥锁可以看作N = 1
的特殊情况。
semaphore
通常被用于限制最大并行数量。
和互斥锁一样,也有以下两种方式实现semaphore
:
通过发送数据拥有,通过接收数据释放
通过接收数据拥有,通过发送数据释放
以下是使用第二种方式的一个例子:
package main
import (
"log"
"time"
"math/rand"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeCustomer(c int) {
log.Print("customer#", c, " enters the bar")
seat := <- bar // 需要一个椅子,才能喝酒
log.Print("++ customer#", c, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seat)
bar <- seat // 离开酒吧,空椅子多了一个
}
func main() {
rand.Seed(time.Now().UnixNano())
// 一共有10个椅子。
bar24x7 := make(Bar, 10)
// 把椅子放入酒吧。
for seatId := 0; seatId < cap(bar24x7); seatId++ {
// 这些send操作都不会阻塞
bar24x7 <- Seat(seatId)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
go bar24x7.ServeCustomer(customerId)
}
for {time.Sleep(time.Second)}
}
在以上例子中,每个顾客需要一个椅子才能喝酒。所以任何时刻最多有10名顾客在喝酒。
main
函数里最后的for
循环是用来防止程序退出的。其实还有更好的方法,我们以后会介绍。
在以上例子中,虽然任何时刻最多有10名顾客在喝酒,但可能有超过10名顾客同时在酒吧里,因为有的顾客在等空椅子。虽然每个顾客的goroutine
比系统线程消耗的资源少很多,大量goroutine
的资源消耗也是不能忽略的。所以可以等有空椅子时,再创建一个顾客用的goroutine
:
... // 这些代码和上面一样
func (bar Bar) ServeCustomerAtSeat(c int, seat Seat) {
log.Print("++ customer#", c, " drinks at seat#", seat)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seat)
bar <- seat // 离开酒吧,空椅子多了一个
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
// 需要一个座位去服务下一名顾客。
seat := <- bar24x7
go bar24x7.ServeCustomerAtSeat(customerId, seat)
}
for {time.Sleep(time.Second)}
}
现在,同时最多有10个顾客用的goroutine
了。但整个程序的运行过程中,依然会创建大量的goroutine
。
在一个更高效的版本中,整个程序中最多10个goroutine
会被创建:
... // 这些代码和上面一样
func (bar Bar) ServeCustomerAtSeat(consumers chan int) {
for c := range consumers {
seatId := <- bar
log.Print("++ customer#", c, " drinks at seat#", seatId)
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " frees seat#", seatId)
bar <- seatId // 离开酒吧,空椅子多了一个
}
}
func main() {
rand.Seed(time.Now().UnixNano())
bar24x7 := make(Bar, 10)
for seatId := 0; seatId < cap(bar24x7); seatId++ {
bar24x7 <- Seat(seatId)
}
consumers := make(chan int)
for i := 0; i < cap(bar24x7); i++ {
go bar24x7.ServeCustomerAtSeat(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
题外话:如果我们不介意椅子编号的话,bar24x7
这个semaphore
甚至都不需要:
... // 这些代码和上面一样
func ServeCustomer(consumers chan int) {
for c := range consumers {
log.Print("++ customer#", c, " drinks at the bar")
time.Sleep(time.Second * time.Duration(2 + rand.Intn(6)))
log.Print("-- customer#", c, " leaves the bar")
}
}
func main() {
rand.Seed(time.Now().UnixNano())
const BarSeatCount = 10
consumers := make(chan int)
for i := 0; i < BarSeatCount; i++ {
go ServeCustomer(consumers)
}
for customerId := 0; ; customerId++ {
time.Sleep(time.Second)
consumers <- customerId
}
}
如果通过发送数据拥有semaphore
的话,代码也是类似的。甚至不需要放椅子:
package main
import (
"log"
"time"
"math/rand"
)
type Customer struct{id int}
type Bar chan Customer
func (bar Bar) ServeCustomer(c Customer) {
log.Print("++ customer#", c.id, " starts drinking")
time.Sleep(time.Second * time.Duration(3 + rand.Intn(16)))
log.Print("-- customer#", c.id, " leaves the bar")
<- bar // 离开酒吧,空位多了一个
}
func main() {
rand.Seed(time.Now().UnixNano())
// 酒吧最多能同时服务10名顾客。
bar24x7 := make(Bar, 10)
for customerId := 0; ; customerId++ {
time.Sleep(time.Second * 2)
customer := Customer{customerId}
// 等待进入酒吧。
bar24x7 <- customer
go bar24x7.ServeCustomer(customer)
}
for {time.Sleep(time.Second)}
}