# 第五章:并发编程 —— Goroutine 与 Channel 的艺术 > **本章目标**:深入理解 Go 并发的核心机制,掌握 Goroutine 的调度原理、Channel 的通信模式、同步原语的使用、并发设计模式、竞态检测与上下文控制,能够编写高效、安全、可扩展的并发程序。 ## 5.1 并发基础与 Goroutine ### 5.1.1 并发 vs 并行 - **并发(Concurrency)**:多个任务在**同一时间段**内交替执行(宏观上同时,微观上交替)。 - **并行(Parallelism)**:多个任务在**同一时刻**同时执行(需要多核 CPU)。 Go 语言的设计哲学是**并发优先**,通过 Goroutine 和 Channel 让并发编程变得简单。 ### 5.1.2 Goroutine 基础 ```go package main import ( "fmt" "time" ) func say(s string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main() { go say("world") // 启动 Goroutine say("hello") // 主 Goroutine time.Sleep(1 * time.Second) // 等待子 Goroutine 完成 } ``` **深度解析**: - `go` 关键字启动一个**轻量级线程**(Goroutine) - Goroutine 由 **Go 运行时(Runtime)** 调度,而非操作系统 - 主函数退出会导致所有 Goroutine 终止(需要等待机制) ### 5.1.3 Goroutine 的轻量性 ```go func goroutineCount() { for i := 0; i < 10000; i++ { go func(id int) { // 轻量级任务 }(i) } time.Sleep(1 * time.Second) fmt.Println("成功启动 10000 个 Goroutine") } ``` **深度解析**: - 初始栈大小仅 **2KB**(操作系统线程通常 1-8MB) - 栈可动态增长(最大可达 GB 级别) - 创建开销极小,可轻松创建**百万级** Goroutine - 上下文切换由 Go 运行时在**用户态**完成,无需陷入内核 --- ## 5.2 GMP 调度模型 ⭐ 核心重点 Go 的并发性能得益于其独特的 **GMP 调度模型**。 ### 5.2.1 GMP 模型简介 - **G (Goroutine)**:协程,包含栈、指令指针、状态等 - **M (Machine)**:操作系统线程,真正执行代码的实体 - **P (Processor)**:逻辑处理器,管理 G 队列,调度 G 到 M 上执行 ``` +----------+ +----------+ +----------+ | G1 | | G2 | | G3 | +----------+ +----------+ +----------+ | | | v v v +------------------------------------------------+ | P (逻辑处理器) | | [G 队列] 调度器 (工作窃取) | +------------------------------------------------+ | | | v v v +----------+ +----------+ +----------+ | M1 | | M2 | | M3 | +----------+ +----------+ +----------+ ``` ### 5.2.2 调度流程 1. **启动**:创建 G,放入 P 的本地队列 2. **调度**:M 获取 P,从 P 的本地队列取出 G 执行 3. **阻塞**:G 阻塞(如 IO),M 与 P 分离,P 继续调度其他 G 4. **工作窃取**:P 的队列为空时,从其他 P 窃取 G 5. **系统调用**:G 发起系统调用,M 阻塞,P 寻找新 M 继续调度 ### 5.2.3 调试与监控 ```go import "runtime" func debugGoroutine() { runtime.GOMAXPROCS(4) // 设置最大 P 数量(通常等于 CPU 核数) fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0)) fmt.Printf("NumCPU: %d\n", runtime.NumCPU()) fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine()) } ``` --- ## 5.3 Channel:Goroutine 间的通信 > **Go 哲学**:不要通过共享内存来通信,而要通过通信来共享内存。 ### 5.3.1 Channel 的创建与基本操作 ```go func channelBasic() { // 无缓冲 Channel(同步) ch := make(chan int) go func() { ch <- 42 // 发送 }() val := <-ch // 接收(阻塞直到有数据) fmt.Println(val) // 42 // 有缓冲 Channel(异步) bufferedCh := make(chan int, 2) bufferedCh <- 1 bufferedCh <- 2 // bufferedCh <- 3 // 阻塞(缓冲区满) fmt.Println(<-bufferedCh) // 1 fmt.Println(<-bufferedCh) // 2 } ``` **深度解析**: - **无缓冲 Channel**:发送和接收必须**同时就绪**(同步通信) - **有缓冲 Channel**:发送直到缓冲区满,接收直到缓冲区空 - **方向**:`chan<- T`(只发送)、`<-chan T`(只接收) ### 5.3.2 Channel 的关闭与遍历 ```go func channelClose() { ch := make(chan int) go func() { for i := 0; i < 5; i++ { ch <- i } close(ch) // 关闭 Channel }() // 遍历 Channel for val := range ch { fmt.Println(val) } // 检查是否关闭 val, ok := <-ch if !ok { fmt.Println("Channel 已关闭") } } ``` **深度解析**: - 只能由**发送方**关闭 Channel - 关闭后仍可接收剩余数据,接收值为零值 - 重复关闭或向已关闭 Channel 发送会 **panic** - `range` 自动遍历直到 Channel 关闭 ### 5.3.3 Channel 的常见模式 #### 1. 工作池(Worker Pool) ```go func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Printf("Worker %d 开始处理 %d\n", id, j) time.Sleep(time.Second) results <- j * 2 } } func workerPoolDemo() { jobs := make(chan int, 100) results := make(chan int, 100) // 启动 3 个 worker for w := 1; w <= 3; w++ { go worker(w, jobs, results) } // 发送 5 个任务 for j := 1; j <= 5; j++ { jobs <- j } close(jobs) // 接收结果 for r := 1; r <= 5; r++ { <-results } } ``` #### 2. 扇出(Fan-out)与扇入(Fan-in) ```go func fanOutFanIn() { in := make(chan int) // 扇出:多个 worker 处理 c1 := make(chan int) c2 := make(chan int) go func() { for n := range in { c1 <- n * 2 } close(c1) }() go func() { for n := range in { c2 <- n * 3 } close(c2) }() // 扇入:合并结果 out := make(chan int) go func() { var count int for c := range []chan int{c1, c2} { for n := range c { out <- n count++ if count == 2 { // 简化逻辑 break } } } close(out) }() // 发送 go func() { in <- 1 close(in) }() // 接收 for n := range out { fmt.Println(n) } } ``` #### 3. 选择器(Select) ```go func selectDemo() { ch1 := make(chan string) ch2 := make(chan string) go func() { time.Sleep(1 * time.Second) ch1 <- "来自 ch1" }() go func() { time.Sleep(2 * time.Second) ch2 <- "来自 ch2" }() for i := 0; i < 2; i++ { select { case msg1 := <-ch1: fmt.Println(msg1) case msg2 := <-ch2: fmt.Println(msg2) case <-time.After(3 * time.Second): fmt.Println("超时") return default: fmt.Println("无数据,做其他事") time.Sleep(500 * time.Millisecond) } } } ``` **深度解析**: - `select` 阻塞直到某个 case 就绪 - 多个 case 就绪时,**随机选择**一个 - `default` 使 select 非阻塞 - `time.After` 用于超时控制 --- ## 5.4 同步原语(sync 包) ### 5.4.1 WaitGroup ```go func waitGroupDemo() { var wg sync.WaitGroup for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Printf("Worker %d 开始\n", id) time.Sleep(time.Second) fmt.Printf("Worker %d 完成\n", id) }(i) } wg.Wait() // 等待所有 Goroutine 完成 fmt.Println("所有任务完成") } ``` **深度解析**: - `Add(n)`:增加计数器 - `Done()`:减 1(通常用 `defer`) - `Wait()`:阻塞直到计数器为 0 - **注意**:`Add` 必须在 Goroutine 启动前调用,或 `defer Done` ### 5.4.2 Mutex(互斥锁) ```go func mutexDemo() { var mu sync.Mutex var count int for i := 0; i < 1000; i++ { go func() { mu.Lock() count++ mu.Unlock() }() } time.Sleep(1 * time.Second) fmt.Println("Count:", count) // 1000 } ``` **深度解析**: - `Lock()`:加锁,阻塞直到获得锁 - `Unlock()`:解锁 - **死锁**:避免嵌套锁、保持锁顺序 - **性能**:竞争激烈时性能下降,考虑使用 `atomic` 或 `RWMutex` ### 5.4.3 RWMutex(读写锁) ```go func rwMutexDemo() { var mu sync.RWMutex var data = make(map[string]int) // 读操作 go func() { mu.RLock() _ = data["key"] mu.RUnlock() }() // 写操作 go func() { mu.Lock() data["key"] = 1 mu.Unlock() }() } ``` **深度解析**: - `RLock()` / `RUnlock()`:读锁,允许多个读者 - `Lock()` / `Unlock()`:写锁,排他 - **适用场景**:读多写少 ### 5.4.4 Once(单次执行) ```go func onceDemo() { var once sync.Once setup := func() { fmt.Println("初始化一次") } for i := 0; i < 5; i++ { go func() { once.Do(setup) }() } time.Sleep(1 * time.Second) } ``` **深度解析**: - `Do(f)`:确保 `f` 只执行一次 - **适用场景**:单例模式、延迟初始化 ### 5.4.5 Cond(条件变量) ```go func condDemo() { var cond *sync.Cond list := []string{} cond = sync.NewCond(&sync.Mutex{}) // 消费者 go func() { cond.L.Lock() for len(list) == 0 { cond.Wait() // 等待通知 } item := list[0] list = list[1:] fmt.Println("消费:", item) cond.L.Unlock() }() // 生产者 time.Sleep(500 * time.Millisecond) cond.L.Lock() list = append(list, "item1") cond.Signal() // 通知一个 cond.L.Unlock() } ``` --- ## 5.5 原子操作(atomic 包) ```go func atomicDemo() { var count int64 for i := 0; i < 1000; i++ { go func() { atomic.AddInt64(&count, 1) }() } time.Sleep(1 * time.Second) fmt.Println("Count:", atomic.LoadInt64(&count)) } ``` **深度解析**: - 比 Mutex 更快,无锁操作 - 支持 `int32`, `int64`, `uint32`, `uint64`, `uintptr`, `unsafe.Pointer` - 常用操作:`Add`, `Load`, `Store`, `Swap`, `CompareAndSwap` --- ## 5.6 Context(上下文控制) ### 5.6.1 Context 基础 ```go func contextDemo() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() go func() { select { case <-time.After(3 * time.Second): fmt.Println("任务完成") case <-ctx.Done(): fmt.Println("任务取消:", ctx.Err()) } }() time.Sleep(3 * time.Second) } ``` **深度解析**: - `WithCancel`:手动取消 - `WithTimeout`:超时自动取消 - `WithDeadline`:指定时间取消 - `WithValue`:传递键值对(慎用) ### 5.6.2 上下文传递最佳实践 - **不要**将 Context 存入结构体 - **不要**传递 `nil` Context,使用 `context.Background()` - **不要**用 Context 传递业务数据,仅用于取消/超时/追踪 --- ## 5.7 竞态检测(Race Detector) ```bash go run -race main.go go test -race ./... ``` **深度解析**: - 检测数据竞争(多个 Goroutine 同时访问同一变量,至少一个是写) - 性能开销约 2-5 倍,仅用于测试 - 发现竞态后,使用 Mutex、Channel 或 atomic 修复 --- ## 5.8 并发设计模式 ### 5.8.1 管道(Pipeline) ```go func pipelineDemo() { // 阶段 1:生成 gen := func(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } // 阶段 2:平方 sq := func(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } // 执行 c := gen(1, 2, 3) c = sq(c) for n := range c { fmt.Println(n) // 1, 4, 9 } } ``` ### 5.8.2 重试与退避 ```go func retryWithBackoff() { var attempts int maxAttempts := 5 for { attempts++ if attempts > maxAttempts { fmt.Println("重试失败") return } if attempt() { fmt.Println("成功") return } time.Sleep(time.Duration(attempts*100) * time.Millisecond) } } func attempt() bool { // 模拟失败 return false } ``` ### 5.8.3 限流(Rate Limiting) ```go func rateLimiter() { limiter := time.NewTicker(100 * time.Millisecond) defer limiter.Stop() for i := 0; i < 10; i++ { <-limiter.C fmt.Println("处理任务", i) } } ``` --- ## 5.9 深度实践:综合案例 ### 5.9.1 并发爬虫 ```go type Crawler struct { visited map[string]bool mu sync.Mutex wg sync.WaitGroup limit chan struct{} } func (c *Crawler) crawl(url string, depth int) { if depth <= 0 { return } c.limit <- struct{}{} defer func() { <-c.limit }() defer c.wg.Done() c.mu.Lock() if c.visited[url] { c.mu.Unlock() return } c.visited[url] = true c.mu.Unlock() fmt.Println("抓取:", url) // 模拟获取子链接 links := []string{url + "/1", url + "/2"} for _, link := range links { c.wg.Add(1) go c.crawl(link, depth-1) } } func concurrentCrawlerDemo() { crawler := &Crawler{ visited: make(map[string]bool), limit: make(chan struct{}, 10), // 限制并发数 } crawler.wg.Add(1) go crawler.crawl("http://example.com", 3) crawler.wg.Wait() fmt.Println("爬虫完成") } ``` ### 5.9.2 并发地图聚合 ```go func concurrentMapAggregation() { data := make(map[string]int) var mu sync.Mutex var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(id int) { defer wg.Done() key := fmt.Sprintf("key%d", id%10) mu.Lock() data[key]++ mu.Unlock() }(i) } wg.Wait() fmt.Println(data) } ``` --- ## 5.10 常见陷阱与最佳实践 ### 5.10.1 Goroutine 泄漏 ```go // 错误:Channel 未关闭导致泄漏 func leak() { ch := make(chan int) go func() { ch <- 1 // 未关闭,接收方阻塞 }() <-ch } // 正确 func noLeak() { ch := make(chan int) go func() { defer close(ch) ch <- 1 }() <-ch } ``` ### 5.10.2 死锁 ```go // 错误:嵌套锁 func deadlock() { var mu1, mu2 sync.Mutex go func() { mu1.Lock() mu2.Lock() // ... mu2.Unlock() mu1.Unlock() }() go func() { mu2.Lock() // 等待 mu2 mu1.Lock() // 等待 mu1(死锁) // ... mu2.Unlock() mu1.Unlock() }() } ``` ### 5.10.3 最佳实践 1. **优先使用 Channel**:避免共享内存 2. **限制并发数**:使用信号量或 Worker Pool 3. **及时关闭 Channel**:避免泄漏 4. **使用 Context**:传递取消信号 5. **运行 Race Detector**:测试时开启 6. **避免 Goroutine 泄漏**:确保所有 Goroutine 能退出 7. **锁的粒度**:尽量缩小锁的范围 --- ## 5.11 课后练习 1. **并发计数器**:使用 Mutex 和 atomic 实现并发安全的计数器 2. **并发爬虫**:实现一个带并发限制的网页爬虫 3. **管道处理**:实现一个多阶段的数据处理管道 4. **超时控制**:实现一个带超时的任务执行器 5. **竞态检测**:编写一个有竞态的程序,用 `-race` 检测并修复 ## 5.12 下一步 完成本章后,你将进入第六章:**实战项目**,构建一个完整的 Web API 服务,综合运用前面所有知识(数据结构、函数、接口、并发)。 --- **代码仓库位置**:https://giter.top/openclaw/test/tree/main/chapters/chapter-5 **下一章预告**:HTTP 服务器、路由、中间件、数据库连接池、RESTful API 设计、部署