Files
test/chapters/chapter-5-concurrency.md

16 KiB
Raw Blame History

第五章:并发编程 —— Goroutine 与 Channel 的艺术

本章目标:深入理解 Go 并发的核心机制,掌握 Goroutine 的调度原理、Channel 的通信模式、同步原语的使用、并发设计模式、竞态检测与上下文控制,能够编写高效、安全、可扩展的并发程序。

5.1 并发基础与 Goroutine

5.1.1 并发 vs 并行

  • 并发Concurrency:多个任务在同一时间段内交替执行(宏观上同时,微观上交替)。
  • 并行Parallelism:多个任务在同一时刻同时执行(需要多核 CPU

Go 语言的设计哲学是并发优先,通过 Goroutine 和 Channel 让并发编程变得简单。

5.1.2 Goroutine 基础

package main

import (
    "fmt"
    "time"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        time.Sleep(100 * time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    go say("world")  // 启动 Goroutine
    say("hello")     // 主 Goroutine
    
    time.Sleep(1 * time.Second)  // 等待子 Goroutine 完成
}

深度解析

  • go 关键字启动一个轻量级线程Goroutine
  • Goroutine 由 Go 运行时Runtime 调度,而非操作系统
  • 主函数退出会导致所有 Goroutine 终止(需要等待机制)

5.1.3 Goroutine 的轻量性

func goroutineCount() {
    for i := 0; i < 10000; i++ {
        go func(id int) {
            // 轻量级任务
        }(i)
    }
    time.Sleep(1 * time.Second)
    fmt.Println("成功启动 10000 个 Goroutine")
}

深度解析

  • 初始栈大小仅 2KB(操作系统线程通常 1-8MB
  • 栈可动态增长(最大可达 GB 级别)
  • 创建开销极小,可轻松创建百万级 Goroutine
  • 上下文切换由 Go 运行时在用户态完成,无需陷入内核

5.2 GMP 调度模型 核心重点

Go 的并发性能得益于其独特的 GMP 调度模型

5.2.1 GMP 模型简介

  • G (Goroutine):协程,包含栈、指令指针、状态等
  • M (Machine):操作系统线程,真正执行代码的实体
  • P (Processor):逻辑处理器,管理 G 队列,调度 G 到 M 上执行
+----------+      +----------+      +----------+
|   G1     |      |   G2     |      |   G3     |
+----------+      +----------+      +----------+
      |                 |                 |
      v                 v                 v
+------------------------------------------------+
|                   P (逻辑处理器)                |
|  [G 队列] 调度器  (工作窃取)                    |
+------------------------------------------------+
      |                 |                 |
      v                 v                 v
+----------+      +----------+      +----------+
|   M1     |      |   M2     |      |   M3     |
+----------+      +----------+      +----------+

5.2.2 调度流程

  1. 启动:创建 G放入 P 的本地队列
  2. 调度M 获取 P从 P 的本地队列取出 G 执行
  3. 阻塞G 阻塞(如 IOM 与 P 分离P 继续调度其他 G
  4. 工作窃取P 的队列为空时,从其他 P 窃取 G
  5. 系统调用G 发起系统调用M 阻塞P 寻找新 M 继续调度

5.2.3 调试与监控

import "runtime"

func debugGoroutine() {
    runtime.GOMAXPROCS(4)  // 设置最大 P 数量(通常等于 CPU 核数)
    fmt.Printf("GOMAXPROCS: %d\n", runtime.GOMAXPROCS(0))
    fmt.Printf("NumCPU: %d\n", runtime.NumCPU())
    fmt.Printf("NumGoroutine: %d\n", runtime.NumGoroutine())
}

5.3 ChannelGoroutine 间的通信

Go 哲学:不要通过共享内存来通信,而要通过通信来共享内存。

5.3.1 Channel 的创建与基本操作

func channelBasic() {
    // 无缓冲 Channel同步
    ch := make(chan int)
    
    go func() {
        ch <- 42  // 发送
    }()
    
    val := <-ch  // 接收(阻塞直到有数据)
    fmt.Println(val)  // 42
    
    // 有缓冲 Channel异步
    bufferedCh := make(chan int, 2)
    bufferedCh <- 1
    bufferedCh <- 2
    // bufferedCh <- 3  // 阻塞(缓冲区满)
    
    fmt.Println(<-bufferedCh)  // 1
    fmt.Println(<-bufferedCh)  // 2
}

深度解析

  • 无缓冲 Channel:发送和接收必须同时就绪(同步通信)
  • 有缓冲 Channel:发送直到缓冲区满,接收直到缓冲区空
  • 方向chan<- T(只发送)、<-chan T(只接收)

5.3.2 Channel 的关闭与遍历

func channelClose() {
    ch := make(chan int)
    
    go func() {
        for i := 0; i < 5; i++ {
            ch <- i
        }
        close(ch)  // 关闭 Channel
    }()
    
    // 遍历 Channel
    for val := range ch {
        fmt.Println(val)
    }
    
    // 检查是否关闭
    val, ok := <-ch
    if !ok {
        fmt.Println("Channel 已关闭")
    }
}

深度解析

  • 只能由发送方关闭 Channel
  • 关闭后仍可接收剩余数据,接收值为零值
  • 重复关闭或向已关闭 Channel 发送会 panic
  • range 自动遍历直到 Channel 关闭

5.3.3 Channel 的常见模式

1. 工作池Worker Pool

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Printf("Worker %d 开始处理 %d\n", id, j)
        time.Sleep(time.Second)
        results <- j * 2
    }
}

func workerPoolDemo() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)
    
    // 启动 3 个 worker
    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }
    
    // 发送 5 个任务
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 接收结果
    for r := 1; r <= 5; r++ {
        <-results
    }
}

