golang,go,博客,开源,编程
goflow
是一个基于 Go 语言的高性能、可扩展、分布式的工作流框架,由 GitHub 用户 s8sg 开发。它允许开发者以编程方式将分布式工作流定义为任务的有向无环图(DAG),并通过多个工作节点(Worker)均匀分配负载来执行任务。
goflow
允许用户以 DAG 的形式定义工作流,每个节点代表一个任务,边表示任务之间的依赖关系。这种结构使得任务的执行顺序清晰,便于管理复杂的工作流程。
任务可以分布在多个 Worker 上执行,goflow
通过均匀分配负载的方式,确保各个 Worker 的工作量平衡,从而提高整体执行效率。
goflow
提供了 FlowService
结构体,允许用户配置服务的端口、Redis 地址、OpenTrace 地址、Worker 并发数等参数,便于根据实际需求进行扩展和调整。
框架支持集成 OpenTracing,提供对工作流执行过程的监控和追踪功能,帮助开发者及时发现和解决问题。
goflow
使用 Redis 作为后端存储,管理工作流的状态和任务队列,确保任务的可靠执行和状态的持久化。
go mod init myflow
go get github.com/s8sg/goflow@master
创建一个名为 flow.go
的文件,并添加以下代码:
package main
import (
"fmt"
goflow "github.com/s8sg/goflow/v1"
flow "github.com/s8sg/goflow/flow/v1"
)
// 定义任务函数
func doSomething(data []byte, option map[string][]string) ([]byte, error) {
return []byte(fmt.Sprintf("你说了:\"%s\"", string(data))), nil
}
// 定义工作流
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("test", doSomething)
return nil
}
func main() {
fs := &goflow.FlowService{
Port: 8080,
RedisURL: "localhost:6379",
OpenTraceUrl: "localhost:5775",
WorkerConcurrency: 5,
EnableMonitoring: true,
}
fs.Register("myflow", DefineWorkflow)
fs.Start()
}
上述代码中,我们定义了一个名为 doSomething
的任务函数,并在工作流中添加了一个名为 test
的节点来执行该任务。然后,我们配置并启动了 FlowService
,监听 8080 端口,连接本地的 Redis 和 OpenTrace 服务,设置 Worker 的并发数为 5,并启用了监控功能。
goflow
支持在工作流中添加条件判断和分支控制,允许根据不同的条件路径执行不同的任务序列,增强了工作流的灵活性。
框架支持在工作流中定义循环结构和子流程,便于处理重复性任务和模块化的流程设计。
通过 Redis 的支持,goflow
能够持久化工作流的状态,确保在系统重启或故障恢复后,能够继续执行未完成的任务。
goflow
提供了多个示例,展示了不同的工作流模式:
该示例展示了如何并行执行多个任务节点。在工作流中,多个节点可以同时处理相同的输入数据,从而提高处理效率。
要运行该示例:
构建项目:
make
启动依赖服务:
docker-compose down && docker-compose up -d
运行示例:
./examples
使用 curl
执行工作流:
curl -d hello localhost:8080/flow/parallel
此命令将向 parallel
工作流发送数据,触发并行执行的任务节点。
该示例展示了如何根据条件动态选择执行路径。工作流中的某个节点根据输入数据的特定条件,决定后续执行哪个分支。
例如,根据面部识别的结果,决定是执行 face-match
任务还是 create-user
任务。
要运行该示例,请按照与 parallel
示例相同的步骤,将 parallel
替换为 condition
。
goflow
提供了 ConditionalBranch
方法,用于根据条件动态选择执行路径。例如:
branches := dag.ConditionalBranch("handle-response", []string{"pass", "fail"}, func(response []byte) []string {
if string(response) == "pass" {
return []string{"pass"}
}
return []string{"fail"}
})
branches["pass"].Node("process-pass", processPass)
branches["fail"].Node("process-fail", processFail)
在上述代码中,根据 response
的内容,工作流将选择执行 process-pass
或 process-fail
节点。
goflow
支持将一个完整的 DAG 嵌套为另一个工作流的子节点,便于模块化和复用。例如:
func SubWorkflow() *flow.Dag {
dag := flow.NewDag()
dag.Node("step1", step1)
dag.Node("step2", step2)
dag.Edge("step1", "step2")
return dag
}
func DefineWorkflow(f *flow.Workflow, context *flow.Context) error {
dag := f.Dag()
dag.Node("start", start)
dag.SubDag("subflow", SubWorkflow())
dag.Node("end", end)
dag.Edge("start", "subflow")
dag.Edge("subflow", "end")
return nil
}
在上述代码中,SubWorkflow
定义了一个子工作流,包含两个步骤 step1
和 step2
。主工作流中,subflow
节点嵌套了该子工作流,实现了模块化设计。
🌰就举到这里。
goflow
是一个功能强大且灵活的工作流框架,适用于需要处理复杂任务编排和分布式执行的场景。
作者的readme和examples写的很详细,如果对这个库感兴趣,建议通读readme和examples。