<- function() {
dynamo_connect ::dynamodb(config = list(
pawscredentials = list(
creds = list(
access_key_id = Sys.getenv("ACCESS_KEY_ID"),
secret_access_key = Sys.getenv("SECRET_ACCESS_KEY")
),profile = Sys.getenv("PROFILE")
),region = Sys.getenv("REGION")
)) }
In our previous post we covered some basics of using the paws
SDK
to interact with AWS DynamoDB
from R
. We wrote a few simple functions to prepare R
lists in the appropriate format for DynamoDB
and some wrappers to put these lists as items in the remote NoSQL database.
These are good first steps, but we can’t really use these functions in a production setting, where we might need to populate a table with a few thousand or few million rows. In this post we’ll extend our work to accommodate these more realistic needs.
Creating DynamoDB
tables from R
We first create a function to wrap the call to paws::dynamodb()
that we use to establish a client for the cloud database service. We assume the credentials are stored in an .Renviron
file and available in R
as environmental variables through Sys.getenv
. Our connection function then is simply:
Users experienced with DynamoDB
might notice that we don’t use all config options, including session_token
and endpoint
. So far we haven’t needed these. As mentioned before, the paws
documentation is extensive, so we point the user to ?paws::dynamodb
for more details regarding the configuration.
Once we have the client ready, we can create a table from R
. We’ll set both a partition key
, or HASH
key as specified in the KeySchema
(id
) and a sort key
or RANGE
(timestamp). Later we can query the table using these keys which together form a composite primary key, but for now we need to have the table, and start populating it.
<- dynamo_connect()
con $create_table(
conAttributeDefinitions = list(
list(
AttributeName = "id",
AttributeType = "N"
),list(
AttributeName = "timestamp",
AttributeType = "N"
)
),KeySchema = list(
list(
AttributeName = "id",
KeyType = "HASH"
),list(
AttributeName = "timestamp",
KeyType = "RANGE"
)
),ProvisionedThroughput = list(
ReadCapacityUnits = 1,
WriteCapacityUnits = 1
),TableName = "example"
)
To confirm, we can query the DB for the list of tables:
$list_tables() con
Loading a large-ish table into DynamoDB
Our next step is to start loading data. For this, we’ll generate some random data.
<- data.frame(
data_to_put id = 1:10000,
timestamp = as.numeric(seq.POSIXt(
from = as.POSIXct("2020-01-01"),
to = Sys.time(),
length.out = 10000
)),measurement = sample.int(n = 100, size = 10000, replace = TRUE)
)
We can now try to put these data into our newly created table using the functions from our previous post on {paws}
. But that would not be a great approach, because there we are using APIs intended for putting single items or a small volume. Instead, lets write a wrapper around the batch_write_item
API, so we can load our data in bulk.
To batch write to our DynamoDB table, we need to create a requests for each item (row of the table) we wish to put. The put requests have the following format, identical to the nested named list produced by our function dynamo_item_prep
, but with two more layers to label the Item
and the type of request as PutRequest
.
list(PutRequest = list(Item = dynamo_item_prep(.item = data_to_put[1, ])))
#> $PutRequest
#> $PutRequest$Item
#> $PutRequest$Item$id
#> $PutRequest$Item$id$N
#> [1] 1
#> $PutRequest$Item$timestamp
#> $PutRequest$Item$timestamp$N
#> [1] 1577858400
#> $PutRequest$Item$measurement
#> $PutRequest$Item$measurement$N
#> [1] 28
A list of 25 PutRequest
s (or DeleteRequest
s) weighing up to 16MB can be submitted in one batch (see ?paws.database::dynamodb_batch_write_item
), so we also need a way to divide our 10K items into 25-item chunks. Our updated dynamo_bulk_put
function applies these two modifications to adjust the formatting and split the items into digestible chunks.
<- function(.con, .table, .df) {
dynamo_bulk_put <- lapply(1:nrow(.df), function(i) {
requests list(PutRequest = list(Item = dynamo_item_prep(.item = .df[i, ])))
})
<- length(requests)
n_items # from https://stackoverflow.com/a/18857482/8543257
<-
chunked split(requests, rep(1:ceiling(n_items / 25), each = 25)[1:n_items])
lapply(chunked, function(L) {
<- list()
requestList <- L
requestList[[.table]] $batch_write_item(RequestItems = requestList)
.con
}) }
To test this, we first delete and re-create the example
table:
$delete_table(TableName = "example")
con$create_table(
conAttributeDefinitions = list(
list(
AttributeName = "id",
AttributeType = "N"
),list(
AttributeName = "timestamp",
AttributeType = "N"
)
),KeySchema = list(
list(
AttributeName = "id",
KeyType = "HASH"
),list(
AttributeName = "timestamp",
KeyType = "RANGE"
)
),ProvisionedThroughput = list(
ReadCapacityUnits = 1,
WriteCapacityUnits = 1
),TableName = "example"
)
And then try to put all 10K rows:
dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
#> Error: com.amazonaws.dynamodb.v20120810 (HTTP 400). The level of configured provisioned throughput for the table was
#> exceeded. Consider increasing your provisioning level with the UpdateTable API.
But we get an error because we are making more requests than the default provisioned throughput limit for our table.
To address this, we should increase the provisioning level using dynamodb_update_table
. In this case we should increase the WriteCapacityUnits
, as we are trying to write data, and try to load our 10K rows again. For details on the meaning of the capacity units, and different types of provisioning for DynamoDB tables, consult the official documentation.
$update_table(
conProvisionedThroughput = list(
ReadCapacityUnits = 1,
WriteCapacityUnits = 50
),TableName = "example"
)dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
After loading all of our data, we can check the number of items in the cloud table with:
$scan(TableName = "example", Select = "COUNT")$Count
con#> [1] 10000
Finally, remember to update the provisioning to lower the write capacity units to default level, and avoid paying four resources we no longer need.
$update_table(
conProvisionedThroughput = list(
ReadCapacityUnits = 1,
WriteCapacityUnits = 1
),TableName = "example"
)