CloudWatch Filter Pattern

When an AWS Lambda function crashes hard (for example, by running out of memory or time), it doesn't get a chance to raise an exception. We will use filter patterns on its CloudWatch logs to trigger backup processing when the lambda fails.

Go to CloudWatch, open "Log groups," select the lambda's log group, and open the "Subscription filters" tab. There are two filters we will need:

  • Filter for when memory exceeds allocation

    • Pattern: "Error: Runtime exited with error: signal: killed"

  • Filter for when processing time exceeds allocation

    • Pattern: "Task timed out"

The Destination ARN will point to arn:aws:lambda:us-west-2:118234403147:function:delete-rudy-testing-cloudwatch, which can be whatever lambda you want. In this example I set it up to publish an SNS message that emails me.
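For reproducibility, the same two subscription filters can also be created with boto3. A minimal sketch, assuming the log group and destination ARN from this example and that CloudWatch Logs already has permission to invoke the destination lambda:

import boto3

logs = boto3.client("logs", region_name="us-west-2")

LOG_GROUP = "/aws/lambda/delete-rudy-error-generating-lambda"
DESTINATION_ARN = "arn:aws:lambda:us-west-2:118234403147:function:delete-rudy-testing-cloudwatch"

# Terms that contain spaces must be double-quoted inside a filter pattern.
filters = {
    "out-of-memory": '"Error: Runtime exited with error: signal: killed"',
    "out-of-time": '"Task timed out"',
}

for name, pattern in filters.items():
    logs.put_subscription_filter(
        logGroupName=LOG_GROUP,
        filterName=name,
        filterPattern=pattern,
        destinationArn=DESTINATION_ARN,
    )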

[1] Error Generating Lambda

Runtime: Python 3.9. Handler: lambda_function.lambda_handler. Architecture: arm64.

Test JSON:

{
  "input_bucket": "noaa-wcsd-pds",
  "input_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw",
  "output_bucket": "noaa-wcsd-pds-index",
  "output_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr"
}

Code:

import logging
import os
import time
import random


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def test_out_of_time():
    # Sleep longer than the lambda's configured timeout to trigger "Task timed out".
    random_number = random.randint(2, 5)
    logger.info(f"Testing out of time, random_number is: {random_number}")
    time.sleep(random_number)

def test_out_of_memory():
    # Grow a list until the function exceeds its memory allocation and is killed.
    s = []
    logger.info("Testing out of memory")
    for i in range(1000):
        for j in range(1000):
            for k in range(1000):
                s.append("More")

def lambda_handler(event: dict, context) -> None:
    print(f"Lambda function ARN: {context.invoked_function_arn}")
    print(f"CloudWatch log stream name: {context.log_stream_name}")
    print(f"CloudWatch log group name: {context.log_group_name}")
    print(f"Lambda Request ID: {context.aws_request_id}")
    print(f"Lambda function memory limits in MB: {context.memory_limit_in_mb}")
    print(f"Lambda time remaining in MS: {context.get_remaining_time_in_millis()}")
    #
    logger.setLevel(logging.DEBUG)
    logger.info(f"_HANDLER: {os.environ['_HANDLER']}")
    logger.info(f"AWS_EXECUTION_ENV: {os.environ['AWS_EXECUTION_ENV']}")
AWS_LAMBDA_FUNCTION_MEMORY_SIZE = os.environ['AWS_LAMBDA_FUNCTION_MEMORY_SIZE']
    print(f"AWS_LAMBDA_FUNCTION_MEMORY_SIZE: {AWS_LAMBDA_FUNCTION_MEMORY_SIZE}")
    #
    input_bucket = event["input_bucket"]  # 'noaa-wcsd-pds'
    print(f"input bucket: {input_bucket}")
    logger.info(f"input bucket: {input_bucket}")
    input_key = event["input_key"]  # 'data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw'
    print(f"input key: {input_key}")
    logger.info(f"input key: {input_key}")
    output_bucket = event["output_bucket"]  # 'noaa-wcsd-zarr-pds'
    print(f"output bucket: {output_bucket}")
    output_key = event["output_key"]
    print(f"output key: {output_key}")  # data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr
    #
test_out_of_time()
    # test_out_of_memory()
    #
    logger.info("This is a sample INFO message.. !!")
    logger.debug("This is a sample DEBUG message.. !!")
    logger.error("This is a sample ERROR message.... !!")
logger.critical("This is a sample CRITICAL message.. !!")

The code can be made to fail by running out of time or out of memory, depending on which of test_out_of_time() and test_out_of_memory() is left uncommented.

The logging will send logs to CloudWatch's "/aws/lambda/delete-rudy-error-generating-lambda" log group.
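To exercise the function from code rather than the console test button, here is a hedged sketch using the test JSON above (the function name is inferred from this example's log group):

import json

import boto3

client = boto3.client("lambda", region_name="us-west-2")

event = {
    "input_bucket": "noaa-wcsd-pds",
    "input_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw",
    "output_bucket": "noaa-wcsd-pds-index",
    "output_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr",
}

# An "Event" invocation returns immediately; the timeout/OOM failure only
# surfaces in the CloudWatch logs, which is what the subscription filters watch.
client.invoke(
    FunctionName="delete-rudy-error-generating-lambda",
    InvocationType="Event",
    Payload=json.dumps(event),
)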

[2] Testing CloudWatch

This lambda monitors the stream of CloudWatch Logs and fires when a specific string is found; in this case we are looking for "Task timed out" and "Error: Runtime exited with error: signal: killed."

import base64
import boto3
from botocore.config import Config
import gzip
import json
import logging
import os
import math
import time

from botocore.exceptions import ClientError
# Pass region_name to Config()/Session() explicitly if not running in the target region.
config = Config(signature_version='v4')
session = boto3.Session()
log_client = session.client('logs', config=config)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def logpayload(event):
    logger.setLevel(logging.DEBUG)
    logger.debug(event['awslogs']['data'])
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_payload = json.loads(uncompressed_payload)
    return log_payload


def error_details(payload):
    error_msg = ""
    log_events = payload['logEvents']
    logger.debug(payload)
    loggroup = payload['logGroup']
    logstream = payload['logStream']
    lambda_func_name = loggroup.split('/')  # '/aws/lambda/<name>' -> index 3 is the function name
    logger.debug(f'LogGroup: {loggroup}')
    logger.debug(f'Logstream: {logstream}')
    logger.debug(f'Function name: {lambda_func_name[3]}')
    logger.debug(log_events)
    for log_event in log_events:
        error_msg += log_event['message']
    logger.debug('Message: %s' % error_msg.split("\n")[0])
    # Message: ['2023-03-30T18:33:45.455Z 90403de2-c856-4ab8-9a6f-35be1a7ba5f9 Task timed out after 3.00 seconds', '', '']
    print('-'*10)
    requestId = error_msg.split("\n")[0].split(' ')[1] # "e63712f5-d74c-4975-94de-d5292a96a610"
    logger.debug(f"requestId: {requestId}")
    failed_key = ""
    try:
        time.sleep(60) # Note: CloudWatch Insights needs time to generate the initial log
        endTime = math.floor(time.time()) + 30
        startTime = endTime - (60 * 60 * 48)  # 48 hours into the past
        response = log_client.start_query(
            logGroupName="/aws/lambda/delete-rudy-error-generating-lambda",
            startTime=startTime,
            endTime=endTime,
            queryString=f"fields @message | filter @requestId like '{requestId}'",
            limit=100
        )
        while True:
            time.sleep(10) # results['status']: Running, Complete
            results = log_client.get_query_results(queryId=response['queryId'])
            logger.debug(f'results temp: {results}')
            if results['status'] == 'Complete':
                break
        searchString = "input key: "
        logger.debug(f"results final: {results['results']}")
        if len(results['results']) > 0:
            for row in results['results']:
                message, _ = row  # each row holds the @message and @ptr fields
                if searchString in message['value']:
                    failed_key = message['value'].split('\t')[-1].strip().split(' ')[-1]
                    print(failed_key)  # the failed input key
        else:
            logger.debug("No results returned.")
    except Exception as err:
        logger.error(f"Exception encountered: {err}")
    print('-'*10)
    return loggroup, logstream, error_msg, lambda_func_name, failed_key


def publish_message(loggroup, logstream, error_msg, lambda_func_name, failed_key):
    sns_arn = os.environ['SNS_ARN']  # Getting the SNS Topic ARN passed in by the environment variables.
    snsclient = boto3.client('sns')
    try:
        message = ""
        message += "\nLambda Error Summary" + "\n"
        message += "##########################################################\n"
        message += "# LogGroup Name:- " + str(loggroup) + "\n"
        message += "# LogStream:- " + str(logstream) + "\n"
        message += "# Log Message:- " + "\n"
        message += "# \t\t" + str(error_msg.split("\n")[0]) + "\n"
        message += "# \t\t" + f"failed key: {failed_key}" + "\n"
        message += "##########################################################\n"

        # Sending the notification...
        snsclient.publish(
            TargetArn=sns_arn,
            Subject=f'Execution error for Lambda - {lambda_func_name[3]}',
            Message=message
        )
    except ClientError as e:
        logger.error("An error occurred: %s" % e)


def lambda_handler(event, context):
    print("Lambda function ARN:", context.invoked_function_arn)
    print("CloudWatch log stream name:", context.log_stream_name)
    print("CloudWatch log group name:",  context.log_group_name)
    print("Lambda Request ID:", context.aws_request_id)
    print("Lambda function memory limits in MB:", context.memory_limit_in_mb)
    print("Lambda time remaining in MS:", context.get_remaining_time_in_millis())
    #
    pload = logpayload(event)
    lgroup, lstream, errmessage, lambdaname, failed_key = error_details(pload)
    publish_message(lgroup, lstream, errmessage, lambdaname, failed_key)

Add an environment variable: SNS_ARN = arn:aws:sns:us-west-2:118234403147:delete-rudy-testing-topic.
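If you prefer to set that variable from code, here is a sketch (note that boto3's update_function_configuration replaces the whole Environment block, so merge with any existing variables first):

import boto3

client = boto3.client("lambda", region_name="us-west-2")

FUNCTION = "delete-rudy-testing-cloudwatch"

# Fetch the current variables first so nothing already configured is clobbered.
current = client.get_function_configuration(FunctionName=FUNCTION)
variables = current.get("Environment", {}).get("Variables", {})
variables["SNS_ARN"] = "arn:aws:sns:us-west-2:118234403147:delete-rudy-testing-topic"

client.update_function_configuration(
    FunctionName=FUNCTION,
    Environment={"Variables": variables},
)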

Order of events during processing

The process goes as follows. [1] The error-generating lambda gets invoked. The lambda times out and, among its logs, returns a message:

Response { "errorMessage": "2023-04-13T22:07:39.311Z 2e6d33d8-af65-4822-b974-782f2de5950a Task timed out after 3.01 seconds" }

This message alone doesn't carry enough information; we need the key. The message gets captured by CloudWatch Logs, and from there we can use Logs Insights to scan for keywords. We search Logs Insights for the string "2e6d33d8-af65-4822-b974-782f2de5950a", specifying the /aws/lambda/error-generating-lambda log group with:

fields @message | filter @requestId like '2e6d33d8-af65-4822-b974-782f2de5950a'

Running the query returns ~10 messages, including the one about the task timing out. One message will specify the input key:

input key: data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163739.raw

We can search for this message and extract the input_key for further processing.
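A minimal sketch of that extraction, mirroring the tab-splitting the section [2] code uses (the message layout here is a hypothetical example; adjust if your runtime formats log lines differently):

# Hypothetical example of the @message value returned by Logs Insights.
message = ("2023-04-13T22:07:31.123Z\t2e6d33d8-af65-4822-b974-782f2de5950a\t"
           "INFO\tinput key: data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163739.raw")

if "input key: " in message:
    # Same approach as section [2]: last tab field, then last space-separated token.
    failed_key = message.split("\t")[-1].strip().split(" ")[-1]
    print(failed_key)
    # -> data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163739.raw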

Note: the biggest problem is timing. The logs are created immediately, but Logs Insights doesn't return reliable results until, sometimes, ~5 minutes after the lambda was run.
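One way to soften that is to retry the whole query against a deadline instead of relying on a single fixed sleep. A sketch under the same assumptions as the section [2] code (query_failed_key is a hypothetical helper name; the log group and query string are the ones used above):

import math
import time
from typing import Optional

import boto3

logs = boto3.client("logs", region_name="us-west-2")

LOG_GROUP = "/aws/lambda/delete-rudy-error-generating-lambda"


def query_failed_key(request_id: str, deadline_s: int = 300) -> Optional[str]:
    """Re-run the Insights query until a matching result appears or we give up."""
    stop_at = time.time() + deadline_s
    while time.time() < stop_at:
        end_time = math.floor(time.time()) + 30
        response = logs.start_query(
            logGroupName=LOG_GROUP,
            startTime=end_time - 60 * 60,  # one hour back covers a fresh failure
            endTime=end_time,
            queryString=f"fields @message | filter @requestId like '{request_id}'",
            limit=100,
        )
        while True:
            time.sleep(5)
            results = logs.get_query_results(queryId=response["queryId"])
            if results["status"] not in ("Running", "Scheduled"):
                break
        for row in results.get("results", []):
            for field in row:
                if field["field"] == "@message" and "input key: " in field["value"]:
                    return field["value"].split("\t")[-1].strip().split(" ")[-1]
        time.sleep(30)  # Insights may lag minutes behind the log stream; retry
    return None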
