常见的并非模型

在并行编程中,处理循环迭代时常用的并发模型有几种:

  1. Worker Pool(工作池)

    • 描述:创建固定数量的工作 goroutine,这些 goroutine 从共享的任务队列中获取任务并执行。
    • 优点:控制并发量,避免过多 goroutine 导致资源耗尽。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          for job := range jobs {
              fmt.Printf("Worker %d processing job %d\n", id, job)
              results <- job * 2
          }
      }
      
      func main() {
          const numWorkers = 3
          jobs := make(chan int, 10)
          results := make(chan int, 10)
          var wg sync.WaitGroup
      
          // Start worker goroutines
          for i := 1; i <= numWorkers; i++ {
              wg.Add(1)
              go worker(i, jobs, results, &wg)
          }
      
          // Send jobs
          for j := 1; j <= 10; j++ {
              jobs <- j
          }
          close(jobs)
      
          // Wait for all workers to finish
          go func() {
              wg.Wait()
              close(results)
          }()
      
          // Collect results
          for result := range results {
              fmt.Println("Result:", result)
          }
      }
  2. Map/Reduce

    • 描述:将数据集合分成多个部分并行处理,然后将结果合并。
    • 优点:适合大规模数据处理任务。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func mapWorker(data []int, results chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          sum := 0
          for _, v := range data {
              sum += v
          }
          results <- sum
      }
      
      func main() {
          data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
          chunkSize := 2
          var wg sync.WaitGroup
          results := make(chan int, len(data)/chunkSize)
      
          // Divide data into chunks and process
          for i := 0; i < len(data); i += chunkSize {
              end := i + chunkSize
              if end > len(data) {
                  end = len(data)
              }
              wg.Add(1)
              go mapWorker(data[i:end], results, &wg)
          }
      
          // Wait for all map workers to finish
          go func() {
              wg.Wait()
              close(results)
          }()
      
          // Collect results
          total := 0
          for result := range results {
              total += result
          }
      
          fmt.Println("Total:", total)
      }
  3. Fan-Out/Fan-In

    • 描述:多个 goroutine 处理来自同一通道的数据(Fan-Out),然后将结果汇总到一个通道中(Fan-In)。
    • 优点:可以有效利用 CPU 核心,实现数据流的处理。
    • 示例
      package main
      
      import (
          "fmt"
          "sync"
      )
      
      func generateNumbers(n int, out chan<- int) {
          for i := 0; i < n; i++ {
              out <- i
          }
          close(out)
      }
      
      func squareNumbers(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
          defer wg.Done()
          for num := range in {
              out <- num * num
          }
      }
      
      func main() {
          numbers := make(chan int, 10)
          squared := make(chan int, 10)
          var wg sync.WaitGroup
      
          // Generate numbers
          go generateNumbers(10, numbers)
      
          // Start worker goroutines
          for i := 0; i < 3; i++ {
              wg.Add(1)
              go squareNumbers(numbers, squared, &wg)
          }
      
          // Close squared channel once all workers are done
          go func() {
              wg.Wait()
              close(squared)
          }()
      
          // Print squared numbers
          for result := range squared {
              fmt.Println(result)
          }
      }

这些模型可以根据任务的性质和要求选择合适的实现,来优化并发处理的性能和效率。

   


经典案例:生成图像缩略图并计算这些缩略图的总大小

func main() {
    images := []string{
        "/Users/xxx/Pictures/avatar/1.jpeg",
        "/Users/xxx/Pictures/avatar/2.jpeg",
        "/Users/xxx/Pictures/avatar/3.jpeg",
    }
    
    ch := make(chan string)
    go func() {
        defer close(ch)
        for _, filename := range images {
            ch <- filename
            log.Println(filename + " sent .")
        }
    }()
    result := makeThumbnails6(ch)
    log.Printf("Total size of thumbnails: %d bytes\n", result)

}

