golang,go,博客,开源,编程
// Bridge link multiply channels into one channel.
// Play: https://go.dev/play/p/qmWSy1NVF-Y
func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T {
valStream := make(chan T)
go func() {
defer close(valStream)
for {
var stream <-chan T
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
case <-ctx.Done():
return
}
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
}
}()
return valStream
}
注意
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
这里是可能阻塞。
你提供的 Bridge
函数是一个更为复杂的通道处理函数,它的作用是将多个 通道流(chanStream
)合并成一个单一的输出通道(valStream
)。这个模式有点类似于管道流(stream)的合并,即每次 chanStream
传递的是一个新的通道,而 Bridge
函数会将这些通道中的数据统一转发到 valStream
通道。
Bridge
函数解析func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T
ctx context.Context
: 用于控制上下文,支持取消或超时的机制,保证在不需要继续执行时能够及时退出。chanStream <-chan <-chan T
: 输入是一个只读通道,其元素是 通道,即 chan <-chan T
类型的通道。每个元素都是一个新的通道,数据会从这些新通道中读取并传递到输出通道。<-chan T
),用于接收从多个通道流中获取的所有数据。创建输出通道:
valStream := make(chan T)
这里创建一个新的通道 valStream
,用于将从多个输入通道流中接收到的数据统一输出。
启动 goroutine:
go func() {
defer close(valStream)
启动一个 goroutine 来异步处理输入的多个通道流(chanStream
),并将其中的数据传递到输出通道 valStream
。
循环处理通道流:
for {
var stream <-chan T
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
case <-ctx.Done():
return
}
chanStream
中接收一个通道 maybeStream
(也就是 chan <-chan T
类型的元素)。chanStream
关闭了(ok
为 false
),则退出整个循环。select
语句用于在以下两种情况下做出选择:
chanStream
获取一个新的通道:如果有新的通道传递给 chanStream
,就赋值给 stream
。ctx.Done()
),则退出循环,停止所有工作。从当前通道流读取数据并转发到输出通道:
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
stream
(即一个新的通道),就开始从这个通道读取数据。c.OrDone(ctx, stream)
确保如果上下文被取消时能够安全退出。stream
接收到的数据 val
,会尝试将其发送到 valStream
输出通道。如果上下文取消,则不会继续发送。关闭输出通道:
defer close(valStream)
go
函数结束时,确保 valStream
被关闭。返回输出通道:
return valStream
最后返回合并后的输出通道 valStream
,它会将所有从多个输入通道中读取到的数据按顺序传递出去。
chanStream
本身是一个通道,每次传递一个新的通道。当一个新的通道流传递到 chanStream
时,Bridge
会从该通道读取数据并将数据转发到输出通道 valStream
。这个模式是处理动态生成的多个通道流的典型场景。OrDone
方法确保取消机制:c.OrDone(ctx, stream)
确保如果上下文 ctx
被取消,Bridge
函数会停止从当前的 stream
读取数据。这种方式避免了泄露或过多的资源消耗。select
与多个通道流:select
在这里有两个重要功能:
chanStream
中接收一个新的通道流。ctx.Done()
来响应上下文取消。valStream
中。它允许动态接入多个并发通道流并将数据统一处理。Bridge
模式常用于以下场景:
Bridge
来统一收集所有通道的数据。比如,多个并发任务的结果流合并。Bridge
非常适合这种需求。Bridge
可以将这些不同的流合并到一个通道进行统一处理。下面是一个使用 Bridge
函数的示例:
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Channel[T any] struct{}
func (c *Channel[T]) OrDone(ctx context.Context, ch <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for v := range ch {
select {
case <-ctx.Done():
return
case out <- v:
}
}
}()
return out
}
func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T {
valStream := make(chan T)
go func() {
defer close(valStream)
for {
var stream <-chan T
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
case <-ctx.Done():
return
}
for val := range c.OrDone(ctx, stream) {
select {
case valStream <- val:
case <-ctx.Done():
}
}
}
}()
return valStream
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
chanStream := make(chan <-chan int)
// Simulating multiple sources of data
go func() {
defer close(ch1)
for i := 0; i < 5; i++ {
ch1 <- i
time.Sleep(time.Millisecond * 100)
}
}()
go func() {
defer close(ch2)
for i := 5; i < 10; i++ {
ch2 <- i
time.Sleep(time.Millisecond * 150)
}
}()
go func() {
defer close(ch3)
for i := 10; i < 15; i++ {
ch3 <- i
time.Sleep(time.Millisecond * 50)
}
}()
// Sending the channels to the chanStream channel
go func() {
chanStream <- ch1
chanStream <- ch2
chanStream <- ch3
close(chanStream)
}()
// Using Bridge to merge all the channels into one
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
c := &Channel[int]{}
valStream := c.Bridge(ctx, chanStream)
// Receiving merged values from valStream
for val := range valStream {
fmt.Println(val)
}
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Bridge
函数用于将多个通道流(每个元素是一个通道)合并成一个单一的输出通道。select
和上下文管理,确保在通道流关闭或上下文取消时能安全退出。