A plumber API to filter and aggregate datasets

Demo of a {plumber} API that uses dynamic filtering and aggregation
R
plumber
API
filter
aggregate
httr2
Author

teo

Published

2025-02-13

Over the past couple of weeks, I’ve been sharing some code that I use for dynamic filtering and aggregation of data frames in R. The idea of these functions was to have single-step methods for complex filter or aggregate queries. Each of the functions worked with a list of instructions, specifically formatted for the task. I adopted this approach to make it easier to use these functions in web applications or APIs. If the filter or aggregate query request can be sent as a list, then it can be easily converted to JSON and used in API calls.

In this post, I am going to demo how we can integrate the functions from the previous two posts into a {plumber} API and, after deployment, have /filter and /aggregate endpoints available as a “service”.

Plumber API for filter and aggregate

The {plumber} API is organized in the form of an R package.

├── DESCRIPTION
├── inst
│   └── plumber.r
├── man
│   ├── dot-aggregate.Rd
│   ├── dot-filter.Rd
│   └── run_api.Rd
├── NAMESPACE
├── plumb.ex.Rproj
├── R
│   ├── api.r
│   └── funs.r
└── test.r

The R directory contains the script funs.r, where I define the .filter and .aggregate functions I covered in the previous two posts. The api.r script contains a single function that starts the API on our preferred host and port (defaults to 127.0.0.1 and 5000).

#' start plumber api
#' @param host the host address
#' @param port the port to use
#' @importFrom plumber pr pr_run
#' @export
run_api <- function(port = 5000, host = "127.0.0.1") {
  path <- system.file("plumber.r", package = "plumb.ex")
  path |>
    pr() |>
    pr_run(port = port, host = host)
}

This is similar to {golem}’s run_app() function and it streamlines the development cycle. Whenever I make updates to the underlying code in funs.r or the {plumber} endpoints in inst/plumber.r, I can run:

devtools::document() # re-documents and loads all functions 
run_api()

This starts the API and makes it accessible at http://127.0.0.1:5000, with the {swagger} interface for manual testing at http://127.0.0.1:5000/__docs__/. It is then ready for queries.

Endpoints

The package exports the .filter and .aggregate functions. Then the API uses these when defining the endpoints. In this example API we have three endpoints defined in inst/plumber.r:

# inst/plumber.r

#* @plumber
#* @apiTitle Plumber filter or aggregate
#* @apiDescription Plumber example for dynamic filtering and aggregation
#* @apiVersion 1.0.0
function(pr) {
  pr |>
    plumber::pr_set_serializer(plumber::serializer_unboxed_json())
}

#* Hello
#* @get /hello
function() {
  "Hello, there!"
}

#* Filter
#* @param data the name of the data frame to filter, `"iris"` or `"mtcars"`
#* @param instructions_list a list of instructions for filtering.
#* @post /filter
function(data, instructions_list) {
  d <- get(data)
  l <- jsonlite::fromJSON(instructions_list, simplifyVector = FALSE)
  .filter(d, l)
}

#* Aggregate
#* @param data the name of the data frame to aggregate, `"iris"` or `"mtcars"`
#* @param instructions_list a list of instructions for aggregation.
#* @post /aggregate
function(data, instructions_list) {
  d <- get(data)
  l <- jsonlite::fromJSON(instructions_list, simplifyVector = FALSE)
  .aggregate(d, l)
}

The first is /hello and is simply a health check endpoint. Using httr2 we can make requests to it as follows:

library(jsonlite)
library(httr2)

request("http://127.0.0.1:5000/hello") |>
  req_perform() |>
  resp_body_json()
[1] "Hello, there!"

The other two endpoints are simple wrappers around .filter and .aggregate that perform two steps:

  1. Get the dataset requested by the user. For simplicity in this example, the dataset argument is passed as a string, and then we use get() to retrieve that dataset, assuming it's one of the ones prepackaged with R. But it's easy to modify that and simply send JSON data to the endpoint.

  2. Convert the JSON received by the API into a list, as required in R. This is needed because the instructions list for filtering or aggregation is sent with the request as JSON, and the validation within our functions expects a list. One could relax this requirement as well by allowing a JSON string in the instructions_list argument of the functions and parsing it internally.
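The two steps above can be sketched as a pair of small helpers. This is only an illustration under my own assumptions: the names `allowed_datasets`, `get_dataset()` and `parse_instructions()` are hypothetical and not part of the package. The sketch swaps the open-ended get() for a lookup in an explicit allow-list, and it accepts the instructions either as a JSON string or as an already-parsed list.

```r
library(jsonlite)

# Hypothetical allow-list: only these datasets can be requested by name.
allowed_datasets <- list(iris = datasets::iris, mtcars = datasets::mtcars)

# Step 1: look up the dataset by name instead of calling get() on user input.
get_dataset <- function(name) {
  ok <- is.character(name) && length(name) == 1 &&
    name %in% names(allowed_datasets)
  if (!ok) {
    stop("Unknown dataset requested", call. = FALSE)
  }
  allowed_datasets[[name]]
}

# Step 2: accept either a JSON string or an already-parsed list.
parse_instructions <- function(x) {
  if (is.character(x) && length(x) == 1) {
    return(fromJSON(x, simplifyVector = FALSE))
  }
  if (is.list(x)) {
    return(x)
  }
  stop("`instructions_list` must be a JSON string or a list", call. = FALSE)
}

instructions <- parse_instructions(
  '[{"col": "Species", "fun": "in", "val": ["setosa", "versicolor"]}]'
)
str(instructions)
```

With helpers like these, an endpoint body could reduce to something like `.filter(get_dataset(data), parse_instructions(instructions_list))`, and a request for a name outside the allow-list fails early instead of evaluating an arbitrary object name.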

Interactive demo

Start the API

During development, we would run:

devtools::document() # re-documents and loads all functions 
run_api()

If the plumb.ex package is installed, we would instead run:

library(plumb.ex) # attaches the installed package
plumb.ex::run_api(port = 6000)

If it's deployed (for one method, see this post), we can follow the code below to make requests, replacing the local address with the deployed URL.
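Since only the base URL changes between a local and a deployed API, one option is to wrap the request construction in a small helper. The sketch below is an assumption of mine, not part of the package: `build_query()` is a hypothetical name, and the default `base_url` simply mirrors the port used when starting the installed package above. It only builds the request; sending it is left to req_perform().

```r
library(httr2)
library(jsonlite)

# Hypothetical helper: build (but do not send) a request to /filter or /aggregate.
build_query <- function(endpoint, data, instructions,
                        base_url = "http://127.0.0.1:6000") {
  request(base_url) |>
    req_url_path(endpoint) |>
    req_body_json(
      data = list(
        data = data,
        # the instructions travel as a JSON string, as the endpoints expect
        instructions_list = toJSON(instructions, auto_unbox = TRUE)
      ),
      auto_unbox = TRUE
    )
}

req <- build_query(
  endpoint = "filter",
  data = "iris",
  instructions = list(
    list(col = "Species", fun = "in", val = c("setosa", "versicolor"))
  )
)

# req_dry_run(req) shows the request without contacting the server.
# With the API running, send it with:
# req |> req_perform() |> resp_body_json(simplifyVector = TRUE)
```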

Filter

From R and {shiny} we can call the /filter endpoint using the following steps. First, we create our filter instructions, then convert them to JSON, and finally use httr2 to make a request to the API:

library(jsonlite)
library(httr2)

filter_data <- list(
  list(col = "Sepal.Length", fun = "between", min = 4.9, max = 5),
  list(col = "Species", fun = "in", val = c("setosa", "versicolor"))
)

filter_json <- jsonlite::toJSON(filter_data, auto_unbox = TRUE)
jsonlite::prettify(filter_json)
[
    {
        "col": "Sepal.Length",
        "fun": "between",
        "min": 4.9,
        "max": 5
    },
    {
        "col": "Species",
        "fun": "in",
        "val": [
            "setosa",
            "versicolor"
        ]
    }
]
 
request("http://127.0.0.1:6000") |>
  req_url_path("filter") |>
  req_body_json(
    data = list(data = "iris", instructions_list = filter_json),
    auto_unbox = TRUE
  ) |>
  req_perform() |>
  resp_body_json(simplifyVector = TRUE)
   Sepal.Length Sepal.Width Petal.Length Petal.Width    Species
1           4.9         3.0          1.4         0.2     setosa
2           5.0         3.6          1.4         0.2     setosa
3           5.0         3.4          1.5         0.2     setosa
4           4.9         3.1          1.5         0.1     setosa
5           5.0         3.0          1.6         0.2     setosa
6           5.0         3.4          1.6         0.4     setosa
7           4.9         3.1          1.5         0.2     setosa
8           5.0         3.2          1.2         0.2     setosa
9           4.9         3.6          1.4         0.1     setosa
10          5.0         3.5          1.3         0.3     setosa
11          5.0         3.5          1.6         0.6     setosa
12          5.0         3.3          1.4         0.2     setosa
13          4.9         2.4          3.3         1.0 versicolor
14          5.0         2.0          3.5         1.0 versicolor
15          5.0         2.3          3.3         1.0 versicolor
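To make the instruction format more concrete, here is a rough base R sketch of how a list like filter_data could be interpreted. It is not the .filter function from the earlier posts; `apply_filters()` is a hypothetical name, and it only handles the two `fun` values used in this example.

```r
# Illustrative only: NOT the .filter() from the earlier posts.
apply_filters <- function(df, instructions) {
  keep <- rep(TRUE, nrow(df))
  for (ins in instructions) {
    stopifnot(ins$col %in% names(df))
    x <- df[[ins$col]]
    hit <- switch(
      ins$fun,
      between = x >= ins$min & x <= ins$max,  # inclusive range
      `in` = x %in% unlist(ins$val),          # membership in a set of values
      stop("Unsupported fun: ", ins$fun, call. = FALSE)
    )
    keep <- keep & hit  # all instructions must hold for a row to be kept
  }
  df[keep, , drop = FALSE]
}

filter_data <- list(
  list(col = "Sepal.Length", fun = "between", min = 4.9, max = 5),
  list(col = "Species", fun = "in", val = c("setosa", "versicolor"))
)

res <- apply_filters(iris, filter_data)
nrow(res)
#> [1] 15
```

Each instruction produces a logical vector, and the vectors are combined with AND. That yields the same 15 rows as the API response above, although the row names here are the original iris row numbers rather than 1 to 15.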

Aggregate

Similarly, for the /aggregate endpoint:

library(jsonlite)
library(httr2)

aggregate_instructions <- list(
  groups = list(col = "Species"),
  aggregates = list(
    list(col = "Sepal.Length", fun = "mean"),
    list(col = "Sepal.Width", fun = "mean")
  )
)

aggregate_json <- jsonlite::toJSON(aggregate_instructions, auto_unbox = TRUE)
jsonlite::prettify(aggregate_json)
{
    "groups": {
        "col": "Species"
    },
    "aggregates": [
        {
            "col": "Sepal.Length",
            "fun": "mean"
        },
        {
            "col": "Sepal.Width",
            "fun": "mean"
        }
    ]
}
 
request("http://127.0.0.1:6000") |>
  req_url_path("aggregate") |>
  req_body_json(
    data = list(data = "iris", instructions_list = aggregate_json),
    auto_unbox = TRUE
  ) |>
  req_perform() |>
  resp_body_json(simplifyVector = TRUE)
     Species Sepal.Length_mean Sepal.Width_mean
1     setosa              5.01             3.43
2 versicolor              5.94             2.77
3  virginica              6.59             2.97
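As with filtering, the aggregate instructions can be read as a small recipe: group by the `groups` columns, then apply each `fun` to its `col`. The base R sketch below illustrates that reading. It is not the .aggregate function from the earlier posts; `apply_aggregates()` and `allowed_funs` are hypothetical names, and the `col_fun` naming of the output columns is copied from the response above.

```r
# Illustrative only: NOT the .aggregate() from the earlier posts.
# Hypothetical allow-list of summary functions a request may name.
allowed_funs <- list(mean = mean, sum = sum, min = min, max = max)

apply_aggregates <- function(df, instructions) {
  group_cols <- unlist(instructions$groups, use.names = FALSE)
  pieces <- lapply(instructions$aggregates, function(a) {
    stopifnot(a$col %in% names(df), a$fun %in% names(allowed_funs))
    # one summary column per instruction, computed within each group
    out <- aggregate(df[[a$col]], by = df[group_cols],
                     FUN = allowed_funs[[a$fun]])
    names(out)[ncol(out)] <- paste(a$col, a$fun, sep = "_")
    out
  })
  # join the per-instruction results on the grouping columns
  Reduce(function(x, y) merge(x, y, by = group_cols), pieces)
}

aggregate_instructions <- list(
  groups = list(col = "Species"),
  aggregates = list(
    list(col = "Sepal.Length", fun = "mean"),
    list(col = "Sepal.Width", fun = "mean")
  )
)

res <- apply_aggregates(iris, aggregate_instructions)
res
```

This should give one row per species with the same means as the API response above, up to rounding. Looking functions up in an allow-list, rather than resolving any name the request sends, keeps the set of callable functions explicit.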

How is this useful?

I struggle with this question sometimes myself. Isn’t this why we have databases to begin with? Of course, having an API that queries a database would be preferable in most applications requiring large amounts of data. However, sometimes we don’t have the expertise, bandwidth, or budget for a database setup, or we could be working with local files in arrow, parquet, qs or some other format, where it might make sense or be justifiable to run the queries in R itself.

Summary

In this post, we demonstrated how to create a {plumber} API that provides endpoints for dynamic filtering and aggregation of datasets in R. We structured the API as an R package, defined the necessary functions, and set up the endpoints to handle JSON requests. We also provided examples of how to interact with the API using httr2 from R. This approach can be useful for scenarios where a database setup is not feasible, and we need to work with data files directly in R.