需要 boto3 和 SWF 示例

Posted

技术标签:

【中文标题】需要 boto3 和 SWF 示例【英文标题】:boto3 and SWF example needed 【发布时间】:2015-12-19 17:23:48 【问题描述】:

亚马逊正在为未来的发展推广 boto3,但没有为新的 boto3 提供足够的文档。

有人愿意分享将 SWF 与 boto3 一起使用的示例代码吗?

【问题讨论】:

你找到了吗? 【参考方案1】:

这是我目前发现的唯一例子:

https://github.com/jhludwig/aws-swf-boto3

所以流程概述看起来像这样(注意这是直接从上面的链接中提取的,但添加了一些额外的注释和更多的流程)。

需要注意的是,SWF 对事物的名称进行操作。为这些名称赋予执行意义取决于您的代码。例如,您的Decider 将轮询并使用任务名称决定下一步。

有些事情我不太确定。 TASKLIST 我相信引用是一种命名空间。它不是真正的事物列表,它更多的是按名称隔离事物。现在我可能完全错了,从我的基本理解来看,这就是我认为的意思。

您可以在任何地方运行您的决策者和工人。由于它们可以访问 AWS,因此如果您的防火墙允许 0.0.0.0/0 出口,您将可以访问。

AWS 文档还提到您可以运行 lambda,但我还没有找到如何触发它。

创建boto3 swf客户端:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

创建域

try:
  swf.register_domain(
    name=<DOMAIN>,
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
    print "Domain already exists: ", e.response.get("Error", ).get("Code")

创建域后,我们现在注册工作流:

注册工作流程

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList="name": TASKLIST  # TASKLIST is a string
  )
  print "Test workflow created!"
except ClientError as e:
  print "Workflow already exists: ", e.response.get("Error", ).get("Code")

注册工作流程后,我们现在可以开始分配任务了。

将任务分配给工作流。

您可以分配 N 个任务。请记住,这些主要是字符串,您的代码将赋予它们执行意义。

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList="name": TASKLIST  # TASKLIST is a string
  )
  print "Worker created!"
except ClientError as e:
  print "Activity already exists: ", e.response.get("Error", ).get("Code")

发送启动工作流

创建了域、工作流和任务后,我们现在可以开始工作流了。

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN # string,
  workflowId='test-1001',
  workflowType=
    "name": WORKFLOW,# string
    "version": VERSION # string
  ,
  taskList=
      'name': TASKLIST
  ,
  input=''
)

print "Workflow requested: ", response

注意workflowId,这是一个自定义标识符,例如str(uuid.uuid4())。来自文档:

与工作流执行相关的用户定义标识符。您可以使用它来将自定义标识符与工作流执行相关联。如果工作流执行在逻辑上是先前执行的重新启动,您可以指定相同的标识符。您不能同时使用相同的 workflowId 进行两个打开的工作流执行。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution

此时,什么都不会发生,因为我们没有运行Decider,也没有任何Workers。让我们看看它们是什么样子的。

决策者

我们的决策者将轮询以获取一个决策任务来做出决策:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

注意上面的超时设置。您可以参考此 PR 以了解其背后的原理:

https://github.com/boto/botocore/pull/634

来自 Boto3 SWF 文档:

工作人员应将其客户端套接字超时设置为至少 70 秒(比服务可能持有轮询请求的最长时间高 10 秒)。

正是这个 PR 使 boto3 能够执行该功能。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print "Listening for Decision Tasks"

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN ,
    taskList='name': TASKLIST , # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print "Poll timed out, no new task.  Repoll"

  elif 'events' in newTask:

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': 
                'activityType':
                    'name': TASKNAME, # string
                    'version': VERSION # string
                    ,
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': 'name': TASKLIST, # TASKLIST is a string
            
          
        ]
      )
      print "Task Dispatched:", newTask['taskToken']

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': 
              'result': 'success'
            
          
        ]
      )
      print "Task Completed!"

请注意,在此 sn-p 结束时,我们会检查是否有 ActivityTaskCompleted,并以决定 CompleteWorkflowExecution 进行回应,让 SWF 知道我们已经完成了。

这是决定者,工人是什么样的?

工人

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

再次注意,我们设置read_timeout

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

现在我们开始我们的工人轮询:

print "Listening for Worker Tasks"

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN,# string
    taskList='name': TASKLIST, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print "Poll timed out, no new task.  Repoll"

  else:
    print "New task arrived"

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print "Task Done"

我们再次向 SWF 发出我们已经完成工作的信号。

【讨论】:

虽然理论上可以回答这个问题,it would be preferable 在这里包含答案的基本部分,并提供链接以供参考。 完全明白为什么,我现在正在编辑它,将相关部分从链接中拉出并添加一些额外的注释。 现在回答很彻底。 Mod's 应该删除因缺乏信息而投下的反对票。 它什么也不做,只是确认任务完成。该过程是:通知 SWF 开始 -> SWF 通知您的决策者 -> 决策者决定安排活动任务:ScheduleActivityTask 或结束工作流:CompleteWorkflowExecution。如果它安排任务,则工作人员会收到带有工作流历史记录的请求。在示例中,它只是立即说“我完成了”respond_activity_task_completed。它将它踢回决策者,然后决策者将CompleteWorkflowExecution 发送到 SWF。 github.com/blitzagency/flowbee 一个包装器,pypi 上还有更多【参考方案2】:

官方文档的链接是[这里][1]。

这里有很多代码示例,只需点击链接或 [this][2] 之一即可。在可用服务部分下,它列出了 boto3 现在支持的所有服务以及详细示例。

一些例子是: boto3 并获取 SWF 的执行次数

import boto3
import datetime
import time
import dateutil.tz

def lambda_handler(event,context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(1)

    fullTextPreloadResponse = swfClient.count_open_workflow_executions(
         domain=domainName,
         startTimeFilter=
             'oldestDate': oldestDate,
             'latestDate': latestDate
         ,
         typeFilter=
             'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
             'version': 'VERSION_NUMBER'
         
     )
     print("the count is " + str(fullTextResponse['count']))
     print(fullTextResponse)

这是我在我的案例中用来获取正在运行的 SWF 工作流类型的计数的方法。我使用的格式在上面提到的文档中有很好的定义。

要简单地同时使用 boto3 和 SWF,首先要在 python lambda 函数中导入 boto3。然后正在添加 python DateTime。然后一个 boto3.client 设置我们可以使用的客户端 |与 SWF 交互。

其他示例如下:

history = swf.get_workflow_execution_history(
            domain= domainName,
            execution=
                'workflowId': workflowId,
                'runId': runId
            ,
        )

希望这个对你有帮助! [1]:https://boto3.amazonaws.com/v1/documentation/api/latest/index.html [2]:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

【讨论】:

添加一些示例和解释比复制 URL 更好。 @mate00 感谢您的反馈。我已经更新了我的答案。干杯。

以上是关于需要 boto3 和 SWF 示例的主要内容,如果未能解决你的问题,请参考以下文章

markdown 使用GetParameter从SSM获取秘密使用Python和Boto3的示例

如何获取 swf 文件中的总帧数?

需要用于 ECS 帮助的 Python Boto3

正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?

python 最简单的boto3示例,用于创建RDS PostgreSQL实例

python 使用boto3列出正在运行的EC2实例的示例