// Bridge link multiply channels into one channel. // Play: https://go.dev/play/p/qmWSy1NVF-Y func (c *Channel[T]) Bridge(ctx context.Context, chanStream <-chan <-chan T) <-chan T { valStream := make(chan T) go func() { defer close(valStream) for { var stream <-chan T select { case maybeStream, ok := <-chanStream: if !ok { return } stream = maybeStream case <-ctx.Done(): return } for val := range c.OrDone(ctx, stream) { select { case valStream <- val: case <-ctx.Done(): }.... lancet concurrency Bridge lancet
// FanIn merge multiple channels into one channel. // Play: https://go.dev/play/p/2VYFMexEvTm func (c *Channel[T]) FanIn(ctx context.Context, channels ...<-chan T) <-chan T { out := make(chan T) go func() { var wg sync.WaitGroup wg.Add(len(channels)) for _, c := range channels { go func(c <-chan T) { defer wg.Done() for v := range c { select { case <-ctx.Done(): return case out <- v: } } }(c) } wg.Wait() close(out) }() return out } 你提供的 FanIn 函数用于将多个输入通道(channels)合并成一个输出通道(out)。这.... 有更新! lancet concurrency FanIn lancet
// Tee split one chanel into two channels, until cancel the context. // Play: https://go.dev/play/p/3TQPKnCirrP func (c *Channel[T]) Tee(ctx context.Context, in <-chan T) (<-chan T, <-chan T) { out1 := make(chan T) out2 := make(chan T) go func() { defer close(out1) defer close(out2) for val := range c.OrDone(ctx, in) { var out1, out2 = out1, out2 for i := 0; i < 2; i++ { select { case <-ctx.Done(): case out1 <- val: out1 = nil case out2 <- val: out2 = nil } } } }() return ou.... lancet concurrency Tee lancet
import ( "context" "fmt" "github.com/duke-git/lancet/v2/concurrency" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := concurrency.NewChannel[int]() genVals := func() <-chan <-chan int { out := make(chan (<-chan int)) go func() { defer close(out) for i := 1; i <= 5; i++ { stream := make(chan int, 1) stream <- i close(stream) out <- stream } }() return out } for v := range c.Bridge(ctx, genVals()) { fmt.Println(v) } // Output: // 1 // 2 .... lacet concurrency Bridge示例 lancet
// Or read one or more channels into one channel, will close when any readin channel is closed. // Play: https://go.dev/play/p/Wqz9rwioPww func (c *Channel[T]) Or(channels ...<-chan T) <-chan T { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan T) go func() { defer close(orDone) switch len(channels) { case 2: select { case <-channels[0]: case <-channels[1]: } default: select { case <-channels[0]: case <-channels[1]: case <-channels[2.... lancet concurrency Or(channels ...<-chan T) <-chan T lancet