Add Chapter 5: Concurrency - Goroutines, Channels, Sync Primitives, and Patterns (Deep Dive)

This commit is contained in:
openclaw
2026-03-23 23:36:39 +00:00
parent aa46d006a8
commit f79acea8a8
3 changed files with 1140 additions and 0 deletions

View File

@@ -0,0 +1,766 @@
# 第五章:并发编程 —— Goroutine 与 Channel 的艺术
> **本章目标**:深入理解 Go 并发的核心机制,掌握 Goroutine 的调度原理、Channel 的通信模式、同步原语的使用、并发设计模式、竞态检测与上下文控制,能够编写高效、安全、可扩展的并发程序。
## 5.1 并发基础与 Goroutine
### 5.1.1 并发 vs 并行
- **并发Concurrency**:多个任务在**同一时间段**内交替执行(宏观上同时,微观上交替)。
- **并行Parallelism**:多个任务在**同一时刻**同时执行(需要多核 CPU
Go 语言的设计哲学是**并发优先**,通过 Goroutine 和 Channel 让并发编程变得简单。
### 5.1.2 Goroutine 基础
```go
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world") // 启动 Goroutine
say("hello") // 主 Goroutine
time.Sleep(1 * time.Second) // 等待子 Goroutine 完成
}
```
**深度解析**
- `go` 关键字启动一个**轻量级线程**Goroutine
- Goroutine 由 **Go 运行时Runtime** 调度,而非操作系统
- 主函数退出会导致所有 Goroutine 终止(需要等待机制)
### 5.1.3 Goroutine 的轻量性
```go
func goroutineCount() {
for i := 0; i < 10000; i++ {
go func(id int) {
// 轻量级任务
}(i)
}
time.Sleep(1 * time.Second)
fmt.Println("成功启动 10000 个 Goroutine")
}
```
**深度解析**
- 初始栈大小仅 **2KB**(操作系统线程通常 1-8MB
- 栈可动态增长(最大可达 GB 级别)
- 创建开销极小,可轻松创建**百万级** Goroutine
- 上下文切换由 Go 运行时在**用户态**完成,无需陷入内核
---
## 5.2 GMP 调度模型 ⭐ 核心重点
Go 的并发性能得益于其独特的 **GMP 调度模型**
### 5.2.1 GMP 模型简介
- **G (Goroutine)**:协程,包含栈、指令指针、状态等
- **M (Machine)**:操作系统线程,真正执行代码的实体
- **P (Processor)**:逻辑处理器,管理 G 队列,调度 G 到 M 上执行
```
+----------+ +----------+ +----------+
| G1 | | G2 | | G3 |
+----------+ +----------+ +----------+
| | |
v v v
+------------------------------------------------+
| P (逻辑处理器) |
| [G 队列] 调度器 (工作窃取) |
+------------------------------------------------+
| | |
v v v
+----------+ +----------+ +----------+
| M1 | | M2 | | M3 |
+----------+ +----------+ +----------+
```
### 5.2.2 调度流程
1. **启动**:创建 G放入 P 的本地队列
2. **调度**M 获取 P从 P 的本地队列取出 G 执行
3. **阻塞**G 阻塞(如 IOM 与 P 分离P 继续调度其他 G
4. **工作窃取**P 的队列为空时,从其他 P 窃取 G
5. **系统调用**G 发起系统调用M 阻塞P 寻找新 M 继续调度
### 5.2.3 调试与监控
```go
import "runtime"
func debugGoroutine() {
runtime.GOMAXPROCS(4) // 设置最大 P 数量(通常等于 CPU 核数)
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}
```
---
## 5.3 ChannelGoroutine 间的通信
> **Go 哲学**:不要通过共享内存来通信,而要通过通信来共享内存。
### 5.3.1 Channel 的创建与基本操作
```go
func channelBasic() {
// 无缓冲 Channel同步
ch := make(chan int)
go func() {
ch <- 42 // 发送
}()
val := <-ch // 接收(阻塞直到有数据)
fmt.Println(val) // 42
// 有缓冲 Channel异步
bufferedCh := make(chan int, 2)
bufferedCh <- 1
bufferedCh <- 2
// bufferedCh <- 3 // 阻塞(缓冲区满)
fmt.Println(<-bufferedCh) // 1
fmt.Println(<-bufferedCh) // 2
}
```
**深度解析**
- **无缓冲 Channel**:发送和接收必须**同时就绪**(同步通信)
- **有缓冲 Channel**:发送直到缓冲区满,接收直到缓冲区空
- **方向**`chan<- T`(只发送)、`<-chan T`(只接收)
### 5.3.2 Channel 的关闭与遍历
```go
func channelClose() {
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 关闭 Channel
}()
// 遍历 Channel
for val := range ch {
fmt.Println(val)
}
// 检查是否关闭
val, ok := <-ch
if !ok {
fmt.Println("Channel 已关闭")
}
}
```
**深度解析**
- 只能由**发送方**关闭 Channel
- 关闭后仍可接收剩余数据,接收值为零值
- 重复关闭或向已关闭 Channel 发送会 **panic**
- `range` 自动遍历直到 Channel 关闭
### 5.3.3 Channel 的常见模式
#### 1. 工作池Worker Pool
```go
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 开始处理 %d\n", id, j)
time.Sleep(time.Second)
results <- j * 2
}
}
func workerPoolDemo() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 3 个 worker
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 发送 5 个任务
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 接收结果
for r := 1; r <= 5; r++ {
<-results
}
}
```
#### 2. 扇出Fan-out与扇入Fan-in
```go
func fanOutFanIn() {
in := make(chan int)
// 扇出:多个 worker 处理
c1 := make(chan int)
c2 := make(chan int)
go func() {
for n := range in {
c1 <- n * 2
}
close(c1)
}()
go func() {
for n := range in {
c2 <- n * 3
}
close(c2)
}()
// 扇入:合并结果
out := make(chan int)
go func() {
var count int
for c := range []chan int{c1, c2} {
for n := range c {
out <- n
count++
if count == 2 { // 简化逻辑
break
}
}
}
close(out)
}()
// 发送
go func() {
in <- 1
close(in)
}()
// 接收
for n := range out {
fmt.Println(n)
}
}
```
#### 3. 选择器Select
```go
func selectDemo() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
case <-time.After(3 * time.Second):
fmt.Println("超时")
return
default:
fmt.Println("无数据,做其他事")
time.Sleep(500 * time.Millisecond)
}
}
}
```
**深度解析**
- `select` 阻塞直到某个 case 就绪
- 多个 case 就绪时,**随机选择**一个
- `default` 使 select 非阻塞
- `time.After` 用于超时控制
---
## 5.4 同步原语sync 包)
### 5.4.1 WaitGroup
```go
func waitGroupDemo() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d 开始\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成\n", id)
}(i)
}
wg.Wait() // 等待所有 Goroutine 完成
fmt.Println("所有任务完成")
}
```
**深度解析**
- `Add(n)`:增加计数器
- `Done()`:减 1通常用 `defer`
- `Wait()`:阻塞直到计数器为 0
- **注意**`Add` 必须在 Goroutine 启动前调用,或 `defer Done`
### 5.4.2 Mutex互斥锁
```go
func mutexDemo() {
var mu sync.Mutex
var count int
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
count++
mu.Unlock()
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", count) // 1000
}
```
**深度解析**
- `Lock()`:加锁,阻塞直到获得锁
- `Unlock()`:解锁
- **死锁**:避免嵌套锁、保持锁顺序
- **性能**:竞争激烈时性能下降,考虑使用 `atomic``RWMutex`
### 5.4.3 RWMutex读写锁
```go
func rwMutexDemo() {
var mu sync.RWMutex
var data = make(map[string]int)
// 读操作
go func() {
mu.RLock()
_ = data["key"]
mu.RUnlock()
}()
// 写操作
go func() {
mu.Lock()
data["key"] = 1
mu.Unlock()
}()
}
```
**深度解析**
- `RLock()` / `RUnlock()`:读锁,允许多个读者
- `Lock()` / `Unlock()`:写锁,排他
- **适用场景**:读多写少
### 5.4.4 Once单次执行
```go
func onceDemo() {
var once sync.Once
setup := func() {
fmt.Println("初始化一次")
}
for i := 0; i < 5; i++ {
go func() {
once.Do(setup)
}()
}
time.Sleep(1 * time.Second)
}
```
**深度解析**
- `Do(f)`:确保 `f` 只执行一次
- **适用场景**:单例模式、延迟初始化
### 5.4.5 Cond条件变量
```go
func condDemo() {
var cond *sync.Cond
list := []string{}
cond = sync.NewCond(&sync.Mutex{})
// 消费者
go func() {
cond.L.Lock()
for len(list) == 0 {
cond.Wait() // 等待通知
}
item := list[0]
list = list[1:]
fmt.Println("消费:", item)
cond.L.Unlock()
}()
// 生产者
time.Sleep(500 * time.Millisecond)
cond.L.Lock()
list = append(list, "item1")
cond.Signal() // 通知一个
cond.L.Unlock()
}
```
---
## 5.5 原子操作atomic 包)
```go
func atomicDemo() {
var count int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&count, 1)
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", atomic.LoadInt64(&count))
}
```
**深度解析**
- 比 Mutex 更快,无锁操作
- 支持 `int32`, `int64`, `uint32`, `uint64`, `uintptr`, `unsafe.Pointer`
- 常用操作:`Add`, `Load`, `Store`, `Swap`, `CompareAndSwap`
---
## 5.6 Context上下文控制
### 5.6.1 Context 基础
```go
func contextDemo() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
select {
case <-time.After(3 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务取消:", ctx.Err())
}
}()
time.Sleep(3 * time.Second)
}
```
**深度解析**
- `WithCancel`:手动取消
- `WithTimeout`:超时自动取消
- `WithDeadline`:指定时间取消
- `WithValue`:传递键值对(慎用)
### 5.6.2 上下文传递最佳实践
- **不要**将 Context 存入结构体
- **不要**传递 `nil` Context使用 `context.Background()`
- **不要**用 Context 传递业务数据,仅用于取消/超时/追踪
---
## 5.7 竞态检测Race Detector
```bash
go run -race main.go
go test -race ./...
```
**深度解析**
- 检测数据竞争(多个 Goroutine 同时访问同一变量,至少一个是写)
- 性能开销约 2-5 倍,仅用于测试
- 发现竞态后,使用 Mutex、Channel 或 atomic 修复
---
## 5.8 并发设计模式
### 5.8.1 管道Pipeline
```go
func pipelineDemo() {
// 阶段 1生成
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段 2平方
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 执行
c := gen(1, 2, 3)
c = sq(c)
for n := range c {
fmt.Println(n) // 1, 4, 9
}
}
```
### 5.8.2 重试与退避
```go
func retryWithBackoff() {
var attempts int
maxAttempts := 5
for {
attempts++
if attempts > maxAttempts {
fmt.Println("重试失败")
return
}
if attempt() {
fmt.Println("成功")
return
}
time.Sleep(time.Duration(attempts*100) * time.Millisecond)
}
}
func attempt() bool {
// 模拟失败
return false
}
```
### 5.8.3 限流Rate Limiting
```go
func rateLimiter() {
limiter := time.NewTicker(100 * time.Millisecond)
defer limiter.Stop()
for i := 0; i < 10; i++ {
<-limiter.C
fmt.Println("处理任务", i)
}
}
```
---
## 5.9 深度实践:综合案例
### 5.9.1 并发爬虫
```go
type Crawler struct {
visited map[string]bool
mu sync.Mutex
wg sync.WaitGroup
limit chan struct{}
}
func (c *Crawler) crawl(url string, depth int) {
if depth <= 0 {
return
}
c.limit <- struct{}{}
defer func() { <-c.limit }()
defer c.wg.Done()
c.mu.Lock()
if c.visited[url] {
c.mu.Unlock()
return
}
c.visited[url] = true
c.mu.Unlock()
fmt.Println("抓取:", url)
// 模拟获取子链接
links := []string{url + "/1", url + "/2"}
for _, link := range links {
c.wg.Add(1)
go c.crawl(link, depth-1)
}
}
func concurrentCrawlerDemo() {
crawler := &Crawler{
visited: make(map[string]bool),
limit: make(chan struct{}, 10), // 限制并发数
}
crawler.wg.Add(1)
go crawler.crawl("http://example.com", 3)
crawler.wg.Wait()
fmt.Println("爬虫完成")
}
```
### 5.9.2 并发地图聚合
```go
func concurrentMapAggregation() {
data := make(map[string]int)
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key%d", id%10)
mu.Lock()
data[key]++
mu.Unlock()
}(i)
}
wg.Wait()
fmt.Println(data)
}
```
---
## 5.10 常见陷阱与最佳实践
### 5.10.1 Goroutine 泄漏
```go
// 错误Channel 未关闭导致泄漏
func leak() {
ch := make(chan int)
go func() {
ch <- 1
// 未关闭,接收方阻塞
}()
<-ch
}
// 正确
func noLeak() {
ch := make(chan int)
go func() {
defer close(ch)
ch <- 1
}()
<-ch
}
```
### 5.10.2 死锁
```go
// 错误:嵌套锁
func deadlock() {
var mu1, mu2 sync.Mutex
go func() {
mu1.Lock()
mu2.Lock()
// ...
mu2.Unlock()
mu1.Unlock()
}()
go func() {
mu2.Lock() // 等待 mu2
mu1.Lock() // 等待 mu1死锁
// ...
mu2.Unlock()
mu1.Unlock()
}()
}
```
### 5.10.3 最佳实践
1. **优先使用 Channel**:避免共享内存
2. **限制并发数**:使用信号量或 Worker Pool
3. **及时关闭 Channel**:避免泄漏
4. **使用 Context**:传递取消信号
5. **运行 Race Detector**:测试时开启
6. **避免 Goroutine 泄漏**:确保所有 Goroutine 能退出
7. **锁的粒度**:尽量缩小锁的范围
---
## 5.11 课后练习
1. **并发计数器**:使用 Mutex 和 atomic 实现并发安全的计数器
2. **并发爬虫**:实现一个带并发限制的网页爬虫
3. **管道处理**:实现一个多阶段的数据处理管道
4. **超时控制**:实现一个带超时的任务执行器
5. **竞态检测**:编写一个有竞态的程序,用 `-race` 检测并修复
## 5.12 下一步
完成本章后,你将进入第六章:**实战项目**,构建一个完整的 Web API 服务,综合运用前面所有知识(数据结构、函数、接口、并发)。
---
**代码仓库位置**https://giter.top/openclaw/test/tree/main/chapters/chapter-5
**下一章预告**HTTP 服务器、路由、中间件、数据库连接池、RESTful API 设计、部署

View File

@@ -0,0 +1,3 @@
module go-tutorial
go 1.21

371
chapters/chapter-5/main.go Normal file
View File

@@ -0,0 +1,371 @@
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// 5.1 Goroutine 基础
func say(s string) {
for i := 0; i < 3; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func goroutineBasic() {
fmt.Println("=== Goroutine 基础 ===")
go say("world")
say("hello")
time.Sleep(500 * time.Millisecond)
}
// 5.2 GMP 模型演示
func gmpDemo() {
fmt.Println("\n=== GMP 模型 ===")
fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}
// 5.3 Channel 基础
func channelBasic() {
fmt.Println("\n=== Channel 基础 ===")
// 无缓冲 Channel
ch := make(chan int)
go func() {
ch <- 42
}()
val := <-ch
fmt.Println("无缓冲 Channel:", val)
// 有缓冲 Channel
bufferedCh := make(chan int, 2)
bufferedCh <- 1
bufferedCh <- 2
fmt.Println("有缓冲 Channel:", <-bufferedCh, <-bufferedCh)
}
// 5.3 Channel 关闭与遍历
func channelClose() {
fmt.Println("\n=== Channel 关闭与遍历 ===")
ch := make(chan int)
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()
for val := range ch {
fmt.Print(val, " ")
}
fmt.Println()
// 检查关闭
if _, ok := <-ch; !ok {
fmt.Println("Channel 已关闭")
}
}
// 5.3 工作池
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("Worker %d 处理 %d\n", id, j)
time.Sleep(100 * time.Millisecond)
results <- j * 2
}
}
func workerPoolDemo() {
fmt.Println("\n=== 工作池 ===")
jobs := make(chan int, 10)
results := make(chan int, 10)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
for r := 1; r <= 5; r++ {
<-results
}
fmt.Println("工作池完成")
}
// 5.3 Select
func selectDemo() {
fmt.Println("\n=== Select ===")
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(500 * time.Millisecond)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(300 * time.Millisecond)
ch2 <- "来自 ch2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
case <-time.After(1 * time.Second):
fmt.Println("超时")
return
}
}
}
// 5.4 WaitGroup
func waitGroupDemo() {
fmt.Println("\n=== WaitGroup ===")
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d 开始\n", id)
time.Sleep(200 * time.Millisecond)
fmt.Printf("Worker %d 完成\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有任务完成")
}
// 5.4 Mutex
func mutexDemo() {
fmt.Println("\n=== Mutex ===")
var mu sync.Mutex
var count int
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
count++
mu.Unlock()
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", count)
}
// 5.4 RWMutex
func rwMutexDemo() {
fmt.Println("\n=== RWMutex ===")
var mu sync.RWMutex
var data = make(map[string]int)
var wg sync.WaitGroup
// 读操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.RLock()
_ = data["key"]
mu.RUnlock()
}()
}
// 写操作
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
data["key"]++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println("Data:", data)
}
// 5.4 Once
func onceDemo() {
fmt.Println("\n=== Once ===")
var once sync.Once
setup := func() {
fmt.Println("初始化一次")
}
for i := 0; i < 5; i++ {
go func() {
once.Do(setup)
}()
}
time.Sleep(500 * time.Millisecond)
}
// 5.5 Atomic
func atomicDemo() {
fmt.Println("\n=== Atomic ===")
var count int64
for i := 0; i < 1000; i++ {
go func() {
atomic.AddInt64(&count, 1)
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Atomic Count:", atomic.LoadInt64(&count))
}
// 5.6 Context
func contextDemo() {
fmt.Println("\n=== Context ===")
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
go func() {
select {
case <-time.After(1 * time.Second):
fmt.Println("任务完成")
case <-ctx.Done():
fmt.Println("任务取消:", ctx.Err())
}
}()
time.Sleep(1 * time.Second)
}
// 5.8 管道
func pipelineDemo() {
fmt.Println("\n=== 管道 ===")
// 生成
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 平方
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
c := gen(1, 2, 3)
c = sq(c)
for n := range c {
fmt.Println(n)
}
}
// 5.9 并发爬虫
type Crawler struct {
visited map[string]bool
mu sync.Mutex
wg sync.WaitGroup
limit chan struct{}
}
func (c *Crawler) crawl(url string, depth int) {
if depth <= 0 {
return
}
c.limit <- struct{}{}
defer func() { <-c.limit }()
defer c.wg.Done()
c.mu.Lock()
if c.visited[url] {
c.mu.Unlock()
return
}
c.visited[url] = true
c.mu.Unlock()
fmt.Println("抓取:", url)
// 模拟子链接
links := []string{url + "/1", url + "/2"}
for _, link := range links {
c.wg.Add(1)
go c.crawl(link, depth-1)
}
}
func crawlerDemo() {
fmt.Println("\n=== 并发爬虫 ===")
crawler := &Crawler{
visited: make(map[string]bool),
limit: make(chan struct{}, 5),
}
crawler.wg.Add(1)
go crawler.crawl("http://example.com", 2)
crawler.wg.Wait()
fmt.Println("爬虫完成")
}
// 5.10 竞态检测示例(需配合 -race 运行)
func raceDemo() {
fmt.Println("\n=== 竞态检测示例 ===")
var count int
for i := 0; i < 100; i++ {
go func() {
count++ // 竞态!
}()
}
time.Sleep(1 * time.Second)
fmt.Println("Count:", count)
}
func main() {
goroutineBasic()
gmpDemo()
channelBasic()
channelClose()
workerPoolDemo()
selectDemo()
waitGroupDemo()
mutexDemo()
rwMutexDemo()
onceDemo()
atomicDemo()
contextDemo()
pipelineDemo()
crawlerDemo()
raceDemo()
}