Amazon AWS Simple Workflow Service SWF PHP Samples

Posted: 2014-05-11 00:23:51

Question:

I would like to know whether there is any PHP sample code for SWF workflows that works with the AWS PHP SDK 2+?

Answer 1:

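Based on the other answer in this thread, I was able to create a fully functional workflow that is easy to modify for your own purposes. Thanks, everyone. After installing the AWS PHP SDK with composer, assigning the correct file paths, and setting up the domain, workflow type, and activity types in the console (which is really simple), you can use the scripts below to execute a flow in sequence. I ran them myself before posting. Once everything is set up, running ExecuteFlow.php is all you need!

The 8 scripts are posted here (don't worry, 3 of them are very short):

ExecuteFlow.php: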

<?php 
//This script is used to begin a new workflow then activate the decider and workers to finish the job.
// 
//SWF scripts originally developed by ~ in April, 2018 based on the first comment in https://***.com/questions/22765377/amazon-aws-simple-workflow-service-swf-php-samples
//
//PROGRAMMER MODIFICATIONS sections are for updates to the Steps in Workflow, the rest of SWF Workflow should be automated with these scripts.
//
// Command Reference: 
//
// https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-swf-2012-01-25.html 
//
// For Reference: 
//          Workflow Type (Name) is MainWorkflow
//          Activity Type (Names) are Step 1, Step 2, and Step 3
//    Task List for MainWorkflow is MainTaskList
//           Task List for Step 1 is TaskList1, Step 2 is TaskList2, etc.
//
//
//Software Required:
//
//Installed AWS PHP SDK
//ExecuteFlow.php
//Decider.php
//Worker.php
//DescribeExecution.php
//TerminateFlow.php
//Test.php
//Test2.php
//Test3.php



//Once it's installed using composer, this accesses the PHP AWS SDK
require_once "PHP-AWS-SDK/autoload.php";


//This grabs the client from the SDK so we can use it to do our SWF bidding.
//(Note: the version should stay synonymous with the currently packaged SDK version so it works with our code.)
$client = new Aws\Swf\SwfClient([
    'version' => '2012-01-25',
    'region'  => 'us-east-1' 
    ]);



//------------------------------------------------------------------------------
//----------------------PROGRAMMER MODIFICATIONS--------------------------------
//------------------------------------------------------------------------------

//Variables that we want to carry through the workflow
$Domain_Name='MyDomainName';
$Activity_Type_Version = "1";
$Workflow_Task_List_Name="MainTaskList";
$TotalNumberofSteps=3;
$TimeAllottedforEachStep="300"; //seconds

//------------------------------------------------------------------------------
//------------------------------------------------------------------------------



//Clear the way for a new workflow


//How many workflows are open?
$result = $client->countOpenWorkflowExecutions([
    'domain' => $Domain_Name, 
    'startTimeFilter' => [
        'oldestDate' => 1524755000,
    ],
]);

$OpenWorkflowCount=$result["count"];


if ($OpenWorkflowCount <> 0) {

    //This loop terminates all the current workflows
    for ($x = 1; $x <= $OpenWorkflowCount; $x++) {

        $result = $client->listOpenWorkflowExecutions([
            'domain' => $Domain_Name,
            'startTimeFilter' => [
                'oldestDate' => 1524755000,
            ],
        ]);

        $OpenWorkflowID = $result["executionInfos"][0]["execution"]["workflowId"];
        echo "You have an open workflow: $OpenWorkflowID.\n\n";
        $WorkflowID=$OpenWorkflowID;
        echo "Terminating Workflow $OpenWorkflowID...\n";
        $Run_ID=$result["executionInfos"][0]["execution"]["runId"];
        $Reason="Flow was terminated to clear the way for a new Workflow";
        require 'TerminateFlow.php';
    }
}







// Execution of the workflow begins now-----------------------------------------

// Generate a random workflow ID
$WorkflowID = mt_rand(1000, 9999);

//Turn it into a string for input to SWF
settype($WorkflowID,"string");


echo "\nRandomly Generated Workflow ID is ".$WorkflowID."\n";


//Let's get this party started.
$client->startWorkflowExecution([
    'domain' => $Domain_Name,
    'workflowId' => $WorkflowID,
    'workflowType' => [
        'name' => 'MainWorkflow',
        'version' => '1'
        ],
    'taskList' => [
        'name' => $Workflow_Task_List_Name
        ], 
    'input' => 'Starting a Workflow',
    //'executionStartToCloseTimeout' => '300',
   // 'taskStartToCloseTimeout' => '300',
   // 'childPolicy' => 'TERMINATE',
]);


echo "The workflow is now being executed!\n\n";
//echo "\nLet's ask the decider what to do next:\n";
require 'Decider.php';


//Here we will confirm completion of the workflow

 echo "Verifying Completion: \n";

$result = $client->describeWorkflowExecution([
    'domain' => $Domain_Name, 
    'execution' => [ 
        'runId' => $Run_ID,
        'workflowId' => $Workflow_ID,
    ],
]);


 $Finished = $result["executionInfo"]["closeStatus"];


 if ($Finished == 'COMPLETED') {
     echo "$Finished \nWe have completed the workflow!\n";
 }
 else {
     echo "$Finished \nCheck on your application.\n";
 }
 


 //This is the amount of time it took to run the workflow

  $WorkflowStartTimestamp=$result["executionInfo"]["startTimestamp"];
 $WorkflowEndTimestamp=$result["executionInfo"]["closeTimestamp"];
 $wfinterval = $WorkflowStartTimestamp->diff($WorkflowEndTimestamp);
    $m=$wfinterval->format('%i minute(s)');
    $s=$wfinterval->format('%s second(s)');

    echo "Running time: $m $s.\n\n";

?>

Decider.php:

<?php

//DECIDER




// What's on the agenda today boss?
$result = $client->pollForDecisionTask(array(
    "domain" => $Domain_Name,
    "taskList" => array(
        "name" => $Workflow_Task_List_Name
    ),
     "identity" => "Decider is choosing whether to continue or end the workflow",
    // "maximumPageSize" => 50,
    "reverseOrder" => true //This makes sure the events are returned in reverse order. It makes it easier to tell which event is current/most recent (helps generate $WorkflowEventID variable below).
));



// Parse info we need returned from the pollForDecisionTask call
$task_token = $result["taskToken"];
$Workflow_ID = $result["workflowExecution"]["workflowId"];
$Run_ID=$result["workflowExecution"]["runId"];
$WorkflowEventID = $result["events"][0]["eventId"];


// Our logic that decides what happens next...
//If we have x steps, we will need 3+x*6 WorkflowEventID's before sending the command to end the workflow.
//Here's where this is automatically calculated:
$CalculatedEventID=3+$TotalNumberofSteps*6;


//Below, we decide on whether to continue the workflow.
//It would be simpler to skip this section and only modify the section below, but 
//it allows us to avoid redundancy (for example, "require 'Worker.php';" below).

if($WorkflowEventID < $CalculatedEventID) {
    $task_list = $Workflow_Task_List_Name;
    $continue_workflow = true;
}
elseif($WorkflowEventID == $CalculatedEventID) {
    $task_list = $Workflow_Task_List_Name;
    $continue_workflow = false;
}






//------------------------------------------------------------------------------
//----------------------PROGRAMMER MODIFICATIONS--------------------------------
//------------------------------------------------------------------------------
// Now that we populated our variables based on what we received 
// from SWF, we need to tell SWF what we want to do next

// These loops assign variables the appropriate names/values for each Step based
//on the Event ID we're on. These variables are carried through to the worker.
//Variable values should be synonymous with the activity types in the SWF console.


if($continue_workflow === true) {

    //Decider STEP 1---------------------------------------------------------------
    if ($WorkflowEventID == "3") {
        $Step = "Step 1";
        $Activity_Task_List_Name = "TaskList1";
        $Activity_ID = "1";
        // echo "The decider says its current Workflow ID is ".$Workflow_ID."\n\n";
    }
    //Decider STEP 2---------------------------------------------------------------
    elseif ($WorkflowEventID == "9") {
        $Step = "Step 2";
        $Activity_ID = "2";
        $Activity_Task_List_Name = "TaskList2";
    }
    //Decider STEP 3---------------------------------------------------------------
    elseif ($WorkflowEventID == "15") {
        $Step = "Step 3";
        $Activity_ID = "3";
        $Activity_Task_List_Name = "TaskList3";
    }
    else {
        echo "Something's Fishy\n\n";
    }

//------------------------------------------------------------------------------
//------------------------------------------------------------------------------

    //We mark the Decision Task Complete before moving to the Worker
    $client->respondDecisionTaskCompleted(array(
        "taskToken" => $task_token,
        "decisions" => array(
            array(
                "decisionType" => "ScheduleActivityTask",
                "scheduleActivityTaskDecisionAttributes" => array(
                    "activityType" => array(
                        "name" => $Step,
                        "version" => $Activity_Type_Version
                    ),
                    "activityId" => $Activity_ID,
                    "control" => "Have a great day!",
                    // Customize timeout values
                    //"scheduleToCloseTimeout" => "360",
                    //"scheduleToStartTimeout" => "300",
                    "startToCloseTimeout" => $TimeAllottedforEachStep,
                    //"heartbeatTimeout" => "60",
                    "taskList" => array(
                        "name" => $Workflow_Task_List_Name
                    ),
                    "input" => "$Step Assigned to Worker"
                )
            )
        )
    ));

    //Here we have the worker do the decider's bidding.
    require 'Worker.php';

    //The timestamp handling and response here can also be placed at the end of the worker script.
    //They are placed here to simplify the worker script, since the decider is
    //really supposed to be doing all the non-executing work.
    $ReverseDescribeorNot=False;
    require 'DescribeExecution.php';

    $TimestampStart = $result["events"][$WorkflowEventID]["eventTimestamp"];
    $TimestampEnd   = $result["events"][$WorkflowEventID+4]["eventTimestamp"];
    $interval = $TimestampStart->diff($TimestampEnd);
    $m=$interval->format('%i minute(s)');
    $s=$interval->format('%s second(s)');

    echo $Step."/".$TotalNumberofSteps." completed in $m $s.\n\n";

    //We require the Decider here because if we do so in the Worker script, the worker script is unable to finish before being executed again above.
    //This allows the Decider to loop back up to the top, figure out where it is in the workflow, and decide from there.
    //Although it puts the Decider into a loop of itself, the if statements make sure it executes the correct things based on its feedback from SWF.
    require 'Decider.php';
}
// End workflow if last event ID was the final WorkflowEventID
else if($continue_workflow === false) {
    $client->respondDecisionTaskCompleted(array(
        "taskToken" => $task_token,
        "decisions" => array(
            array(
                "decisionType" => "CompleteWorkflowExecution",
            ),
        )
    ));
}



?>
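
The event-ID arithmetic that the decider relies on can be sketched as a small helper. This is only an illustration: the function names `finalEventId` and `stepForEventId` are hypothetical (they are not part of the AWS SDK), and the spacing of 6 events per step is the assumption stated in the decider's comments. It presumably holds only when every step runs exactly once, with no timeouts or retries adding extra events to the history.

```php
<?php
// Hypothetical helpers for illustration; not part of the AWS SDK.

// Event ID at which the workflow is expected to finish: 3 + steps * 6.
function finalEventId(int $totalSteps): int
{
    return 3 + $totalSteps * 6;
}

// Returns the 1-based step to schedule for a given event ID, or null
// if the event ID does not fall on a step boundary (3, 9, 15, ...).
function stepForEventId(int $eventId, int $totalSteps): ?int
{
    if ($eventId < 3 || ($eventId - 3) % 6 !== 0) {
        return null;
    }
    $step = intdiv($eventId - 3, 6) + 1;
    return $step <= $totalSteps ? $step : null;
}

echo stepForEventId(9, 3), "\n";   // prints 2
echo finalEventId(3), "\n";        // prints 21
```

With a helper like this, the chain of hard-coded comparisons against "3", "9", and "15" could be replaced by a single lookup, so adding a fourth step would only require changing $TotalNumberofSteps.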

Worker.php:

<?php

//WORKER


//Typically, for SWF, a worker is a service, script, or person that performs a specific function. 
//For our purposes, all our workers are php scripts.
//Therefore, in order to avoid redundancy, we simply use this single script to activate our workers.
//Because developers don't want to worry about putting a "pollForActivityTask" and "respondActivityTaskCompleted" call in each of their scripts,
//we simply make those calls here and execute each script when SWF is ready for it. 


//echo "\nNow, the worker is working.\n\n";



// Check with SWF for activities
$result = $client->pollForActivityTask([
    "domain" => $Domain_Name, //use the same domain as ExecuteFlow.php instead of a hard-coded name
    "taskList" => [
        "name" => $Workflow_Task_List_Name
    ]
]);


// Take out task token from the response above

$task_token = $result["taskToken"];




//------------------------------------------------------------------------------
//----------------------PROGRAMMER MODIFICATIONS--------------------------------
//------------------------------------------------------------------------------
// This is where the Worker actually executes activities

//Worker STEP 1-----------------------------------------------------------------
if ($Step == 'Step 1') {
    require 'Test.php';
}
//Worker STEP 2-----------------------------------------------------------------
elseif ($Step == 'Step 2') {
    require 'Test2.php';
}
//Worker STEP 3-----------------------------------------------------------------
elseif ($Step == 'Step 3') {
    require 'Test3.php';
}
else {
    echo "Something's Super Duper Fishy\n\n";
}

//------------------------------------------------------------------------------
//------------------------------------------------------------------------------



 //If the Activity Timed out, we'll find out here before our app explodes

 //This grabs the eventType statement associated with the current Workflow EventID

 $ReverseDescribeorNot=True;
 require 'DescribeExecution.php';
 $MightHaveTimedOut = $result["events"][1]["eventType"];



if ($MightHaveTimedOut == "ActivityTaskTimedOut") {
    echo "\n$Step Timed Out, Cancelling Workflow...\n";
    $Reason="$Step Activity Timed Out";
    require 'TerminateFlow.php';
    exit;
}
else {
    $ResultResponse=$Step."/".$TotalNumberofSteps." completed.";
    // Tell SWF that we finished what we need to do on this node
    $client->respondActivityTaskCompleted(array(
        "taskToken" => $task_token,
        "result" => $ResultResponse
    ));
}





//echo "\nAaand now back to the decider.\n";


?>

TerminateFlow.php:

<?php

$result = $client->terminateWorkflowExecution([
    'childPolicy' => 'TERMINATE',
    'details' => $Reason,
    'domain' => $Domain_Name, // REQUIRED
    'reason' => $Reason,
    'runId' => $Run_ID,
    'workflowId' => $WorkflowID, // REQUIRED
]);

echo "Workflow $WorkflowID Terminated\n\n";

?>

DescribeExecution.php:

<?php



// echo "\n\nExecution History:\n\n";

$result = $client->getWorkflowExecutionHistory([
    'domain' => $Domain_Name, 
    'execution' => [ 
        'runId' => $Run_ID,
        'workflowId' => $Workflow_ID,
    ],

//     'maximumPageSize' => <integer>,
//   'nextPageToken' => '<string>',
    'reverseOrder' => $ReverseDescribeorNot,//true || false,
]);


?>

Test.php:

<?php

echo "Test.php is executing!\n";

?>

Test2.php:

<?php

echo "Test2.php is executing!\n";

?>

Test3.php:

<?php

echo "Test3.php is executing!\n";

?>

Terminal Output

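As long as the scripts are in the same folder, file paths will not be a problem. Good luck!

Answer 2:

I looked for a tutorial but could not find one. Eventually, I went through the documentation and the examples for Ruby and the Web API, and pieced together the specifics of using the PHP SDK.

The first thing you need to do is register your domain, workflow, and activities. This can be done through the AWS console or with the PHP SDK. With the SDK, use the following: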

<?php

require_once "path/to/aws.phar";

use Aws\Swf\SwfClient;

// Create an instance of the SWF class
$client = SwfClient::factory(array(
    "key" => "your_aws_key",
    "secret" => "your_aws_secret_key",
    "region" => "your_aws_region"
));

// Register your domain
$client->registerDomain(array(
    "name" => "domain name you want",
    "description" => "this is a test domain",
    "workflowExecutionRetentionPeriodInDays" => "7"
));

// Register your workflow
$client->registerWorkflowType(array(
    "domain" => "domain name you registered in previous call",
    "name" => "workflow name you want",
    "version" => "1.0",
    "description" => "this is a sample",
    "defaultTaskList" => array(
        "name" => "mainTaskList"
    ),
    "defaultChildPolicy" => "TERMINATE"
));

// Register an activity
$client->registerActivityType(array(
    "domain" => "domain name you registered above",
    "name" => "activity name you want",
    "version" => "1.0",
    "description" => "first activity in our workflow",
    "defaultTaskList" => array(
        "name" => "mainTaskList"
    )
));

// Follow activity registration example above and register 
// more activities as you wish

The next step is to create a decider. This is the script that acts as the coordinating node for the activity (worker) nodes.

// Ask SWF for things the decider needs to know
$result = $client->pollForDecisionTask(array(
    "domain" => "your domain name",
    "taskList" => array(
        "name" => "mainTaskList"
    ),
    "identity" => "default",
    "maximumPageSize" => 50,
    "reverseOrder" => true
));

// Current version of activity types we are using
$activity_type_version = "1.0";

// Parse info we need returned from the pollForDecisionTask call
$task_token = $result["taskToken"];
$workflow_id = $result["workflowExecution"]["workflowId"];
$run_id = $result["workflowExecution"]["runId"];
$last_event = $result["events"][0]["eventId"];

// Our logic that decides what happens next
if($last_event == "3") {
    $activity_type_name = "activity to start if last event ID was 3";
    $task_list = "mainTaskList";
    $activity_id = "1";
    $continue_workflow = true;
}
elseif($last_event == "8") {
    $activity_type_name = "activity to start if last event ID was 8";
    $task_list = "mainTaskList";
    $activity_id = "2";
    $continue_workflow = false;
}


// Now that we populated our variables based on what we received 
// from SWF, we need to tell SWF what we want to do next
if($continue_workflow === true) {
    $client->respondDecisionTaskCompleted(array(
        "taskToken" => $task_token,
        "decisions" => array(
            array(
                "decisionType" => "ScheduleActivityTask",
                "scheduleActivityTaskDecisionAttributes" => array(
                    "activityType" => array(
                        "name" => $activity_type_name,
                        "version" => $activity_type_version
                    ),
                    "activityId" => $activity_id,
                    "control" => "this is a sample message",
                    // Customize timeout values
                    "scheduleToCloseTimeout" => "360",
                    "scheduleToStartTimeout" => "300",
                    "startToCloseTimeout" => "60",
                    "heartbeatTimeout" => "60",
                    "taskList" => array(
                        "name" => $task_list
                    ),
                    "input" => "this is a sample message"
                )
            )
        )
    ));
}
// End workflow if last event ID was 8
else if($continue_workflow === false) {
    $client->respondDecisionTaskCompleted(array(
        "taskToken" => $task_token,
        "decisions" => array(
            array(
                "decisionType" => "CompleteWorkflowExecution"
            )
        )
    ));
}

The last step is to create your activity workers. You can spin them up using the following format:

// Check with SWF for activities
$result = $client->pollForActivityTask(array(
    "domain" => "domain name you registered",
    "taskList" => array(
        "name" => "mainTaskList"
    )
));

// Take out task token from the response above
$task_token = $result["taskToken"];

// Do things on the computer that this script is saved on
exec("my program i want to execute");

// Tell SWF that we finished what we need to do on this node
$client->respondActivityTaskCompleted(array(
    "taskToken" => $task_token,
    "result" => "I've finished!"
));

The scripts for your activity workers and your decider can be placed on any server. They all call SWF in order to communicate with each other.
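
Because each worker only talks to SWF, a worker on any server can be sketched as a simple polling loop. The following is a minimal sketch, not the SDK's prescribed pattern: `runWorker`, `$handler`, and `$maxPolls` are hypothetical names introduced for illustration, and `$client` is assumed to be any object exposing `pollForActivityTask()` and `respondActivityTaskCompleted()`, such as the SWF client created earlier. It also assumes that a poll which finds no work returns a response with an empty task token, which the loop skips.

```php
<?php
// Sketch of a long-running worker loop. runWorker, $handler and $maxPolls
// are hypothetical; $client is assumed to behave like the SWF client above.
function runWorker($client, string $domain, string $taskList, callable $handler, int $maxPolls): int
{
    $completed = 0;
    for ($i = 0; $i < $maxPolls; $i++) {
        $task = $client->pollForActivityTask([
            'domain'   => $domain,
            'taskList' => ['name' => $taskList],
        ]);

        // An empty poll carries no task token; just poll again.
        if (empty($task['taskToken'])) {
            continue;
        }

        // Run the actual work, then report the result back to SWF.
        $output = $handler($task['input'] ?? '');
        $client->respondActivityTaskCompleted([
            'taskToken' => $task['taskToken'],
            'result'    => $output,
        ]);
        $completed++;
    }
    return $completed;
}
```

Passing the client in as a parameter keeps the loop testable with a stand-in object, and bounding the loop with `$maxPolls` lets a process supervisor restart the worker periodically instead of relying on an endless loop.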

Finally, to start the workflow you just created, use:

// Generate a random workflow ID (SWF expects the ID as a string)
$workflowId = (string) mt_rand(1000, 9999);

// Starts a new instance of our workflow
$client->startWorkflowExecution(array(
    "domain" => "your domain name",
    "workflowId" => $workflowId,
    "workflowType" => array(
        "name" => "your workflow name",
        "version" => "1.0"
    ),
    "taskList" => array(
        "name" => "mainTaskList"
    ),
    "input" => "a message goes here",
    "executionStartToCloseTimeout" => "300",
    'taskStartToCloseTimeout' => "300",
    "childPolicy" => "TERMINATE"
));
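
A four-digit random number can easily repeat, and SWF does not allow two open executions with the same workflow ID in one domain. A less collision-prone ID could be generated as sketched below; `makeWorkflowId` and its prefix parameter are hypothetical names chosen for illustration, not part of the SDK.

```php
<?php
// Hypothetical helper: builds a string workflow ID from a prefix and
// 8 random bytes rendered as 16 hexadecimal characters.
function makeWorkflowId(string $prefix = 'wf'): string
{
    return $prefix . '-' . bin2hex(random_bytes(8));
}

echo makeWorkflowId('order'), "\n"; // e.g. order- followed by 16 hex characters
```

The returned value is already a string, so it can be passed directly as the "workflowId" argument of startWorkflowExecution.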

Comments:

So how has SWF worked out for you overall? Would you recommend it? See where I'm coming from: devops.stackexchange.com/a/3034/4997
