golang,go,博客,开源,编程

golang每日一库之fastflow

Updated on with 0 views and 0 comments

今天介绍一个工作流库。

Fastflow 是一个由 ShiningRush 开发的基于 Go 语言的轻量级、高性能分布式工作流框架。

它旨在解决复杂任务流的调度与执行问题,特别适用于如离线任务、Kubernetes 集群管理、容器迁移等场景。


特点

  1. 基于 DAG 的工作流模型
    Fastflow 使用有向无环图(DAG)定义任务之间的依赖关系。每个节点(Task)代表一个操作,只有其所有依赖任务成功完成后,才会被触发执行。
  2. 高性能与并发执行
    借助 Go 的协程和 channel 技术,Fastflow 能在单个实例上并行处理数百到数万个任务,满足高并发需求。
  3. 可观测性
    集成 Prometheus,Fastflow 提供任务执行的实时指标,如并发任务数、任务分发时间等,便于监控和调试。
  4. 水平扩展与负载均衡
    支持多实例部署,通过 Leader 选举机制实现节点间的负载均衡,确保系统在高负载下的稳定性。
  5. 易用的 API 与扩展性
    提供开箱即用的 API,支持通过编程或 YAML 定义工作流。用户可以自定义任务行为(Action),并根据上下文决定是否跳过任务。
  6. 轻量级集成
    作为基础框架,Fastflow 易于集成到现有项目中,无需部署额外的服务,降低了引入成本。

基本概念

  • Dag:定义整个工作流的结构,包括任务节点及其依赖关系。
  • Task:工作流中的单个任务节点,指定执行的 Action 及其参数。
  • Action:任务的具体执行逻辑,用户可以自定义实现。
  • DagInstance:Dag 的一次运行实例,包含执行状态和上下文信息。

工作流模型


示例

以下是一个使用 Fastflow 定义并运行简单工作流的示例:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}
func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("action start: ", time.Now())
	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key:      "worker-1",
    // if your mongo does not set user/pwd, youshould remove it
		ConnStr:  "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
    // if your mongo does not set user/pwd, youshould remove it
		ConnStr:  "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait fast start completed
	time.Sleep(time.Second)

	// run some dag instance
	for i := 0; i < 10; i++ {
		_, err := mod.GetCommander().RunDag("test-dag", nil)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second * 10)
	}
}

适用场景

  • 离线任务调度:如日志处理、数据清洗等批处理任务。
  • Kubernetes 集群管理:自动化集群的创建、扩缩容、节点上下线等操作。
  • 容器迁移与运维:实现容器的自动迁移、服务重启等流程。
  • CI/CD 流程编排:构建、测试、部署等持续集成与交付流程的自动化

项目地址

https://github.com/ShiningRush/fastflow

项目的readme写的非常全面,建议仔细阅读。


标题:golang每日一库之fastflow
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/05/28/1748395093514.html
联系:scotttu@163.com