There are many use cases where a serverless architecture pays off, and an API service is no exception. In a previous post, we learned how to deploy a Flask application on EC2 using a Load Balancer and Auto Scaling Groups. You could also deploy it to the AWS Elastic Container Service, but what if you don’t want to manage instances or clusters?

The first piece of advice we can give you is that you should not architect differently for serverless because you should be able to deploy your service in different environments without code modifications…

Architecture

Project Structure

serverless-api-service
├── buildspec.yaml
├── Dockerfile
├── LICENSE
├── main.py
├── manage.py
├── project
│   ├── api
│   │   ├── configs
│   │   │   ├── __init__.py
│   │   │   └── service_mapper.py
│   │   ├── controllers
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   ├── __init__.py
│   │   ├── requirements.txt
│   │   ├── routers
│   │   │   ├── health.py
│   │   │   ├── __init__.py
│   │   │   ├── main.py
│   │   ├── services
│   │   │   ├── base.py
│   │   │   ├── dynamo_service.py
│   │   │   ├── __init__.py
│   │   │   ├── mock_service.py
│   │   └── tests
│   │       ├── base.py
│   │       ├── __init__.py
│   │       ├── test_generic_endpoints.py
│   │       └── test_health.py
│   ├── __init__.py
│   └── requirements.txt
├── README.md
├── requirements.txt
└── template.yml

FastAPI application

Routers

You need to specify the services (class names) that will handle the requests, and you need to configure a mapper that relates each resource to its service. Sounds tricky, right?

Let’s look at an example. Suppose you receive a request like http://localhost:8000/api/mocks and want to retrieve data from an in-memory data structure, but for another resource like http://localhost:8000/api/users you want to retrieve the information from the users table in DynamoDB. Let’s check the mapper needed here:

project/api/configs/service_mapper.py

# -*- coding: utf-8 -*-

from project.api.services.dynamo_service import DynamoService
from project.api.services.mock_service import MockService


# Resource name (first path segment after /api) -> service instance.
mapper = {
    "mocks": MockService(),
    "users": DynamoService()
}

As you can see, each resource you expose needs a service implementation behind it. But why do it this way? You have probably heard of the SOLID principles; in this specific use case we are applying the Liskov Substitution Principle (L) and the Dependency Inversion Principle (D).
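
To make the substitution idea concrete, here is a minimal, self-contained sketch. The classes InMemoryService and FakeTableService and the function fetch are hypothetical stand-ins invented for this illustration (they are not the project’s MockService or DynamoService); the point is only that the caller dispatches through the mapper without caring which concrete class answers.

```python
from typing import Any, Dict


class InMemoryService:
    """Hypothetical stand-in for a service backed by a plain dict."""

    def __init__(self, records: Dict[str, Dict[str, Any]]):
        self.records = records

    def get_record(self, model: str, record_id: str) -> Dict[str, Any]:
        return self.records[record_id]


class FakeTableService:
    """Hypothetical stand-in for a service backed by a database table."""

    def get_record(self, model: str, record_id: str) -> Dict[str, Any]:
        # A real implementation would query the table named after the model.
        return {"id": record_id, "source": f"table:{model}"}


# Resource name -> service instance, the same shape as the mapper above.
mapper = {
    "mocks": InMemoryService({"000-1": {"id": "000-1", "name": "Service"}}),
    "users": FakeTableService(),
}


def fetch(model: str, record_id: str) -> Dict[str, Any]:
    # The caller never checks which concrete class it received.
    return mapper[model].get_record(model, record_id)


print(fetch("mocks", "000-1"))
print(fetch("users", "000-9"))
```

Swapping the storage behind a resource then means changing one entry in the mapper, not the code that handles the request.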

Before we go to the services, let’s create the generic service endpoints. These functions receive the requests and redirect the application flow to the controller.

project/api/routers/main.py

# -*- coding: utf-8 -*-

import json
import os
from http import HTTPStatus

from fastapi import APIRouter
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from starlette.status import (
    HTTP_201_CREATED, HTTP_404_NOT_FOUND, HTTP_204_NO_CONTENT,
    HTTP_200_OK, HTTP_409_CONFLICT, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_405_METHOD_NOT_ALLOWED
)

from project.api.controllers.main import Controller
from project.api.services.base import ServiceException

services = os.environ.get("ACTIVE_SERVICES", "")
controller = Controller(services)
core_api = APIRouter()


@core_api.get("/{model}", status_code=HTTP_200_OK)
def get_records(request: Request, model):
    parameters = request.query_params
    filters, fields = parameters.get("filters"), parameters.get("fields")
    offset, limit = parameters.get("offset"), parameters.get("limit", 20)

    try:
        result = controller.get_all(model, filters=filters, fields=fields, offset=offset, limit=limit)
        return JSONResponse(content=result)

    except NotImplementedError:
        # Recent Starlette versions require an explicit content argument.
        return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED, content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.get("/{model}/{record_id}", status_code=HTTP_200_OK)
def get_record(model, record_id):
    try:
        result = controller.get(model, record_id)
        return JSONResponse(content=result)

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except NotImplementedError:
        return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED, content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.post("/{model}", status_code=HTTP_201_CREATED)
async def create_record(request: Request, model):
    try:
        controller.create(model, json.loads(await request.body()))
        return JSONResponse(
            status_code=HTTP_201_CREATED,
            content={})

    except NotImplementedError:
        return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED, content={})

    except ServiceException as error:
        return JSONResponse(
            status_code=HTTP_409_CONFLICT,
            content={"error": str(error)})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.put("/{model}/{record_id}", status_code=HTTP_200_OK)
async def update_record(request: Request, model, record_id):
    try:
        controller.update(model, record_id, json.loads(await request.body()))
        return JSONResponse(content={})

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except NotImplementedError:
        return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED, content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.delete("/{model}/{record_id}", status_code=HTTP_204_NO_CONTENT)
def delete_record(model, record_id):
    try:
        controller.delete(model, record_id)
        return Response(status_code=HTTPStatus.NO_CONTENT.value)

    except NotImplementedError:
        return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED, content={})

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})

Controller

project/api/controllers/main.py

# -*- coding: utf-8 -*-

from typing import Any, Dict
from project.api.services.base import Service
from project.api.configs.service_mapper import mapper


class Controller:
    def __init__(self, services: str):
        """
        Initialize the services...
        :param services: String that contains the service names (comma-separated)
        """

        # Skip empty names so that an unset ACTIVE_SERVICES does not fail here.
        names = [name.strip() for name in services.split(",") if name.strip()]
        self.services = [Service.create_class(name)() for name in names]
        self.mapper = mapper

    def __get_service(self, model: str):
        """
        Try to find a service for the given model...

        :param model: Model to use
        :return: The Service associated to the model.
        """

        try:
            return self.mapper[model]

        except KeyError:
            raise ControllerException("No service associated to the requested resource!")

    def get_all(self, model: str, filters, fields, offset, limit):
        return self.__get_service(model).get_records(
            model, filters, fields, offset, limit)

    def get(self, model: str, record_id: Any):
        return self.__get_service(model).get_record(model, record_id)

    def create(self, model: str, values: Dict):
        self.__get_service(model).create_record(model, values["id"], values)

    def update(self, model: str, record_id: Any, values: Dict):
        self.__get_service(model).update_record(model, record_id, values)

    def delete(self, model: str, record_id: Any):
        self.__get_service(model).delete_record(model, record_id)


class ControllerException(Exception):
    """ Custom class for controller exceptions """

At this point, the controller does not know how the services are implemented; it does not even know which services are running, because a higher-level module is in charge of creating those services and passing them to the controller. This is where we apply the Dependency Inversion Principle: we should depend upon abstractions, not on concrete implementations. We also apply the Liskov Substitution Principle, which states that objects of a superclass shall be replaceable with objects of its subclasses without breaking the application.
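
One way to see the dependency inversion at work is to hand the controller a mapper from the outside and swap in a test double. The sketch below is an illustration under assumptions, not the project’s code: MiniController, RecordService, StubService and UnknownResource are hypothetical names, and the mapper is injected through the constructor rather than imported from a config module.

```python
from typing import Any, Dict, Protocol


class RecordService(Protocol):
    """The abstraction the controller depends on (hypothetical name)."""

    def get_record(self, model: str, record_id: Any) -> Any: ...


class UnknownResource(Exception):
    """Raised when no service is mapped to the requested model."""


class MiniController:
    def __init__(self, mapper: Dict[str, RecordService]):
        # The controller only stores the abstraction; it never builds services.
        self.mapper = mapper

    def get(self, model: str, record_id: Any) -> Any:
        try:
            service = self.mapper[model]
        except KeyError:
            raise UnknownResource(f"No service associated to {model!r}") from None
        return service.get_record(model, record_id)


class StubService:
    """Test double that satisfies RecordService without any storage."""

    def get_record(self, model: str, record_id: Any) -> Any:
        return {"id": record_id, "model": model}


controller = MiniController({"users": StubService()})
print(controller.get("users", "000-1"))
```

Because the controller depends only on the get_record signature, a unit test can run it against StubService while production wiring passes a real storage-backed service.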

Services

project/api/services/base.py

# -*- coding: utf-8 -*-

from abc import abstractmethod
from typing import Any, Dict, Iterable, List, Tuple


class Service:
    """
    Abstract Service you need to implement to
    interact with your data storage of preference.
    """

    # Registry of subclasses, keyed by class name.
    subclasses = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.__name__] = cls

    @classmethod
    def create_class(cls, subclass_name: str = None):
        if not subclass_name:
            raise NotImplementedError

        return cls.subclasses.get(subclass_name)

    @abstractmethod
    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: int = 0, limit: int = 20) -> Iterable:

        """
        Return records using pagination...

        :param model: Model or object family.
        :param filters: Filters to apply to limit the records to retrieve.
        :param fields: Fields you want to retrieve. If None, return all the object fields.
        :param offset: Start index to retrieve.
        :param limit: Maximum number of records to retrieve.
        :return:
        """

    @abstractmethod
    def get_record(self, model: str, record_id: Any) -> Any:
        """
        Return the record with the specified id, if it exists...

        :param model: Model or object family.
        :param record_id: Record id.
        :return: The record or None
        """

    @abstractmethod
    def create_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Create a record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :return:
        """

    @abstractmethod
    def update_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Update the record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :return:
        """

    @abstractmethod
    def delete_record(self, model: str, record_id: Any):
        """
        Delete the record...

        :param model: Model or object family.
        :param record_id: Record id.
        :return:
        """


class ServiceException(Exception):
    """ Custom class for service exceptions """

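The subclass registry in the base service relies on __init_subclass__, which Python calls every time a subclass is defined. The following sketch isolates that mechanism and shows one possible way to turn a comma-separated string, like the ACTIVE_SERVICES variable, into instances. BaseService, AlphaService, BetaService and load_services are hypothetical names used only for this illustration, and rejecting unknown names with ValueError is an assumption, not the project’s behavior.

```python
from typing import Dict, List, Type


class BaseService:
    # Class name -> class, filled in automatically as subclasses are defined.
    registry: Dict[str, Type["BaseService"]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        BaseService.registry[cls.__name__] = cls


class AlphaService(BaseService):
    pass


class BetaService(BaseService):
    pass


def load_services(active: str) -> List[BaseService]:
    # "".split(",") yields [""], so empty names are filtered out explicitly.
    names = [name.strip() for name in active.split(",") if name.strip()]
    unknown = [name for name in names if name not in BaseService.registry]
    if unknown:
        raise ValueError(f"Unknown services: {', '.join(unknown)}")
    return [BaseService.registry[name]() for name in names]


loaded = load_services("AlphaService, BetaService")
print([type(service).__name__ for service in loaded])
```

Note that a class only lands in the registry once the module defining it has been imported, so the services have to be imported somewhere before their names are looked up.
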
MockService

# -*- coding: utf-8 -*-

from typing import Any, Dict, Iterable, List, Tuple
from project.api.services.base import Service, ServiceException


# In-memory storage: model name -> {record id -> record}.
mock = {
    "mocks": {
        "000-1": {
            "id": "000-1",
            "name": "Service",
            "Description": "Some fancy description...",
            "value": 50.0
        },
        "000-2": {
            "id": "000-2",
            "name": "Gas Station",
            "Description": "Some fancy gas station...",
            "value": 1.5
        }
    }
}


class MockService(Service):
    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: int = 0, limit: int = 20) -> Iterable:

        return [value for _, value in mock.get(model, {}).items()]

    def get_record(self, model: str, record_id: Any) -> Any:
        return mock.get(model, {})[record_id]

    def create_record(self, model: str, record_id: Any, record_values: Dict):
        if record_id in mock[model]:
            raise ServiceException(f"Record with id {record_id}, already exist!")

        mock[model][record_values["id"]] = record_values

    def update_record(self, model: str, record_id: Any, values: Dict):
        for key, value in values.items():
            mock[model][record_id][key] = value

    def delete_record(self, model: str, record_id: Any):
        del mock[model][record_id]
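
The get_records method above returns every record and does not use fields, offset or limit. As a rough sketch of how those parameters could be honored for an in-memory store, the hypothetical helper below (paginate is not part of the project) sorts by id, slices a page, and optionally keeps only the requested fields. It assumes fields arrives as a comma-separated string, as in the ?fields=id,full_name query used later, and that offset and limit have already been converted to integers.

```python
from typing import Any, Dict, List, Optional


def paginate(records: Dict[str, Dict[str, Any]], fields: Optional[str] = None,
             offset: int = 0, limit: int = 20) -> List[Dict[str, Any]]:
    # Sort by id so that pages are stable between calls.
    ordered = [records[key] for key in sorted(records)]
    page = ordered[offset:offset + limit]

    if not fields:
        return page

    # Keep only the requested fields that actually exist on each record.
    wanted = [name.strip() for name in fields.split(",") if name.strip()]
    return [{name: row[name] for name in wanted if name in row} for row in page]


data = {
    "000-2": {"id": "000-2", "name": "Gas Station", "value": 1.5},
    "000-1": {"id": "000-1", "name": "Service", "value": 50.0},
}

print(paginate(data, offset=1, limit=1))
print(paginate(data, fields="id,name", limit=1))
```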

DynamoService

# -*- coding: utf-8 -*-

import os
from typing import Any, Dict, Iterable, List, Tuple

import boto3

from project.api.services.base import Service


class DynamoService(Service):
    def __init__(self):
        self.resource = boto3.resource("dynamodb", region_name=os.getenv("REGION"))

    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: Any = None, limit: int = 20) -> Iterable:

        args = {
            "TableName": model,
            "Select": "ALL_ATTRIBUTES" if not fields else "SPECIFIC_ATTRIBUTES",
            "ReturnConsumedCapacity": "TOTAL",
            # Query parameters arrive as strings; Limit must be an integer.
            "Limit": int(limit)
        }

        if offset:
            # The resource API takes plain Python values,
            # not typed attribute maps such as {"S": ...}.
            args["ExclusiveStartKey"] = {"id": offset}

        if fields:
            args["ProjectionExpression"] = fields

        return self.resource.Table(model).scan(**args)["Items"]

    def get_record(self, model: str, record_id: Any, **kwargs) -> Any:
        # A missing record has no "Item" key, so this raises KeyError.
        return self.resource.Table(model).get_item(
            Key={"id": record_id}
        )["Item"]

    def create_record(self, model: str, record_id: Any, record_values: Dict):
        self.resource.Table(model).put_item(Item=record_values)

    def update_record(self, model: str, record_id: Any, record_values: Dict):
        # Placeholders (#name, :name) avoid clashes with DynamoDB reserved words.
        update_expression = 'SET {}'.format(','.join(f'#{k}=:{k}' for k in record_values))
        expression_attribute_values = {f':{k}': v for k, v in record_values.items()}
        expression_attribute_names = {f'#{k}': k for k in record_values}

        response = self.resource.Table(model).update_item(
            TableName=model,
            Key={"id": record_id},
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_attribute_values,
            ExpressionAttributeNames=expression_attribute_names,
            ReturnValues="UPDATED_NEW"
        )

        return response

    def delete_record(self, model: str, record_id: Any):
        self.resource.Table(model).delete_item(Key={"id": record_id})
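
The expression-building step of update_record can be looked at in isolation, without touching AWS. The sketch below is a hypothetical helper (build_update is not part of the project) that returns the three pieces an UpdateItem call needs. Two choices here are assumptions added for the illustration: it skips the key attribute, since DynamoDB rejects updates to primary key attributes, and it refuses an empty update. It also assumes attribute names are simple identifiers that are valid inside #name and :name placeholders.

```python
from typing import Any, Dict, Tuple


def build_update(values: Dict[str, Any],
                 key_name: str = "id") -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    # The key attribute cannot be modified by an update, so leave it out.
    changes = {k: v for k, v in values.items() if k != key_name}
    if not changes:
        raise ValueError("Nothing to update")

    expression = "SET " + ", ".join(f"#{k} = :{k}" for k in changes)
    names = {f"#{k}": k for k in changes}                    # ExpressionAttributeNames
    placeholders = {f":{k}": v for k, v in changes.items()}  # ExpressionAttributeValues
    return expression, names, placeholders


expression, names, placeholders = build_update({"id": "000-2", "full_name": "zzz"})
print(expression)
print(names)
print(placeholders)
```

Keeping this logic in a pure function makes it easy to unit test, while the service method only has to pass the three results on to update_item.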

AWS deployment

Serverless Application Model

AWS CodeBuild

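AWS CodeBuild runs a build from a buildspec file in the source code root directory. The default file name is buildspec.yml; a different name, such as the “buildspec.yaml” used here, has to be declared in the project’s BuildSpec setting. This file is a collection of build commands and related settings, in YAML format.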

# buildspec.yaml
version: 0.2
phases:
  build:
    commands:
      - sam build
      - sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAM

GitHub integration

Secrets Manager

SAM template

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Description: >
  Serverless API Service | Architecture for APIs using API Gateway, Lambda and FastAPI framework...

Resources:
  FastApiServiceRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: FastApiServiceRole
      Description: Role for all resources into the service...

      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole

      Policies:
        - PolicyName: CloudWatchPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: "*"
                Action:
                  - logs:*

        - PolicyName: DynamoPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt DynamoDBTable.Arn
                Action:
                  - dynamodb:*

  FastApiGateway:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      OpenApiVersion: "3.0.0"

  FastApiLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub ${AWS::StackName}-FastApiLambda
      Description: >
        Lambda to handle request...

      CodeUri: ./
      Handler: main.handler
      Role: !GetAtt FastApiServiceRole.Arn

      Runtime: python3.8
      MemorySize: 128
      Timeout: 300

      Events:
        ApiEvent:
          Type: Api
          Properties:
            Path: /{proxy+}
            Method: ANY
            RestApiId:
              Ref: FastApiGateway

      Environment:
        Variables:
          REGION: us-east-1
          ACTIVE_SERVICES: MockService,DynamoService

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: users
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S

      KeySchema:
        - AttributeName: id
          KeyType: HASH

      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1

      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

  CodeBuildSourceCredential:
    Type: AWS::CodeBuild::SourceCredential
    Properties:
      ServerType: GITHUB
      AuthType: PERSONAL_ACCESS_TOKEN
      Token: "{{resolve:secretsmanager:github_personal_access_token:SecretString:github_personal_access_token}}"

  FastApiCodeBuild:
    Type: AWS::CodeBuild::Project
    Properties:
      Name: serverless-api-service-deployment
      Description: Build process for the service using the SAM template...
      ServiceRole: !GetAtt FastApiServiceCodeBuildRole.Arn
      Artifacts:
        Name: code-build-artifacts
        Type: S3
        Location: serverless-api-service
        Path: ""
        NamespaceType: NONE
        Packaging: NONE

      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_SMALL
        Image: aws/codebuild/amazonlinux2-x86_64-standard:2.0

      Source:
        Location: https://github.com/alekglez/serverless-api-service.git
        Type: GITHUB
        GitCloneDepth: 1
        Auth:
          Resource: !Ref CodeBuildSourceCredential
          Type: OAUTH

      Triggers:
        Webhook: true
        FilterGroups:
          - - Type: EVENT
              Pattern: PUSH
              ExcludeMatchedPattern: false

      BadgeEnabled: false

      LogsConfig:
        CloudWatchLogs:
          Status: ENABLED

        S3Logs:
          Status: DISABLED
          EncryptionDisabled: false

      TimeoutInMinutes: 10

  FastApiServiceCodeBuildRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: FastApiServiceCodeBuildRole
      Description: Access...

      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - codebuild.amazonaws.com
                - lambda.amazonaws.com

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess
        - arn:aws:iam::aws:policy/AmazonAPIGatewayAdministrator
        - arn:aws:iam::aws:policy/AWSCloudFormationFullAccess

      Policies:
        - PolicyName: CodeBuildLogsPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents

        - PolicyName: CodeBuildCodePipelineBucketPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "arn:aws:s3:::codepipeline-us-east-1-*"
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:GetBucketAcl
                  - s3:GetBucketLocation

        - PolicyName: CodeBuildDeploymentBucketPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - s3:*

        - PolicyName: CodeBuildCodeBuildPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - codebuild:CreateReportGroup
                  - codebuild:CreateReport
                  - codebuild:UpdateReport
                  - codebuild:BatchPutTestCases
                  - codebuild:ImportSourceCredentials

Deploying the infrastructure

Install SAM CLI

Then, create the configuration and run the commands below to create and deploy your project infrastructure.

export AWS_SAM_STACK_NAME=serverless-api-service
export ACTIVE_SERVICES=MockService,DynamoService
export REGION=us-east-1
git clone https://github.com/alekglez/serverless-api-service.git
cd serverless-api-service
virtualenv --python=python3.8 .venv
source .venv/bin/activate
pip install -r requirements.txt
aws s3api create-bucket \
--bucket serverless-api-service \
--region us-east-1 \
--acl private
sam build
sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAM
aws dynamodb put-item --table-name users --item '{"id": {"S": "000-1"}, "full_name": {"S": "User X"}, "gender": {"S": "M"}}' --return-consumed-capacity TOTAL
aws dynamodb put-item --table-name users --item '{"id": {"S": "000-2"}, "full_name": {"S": "User Y"}, "gender": {"S": "F"}}' --return-consumed-capacity TOTAL

After the deployment is done, go to the Stages menu in the API Gateway console and retrieve the Invoke URL for your endpoints. Now you are ready to send some requests.

# Get all records...
curl --location --request GET 'http://{server}/api/mocks'
curl --location --request GET 'http://{server}/api/users'
curl --location --request GET 'http://{server}/api/users?fields=id,full_name'
# Get specific record...
curl --location --request GET 'http://{server}/api/mocks/000-1'
curl --location --request GET 'http://{server}/api/users/000-1'
# Create a record...
curl --location --request POST 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users' \
--header 'Content-Type: application/json' \
--data-raw '{
"id": "000-3",
"full_name": "User3",
"gender": "F"
}'
# Update a record...
curl --location --request PUT 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-2' \
--header 'Content-Type: application/json' \
--data-raw '{
"full_name": "zzz"
}'
# Delete a record...
curl --location --request DELETE 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-3'
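
If you prefer to script these calls, the same requests can be prepared from Python with only the standard library. This is a minimal sketch: build_request is a hypothetical helper, and the base URL is a placeholder you would replace with your own Invoke URL. The example only builds the request objects; nothing is sent until you pass one to urllib.request.urlopen.

```python
import json
from typing import Any, Dict, Optional
from urllib.request import Request


def build_request(base_url: str, method: str, resource: str,
                  record_id: Optional[str] = None,
                  payload: Optional[Dict[str, Any]] = None) -> Request:
    # Resources live under /api/<resource>[/<record_id>], as in the curl calls.
    url = f"{base_url.rstrip('/')}/api/{resource}"
    if record_id is not None:
        url = f"{url}/{record_id}"

    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if data is not None else {}
    return Request(url, data=data, headers=headers, method=method)


# Placeholder base URL (assumption): use the Invoke URL of your own stage.
BASE_URL = "https://example.execute-api.us-east-1.amazonaws.com/prod"

update = build_request(BASE_URL, "PUT", "users", "000-2", {"full_name": "zzz"})
print(update.get_method(), update.full_url)
```

To actually send it, call urllib.request.urlopen(update) and read the response body.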

Once you upload new code to your repository, you can check the build history in AWS CodeBuild.

That’s all for today, folks. Maybe this project can be a blueprint for your next REST API journey using a serverless architecture…

Code repository: https://github.com/alekglez/serverless-api-service