2. 扇出Fan-out与扇入Fan-in

func fanOutFanIn() {
    in := make(chan int)
    
    // 扇出:多个 worker 处理
    c1 := make(chan int)
    c2 := make(chan int)
    
    go func() {
        for n := range in {
            c1 <- n * 2
        }
        close(c1)
    }()
    
    go func() {
        for n := range in {
            c2 <- n * 3
        }
        close(c2)
    }()
    
    // 扇入:合并结果
    out := make(chan int)
    go func() {
        var count int
        for c := range []chan int{c1, c2} {
            for n := range c {
                out <- n
                count++
                if count == 2 { // 简化逻辑
                    break
                }
            }
        }
        close(out)
    }()
    
    // 发送
    go func() {
        in <- 1
        close(in)
    }()
    
    // 接收
    for n := range out {
        fmt.Println(n)
    }
}

3. 选择器Select

func selectDemo() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自 ch1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自 ch2"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        case <-time.After(3 * time.Second):
            fmt.Println("超时")
            return
        default:
            fmt.Println("无数据,做其他事")
            time.Sleep(500 * time.Millisecond)
        }
    }
}

深度解析

  • select 阻塞直到某个 case 就绪
  • 多个 case 就绪时,随机选择一个
  • default 使 select 非阻塞
  • time.After 用于超时控制

5.4 同步原语sync 包)

5.4.1 WaitGroup

func waitGroupDemo() {
    var wg sync.WaitGroup
    
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Worker %d 开始\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d 完成\n", id)
        }(i)
    }
    
    wg.Wait()  // 等待所有 Goroutine 完成
    fmt.Println("所有任务完成")
}

深度解析

  • Add(n):增加计数器
  • Done():减 1通常用 defer
  • Wait():阻塞直到计数器为 0
  • 注意Add 必须在 Goroutine 启动前调用,或 defer Done

5.4.2 Mutex互斥锁

func mutexDemo() {
    var mu sync.Mutex
    var count int
    
    for i := 0; i < 1000; i++ {
        go func() {
            mu.Lock()
            count++
            mu.Unlock()
        }()
    }
    
    time.Sleep(1 * time.Second)
    fmt.Println("Count:", count)  // 1000
}

