Serverless API Service

Architecture

Project Structure

serverless-api-service
├── buildspec.yaml
├── Dockerfile
├── LICENSE
├── main.py
├── manage.py
├── project
│ ├── api
│ │ ├── configs
│ │ │ ├── __init__.py
│ │ │ └── service_mapper.py
│ │ ├── controllers
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ ├── __init__.py
│ │ ├── requirements.txt
│ │ ├── routers
│ │ │ ├── health.py
│ │ │ ├── __init__.py
│ │ │ ├── main.py
│ │ ├── services
│ │ │ ├── base.py
│ │ │ ├── dynamo_service.py
│ │ │ ├── __init__.py
│ │ │ ├── mock_service.py
│ │ └── tests
│ │ ├── base.py
│ │ ├── __init__.py
│ │ ├── test_generic_endpoints.py
│ │ └── test_health.py
│ ├── __init__.py
│ └── requirements.txt
├── README.md
├── requirements.txt
└── template.yml

FastAPI application

Routers

project/api/configs/service_mapper.py

# -*- coding: utf-8 -*-

from project.api.services.dynamo_service import DynamoService
from project.api.services.mock_service import MockService


# Lookup table from the model name used in the URL to the service
# instance that stores the records of that model.
mapper = dict(
    mocks=MockService(),
    users=DynamoService(),
)
project/api/routers/main.py

# -*- coding: utf-8 -*-

import json
import os
from http import HTTPStatus

from fastapi import APIRouter
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

from starlette.status import (
HTTP_201_CREATED, HTTP_404_NOT_FOUND, HTTP_204_NO_CONTENT,
HTTP_200_OK, HTTP_409_CONFLICT, HTTP_500_INTERNAL_SERVER_ERROR, HTTP_405_METHOD_NOT_ALLOWED
)

from project.api.controllers.main import Controller
from project.api.services.base import ServiceException

# Comma-separated service class names read from the environment
# (an empty string when ACTIVE_SERVICES is not set).
services = os.getenv("ACTIVE_SERVICES", "")

# Controller instance shared by the route handlers of this module.
controller = Controller(services)

# Router on which the generic CRUD endpoints are registered.
core_api = APIRouter()


