Here we will write a Python script that will Monitor MySQL Error logs from AWS Cloudwatch and we’ll notify in case any error is detected.
import boto3 import gzip import re import smtplib import os from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # AWS credentials aws_access_key_id = 'YOUR_ACCESS_KEY_ID' aws_secret_access_key = 'YOUR_SECRET_ACCESS_KEY' # CloudWatch log group and stream log_group_name = 'mysql' log_stream_name = 'error_log' # Regular expression for MySQL errors error_pattern = re.compile(r'^\d{6} \d{1,2}:\d{1,2}:\d{1,2}\s+\d+\s+\[(\w+)\]\s+(.*)$') # Email configuration sender_email = 'SENDER_EMAIL_ADDRESS' recipient_email = 'RECIPIENT_EMAIL_ADDRESS' smtp_server = 'SMTP_SERVER_ADDRESS' smtp_port = 587 smtp_username = 'SMTP_USERNAME' smtp_password = 'SMTP_PASSWORD' def download_logs(): client = boto3.client('logs', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) response = client.get_log_events(logGroupName=log_group_name, logStreamName=log_stream_name) events = response['events'] while 'nextToken' in response: response = client.get_log_events(logGroupName=log_group_name, logStreamName=log_stream_name, nextToken=response['nextToken']) events.extend(response['events']) return [gzip.decompress(event['message'].encode('utf-8')).decode('utf-8') for event in events] def search_logs(): logs = download_logs() errors = [] for log in logs: match = error_pattern.match(log) if match: severity = match.group(1) message = match.group(2) if severity == 'ERROR': errors.append(message) return errors def send_email(subject, body): msg = MIMEMultipart() msg['From'] = sender_email msg['To'] = recipient_email msg['Subject'] = subject msg.attach(MIMEText(body)) server = smtplib.SMTP(smtp_server, smtp_port) server.starttls() server.login(smtp_username, smtp_password) server.sendmail(sender_email, recipient_email, msg.as_string()) server.quit() if __name__ == '__main__': errors = search_logs() if errors: subject = f'MySQL errors detected in {log_group_name}/{log_stream_name}' body = '\n\n'.join(errors) send_email(subject, body) print(f'Sent email with {len(errors)} error(s).') else: print('No errors found.')
How it works:
Here’s how the script works:
- The script downloads the MySQL error logs from the specified CloudWatch log group and stream.
- The logs are searched for any errors using a regular expression. If any errors are found, they are added to a list.
- If any errors were found, the script sends an email with the error messages. Otherwise, it prints a message indicating that no errors were found.
Make sure to replace the AWS credentials, log group and stream names, email addresses, SMTP server settings, and regular expression pattern with your own values.
You can schedule this script to run periodically using a cron job or other scheduling mechanism.