golang,go,博客,开源,编程

golang每日一库之工作流引擎Temporal

Updated on with 0 views and 0 comments

Temporal 是一个开源的分布式工作流编排系统,旨在简化构建和运行可靠、可扩展的长时间运行的后端应用程序。

它最初是由 Uber 的 Cadence 系统演变而来,现在由 Temporal Technologies 公司主导开发。

目前已被 Coinbase、Netflix、Box、Snap 等大规模应用。


核心功能

1. 分布式工作流编排

Temporal 支持在多台机器上协调执行复杂的业务逻辑。工作流可以在失败、重启甚至升级后继续执行,不需要人工干预。

2. 持久性与容错

所有工作流状态会被持久化(通常使用 Cassandra、MySQL、PostgreSQL 等后端数据库)。这使得工作流可以在服务崩溃或网络中断时恢复执行。

3. 异步任务和重试机制

Temporal 提供强大的异步任务支持,以及自动的、可配置的重试逻辑。即使下游服务暂时不可用,也能自动重试直到成功或达到重试上限。

4. 幂等与去重

通过事件驱动和有序日志处理机制,Temporal 可以保证工作流执行的幂等性,防止重复执行任务。

5. 编程语言支持

支持多种语言 SDK:

  • Go (temporalio/sdk-go)
  • Java (temporalio/sdk-java)
  • TypeScript/Node.js (temporalio/sdk-typescript)
  • Python(目前处于 beta 阶段)

组件

Temporal 系统分为以下关键组件:

1. Frontend Service

处理所有来自 SDK 的请求,是系统的入口点。

2. History Service

负责管理工作流执行的历史记录和状态。

3. Matching Service

处理任务队列,将任务分配给 worker。

4. Worker 服务

Worker 是由开发者实现的服务,用来实际执行工作流和活动(Activities)。Worker 通过 SDK 与 Temporal Server 通信。

5. Persistence Layer

数据持久化层,支持 Cassandra、MySQL、PostgreSQL,负责存储执行历史、任务队列、工作流状态等。


原理

  1. 开发者定义工作流和活动(Activity)函数
    func MyWorkflow(ctx workflow.Context, input string) error {
        err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, nil)
        return err
    }
    
    func MyActivity(ctx context.Context, input string) error {
        // 实际的业务逻辑
        return nil
    }
    
  2. Worker 注册工作流和活动
    worker.RegisterWorkflow(MyWorkflow)
    worker.RegisterActivity(MyActivity)
    
  3. Client 启动工作流
    workflowOptions := client.StartWorkflowOptions{
        TaskQueue: "my-task-queue",
    }
    we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, MyWorkflow, "hello world")
    
  4. Temporal 保证工作流一致性与容错
    • 每一个状态变化都被写入数据库
    • 崩溃后可以继续执行
    • 支持定时器、信号、查询等复杂逻辑

架构组成

Temporal 是服务端-客户端架构,核心组件包括:

  1. Temporal Server(Go 实现)
    • Frontend Service:处理 SDK 请求
    • History Service:存储执行状态、历史记录
    • Matching Service:分配任务给 Worker
    • Worker Service:处理后台任务
  2. Client SDK
    • 提供语言级 API 用于定义工作流和活动
  3. Worker
    • 用户实现的服务端代码,用于运行工作流/活动
  4. 持久化层
    • 支持 MySQL、PostgreSQL、Cassandra、Elasticsearch(可选用于可观察性)

示例

示例 1:电商订单处理

下单 → 扣款 → 库存检查 → 发货 → 邮件通知
func OrderWorkflow(ctx workflow.Context, order Order) error {
    err := workflow.ExecuteActivity(ctx, DeductInventory, order.ItemID).Get(ctx, nil)
    if err != nil {
        return err
    }

    err = workflow.ExecuteActivity(ctx, ChargeCustomer, order.UserID, order.Amount).Get(ctx, nil)
    if err != nil {
        return err
    }

    err = workflow.ExecuteActivity(ctx, ShipOrder, order).Get(ctx, nil)
    return err
}

示例 2:数据处理流水线(ETL)

抓取数据 → 清洗 → 转换 → 存储

Temporal 可用于将这些阶段连接成一个可靠的工作流,自动处理失败和重试。


示例 3:视频转码处理

上传 → 分辨率转换 → 水印添加 → CDN 分发

适合需要 GPU 资源、运行时间长但失败成本高的流程。


示例 4:用户注册流程(Saga 模式)

创建账户 → 发放优惠券 → 推送欢迎邮件

若失败:
撤销账户 → 撤销优惠券

Temporal 非常适合处理分布式事务补偿逻辑


示例 5:金融服务自动化(贷款审批)

收集用户信息 → 第三方信用审核 → 风控策略 → 人工审核 → 通知审批结果

这些任务可能分散在多个服务、角色之间,Temporal 保证任务一致性。


示例 6:定时和周期性任务

每日结算 → 定期备份 → 周期性健康检查

工作流可利用定时器 workflow.Sleep() 或 cron 式触发机制轻松实现。

部署

使用官方 Docker Compose 快速运行本地 Temporal:

git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up

标题:golang每日一库之工作流引擎Temporal
作者:mooncakeee
地址:http://blog.dd95828.com/articles/2025/06/03/1748918890556.html
联系:scotttu@163.com