golang,go,博客,开源,编程

lancet concurrency Bridge

Published on with 0 views and 0 comments
// Bridge link multiply channels into one channel.
// Play: https://go.dev/play/p/qmWSy1NVF-Y
func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T {
	valStream := make(chan T)

	go func() {
		defer close(valStream)

		for {
			var stream <-chan T
			select {
			case maybeStream, ok := <-chanStream:
				if !ok {
					return
				}
				stream = maybeStream
			case <-ctx.Done():
				return
			}

			for val := range c.OrDone(ctx, stream) {
				select {
				case valStream <- val:
				case <-ctx.Done():
				}
			}
		}
	}()

	return valStream
}

注意

for val := range c.OrDone(ctx, stream) {
				select {
				case valStream <- val:
				case <-ctx.Done():
				}
			}

这里是可能阻塞。

你提供的 Bridge 函数是一个更为复杂的通道处理函数,它的作用是将多个 通道流chanStream)合并成一个单一的输出通道(valStream)。这个模式有点类似于管道流(stream)的合并,即每次 chanStream 传递的是一个新的通道,而 Bridge 函数会将这些通道中的数据统一转发到 valStream 通道。

Bridge 函数解析

函数签名

func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T
  • ctx context.Context: 用于控制上下文,支持取消或超时的机制,保证在不需要继续执行时能够及时退出。
  • chanStream <-chan <-chan T: 输入是一个只读通道,其元素是 通道,即 chan <-chan T 类型的通道。每个元素都是一个新的通道,数据会从这些新通道中读取并传递到输出通道。
  • 返回值:一个只读通道(<-chan T),用于接收从多个通道流中获取的所有数据。

函数逻辑

  1. 创建输出通道

    valStream := make(chan T)
    

    这里创建一个新的通道 valStream,用于将从多个输入通道流中接收到的数据统一输出。

  2. 启动 goroutine

    go func() {
        defer close(valStream)
    

    启动一个 goroutine 来异步处理输入的多个通道流(chanStream),并将其中的数据传递到输出通道 valStream

  3. 循环处理通道流

    for {
        var stream <-chan T
        select {
        case maybeStream, ok := <-chanStream:
            if !ok {
                return
            }
            stream = maybeStream
        case <-ctx.Done():
            return
        }
    
    • 进入一个无限循环,每次从 chanStream 中接收一个通道 maybeStream(也就是 chan <-chan T 类型的元素)。
    • 如果 chanStream 关闭了(okfalse),则退出整个循环。
    • select 语句用于在以下两种情况下做出选择:
      • chanStream 获取一个新的通道:如果有新的通道传递给 chanStream,就赋值给 stream
      • 上下文被取消:如果上下文被取消(ctx.Done()),则退出循环,停止所有工作。
  4. 从当前通道流读取数据并转发到输出通道

    for val := range c.OrDone(ctx, stream) {
        select {
        case valStream <- val:
        case <-ctx.Done():
        }
    }
    
    • 一旦我们得到了 stream(即一个新的通道),就开始从这个通道读取数据。c.OrDone(ctx, stream) 确保如果上下文被取消时能够安全退出。
    • 对于每个从 stream 接收到的数据 val,会尝试将其发送到 valStream 输出通道。如果上下文取消,则不会继续发送。
  5. 关闭输出通道

    defer close(valStream)
    
    • go 函数结束时,确保 valStream 被关闭。
  6. 返回输出通道

    return valStream
    

    最后返回合并后的输出通道 valStream,它会将所有从多个输入通道中读取到的数据按顺序传递出去。

关键点分析

  1. 动态通道流处理chanStream 本身是一个通道,每次传递一个新的通道。当一个新的通道流传递到 chanStream 时,Bridge 会从该通道读取数据并将数据转发到输出通道 valStream。这个模式是处理动态生成的多个通道流的典型场景。
  2. OrDone 方法确保取消机制c.OrDone(ctx, stream) 确保如果上下文 ctx 被取消,Bridge 函数会停止从当前的 stream 读取数据。这种方式避免了泄露或过多的资源消耗。
  3. select 与多个通道流select 在这里有两个重要功能:
    • 监听从 chanStream 中接收一个新的通道流。
    • 监听 ctx.Done() 来响应上下文取消。
  4. 数据流的合并:所有的输入通道流(每个都是一个通道)会被合并到一个输出通道 valStream 中。它允许动态接入多个并发通道流并将数据统一处理。

使用场景

Bridge 模式常用于以下场景:

  1. 动态生成的通道流:当你的应用需要处理多个动态生成的通道流时,可以使用 Bridge 来统一收集所有通道的数据。比如,多个并发任务的结果流合并。
  2. 多级数据流处理:如果每个通道代表一个不同的处理阶段,而你需要将多个阶段的结果合并到一个通道中,Bridge 非常适合这种需求。
  3. 分布式系统中的合并:在分布式系统中,可能有多个服务异步地生成数据流,Bridge 可以将这些不同的流合并到一个通道进行统一处理。

示例代码

下面是一个使用 Bridge 函数的示例:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type Channel[T any] struct{}

func (c *Channel[T]) OrDone(ctx context.Context, ch <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range ch {
			select {
			case <-ctx.Done():
				return
			case out <- v:
			}
		}
	}()
	return out
}

func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T {
	valStream := make(chan T)

	go func() {
		defer close(valStream)

		for {
			var stream <-chan T
			select {
			case maybeStream, ok := <-chanStream:
				if !ok {
					return
				}
				stream = maybeStream
			case <-ctx.Done():
				return
			}

			for val := range c.OrDone(ctx, stream) {
				select {
				case valStream <- val:
				case <-ctx.Done():
				}
			}
		}
	}()

	return valStream
}

func main() {
	ch1 := make(chan int)
	ch2 := make(chan int)
	ch3 := make(chan int)
	chanStream := make(chan <-chan int)

	// Simulating multiple sources of data
	go func() {
		defer close(ch1)
		for i := 0; i < 5; i++ {
			ch1 <- i
			time.Sleep(time.Millisecond * 100)
		}
	}()

	go func() {
		defer close(ch2)
		for i := 5; i < 10; i++ {
			ch2 <- i
			time.Sleep(time.Millisecond * 150)
		}
	}()

	go func() {
		defer close(ch3)
		for i := 10; i < 15; i++ {
			ch3 <- i
			time.Sleep(time.Millisecond * 50)
		}
	}()

	// Sending the channels to the chanStream channel
	go func() {
		chanStream <- ch1
		chanStream <- ch2
		chanStream <- ch3
		close(chanStream)
	}()

	// Using Bridge to merge all the channels into one
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	c := &Channel[int]{}
	valStream := c.Bridge(ctx, chanStream)

	// Receiving merged values from valStream
	for val := range valStream {
		fmt.Println(val)
	}
}

结果

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14

总结

  • Bridge 函数用于将多个通道流(每个元素是一个通道)合并成一个单一的输出通道。
  • 通过 select 和上下文管理,确保在通道流关闭或上下文取消时能安全退出。
  • 适用于动态接入多个通道流并统一处理的场景,比如任务并发结果的汇总、多级数据流处理等。

标题:lancet concurrency Bridge
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/01/09/1736409821585.html
联系:scotttu@163.com