How to replicate your DynamoDB database to Elasticsearch using Swift and Soto

In this post I'll show you how to replicate data from your DynamoDB tables to Elasticsearch using AWS Lambda, Swift and Soto.

I've been heavily using DynamoDB over the last few months and it's great! For our use case, where we have fairly complex queries but know all the queries we want to do up front, it's fast and efficient no matter how much data we try to query. (This is commonly known as OLTP - or online transaction processing). However, when you need to query data using random access patterns, such as for analytics, it sucks. It's slow and expensive because you have to scan the entire table. (This is also known as OLAP - or online analytical processing).

One of the clients I've been working with at the moment is building an online grocery store. Using DynamoDB for storing all the grocery items is a great fit. We know that we want to get all the items for a department, get single items, get all items for a category etc. Setting up our DynamoDB tables is relatively simple and our queries are fast, no matter how many items we have in the database.

As an aside, getting to the point where 'setting up our DynamoDB tables is relatively simple' involves a steep learning curve. The whole thought process behind single table design, index overloading and composite keys is completely different to the SQL world. Here's a great post with some pointers and re:Invent videos to help. I'll likely have more to say about DynamoDB in a later post.

The replication problem

Whilst using DynamoDB for the majority of our queries is great, you'll eventually hit a roadblock any time you want to do search or analytics. For our grocery store items, we want to be able to provide a good search experience to users and allow any future analytics queries to find top performing items etc. DynamoDB is rubbish for doing any kind of search across a table — it's simply not designed for that use case.

Thankfully, Elasticsearch exists. Elasticsearch is a powerful search engine that provides analytics and search capabilities across different indexes. It allows us to provide complex search patterns, fuzzy matching and even autocomplete to users when searching for items, categories or departments.

So how do we get the data from DynamoDB into Elasticsearch and keep it up to date? We do this with a Lambda function.

DynamoDB offers streams, which contain a replication log of every row added, modified or deleted in a table. We can then hook up a Lambda function which gets triggered whenever there's new data in the stream. Then in the Lambda function, we can transform the data into whatever way makes sense for us and insert it into Elasticsearch.
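To make the shape of that stream data concrete, here's a minimal sketch that decodes a trimmed-down stream event with plain Codable. The StreamEvent type, the sample JSON, the made-up item ID and the data1 value are all assumptions for illustration. In the real Lambda the runtime's own event types do this decoding for us, and the real payload has more fields and more attribute types than the string-only ones handled here.

```swift
import Foundation

// Hypothetical, trimmed-down model of a stream event. The real payload carries
// more fields, and attribute values can be numbers, lists, maps and so on;
// this sketch only handles string ("S") attributes.
struct StreamEvent: Decodable {
    struct Record: Decodable {
        struct Change: Decodable {
            let keys: [String: [String: String]]
            let newImage: [String: [String: String]]?  // absent for deletes

            enum CodingKeys: String, CodingKey {
                case keys = "Keys"
                case newImage = "NewImage"
            }
        }

        let eventName: String  // "INSERT", "MODIFY" or "REMOVE"
        let change: Change

        enum CodingKeys: String, CodingKey {
            case eventName
            case change = "dynamodb"
        }
    }

    let records: [Record]

    enum CodingKeys: String, CodingKey {
        case records = "Records"
    }
}

// Made-up sample batch: one insert and one delete delivered together.
let sampleEventJSON = """
{
  "Records": [
    {
      "eventName": "INSERT",
      "dynamodb": {
        "Keys": { "pk": { "S": "DEPARTMENT-F1BD3ED7-5068-4023-B227-433EE30E373C" } },
        "NewImage": {
          "pk": { "S": "DEPARTMENT-F1BD3ED7-5068-4023-B227-433EE30E373C" },
          "data1": { "S": "Bakery" }
        }
      }
    },
    {
      "eventName": "REMOVE",
      "dynamodb": {
        "Keys": { "pk": { "S": "ITEM-0A1B2C3D-0000-4000-8000-000000000001" } }
      }
    }
  ]
}
"""

func decodeSampleEvent() throws -> StreamEvent {
    try JSONDecoder().decode(StreamEvent.self, from: Data(sampleEventJSON.utf8))
}
```

Note that the delete record only carries keys, which is why deletes and inserts end up being handled differently later on.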

Setting up a Lambda

Almost all of our backend code for the grocery store is written in Swift so it makes sense to use Swift in our Lambda as well! The introduction of the Swift AWS Lambda Runtime makes this really simple to do.

Our Lambda project contains two modules - one for the application start point that hosts a very simple main.swift file and another that contains the application logic. Our main.swift simply looks like this:

import App
import AWSLambdaRuntime

Lambda.run { context in
    return Handler(eventLoop: context.eventLoop)
}

Doing this, and splitting out our application logic into a separate module, allows us to write tests for our Handler. Next, in our App module we create a skeleton handler:

import AsyncHTTPClient
import AWSLambdaEvents
import AWSLambdaRuntime
import NIO
import SotoCore

public class Handler: EventLoopLambdaHandler {
    public typealias In = AWSLambdaEvents.DynamoDB.Event
    public typealias Out = Void

    let httpClient: HTTPClient
    let awsClient: AWSClient

    public init(eventLoop: EventLoop) {
        self.httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoop))
        self.awsClient = AWSClient(credentialProvider: .selector(.environment, .configFile()),
                                   httpClientProvider: .shared(httpClient))

    }

    public func handle(context: Lambda.Context, event: AWSLambdaEvents.DynamoDB.Event) -> EventLoopFuture<Void> {
        // Handler code goes here
    }

    public func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
        let promise = context.eventLoop.makePromise(of: Void.self)
        awsClient.shutdown() { error in
            if let error = error {
                promise.fail(error)
            } else {
                promise.succeed(())
            }
        }
        return promise.futureResult.flatMap {
            let promise = context.eventLoop.makePromise(of: Void.self)
            self.httpClient.shutdown() { error in
                if let error = error {
                    promise.fail(error)
                } else {
                    promise.succeed(())
                }
            }
            return promise.futureResult
        }
    }
}

This is the same code across all our Lambda functions that are triggered by a DynamoDB stream. We set up an HTTP Client, which is used by both Soto and the Elasticsearch Client. Notice the handle(context:event:) signature - we're only listening for DynamoDB events here and the Swift AWS Lambda runtime converts them to Swift objects for us, making it easy to use.

Processing events

Now that we have our Lambda handler written, we need to do something with the events we get! AWSLambdaEvents.DynamoDB.Event provides an array of records that have changed (they'll be batched up and delivered in chunks, rather than calling the Lambda function on each individual change). We loop through each record and then work out what to do with it.

Each record has an eventName enum property that can be .insert, .modify or .remove. A remove event is relatively simple. We parse the primary and secondary key from the record's change property and store this to send to Elasticsearch with a bulk operation.

Creation and modification are slightly different. To start, because we use a single table design architecture, the row being created or modified could be an item, a category, a department etc. So first we need to work out what type it is. Our primary keys are in the form of <ITEM-TYPE>-<ITEM-ID>, e.g. DEPARTMENT-F1BD3ED7-5068-4023-B227-433EE30E373C. Each record contains a change property with keys and newImage that we can use to get all the information we need. So we simply check the first part of the primary key. Once we know what type it is, we can decode it into our model object.
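As a rough sketch of that key check, the function below splits a primary key into its type and ID. The ItemType enum, the ParsedKey struct and the ITEM and CATEGORY prefixes are hypothetical; only the DEPARTMENT prefix and the overall key format come from our real data.

```swift
import Foundation

// Hypothetical item types. Only the DEPARTMENT prefix appears in this post;
// the other raw values are assumed for illustration.
enum ItemType: String {
    case item = "ITEM"
    case category = "CATEGORY"
    case department = "DEPARTMENT"
}

struct ParsedKey: Equatable {
    let type: ItemType
    let id: UUID
}

/// Parses a key in the form <ITEM-TYPE>-<ITEM-ID>, returning nil if the
/// type is unknown or the ID is not a valid UUID.
func parsePrimaryKey(_ pk: String) -> ParsedKey? {
    // Split on the first hyphen only, because the UUID itself contains hyphens.
    let parts = pk.split(separator: "-", maxSplits: 1)
    guard parts.count == 2,
          let type = ItemType(rawValue: String(parts[0])),
          let id = UUID(uuidString: String(parts[1])) else {
        return nil
    }
    return ParsedKey(type: type, id: id)
}
```

Returning nil for anything unrecognised lets the caller decide whether to skip the record or fail the whole batch.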

Since we use the single table design pattern, most of our database rows are stored with generic key names such as pk, sk, data1, data2 etc. We take this data and then transform it into a real Department type. The DynamoDBDepartment and Department models both live in a package that's shared across all the microservices and lambdas, which reduces code duplication.
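Here's a minimal sketch of what that transformation could look like. The field layout is an assumption (for example that data1 holds the department name), and ConversionError is a hypothetical error type, so treat this as an illustration rather than our actual shared models.

```swift
import Foundation

// Hypothetical storage model using the generic single table attribute names.
struct DynamoDBDepartment: Codable {
    let pk: String     // e.g. "DEPARTMENT-<UUID>"
    let sk: String
    let data1: String  // assumed here to hold the department name
}

// The "real" domain type the rest of the code works with.
struct Department: Equatable {
    let id: UUID
    let name: String
}

// Hypothetical error for rows that don't look like a department.
struct ConversionError: Error {
    let message: String
}

extension Department {
    /// Builds a Department from its stored row, stripping the type prefix
    /// from the primary key to recover the ID.
    init(row: DynamoDBDepartment) throws {
        let prefix = "DEPARTMENT-"
        guard row.pk.hasPrefix(prefix),
              let id = UUID(uuidString: String(row.pk.dropFirst(prefix.count))) else {
            throw ConversionError(message: "Unexpected primary key \(row.pk)")
        }
        self.id = id
        self.name = row.data1
    }
}
```

Keeping the storage model and the domain model separate means the cryptic attribute names never leak out of the shared package.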

A simplified version of our code looks something like:

for record in event.records {
    if record.eventName == .remove {
        // A delete only needs the key of the removed row
        guard let id = record.change.keys["pk"],
          case let .string(idString) = id else {
            return context.eventLoop.makeFailedFuture(
                HandlerError(message: "Failed to get ID from \(record)"))
        }
        // Pass the unwrapped string rather than the raw attribute value
        let operation = ESBulkOperation<GroceryItem>(
            operationType: .delete,
            index: index,
            id: idString,
            document: nil)
        operations.append(operation)
    } else {
        do {
            // Decode the changed row into our model type
            let item = try convertToItem(from: record.change)
            if record.eventName == .modify {
                let operation = ESBulkOperation<GroceryItem>(
                    operationType: .update,
                    index: index,
                    id: item.id.uuidString,
                    document: item)
                operations.append(operation)
            } else {
                let operation = ESBulkOperation<GroceryItem>(
                    operationType: .create,
                    index: index,
                    id: item.id.uuidString,
                    document: item)
                operations.append(operation)
            }
        } catch {
            // Log the record that failed, not the whole batch
            context.logger.warning("Failed to handle record - \(record)")
        }
    }
}

Transforming Data

All of our data is already pretty denormalized when it's stored in DynamoDB. But there are some instances where we want to change it before we store it in Elasticsearch. One example of this is when we store grocery items. To improve search, and to make search results easier to present to clients, we store the department and category names in the item as well as their IDs. So when we get a grocery item in the event record we need to fetch this data from DynamoDB. To do this we use Soto. Soto is a third-party SDK for AWS that contains everything you'd ever need to interact with AWS. It's built on top of SwiftNIO and AsyncHTTPClient so integrates well with both Vapor and the Swift AWS Lambda runtime.

Using Soto, we perform a query to get the department and categories for the item and then populate our GroceryItemSearchResult model, ready for insertion into Elasticsearch. We also use Soto to decode the newImage keys from the event record into our DynamoDB model types.
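The denormalization step itself can be sketched without any AWS calls. In the sketch below the ItemRow type, the fields on GroceryItemSearchResult and the MissingNameError type are all assumptions, and the lookupName closure is a stand-in for the DynamoDB query we make with Soto (which in the real code is asynchronous). It also assumes a single category per item to keep things short.

```swift
import Foundation

// Hypothetical shape of an item after decoding it from the stream record.
struct ItemRow {
    let id: UUID
    let name: String
    let departmentID: UUID
    let categoryID: UUID
}

// The search document: IDs plus the human-readable names we want to search on.
// The exact fields are assumed for this sketch.
struct GroceryItemSearchResult: Codable, Equatable {
    let id: UUID
    let name: String
    let departmentID: UUID
    let departmentName: String
    let categoryID: UUID
    let categoryName: String
}

// Hypothetical error for when a referenced department or category isn't found.
struct MissingNameError: Error {
    let id: UUID
}

/// Combines an item with the names of its department and category.
/// `lookupName` stands in for the query made against DynamoDB.
func makeSearchResult(
    for item: ItemRow,
    lookupName: (UUID) -> String?
) throws -> GroceryItemSearchResult {
    guard let departmentName = lookupName(item.departmentID) else {
        throw MissingNameError(id: item.departmentID)
    }
    guard let categoryName = lookupName(item.categoryID) else {
        throw MissingNameError(id: item.categoryID)
    }
    return GroceryItemSearchResult(
        id: item.id,
        name: item.name,
        departmentID: item.departmentID,
        departmentName: departmentName,
        categoryID: item.categoryID,
        categoryName: categoryName)
}
```

Injecting the lookup as a closure also makes this step easy to test with an in-memory dictionary instead of a real table.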

Inserting into Elasticsearch

Once we've performed all the above actions, we have an array of items to delete, an array of items to insert and an array of items to update. To send these to Elasticsearch we use the Elasticsearch NIO Client, which uses AsyncHTTPClient to make calls to the Elasticsearch API.

All of the changes are wrapped up in the ESBulkOperation type so we can send them in a small number of requests, rather than overloading our Elasticsearch instance when we do a large inventory import. We store the items and departments in an index specific to that particular store. This makes searching inside a store easy to do. We use the ID of the item, category or department for the ID in Elasticsearch, instead of allowing it to create an ID for us. This makes updating items simple as we just overwrite the item at that ID with the new information.
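For a feel of what a bulk request looks like on the wire, here's a rough sketch that builds the newline-delimited JSON body the Elasticsearch bulk API expects. This is not how the Elasticsearch NIO Client is implemented; the BulkAction enum, the bulkBody function and the store-1 index name used with it are made up for illustration, and documents are simplified to string dictionaries.

```swift
import Foundation

// Hypothetical, simplified bulk actions with string-only documents.
enum BulkAction {
    case index(id: String, document: [String: String])   // create or overwrite at this ID
    case update(id: String, document: [String: String])  // partial update at this ID
    case delete(id: String)
}

/// Builds a bulk request body: one JSON action line per operation, followed
/// by a source line where the action needs one, with a trailing newline.
func bulkBody(index: String, actions: [BulkAction]) throws -> String {
    let encoder = JSONEncoder()
    encoder.outputFormatting = [.sortedKeys]  // stable key order for readable output

    func line<T: Encodable>(_ value: T) throws -> String {
        return try String(decoding: encoder.encode(value), as: UTF8.self)
    }

    var lines = [String]()
    for action in actions {
        switch action {
        case let .index(id, document):
            lines.append(try line(["index": ["_id": id, "_index": index]]))
            lines.append(try line(document))
        case let .update(id, document):
            lines.append(try line(["update": ["_id": id, "_index": index]]))
            // Updates wrap the changed fields in a "doc" object.
            lines.append(try line(["doc": document]))
        case let .delete(id):
            // Deletes have no source line.
            lines.append(try line(["delete": ["_id": id, "_index": index]]))
        }
    }
    // The bulk API requires the body to end with a newline.
    return lines.joined(separator: "\n") + "\n"
}
```

Because every action carries an explicit _id, sending the same item twice simply overwrites the existing document rather than creating a duplicate.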

Finally we call elasticSearchClient.bulk(itemOperations) for the different model types and all our data is replicated into Elasticsearch. The Elasticsearch NIO Client also uses Soto because our Elasticsearch instances are hosted in AWS using their managed service. This means that all our requests to Elasticsearch need to be signed. The Elasticsearch NIO Client uses Soto to sign each request so it's transparent to our application (which makes testing easy as well).

Our simplified code above would look something like:

elasticSearchClient.bulk(operations).transform(to: ())

This sends our operations to Elasticsearch and returns EventLoopFuture<Void> to the Lambda runtime so the stream knows all the records have been processed successfully. If any errors are thrown, the DynamoDB stream will keep retrying the batch until it succeeds or the records expire from the stream after 24 hours.

Wrapping up

Hopefully this post has explained how we replicate our DynamoDB data into Elasticsearch and keep the data up to date. You can use similar Lambda functions to duplicate DynamoDB data into Kinesis Firehose or into any data lake for analytics processing or ML applications.

Further aside - Kinesis Firehose can automatically insert data into Elasticsearch for you and throttle the insertions if you're ingesting more data than Elasticsearch can currently handle. Unfortunately we can't use that because you need a different Firehose stream for each index and they need to be known up front. Our indexes are based on stores, which can be added dynamically, which is why we have it set up the way we do.

Our Lambda code is pretty specific to our data transformation needs, which is why I haven't shared more of it here. But this should give you a good start to get going. And by using Swift, sharing model code and using libraries to do some of the heavy lifting for you, it can be nice and simple!