golang,go,博客,开源,编程

lancet concurrency Or(channels ...<-chan T) <-chan T

Published on with 0 views and 0 comments
// Or read one or more channels into one channel, will close when any readin channel is closed.
// Play: https://go.dev/play/p/Wqz9rwioPww
func (c *Channel[T]) Or(channels ...<-chan T) <-chan T {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	orDone := make(chan T)

	go func() {
		defer close(orDone)

		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-c.Or(append(channels[3:], orDone)...):
			}
		}
	}()

	return orDone
}

你提供的 Go 代码定义了一个 Or 方法,它是 Channel[T] 类型的方法,目的是从多个通道中选择第一个接收到数据的通道,并返回这个数据。这个方法的实现涉及了对多个通道的并发操作和选择(select)机制。

代码分析

首先,让我们分析这段代码的结构和实现逻辑。

方法签名

func (c *Channel[T]) Or(channels ...<-chan T) <-chan T {
  • 这是 Channel[T] 类型的方法,其中 T 是通道中的数据类型。
  • 方法接受一个变长参数 channels,这些参数是多个只读通道(<-chan T),并返回一个只读通道 orDone,这个通道将接收从多个通道中选取的第一个接收到的数据。

第一个 switch 语句

switch len(channels) {
case 0:
    return nil
case 1:
    return channels[0]
}
  • 如果传入的通道数量为 0,返回 nil,表示没有通道可用。
  • 如果只有一个通道传入,直接返回这个通道。

创建 orDone 通道

orDone := make(chan T)
  • 创建一个 orDone 通道,它将用于返回第一个接收到数据的通道的数据。

启动一个 goroutine

go func() {
	defer close(orDone)
  • 启动一个 goroutine 来处理并发的通道选择逻辑,并确保在所有操作完成后关闭 orDone 通道。

select 语句

接下来的代码逻辑基于传入的通道数量执行不同的选择。

当通道数量为 2 时
switch len(channels) {
case 2:
    select {
    case <-channels[0]:
    case <-channels[1]:
    }
}
  • 如果只有两个通道,select 语句会在这两个通道中选择一个,谁先接收到数据,select 就会执行哪个 case
当通道数量超过 2 时
default:
    select {
    case <-channels[0]:
    case <-channels[1]:
    case <-channels[2]:
    case <-c.Or(append(channels[3:], orDone)...):
    }
}
  • 如果传入的通道数量超过 2 个,select 语句会选择前三个通道中的一个,或者递归地调用 Or 方法来处理剩下的通道(通过 append(channels[3:], orDone) 将剩余的通道和 orDone 合并)。
  • 这样,代码递归地处理多个通道,直到其中一个通道接收到数据为止。

返回结果

return orDone
  • 最后,返回 orDone 通道,这个通道会在有数据时接收到第一个触发的通道的数据。

改进的理解

  1. 递归调用 Or:当通道的数量超过 2 个时,代码递归调用 Or 方法来处理剩余的通道。这是为了保证能处理大量通道的情况,并且能够避免重复的 select 语句。递归的基本思想是将传入的多个通道分成两部分,处理前两个通道和剩余的通道,再将剩余部分递归地传递给 Or 方法。
  2. select 的工作原理select 语句用于在多个通道中选择一个进行操作。它会阻塞直到其中一个通道可以接收到数据。当多个通道同时准备好时,select 会随机选择一个通道。
  3. 并发操作:由于 Or 方法中使用了 goroutine,所以它可以并发地从多个通道中接收数据,保证高效的并发处理。

使用场景

这个方法的实现适用于以下场景:

  • 你有多个通道,它们都在不同的地方生成数据。你希望从这些通道中获取第一个成功发送数据的通道,并对数据进行处理。
  • 你可能不关心是哪一个通道先发送数据,只要第一个发送数据的通道返回数据即可。

示例用法

以下是一个使用 Or 方法的示例:

package main

import (
    "fmt"
    "time"
)

type Channel[T any] struct{}

func (c *Channel[T]) Or(channels ...<-chan T) <-chan T {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }

    orDone := make(chan T)

    go func() {
        defer close(orDone)

        switch len(channels) {
        case 2:
            select {
            case v := <-channels[0]:
                orDone <- v
            case v := <-channels[1]:
                orDone <- v
            }
        default:
            select {
            case v := <-channels[0]:
                orDone <- v
            case v := <-channels[1]:
                orDone <- v
            case v := <-channels[2]:
                orDone <- v
            case v := <-c.Or(append(channels[3:], orDone)...):
                orDone <- v
            }
        }
    }()

    return orDone
}

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)

    // 模拟在不同时间向通道发送数据
    go func() {
        time.Sleep(2 * time.Second)
        ch1 <- 1
    }()
    go func() {
        time.Sleep(1 * time.Second)
        ch2 <- 2
    }()
    go func() {
        time.Sleep(3 * time.Second)
        ch3 <- 3
    }()

    c := &Channel[int]{}
    resultCh := c.Or(ch1, ch2, ch3)

    // 等待第一个通道返回数据
    fmt.Println("接收到的值:", <-resultCh)
}

输出:

接收到的值: 2

在上面的示例中,Or 方法会返回第一个收到数据的通道。在 ch2 中的数据在 1 秒后就被发送,而 ch1ch3 会在稍后的时间发送数据。Or 方法确保接收的是 ch2 中的第一个数据。

总结

这个 Or 方法通过递归调用和 select 语句,使得多个通道可以并发地工作,并确保在接收到第一个数据时就返回结果。它的设计非常灵活,能够处理任意数量的通道,并且是并发编程中非常有用的工具,特别是在多通道数据处理时。


标题:lancet concurrency Or(channels ...<-chan T) <-chan T
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/01/09/1736404918246.html
联系:scotttu@163.com