@core_api.get("/{model}", status_code=HTTP_200_OK)
def get_records(request: Request, model):
    """
    Return a page of records of the given model...

    Reads the optional query parameters ``filters``, ``fields``, ``offset``
    and ``limit`` (default 20) and forwards them to the controller.

    :param request: Incoming request, used only for its query parameters.
    :param model: Model name taken from the URL path.
    :return: 200 with the records, 400 when ``limit`` is not an integer,
        405 when the operation is not implemented, 500 on any other error.
    """

    parameters = request.query_params
    filters, fields = parameters.get("filters"), parameters.get("fields")
    offset = parameters.get("offset")

    # Query parameters arrive as strings; convert the limit once here so the
    # services always receive an integer.
    try:
        limit = int(parameters.get("limit", 20))

    except ValueError:
        return JSONResponse(
            status_code=HTTPStatus.BAD_REQUEST.value,
            content={"error": "The 'limit' query parameter must be an integer!"})

    try:
        result = controller.get_all(model, filters=filters, fields=fields, offset=offset, limit=limit)
        return JSONResponse(content=result)

    except NotImplementedError:
        # An explicit body is passed because JSONResponse takes `content`
        # as a required argument in current Starlette releases.
        return JSONResponse(
            status_code=HTTP_405_METHOD_NOT_ALLOWED,
            content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.get("/{model}/{record_id}", status_code=HTTP_200_OK)
def get_record(model, record_id):
    """
    Return the record with the given id...

    :param model: Model name taken from the URL path.
    :param record_id: Record id taken from the URL path.
    :return: 200 with the record, 404 when the lookup raises KeyError,
        405 when the operation is not implemented, 500 on any other error.
    """

    try:
        result = controller.get(model, record_id)
        return JSONResponse(content=result)

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except NotImplementedError:
        # An explicit body is passed because JSONResponse takes `content`
        # as a required argument in current Starlette releases.
        return JSONResponse(
            status_code=HTTP_405_METHOD_NOT_ALLOWED,
            content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.post("/{model}", status_code=HTTP_201_CREATED)
async def create_record(request: Request, model):
    """
    Create a record from the JSON body of the request...

    :param request: Incoming request whose body holds the record as JSON.
    :param model: Model name taken from the URL path.
    :return: 201 on success, 400 when the body is not a JSON object with an
        "id" key, 405 when the operation is not implemented, 409 when the
        service raises ServiceException, 500 on any other error.
    """

    # Reject malformed payloads up front so a client mistake is not
    # reported as an internal server error.
    try:
        values = json.loads(await request.body())

    except ValueError:
        return JSONResponse(
            status_code=HTTPStatus.BAD_REQUEST.value,
            content={"error": "The request body must be valid JSON!"})

    # The controller reads values["id"], so the key has to be present.
    if not isinstance(values, dict) or "id" not in values:
        return JSONResponse(
            status_code=HTTPStatus.BAD_REQUEST.value,
            content={"error": "The request body must be a JSON object with an 'id'!"})

    try:
        controller.create(model, values)
        return JSONResponse(
            status_code=HTTP_201_CREATED,
            content={})

    except NotImplementedError:
        # An explicit body is passed because JSONResponse takes `content`
        # as a required argument in current Starlette releases.
        return JSONResponse(
            status_code=HTTP_405_METHOD_NOT_ALLOWED,
            content={})

    except ServiceException as error:
        return JSONResponse(
            status_code=HTTP_409_CONFLICT,
            content={"error": str(error)})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.put("/{model}/{record_id}", status_code=HTTP_200_OK)
async def update_record(request: Request, model, record_id):
    """
    Update a record with the values of the JSON body...

    :param request: Incoming request whose body holds the new values as JSON.
    :param model: Model name taken from the URL path.
    :param record_id: Record id taken from the URL path.
    :return: 200 on success, 400 when the body is not valid JSON, 404 when
        the update raises KeyError, 405 when the operation is not
        implemented, 500 on any other error.
    """

    # Reject malformed payloads up front so a client mistake is not
    # reported as an internal server error.
    try:
        values = json.loads(await request.body())

    except ValueError:
        return JSONResponse(
            status_code=HTTPStatus.BAD_REQUEST.value,
            content={"error": "The request body must be valid JSON!"})

    try:
        controller.update(model, record_id, values)
        return JSONResponse(content={})

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except NotImplementedError:
        # An explicit body is passed because JSONResponse takes `content`
        # as a required argument in current Starlette releases.
        return JSONResponse(
            status_code=HTTP_405_METHOD_NOT_ALLOWED,
            content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})


@core_api.delete("/{model}/{record_id}", status_code=HTTP_204_NO_CONTENT)
def delete_record(model, record_id):
    """
    Delete the record with the given id...

    :param model: Model name taken from the URL path.
    :param record_id: Record id taken from the URL path.
    :return: 204 with an empty body on success, 404 when the delete raises
        KeyError, 405 when the operation is not implemented, 500 on any
        other error.
    """

    try:
        controller.delete(model, record_id)
        return Response(status_code=HTTPStatus.NO_CONTENT.value)

    except NotImplementedError:
        # An explicit body is passed because JSONResponse takes `content`
        # as a required argument in current Starlette releases.
        return JSONResponse(
            status_code=HTTP_405_METHOD_NOT_ALLOWED,
            content={})

    except KeyError:
        return JSONResponse(
            status_code=HTTP_404_NOT_FOUND,
            content={})

    except Exception as error:
        return JSONResponse(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            content={"error": str(error)})

Controller

project/api/controllers/main.py

# -*- coding: utf-8 -*-

from typing import Any, Dict
from project.api.services.base import Service
from project.api.configs.service_mapper import mapper


class Controller:
    """
    Dispatch CRUD operations to the service mapped to the requested model.
    """

    def __init__(self, services: str):
        """
        Initialize the services...

        :param services: String that contains the service names (comma separated)
        :raises ControllerException: If a name matches no registered Service subclass.
        """

        # Blank entries are skipped, so an empty string yields no services
        # instead of failing inside Service.create_class.
        names = [name.strip() for name in services.split(",") if name.strip()]

        self.services = []
        for name in names:
            service_class = Service.create_class(name)

            # create_class returns None for names it does not know; report
            # that clearly rather than failing on "None()".
            if service_class is None:
                raise ControllerException(f"Unknown service '{name}'!")

            self.services.append(service_class())

        self.mapper = mapper

    def __get_service(self, model: str):
        """
        Try to find a service for the given model...

        :param model: Model to use
        :return: The Service associated to the model.
        :raises ControllerException: If no service is mapped to the model.
        """

        try:
            return self.mapper[model]

        except KeyError:
            raise ControllerException("No service associated to the requested resource!")

    def get_all(self, model: str, filters, fields, offset, limit):
        """
        Return the records of the model, delegating to its service...

        :param model: Model to use
        :param filters: Filters forwarded untouched to the service.
        :param fields: Fields forwarded untouched to the service.
        :param offset: Offset forwarded untouched to the service.
        :param limit: Limit forwarded untouched to the service.
        :return: Whatever the service's get_records returns.
        """

        return self.__get_service(model).get_records(
            model, filters, fields, offset, limit)

    def get(self, model: str, record_id: Any):
        """
        Return a single record, delegating to the model's service...

        :param model: Model to use
        :param record_id: Record id.
        :return: Whatever the service's get_record returns.
        """

        return self.__get_service(model).get_record(model, record_id)

    def create(self, model: str, values: Dict):
        """
        Create a record, delegating to the model's service...

        :param model: Model to use
        :param values: Record values; the "id" key is used as the record id.
        :raises KeyError: If values has no "id" key.
        """

        self.__get_service(model).create_record(model, values["id"], values)

    def update(self, model: str, record_id: Any, values: Dict):
        """
        Update a record, delegating to the model's service...

        :param model: Model to use
        :param record_id: Record id.
        :param values: Values to set on the record.
        """

        self.__get_service(model).update_record(model, record_id, values)

    def delete(self, model: str, record_id: Any):
        """
        Delete a record, delegating to the model's service...

        :param model: Model to use
        :param record_id: Record id.
        """

        self.__get_service(model).delete_record(model, record_id)


class ControllerException(Exception):
    """Error type raised by the controller layer."""

Services

project/api/services/base.py

# -*- coding: utf-8 -*-

from abc import abstractmethod
from typing import Any, Dict, Iterable, List, Tuple


class Service:
    """
    Abstract Service you need to implement to
    interact with your data storage of preference.

    Every subclass is registered by class name in ``subclasses`` so it can
    be looked up with ``create_class``. The class does not use ABCMeta, so
    the ``abstractmethod`` markers are not enforced at instantiation; each
    base operation raises NotImplementedError instead, so a subclass that
    leaves one out fails loudly rather than silently returning None.
    """

    # Registry of subclasses, keyed by class name (shared by all subclasses).
    subclasses = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.subclasses[cls.__name__] = cls

    @classmethod
    def create_class(cls, subclass_name: str = None):
        """
        Look up a registered subclass by its class name...

        :param subclass_name: Name of the Service subclass.
        :return: The subclass, or None when the name is not registered.
        :raises NotImplementedError: If no name is given.
        """

        if not subclass_name:
            raise NotImplementedError

        return cls.subclasses.get(subclass_name)

    @abstractmethod
    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: int = 0, limit: int = 20) -> Iterable:

        """
        Return records using pagination...

        :param model: Model or object family.
        :param filters: Filters to apply to limit the records to retrieve.
        :param fields: Fields you want to retrieve. If None, return all the object fields.
        :param offset: Start index to retrieve
        :param limit: Maximum number of records to retrieve.
        :return:
        """

        raise NotImplementedError

    @abstractmethod
    def get_record(self, model: str, record_id: Any) -> Any:
        """
        Return if exist the record with the specified id...

        :param model: Model or object family.
        :param record_id: Record id.
        :return: The record or None
        """

        raise NotImplementedError

    @abstractmethod
    def create_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Create a record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :return:
        """

        raise NotImplementedError

    @abstractmethod
    def update_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Update the record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :return:
        """

        raise NotImplementedError

    @abstractmethod
    def delete_record(self, model: str, record_id: Any):
        """
        Delete the record...

        :param model: Model or object family.
        :param record_id: Record id.
        :return:
        """

        raise NotImplementedError


