golang,go,博客,开源,编程

lancet concurrency Tee

Published on with 0 views and 0 comments
// Tee split one chanel into two channels, until cancel the context.
// Play: https://go.dev/play/p/3TQPKnCirrP
func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) {
	out1 := make(chan T)
	out2 := make(chan T)

	go func() {
		defer close(out1)
		defer close(out2)

		for val := range c.OrDone(ctx, in) {
			var out1, out2 = out1, out2
			for i := 0; i < 2; i++ {
				select {
				case <-ctx.Done():
				case out1 <- val:
					out1 = nil
				case out2 <- val:
					out2 = nil
				}
			}
		}
	}()

	return out1, out2
}

在你提供的 Tee 函数中,它实现了将一个通道(in)中的数据分流到两个不同的通道(out1out2)的功能,直到上下文(context)取消或者 in 通道关闭。

这个函数的目标是在 Go 并发编程中实现一个“分流”操作,将输入通道中的数据同时发送到两个输出通道,而每个输出通道会接收到相同的数据。下面是对该函数的详细解释和分析。

Tee 函数解释

函数签名

func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T)
  • ctx context.Context: 用于控制 goroutine 的生命周期。如果 ctx 被取消,Tee 函数会停止向输出通道发送数据。
  • in <-chan T: 输入通道,用于从中接收数据。
  • 返回值是两个只读通道 <-chan T,这两个通道会接收到相同的数据。

函数逻辑

  1. 创建两个输出通道out1out2,用于分流数据。

    out1 := make(chan T)
    out2 := make(chan T)
    
  2. 启动一个 goroutine 来处理数据流:

    • 该 goroutine 会从输入通道 in 接收数据,并将每个接收到的数据同时发送到两个输出通道 out1out2
    go func() {
        defer close(out1)
        defer close(out2)
        // 向 out1 和 out2 中发送数据
        for val := range c.OrDone(ctx, in) {
            var out1, out2 = out1, out2
            for i := 0; i < 2; i++ {
                select {
                case <-ctx.Done(): // 如果上下文取消,停止发送数据
                case out1 <- val:  // 将数据发送到 out1
                    out1 = nil      // 发送后将 out1 设置为 nil,确保数据只会被发送一次
                case out2 <- val:  // 将数据发送到 out2
                    out2 = nil      // 同理,将 out2 设置为 nil
                }
            }
        }
    }()
    
    • defer close(out1)defer close(out2):保证在 goroutine 执行完毕时,两个通道会被关闭。
    • for val := range c.OrDone(ctx, in):从输入通道 in 中接收数据,直到 ctx 被取消或 in 关闭。
    • select 语句:同时向两个输出通道发送数据,直到其中一个通道接收到数据。如果一个通道接收成功,则将该通道的变量设为 nil,确保每个数据只会被发送一次。
  3. 返回两个输出通道out1out2,这两个通道会同时接收到相同的数据。

    return out1, out2
    

OrDone 方法的作用

你提到的 c.OrDone(ctx, in) 是一个在上下文取消时能够正常退出的通道包装。假设它的作用是包装 in 通道,确保在 ctx 被取消时,Tee 函数能安全退出。这是通过监听 ctx.Done() 来确保及时响应取消信号的。

// 这里假设的 OrDone 方法示意:
func (c *Channel[T]) OrDone(ctx context.Context, in <-chan T) <-chan T {
    out := make(chan T)

    go func() {
        select {
        case <-ctx.Done():
            close(out)
        case val := <-in:
            out <- val
        }
    }()

    return out
}

总结

  • Tee 函数的作用是将输入通道中的数据同时分发到两个输出通道 out1out2,这两个通道会接收到相同的数据。
  • 在数据分发的过程中,如果上下文 ctx 被取消,Tee 函数会停止发送数据并关闭输出通道。
  • select 语句确保数据能被发送到两个通道中的一个,并且避免重复发送。
  • OrDone 方法确保上下文取消时能够及时退出 goroutine。

这种模式常用于需要将一个数据流分发到多个处理阶段的场景中,在 Go 的并发编程中非常有用。


标题:lancet concurrency Tee
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/01/09/1736408226591.html
联系:scotttu@163.com