golang,go,博客,开源,编程

lancet concurrency FanIn

Updated on with 0 views and 0 comments
// FanIn merge multiple channels into one channel.
// Play: https://go.dev/play/p/2VYFMexEvTm
func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T {
	out := make(chan T)

	go func() {
		var wg sync.WaitGroup
		wg.Add(len(channels))

		for _, c := range channels {
			go func(c <-chan T) {
				defer wg.Done()
				for v := range c {
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}(c)
		}
		wg.Wait()
		close(out)
	}()

	return out
}

你提供的 FanIn 函数用于将多个输入通道(channels)合并成一个输出通道(out)。这个函数能够从多个通道中接收数据,并将数据转发到单一的输出通道,直到所有输入通道都关闭或者上下文被取消。

让我们逐步分析这个函数的工作原理。

FanIn 函数解析

函数签名

func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T
  • ctx context.Context: 用于取消操作的上下文,帮助管理多个 goroutine 的生命周期。
  • channels ...<-chan T: 可变参数,接受多个输入通道,所有的通道都会被合并到一个通道中。
  • 返回值:一个只读通道(<-chan T),这是所有输入通道数据的合并结果。

函数逻辑

  1. 创建输出通道

    out := make(chan T)
    

    创建一个新的通道 out,用于存放从多个输入通道接收到的数据。

  2. 启动一个 goroutine 来执行合并操作

    go func() {
        var wg sync.WaitGroup
        wg.Add(len(channels)) // 设置 WaitGroup 的计数器为输入通道的数量
    

    该 goroutine 会启动多个子 goroutine,每个子 goroutine 用于从一个输入通道中接收数据并将数据发送到输出通道。sync.WaitGroup 被用来等待所有 goroutine 完成操作。

  3. 遍历所有输入通道并启动 goroutine

    for _, c := range channels {
        go func(c <-chan T) {
            defer wg.Done()
            for v := range c {
                select {
                case <-ctx.Done(): // 如果上下文被取消,则退出
                    return
                case out <- v: // 向 out 发送数据
                }
            }
        }(c)
    }
    
    • 对每个输入通道 c 启动一个新的 goroutine。
    • 在 goroutine 内部,使用 for 循环遍历通道 c 中的数据,直到通道关闭。
    • 使用 select 语句:
      • case <-ctx.Done(): 如果上下文被取消,退出当前的 goroutine。
      • case out <- v: 如果接收到数据,将数据发送到输出通道 out
  4. 等待所有 goroutine 完成

    wg.Wait() // 等待所有 goroutine 完成
    close(out) // 所有通道的数据都被合并到 out 之后,关闭 out 通道
    
    • wg.Wait() 阻塞,直到所有的 goroutine 完成任务。
    • 一旦所有 goroutine 完成,关闭输出通道 out,以通知接收方没有更多数据可用。
  5. 返回输出通道

    return out
    

    返回合并后的输出通道 out

关键点分析

1. 多个 goroutine 并发读取多个通道

  • 每个输入通道都由独立的 goroutine 处理,多个 goroutine 并发地从多个通道中读取数据并将其合并到输出通道。
  • 使用 sync.WaitGroup 来等待所有 goroutine 完成数据处理,确保所有数据都被合并到输出通道之后才关闭该通道。

2. select 语句确保同步

  • 在每个 goroutine 内部,使用 select 语句来处理两种情况:
    • 如果 ctx.Done() 被触发,表示上下文已取消,goroutine 会立即退出。
    • 如果没有上下文取消信号,数据会从输入通道发送到输出通道。

3. 上下文控制并发任务

  • 通过传递 ctx,我们能够在程序需要时取消所有 goroutine,保证在不需要继续处理时,所有的并发操作能够及时停止。

4. 关闭输出通道

  • 一旦所有输入通道的数据都被合并,out 通道会被关闭。这是通过 wg.Wait() 来确保所有的 goroutine 完成之后才关闭通道。
  • close(out) 的作用是通知接收方没有更多的数据可以接收,从而避免接收方一直等待。

使用场景

FanIn 函数适用于将多个 goroutine 中的多个通道合并到一个通道中,常见的使用场景包括:

  1. 多个生产者合并数据:如果有多个 goroutine 同时生产数据,FanIn 可以将它们的输出合并到一个通道中,方便统一处理。
  2. 任务处理与结果汇总:当多个任务并行处理时,每个任务通过一个通道返回结果,FanIn 可以将这些结果汇总到一个通道中,进行后续处理。

示例代码

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Channel[T any] struct{}

func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T)

    go func() {
        var wg sync.WaitGroup
        wg.Add(len(channels))

        for _, c := range channels {
            go func(c <-chan T) {
                defer wg.Done()
                for v := range c {
                    select {
                    case <-ctx.Done():
                        return
                    case out <- v:
                    }
                }
            }(c)
        }
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    // 模拟不同的 goroutine 向通道发送数据
    go func() {
        defer close(ch1)
        for i := 0; i < 5; i++ {
            ch1 <- i
            time.Sleep(time.Millisecond * 100)
        }
    }()

    go func() {
        defer close(ch2)
        for i := 5; i < 10; i++ {
            ch2 <- i
            time.Sleep(time.Millisecond * 150)
        }
    }()

    go func() {
        defer close(ch3)
        for i := 10; i < 15; i++ {
            ch3 <- i
            time.Sleep(time.Millisecond * 50)
        }
    }()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
    defer cancel()

    c := &Channel[int]{}
    out := c.FanIn(ctx, ch1, ch2, ch3)

    for v := range out {
        fmt.Println(v)
    }
}

结果

0
5
10
1
6
11
2
7
12
3
8
13
4
9
14

在这个例子中,FanIn 函数将三个不同的通道 ch1ch2ch3 合并到一个通道 out。由于这些通道的数据生成速度不同,所以合并后数据的顺序会按各个 goroutine 的执行顺序来。

总结

  • FanIn 是一个非常有用的模式,它可以将多个输入通道的数据合并成一个输出通道。
  • 它使用 goroutine 来并发地处理多个输入通道,同时利用 sync.WaitGroup 来确保所有通道都完成数据传输之后才关闭输出通道。
  • 上下文 ctx 用来控制 goroutine 的生命周期,可以在需要时取消所有并发操作,避免资源泄露。

标题:lancet concurrency FanIn
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/01/09/1736408501021.html
联系:scotttu@163.com