class ServiceException(Exception):
    """Error type raised by service implementations."""

MockService

# -*- coding: utf-8 -*-

from typing import Any, Dict, Iterable, List, Tuple
from project.api.services.base import Service, ServiceException


# In-memory sample data laid out as {model name: {record id: record}}.
mock = {
    "mocks": {
        record["id"]: record
        for record in (
            {
                "id": "000-1",
                "name": "Service",
                "Description": "Some fancy description...",
                "value": 50.0,
            },
            {
                "id": "000-2",
                "name": "Gas Station",
                "Description": "Some fancy gas station...",
                "value": 1.5,
            },
        )
    }
}


class MockService(Service):
    """
    Service implementation backed by the module-level ``mock`` dictionary.
    """

    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: int = 0, limit: int = 20) -> Iterable:

        """
        Return every record stored for the model...

        The filters, fields, offset and limit arguments are accepted to match
        the Service interface but are not applied.

        :param model: Model or object family.
        :return: List of records; empty when the model is unknown.
        """

        return list(mock.get(model, {}).values())

    def get_record(self, model: str, record_id: Any) -> Any:
        """
        Return the record with the specified id...

        :param model: Model or object family.
        :param record_id: Record id.
        :return: The stored record.
        :raises KeyError: If the model or the record does not exist.
        """

        return mock.get(model, {})[record_id]

    def create_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Create a record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :raises ServiceException: If a record with that id already exists.
        :raises KeyError: If the model does not exist.
        """

        records = mock[model]
        if record_id in records:
            raise ServiceException(f"Record with id {record_id}, already exist!")

        # Store under the same key the duplicate check used, so the check
        # and the write can never disagree about the record's identity.
        records[record_id] = record_values

    def update_record(self, model: str, record_id: Any, values: Dict):
        """
        Update the record using the values...

        :param model: Model or object family.
        :param record_id: Record id.
        :param values: Dictionary of values to set on the record.
        :raises KeyError: If the model or the record does not exist.
        """

        # Resolve the record first so a missing record raises KeyError even
        # when there are no values to apply.
        mock[model][record_id].update(values)

    def delete_record(self, model: str, record_id: Any):
        """
        Delete the record...

        :param model: Model or object family.
        :param record_id: Record id.
        :raises KeyError: If the model or the record does not exist.
        """

        del mock[model][record_id]

DynamoService

# -*- coding: utf-8 -*-

import os
from typing import Any, Dict, Iterable, List, Tuple

import boto3

from project.api.services.base import Service


class DynamoService(Service):
    """
    Service implementation that stores each model in the DynamoDB table of
    the same name, using "id" as the key attribute.
    """

    def __init__(self):
        self.resource = boto3.resource("dynamodb", region_name=os.getenv("REGION"))

    def get_records(self, model: str, filters: List[Tuple], fields: List[str],
                    offset: Any = None, limit: int = 20) -> Iterable:

        """
        Return one page of records by scanning the table...

        The filters argument is accepted to match the Service interface but
        is not applied.

        :param model: Table name.
        :param fields: Attributes to retrieve, as a comma-separated string or
            a list of names. If empty, all attributes are returned.
        :param offset: Id of the last record of the previous page, if any.
        :param limit: Maximum number of records to evaluate.
        :return: The "Items" of the scan response.
        """

        args = {
            "Select": "ALL_ATTRIBUTES" if not fields else "SPECIFIC_ATTRIBUTES",
            "ReturnConsumedCapacity": "TOTAL",
        }

        if limit is not None:
            # The value may arrive as a string from the query parameters.
            args["Limit"] = int(limit)

        if offset:
            # The resource API serializes plain Python values itself, so the
            # key is passed without the low-level {"S": ...} type wrapper.
            args["ExclusiveStartKey"] = {"id": offset}

        if fields:
            # ProjectionExpression is a string; accept a list of names too.
            args["ProjectionExpression"] = fields if isinstance(fields, str) else ",".join(fields)

        return self.resource.Table(model).scan(**args)["Items"]

    def get_record(self, model: str, record_id: Any, **kwargs) -> Any:
        """
        Return the record with the specified id...

        :param model: Table name.
        :param record_id: Record id.
        :return: The stored item.
        :raises KeyError: If the response has no "Item", i.e. no such record.
        """

        return self.resource.Table(model).get_item(
            Key={"id": record_id}
        )["Item"]

    def create_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Create a record using the values...

        :param model: Table name.
        :param record_id: Record id.
        :param record_values: Dictionary of values.
        :raises ServiceException: If a record with that id already exists.
        """

        # Imported here because the module header only imports Service.
        from project.api.services.base import ServiceException

        try:
            # Refuse to overwrite an existing item, matching MockService.
            self.resource.Table(model).put_item(
                Item=record_values,
                ConditionExpression="attribute_not_exists(id)"
            )

        except self.resource.meta.client.exceptions.ConditionalCheckFailedException as error:
            raise ServiceException(f"Record with id {record_id}, already exist!") from error

    def update_record(self, model: str, record_id: Any, record_values: Dict):
        """
        Update the record using the values...

        :param model: Table name.
        :param record_id: Record id.
        :param record_values: Dictionary of values; an "id" entry is ignored
            because the key attribute cannot be updated.
        :return: The update_item response.
        :raises KeyError: If the record does not exist.
        """

        values = {k: v for k, v in record_values.items() if k != "id"}

        update_expression = 'SET {}'.format(','.join(f'#{k}=:{k}' for k in values))
        expression_attribute_values = {f':{k}': v for k, v in values.items()}
        expression_attribute_names = {f'#{k}': k for k in values}

        try:
            # Without the condition update_item would create a missing item;
            # raising KeyError instead matches MockService.
            response = self.resource.Table(model).update_item(
                Key={"id": record_id},
                UpdateExpression=update_expression,
                ConditionExpression="attribute_exists(id)",
                ExpressionAttributeValues=expression_attribute_values,
                ExpressionAttributeNames=expression_attribute_names,
                ReturnValues="UPDATED_NEW"
            )

        except self.resource.meta.client.exceptions.ConditionalCheckFailedException as error:
            raise KeyError(record_id) from error

        return response

    def delete_record(self, model: str, record_id: Any):
        """
        Delete the record...

        :param model: Table name.
        :param record_id: Record id.
        :raises KeyError: If the record does not exist.
        """

        try:
            # Without the condition deleting a missing item succeeds
            # silently; raising KeyError instead matches MockService.
            self.resource.Table(model).delete_item(
                Key={"id": record_id},
                ConditionExpression="attribute_exists(id)"
            )

        except self.resource.meta.client.exceptions.ConditionalCheckFailedException as error:
            raise KeyError(record_id) from error

