Saki's 研究记录

快速搭建本地Temporal环境并运行简单demo

字数统计: 566 | 阅读时长: 3 min
2023/09/06

1. 运行 Temporal 集群

1
2
3
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up

2. 创建项目

创建项目目录:

1
mkdir hello-temporal && cd hello-temporal

初始化项目:

1
go mod init hello-temporal

下载最新版本的 Go SDK(或者在最后执行go mod tidy):

1
2
3
go get -u go.temporal.io/sdk@latest
# 用于生成唯一 ID
go get github.com/google/uuid@v1.3.0

目录结构:

1
2
3
4
5
6
7
8
9
10
.
├── activity
│   └── say_hello.go
├── go.mod
├── go.sum
├── main.go
├── worker
│   └── main.go
└── workflow
    └── handle_name.go

activity/say_hello.go:

1
2
3
4
5
6
7
8
9
10
11
package activity

import (
"context"
"fmt"
)

// SayHello is the demo activity. It formats the greeting "Hello, <name>!" for
// the supplied name and returns it. The context argument is accepted but not
// read, and the returned error is always nil.
func SayHello(ctx context.Context, name string) (string, error) {
	return fmt.Sprintf("Hello, %s!", name), nil
}

workflow/handle_name.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package workflow

import (
"fmt"
"hello-temporal/activity"
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

// HandleName is the demo workflow. It executes the SayHello activity with the
// given name and returns the string the activity produced.
//
// The activity is scheduled with a StartToCloseTimeout of one minute and a
// retry policy whose fields are: InitialInterval one second,
// BackoffCoefficient 2.0, MaximumInterval one minute, MaximumAttempts 500.
// If fetching the activity result fails, the failure is printed to standard
// output and the error is returned together with an empty string.
func HandleName(ctx workflow.Context, name string) (string, error) {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    500,
		},
	})

	// Schedule the activity, then wait for its result to be decoded into greeting.
	var greeting string
	future := workflow.ExecuteActivity(ctx, activity.SayHello, name)
	if err := future.Get(ctx, &greeting); err != nil {
		fmt.Printf("failed to say hello to %s, because %s\n", name, err)
		return "", err
	}
	return greeting, nil
}

worker/main.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"hello-temporal/activity"
"hello-temporal/workflow"
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

// main is the worker entry point. It dials the Temporal server at
// 127.0.0.1:7233 in the "default" namespace, creates a worker on the
// "test_task_queue_name" task queue, registers the HandleName workflow and the
// SayHello activity, and then runs the worker with worker.InterruptCh() as
// its interrupt channel. A dial or run failure is logged via log.Fatalln.
func main() {
	temporalClient, err := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer temporalClient.Close()

	demoWorker := worker.New(temporalClient, "test_task_queue_name", worker.Options{})
	demoWorker.RegisterWorkflow(workflow.HandleName)
	demoWorker.RegisterActivity(activity.SayHello)

	if runErr := demoWorker.Run(worker.InterruptCh()); runErr != nil {
		log.Fatalln("unable to start Worker", runErr)
	}
}

main.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"context"
"fmt"
"hello-temporal/workflow"
"log"

"github.com/google/uuid"
"go.temporal.io/sdk/client"
)

// main is the workflow starter. It dials the Temporal server at
// 127.0.0.1:7233 in the "default" namespace, starts the HandleName workflow on
// the "test_task_queue_name" task queue with the argument "Tim" under a
// UUID-suffixed workflow ID, logs the workflow and run IDs, then waits for the
// workflow result and logs it.
//
// A failure to start the workflow is logged and main returns normally; a dial
// failure or a failure while fetching the result is logged via log.Fatalln.
func main() {
	temporalClient, err := client.Dial(client.Options{
		HostPort:  "127.0.0.1:7233",
		Namespace: "default",
	})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer temporalClient.Close()

	bg := context.Background()
	run, err := temporalClient.ExecuteWorkflow(
		bg,
		client.StartWorkflowOptions{
			ID:        fmt.Sprintf("workflow_id-%s", uuid.New().String()),
			TaskQueue: "test_task_queue_name",
		},
		workflow.HandleName,
		"Tim",
	)
	if err != nil {
		log.Printf("failed to ExecuteWorkflow err: %s\n", err.Error())
		return
	}
	log.Printf("Workflow Id: %s\n", run.GetID())
	log.Printf("Run Id: %s\n", run.GetRunID())

	// Wait for the workflow to finish and decode its result.
	var greeting string
	if getErr := run.Get(bg, &greeting); getErr != nil {
		log.Fatalln("failed to execute Workflow", getErr)
	}
	log.Printf("result: %s\n", greeting)
}

3. 运行

启动 Worker

1
go run worker/main.go

在另外一个窗口,启动 Workflow

1
go run main.go

以上。

CATALOG
  1. 运行 Temporal 集群
  2. 创建项目
  3. 运行