AWS

2024 Hackathon: ASW Step Function Feedback

From the AWS Samples on Github, we take a look at how to get real-time feedback from Step Functions


Introduction and Installation

During our bi-annual Hackathon held on October 16-18, 2024, Metal Toad looked at the Amazon AWS Sample repositories to learn more about the technologies and architectures. There are over 6,500 repositories with code samples and ideas on how to implement a wide variety of projects. In the Hackathon, each team chose a project to examine.

One limitation of the AWS Step Functions is that the end user has no idea how the function is progressing, unless he accesses the AWS Console and checks how the function works.

Here, we'll look into a way to return a status report to a Step Function in real time, using a web socket to communicate with the user's frontend.

We assume you know a little big about web sockets, AWS API Gateways, Lamdba Functions and Step Functions.

The AWS Sample project for this can be downloaded here:

https://github.com/aws-samples/aws-step-functions-progress-tracking

This has instructions on how to install the necessary components, and a simplified diagram showing how the Step Function interacts with other components. During installation, the web socket endpoint is saved, and we'll use it later to test the API.

Installation creates the WebSocket API, 5 Lambda Functions, a DynamoDB table, and a Step Function. The Step Function shows two ways to report the status back to the front end, which we'll see below.

To maintain a connection to the user's frontend, it uses a Websocket API. With the web socket, we'll have a connect function, disconnect function, and a function to trigger the step function.

How it works

Examining the API, there is are three routes: $connect; $disconnect and onOrder:

image-20241022111048514

$connect will call a lambda that writes an item to the DynamoDB table, $disconnect calls a lambda that removes the item from DynamoDB, and onOrder calls a lambda that triggers the step function. Here are code snippets from each Lambda Function:

# onConnect function

ddb = boto3.client('dynamodb')
table_name = os.environ['TABLE_NAME']

def handler(event, context):
   connection_id = event['requestContext']['connectionId']
   domain_name = event['requestContext']['domainName']
   stage = event['requestContext']['stage']
   add_connection_id(connection_id, domain_name, stage)
   return {
       'statusCode': 200
  }

def add_connection_id(connection_id, domain_name, stage):
   print(connection_id)
   ddb.put_item(
       TableName=table_name,
       Item={
           'connectionId': {'S': connection_id},
           'domainName': {'S': domain_name},
           'stage': {'S': stage}
      }
  )
# OnDisconnect function

ddb = boto3.client('dynamodb')
table_name = os.environ['TABLE_NAME']

def handler(event, context):
   connection_id = event['requestContext']['connectionId']
   delete_connection_id(connection_id)
   return {
       'statusCode': 200
  }

def delete_connection_id(connection_id):
   ddb.delete_item(
       TableName=table_name,
       Key={
           'connectionId': {'S': connection_id}
      }
  )
# OnOrder function

client = boto3.client('stepfunctions')
stepfunctions_arn = os.environ['STEP_FUNCTIONS_ARN']

def handler(event, context):
   connectionId = event['requestContext']['connectionId']

   client.start_execution(
       stateMachineArn=stepfunctions_arn,
       input=json.dumps({'ConnectionId': connectionId})
  )
           
   return {
       'statusCode': 200
  }

The Table Name and Step Function ARN are retrieved from environment variables for each Lambda Function.

The Step Function has the following states:

image-20241022112249248

All of the "Report" states send feedback via the WebSocket API, except "Label Generation Complete", which uses a Lambda Function to send the data directly to the web socket. This is to illustrate the different ways you can communicate with the front end application.

Calling the API Gateway with the step function directly uses the following state:

{
   "Report: Workflow started": {
     "Next": "Mock: Inventory check",
     "Parameters": {
       "ApiEndpoint": "wxyz1234.execute-api.us-west-2.amazonaws.com",
       "AuthType": "IAM_ROLE",
       "Method": "POST",
       "Path.$": "States.Format('/@connections/{}', $.ConnectionId)",
       "RequestBody": {
         "Message": "🥁 Workflow started",
         "Progress": 10
      },
       "Stage": "Prod"
    },
     "Resource": "arn:aws:states:::apigateway:invoke",
     "ResultPath": "$.Params",
     "Type": "Task"
  }

The "Label Generation Complete" state invokes a Lambda (ReportProgressFunction) to send a message directly to the connection:

# Report Progress function

ddb_client = boto3.client('dynamodb', region_name=os.environ['AWS_REGION'])

table_name = os.environ['TABLE_NAME']
api_url = os.environ['API_URL']

def handler(event, context):
   print(event)
   print(f'Endpoint URL: {api_url}')

   connection_id = event['ConnectionId']

   apigw_management_api_client = boto3.client('apigatewaymanagementapi', endpoint_url=api_url)

   try:
       apigw_management_api_client.post_to_connection(
           ConnectionId=connection_id,
           Data=bytes(json.dumps(event), 'utf-8')
      )
   except apigw_management_api_client.exceptions.GoneException as e:
       if e.response['Error']['Code'] == 410:
           print(f"Found stale connection, deleting {connection_id}")
           ddb_client.delete_item(TableName=table_name, Key={'connectionId': {'S': connection_id}})
       else:
           raise e

   return {
       'ConnectionId': event['ConnectionId']
  }

During the connect function, an item is saved in a DynamoDB table (called ConnectionsTable), using the connection ID as item key, and saving the domain name (WebSocket API endpoint), along with the web socket stage (this can be considered the version).

Seeing it in action

To test the web socket, we'll be using Postman. To create the connection to the web socket, in Postman, click on "New" and choose WebSocket.

In the URL field, put the address logged during the installation, or go into API Gateway, find the "ProgressTrackingWebsocketAPI", then in API settings you have the endpoint:

image-20241022110213783

Don't forget to add the stage to the end of the websocket address, which can be found on the Stages tab:

image-20241022110624918

So the address would be something like this: wss://wxyz1234.execute-api.us-west-2.amazonaws.com/Prod. In Postman:

image-20241022110923695

If you click on the connect button, Postman will establish a web socket connection to the API. Checking the DynamoDB table you will find an item for the connection like this:

image-20241022113922006

Now we can invoke the Step Function and see the responses sent back. Under message, enter the following json, and click on Send.

{ "action": "onOrder" }

You should see the "Messages" fill out with the responses from the web socket connection:

image-20241022114606380

Note that the messages have embedded unicode emoticons.

Concluding

One downside to the sample is that only sends the data over a web socket connection, so if the connection is broken (triggering the disconnect and removing the item from the DynamoDB table), the step function fails when the next "report" function is called. An improvement would be for the frontend application to save the connection ID, and status information, and add an endpoint to the API Gateway where we could poll the status.

We hope this has been interesting reading, and that you learned something new. Please don't hesitate to contact us for any of your AWS or web application needs!

Similar posts

Get notified on new marketing insights

Be the first to know about new B2B SaaS Marketing insights to build or refine your marketing function with the tools and knowledge of today’s industry.