AWS deployment

Serverless Application Model

AWS CodeBuild

# buildspec.yaml
# CodeBuild build specification: build the SAM application and deploy it.
version: 0.2
phases:
  build:
    commands:
      - sam build
      - sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAM

GitHub integration

Secrets Manager

SAM template

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Description: >
  Serverless API Service | Architecture for APIs using API Gateway, Lambda and FastAPI framework...

Resources:
  # Execution role used by the Lambda function (see FastApiLambda.Role).
  FastApiServiceRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: FastApiServiceRole
      Description: Role for all resources into the service...

      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole

      Policies:
        # Only the actions needed to write logs, instead of logs:*.
        - PolicyName: CloudWatchPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: "*"
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents

        # Only the table operations used for CRUD access (scan, get, put,
        # update and delete of items), instead of dynamodb:*.
        - PolicyName: DynamoPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource: !GetAtt DynamoDBTable.Arn
                Action:
                  - dynamodb:Scan
                  - dynamodb:GetItem
                  - dynamodb:PutItem
                  - dynamodb:UpdateItem
                  - dynamodb:DeleteItem

  FastApiGateway:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      OpenApiVersion: "3.0.0"

  FastApiLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub ${AWS::StackName}-FastApiLambda
      Description: >
        Lambda to handle request...

      CodeUri: ./
      Handler: main.handler
      Role: !GetAtt FastApiServiceRole.Arn

      Runtime: python3.8
      MemorySize: 128
      # NOTE(review): API Gateway's default integration timeout is far shorter
      # than 300 seconds, so a request cannot use most of this - confirm.
      Timeout: 300

      # Every method on every path is proxied to the function.
      Events:
        ApiEvent:
          Type: Api
          Properties:
            Path: /{proxy+}
            Method: ANY
            RestApiId:
              Ref: FastApiGateway

      Environment:
        Variables:
          REGION: us-east-1
          ACTIVE_SERVICES: MockService,DynamoService

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: users
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S

      KeySchema:
        - AttributeName: id
          KeyType: HASH

      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1

      TimeToLiveSpecification:
        AttributeName: ttl
        Enabled: true

  # GitHub token is resolved from Secrets Manager at deploy time.
  CodeBuildSourceCredential:
    Type: AWS::CodeBuild::SourceCredential
    Properties:
      ServerType: GITHUB
      AuthType: PERSONAL_ACCESS_TOKEN
      Token: "{{resolve:secretsmanager:github_personal_access_token:SecretString:github_personal_access_token}}"

  FastApiCodeBuild:
    Type: AWS::CodeBuild::Project
    Properties:
      Name: serverless-api-service-deployment
      Description: Build process for the service using the SAM template...
      ServiceRole: !GetAtt FastApiServiceCodeBuildRole.Arn
      Artifacts:
        Name: code-build-artifacts
        Type: S3
        Location: serverless-api-service
        Path: ""
        NamespaceType: NONE
        Packaging: NONE

      Environment:
        Type: LINUX_CONTAINER
        ComputeType: BUILD_GENERAL1_SMALL
        Image: aws/codebuild/amazonlinux2-x86_64-standard:2.0

      Source:
        Location: https://github.com/alekglez/serverless-api-service.git
        Type: GITHUB
        GitCloneDepth: 1
        Auth:
          Resource: !Ref CodeBuildSourceCredential
          Type: OAUTH

      # Start a build on every push to the repository.
      Triggers:
        Webhook: true
        FilterGroups:
          - - Type: EVENT
              Pattern: PUSH
              ExcludeMatchedPattern: false

      BadgeEnabled: false

      LogsConfig:
        CloudWatchLogs:
          Status: ENABLED

        S3Logs:
          Status: DISABLED
          EncryptionDisabled: false

      TimeoutInMinutes: 10

  # Role assumed by CodeBuild to run the build and deploy the stack.
  FastApiServiceCodeBuildRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: FastApiServiceCodeBuildRole
      Description: Access...

      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                - codebuild.amazonaws.com
                - lambda.amazonaws.com

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess
        - arn:aws:iam::aws:policy/AmazonAPIGatewayAdministrator
        - arn:aws:iam::aws:policy/AWSCloudFormationFullAccess

      Policies:
        - PolicyName: CodeBuildLogsPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents

        - PolicyName: CodeBuildCodePipelineBucketPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "arn:aws:s3:::codepipeline-us-east-1-*"
                Action:
                  - s3:PutObject
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:GetBucketAcl
                  - s3:GetBucketLocation

        # NOTE(review): s3:* on every bucket is broader than a deployment
        # needs; consider scoping it to the deployment bucket.
        - PolicyName: CodeBuildDeploymentBucketPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - s3:*

        - PolicyName: CodeBuildCodeBuildPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Resource:
                  - "*"
                Action:
                  - codebuild:CreateReportGroup
                  - codebuild:CreateReport
                  - codebuild:UpdateReport
                  - codebuild:BatchPutTestCases
                  - codebuild:ImportSourceCredentials

