Serverless Email Service

Alejandro Cora González
Feb 20, 2021

Sooner or later you may need a generic way to send emails from all your applications or microservices. This post describes a possible serverless architecture that sends emails through an API endpoint within Amazon Web Services (AWS).

Application Requirements

  • Serverless.
  • Fault-tolerance.
  • Support attachments.

Architecture

Using AWS API Gateway as the entry point to a Lambda function is a common pattern in serverless applications: the Lambda function handles the logic and communicates with other AWS resources or third-party services. With this pattern, you are responsible for error handling and retry logic within your Lambda function code.
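To make "retry logic within your Lambda function code" concrete, here is a minimal sketch of the kind of helper you would have to write and maintain yourself. The name `call_with_retries` and its parameters are hypothetical, not part of this project.

```python
import time


def call_with_retries(operation, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation(), retrying with exponential backoff on any exception."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: let the caller see the original error.
                raise
            # Wait base_delay, then twice that, and so on, before retrying.
            sleep(base_delay * 2 ** (attempt - 1))
```

A handler would wrap each downstream call, for example `call_with_retries(lambda: ses.send_email(**args))`. Every function that talks to another service needs the same care, which is the overhead the next pattern tries to remove.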

Storage First Pattern

But what if we want to avoid losing messages, or if we expect traffic to exceed the capacity of the Lambda service? In this scenario we can use the Storage First Pattern. It relies on native error handling, with retry logic or dead-letter queues (DLQ), at the SQS layer before any code is executed.

By integrating API Gateway directly with SQS, you can increase application reliability while reducing lines of code. This is the pattern used in our architecture.
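The retry and dead-letter behaviour that SQS provides can be pictured with a small in-memory model. This is only a toy simulation under simplifying assumptions (no visibility timeout, no real queue); the function `drain` and the `max_receive_count` parameter are illustrative names, loosely mirroring the idea of a maximum receive count on a redrive policy.

```python
from collections import deque


def drain(queue, dead_letters, handler, max_receive_count=3):
    """Toy model of queue retries: re-deliver failures, then dead-letter them."""
    receive_counts = {}
    while queue:
        message = queue.popleft()
        receive_counts[message] = receive_counts.get(message, 0) + 1
        try:
            handler(message)  # success: the message leaves the queue for good
        except Exception:
            if receive_counts[message] >= max_receive_count:
                dead_letters.append(message)  # retries exhausted: park it in the DLQ
            else:
                queue.append(message)  # make it available for another attempt
```

The point is that none of this logic lives in the Lambda function: the message is stored first, and the queue decides when to retry or give up.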

Project Structure

serverless-email-service
├── __init__.py
├── lambda_send_to_ses
│   ├── __init__.py
│   ├── main.py
│   ├── requirements.txt
│   └── tests
│       ├── events.py
│       ├── __init__.py
│       └── tests_send_email.py
├── lambda_track_events
│   ├── __init__.py
│   ├── main.py
│   ├── requirements.txt
│   └── tests
│       ├── events.py
│       ├── __init__.py
│       └── tests_event_tracking.py
├── requirements.txt
└── template.yaml

Function for Sending Emails

The function will take the messages delivered from the SQS queue and will use AWS Simple Email Service (SES) to send the emails.
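Before reading the handler, it may help to see the shape of what it receives. The sketch below uses a trimmed-down SQS event (real records carry more fields) whose `body` is the JSON posted to the API. All values and the helper `extract_payloads` are placeholders for illustration.

```python
import json

# Trimmed-down SQS event as Lambda receives it; values are placeholders.
sample_event = {
    "Records": [
        {
            "messageId": "example-message-id",
            "body": json.dumps({
                "Source": "sender@example.com",
                "Destination": {"ToAddresses": ["recipient@example.com"]},
                "Message": {
                    "Subject": {"Data": "Hello"},
                    "Body": {"Text": {"Data": "Hi there"}},
                },
            }),
        }
    ]
}


def extract_payloads(event):
    """Return the decoded JSON body of every record that has one."""
    return [json.loads(r["body"]) for r in event.get("Records", []) if r.get("body")]


payloads = extract_payloads(sample_event)
print(payloads[0]["Destination"]["ToAddresses"])
```

The keys inside the body (`Source`, `Destination`, `Message`) follow the parameter names of the SES `send_email` call, which is why the handler can pass most of them through unchanged.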

# -*- coding: utf-8 -*-

import json
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import boto3

ses = boto3.client("ses")
s3 = boto3.client("s3")


def lambda_handler(event: dict, context):
    print("Starting send email process...")
    records = event.get("Records", [])
    if not records:
        print("No records found!")

    for record in records:
        data = record.get("body")
        if data:
            data = json.loads(data)
            destination = data.get("Destination")
            message = data.get("Message")
            source = data.get("Source")

            args = {
                "Source": source,
                "Destination": destination,
                "Message": message
            }

            # Optional parameters: boto3 rejects None values,
            # so they are only added when present.
            not_none_parameters = [
                "ReturnPath", "SourceArn",
                "ReturnPathArn", "ReplyToAddresses",
                "ConfigurationSetName", "Tags"
            ]

            for id_ in not_none_parameters:
                if data.get(id_):
                    args[id_] = data[id_]

            contains_attachment = data.get("Attachments", [])
            if contains_attachment:
                raw_message = MIMEMultipart()
                raw_message['Subject'] = message["Subject"]["Data"]
                raw_message['From'] = source

                # Missing or null address lists are treated as empty.
                to_addresses = destination.get("ToAddresses") or []
                cc_addresses = destination.get("CcAddresses") or []
                bcc_addresses = destination.get("BccAddresses") or []
                destinations = to_addresses + cc_addresses + bcc_addresses

                # Bcc recipients belong only in the envelope (Destinations),
                # never in the visible headers.
                if to_addresses:
                    raw_message["To"] = ', '.join(to_addresses)
                if cc_addresses:
                    raw_message["Cc"] = ', '.join(cc_addresses)

                part = MIMEText(message["Body"]["Html"]["Data"], 'html')
                raw_message.attach(part)

                # Download each attachment from S3 and attach it to the message.
                for attachment in data["Attachments"]:
                    file_ = s3.get_object(
                        Bucket=attachment.get("Bucket"),
                        Key=attachment.get("Key"))

                    file_data = file_["Body"].read()
                    part = MIMEApplication(file_data)
                    part.add_header(
                        "Content-Disposition",
                        "attachment",
                        filename=f"{attachment['Name']}.{attachment['Ext']}"
                    )

                    raw_message.attach(part)

                args.update({
                    "Destinations": destinations,
                    "RawMessage": {
                        "Data": raw_message.as_string()
                    }
                })

                # send_raw_email does not accept these send_email-only parameters.
                for id_ in ("Message", "Destination", "ReturnPath", "ReplyToAddresses"):
                    args.pop(id_, None)

            try:
                method = getattr(ses, "send_raw_email" if contains_attachment else "send_email")
                response = method(**args)
                print(f"Message: {response.get('MessageId')} successfully sent!")

            except Exception as error:
                # The error is only logged, so SQS treats the message as processed.
                print(error)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": f"{len(records)} records processed!"
        })
    }
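Because the handler catches every exception and only prints it, a failed send is not retried by SQS. One possible alternative, sketched here under the assumption that the event source mapping has Lambda's `ReportBatchItemFailures` response type enabled (the template in this post does not set it), is to return the IDs of the failed records. The function `handle_batch` and the `send` callable are hypothetical names.

```python
import json


def handle_batch(event, send):
    """Process each SQS record and report the ones that failed."""
    failures = []
    for record in event.get("Records", []):
        try:
            send(json.loads(record["body"]))
        except Exception as error:
            print(f"Record {record['messageId']} failed: {error}")
            failures.append({"itemIdentifier": record["messageId"]})
    # Records listed here stay on the queue so SQS can deliver them again.
    return {"batchItemFailures": failures}
```

With a batch size of 1, simply re-raising the exception has a similar effect; the partial response matters more once batches contain several messages.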

Function for events processing

We want to track the events that SES emits for the emails we send. Why? Because you probably want to know whether an email was delivered, whether the recipient opened it, or whether a link in it was clicked.
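Each event arrives as a JSON document that names its type and the message it refers to. The abridged sample below is a placeholder, not real SES output, and `event_key` is a hypothetical helper; it shows the two fields the tracking function uses to identify an event.

```python
import json

# Abridged "Delivery" event; the values are placeholders.
sample_body = json.dumps({
    "eventType": "Delivery",
    "mail": {
        "messageId": "example-message-id",
        "timestamp": "2021-02-20T10:00:00.000Z",
    },
    "delivery": {
        "timestamp": "2021-02-20T10:00:01.000Z",
        "recipients": ["recipient@example.com"],
    },
})


def event_key(body: str):
    """Return the (message_id, event_type) pair identifying one tracked event."""
    data = json.loads(body)
    return data["mail"]["messageId"], data["eventType"]


print(event_key(sample_body))
```

The event-specific details live under a key named after the event (`delivery` here), which is why the function in this section keeps one small extractor per event type.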

# -*- coding: utf-8 -*-

import json
import os

import boto3

dynamo = boto3.resource('dynamodb')
dynamo_table = dynamo.Table(os.environ.get("DYNAMO_TABLE"))


def lambda_handler(event, context):
    """ Receive event notifications from SES """

    for record in event.get("Records", []):
        body = record.get("body")
        if body:
            try:
                body = json.loads(body)
            except json.JSONDecodeError:
                # SNS can send plain-text messages such as:
                # Successfully validated SNS topic for Amazon SES event publishing.
                # Skip them and keep processing the remaining records.
                continue

            message_id = body["mail"]["messageId"]
            event_type = body["eventType"]

            # Pick the extractor for this event type (see event_mapper below).
            function = event_mapper.get(event_type)
            if function:
                data = function(body) or {}
                data.update({"message_id": message_id, "event_type": event_type})
                dynamo_table.put_item(Item=data)

    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "Events updated!"
        })
    }


def _send(message_data: dict):
    mail_data = message_data["mail"]
    return {
        "timestamp": mail_data["timestamp"],
        "destination": mail_data["destination"]
    }


def _delivery(message_data: dict):
    event_data = message_data["delivery"]
    return {
        "timestamp": event_data["timestamp"],
        "processing_time_millis": event_data["processingTimeMillis"],
        "recipients": event_data["recipients"],
        "smtp_response": event_data["smtpResponse"],
        "reporting_mta": event_data["reportingMTA"]
    }


def _open(message_data: dict):
    event_data = message_data["open"]
    return {
        "timestamp": event_data["timestamp"],
        "userAgent": event_data["userAgent"],
        "ipAddress": event_data["ipAddress"]
    }


def _click(message_data: dict):
    event_data = message_data["click"]
    return {
        "ip_address": event_data["ipAddress"],
        "timestamp": event_data["timestamp"],
        "link": event_data["link"],
        "link_tags": event_data["linkTags"],
        "user_agent": event_data["userAgent"]
    }


def _reject(message_data: dict):
    event_data = message_data["reject"]
    return {
        "reason": event_data["reason"]
    }


def _bounce(message_data: dict):
    event_data = message_data["bounce"]
    return {
        "bounce_type": event_data["bounceType"],
        "bounce_subtype": event_data["bounceSubType"],
        "bounced_recipients": event_data["bouncedRecipients"]
    }


def _complaint(message_data: dict):
    event_data = message_data["complaint"]
    return {
        "complained_recipients": event_data["complainedRecipients"],
        "timestamp": event_data["timestamp"],
        "feedback_id": event_data["feedbackId"],
        "user_agent": event_data["userAgent"],
        "complaint_feedback_type": event_data["complaintFeedbackType"],
        "arrival_date": event_data["arrivalDate"]
    }


def _rendering_failure(message_data: dict):
    event_data = message_data["failure"]
    return {
        "error_message": event_data["errorMessage"],
        "template_name": event_data["templateName"]
    }


def _delivery_delay(message_data: dict):
    event_data = message_data["deliveryDelay"]
    return {
        "timestamp": event_data["timestamp"],
        "delay_type": event_data["delayType"],
        "expiration_time": event_data["expirationTime"],
        "delayed_recipients": event_data["delayedRecipients"]
    }


event_mapper = {
    "Bounce": _bounce,
    "Complaint": _complaint,
    "Delivery": _delivery,
    "Send": _send,
    "Reject": _reject,
    "Open": _open,
    "Click": _click,
    "Rendering Failure": _rendering_failure,
    "DeliveryDelay": _delivery_delay,
}
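The SAM template in this post enables DynamoDB Time to Live on an attribute named `ttl`, but the tracking function never writes that attribute, so stored events would not expire. A minimal sketch of how the item could be stamped before `put_item` follows. The helper `with_ttl` and the 30-day default are assumptions, not part of the original code.

```python
import time


def with_ttl(item: dict, days: int = 30, now=None) -> dict:
    """Return a copy of item with a `ttl` attribute `days` in the future.

    DynamoDB TTL expects a Number holding a Unix timestamp in seconds.
    """
    now = int(time.time()) if now is None else int(now)
    return {**item, "ttl": now + days * 24 * 60 * 60}
```

In the handler this would be used as `dynamo_table.put_item(Item=with_ttl(data))`.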

SAM Template

The AWS Serverless Application Model (SAM) is an open-source framework for building serverless applications on AWS. It provides shorthand syntax to express functions, APIs, databases, and event source mappings. With a few lines per resource, you can define the application you want and model it using YAML or JSON.

Below, you can find the SAM template which defines the resources needed to fulfil the architecture defined in the diagram.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Description: >
  Serverless Email Service | Architecture for email sending within AWS...

Resources:
  APIGateway:
    Type: AWS::ApiGatewayV2::Api
    Properties:
      Name: Serverless Email Service API Gateway
      Description: Endpoint to send emails...
      ProtocolType: HTTP

  APIGatewayStage:
    Type: AWS::ApiGatewayV2::Stage
    Properties:
      ApiId: !Ref APIGateway
      AutoDeploy: true
      StageName: prod
      AccessLogSettings:
        DestinationArn: !GetAtt APIGatewayLogGroup.Arn
        Format: >-
          {"requestId":"$context.requestId", "ip": "$context.identity.sourceIp",
          "caller":"$context.identity.caller",
          "user":"$context.identity.user","requestTime":"$context.requestTime",
          "routeKey":"$context.routeKey","status":"$context.status",
          "error":"$context.integrationErrorMessage"}

  SendEmailRoute:
    Type: AWS::ApiGatewayV2::Route
    Properties:
      ApiId: !Ref APIGateway
      RouteKey: 'POST /send_email'
      Target: !Join
        - /
        - - integrations
          - !Ref RouteIntegration

  RouteIntegration:
    Type: AWS::ApiGatewayV2::Integration
    Properties:
      ApiId: !Ref APIGateway
      Description: API Gateway integration with SQS
      CredentialsArn: !GetAtt ServerlessEmailServiceRole.Arn

      IntegrationType: AWS_PROXY
      IntegrationSubtype: SQS-SendMessage
      PayloadFormatVersion: 1.0

      RequestParameters:
        QueueUrl: !Ref EmailServiceSQS
        MessageBody: $request.body

  APIGatewayLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: ServerlessEmailService-API-Gateway
      RetentionInDays: 1

  EmailServiceSQS:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub ${AWS::StackName}-sqs

  LambdaFunctionSendToSES:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub ${AWS::StackName}-SendToSES
      Description: >
        Function to retrieve data from SQS and use SES to
        send the email...

      CodeUri: lambda_send_to_ses/
      Handler: main.lambda_handler
      Role: !GetAtt ServerlessEmailServiceRole.Arn
      Runtime: python3.8

      Events:
        EmailServiceSQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt EmailServiceSQS.Arn
            BatchSize: 1
            Enabled: true

  SNSEmailEventsTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: email_events_topic
      DisplayName: email_events_topic
      FifoTopic: false

  SESConfigurationSet:
    Type: AWS::SES::ConfigurationSet
    Properties:
      Name: EmailServiceConfigurationSet

  EmailServiceEventsSQS:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: !Sub ${AWS::StackName}-events_sqs

  EmailServiceEventsSQSPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref EmailServiceEventsSQS
      PolicyDocument:
        Statement:
          Effect: Allow
          Principal: "*"
          Action: "sqs:*"
          Resource: "*"
          Condition:
            ArnEquals:
              "aws:SourceArn": !Ref SNSEmailEventsTopic

  EventsSqsSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref SNSEmailEventsTopic
      Endpoint: !GetAtt EmailServiceEventsSQS.Arn
      RawMessageDelivery: true
      Protocol: sqs

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: email_service_events_tracker
      AttributeDefinitions:
        - AttributeName: message_id
          AttributeType: S
        - AttributeName: event_type
          AttributeType: S

      KeySchema:
        - AttributeName: message_id
          KeyType: HASH
        - AttributeName: event_type
          KeyType: RANGE

      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1

      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

  LambdaFunctionTrackEvents:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub ${AWS::StackName}-TrackEvents
      Description: >
        Function to track received events from SES
        related with the emails sent...

      CodeUri: lambda_track_events/
      Handler: main.lambda_handler
      Role: !GetAtt ServerlessEmailServiceRole.Arn
      Runtime: python3.8

      Events:
        EmailServiceSQSEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt EmailServiceEventsSQS.Arn
            BatchSize: 1
            Enabled: true

      Environment:
        Variables:
          ConfigurationSetName: !Ref SESConfigurationSet
          DYNAMO_TABLE: !Select [1, !Split ['/', !GetAtt DynamoDBTable.Arn]]

  EmailServiceS3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: email-service-bucket

  ServerlessEmailServiceRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ServerlessEmailServiceRole
      Description: Role for all resources into the Serverless Email Service project...

      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
                - lambda.amazonaws.com
                - sqs.amazonaws.com
            Action:
              - sts:AssumeRole

      Policies:
        - PolicyName: SendToSESLambdaPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-SendToSES
                Action:
                  - lambda:*

        - PolicyName: TrackEventsLambdaPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-TrackEvents
                Action:
                  - lambda:*

        - PolicyName: EmailSQSPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt EmailServiceSQS.Arn
                Action:
                  - sqs:*

        - PolicyName: EventsSQSPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt EmailServiceEventsSQS.Arn
                Action:
                  - sqs:*

        - PolicyName: CloudWatchPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: "*"
                Action:
                  - logs:*

        - PolicyName: SESPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: "*"
                Action:
                  - ses:*

        - PolicyName: SNSPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !Ref SNSEmailEventsTopic
                Action:
                  - sns:*

        - PolicyName: DynamoPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt DynamoDBTable.Arn
                Action:
                  - dynamodb:*

        - PolicyName: S3Policy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: "*"
                Action:
                  - s3:*

      Tags:
        - Key: "project"
          Value: "email_service"
        - Key: "environment"
          Value: "prod"

Deploying the infrastructure

Install SAM CLI

AWS provides a tool to create, deploy and manage serverless applications defined in a SAM template: the AWS SAM CLI. You need to install and configure a few things before you can use it.

Getting and deploying the application

Before you deploy your application you need to create an environment variable to define the name of your stack.

export AWS_SAM_STACK_NAME=serverless-email-service

Here we assume you are cloning the application from the repository instead of creating it yourself.

$ git clone https://github.com/alekglez/serverless-email-service.git
$ cd serverless-email-service
$ virtualenv --python=python3.8 .venv
$ source .venv/bin/activate
$ pip install -r email_service/requirements.txt
$ cd email_service
$ sam build
$ sam deploy --stack-name serverless-email-service --s3-bucket serverless-email-service --capabilities CAPABILITY_NAMED_IAM

After deployment

At the time of writing, you can’t specify an Amazon SNS event destination inside a CloudFormation template, so you need to do it manually: go to https://console.aws.amazon.com/ses/home?region=us-east-1#edit-configuration-set:EmailServiceConfigurationSet and create the SNS destination.
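If you prefer a script over the console, the same step can be sketched with boto3. This is an untested outline under assumptions: the destination name `sns-events` and the selection of event types are illustrative choices, and the function takes the SES client as a parameter so it can be exercised without AWS credentials.

```python
def add_sns_destination(ses_client, configuration_set: str, topic_arn: str):
    """Publish the configuration set's events to an SNS topic."""
    return ses_client.create_configuration_set_event_destination(
        ConfigurationSetName=configuration_set,
        EventDestination={
            "Name": "sns-events",  # hypothetical destination name
            "Enabled": True,
            "MatchingEventTypes": [
                "send", "reject", "bounce", "complaint",
                "delivery", "open", "click",
            ],
            "SNSDestination": {"TopicARN": topic_arn},
        },
    )


# Example call (needs AWS credentials and the ARN of the deployed topic):
# import boto3
# add_sns_destination(boto3.client("ses"), "EmailServiceConfigurationSet", topic_arn)
```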

Important: Amazon SES requires that you verify your identities (the domains or email addresses that you send email from) to confirm that you own them, and to prevent unauthorized use.
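A quick way to check whether an identity is already verified is to ask SES for its verification status. The sketch below assumes a boto3 SES client is passed in; `is_verified` is a hypothetical helper name.

```python
def is_verified(ses_client, identity: str) -> bool:
    """Return True if SES reports the identity as successfully verified."""
    response = ses_client.get_identity_verification_attributes(Identities=[identity])
    attributes = response["VerificationAttributes"].get(identity, {})
    return attributes.get("VerificationStatus") == "Success"


# Example call (needs AWS credentials):
# import boto3
# ses = boto3.client("ses")
# if not is_verified(ses, "sender@example.com"):
#     ses.verify_email_identity(EmailAddress="sender@example.com")
```

Calling `verify_email_identity` makes SES send a confirmation email to that address; the status stays pending until the link in it is followed.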

Testing the service

To test the service with attachments, first upload a file to the bucket. Below is an example of how you could do it.

$ aws s3 cp /home/alejandro/Downloads/ticket.pdf s3://email-service-bucket

You can find the server URL in your CloudFormation stack.
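The URL can also be looked up programmatically. The sketch below assumes a boto3 `apigatewayv2` client and matches on the API name used in the template; `find_api_endpoint` is a hypothetical helper, and it only inspects the first page of results.

```python
def find_api_endpoint(apigw_client, api_name: str):
    """Return the ApiEndpoint of the first HTTP API whose Name matches, else None."""
    # Only the first page of results is inspected (no NextToken handling).
    for api in apigw_client.get_apis().get("Items", []):
        if api.get("Name") == api_name:
            return api.get("ApiEndpoint")
    return None


# Example call (needs AWS credentials):
# import boto3
# base_url = find_api_endpoint(
#     boto3.client("apigatewayv2"), "Serverless Email Service API Gateway")
# print(f"{base_url}/prod/send_email")
```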

With an attachment

curl --location --request POST 'https://server-url/prod/send_email' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "Destination": {
        "BccAddresses": [],
        "CcAddresses": [],
        "ToAddresses": [
            "alek.cora.glez@gmail.com"
        ]
    },
    "Message": {
        "Body": {
            "Html": {
                "Charset": "UTF-8",
                "Data": "Test. https://as.com"
            },
            "Text": {
                "Charset": "UTF-8",
                "Data": "Test"
            }
        },
        "Subject": {
            "Charset": "UTF-8",
            "Data": "Test email"
        }
    },
    "Source": "alek.cora.glez@gmail.com",
    "ConfigurationSetName": "EmailServiceConfigurationSet",
    "Attachments": [
        {
            "Bucket": "email-service-bucket",
            "Key": "ticket.pdf",
            "Name": "ticket",
            "Ext": "pdf"
        },
        {
            "Bucket": "email-service-bucket",
            "Key": "Aporte.pdf",
            "Name": "aporte",
            "Ext": "pdf"
        }
    ]
}'

Without attachment

curl --location --request POST 'https://server-url/prod/send_email' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
    "Destination": {
        "BccAddresses": [],
        "CcAddresses": [],
        "ToAddresses": [
            "alek.cora.glez@gmail.com"
        ]
    },
    "Message": {
        "Body": {
            "Html": {
                "Charset": "UTF-8",
                "Data": "Test. https://as.com"
            },
            "Text": {
                "Charset": "UTF-8",
                "Data": "Test. https://as.com"
            }
        },
        "Subject": {
            "Charset": "UTF-8",
            "Data": "Test email"
        }
    },
    "Source": "alek.cora.glez@gmail.com",
    "ConfigurationSetName": "EmailServiceConfigurationSet"
}'
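Since the service is meant to be called from your own applications, here is a rough Python equivalent of the curl call using only the standard library. The helper `build_send_email_request`, the placeholder addresses, and the `https://server-url` base are assumptions for illustration; the request is built but not sent.

```python
import json
import urllib.request


def build_send_email_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build the POST request for the /prod/send_email route."""
    return urllib.request.Request(
        url=f"{base_url}/prod/send_email",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


payload = {
    "Source": "sender@example.com",
    "Destination": {"ToAddresses": ["recipient@example.com"]},
    "Message": {
        "Subject": {"Charset": "UTF-8", "Data": "Test email"},
        "Body": {"Text": {"Charset": "UTF-8", "Data": "Test"}},
    },
    "ConfigurationSetName": "EmailServiceConfigurationSet",
}

request = build_send_email_request("https://server-url", payload)
# To actually send it: urllib.request.urlopen(request)
```

Because API Gateway only places the body on the queue, a successful response means the message was accepted for processing, not that the email has been sent.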

Events Tracked in DynamoDB

To-Do List

Important: you should NOT deploy this application in production without first reviewing the role’s permissions to enforce the Principle of Least Privilege (PoLP), and without adding a mechanism to protect your API endpoint from external or unauthorized access.

Well, that’s all for today. I’m hoping this project may be useful for some of you…

Code repository: https://github.com/alekglez/serverless-email-service.git

Alejandro Cora González

Solutions Architect | Python Developer | Serverless Advocate