Using R and {paws} to populate DynamoDB tables #2
In our previous post we covered some basics of using the paws SDK to interact with AWS DynamoDB from R. We wrote a few simple functions to prepare R lists in the appropriate format for DynamoDB and some wrappers to put these lists as items in the remote NoSQL database.
These are good first steps, but we can’t really use these functions in a production setting, where we might need to populate a table with a few thousand or a few million rows. In this post we’ll extend our work to accommodate these more realistic needs.
Creating DynamoDB tables from R
We first create a function to wrap the call to paws::dynamodb() that we use to establish a client for the cloud database service. We assume the credentials are stored in an .Renviron file and available in R as environment variables through Sys.getenv(). Our connection function is then simply:
dynamo_connect <- function() {
  paws::dynamodb(config = list(
    credentials = list(
      creds = list(
        access_key_id = Sys.getenv("ACCESS_KEY_ID"),
        secret_access_key = Sys.getenv("SECRET_ACCESS_KEY")
      ),
      profile = Sys.getenv("PROFILE")
    ),
    region = Sys.getenv("REGION")
  ))
}
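Because dynamo_connect() reads its settings with Sys.getenv(), an unset variable silently becomes an empty string and only surfaces later as a failed AWS call. A minimal sketch of an early check follows. The helper names missing_env_vars() and check_env() are hypothetical, and the list of required variables is an assumption based on the function above (PROFILE is left out, on the assumption that it may legitimately be empty).

```r
# Hypothetical helper: return the names of the expected environment
# variables that are unset or empty.
missing_env_vars <- function(vars = c("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION")) {
  values <- unname(Sys.getenv(vars, unset = ""))
  vars[values == ""]
}

# Hypothetical helper: stop with a readable message if anything is missing.
check_env <- function(vars = c("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", "REGION")) {
  missing <- missing_env_vars(vars)
  if (length(missing) > 0) {
    stop(
      "Missing environment variables: ",
      paste(missing, collapse = ", "),
      call. = FALSE
    )
  }
  invisible(TRUE)
}
```

Calling check_env() before dynamo_connect() would then fail fast with the names of the variables that still need to be added to .Renviron.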
Users experienced with DynamoDB might notice that we don’t use all of the config options, such as session_token and endpoint. So far we haven’t needed these. As mentioned before, the paws documentation is extensive, so we point the reader to ?paws::dynamodb for more details regarding the configuration.
Once we have the client ready, we can create a table from R. We’ll set both a partition key (id), specified as the HASH key in the KeySchema, and a sort key (timestamp), specified as the RANGE key. Together these two keys form a composite primary key that we can use later to query the table, but for now we just need to create the table and start populating it.
con <- dynamo_connect()
con$create_table(
  AttributeDefinitions = list(
    list(
      AttributeName = "id",
      AttributeType = "N"
    ),
    list(
      AttributeName = "timestamp",
      AttributeType = "N"
    )
  ),
  KeySchema = list(
    list(
      AttributeName = "id",
      KeyType = "HASH"
    ),
    list(
      AttributeName = "timestamp",
      KeyType = "RANGE"
    )
  ),
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)
To confirm, we can query the DB for the list of tables:
con$list_tables()
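Table creation in DynamoDB is asynchronous: a new table is reported with status CREATING before it becomes ACTIVE, and writes are only accepted once it is ACTIVE. The following is a minimal sketch of a polling helper built on describe_table. The name wait_for_table() and its default retry settings are assumptions for illustration, not part of paws.

```r
# Hypothetical helper: poll describe_table until the table reports ACTIVE.
# .max_tries and .sleep (seconds between checks) are illustrative defaults.
wait_for_table <- function(.con, .table, .max_tries = 30, .sleep = 2) {
  for (i in seq_len(.max_tries)) {
    status <- .con$describe_table(TableName = .table)$Table$TableStatus
    if (identical(status, "ACTIVE")) {
      return(invisible(TRUE))
    }
    Sys.sleep(.sleep)
  }
  stop(
    "Table '", .table, "' was not ACTIVE after ", .max_tries, " checks",
    call. = FALSE
  )
}
```

With the client from above, wait_for_table(con, "example") would block until the table is ready to receive items.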
Loading a large-ish table into DynamoDB
Our next step is to start loading data. For this, we’ll generate some random data.
data_to_put <- data.frame(
  id = 1:10000,
  timestamp = as.numeric(seq.POSIXt(
    from = as.POSIXct("2020-01-01"),
    to = Sys.time(),
    length.out = 10000
  )),
  measurement = sample.int(n = 100, size = 10000, replace = TRUE)
)
We could now try to put these data into our newly created table using the functions from our previous post on {paws}. But that would not be a great approach, because those functions use APIs intended for putting single items or small volumes. Instead, let’s write a wrapper around the batch_write_item API, so we can load our data in bulk.
To batch write to our DynamoDB table, we need to create a request for each item (row of the table) we wish to put. The put requests have the following format, identical to the nested named list produced by our function dynamo_item_prep, but with two more layers to label the Item and the type of request as PutRequest.
list(PutRequest = list(Item = dynamo_item_prep(.item = data_to_put[1, ])))
#> $PutRequest
#> $PutRequest$Item
#> $PutRequest$Item$id
#> $PutRequest$Item$id$N
#> [1] 1
#> $PutRequest$Item$timestamp
#> $PutRequest$Item$timestamp$N
#> [1] 1577858400
#> $PutRequest$Item$measurement
#> $PutRequest$Item$measurement$N
#> [1] 28
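Since dynamo_item_prep comes from the previous post and is not repeated here, the sketch below uses a hypothetical stand-in, item_prep_sketch(), that only mirrors the shape of the printed output above: numeric columns are tagged N and everything else is tagged S. It is an assumption for illustration and may differ from the original helper.

```r
# Hypothetical stand-in for dynamo_item_prep(): wrap each column of a
# one-row data frame in a list tagged with a DynamoDB type descriptor.
item_prep_sketch <- function(.item) {
  lapply(as.list(.item), function(value) {
    if (is.numeric(value)) {
      list(N = value)
    } else {
      list(S = as.character(value))
    }
  })
}

# Add the two outer layers that mark the item as a put request.
put_request_sketch <- function(.item) {
  list(PutRequest = list(Item = item_prep_sketch(.item)))
}

# Inspect the nested structure for a single illustrative row.
row <- data.frame(id = 1, timestamp = 1577858400, measurement = 28)
str(put_request_sketch(row))
```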
A list of up to 25 PutRequests (or DeleteRequests) weighing up to 16 MB in total can be submitted in one batch (see ?paws.database::dynamodb_batch_write_item), so we also need a way to divide our 10K items into 25-item chunks. Our updated dynamo_bulk_put function applies these two modifications to adjust the formatting and split the items into digestible chunks.
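dynamo_bulk_put <- function(.con, .table, .df) {
  # build one PutRequest per row of the data frame
  requests <- lapply(seq_len(nrow(.df)), function(i) {
    list(PutRequest = list(Item = dynamo_item_prep(.item = .df[i, ])))
  })
  n_items <- length(requests)
  # split the requests into chunks of at most 25, the batch_write_item limit
  # from https://stackoverflow.com/a/18857482/8543257
  chunked <-
    split(requests, rep(1:ceiling(n_items / 25), each = 25)[1:n_items])
  # submit each chunk, keyed by table name, as a separate batch;
  # the responses are returned as-is, so any UnprocessedItems are not retried
  lapply(chunked, function(L) {
    requestList <- list()
    requestList[[.table]] <- L
    .con$batch_write_item(RequestItems = requestList)
  })
}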
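The chunking step can be checked in isolation, without touching AWS. The sketch below is an alternative formulation of the same split, using a hypothetical helper chunk_requests() that also copes with an empty input; plain integers stand in for the put requests.

```r
# Hypothetical helper: split a list into consecutive chunks of at most
# .size elements (25 is the per-batch request limit mentioned above).
chunk_requests <- function(.requests, .size = 25) {
  if (length(.requests) == 0) {
    return(list())
  }
  split(.requests, ceiling(seq_along(.requests) / .size))
}

# 10,000 stand-in requests give 400 batches of 25
chunks <- chunk_requests(as.list(1:10000))
length(chunks)
range(lengths(chunks))
```

When the number of requests is not a multiple of 25, the last chunk simply holds the remainder.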
To test this, we first delete and re-create the example table:
con$delete_table(TableName = "example")
con$create_table(
  AttributeDefinitions = list(
    list(
      AttributeName = "id",
      AttributeType = "N"
    ),
    list(
      AttributeName = "timestamp",
      AttributeType = "N"
    )
  ),
  KeySchema = list(
    list(
      AttributeName = "id",
      KeyType = "HASH"
    ),
    list(
      AttributeName = "timestamp",
      KeyType = "RANGE"
    )
  ),
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)
And then try to put all 10K rows:
dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
#> Error: com.amazonaws.dynamodb.v20120810 (HTTP 400). The level of configured provisioned throughput for the table was
#> exceeded. Consider increasing your provisioning level with the UpdateTable API.
But we get an error, because we are making more write requests than the provisioned throughput we configured for our table (a single write capacity unit) allows.
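A rough back-of-the-envelope calculation shows why. One write capacity unit covers one standard write per second for an item of up to 1 KB, and larger items consume one unit per started KB. The sketch below uses a hypothetical helper, estimate_load_seconds(), and assumes items of at most 1 KB by default. It ignores burst capacity and request overhead, so it is only an approximation of the sustained load time.

```r
# Hypothetical helper: approximate seconds needed to write .n_items at a
# sustained rate of .wcu write capacity units, for items of .item_kb KB.
estimate_load_seconds <- function(.n_items, .wcu, .item_kb = 1) {
  units_per_item <- ceiling(.item_kb) # one unit per started KB
  (.n_items * units_per_item) / .wcu
}

estimate_load_seconds(10000, .wcu = 1)  # 10000 seconds, close to 3 hours
estimate_load_seconds(10000, .wcu = 50) # 200 seconds
```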
To address this, we should increase the provisioning level using dynamodb_update_table. In this case we should increase the WriteCapacityUnits, as we are trying to write data, and try to load our 10K rows again. For details on the meaning of the capacity units and the different types of provisioning for DynamoDB tables, consult the official documentation.
con$update_table(
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 50
  ),
  TableName = "example"
)
dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
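Raising the write capacity is one way to get past throttling. A complementary hedge is to retry each batch with a growing pause. The batch_write_item response includes an UnprocessedItems element holding any requests that were not written, and these can be resubmitted. The following is a minimal sketch under stated assumptions: write_batch_with_retry() is a hypothetical helper, it retries after any error (not only throttling), and the retry count and pause lengths are illustrative.

```r
# Hypothetical helper: submit one batch (a list of up to 25 requests) and
# resubmit whatever comes back in UnprocessedItems, doubling the pause
# between attempts. Any error is treated as "nothing was written".
write_batch_with_retry <- function(.con, .table, .batch,
                                   .max_tries = 5, .base_sleep = 1) {
  request_items <- list()
  request_items[[.table]] <- .batch
  for (attempt in seq_len(.max_tries)) {
    resp <- tryCatch(
      .con$batch_write_item(RequestItems = request_items),
      error = function(e) NULL
    )
    if (!is.null(resp)) {
      # keep only the requests the service did not process
      request_items <- resp$UnprocessedItems
    }
    if (length(request_items) == 0) {
      return(invisible(TRUE))
    }
    if (attempt < .max_tries) {
      Sys.sleep(.base_sleep * 2^(attempt - 1))
    }
  }
  stop("Batch still had unprocessed items after ", .max_tries, " attempts",
       call. = FALSE)
}
```

Inside dynamo_bulk_put, the direct call to batch_write_item could be swapped for this helper. A production version would probably inspect the error and retry only on throttling.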
After loading all of our data, we can check the number of items in the cloud table with:
con$scan(TableName = "example", Select = "COUNT")$Count
#> [1] 10000
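A single scan call reads at most 1 MB of data, so for larger tables the Count in the first response can cover only part of the table. In that case the response carries a LastEvaluatedKey, which is passed back as ExclusiveStartKey to continue. The sketch below uses a hypothetical helper, count_items(), to sum the counts over all pages. Note that each scan page also consumes read capacity.

```r
# Hypothetical helper: count all items in a table by following the scan
# pagination until no LastEvaluatedKey is returned.
count_items <- function(.con, .table) {
  total <- 0
  start_key <- NULL
  repeat {
    args <- list(TableName = .table, Select = "COUNT")
    if (length(start_key) > 0) {
      args$ExclusiveStartKey <- start_key
    }
    resp <- do.call(.con$scan, args)
    total <- total + resp$Count
    start_key <- resp$LastEvaluatedKey
    if (length(start_key) == 0) {
      break
    }
  }
  total
}
```

For the small example table here, count_items(con, "example") should agree with the single-call result above.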
Finally, remember to update the provisioning to lower the write capacity units back to the original level, and avoid paying for resources we no longer need.
con$update_table(
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)