Deploying the infrastructure

Install SAM CLI

# Environment used by the commands below and by the application.
export AWS_SAM_STACK_NAME=serverless-api-service
export ACTIVE_SERVICES=MockService,DynamoService
export REGION=us-east-1

# Get the code and install the dependencies into an isolated environment.
git clone https://github.com/alekglez/serverless-api-service.git
cd serverless-api-service
virtualenv --python=python3.8 .venv
# Activate the environment so pip installs into .venv and not system-wide.
source .venv/bin/activate
pip install -r requirements.txt

# Bucket that receives the deployment artifacts.
aws s3api create-bucket \
    --bucket serverless-api-service \
    --region us-east-1 \
    --acl private

# Build and deploy the stack.
sam build
sam deploy --stack-name serverless-api-service --s3-bucket serverless-api-service --capabilities CAPABILITY_NAMED_IAM

# Insert two sample users.
aws dynamodb put-item --table-name users --item '{"id": {"S": "000-1"}, "full_name": {"S": "User X"}, "gender": {"S": "M"}}' --return-consumed-capacity TOTAL
aws dynamodb put-item --table-name users --item '{"id": {"S": "000-2"}, "full_name": {"S": "User Y"}, "gender": {"S": "F"}}' --return-consumed-capacity TOTAL

