16 KiB
16 KiB
第五章:并发编程 —— Goroutine 与 Channel 的艺术
本章目标:深入理解 Go 并发的核心机制,掌握 Goroutine 的调度原理、Channel 的通信模式、同步原语的使用、并发设计模式、竞态检测与上下文控制,能够编写高效、安全、可扩展的并发程序。
5.1 并发基础与 Goroutine
5.1.1 并发 vs 并行
- 并发(Concurrency):多个任务在同一时间段内交替执行(宏观上同时,微观上交替)。
- 并行(Parallelism):多个任务在同一时刻同时执行(需要多核 CPU)。
Go 语言的设计哲学是并发优先,通过 Goroutine 和 Channel 让并发编程变得简单。
5.1.2 Goroutine 基础
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world") // 启动 Goroutine
say("hello") // 主 Goroutine
time.Sleep(1 * time.Second) // 等待子 Goroutine 完成
}
深度解析:
go关键字启动一个轻量级线程(Goroutine)- Goroutine 由 Go 运行时(Runtime) 调度,而非操作系统
- 主函数退出会导致所有 Goroutine 终止(需要等待机制)
5.1.3 Goroutine 的轻量性
func goroutineCount() {
for i := 0; i < 10000; i++ {
go func(id int) {
// 轻量级任务
}(i)
}
time.Sleep(1 * time.Second)
fmt.Println("成功启动 10000 个 Goroutine")
}
深度解析:
- 初始栈大小仅 2KB(操作系统线程通常 1-8MB)
- 栈可动态增长(最大可达 GB 级别)
- 创建开销极小,可轻松创建百万级 Goroutine
- 上下文切换由 Go 运行时在用户态完成,无需陷入内核
5.2 GMP 调度模型 ⭐ 核心重点
Go 的并发性能得益于其独特的 GMP 调度模型。
5.2.1 GMP 模型简介
- G (Goroutine):协程,包含栈、指令指针、状态等
- M (Machine):操作系统线程,真正执行代码的实体
- P (Processor):逻辑处理器,管理 G 队列,调度 G 到 M 上执行
+----------+ +----------+ +----------+
| G1 | | G2 | | G3 |
+----------+ +----------+ +----------+
| | |
v v v
+------------------------------------------------+
| P (逻辑处理器) |
| [G 队列] 调度器 (工作窃取) |
+------------------------------------------------+
| | |
v v v
+----------+ +----------+ +----------+
| M1 | | M2 | | M3 |
+----------+ +----------+ +----------+
5.2.2 调度流程
- 启动:创建 G,放入 P 的本地队列
- 调度:M 获取 P,从 P 的本地队列取出 G 执行
- 阻塞:G 阻塞(如 IO),M 与 P 分离,P 继续调度其他 G
- 工作窃取:P 的队列为空时,从其他 P 窃取 G
- 系统调用:G 发起系统调用,M 阻塞,P 寻找新 M 继续调度
5.2.3 调试与监控
import "runtime"
func debugGoroutine() {
runtime.GOMAXPROCS(4) // 设置最大 P 数量(通常等于 CPU 核数)
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}
5.3 Channel:Goroutine 间的通信
Go 哲学:不要通过共享内存来通信,而要通过通信来共享内存。
5.3.1 Channel 的创建与基本操作
func channelBasic() {
// 无缓冲 Channel(同步)
ch := make(chan int)
go func() {
ch <- 42 // 发送
}()
val := <-ch // 接收(阻塞直到有数据)
fmt.Println(val) // 42
// 有缓冲 Channel(异步)
bufferedCh := make(chan int, 2)
bufferedCh <- 1
bufferedCh <- 2
// bufferedCh <- 3 // 阻塞(缓冲区满)
fmt.Println(<-bufferedCh) // 1
fmt.Println(<-bufferedCh) // 2
}
深度解析:
- 无缓冲 Channel:发送和接收必须同时就绪(同步通信)
- 有缓冲 Channel:发送直到缓冲区满,接收直到缓冲区空
- 方向:
chan<- T(只发送)、<-chan T(只接收)
5.3.2 Channel 的关闭与遍历
func channelClose() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 关闭 Channel
}()
// 遍历 Channel
for val := range ch {
fmt.Println(val)
}
// 检查是否关闭
val, ok := <-ch
if !ok {
fmt.Println("Channel 已关闭")
}
}
深度解析:
- 只能由发送方关闭 Channel
- 关闭后仍可接收剩余数据,接收值为零值
- 重复关闭或向已关闭 Channel 发送会 panic
range自动遍历直到 Channel 关闭
5.3.3 Channel 的常见模式
1. 工作池(Worker Pool)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 开始处理 %d\n", id, j)
time.Sleep(time.Second)
results <- j * 2
}
}
func workerPoolDemo() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 3 个 worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送 5 个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 接收结果
for r := 1; r <= 5; r++ {
<-results
}
}
2. 扇出(Fan-out)与扇入(Fan-in)
func fanOutFanIn() {
in := make(chan int)
// 扇出:多个 worker 处理
c1 := make(chan int)
c2 := make(chan int)
go func() {
for n := range in {
c1 <- n * 2
}
close(c1)
}()
go func() {
for n := range in {
c2 <- n * 3
}
close(c2)
}()
// 扇入:合并结果
out := make(chan int)
go func() {
var count int
for c := range []chan int{c1, c2} {
for n := range c {
out <- n
count++
if count == 2 { // 简化逻辑
break
}
}
}
close(out)
}()
// 发送
go func() {
in <- 1
close(in)
}()
// 接收
for n := range out {
fmt.Println(n)
}
}
3. 选择器(Select)
func selectDemo() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
case <-time.After(3 * time.Second):
fmt.Println("超时")
return
default:
fmt.Println("无数据,做其他事")
time.Sleep(500 * time.Millisecond)
}
}
}
深度解析:
select阻塞直到某个 case 就绪- 多个 case 就绪时,随机选择一个
default使 select 非阻塞time.After用于超时控制
5.4 同步原语(sync 包)
5.4.1 WaitGroup
func waitGroupDemo() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成\n", id)
}(i)
}
wg.Wait() // 等待所有 Goroutine 完成
fmt.Println("所有任务完成")
}
深度解析:
Add(n):增加计数器Done():减 1(通常用defer)Wait():阻塞直到计数器为 0- 注意:
Add必须在 Goroutine 启动前调用,或defer Done
5.4.2 Mutex(互斥锁)
func mutexDemo() {
var mu sync.Mutex
var count int
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
count++
mu.Unlock()
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", count) // 1000
}
深度解析:
Lock():加锁,阻塞直到获得锁Unlock():解锁- 死锁:避免嵌套锁、保持锁顺序
- 性能:竞争激烈时性能下降,考虑使用
atomic或RWMutex
5.4.3 RWMutex(读写锁)
func rwMutexDemo() {
var mu sync.RWMutex
var data = make(map[string]int)
// 读操作
go func() {
mu.RLock()
_ = data["key"]
mu.RUnlock()
}()
// 写操作
go func() {
mu.Lock()
data["key"] = 1
mu.Unlock()
}()
}
深度解析:
RLock()/RUnlock():读锁,允许多个读者Lock()/Unlock():写锁,排他- 适用场景:读多写少
5.4.4 Once(单次执行)
func onceDemo() {
var once sync.Once
setup := func() {
fmt.Println("初始化一次")
}
for i := 0; i < 5; i++ {
go func() {
once.Do(setup)
}()
}
time.Sleep(1 * time.Second)
}
深度解析:
Do(f):确保f只执行一次- 适用场景:单例模式、延迟初始化
5.4.5 Cond(条件变量)
func condDemo() {
var cond *sync.Cond
list := []string{}
cond = sync.NewCond(&sync.Mutex{})
// 消费者
go func() {
cond.L.Lock()
for len(list) == 0 {
cond.Wait() // 等待通知
}
item := list[0]
list = list[1:]
fmt.Println("消费:", item)
cond.L.Unlock()
}()
// 生产者
time.Sleep(500 * time.Millisecond)
cond.L.Lock()
list = append(list, "item1")
cond.Signal() // 通知一个
cond.L.Unlock()
}
5.5 原子操作(atomic 包)
func atomicDemo() {
var count int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&count, 1)
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", atomic.LoadInt64(&count))
}
深度解析:
- 比 Mutex 更快,无锁操作
- 支持
int32,int64,uint32,uint64,uintptr,unsafe.Pointer - 常用操作:
Add,Load,Store,Swap,CompareAndSwap
5.6 Context(上下文控制)
5.6.1 Context 基础
func contextDemo() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
select {
case <-time.After(3 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务取消:", ctx.Err())
}
}()
time.Sleep(3 * time.Second)
}
深度解析:
WithCancel:手动取消WithTimeout:超时自动取消WithDeadline:指定时间取消WithValue:传递键值对(慎用)
5.6.2 上下文传递最佳实践
- 不要将 Context 存入结构体
- 不要传递
nilContext,使用context.Background() - 不要用 Context 传递业务数据,仅用于取消/超时/追踪
5.7 竞态检测(Race Detector)
go run -race main.go
go test -race ./...
深度解析:
- 检测数据竞争(多个 Goroutine 同时访问同一变量,至少一个是写)
- 性能开销约 2-5 倍,仅用于测试
- 发现竞态后,使用 Mutex、Channel 或 atomic 修复
5.8 并发设计模式
5.8.1 管道(Pipeline)
func pipelineDemo() {
// 阶段 1:生成
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段 2:平方
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 执行
c := gen(1, 2, 3)
c = sq(c)
for n := range c {
fmt.Println(n) // 1, 4, 9
}
}
5.8.2 重试与退避
func retryWithBackoff() {
var attempts int
maxAttempts := 5
for {
attempts++
if attempts > maxAttempts {
fmt.Println("重试失败")
return
}
if attempt() {
fmt.Println("成功")
return
}
time.Sleep(time.Duration(attempts*100) * time.Millisecond)
}
}
func attempt() bool {
// 模拟失败
return false
}
5.8.3 限流(Rate Limiting)
func rateLimiter() {
limiter := time.NewTicker(100 * time.Millisecond)
defer limiter.Stop()
for i := 0; i < 10; i++ {
<-limiter.C
fmt.Println("处理任务", i)
}
}
5.9 深度实践:综合案例
5.9.1 并发爬虫
type Crawler struct {
visited map[string]bool
mu sync.Mutex
wg sync.WaitGroup
limit chan struct{}
}
func (c *Crawler) crawl(url string, depth int) {
if depth <= 0 {
return
}
c.limit <- struct{}{}
defer func() { <-c.limit }()
defer c.wg.Done()
c.mu.Lock()
if c.visited[url] {
c.mu.Unlock()
return
}
c.visited[url] = true
c.mu.Unlock()
fmt.Println("抓取:", url)
// 模拟获取子链接
links := []string{url + "/1", url + "/2"}
for _, link := range links {
c.wg.Add(1)
go c.crawl(link, depth-1)
}
}
func concurrentCrawlerDemo() {
crawler := &Crawler{
visited: make(map[string]bool),
limit: make(chan struct{}, 10), // 限制并发数
}
crawler.wg.Add(1)
go crawler.crawl("http://example.com", 3)
crawler.wg.Wait()
fmt.Println("爬虫完成")
}
5.9.2 并发地图聚合
func concurrentMapAggregation() {
data := make(map[string]int)
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key%d", id%10)
mu.Lock()
data[key]++
mu.Unlock()
}(i)
}
wg.Wait()
fmt.Println(data)
}
5.10 常见陷阱与最佳实践
5.10.1 Goroutine 泄漏
// 错误:Channel 未关闭导致泄漏
func leak() {
ch := make(chan int)
go func() {
ch <- 1
// 未关闭,接收方阻塞
}()
<-ch
}
// 正确
func noLeak() {
ch := make(chan int)
go func() {
defer close(ch)
ch <- 1
}()
<-ch
}
5.10.2 死锁
// 错误:嵌套锁
func deadlock() {
var mu1, mu2 sync.Mutex
go func() {
mu1.Lock()
mu2.Lock()
// ...
mu2.Unlock()
mu1.Unlock()
}()
go func() {
mu2.Lock() // 等待 mu2
mu1.Lock() // 等待 mu1(死锁)
// ...
mu2.Unlock()
mu1.Unlock()
}()
}
5.10.3 最佳实践
- 优先使用 Channel:避免共享内存
- 限制并发数:使用信号量或 Worker Pool
- 及时关闭 Channel:避免泄漏
- 使用 Context:传递取消信号
- 运行 Race Detector:测试时开启
- 避免 Goroutine 泄漏:确保所有 Goroutine 能退出
- 锁的粒度:尽量缩小锁的范围
5.11 课后练习
- 并发计数器:使用 Mutex 和 atomic 实现并发安全的计数器
- 并发爬虫:实现一个带并发限制的网页爬虫
- 管道处理:实现一个多阶段的数据处理管道
- 超时控制:实现一个带超时的任务执行器
- 竞态检测:编写一个有竞态的程序,用
-race检测并修复
5.12 下一步
完成本章后,你将进入第六章:实战项目,构建一个完整的 Web API 服务,综合运用前面所有知识(数据结构、函数、接口、并发)。
代码仓库位置:https://giter.top/openclaw/test/tree/main/chapters/chapter-5
下一章预告:HTTP 服务器、路由、中间件、数据库连接池、RESTful API 设计、部署