How to start with DynamoDB in your Ktor application (Part 3)
In the second part of my series about using DynamoDB in a Ktor application I gave an overview of how the modeling of tables can be optimized to get consistent performance even as the table data grows fast. This can be achieved by using composite keys, a good choice of primary key, and the creation of global secondary indexes for different use cases. But there are still important topics uncovered when working with DynamoDB.
So today I want to cover these additional topics:
- Using converters to control how data is persisted. Until now I only used the data types that are directly supported by DynamoDB and persisted them as-is, but that’s not always enough.
- Processing large amounts of data by using the batch processing feature of DynamoDB, both for reading and writing operations.
- Until now, I only used QueryConditionals in queries, but there are additional filter expressions available for query and scan operations.
- Also, I want to talk about TTL, a way to automatically clean up my tables to save storage.
- Lastly, I want to show how sparse indexes can improve scanning for items.
So let’s start with converters…
Converters
DynamoDB supports a variety of data types, including scalar types (like String, Number, Binary), document types (List, Map), and set types (String Set, Number Set, Binary Set). However, real-world applications often require storing custom or complex data types. This is where converters come into play.
What Are Converters?
Converters are mechanisms that transform custom data types into a format that DynamoDB can store and retrieve. They serialize complex objects into DynamoDB-compatible types and deserialize them back when reading from the database.
Why Use Converters?
- Flexibility: Store complex objects without flattening them into DynamoDB’s limited data types.
- Efficiency: Optimize storage and retrieval by converting data into compact formats.
- Integration: Seamlessly integrate with object-relational mappers (ORMs) or other data-handling libraries.
Implementing Converters
To show how a custom converter can be used, I continue working in the GitHub repository that I already used in the previous two posts.
The starting point is a JSON object that consists of a user with multiple properties and also an embedded address object.
{
  "userId": 11113,
  "name": "John Doe",
  "email": "john.doe@mail.de",
  "jobStatus": "EMPLOYED",
  "createdAt": "2024-10-19T21:44:47.375139",
  "address": {
    "street": "Main Street",
    "city": "Los Angeles",
    "state": "Main",
    "zip": 11111
  }
}
Because I don’t want to persist the address in a separate table, it should be persisted as part of the user. Also, the jobStatus should not be persisted as a string (using the name of the enum value) but as a number (to be able to rename the enum value later without updating the items in the database). Lastly, the createdAt property is a timestamp that depends on the local time zone; I want to store it with a UTC offset so that I don’t need to deal with time zones.
So how can this be done?
First, let’s have a look at how this would be implemented without using converters.
data class AddressEntity(
    val street: String,
    val city: String,
    val state: String,
    val zip: Int
)

data class UserEntity(
    @DynamoKtPartitionKey
    val userId: Int,
    val name: String,
    val email: String,
    val jobStatus: JobStatus,
    val address: AddressEntity,
    val createdAt: LocalDateTime = LocalDateTime.now(),
)
Persisting the above JSON input will lead to the following entry in the DynamoDB table.
{
  "name": {
    "S": "John Doe"
  },
  "createdAt": {
    "S": "2024-10-19T21:44:47.375139"
  },
  "jobStatus": {
    "S": "EMPLOYED"
  },
  "address": {
    "M": {
      "zip": {
        "N": "11111"
      },
      "state": {
        "S": "Main"
      },
      "city": {
        "S": "Los Angeles"
      },
      "street": {
        "S": "Main Street"
      }
    }
  },
  "userId": {
    "N": "11113"
  },
  "email": {
    "S": "john.doe@mail.de"
  }
}
This is not what I expected. Although the embedded address is persisted correctly as a map of properties inside the user item, the jobStatus and createdAt properties are not stored the way I want. So it’s time to implement custom converters.
A converter can easily change this default behavior: I implement the AttributeConverter interface and specify the type I want to convert.
class JobStatusConverter : AttributeConverter<JobStatus> {
    override fun transformFrom(input: JobStatus): AttributeValue {
        return AttributeValue.builder().n(input.id.toString()).build()
    }

    override fun transformTo(input: AttributeValue): JobStatus {
        return JobStatus.fromId(input.n().toInt())
    }

    override fun type(): EnhancedType<JobStatus> = EnhancedType.of(JobStatus::class.java)

    override fun attributeValueType(): AttributeValueType = AttributeValueType.N
}

class LocalDateTimeConverter : AttributeConverter<LocalDateTime> {
    override fun transformFrom(input: LocalDateTime): AttributeValue {
        return AttributeValue.builder().s(input.toInstant(ZoneOffset.UTC).toString()).build()
    }

    override fun transformTo(input: AttributeValue): LocalDateTime {
        return Instant.parse(input.s()).atZone(ZoneId.of("Europe/Berlin")).toLocalDateTime()
    }

    override fun type(): EnhancedType<LocalDateTime> = EnhancedType.of(LocalDateTime::class.java)

    override fun attributeValueType(): AttributeValueType = AttributeValueType.S
}
There are four methods I need to implement for every converter:
- transformFrom(…) maps a JobStatus to an AttributeValue, which already supports the built-in DynamoDB types. In my case I want to store the jobStatus as a number, so I use the builder with the n(…) function to create the corresponding number attribute.
- transformTo(…) parses the number back into a JobStatus. I use a parse function inside the JobStatus enum to retrieve the matching entry by id (JobStatus.fromId; a sketch of this enum follows the list).
- type() specifies the type I want to convert, in this case JobStatus.
- attributeValueType() specifies the corresponding DynamoDB attribute type, in my case AttributeValueType.N for number.
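The converters rely on a JobStatus enum that carries a stable numeric id and can resolve an entry from it. The concrete enum lives in the repository; here is only a minimal sketch of how it could look (apart from EMPLOYED = 1, which matches the persisted output shown below, the ids are assumptions):

enum class JobStatus(val id: Int) {
    EMPLOYED(1),
    UNEMPLOYED(2),
    STUDENT(3);

    companion object {
        // Resolve an enum entry from its persisted numeric id.
        fun fromId(id: Int): JobStatus =
            entries.firstOrNull { it.id == id }
                ?: throw IllegalArgumentException("Unknown JobStatus id: $id")
    }
}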
I can do the same for transforming the LocalDateTime into a string that represents a timestamp in UTC.
In the next step I need to tell DynamoDB to use the converters for serializing and deserializing the UserEntity when storing it to or retrieving it from the database.
data class UserEntity(
    @DynamoKtPartitionKey
    val userId: Int,
    val name: String,
    val email: String,
    @DynamoKtConverted(JobStatusConverter::class)
    val jobStatus: JobStatus,
    val address: AddressEntity,
    @DynamoKtConverted(LocalDateTimeConverter::class)
    val createdAt: LocalDateTime = LocalDateTime.now(),
)
That’s all. With this I can use the UserEntity and the data is persisted in the expected way.
{
  "name": {
    "S": "John Doe"
  },
  "createdAt": {
    "S": "2024-10-19T21:44:47.375139Z"
  },
  "jobStatus": {
    "N": "1"
  },
  "address": {
    "M": {
      "zip": {
        "N": "11111"
      },
      "state": {
        "S": "Main"
      },
      "city": {
        "S": "Los Angeles"
      },
      "street": {
        "S": "Main Street"
      }
    }
  },
  "userId": {
    "N": "11113"
  },
  "email": {
    "S": "john.doe@mail.de"
  }
}
Best Practices for Using Converters
- Consistency: Ensure that converters transform data consistently, without changing it during the serialization and deserialization process, to maintain data integrity.
- Performance: Optimize serialization and deserialization logic to minimize latency, especially for large or deeply nested objects.
- Error Handling: Implement robust error handling to manage serialization/deserialization failures gracefully (a small sketch follows this list).
- Versioning: If your custom data types evolve, manage versioning to handle different object schemas within your application.
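To illustrate the error-handling point: the transformTo function of the JobStatusConverter could be made more defensive, so that a corrupt or unexpected attribute produces a descriptive error instead of an opaque NumberFormatException. This is only a sketch of one possible approach, not code from the repository:

class SafeJobStatusConverter : AttributeConverter<JobStatus> {
    override fun transformFrom(input: JobStatus): AttributeValue =
        AttributeValue.builder().n(input.id.toString()).build()

    override fun transformTo(input: AttributeValue): JobStatus {
        // Fail with a clear message when the stored attribute is missing or not numeric.
        val rawValue = input.n()
            ?: throw IllegalStateException("Expected a numeric jobStatus attribute but got: $input")
        return rawValue.toIntOrNull()?.let { JobStatus.fromId(it) }
            ?: throw IllegalStateException("jobStatus attribute is not a valid integer: $rawValue")
    }

    override fun type(): EnhancedType<JobStatus> = EnhancedType.of(JobStatus::class.java)

    override fun attributeValueType(): AttributeValueType = AttributeValueType.N
}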
Batch Processing for Read and Write Operations
DynamoDB's batch operations allow me to perform multiple read or write operations in a single API call. This can significantly enhance performance and reduce the number of network requests, especially when dealing with large datasets.
Batch Read Operations: BatchGetItem
BatchGetItem allows me to retrieve multiple items from one or more tables in a single request. It’s an efficient way to fetch data when I need multiple items simultaneously.
- Up to 100 items: I can request up to 100 items per BatchGetItem call.
- Multiple Tables: Supports fetching items from multiple tables in a single request.
- Consistent Reads: Optionally use strongly consistent reads.
To execute the batchGetItem function, I first need to create a ReadBatch and add the Key for every item I want to load. In the case of the UserEntity, the primary key consists only of the partition key (userId). With the ReadBatch I can create a BatchGetItemEnhancedRequest, which is executed on the DynamoDbEnhancedAsyncClient; as you can see, this operation is not available on the DynamoDbAsyncTable. Consuming the result works similarly to consuming the result of a scan operation.
suspend fun batchGet(userIds: List<UserId>): List<User> {
    val readBatch = ReadBatch.builder(UserEntity::class.java).mappedTableResource(table)
    userIds.forEach { userId ->
        readBatch.addGetItem(Key.builder().partitionValue(userId.value).build())
    }
    return buildList {
        dynamoDbEnhancedAsyncClient.batchGetItem(
            BatchGetItemEnhancedRequest.builder().readBatches(readBatch.build()).build()
        ).asFlow().collect { it.resultsForTable(table).stream().forEach { item -> add(item.toUser()) } }
    }
}
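Since BatchGetItem accepts at most 100 keys per request, a larger list of ids has to be split into multiple calls. A simple sketch that reuses the batchGet function above (the helper name is hypothetical):

suspend fun batchGetAll(userIds: List<UserId>): List<User> =
    // BatchGetItem is limited to 100 keys per call, so chunk larger inputs.
    userIds.chunked(100).flatMap { chunk -> batchGet(chunk) }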
Batch Write Operations: BatchWriteItem
BatchWriteItem enables me to perform multiple put or delete operations across one or more tables in a single API call.
- Up to 25 requests: I can include up to 25 PutRequest or DeleteRequest actions.
- Atomicity: Each individual operation within the batch is atomic, but the entire batch is not transactional.
- Retries: Handles unprocessed items by allowing me to retry failed operations.
Below is an example of persisting a list of users. This works the same for the deletion of multiple users or even for mixing both operations.
suspend fun batchWrite(userList: List<User>) {
    var writeBatch = WriteBatch.builder(
        UserEntity::class.java
    ).mappedTableResource(table)
    userList.forEach { user ->
        writeBatch.addPutItem(user.toUserEntity())
    }
    var unprocessedItems = dynamoDbEnhancedAsyncClient.batchWriteItem(
        BatchWriteItemEnhancedRequest.builder().writeBatches(writeBatch.build()).build()
    ).await().unprocessedPutItemsForTable(table)

    val maxRetryCount = 3
    var retry = 1
    while (unprocessedItems.isNotEmpty() && retry++ <= maxRetryCount) {
        writeBatch = WriteBatch.builder(
            UserEntity::class.java
        ).mappedTableResource(table)
        unprocessedItems.forEach { user ->
            writeBatch.addPutItem(user)
        }
        unprocessedItems = dynamoDbEnhancedAsyncClient.batchWriteItem(
            BatchWriteItemEnhancedRequest.builder().writeBatches(writeBatch.build()).build()
        ).await().unprocessedPutItemsForTable(table)
    }
    if (unprocessedItems.isNotEmpty()) {
        error("Not able to write items $unprocessedItems to database.")
    }
}
Compared to the batchGetItem method, the batchWriteItem method needs some additional handling. If inserting or deleting does not successfully process all items in the first round, I get back a non-empty list of unprocessed items, so I can retry processing them. In the above example, I use a simple mechanism that retries at most 3 times before an exception is thrown if not all items could be processed.
Best Practices for batch processing:
- Exponential Backoff: Implement retries with exponential backoff to handle throttling gracefully (see the sketch after this list).
- Idempotency: Ensure that repeated operations (especially writes) are idempotent to prevent data inconsistencies.
- Monitoring: Keep track of unprocessed items and monitor batch operation metrics to optimize performance.
- Size Management: Keep batch sizes within DynamoDB limits (100 for reads, 25 for writes) to avoid errors.
- Parallelism: Use parallel processing for large batch operations, but manage concurrency to prevent overwhelming the database.
- Error Handling: Implement robust error handling and retry mechanisms to handle partial failures effectively.
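The retry loop in the batchWrite example above retries immediately. To follow the exponential backoff recommendation, the delay between attempts can grow with every retry. A sketch of one way to do this, assuming the same table and enhanced async client as above (not code from the repository):

suspend fun batchWriteWithBackoff(userList: List<User>, maxRetryCount: Int = 3) {
    var pending = userList.map { it.toUserEntity() }
    var attempt = 0
    while (pending.isNotEmpty() && attempt <= maxRetryCount) {
        if (attempt > 0) {
            // Wait 200 ms, 400 ms, 800 ms, ... before retrying (delay is kotlinx.coroutines.delay).
            delay(200L * (1L shl (attempt - 1)))
        }
        val writeBatch = WriteBatch.builder(UserEntity::class.java).mappedTableResource(table)
        pending.forEach { writeBatch.addPutItem(it) }
        pending = dynamoDbEnhancedAsyncClient.batchWriteItem(
            BatchWriteItemEnhancedRequest.builder().writeBatches(writeBatch.build()).build()
        ).await().unprocessedPutItemsForTable(table)
        attempt++
    }
    if (pending.isNotEmpty()) {
        error("Not able to write items $pending to database.")
    }
}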
Filter Expressions
In the second part of my series about DynamoDB I already showed how QueryConditionals are used to specify conditions when querying a table or a GSI. But that is not the only possibility to limit the result set returned from a table. An additional possibility is to use filter expressions.
In DynamoDB, a filter expression is used to refine the data returned by a scan or query operation. A query limits the result set based on primary key conditions, and the result can be filtered further with additional expressions. Unlike a query, a scan examines every item in the table or secondary index. Filter expressions provide an additional way to narrow down the result set based on attribute values, but without reducing the consumed read capacity units.
Key Characteristics of Filter Expressions in Scans
- Post-Scan Filtering: Filter expressions are applied after DynamoDB reads the items from the table, but before the results are returned to the application. This means all items are scanned, but only those that match the filter criteria are returned.
- No Reduction in Read Capacity: Since the filter is applied after scanning the items, it does not reduce the number of read capacity units consumed by the operation. I’m still charged for scanning all items in the table, even though the filter may reduce the amount of data returned.
- Multiple Conditions: I can use logical operators like AND, OR, and NOT in my filter expressions to combine multiple conditions.
- Attribute Functions: DynamoDB supports several functions like attribute_exists, attribute_not_exists, begins_with, contains, and size that can be used within filter expressions (a combined example follows this list).
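As a combined example of the last two points, the following sketch (not part of the repository) filters for users that have an address attribute and a given job status, using attribute_exists and AND. An expression name placeholder is used for the address attribute to stay safe with reserved words:

suspend fun findByJobStatusWithAddress(jobStatus: JobStatus): List<User> {
    return buildList {
        table.scan(
            ScanEnhancedRequest.builder().filterExpression(
                Expression.builder()
                    // Combine an attribute_exists check with an equality condition.
                    .expression("attribute_exists(#addr) AND jobStatus = :jobStatus")
                    .putExpressionValue(":jobStatus", AttributeValue.builder().n(jobStatus.id.toString()).build())
                    .putExpressionName("#addr", "address")
                    .build()
            ).build()
        ).asFlow().collect { it.items().forEach { item -> add(item.toUser()) } }
    }
}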
Example Usage
To show how filter expressions work, I use the UserEntity that I introduced for the custom converters. The important fields for this example are:
- createdAt (I want to filter users by the time range in which they were created.)
- jobStatus (I want to filter users by a specific job status.)
- name (I want to filter users whose names start with a specific string.)
I start with the jobStatus filter expression:
suspend fun findByJobStatus(jobStatus: JobStatus): List<User> {
    return buildList {
        table.scan(
            ScanEnhancedRequest.builder().filterExpression(
                Expression.builder()
                    .expression("jobStatus = :jobStatus")
                    .putExpressionValue(":jobStatus", AttributeValue.builder().n(jobStatus.id.toString()).build())
                    .build()
            ).build()
        ).asFlow().collect { it.items().stream().forEach { item -> add(item.toUser()) } }
    }
}
The scan method takes an additional parameter, a ScanEnhancedRequest, which can carry a filter expression. The filter expression consists of the expression string jobStatus = :jobStatus and the specification of the placeholders that are part of the expression. In the above example, I want to filter for the jobStatus property with the value that was given as a parameter. The JobStatus value needs to be provided as an AttributeValue whose type matches the type of the stored property. Collecting the matching items works the same way as in the findAll method.
The next filter expression is the one for the name:
suspend fun findByNameStartingWith(namePrefix: String): List<User> {
    return buildList {
        table.scan(
            ScanEnhancedRequest.builder().filterExpression(
                Expression.builder()
                    .expression("begins_with(#nameAttr, :namePrefix)")
                    .putExpressionValue(":namePrefix", AttributeValue.builder().s(namePrefix).build())
                    .putExpressionName("#nameAttr", "name")
                    .build()
            ).build()
        ).asFlow().collect { it.items().stream().forEach { item -> add(item.toUser()) } }
    }
}
The general construction of the filter expression is the same as in the previous example; there are just two differences. The property I want to filter on is name, and this is a reserved keyword in DynamoDB (see Link for a complete list of reserved keywords). So if I used the same syntax as in the previous example, an exception would be thrown when using the method.
software.amazon.awssdk.services.dynamodb.model.DynamoDbException: Invalid FilterExpression: Attribute name is a reserved keyword; reserved keyword: name (Service: DynamoDb, Status Code: 400, Request ID: 2c36e4f3-b8aa-4949-bdb2-118702873550)
at software.amazon.awssdk.services.dynamodb.model.DynamoDbException$BuilderImpl.build(DynamoDbException.java:104)
at software.amazon.awssdk.services.dynamodb.model.DynamoDbException$BuilderImpl.build(DynamoDbException.java:58)
To prevent this, I also use a placeholder for the name of the property in the expression string and provide the actual attribute name by calling the putExpressionName function.
The other difference is that I use the built-in DynamoDB function begins_with, which checks whether the attribute of an item starts with the given string value.
The last example is the filter expression for all users created in time range:
suspend fun findAllCreatedInRange(start: LocalDateTime, end: LocalDateTime): List<User> {
    return buildList {
        table.scan(
            ScanEnhancedRequest.builder().filterExpression(
                Expression.builder()
                    .expression("createdAt BETWEEN :start AND :end")
                    .putExpressionValue(":start", AttributeValue.builder().s(start.toInstant(ZoneOffset.UTC).toString()).build())
                    .putExpressionValue(":end", AttributeValue.builder().s(end.toInstant(ZoneOffset.UTC).toString()).build())
                    .build()
            ).build()
        ).asFlow().collect { it.items().stream().forEach { item -> add(item.toUser()) } }
    }
}
For this filter expression I use the keyword BETWEEN
to search for all items in a time range. The same conditions as for query conditionals are also available for filter expressions.
Best Practices:
- Use Projections: Limit the attributes returned by using a projection expression to reduce the size of the response.
- Paginated Scans: Use paginated scans to avoid reading all items at once for large datasets (see the sketch after this list).
- Optimize Filter Logic: Design filter expressions to exclude as much data as possible to minimize the amount of data transferred.
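As an illustration of the pagination point, the scan result can be processed page by page instead of collecting the whole table into one list. A sketch assuming the table from the repository; the function and its handler parameter are hypothetical:

suspend fun processUsersPageByPage(pageSize: Int = 100, handler: suspend (List<User>) -> Unit) {
    table.scan(
        // limit(...) caps how many items DynamoDB evaluates per page.
        ScanEnhancedRequest.builder().limit(pageSize).build()
    ).asFlow().collect { page ->
        // Each collected element is one page of the scan result.
        handler(page.items().map { it.toUser() })
    }
}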
Time To Live (TTL)
In DynamoDB, Time to Live (TTL) is a mechanism that allows me to automatically expire and delete items after a specified period. Each item in a table can have a designated attribute (usually a timestamp) that represents its expiration time. Once the specified time is reached, the item is marked for deletion and eventually removed by DynamoDB without consuming any write throughput.
TTL is particularly useful in scenarios where I want to automatically purge outdated or unused data. In my case, I might want to use TTL to manage the lifecycle of user records, especially if they have a limited relevance period.
How TTL Works:
- TTL Attribute: I specify an attribute in my table (e.g., expirationTime) that holds a timestamp (in Unix epoch time format). This value represents the time after which the item will expire.
- Background Deletion: When the TTL is reached, DynamoDB marks the item for deletion. The deletion is not immediate but occurs in the background. Once deleted, the item is no longer accessible.
- Non-Billing: The deletion of items using TTL does not consume write capacity units, so it is an efficient way to manage stale data.
Applying TTL to the UserEntity Table
I want to use TTL in the UserEntity table to automatically delete users after a certain period, such as users who have been inactive for a long time or are marked for deletion.
Modifying UserEntity to Include a TTL Attribute
I add an attribute expirationTime to the UserEntity class. This attribute will store the Unix timestamp representing when the user should be automatically deleted.
data class UserEntity(
    @DynamoKtPartitionKey
    val userId: Int,
    val name: String,
    val email: String,
    @DynamoKtConverted(JobStatusConverter::class)
    val jobStatus: JobStatus,
    val address: AddressEntity,
    @DynamoKtConverted(LocalDateTimeConverter::class)
    val createdAt: LocalDateTime = LocalDateTime.now(),
    val expirationTime: Long? = null // TTL attribute (Unix timestamp)
)
Setting the TTL Value for Users
In this example, when creating or updating a user, I can assign the expirationTime attribute. This would typically be set to a timestamp (in Unix epoch time format) that indicates when the user should be deleted. For instance, if I want to expire a user record after 30 days of inactivity, I can calculate the expiration timestamp when creating or updating the user.
In the first step, I need to enable TTL for the UserEntity table. I use the AWS CLI for this.
aws dynamodb update-time-to-live \
--table-name UserEntity \
--time-to-live-specification "Enabled=true, AttributeName=expirationTime" \
--endpoint-url http://localhost:8000
The result will be:
{
  "TimeToLiveSpecification": {
    "Enabled": true,
    "AttributeName": "expirationTime"
  }
}
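As an alternative to the CLI, TTL could also be enabled from application code through the low-level client. A minimal sketch, assuming a DynamoDbAsyncClient (not the enhanced client) is available:

suspend fun enableTtl(dynamoDbAsyncClient: DynamoDbAsyncClient) {
    dynamoDbAsyncClient.updateTimeToLive(
        UpdateTimeToLiveRequest.builder()
            .tableName("UserEntity")
            .timeToLiveSpecification(
                TimeToLiveSpecification.builder()
                    .enabled(true)
                    .attributeName("expirationTime")
                    .build()
            )
            .build()
    ).await()
}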
In the next step I need to set the expirationTime in all necessary items.
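For example, a user could be stored with an expiration 30 days in the future. The helper below is a hypothetical sketch that reuses the table and the toUserEntity() mapping from the earlier examples:

suspend fun saveWithExpiration(user: User) {
    // Unix epoch timestamp (in seconds) 30 days from now, as expected by TTL.
    val expiresAt = Instant.now().plus(30, ChronoUnit.DAYS).epochSecond
    table.putItem(user.toUserEntity().copy(expirationTime = expiresAt)).await()
}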
That’s all. From now on DynamoDB will automatically delete all user items whose expirationTime value is in the past. The items are not deleted immediately, but within hours or even days after expiration, so it is important not to rely on expired items becoming unavailable right away.
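Because expired items can still show up in reads until DynamoDB actually removes them, a common safeguard is to filter them out explicitly. A sketch using the same filter expression mechanism as above (the function is hypothetical):

suspend fun findAllNotExpired(): List<User> {
    val now = Instant.now().epochSecond
    return buildList {
        table.scan(
            ScanEnhancedRequest.builder().filterExpression(
                Expression.builder()
                    // Keep items without a TTL attribute or with a TTL in the future.
                    .expression("attribute_not_exists(expirationTime) OR expirationTime > :now")
                    .putExpressionValue(":now", AttributeValue.builder().n(now.toString()).build())
                    .build()
            ).build()
        ).asFlow().collect { it.items().forEach { item -> add(item.toUser()) } }
    }
}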
Sparse Index
A sparse index in DynamoDB refers to a type of index that contains only a subset of items from the base table. This is achieved by creating an index that includes only the items where the indexed attribute is present (not null). Sparse indexes are an efficient way to manage large datasets when I want to index only relevant items.
How Sparse Indexes Work:
In DynamoDB, Global Secondary Indexes (GSIs) can be sparse. An index is considered sparse because only items in the base table that have a value for the indexed attribute are included in the index. Items that do not have the attribute (or have it as null) are excluded from the index. This feature is particularly useful when I want to create a secondary index on a table, but I only care about indexing a small subset of the table’s items.
Benefits of Sparse Indexes:
- Reduced Storage Costs: Since only items with the indexed attribute are included, I save on storage costs because fewer items are stored in the index.
- Efficient Queries: Queries on sparse indexes are more efficient because the index contains fewer items, reducing the number of read operations required.
- Targeted Indexing: I can use sparse indexes to track only items with certain conditions, like expired items, orders in a certain status, or items flagged for review.
Use Case Example: Users with Expiration Time
In the section about TTL I marked items of the user table with an expirationTime to let DynamoDB automatically delete them once the expiration is reached. I can also create a sparse index to track only users that have an expirationTime attribute, for example when managing temporary or inactive accounts.
Let’s say I want to create a Global Secondary Index (GSI) that tracks only users that have an expirationTime attribute. This is useful if you want to query for users whose accounts are expiring soon.
The base table is the UserEntity from above. To create the sparse index I need to execute the command below using the AWS CLI.
aws dynamodb update-table \
--table-name UserEntity \
--attribute-definitions AttributeName=expirationTime,AttributeType=N \
--global-secondary-index-updates \
"[{\"Create\":{\"IndexName\":\"User-ExpirationTime-index\",\"KeySchema\":[{\"AttributeName\":\"expirationTime\",\"KeyType\":\"HASH\"}],\"Projection\":{\"ProjectionType\":\"ALL\"},\"ProvisionedThroughput\":{\"ReadCapacityUnits\":5,\"WriteCapacityUnits\":5}}}]" \
--endpoint-url http://localhost:8000
- Attribute Definitions: I define expirationTime as the indexed attribute.
- Global Secondary Index (GSI): I create a new GSI named User-ExpirationTime-index, where the partition key is expirationTime.
- Projection: ProjectionType=ALL means that all attributes from the base table (not just the key attributes) are projected into the index.
Now that I’ve created a sparse index on expirationTime, I can use the index to retrieve only users who have an expiration time. Here’s an example that scans the index to fetch users whose expirationTime is in the past.
suspend fun findAllExpired(): List<User> {
    val currentTimestamp = Instant.now().epochSecond
    return buildList {
        table.index(USER_EXPIRATION_TIME_INDEX)
            .scan(
                ScanEnhancedRequest.builder().filterExpression(
                    Expression.builder()
                        .expression("expirationTime < :currentTime")
                        .putExpressionValue(":currentTime", AttributeValue.builder().n(currentTimestamp.toString()).build())
                        .build()
                ).build()
            )
            .asFlow().collect { it.items().stream().forEach { item -> add(item.toUser()) } }
    }
}
This scan is performant because the sparse index only contains items that have an expirationTime property. Of course, this only pays off if expirationTime is set on a small fraction of the items.
When to Use Sparse Indexes?
- Optional Attributes: If an attribute is optional or only applies to a subset of items, I can create a sparse index to track only items where this attribute is present.
- Time-Based Data: Sparse indexes are great for time-based data, such as expiration dates, where you only care about indexing items that will expire soon or items that need to be processed within a certain time frame.
- Flagged Data: If I have a flag attribute (e.g., isArchived, isDeleted, etc.), I can create a sparse index to track only items where that flag is set, reducing the overhead of scanning the entire table.
Conclusion
In this post, I explored several additional key features of DynamoDB, including converters for custom data types, batch processing, filter expressions, Time to Live (TTL), and sparse indexes, each of which enhances the overall usability.
Converters for Custom Data Types: By using custom converters, DynamoDB allows seamless storage and retrieval of complex data types, such as LocalDateTime or enumerations, making it easier to work with domain-specific objects in a type-safe way. This provides developers the flexibility to integrate DynamoDB with object models without needing to manually serialize or deserialize them.
Batch Processing for reads and writes: Batch operations significantly enhance throughput by allowing multiple read or write operations in a single API call. Whether I’m fetching multiple items with batchGetItem or inserting/updating items with batchWriteItem, batch processing helps optimize performance by reducing the number of individual calls, thus reducing latency and improving overall system efficiency.
Filter Expressions: These allow fine-grained control over the data I retrieve from a scan or query operation. I can filter items based on attributes, ranges, or conditions, ensuring that you fetch only the relevant data without loading unnecessary items. Filter expressions can also be combined with various attributes to make my queries more targeted and efficient.
TTL (Time to Live): TTL provides a mechanism for automatically expiring and deleting items after a certain period. This feature is incredibly useful for use cases like session management or expiring temporary data, as it automates the cleanup of stale data, reducing storage costs and keeping my dataset lean without consuming write capacity.
Sparse Indexes: Sparse indexes give me the power to index only a subset of items that contain specific attributes. This is an efficient approach when I want to focus on data that meets certain conditions, such as items with expiration times or flagged statuses. Sparse indexes are a great tool for optimizing query performance while saving on storage and maintaining targeted access to my data.
That’s it for now with my series about DynamoDB. I covered all the topics that have been relevant for me in the last months since I started using DynamoDB as the persistence store in one of my production projects. There are surely a lot of other topics around DynamoDB that I haven’t touched on yet, so maybe there will be an additional fourth part in the future.
You can find the full code used for this article on Github.