golang,go,博客,开源,编程

golang每日一库之goflow

Updated on with 0 views and 0 comments

goflow 是一个基于 Go 语言的高性能、可扩展、分布式的工作流框架,由 GitHub 用户 s8sg 开发。它允许开发者以编程方式将分布式工作流定义为任务的有向无环图(DAG),并通过多个工作节点(Worker)均匀分配负载来执行任务。


核心特性

1. DAG 构建与任务编排

goflow 允许用户以 DAG 的形式定义工作流,每个节点代表一个任务,边表示任务之间的依赖关系。这种结构使得任务的执行顺序清晰,便于管理复杂的工作流程。

2. 分布式执行与负载均衡

任务可以分布在多个 Worker 上执行,goflow 通过均匀分配负载的方式,确保各个 Worker 的工作量平衡,从而提高整体执行效率。

3. 可扩展的服务架构

goflow 提供了 FlowService 结构体,允许用户配置服务的端口、Redis 地址、OpenTrace 地址、Worker 并发数等参数,便于根据实际需求进行扩展和调整。

4. 内置监控与可观测性

框架支持集成 OpenTracing,提供对工作流执行过程的监控和追踪功能,帮助开发者及时发现和解决问题。

5. Redis 支持

goflow 使用 Redis 作为后端存储,管理工作流的状态和任务队列,确保任务的可靠执行和状态的持久化。


安装入门

安装

go mod init myflow
go get github.com/s8sg/goflow@master

编写第一个工作流

创建一个名为 flow.go 的文件,并添加以下代码:

package main

import (
	"fmt"
	goflow "github.com/s8sg/goflow/v1"
	flow "github.com/s8sg/goflow/flow/v1"
)

// 定义任务函数
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
	return []byte(fmt.Sprintf("你说了:\"%s\"", string(data))), nil
}

// 定义工作流
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	dag := workflow.Dag()
	dag.Node("test", doSomething)
	return nil
}

func main() {
	fs := &goflow.FlowService{
		Port:              8080,
		RedisURL:          "localhost:6379",
		OpenTraceUrl:      "localhost:5775",
		WorkerConcurrency: 5,
		EnableMonitoring:  true,
	}
	fs.Register("myflow", DefineWorkflow)
	fs.Start()
}

上述代码中,我们定义了一个名为 doSomething 的任务函数,并在工作流中添加了一个名为 test 的节点来执行该任务。然后,我们配置并启动了 FlowService,监听 8080 端口,连接本地的 Redis 和 OpenTrace 服务,设置 Worker 的并发数为 5,并启用了监控功能。


高级功能

1. 条件执行与分支控制

goflow 支持在工作流中添加条件判断和分支控制,允许根据不同的条件路径执行不同的任务序列,增强了工作流的灵活性。

2. 循环与子流程

框架支持在工作流中定义循环结构和子流程,便于处理重复性任务和模块化的流程设计。

3. 状态管理与持久化

通过 Redis 的支持,goflow 能够持久化工作流的状态,确保在系统重启或故障恢复后,能够继续执行未完成的任务。

示例

goflow 提供了多个示例,展示了不同的工作流模式:

  • single:单个节点的简单工作流。
  • serial:串行执行的多个节点。
  • parallel:并行执行的多个节点。
  • condition:包含条件分支的工作流。

1. 并行执行示例(parallel)

该示例展示了如何并行执行多个任务节点。在工作流中,多个节点可以同时处理相同的输入数据,从而提高处理效率。

要运行该示例:

  1. 构建项目:

    make
    
  2. 启动依赖服务:

    docker-compose down && docker-compose up -d
    
  3. 运行示例:

    ./examples
    
  4. 使用 curl 执行工作流:

    curl -d hello localhost:8080/flow/parallel
    

此命令将向 parallel 工作流发送数据,触发并行执行的任务节点。

2. 条件分支示例(condition)

该示例展示了如何根据条件动态选择执行路径。工作流中的某个节点根据输入数据的特定条件,决定后续执行哪个分支。

例如,根据面部识别的结果,决定是执行 face-match 任务还是 create-user 任务。

要运行该示例,请按照与 parallel 示例相同的步骤,将 parallel 替换为 condition

高级功能示例

1. 条件分支(Conditional Branching)

goflow 提供了 ConditionalBranch 方法,用于根据条件动态选择执行路径。例如:

branches := dag.ConditionalBranch("handle-response", []string{"pass", "fail"}, func(response []byte) []string {
    if string(response) == "pass" {
        return []string{"pass"}
    }
    return []string{"fail"}
})
branches["pass"].Node("process-pass", processPass)
branches["fail"].Node("process-fail", processFail)

在上述代码中,根据 response 的内容,工作流将选择执行 process-passprocess-fail 节点。

2. 子工作流(SubDAG)

goflow 支持将一个完整的 DAG 嵌套为另一个工作流的子节点,便于模块化和复用。例如:

func SubWorkflow() *flow.Dag {
    dag := flow.NewDag()
    dag.Node("step1", step1)
    dag.Node("step2", step2)
    dag.Edge("step1", "step2")
    return dag
}

func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
    dag := f.Dag()
    dag.Node("start", start)
    dag.SubDag("subflow", SubWorkflow())
    dag.Node("end", end)
    dag.Edge("start", "subflow")
    dag.Edge("subflow", "end")
    return nil
}

在上述代码中,SubWorkflow 定义了一个子工作流,包含两个步骤 step1step2。主工作流中,subflow 节点嵌套了该子工作流,实现了模块化设计。

🌰就举到这里。

最后

goflow 是一个功能强大且灵活的工作流框架,适用于需要处理复杂任务编排和分布式执行的场景。

作者的readme和examples写的很详细,如果对这个库感兴趣,建议通读readme和examples。


标题:golang每日一库之goflow
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/05/29/1748481669444.html
联系:scotttu@163.com