Serverless Email Service
Often you can face the requirement to implement a generic approach to send emails from all your applications or micro-services. This post describes a possible Serverless Architecture that provides the mechanism to send emails through an API endpoint within Amazon Web Service.
Application Requirements
- Serverless.
- Fault-tolerance.
- Support attachments.
Architecture
The use of AWS API Gateway as an entry point to a Lambda Function is probably a common pattern in serverless applications, using the Lambda Function to handles the logic and communicates with other resources within AWS or third-party services. Here, you are responsible for error handling and retry logic within your Lambda function code.
Storage First Pattern
But, what if we want to avoid the messages losses or if we realize that the capacities of the Lambda Service could be overpassed? In this scenario, don’t worry we can use: the Storage First Pattern. It uses native error handling with retry logic or dead-letter queues (DLQ) at the SQS layer before any code is executed.
Using directly integration between API Gateway and SQS, you can increase application reliability while reducing lines of code. This pattern will be used in our architecture.
Project Structure
serverless-email-service
├── __init__.py
├── lambda_send_to_ses
│ ├── __init__.py
│ ├── main.py
│ ├── requirements.txt
│ └── tests
│ ├── events.py
│ ├── __init__.py
│ └── tests_send_email.py
├── lambda_track_events
│ ├── __init__.py
│ ├── main.py
│ ├── requirements.txt
│ └── tests
│ ├── events.py
│ ├── __init__.py
│ └── tests_event_tracking.py
├── requirements.txt
└── template.yaml
Function for Emails Sending
The function will take the messages delivered from the SQS queue and will use AWS Simple Email Service (SES) to send the emails.
# -*- coding: utf-8 -*-
import json
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import boto3
ses = boto3.client("ses")
s3 = boto3.client("s3")
def lambda_handler(event: dict, context):
print("Starting send email process...")
records = event.get("Records", [])
if not records:
print("No records found!")
for record in records:
data = record.get("body")
if data:
data = json.loads(data)
destination = data.get("Destination")
message = data.get("Message")
source = data.get("Source")
args = {
"Source": source,
"Destination": destination,
"ConfigurationSetName": data.get("ConfigurationSetName"),
"Message": message
}
# Because these parameters can not be empty...
not_none_parameters = [
"ReturnPath", "SourceArn",
"ReturnPathArn", "ReplyToAddresses",
"ConfigurationSetName", "Tags"
]
for id_ in not_none_parameters:
if data.get(id_):
args[id_] = data[id_]
contains_attachment = data.get("Attachments", [])
if contains_attachment:
raw_message = MIMEMultipart()
raw_message['Subject'] = message["Subject"]["Data"]
raw_message['From'] = source
destinations = destination.get("ToAddresses") + \
destination.get("CcAddresses") + \
destination.get("BccAddresses")
raw_message["To"] = ', '.join(destinations)
part = MIMEText(message["Body"]["Html"]["Data"], 'html')
raw_message.attach(part)
for attachment in data["Attachments"]:
file_ = s3.get_object(
Bucket=attachment.get("Bucket"),
Key=attachment.get("Key"))
file_data = file_["Body"].read()
part = MIMEApplication(file_data)
part.add_header(
"Content-Disposition",
"attachment",
filename=f"{attachment['Name']}.{attachment['Ext']}"
)
raw_message.attach(part)
args.update({
"Destinations": destinations,
"RawMessage": {
"Data": raw_message.as_string()
}
})
del args["Message"]
del args["Destination"]
try:
method = getattr(ses, "send_raw_email" if contains_attachment else "send_email")
response = method(**args)
print(f"Message: {response.get('MessageId')} successfully sent!")
except Exception as error:
print(error)
return {
"statusCode": 200,
"body": json.dumps({
"message": f"Were sent {len(records)} emails!"
})
}
Function for events processing
We want to track the events that SES sends related to the emails we are sending. Why? Because you probably want to know if the email was delivery, was opened by the client or if the link you sent was clicked.
# -*- coding: utf-8 -*-
import json
import os
import boto3
dynamo = boto3.resource('dynamodb')
dynamo_table = dynamo.Table(os.environ.get("DYNAMO_TABLE"))
def lambda_handler(event, context):
""" Receive Events notifications from SES """
for record in event.get("Records", []):
body = record.get("body", "{}")
if body:
try:
body = json.loads(body)
except json.JSONDecodeError:
# SNS could sent messages like below:
# Successfully validated SNS topic for Amazon SES event publishing.
return
message_id = body["mail"]["messageId"]
event_type = body["eventType"]
function = event_mapper.get(event_type)
if function:
data = function(body) or {}
data.update({"message_id": message_id, "event_type": event_type})
dynamo_table.put_item(Item=data)
return {
"statusCode": 200,
"body": json.dumps({
"message": "Events updated!"
})
}
def _send(message_data: dict):
mail_data = message_data["mail"]
return {
"timestamp": mail_data["timestamp"],
"destination": mail_data["destination"]
}
def _delivery(message_data: dict):
event_data = message_data["delivery"]
return {
"timestamp": event_data["timestamp"],
"processing_time_millis": event_data["processingTimeMillis"],
"recipients": event_data["recipients"],
"smtp_response": event_data["smtpResponse"],
"reporting_mta": event_data["reportingMTA"]
}
def _open(message_data: dict):
event_data = message_data["open"]
return {
"timestamp": event_data["timestamp"],
"userAgent": event_data["userAgent"],
"ipAddress": event_data["ipAddress"]
}
def _click(message_data: dict):
event_data = message_data["click"]
return {
"ip_address": event_data["ipAddress"],
"timestamp": event_data["timestamp"],
"link": event_data["link"],
"link_tags": event_data["linkTags"],
"user_agent": event_data["userAgent"]
}
def _reject(message_data: dict):
event_data = message_data["reject"]
return {
"reason": event_data["reason"]
}
def _bounce(message_data: dict):
event_data = message_data["bounce"]
return {
"bounce_type": event_data["bounceType"],
"bounce_subtype": event_data["bounceSubType"],
"bounced_recipients": event_data["bouncedRecipients"]
}
def _complaint(message_data: dict):
event_data = message_data["complaint"]
return {
"complained_recipients": event_data["complainedRecipients"],
"timestamp": event_data["timestamp"],
"feedback_id": event_data["feedbackId"],
"user_agent": event_data["userAgent"],
"complaint_feedback_type": event_data["complaintFeedbackType"],
"arrival_date": event_data["arrivalDate"]
}
def _rendering_failure(message_data: dict):
event_data = message_data["failure"]
return {
"error_message": event_data["errorMessage"],
"template_name": event_data["templateName"]
}
def _delivery_delay(message_data: dict):
event_data = message_data["deliveryDelay"]
return {
"timestamp": event_data["timestamp"],
"delay_type": event_data["delayType"],
"expiration_time": event_data["expirationTime"],
"delayed_recipients": event_data["delayedRecipients"]
}
event_mapper = {
"Bounce": _bounce,
"Complaint": _complaint,
"Delivery": _delivery,
"Send": _send,
"Reject": _reject,
"Open": _open,
"Click": _click,
"Rendering Failure": _rendering_failure,
"DeliveryDelay": _delivery_delay,
}
SAM Template
The Amazon Web Service Serverless Application Model (SAM) is an open-source framework created with the purpose of building serverless applications on AWS, which provides shorthand syntax to express functions, APIs, databases, and event source mappings. Writing a few lines per resource, you can define the application you want and model it using YAML or JSON.
Below, you can find the SAM template which defines the resources needed to fulfil the architecture defined in the diagram.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Serverless Email Service | Architecture for email sending within AWS...
Resources:
APIGateway:
Type: AWS::ApiGatewayV2::Api
Properties:
Name: Serverless Email Service API Gateway
Description: Endpoint to send emails...
ProtocolType: HTTP
APIGatewayStage:
Type: AWS::ApiGatewayV2::Stage
Properties:
ApiId: !Ref APIGateway
AutoDeploy: true
StageName: prod
AccessLogSettings:
DestinationArn: !GetAtt APIGatewayLogGroup.Arn
Format: >-
{"requestId":"$context.requestId", "ip": "$context.identity.sourceIp",
"caller":"$context.identity.caller",
"user":"$context.identity.user","requestTime":"$context.requestTime",
"routeKey":"$context.routeKey","status":"$context.status",
"error":"$context.integrationErrorMessage"}
SendEmailRoute:
Type: AWS::ApiGatewayV2::Route
Properties:
ApiId: !Ref APIGateway
RouteKey: 'POST /send_email'
Target: !Join
- /
- - integrations
- !Ref RouteIntegration
RouteIntegration:
Type: AWS::ApiGatewayV2::Integration
Properties:
ApiId: !Ref APIGateway
Description: API Gateway integration with SQS
CredentialsArn: !GetAtt ServerlessEmailServiceRole.Arn
IntegrationType: AWS_PROXY
IntegrationSubtype: SQS-SendMessage
PayloadFormatVersion: 1.0
RequestParameters:
QueueUrl: !Ref EmailServiceSQS
MessageBody: $request.body
APIGatewayLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: ServerlessEmailService-API-Gateway
RetentionInDays: 1
EmailServiceSQS:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub ${AWS::StackName}-sqs
LambdaFunctionSendToSES:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub ${AWS::StackName}-SendToSES
Description: >
Function to retrieve data from SQS and use SES to
send the email...
CodeUri: lambda_send_to_ses/
Handler: main.lambda_handler
Role: !GetAtt ServerlessEmailServiceRole.Arn
Runtime: python3.8
Events:
EmailServiceSQSEvent:
Type: SQS
Properties:
Queue: !GetAtt EmailServiceSQS.Arn
BatchSize: 1
Enabled: true
SNSEmailEventsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: email_events_topic
DisplayName: email_events_topic
FifoTopic: false
SESConfigurationSet:
Type: AWS::SES::ConfigurationSet
Properties:
Name: EmailServiceConfigurationSet
EmailServiceEventsSQS:
Type: AWS::SQS::Queue
Properties:
QueueName: !Sub ${AWS::StackName}-events_sqs
EmailServiceEventsSQSPolicy:
Type: AWS::SQS::QueuePolicy
Properties:
Queues:
- !Ref EmailServiceEventsSQS
PolicyDocument:
Statement:
Effect: Allow
Principal: "*"
Action: "sqs:*"
Resource: "*"
Condition:
ArnEquals:
"aws:SourceArn": !Ref SNSEmailEventsTopic
EventsSqsSubscription:
Type: AWS::SNS::Subscription
Properties:
TopicArn: !Ref SNSEmailEventsTopic
Endpoint: !GetAtt EmailServiceEventsSQS.Arn
RawMessageDelivery: true
Protocol: sqs
DynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: email_service_events_tracker
AttributeDefinitions:
- AttributeName: message_id
AttributeType: S
- AttributeName: event_type
AttributeType: S
KeySchema:
- AttributeName: message_id
KeyType: HASH
- AttributeName: event_type
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
LambdaFunctionTrackEvents:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub ${AWS::StackName}-TrackEvents
Description: >
Function to track received events from SES
related with the emails sent...
CodeUri: lambda_track_events/
Handler: main.lambda_handler
Role: !GetAtt ServerlessEmailServiceRole.Arn
Runtime: python3.8
Events:
EmailServiceSQSEvent:
Type: SQS
Properties:
Queue: !GetAtt EmailServiceEventsSQS.Arn
BatchSize: 1
Enabled: true
Environment:
Variables:
ConfigurationSetName: !Ref SESConfigurationSet
DYNAMO_TABLE: !Select [1, !Split ['/', !GetAtt DynamoDBTable.Arn]]
EmailServiceS3Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: email-service-bucket
ServerlessEmailServiceRole:
Type: AWS::IAM::Role
Properties:
RoleName: ServerlessEmailServiceRole
Description: Role for all resources into the Serverless Email Service project...
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- apigateway.amazonaws.com
- lambda.amazonaws.com
- sqs.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: SendToSESLambdaPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-SendToSES
Action:
- lambda:*
- PolicyName: TrackEventsLambdaPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-TrackEvents
Action:
- lambda:*
- PolicyName: EmailSQSPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !GetAtt EmailServiceSQS.Arn
Action:
- sqs:*
- PolicyName: EventsSQSPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !GetAtt EmailServiceEventsSQS.Arn
Action:
- sqs:*
- PolicyName: CloudWatchPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: "*"
Action:
- logs:*
- PolicyName: SESPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: "*"
Action:
- ses:*
- PolicyName: SNSPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !Ref SNSEmailEventsTopic
Action:
- sns:*
- PolicyName: DynamoPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !GetAtt DynamoDBTable.Arn
Action:
- dynamodb:*
- PolicyName: S3Policy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: "*"
Action:
- s3:*
Tags:
- Key: "project"
Value: "email_service"
- Key: "environment"
Value: "prod"
Deploying the infrastructure
Install SAM CLI
AWS provides the right tool to easily create, deploy and manage your serverless applications defined in a SAM template You need to install and configure a few things in order to use the AWS SAM CLI.
Getting and deploying the application
Before you deploy your application you need to create an environment variable to define the name of your stack.
export AWS_SAM_STACK_NAME=serverless-email-service
Here we are supposing you are taking the application from the repository instead of creating it for your own.
$ git clone https://github.com/alekglez/serverless-email-service.git
$ cd serverless-email-service
$ virtualenv --python=python3.8 .venv
$ source .venv/bin/activate
$ pip install -r email_service/requirements.txt$ cd email_service
$ sam build
$ sam deploy --stack-name serverless-email-service --s3-bucket serverless-email-service --capabilities CAPABILITY_NAMED_IAM
After deployment
Currently, you can’t specify an Amazon SNS event destination inside a
CloudFormation templates, that’s why you need to do it manually, then you need to go to https://console.aws.amazon.com/ses/home?region=us-east-1#edit-configuration-set:EmailServiceConfigurationSet and create de SNS destination.
Important: Amazon SES requires that you verify your identities (the domains or email addresses that you send email from) to confirm that you own them, and to prevent unauthorized use.
Testing the service
To test the service using attachments upload a file to the bucket, below an example of how could you do it.
$ aws s3 cp /home/alejandro/Downloads/ticket.pdf s3://email-service-bucket
The server URL you can find it in your CloudFormation Stack.
With an attachment
curl --location --request POST 'https://server-url/prod/send_email' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"Destination": {
"BccAddresses": [
],
"CcAddresses": [
],
"ToAddresses": [
"alek.cora.glez@gmail.com"
]
},
"Message": {
"Body": {
"Html": {
"Charset": "UTF-8",
"Data": "Test. https://as.com"
},
"Text": {
"Charset": "UTF-8",
"Data": "Test"
}
},
"Subject": {
"Charset": "UTF-8",
"Data": "Test email"
}
},
"Source": "alek.cora.glez@gmail.com",
"ConfigurationSetName": "EmailServiceConfigurationSet",
"Attachments": [
{
"Bucket": "email-service-bucket",
"Key": "ticket.pdf",
"Name": "ticket",
"Ext": "pdf"
},
{
"Bucket": "email-service-bucket",
"Key": "Aporte.pdf",
"Name": "aporte",
"Ext": "pdf"
}
]
}'
Without attachment
curl --location --request POST 'https://server-url/prod/send_email' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"Destination": {
"BccAddresses": [
],
"CcAddresses": [
],
"ToAddresses": [
"alek.cora.glez@gmail.com"
]
},
"Message": {
"Body": {
"Html": {
"Charset": "UTF-8",
"Data": "Test. https://as.com"
},
"Text": {
"Charset": "UTF-8",
"Data": "Test. https://as.com"
}
},
"Subject": {
"Charset": "UTF-8",
"Data": "Test email"
}
},
"Source": "alek.cora.glez@gmail.com",
"ConfigurationSetName": "EmailServiceConfigurationSet"
}'
To-Do List
Is important you SHOULD not deploy this application in production without check the role’s accesses/permissions to ensure the Principle of Least Privilege (PoLP) and without creating the mechanism to protect your API endpoint from external or unauthorized access.
Well, that’s all for today. I’m hoping this project may be useful for some of you…
Code repository: https://github.com/alekglez/serverless-email-service.git