# Get all records...
curl --location --request GET 'http://{server}/api/mocks'
curl --location --request GET 'http://{server}/api/users'
curl --location --request GET 'http://{server}/api/users?fields=id,full_name'

# Get specific record...
curl --location --request GET 'http://{server}/api/mocks/000-1'
curl --location --request GET 'http://{server}/api/users/000-1'

# Create a record...
curl --location --request POST 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "id": "000-3",
        "full_name": "User3",
        "gender": "F"
    }'

# Update a record...
curl --location --request PUT 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-2' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "full_name": "zzz"
    }'

# Delete a record...
curl --location --request DELETE 'https://lqfshxplp2.execute-api.us-east-1.amazonaws.com/prod/api/users/000-3'

--

--

--

Cloud & Solutions & Data Architect | Python Developer | Serverless Advocate

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

Metastore in Apache Spark

Software Project Communication: 7 Discussions To Revisit

How to do Functional Programming

GraphQL: The Good, The Bad, and The Bottomline

Using Huawei Availability Plugin in Flutter Apps

Python Pandas Tutorial (Part4)

Amazon DynamoDB Core Concepts

Zixem Sqli challenge-2

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alejandro Cora González

Alejandro Cora González

Cloud & Solutions & Data Architect | Python Developer | Serverless Advocate

More from Medium

Use AWS CDK to Write Amazon API Gateway service for Uploading Objects to AWS S3

AWS Elastic Load Balancing from a Serverless perspective

Configuring APIGateway VTL to store JSON object directly on S3 —With Lambda authorizer enabled

Building a serverless, scalable, and secure web application in AWS