深度解析

  • Lock():加锁,阻塞直到获得锁
  • Unlock():解锁
  • 死锁:避免嵌套锁、保持锁顺序
  • 性能:竞争激烈时性能下降,考虑使用 atomicRWMutex

5.4.3 RWMutex读写锁

func rwMutexDemo() {
    var mu sync.RWMutex
    var data = make(map[string]int)
    
    // 读操作
    go func() {
        mu.RLock()
        _ = data["key"]
        mu.RUnlock()
    }()
    
    // 写操作
    go func() {
        mu.Lock()
        data["key"] = 1
        mu.Unlock()
    }()
}

深度解析

  • RLock() / RUnlock():读锁,允许多个读者
  • Lock() / Unlock():写锁,排他
  • 适用场景:读多写少

5.4.4 Once单次执行

func onceDemo() {
    var once sync.Once
    setup := func() {
        fmt.Println("初始化一次")
    }
    
    for i := 0; i < 5; i++ {
        go func() {
            once.Do(setup)
        }()
    }
    
    time.Sleep(1 * time.Second)
}

深度解析

  • Do(f):确保 f 只执行一次
  • 适用场景:单例模式、延迟初始化

5.4.5 Cond条件变量

func condDemo() {
    var cond *sync.Cond
    list := []string{}
    
    cond = sync.NewCond(&sync.Mutex{})
    
    // 消费者
    go func() {
        cond.L.Lock()
        for len(list) == 0 {
            cond.Wait()  // 等待通知
        }
        item := list[0]
        list = list[1:]
        fmt.Println("消费:", item)
        cond.L.Unlock()
    }()
    
    // 生产者
    time.Sleep(500 * time.Millisecond)
    cond.L.Lock()
    list = append(list, "item1")
    cond.Signal()  // 通知一个
    cond.L.Unlock()
}

5.5 原子操作atomic 包)

func atomicDemo() {
    var count int64
    
    for i := 0; i < 1000; i++ {
        go func() {
            atomic.AddInt64(&count, 1)
        }()
    }
    
    time.Sleep(1 * time.Second)
    fmt.Println("Count:", atomic.LoadInt64(&count))
}

深度解析

  • 比 Mutex 更快,无锁操作
  • 支持 int32, int64, uint32, uint64, uintptr, unsafe.Pointer
  • 常用操作:Add, Load, Store, Swap, CompareAndSwap

5.6 Context上下文控制

5.6.1 Context 基础

func contextDemo() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    
    go func() {
        select {
        case <-time.After(3 * time.Second):
            fmt.Println("任务完成")
        case <-ctx.Done():
            fmt.Println("任务取消:", ctx.Err())
        }
    }()
    
    time.Sleep(3 * time.Second)
}

深度解析

  • WithCancel:手动取消
  • WithTimeout:超时自动取消
  • WithDeadline:指定时间取消
  • WithValue:传递键值对(慎用)

5.6.2 上下文传递最佳实践

  • 不要将 Context 存入结构体
  • 不要传递 nil Context使用 context.Background()
  • 不要用 Context 传递业务数据,仅用于取消/超时/追踪

5.7 竞态检测Race Detector

go run -race main.go
go test -race ./...

深度解析

  • 检测数据竞争(多个 Goroutine 同时访问同一变量,至少一个是写)
  • 性能开销约 2-5 倍,仅用于测试
  • 发现竞态后,使用 Mutex、Channel 或 atomic 修复

5.8 并发设计模式

5.8.1 管道Pipeline

func pipelineDemo() {
    // 阶段 1生成
    gen := func(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }
    
    // 阶段 2平方
    sq := func(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * n
            }
            close(out)
        }()
        return out
    }
    
    // 执行
    c := gen(1, 2, 3)
    c = sq(c)
    
    for n := range c {
        fmt.Println(n)  // 1, 4, 9
    }
}

5.8.2 重试与退避

func retryWithBackoff() {
    var attempts int
    maxAttempts := 5
    
    for {
        attempts++
        if attempts > maxAttempts {
            fmt.Println("重试失败")
            return
        }
        
        if attempt() {
            fmt.Println("成功")
            return
        }
        
        time.Sleep(time.Duration(attempts*100) * time.Millisecond)
    }
}

