Serverless API Service
There are many use-cases where a serverless architecture has a lot of benefits and of course an API service is not the exception. In a previous post, we learned how to deploy a Flask application on EC2 using a Load Balancer and Auto Scaling Groups. Of course, you can think about deploying it into the AWS Elastic Container Service but…, what if you don’t want to manage instances or clusters?
The first piece of advice we can give you is that you should not architect differently for serverless because you should be able to deploy your service in different environments without code modifications…
Architecture
The entry point for our Lambda Function where our FastAPI project resides will be AWS API Gateway. The service will retrieve data from the backends we have registered in the app. Using this generic approach we can have multiple ways to serve data as if we are working with GraphQL, where multiple data sources are behind a single endpoint. Also, we will create the mechanism (CI/CD) to deploy the code when it is pushed into the GitHub repository through AWS CodeBuild.
Project Structure
serverless-api-service
├── buildspec.yaml
├── Dockerfile
├── LICENSE
├── main.py
├── manage.py
├── project
│ ├── api
│ │ ├── configs
│ │ │ ├── __init__.py
│ │ │ └── service_mapper.py
│ │ ├── controllers
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ ├── __init__.py
│ │ ├── requirements.txt
│ │ ├── routers
│ │ │ ├── health.py
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ ├── services
│ │ │ ├── base.py
│ │ │ ├── dynamo_service.py
│ │ │ ├── __init__.py
│ │ │ ├── mock_service.py
│ │ └── tests
│ │ ├── base.py
│ │ ├── __init__.py
│ │ ├── test_generic_endpoints.py
│ │ └── test_health.py
│ ├── __init__.py
│ └── requirements.txt
├── README.md
├── requirements.txt
└── template.yml
FastAPI application
FastAPI is a modern web framework for building Rest APIs which offers fast applications development, high-performance, asynchronous code executions and many other interesting and useful characteristics, and as you can figure out is the framework of election for our project today.
Routers
Supposing the application is handling CRUD operations for many resources we are creating four generic entry points and we will let you use the controller and the implemented services to specify your application logic in case of some specific scenario.
You need to specify the services (class name) you want to use to handle the requests and you need to configure a mapper where you must specify the services and the resources relation. Does it sound tricky right?
Let’s put an example: Suppose you will have a request like http://localhost:8000/api/mocks, and you want to retrieve data from a data structure implemented in memory, but for another resource like http://localhost:8000/api/users, you want to retrieve the information from the table users in DynamoDB. Let’s check the mapper needed here:
project/api/configs/service_mapper.py# -*- coding: utf-8 -*-
from project.api.services.dynamo_service import DynamoService
from project.api.services.mock_service import MockService
mapper = {
"mocks": MockService(),
"users": DynamoService()
}
As you can see, you should have implemented the services needed to respond to the resources requested. But…, why do it in this way? Well, probably you heard before about the SOLID principles, in this specific use-case, we are using the Liskov Substitution Principle (L) and the Dependency Inversion Principle (D).
Before we go to the services, let’s create the generic service endpoints. These functions get the requests and re-direct the application flow for the controller.
project/api/routers/main.py# -*- coding: utf-8 -*-
import json
import os
from http import HTTPStatus
from fastapi import APIRouter
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.status import (
HTTP_201_CREATED, HTTP_404_NOT_FOUND, HTTP_204_NO_CONTENT,
HTTP_200_OK, HTTP_409_CONFLICT, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_405_METHOD_NOT_ALLOWED
)
from project.api.controllers.main import Controller
from project.api.services.base import ServiceException
services = os.environ.get("ACTIVE_SERVICES", "")
controller = Controller(services)
core_api = APIRouter()
@core_api.get("/{model}", status_code=HTTP_200_OK)
def get_records(request: Request, model):
parameters = request.query_params
filters, fields = parameters.get("filters"), parameters.get("fields"),
offset, limit = parameters.get("offset"), parameters.get("limit", 20)
try:
result = controller.get_all(model, filters=filters, fields=fields, offset=offset, limit=limit)
return JSONResponse(content=result)
except NotImplementedError:
return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED)
except Exception as error:
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(error)})
@core_api.get("/{model}/{record_id}", status_code=HTTP_200_OK)
def get_record(model, record_id):
try:
result = controller.get(model, record_id)
return JSONResponse(content=result)
except KeyError:
return JSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={})
except NotImplementedError:
return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED)
except Exception as error:
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(error)})
@core_api.post("/{model}", status_code=HTTP_201_CREATED)
async def create_record(request: Request, model):
try:
controller.create(model, json.loads(await request.body()))
return JSONResponse(
status_code=HTTP_201_CREATED,
content={})
except NotImplementedError:
return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED)
except ServiceException as error:
return JSONResponse(
status_code=HTTP_409_CONFLICT,
content={"error": str(error)})
except Exception as error:
return JSONResponse(
status_code=HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": str(error)})
@core_api.put("/{model}/{record_id}", status_code=HTTP_200_OK)
async def update_record(request: Request, model, record_id):
try:
controller.update(model, record_id, json.loads(await request.body()))
return JSONResponse(content={})
except KeyError:
return JSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={})
except NotImplementedError:
return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED)
except Exception as error:
return JSONResponse(
status_code=500,
content={"error": str(error)})
@core_api.delete("/{model}/{record_id}", status_code=HTTP_204_NO_CONTENT)
def delete_record(model, record_id):
try:
controller.delete(model, record_id)
return Response(status_code=HTTPStatus.NO_CONTENT.value)
except NotImplementedError:
return JSONResponse(status_code=HTTP_405_METHOD_NOT_ALLOWED)
except KeyError:
return JSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={})
except Exception as error:
return JSONResponse(
status_code=500,
content={"error": str(error)})
Controller
Well, the function of the controller is to find the service that should respond to the requested resource and redirect the application flow to it.
project/api/controllers/main.py# -*- coding: utf-8 -*-
from typing import Any, Dict
from project.api.services.base import Service
from project.api.configs.service_mapper import mapper
class Controller:
def __init__(self, services: str):
"""
Initialize the services...
:param services: String that contains the service names (coma separated)
"""
services = services.split(",")
self.services = [Service.create_class(service)() for service in services]
self.mapper = mapper
def __get_service(self, model: str):
"""
Try to find a service for the given model...
:param model: Model to use
:return: The Service associated to the model.
"""
try:
return self.mapper[model]
except KeyError:
raise ControllerException("No service associated to the requested resource!")
def get_all(self, model: str, filters, fields, offset, limit):
return self.__get_service(model).get_records(
model, filters, fields, offset, limit)
def get(self, model: str, record_id: Any):
return self.__get_service(model).get_record(model, record_id)
def create(self, model: str, values: Dict):
self.__get_service(model).create_record(model, values["id"], values)
def update(self, model: str, record_id: Any, values: Dict):
self.__get_service(model).update_record(model, record_id, values)
def delete(self, model: str, record_id: Any):
self.__get_service(model).delete_record(model, record_id)
class ControllerException(Exception):
""" Custom class for controller exceptions """
At this point, the controller does not know how the services are implemented, even it does not know which services are running because is a high-level module in charge of creating those services and pass them to the controller, is here where we apply the Dependency Inversion Principle because we should depend upon abstractions, not on concrete implementations and also the Liskov Substitution Principle, this principle defines that objects of a superclass shall be replaceable with objects of its subclasses without breaking the application.
Services
The service must implement all the functions for your CRUD operations defined in the base.py abstract class. In the case where some of your services does not allow some method (for example POST, PUT, DELETE), it must raise NotImplementedError.
project/api/services/base.py# -*- coding: utf-8 -*-
from abc import abstractmethod
from typing import Any, Dict, Iterable, List, Tuple
class Service:
"""
Abstract Service you need to implement to
interact with your data storage of preference.
"""
subclasses = {}
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
cls.subclasses[cls.__name__] = cls
@classmethod
def create_class(cls, subclass_name: str = None):
if not subclass_name:
raise NotImplementedError
return cls.subclasses.get(subclass_name)
@abstractmethod
def get_records(self, model: str, filters: List[Tuple], fields: List[str],
offset: int = 0, limit: int = 20) -> Iterable:
"""
Return records using pagination...
:param model: Model or object family.
:param filters: Filters to apply to limit the records to retrieve.
:param fields: Fields you want to retrieve. If None, return all the object fields.
:param offset: Start index to retrieve
:param limit: End index to retrieve.
:return:
"""
@abstractmethod
def get_record(self, model: str, record_id: Any) -> Any:
"""
Return if exist the record with the specified id...
:param model: Model or object family.
:param record_id: Record id.
:return: The record or None
"""
@abstractmethod
def create_record(self, model: str, record_id: Any, record_values: Dict):
"""
Create a record using the values...
:param model: Model or object family.
:param record_id: Record id.
:param record_values: Dictionary of values.
:return:
"""
@abstractmethod
def update_record(self, model: str, record_id: Any, record_values: Dict):
"""
Update the record using the values...
:param model: Model or object family.
:param record_id: Record id.
:param record_values: Dictionary of values.
:return:
"""
@abstractmethod
def delete_record(self, model: str, record_id: Any):
"""
Delete the record...
:param model: Model or object family.
:param record_id: Record id.
:return:
"""
class ServiceException(Exception):
""" Custom class for service exceptions """
MockService
This is a mock service without a real backend service behind it. Probably useful for testing purposes.
# -*- coding: utf-8 -*-
from typing import Any, Dict, Iterable, List, Tuple
from project.api.services.base import Service, ServiceException
mock = {
"mocks": {
"000-1": {
"id": "000-1",
"name": "Service",
"Description": "Some fancy description...",
"value": 50.0
},
"000-2": {
"id": "000-2",
"name": "Gas Station",
"Description": "Some fancy gas station...",
"value": 1.5
}
}
}
class MockService(Service):
def get_records(self, model: str, filters: List[Tuple], fields: List[str],
offset: int = 0, limit: int = 20) -> Iterable:
return [value for _, value in mock.get(model, {}).items()]
def get_record(self, model: str, record_id: Any) -> Any:
return mock.get(model, {})[record_id]
def create_record(self, model: str, record_id: Any, record_values: Dict):
if record_id in mock[model]:
raise ServiceException(f"Record with id {record_id}, already exist!")
mock[model][record_values["id"]] = record_values
def update_record(self, model: str, record_id: Any, values: Dict):
for key, value in values.items():
mock[model][record_id][key] = value
def delete_record(self, model: str, record_id: Any):
del mock[model][record_id]
DynamoService
For a more realistic service, we could use a DynamoDB table. In this case, is the same approach, we need to implement the abstract class Service, and add the service to the mapper (as we did previously).
# -*- coding: utf-8 -*-
import os
from typing import Any, Dict, Iterable, List, Tuple
import boto3
from project.api.services.base import Service
class DynamoService(Service):
def __init__(self):
self.resource = boto3.resource("dynamodb", region_name=os.getenv("REGION"))
def get_records(self, model: str, filters: List[Tuple], fields: List[str],
offset: Any = None, limit: int = 20) -> Iterable:
args = {
"TableName": model,
"Select": "ALL_ATTRIBUTES" if not fields else "SPECIFIC_ATTRIBUTES",
"ReturnConsumedCapacity": "TOTAL",
"Limit": limit
}
if offset:
args["ExclusiveStartKey"] = {"id": {"S": offset}}
if fields:
args["ProjectionExpression"] = fields
return self.resource.Table(model).scan(**args)["Items"]
def get_record(self, model: str, record_id: Any, **kwargs) -> Any:
return self.resource.Table(model).get_item(
Key={"id": record_id}
)["Item"]
def create_record(self, model: str, record_id: Any, record_values: Dict):
self.resource.Table(model).put_item(Item=record_values)
def update_record(self, model: str, record_id: Any, record_values: Dict):
update_expression = 'SET {}'.format(','.join(f'#{k}=:{k}' for k in record_values))
expression_attribute_values = {f':{k}': v for k, v in record_values.items()}
expression_attribute_names = {f'#{k}': k for k in record_values}
response = self.resource.Table(model).update_item(
TableName=model,
Key={'{}'.format("id"): record_id},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_attribute_values,
ExpressionAttributeNames=expression_attribute_names,
ReturnValues="UPDATED_NEW"
)
return response
def delete_record(self, model: str, record_id: Any):
self.resource.Table(model).delete_item(Key={"id": record_id})
AWS deployment
Once we have our service running and tested locally is time to deploy it into the AWS Cloud and for this purpose, we will create a SAM template.
Serverless Application Model
The Serverless Application Model (SAM) is an open-source framework created with the purpose of building serverless applications on AWS, which provides shorthand syntax to express functions, APIs, databases, and event source mappings.
AWS CodeBuild
For continuous integration and continuous deployment (CI/CD) processes, we will use AWS CodeBuild which provides full support for the Serverless Application Model.
AWS Codebuild looks for a “buildspec.yaml” in the source code root directory to run a build. This file is a collection of build commands and related settings, in YAML format.
# buildspec.yamlversion: 0.2
phases:
build:
commands:
- sam build
- sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAM
GitHub integration
The integration between CodeBuild and GitHub is done via a webhook, that’s why we need to generate a personal access token into our GitHub account. To do that you can visit: https://github.com/settings/tokens. Below the access, you need to grant.
Secrets Manager
In the SAM template, we need to put the access we just created, but of course, putting a credential into the code is not an option. Then, because currently the use of System Manager Store Parameter service inside the SAM template is not allowed, we will use the service Secrets Manager for that purpose.
SAM template
Below, the SAM template defines the resources needed to fulfil the defined service architecture.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
Serverless API Service | Architecture for APIs using API Gateway, Lambda and FastAPI framework...
Resources:
FastApiServiceRole:
Type: AWS::IAM::Role
Properties:
RoleName: FastApiServiceRole
Description: Role for all resources into the service...
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- apigateway.amazonaws.com
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: CloudWatchPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: "*"
Action:
- logs:*
- PolicyName: DynamoPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource: !GetAtt DynamoDBTable.Arn
Action:
- dynamodb:*
FastApiGateway:
Type: AWS::Serverless::Api
Properties:
StageName: prod
OpenApiVersion: "3.0.0"
FastApiLambda:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub ${AWS::StackName}-FastApiLambda
Description: >
Lambda to handle request...
CodeUri: ./
Handler: main.handler
Role: !GetAtt FastApiServiceRole.Arn
Runtime: python3.8
MemorySize: 128
Timeout: 300
Events:
ApiEvent:
Type: Api
Properties:
Path: /{proxy+}
Method: ANY
RestApiId:
Ref: FastApiGateway
Environment:
Variables:
REGION: us-east-1
ACTIVE_SERVICES: MockService,DynamoService
DynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: users
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
CodeBuildSourceCredential:
Type: AWS::CodeBuild::SourceCredential
Properties:
ServerType: GITHUB
AuthType: PERSONAL_ACCESS_TOKEN
Token: "{{resolve:secretsmanager:github_personal_access_token:SecretString:github_personal_access_token}}"
FastApiCodeBuild:
Type: AWS::CodeBuild::Project
Properties:
Name: serverless-api-service-deployment
Description: Build process for the service using the SAM template...
ServiceRole: !GetAtt FastApiServiceCodeBuildRole.Arn
Artifacts:
Name: code-build-artifacts
Type: S3
Location: serverless-api-service
Path: ""
NamespaceType: NONE
Packaging: NONE
Environment:
Type: LINUX_CONTAINER
ComputeType: BUILD_GENERAL1_SMALL
Image: aws/codebuild/amazonlinux2-x86_64-standard:2.0
Source:
Location: https://github.com/alekglez/serverless-api-service.git
Type: GITHUB
GitCloneDepth: 1
Auth:
Resource: !Ref CodeBuildSourceCredential
Type: OAUTH
Triggers:
Webhook: true
FilterGroups:
- - Type: EVENT
Pattern: PUSH
ExcludeMatchedPattern: false
BadgeEnabled: false
LogsConfig:
CloudWatchLogs:
Status: ENABLED
S3Logs:
Status: DISABLED
EncryptionDisabled: false
TimeoutInMinutes: 10
FastApiServiceCodeBuildRole:
Type: AWS::IAM::Role
Properties:
RoleName: FastApiServiceCodeBuildRole
Description: Access...
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service:
- codebuild.amazonaws.com
- lambda.amazonaws.com
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AWSLambda_FullAccess
- arn:aws:iam::aws:policy/AmazonAPIGatewayAdministrator
- arn:aws:iam::aws:policy/AWSCloudFormationFullAccess
Policies:
- PolicyName: CodeBuildLogsPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource:
- "*"
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- PolicyName: CodeBuildCodePipelineBucketPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource:
- "arn:aws:s3:::codepipeline-us-east-1-*"
Action:
- s3:PutObject
- s3:GetObject
- s3:GetObjectVersion
- s3:GetBucketAcl
- s3:GetBucketLocation
- PolicyName: CodeBuildDeploymentBucketPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource:
- "*"
Action:
- s3:*
- PolicyName: CodeBuildCodeBuildPolicy
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Resource:
- "*"
Action:
- codebuild:CreateReportGroup
- codebuild:CreateReport
- codebuild:UpdateReport
- codebuild:BatchPutTestCases
- codebuild:ImportSourceCredentials
Deploying the infrastructure
The first deployment to AWS must be done manually using the SAM command-line interface. Later, all the deployments will be done automatically through AWS CodeBuild when the code is pushed into the repository.
Install SAM CLI
AWS provides the right tool to easily create, deploy and manage your serverless applications defined in a SAM template. You need to install and configure a few things in order to use the AWS SAM CLI.
Then, you can create the configurations and run the below commands in order to create and deploy your project infrastructure.
export AWS_SAM_STACK_NAME=serverless-api-service
export ACTIVE_SERVICES=MockService,DynamoService
export REGION=us-east-1git clone https://github.com/alekglez/serverless-api-service.git
cd serverless-api-servicevirtualenv --python=python3.8 .venv
pip install -r requirements.txtaws s3api create-bucket \
--bucket serverless-api-service \
--region us-east-1 \
--acl privatesam build
sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAMaws dynamodb put-item --table-name users --item '{"id": {"S": "000-1"}, "full_name": {"S": "User X"}, "gender": {"S": "M"}}' --return-consumed-capacity TOTALaws dynamodb put-item --table-name users --item '{"id": {"S": "000-2"}, "full_name": {"S": "User Y"}, "gender": {"S": "F"}}' --return-consumed-capacity TOTAL
After the deployment is done, you must go to the Stage menu in the API Gateway console and retrieve the Invoke URL for your endpoints. Now, you are ready to send some request.
# Get all records...
curl --location --request GET 'http://{server}/api/mocks'
curl --location --request GET 'http://{server}/api/users'
curl --location --request GET 'http://{server}/api/users?fields=id,full_name'# Get specific record...
curl --location --request GET 'http://{server}/api/mocks/000-1'
curl --location --request GET 'http://{server}/api/users/000-1'# Create a record...
curl --location --request POST 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users' \
--header 'Content-Type: application/json' \
--data-raw '{
"id": "000-3",
"full_name": "User3",
"gender": "F"
}'# Update a record...
curl --location --request PUT 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-2' \
--header 'Content-Type: application/json' \
--data-raw '{
"full_name": "zzz"
}'# Delete a record...
curl --location --request DELETE 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-3'
Once you upload new code to your repository, you can check the build history in AWS CodeBuild.
That’s all for today folks. Maybe, this project can be a blueprint for your next Rest API journey using a serverless architecture…
Code repository: https://github.com/alekglez/serverless-api-service