# CloudWatch Filter Pattern

In CloudWatch, go to "Log groups" and then "Subscription filters." There are two filters we will need:

* Filter for when memory exceeds allocation
  * Pattern: "Error: Runtime exited with error: signal: killed"
* Filter for when processing time exceeds allocation
  * Pattern: "Task timed out"

The Destination ARN will point to arn:aws:lambda:us-west-2:118234403147:function:delete-rudy-testing-cloudwatch, which can be whatever you want. In this example I set it up to send an SNS message, which emails me.
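
If you prefer to script this instead of clicking through the console, the same two filters can be created with boto3's `put_subscription_filter`. This is a sketch; the filter names are illustrative, and the log group and destination ARN are the ones used in this walkthrough — substitute your own.

```python
# Log group of the error-generating lambda and the destination lambda ARN
# used in this walkthrough -- substitute your own.
LOG_GROUP = "/aws/lambda/delete-rudy-error-generating-lambda"
DESTINATION_ARN = ("arn:aws:lambda:us-west-2:118234403147:"
                   "function:delete-rudy-testing-cloudwatch")

# The two filter patterns described above; quoted so CloudWatch Logs
# matches the exact phrase.
FILTERS = {
    "out-of-memory": '"Error: Runtime exited with error: signal: killed"',
    "out-of-time": '"Task timed out"',
}


def create_subscription_filters(logs_client):
    """Create one subscription filter per pattern on the log group."""
    for name, pattern in FILTERS.items():
        logs_client.put_subscription_filter(
            logGroupName=LOG_GROUP,
            filterName=name,
            filterPattern=pattern,
            destinationArn=DESTINATION_ARN,
        )


if __name__ == "__main__":
    import boto3  # imported here so the sketch reads without boto3 installed
    create_subscription_filters(boto3.client("logs"))
```

Note that when the destination is a Lambda function, CloudWatch Logs also needs permission to invoke it; the console adds that permission for you, but a scripted setup has to grant it explicitly.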

## \[1] Error Generating Lambda

Runtime: Python 3.9. Handler: lambda\_function.lambda\_handler. Architecture: arm64.

Test JSON:

```
{
  "input_bucket": "noaa-wcsd-pds",
  "input_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw",
  "output_bucket": "noaa-wcsd-pds-index",
  "output_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr"
}
```
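
The same test event can also be fired from code rather than the console's test tab. A sketch, assuming the function name used later in this walkthrough:

```python
import json

# Name of the error-generating function from this walkthrough (an
# assumption if yours is named differently).
FUNCTION_NAME = "delete-rudy-error-generating-lambda"


def build_invoke_args(event: dict) -> dict:
    """Package the test event the way boto3's Lambda `invoke` expects it."""
    return {
        "FunctionName": FUNCTION_NAME,
        "InvocationType": "RequestResponse",  # wait for the (possibly failing) result
        "Payload": json.dumps(event),
    }


if __name__ == "__main__":
    import boto3  # only needed when actually invoking
    event = {
        "input_bucket": "noaa-wcsd-pds",
        "input_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw",
        "output_bucket": "noaa-wcsd-pds-index",
        "output_key": "data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr",
    }
    response = boto3.client("lambda").invoke(**build_invoke_args(event))
    print(response["Payload"].read())
```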

Code:

```python
import logging
import os
import time
import random


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def test_out_of_time():
    random_number = random.randint(2, 5)
    logger.info(f"Testing out of time, random_number is: {random_number}")
    time.sleep(random_number)

def test_out_of_memory():
    s = []
    logger.info(f"Testing out of memory")
    for i in range(1000):
       for j in range(1000):
           for k in range(1000):
               s.append("More")

def lambda_handler(event: dict, context) -> None:
    print(f"Lambda function ARN: {context.invoked_function_arn}")
    print(f"CloudWatch log stream name: {context.log_stream_name}")
    print(f"CloudWatch log group name: {context.log_group_name}")
    print(f"Lambda Request ID: {context.aws_request_id}")
    print(f"Lambda function memory limits in MB: {context.memory_limit_in_mb}")
    print(f"Lambda time remaining in MS: {context.get_remaining_time_in_millis()}")
    #
    logger.setLevel(logging.DEBUG)
    logger.info(f"_HANDLER: {os.environ['_HANDLER']}")
    logger.info(f"AWS_EXECUTION_ENV: {os.environ['AWS_EXECUTION_ENV']}")
    AWS_LAMBDA_FUNCTION_MEMORY_SIZE = os.environ['AWS_LAMBDA_FUNCTION_MEMORY_SIZE']
    print(f"AWS_LAMBDA_FUNCTION_MEMORY_SIZE: {AWS_LAMBDA_FUNCTION_MEMORY_SIZE}")
    #
    input_bucket = event["input_bucket"]  # 'noaa-wcsd-pds'
    print(f"input bucket: {input_bucket}")
    logger.info(f"input bucket: {input_bucket}")
    input_key = event["input_key"]  # 'data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.raw'
    print(f"input key: {input_key}")
    logger.info(f"input key: {input_key}")
    output_bucket = event["output_bucket"]  # 'noaa-wcsd-zarr-pds'
    print(f"output bucket: {output_bucket}")
    output_key = event["output_key"]
    print(f"output key: {output_key}")  # data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163738.zarr
    #
    test_out_of_time()
    # test_out_of_memory()
    #
    logger.info("This is a sample INFO message.. !!")
    logger.debug("This is a sample DEBUG message.. !!")
    logger.error("This is a sample ERROR message.... !!")
    logger.critical("This is a sample 6xx error message.. !!")
```

The code can be configured to run out of time or out of memory by commenting/uncommenting the corresponding call in the handler.

Logs are written to CloudWatch's "/aws/lambda/delete-rudy-error-generating-lambda" log group.

## \[2] Testing CloudWatch

This step monitors the stream of CloudWatch Logs and detects when a specific string appears; in this case we are looking for "Task timed out" and "Error: Runtime exited with error: signal: killed."

```python
import base64
import boto3
from botocore.config import Config
import gzip
import json
import logging
import os
import time, math

from botocore.exceptions import ClientError
#config = Config(region_name='us-west-2', signature_version='v4')
config = Config(signature_version='v4')
#session = boto3.Session(region_name="us-west-2")
session = boto3.Session()
log_client = session.client('logs', config=config)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def logpayload(event):
    logger.setLevel(logging.DEBUG)
    logger.debug(event['awslogs']['data'])
    compressed_payload = base64.b64decode(event['awslogs']['data'])
    uncompressed_payload = gzip.decompress(compressed_payload)
    log_payload = json.loads(uncompressed_payload)
    return log_payload


def error_details(payload):
    error_msg = ""
    log_events = payload['logEvents']
    logger.debug(payload)
    loggroup = payload['logGroup']
    logstream = payload['logStream']
    lambda_func_name = loggroup.split('/')
    logger.debug(f'LogGroup: {loggroup}')
    logger.debug(f'Logstream: {logstream}')
    logger.debug(f'Function name: {lambda_func_name[3]}')
    logger.debug(log_events)
    for log_event in log_events:
        error_msg += log_event['message']
    logger.debug('Message: %s' % error_msg.split("\n")[0])
    # Message: ['2023-03-30T18:33:45.455Z 90403de2-c856-4ab8-9a6f-35be1a7ba5f9 Task timed out after 3.00 seconds', '', '']
    print('-'*10)
    requestId = error_msg.split("\n")[0].split(' ')[1] # "e63712f5-d74c-4975-94de-d5292a96a610"
    logger.debug(f"requestId: {requestId}")
    logger.debug(f"requestId: {type(requestId)}")
    #requestId = "47791332-a8c6-4347-9bea-16570606bc27"
    #logger.debug(f"requestId: {requestId}")
    #logger.debug(f"requestId: {type(requestId)}")
    failed_key = ""
    try:
        time.sleep(60) # Note: CloudWatch Insights needs time to generate the initial log
        endTime = math.floor(time.time()) + 30
        startTime = endTime - (60 * 60 * 48) # N hours into the past
        response = log_client.start_query(
            logGroupName="/aws/lambda/delete-rudy-error-generating-lambda",
            startTime=startTime,
            endTime=endTime,
            queryString=f"fields @message | filter @requestId like '{requestId}'",
            limit=100
        )
        while True:
            time.sleep(10) # results['status']: Running, Complete
            results = log_client.get_query_results(queryId=response['queryId'])
            logger.debug(f'results temp: {results}')
            if results['status'] == 'Complete':
                break
        searchString = "input key: "
        logger.debug(f"results final: {results['results']}")
        if len(results['results']) > 0:
            for i in results['results']:
                message, _ = i
                if message['value'].find(searchString) != -1:
                    failed_key = message['value'].split('\t')[-1].strip().split(' ')[-1]
                    print(failed_key) # failed key
        else:
            logger.debug("No results returned.")
    except Exception as err:
        logger.error(f"Exception encountered: {err}")
    print('-'*10)
    return loggroup, logstream, error_msg, lambda_func_name, failed_key


def publish_message(loggroup, logstream, error_msg, lambda_func_name, failed_key):
    sns_arn = os.environ['SNS_ARN']  # Getting the SNS Topic ARN passed in by the environment variables.
    snsclient = boto3.client('sns')
    try:
        message = ""
        message += "\nLambda Error Summary" + "\n"
        message += "##########################################################\n"
        message += "# LogGroup Name:- " + str(loggroup) + "\n"
        message += "# LogStream:- " + str(logstream) + "\n"
        message += "# Log Message:- " + "\n"
        message += "# \t\t" + str(error_msg.split("\n")[0]) + "\n"
        message += "# \t\t" + f"failed key: {failed_key}" + "\n"
        message += "##########################################################\n"

        # Sending the notification...
        snsclient.publish(
            TargetArn=sns_arn,
            Subject=f'Execution error for Lambda - {lambda_func_name[3]}',
            Message=message
        )
    except ClientError as e:
        logger.error("An error occurred: %s" % e)


def lambda_handler(event, context):
    print("Lambda function ARN:", context.invoked_function_arn)
    print("CloudWatch log stream name:", context.log_stream_name)
    print("CloudWatch log group name:",  context.log_group_name)
    print("Lambda Request ID:", context.aws_request_id)
    print("Lambda function memory limits in MB:", context.memory_limit_in_mb)
    print("Lambda time remaining in MS:", context.get_remaining_time_in_millis())
    #
    pload = logpayload(event)
    lgroup, lstream, errmessage, lambdaname, failed_key = error_details(pload)
    publish_message(lgroup, lstream, errmessage, lambdaname, failed_key)
```

Add environment variable: SNS\_ARN arn:aws:sns:us-west-2:118234403147:delete-rudy-testing-topic.

## Order of events during processing

The process goes as follows. \[1] The error-generating lambda gets invoked. The lambda times out and, among its logs, returns a message:

> Response { "errorMessage": "2023-04-13T22:07:39.311Z 2e6d33d8-af65-4822-b974-782f2de5950a Task timed out after 3.01 seconds" }

There isn't enough information in this message alone; we need the key. The message gets captured by CloudWatch Logs. From there we can use "Logs Insights" to scan with a filter pattern for keywords. Now we can search Logs Insights for the string "2e6d33d8-af65-4822-b974-782f2de5950a", specifying /aws/lambda/error-generating-lambda with:

> ```
> fields @message | filter @requestId like '2e6d33d8-af65-4822-b974-782f2de5950a'
> ```

Run the query; it will return \~10 messages, including the one about the task timing out. One message specifies the input key:

> input key: data/raw/Henry\_B.\_Bigelow/HB20ORT/EK60/D20200225-T163739.raw

We can search for this message and extract the input\_key for further processing.
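
The extraction step can be sketched as a small helper. The sample line below is a hypothetical reconstruction of the tab-separated CloudWatch message format, not a captured log:

```python
from typing import Optional


def extract_input_key(message: str, marker: str = "input key: ") -> Optional[str]:
    """Return the S3 key following `marker` in a log message, else None."""
    idx = message.find(marker)
    if idx == -1:
        return None
    # Everything after the marker, trimmed of trailing whitespace/newlines.
    return message[idx + len(marker):].strip()


# Hypothetical sample mimicking the tab-separated CloudWatch message format.
sample = ("2023-04-13T22:07:39.311Z\t2e6d33d8-af65-4822-b974-782f2de5950a\tINFO\t"
          "input key: data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163739.raw\n")
print(extract_input_key(sample))
# → data/raw/Henry_B._Bigelow/HB20ORT/EK60/D20200225-T163739.raw
```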

**Note: the biggest problem is timing. The logs are created immediately, but for some reason Logs Insights doesn't return reliable results until \[sometimes] 5 minutes after the lambda ran.**
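
One way to soften this is to retry the whole Insights query until it returns rows, rather than relying on a single fixed sleep. A minimal sketch; the attempt count and delay are assumptions, not tested values:

```python
import time


def query_with_retries(run_query, max_attempts=5, delay_s=60):
    """Re-run an Insights query until it yields rows or attempts run out.

    `run_query` is any zero-argument callable that starts the query, polls
    it to completion, and returns the list of result rows.
    """
    for attempt in range(1, max_attempts + 1):
        rows = run_query()
        if rows:
            return rows
        if attempt < max_attempts:
            time.sleep(delay_s)  # give Logs Insights time to index the events
    return []
```

In the testing-cloudwatch lambda above, the `start_query`/`get_query_results` loop would be wrapped in the callable, and the handler's timeout must of course exceed `max_attempts * delay_s`.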