func attempt() bool {
    // 模拟失败
    return false
}

5.8.3 限流Rate Limiting

func rateLimiter() {
    limiter := time.NewTicker(100 * time.Millisecond)
    defer limiter.Stop()
    
    for i := 0; i < 10; i++ {
        <-limiter.C
        fmt.Println("处理任务", i)
    }
}

5.9 深度实践:综合案例

5.9.1 并发爬虫

type Crawler struct {
    visited map[string]bool
    mu      sync.Mutex
    wg      sync.WaitGroup
    limit   chan struct{}
}

func (c *Crawler) crawl(url string, depth int) {
    if depth <= 0 {
        return
    }
    
    c.limit <- struct{}{}
    defer func() { <-c.limit }()
    defer c.wg.Done()
    
    c.mu.Lock()
    if c.visited[url] {
        c.mu.Unlock()
        return
    }
    c.visited[url] = true
    c.mu.Unlock()
    
    fmt.Println("抓取:", url)
    
    // 模拟获取子链接
    links := []string{url + "/1", url + "/2"}
    for _, link := range links {
        c.wg.Add(1)
        go c.crawl(link, depth-1)
    }
}

func concurrentCrawlerDemo() {
    crawler := &Crawler{
        visited: make(map[string]bool),
        limit:   make(chan struct{}, 10),  // 限制并发数
    }
    
    crawler.wg.Add(1)
    go crawler.crawl("http://example.com", 3)
    
    crawler.wg.Wait()
    fmt.Println("爬虫完成")
}

5.9.2 并发地图聚合

func concurrentMapAggregation() {
    data := make(map[string]int)
    var mu sync.Mutex
    
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key%d", id%10)
            
            mu.Lock()
            data[key]++
            mu.Unlock()
        }(i)
    }
    
    wg.Wait()
    fmt.Println(data)
}

5.10 常见陷阱与最佳实践

5.10.1 Goroutine 泄漏

// 错误Channel 未关闭导致泄漏
func leak() {
    ch := make(chan int)
    go func() {
        ch <- 1
        // 未关闭,接收方阻塞
    }()
    <-ch
}

// 正确
func noLeak() {
    ch := make(chan int)
    go func() {
        defer close(ch)
        ch <- 1
    }()
    <-ch
}

5.10.2 死锁

// 错误:嵌套锁
func deadlock() {
    var mu1, mu2 sync.Mutex
    go func() {
        mu1.Lock()
        mu2.Lock()
        // ...
        mu2.Unlock()
        mu1.Unlock()
    }()
    go func() {
        mu2.Lock()  // 等待 mu2
        mu1.Lock()  // 等待 mu1死锁
        // ...
        mu2.Unlock()
        mu1.Unlock()
    }()
}

5.10.3 最佳实践

  1. 优先使用 Channel:避免共享内存
  2. 限制并发数:使用信号量或 Worker Pool
  3. 及时关闭 Channel:避免泄漏
  4. 使用 Context:传递取消信号
  5. 运行 Race Detector:测试时开启
  6. 避免 Goroutine 泄漏:确保所有 Goroutine 能退出
  7. 锁的粒度:尽量缩小锁的范围

5.11 课后练习

  1. 并发计数器:使用 Mutex 和 atomic 实现并发安全的计数器
  2. 并发爬虫:实现一个带并发限制的网页爬虫
  3. 管道处理:实现一个多阶段的数据处理管道
  4. 超时控制:实现一个带超时的任务执行器
  5. 竞态检测:编写一个有竞态的程序,用 -race 检测并修复

5.12 下一步

完成本章后,你将进入第六章:实战项目,构建一个完整的 Web API 服务,综合运用前面所有知识(数据结构、函数、接口、并发)。


代码仓库位置https://giter.top/openclaw/test/tree/main/chapters/chapter-5

下一章预告HTTP 服务器、路由、中间件、数据库连接池、RESTful API 设计、部署