func makeThumbnails6(filenames <-chan string) int64 {
    sizes := make(chan int64)
    var wg sync.WaitGroup
    for f := range filenames {

        log.Printf("%v receive .", f)
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb)
            sizes <- info.Size()
        }(f)
    }
    go func() {
        wg.Wait()
        close(sizes)
    }()
    var total int64
    for size := range sizes {
        total += size
    }
    return total
}

 

这里面有几点需要注意的:
1: 闭包的变量: 这里的闭包函数把f 传递进去了,防止改变外部的f

 go func(f string) {
            defer wg.Done()
            thumb, err := thumbnail.ImageFile(f)
            if err != nil {
                log.Println(err)
                return
            }
            info, _ := os.Stat(thumb)
            sizes <- info.Size()
        }(f)

 

2:为什么使用阻塞的size channel?( 外部启动第2个goroutine的时候, makeThumbnails6里面的goroutine执行比较快就结束返回了,关闭了 size channel 怎么办

有一个点很好奇:在makeThumbnails6中如果两次for循环间隔太大,是否导致该函数提前退出。
比如说filenames chan每隔1分钟发送一个消息。但 for f := range filenames 中goroutine只需要半分钟就执行完,看起来会在第二次循环前导致wg.Wait()解除,sizes chan关闭,从而makeThumbnails6未等待filenames chan关闭就返回了)

   answer:不会;for f := range filenames 中启动的 goroutine 向 sizes 中发送消息会阻塞(无缓存channel),这样只有循环结束才会开始接收size信息(主goroutine),wg.done之后才会执行,

   即wg等待的goroutine数量只会一直增加到最大,然后开始减少,直到wg.wait.

   
   makeThumbnails6
函数中,sizes channel 用于接收处理后的文件大小。由于 sizes 是一个阻塞的 channel,发送操作(sizes <- info.Size())会阻塞直到有 goroutine 从 channel 中接收数据,

     这样可以避免在未处理完数据的情况下继续添加更多数据。

 

3: 这里为什么都是go func() { wg.Wait() close(squared) }() 不能直接在主进程中执行吗?

  go func() {
        wg.Wait()
        close(sizes)
    }()
    var total int64
    for size := range sizes {
        total += size
    }

 在并发编程中,使用
go func() { wg.Wait(); close(squared) }() 的主要目的是在主 goroutine 外部执行一些操作,以避免阻塞主进程。以下是具体原因:

  1. 避免阻塞主 goroutine

    • 在主 goroutine 中直接执行 wg.Wait(); close(squared) 会阻塞主进程,直到所有工作 goroutines 完成。如果这些工作 goroutines 处理的是耗时操作,主进程会因此停滞。
    • 使用 go func() 可以异步执行这个操作,让主 goroutine 继续执行其他操作,例如接收结果或处理其他逻辑,而不会被 wg.Wait() 阻塞。
  2. 确保关闭通道时所有 goroutines 已完成

    • wg.Wait() 确保在关闭通道之前,所有的工作 goroutines 都已完成任务。关闭通道操作需要在所有数据处理完成后执行,以避免数据丢失或读取错误。
    • 通过将关闭通道的操作放在一个新的 goroutine 中,确保在所有工作 goroutines 完成后才关闭通道,这样做可以避免主 goroutine 等待过程中的复杂性。
  3. 分析代码结构

    • 使用 go func() 可以将通道关闭的逻辑与主程序的其他逻辑分开,使代码结构更清晰和简洁。主 goroutine 可以专注于接收和处理结果,而关闭通道的逻辑在一个独立的 goroutine 中完成。
    • wg.Wait(): 这个调用会阻塞当前 goroutine,直到 WaitGroup 的计数器减到 0,表示所有添加到 WaitGroup 中的 goroutines 都已完成。(相当于一个常驻进场一直判断 WaitGroup 的计数器是否减到 0,到0 才会关闭channel)

    • channel关闭了, 下面的for range 循环才会退出.( for range channel是 Go 程序中的一个循环结构,它在当前的 goroutine 中运行 ,但是这个循环退出的条件要等channel关闭才会退出)

 

 

请登录后发表评论

    没有回复内容