Monitor MySQL Error Logs from AWS CloudWatch

Here we will write a Python script that monitors MySQL error logs stored in AWS CloudWatch and sends an email notification whenever an error is detected.

import boto3
import gzip
import re
import smtplib
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# AWS credentials.
# Read from the environment when available so real keys never have to be
# written into this file; the placeholders remain as the fallback values.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', 'YOUR_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'YOUR_SECRET_ACCESS_KEY')

# CloudWatch log group and stream
log_group_name = 'mysql'
log_stream_name = 'error_log'

# Regular expression for MySQL error-log lines.
# Accepted timestamp shapes:
#   * legacy      "230101 12:00:00" (YYMMDD, one or more spaces, H:MM:SS)
#   * date-based  "2023-01-01 12:00:00" or ISO 8601
#                 "2023-01-01T12:00:00.123456Z" / "...+02:00"
# The timestamp is followed by a numeric id and a bracketed severity.
# Group 1 is the severity label, group 2 is the rest of the line.
error_pattern = re.compile(
    r'^(?:\d{6}\s+\d{1,2}:\d{1,2}:\d{1,2}'
    r'|\d{4}-\d{2}-\d{2}[T ]\d{1,2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?)'
    r'\s+\d+\s+\[(\w+)\]\s+(.*)$'
)

# Email configuration
sender_email = 'SENDER_EMAIL_ADDRESS'
recipient_email = 'RECIPIENT_EMAIL_ADDRESS'
smtp_server = 'SMTP_SERVER_ADDRESS'
smtp_port = 587
# SMTP login details are secrets as well, so they can also come from the
# environment; the placeholders are used when the variables are unset.
smtp_username = os.environ.get('SMTP_USERNAME', 'SMTP_USERNAME')
smtp_password = os.environ.get('SMTP_PASSWORD', 'SMTP_PASSWORD')

def download_logs():
    """Fetch every message from the configured CloudWatch log stream.

    Reads the stream named by ``log_group_name`` / ``log_stream_name`` from
    its oldest event forward, following ``nextForwardToken`` until the
    service hands back the same token that was sent, which is how
    ``get_log_events`` signals the end of the stream.

    Returns:
        list[str]: The ``message`` field of each log event, oldest first.
    """
    client = boto3.client(
        'logs',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    request = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        # Without this the first page is the *tail* of the stream and
        # paging forward from there returns nothing further.
        'startFromHead': True,
    }
    messages = []
    while True:
        response = client.get_log_events(**request)
        # Event messages are returned as plain text, so no decompression
        # step is needed here.
        messages.extend(event['message'] for event in response['events'])
        next_token = response.get('nextForwardToken')
        # get_log_events has no 'nextToken' key in its response; a repeated
        # forward token (or none at all) means there is nothing more to read.
        # An empty page alone is not treated as the end of the stream.
        if not next_token or next_token == request.get('nextToken'):
            break
        request['nextToken'] = next_token
    return messages

def search_logs():
    """Return the message text of every downloaded log line tagged ERROR.

    Each line from ``download_logs()`` is matched against ``error_pattern``;
    lines that do not match, or whose severity label is anything other than
    ``'ERROR'``, are ignored.

    Returns:
        list[str]: Message portions (pattern group 2) of the ERROR lines,
        in the order they were downloaded.
    """
    parsed = (error_pattern.match(line) for line in download_logs())
    return [hit.group(2) for hit in parsed if hit and hit.group(1) == 'ERROR']

def send_email(subject, body):
    """Send a plain-text notification email through the configured SMTP server.

    Args:
        subject: Subject line of the message.
        body: Text placed in the message body.

    Raises:
        smtplib.SMTPException: If the TLS upgrade, login, or delivery fails.
        OSError: If the connection to the SMTP server cannot be established.
    """
    msg = MIMEMultipart()
    msg['From'] = sender_email
    msg['To'] = recipient_email
    msg['Subject'] = subject
    msg.attach(MIMEText(body))
    # Using the SMTP object as a context manager guarantees QUIT is issued
    # and the socket is closed even when starttls/login/sendmail raises.
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(smtp_username, smtp_password)
        server.sendmail(sender_email, recipient_email, msg.as_string())

# Script entry point: scan the log stream once and email any ERROR lines.
if __name__ == '__main__':
    found = search_logs()
    if not found:
        print('No errors found.')
    else:
        # One message per paragraph in the email body.
        send_email(
            f'MySQL errors detected in {log_group_name}/{log_stream_name}',
            '\n\n'.join(found),
        )
        print(f'Sent email with {len(found)} error(s).')

How it works:

Here’s how the script works:

  1. The script downloads the MySQL error logs from the specified CloudWatch log group and stream.
  2. The logs are searched for any errors using a regular expression. If any errors are found, they are added to a list.
  3. If any errors were found, the script sends an email with the error messages. Otherwise, it prints a message indicating that no errors were found.

Make sure to replace the AWS credentials, log group and stream names, email addresses, SMTP server settings, and regular expression pattern with your own values.

You can schedule this script to run periodically using a cron job or other scheduling mechanism.

Leave a Comment