П о д а т о ц и

Data are fun

Using R and {paws} to populate DynamoDB tables #2

In our previous post we covered some basics of using the paws SDK to interact with AWS DynamoDB from R. We wrote a few simple functions to prepare R lists in the appropriate format for DynamoDB and some wrappers to put these lists as items in the remote NoSQL database.

These are good first steps, but we can’t really use these functions in a production setting, where we might need to populate a table with a few thousand or a few million rows. In this post we’ll extend our work to accommodate these more realistic needs.

Creating DynamoDB tables from R

We first create a function to wrap the call to paws::dynamodb() that we use to establish a client for the cloud database service. We assume the credentials are stored in an .Renviron file and are available in R as environment variables through Sys.getenv(). Our connection function is then simply:

dynamo_connect <- function() {
  paws::dynamodb(config = list(
    credentials = list(
      creds = list(
        access_key_id = Sys.getenv("ACCESS_KEY_ID"),
        secret_access_key = Sys.getenv("SECRET_ACCESS_KEY")
      ),
      profile = Sys.getenv("PROFILE")
    ),
    region = Sys.getenv("REGION")
  ))
}

Users experienced with DynamoDB might notice that we don’t use all config options, including session_token and endpoint. So far we haven’t needed these. As mentioned before, the paws documentation is extensive, so we point the user to ?paws::dynamodb for more details regarding the configuration.
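
Because dynamo_connect() reads everything from environment variables, an empty value only shows up later as an authentication or region error. As a minimal sketch, a small guard can fail early instead. The helper name check_dynamo_env is hypothetical (it is not part of paws), and it assumes PROFILE may legitimately be empty, so it only checks the other three variables.

```r
# Hypothetical helper (not part of paws): stop early if any of the
# environment variables read by dynamo_connect() is unset or empty.
check_dynamo_env <- function(vars = c("ACCESS_KEY_ID",
                                      "SECRET_ACCESS_KEY",
                                      "REGION")) {
  values <- Sys.getenv(vars, unset = "")
  missing_vars <- vars[!nzchar(values)]
  if (length(missing_vars) > 0) {
    stop(
      "Missing environment variables: ",
      paste(missing_vars, collapse = ", ")
    )
  }
  invisible(TRUE)
}
```

Calling check_dynamo_env() right before dynamo_connect() keeps the connection function itself unchanged.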

Once we have the client ready, we can create a table from R. We’ll set both a partition key (id), declared as the HASH key in the KeySchema, and a sort key (timestamp), declared as the RANGE key. Together these form a composite primary key that we can later use to query the table. For now we just need the table to exist so we can start populating it.

con <- dynamo_connect()
con$create_table(
  AttributeDefinitions = list(
    list(
      AttributeName = "id",
      AttributeType = "N"
    ),
    list(
      AttributeName = "timestamp",
      AttributeType = "N"
    )
  ),
  KeySchema = list(
    list(
      AttributeName = "id",
      KeyType = "HASH"
    ),
    list(
      AttributeName = "timestamp",
      KeyType = "RANGE"
    )
  ),
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)

To confirm, we can query the DB for the list of tables:

con$list_tables()
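
Table creation is asynchronous: a new table starts in the CREATING status and only accepts writes once it is ACTIVE. A rough sketch of a polling helper is shown here. The name wait_for_table and its .max_tries and .pause arguments are assumptions made for illustration; the only client call it relies on is describe_table, whose result carries the status in Table$TableStatus.

```r
# Hypothetical helper: poll describe_table until the table is ACTIVE.
# .con is a client as returned by dynamo_connect().
wait_for_table <- function(.con, .table, .max_tries = 30, .pause = 2) {
  for (i in seq_len(.max_tries)) {
    status <- .con$describe_table(TableName = .table)$Table$TableStatus
    if (identical(status, "ACTIVE")) {
      return(invisible(TRUE))
    }
    # Not ready yet (e.g. CREATING or UPDATING): wait and check again
    Sys.sleep(.pause)
  }
  stop(
    "Table '", .table, "' did not become ACTIVE after ",
    .max_tries, " checks"
  )
}
```

With a live client this would be called as wait_for_table(con, "example") right after create_table.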

Loading a large-ish table into DynamoDB

Our next step is to start loading data. For this, we’ll generate some random data.

data_to_put <- data.frame(
  id = 1:10000,
  timestamp = as.numeric(seq.POSIXt(
    from = as.POSIXct("2020-01-01"),
    to = Sys.time(),
    length.out = 10000
  )),
  measurement = sample.int(n = 100, size = 10000, replace = TRUE)
)

We could now try to put these data into our newly created table using the functions from our previous post on {paws}. But that would not be a great approach, because those functions use APIs intended for putting single items or small volumes. Instead, let’s write a wrapper around the batch_write_item API, so we can load our data in bulk.

To batch write to our DynamoDB table, we need to create a request for each item (row of the table) we wish to put. The put requests have the following format, identical to the nested named list produced by our function dynamo_item_prep, but with two more layers to label the Item and the type of request as PutRequest.

list(PutRequest = list(Item = dynamo_item_prep(.item = data_to_put[1, ])))
#> $PutRequest
#> $PutRequest$Item
#> $PutRequest$Item$id
#> $PutRequest$Item$id$N
#> [1] 1


#> $PutRequest$Item$timestamp
#> $PutRequest$Item$timestamp$N
#> [1] 1577858400


#> $PutRequest$Item$measurement
#> $PutRequest$Item$measurement$N
#> [1] 28
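
dynamo_item_prep comes from the previous post and is not repeated here. To reproduce the structure above in isolation, the following is a minimal stand-in, not the original function. The name item_prep_sketch is hypothetical, and it assumes every column is either numeric (sent as N) or something that can be sent as a string (S).

```r
# Stand-in for dynamo_item_prep (an assumption, not the original code):
# wrap each column of a one-row data frame in a DynamoDB type label.
item_prep_sketch <- function(.item) {
  lapply(as.list(.item), function(value) {
    if (is.numeric(value)) {
      list(N = value)
    } else {
      list(S = as.character(value))
    }
  })
}

# One made-up row with the same columns as data_to_put
row <- data.frame(id = 1L, timestamp = 1577858400, measurement = 28L)

# The two extra layers: PutRequest, then Item
request <- list(PutRequest = list(Item = item_prep_sketch(row)))
str(request)
```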

Up to 25 PutRequests (or DeleteRequests), totaling up to 16 MB, can be submitted in one batch (see ?paws.database::dynamodb_batch_write_item), so we also need a way to divide our 10K items into 25-item chunks. Our updated dynamo_bulk_put function applies these two modifications: it adjusts the formatting and splits the items into digestible chunks.

dynamo_bulk_put <- function(.con, .table, .df) {
  # Build one PutRequest per row; seq_len() is safe for zero-row input
  requests <- lapply(seq_len(nrow(.df)), function(i) {
    list(PutRequest = list(Item = dynamo_item_prep(.item = .df[i, ])))
  })
  
  n_items <- length(requests)
  # Split the requests into chunks of at most 25 items
  # from https://stackoverflow.com/a/18857482/8543257
  chunked <-
    split(requests, rep(seq_len(ceiling(n_items / 25)), each = 25)[seq_len(n_items)])
  
  # Submit each chunk as one batch, keyed by the table name
  lapply(chunked, function(L) {
    requestList <- list()
    requestList[[.table]] <- L
    .con$batch_write_item(RequestItems = requestList)
  })
}
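
One caveat with batch_write_item is that a successful call can still leave some requests unwritten; these are returned in the UnprocessedItems element of the response and have to be resubmitted. The sketch below shows one way to handle that with exponential backoff. The name batch_write_retry and the .max_tries and .pause arguments are hypothetical, and the code assumes UnprocessedItems is empty (length zero) when everything was written.

```r
# Hypothetical wrapper: resubmit UnprocessedItems with exponential backoff.
# .request_items has the same shape as RequestItems in batch_write_item.
batch_write_retry <- function(.con, .request_items,
                              .max_tries = 5, .pause = 1) {
  pending <- .request_items
  for (attempt in seq_len(.max_tries)) {
    response <- .con$batch_write_item(RequestItems = pending)
    pending <- response$UnprocessedItems
    if (length(pending) == 0) {
      return(invisible(TRUE))
    }
    # Wait 1x, 2x, 4x, ... the base pause before the next attempt
    if (attempt < .max_tries) {
      Sys.sleep(.pause * 2^(attempt - 1))
    }
  }
  stop("Some items were still unprocessed after ", .max_tries, " attempts")
}
```

Inside dynamo_bulk_put, the direct call to .con$batch_write_item could be swapped for batch_write_retry(.con, requestList) if leftover items turn out to be a problem.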

To test this, we first delete and re-create the example table:

con$delete_table(TableName = "example")
con$create_table(
  AttributeDefinitions = list(
    list(
      AttributeName = "id",
      AttributeType = "N"
    ),
    list(
      AttributeName = "timestamp",
      AttributeType = "N"
    )
  ),
  KeySchema = list(
    list(
      AttributeName = "id",
      KeyType = "HASH"
    ),
    list(
      AttributeName = "timestamp",
      KeyType = "RANGE"
    )
  ),
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)

And then try to put all 10K rows:

dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
#> Error: com.amazonaws.dynamodb.v20120810 (HTTP 400). The level of configured provisioned throughput for the table was
#> exceeded. Consider increasing your provisioning level with the UpdateTable API.

But we get an error, because we are writing faster than the throughput we provisioned for the table allows (a single write capacity unit).

To address this, we should increase the provisioning level using dynamodb_update_table. In this case we should increase the WriteCapacityUnits, as we are trying to write data, and try to load our 10K rows again. For details on the meaning of the capacity units, and different types of provisioning for DynamoDB tables, consult the official documentation.

con$update_table(
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 50
  ),
  TableName = "example"
)
dynamo_bulk_put(.con = con, .table = "example", .df = data_to_put)
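
To get a feel for what the capacity setting means here: one write capacity unit covers one standard write per second for an item of up to 1 KB. The back-of-the-envelope sketch below estimates the minimum sustained load time for a given setting. The function name min_load_seconds is hypothetical, and the estimate deliberately ignores burst capacity, retries and network latency.

```r
# Rough lower bound on load time under provisioned write capacity.
# Each item costs ceiling(size in KB) write capacity units.
min_load_seconds <- function(.n_items, .wcu, .item_kb = 1) {
  units_per_item <- ceiling(.item_kb)
  .n_items * units_per_item / .wcu
}

min_load_seconds(10000, 1)
#> [1] 10000
min_load_seconds(10000, 50)
#> [1] 200
```

So our 10K small items need on the order of hours at one write capacity unit, but only a few minutes at 50.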

After loading all of our data, we can check the number of items in the cloud table with:

con$scan(TableName = "example", Select = "COUNT")$Count
#> [1] 10000
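
A single scan call reads at most 1 MB of data, so the count above is complete only because our items are small. For larger tables the response includes a LastEvaluatedKey, and the scan has to be continued from there. A minimal sketch of a paginated count follows. The name count_items is hypothetical, and the code assumes LastEvaluatedKey is empty on the last page.

```r
# Hypothetical helper: sum Count over all pages of a COUNT scan.
count_items <- function(.con, .table) {
  total <- 0
  start_key <- NULL
  repeat {
    args <- list(TableName = .table, Select = "COUNT")
    # Continue from where the previous page stopped, if there was one
    if (length(start_key) > 0) {
      args$ExclusiveStartKey <- start_key
    }
    page <- do.call(.con$scan, args)
    total <- total + page$Count
    start_key <- page$LastEvaluatedKey
    if (length(start_key) == 0) break
  }
  total
}
```

Note that a scan consumes read capacity for every item it reads, even when only the count is returned.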

Finally, remember to update the provisioning to lower the write capacity units back to their original level, and avoid paying for resources we no longer need.

con$update_table(
  ProvisionedThroughput = list(
    ReadCapacityUnits = 1,
    WriteCapacityUnits = 1
  ),
  TableName = "example"
)