golang,go,博客,开源,编程
// FanIn merge multiple channels into one channel.
// Play: https://go.dev/play/p/2VYFMexEvTm
func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
go func() {
var wg sync.WaitGroup
wg.Add(len(channels))
for _, c := range channels {
go func(c <-chan T) {
defer wg.Done()
for v := range c {
select {
case <-ctx.Done():
return
case out <- v:
}
}
}(c)
}
wg.Wait()
close(out)
}()
return out
}
你提供的 FanIn
函数用于将多个输入通道(channels
)合并成一个输出通道(out
)。这个函数能够从多个通道中接收数据,并将数据转发到单一的输出通道,直到所有输入通道都关闭或者上下文被取消。
让我们逐步分析这个函数的工作原理。
FanIn
函数解析func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T
ctx context.Context
: 用于取消操作的上下文,帮助管理多个 goroutine 的生命周期。channels ...<-chan T
: 可变参数,接受多个输入通道,所有的通道都会被合并到一个通道中。<-chan T
),这是所有输入通道数据的合并结果。创建输出通道:
out := make(chan T)
创建一个新的通道 out
,用于存放从多个输入通道接收到的数据。
启动一个 goroutine 来执行合并操作:
go func() {
var wg sync.WaitGroup
wg.Add(len(channels)) // 设置 WaitGroup 的计数器为输入通道的数量
该 goroutine 会启动多个子 goroutine,每个子 goroutine 用于从一个输入通道中接收数据并将数据发送到输出通道。sync.WaitGroup
被用来等待所有 goroutine 完成操作。
遍历所有输入通道并启动 goroutine:
for _, c := range channels {
go func(c <-chan T) {
defer wg.Done()
for v := range c {
select {
case <-ctx.Done(): // 如果上下文被取消,则退出
return
case out <- v: // 向 out 发送数据
}
}
}(c)
}
c
启动一个新的 goroutine。for
循环遍历通道 c
中的数据,直到通道关闭。select
语句:
case <-ctx.Done()
: 如果上下文被取消,退出当前的 goroutine。case out <- v
: 如果接收到数据,将数据发送到输出通道 out
。等待所有 goroutine 完成:
wg.Wait() // 等待所有 goroutine 完成
close(out) // 所有通道的数据都被合并到 out 之后,关闭 out 通道
wg.Wait()
阻塞,直到所有的 goroutine 完成任务。out
,以通知接收方没有更多数据可用。返回输出通道:
return out
返回合并后的输出通道 out
。
sync.WaitGroup
来等待所有 goroutine 完成数据处理,确保所有数据都被合并到输出通道之后才关闭该通道。select
语句确保同步select
语句来处理两种情况:
ctx.Done()
被触发,表示上下文已取消,goroutine 会立即退出。ctx
,我们能够在程序需要时取消所有 goroutine,保证在不需要继续处理时,所有的并发操作能够及时停止。out
通道会被关闭。这是通过 wg.Wait()
来确保所有的 goroutine 完成之后才关闭通道。close(out)
的作用是通知接收方没有更多的数据可以接收,从而避免接收方一直等待。FanIn
函数适用于将多个 goroutine 中的多个通道合并到一个通道中,常见的使用场景包括:
FanIn
可以将它们的输出合并到一个通道中,方便统一处理。FanIn
可以将这些结果汇总到一个通道中,进行后续处理。package main
import (
"context"
"fmt"
"sync"
"time"
)
type Channel[T any] struct{}
func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
go func() {
var wg sync.WaitGroup
wg.Add(len(channels))
for _, c := range channels {
go func(c <-chan T) {
defer wg.Done()
for v := range c {
select {
case <-ctx.Done():
return
case out <- v:
}
}
}(c)
}
wg.Wait()
close(out)
}()
return out
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
// 模拟不同的 goroutine 向通道发送数据
go func() {
defer close(ch1)
for i := 0; i < 5; i++ {
ch1 <- i
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
defer close(ch2)
for i := 5; i < 10; i++ {
ch2 <- i
time.Sleep(time.Millisecond * 150)
}
}()
go func() {
defer close(ch3)
for i := 10; i < 15; i++ {
ch3 <- i
time.Sleep(time.Millisecond * 50)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
c := &Channel[int]{}
out := c.FanIn(ctx, ch1, ch2, ch3)
for v := range out {
fmt.Println(v)
}
}
0
5
10
1
6
11
2
7
12
3
8
13
4
9
14
在这个例子中,FanIn
函数将三个不同的通道 ch1
、ch2
和 ch3
合并到一个通道 out
。由于这些通道的数据生成速度不同,所以合并后数据的顺序会按各个 goroutine 的执行顺序来。
FanIn
是一个非常有用的模式,它可以将多个输入通道的数据合并成一个输出通道。sync.WaitGroup
来确保所有通道都完成数据传输之后才关闭输出通道。ctx
用来控制 goroutine 的生命周期,可以在需要时取消所有并发操作,避免资源泄露。