Real-time applications in R plumber with WebSockets

Gabriel
Dec 13, 2021

A few weeks ago, I was updating a web application that had a dashboard. The backend was built using R plumber and the frontend with Angular. For no other reason than thinking it would look cool, I wanted some of the plots in the dashboard to update in real time.

Normally I find R libraries that do basically anything that comes to mind. However, this time I did not find a way to use e.g. socket.io for this real-time behavior, so I opted to go with WebSockets.

Enabling WebSockets in R plumber

A few hours later, I found out that the current version of R plumber does not support WebSockets :(

library(plumber)
y <- Plumber$new()
y$onWSOpen()
# Warning message:
# In y$onWSOpen() : WebSockets not supported.

Luckily, it turns out the authors had already proposed a solution to enable WebSockets in R plumber. To be able to modify the onWSOpen method of the Plumber “object”, I created a new R6Class that inherits from Plumber and added the necessary methods and variables:

# plumberWebSocket.R
library(plumber)
library(websocket)
library(R6)
PlumberWebSocket <- R6Class("PlumberWebSocket",
  inherit = Plumber,
  public = list(
    onWSOpen = function(ws) {
      if (is.function(private$ws_open)) {
        private$ws_open(ws)
      }
      invisible(self)
    },
    websocket = function(open = NULL) {
      if (!is.null(open)) stopifnot(is.function(open))
      private$ws_open <- open
    }
  ),
  private = list(
    ws_open = NULL
  )
)

With this in place, I was able to use both WebSockets and HTTP in my plumber application!

Now, let’s look at a couple of examples of how to use this new capability.

Example 1: creating a chat service in R

This is as bizarre as it sounds… Who would create a chat service using R? Anyway, I find it to be a very good first example to make sure that WebSockets are working.

First of all, the server side code for this example looks like this:

library(uuid)
clients <- list()
addClient <- function(ws) {
  clients[[ws$request$uuid]] <<- ws # <<- modifies clients globally
  return(clients)
}
removeClient <- function(uuid) {
  clients[[uuid]] <<- NULL
}
addClientUserName <- function(ws, name) {
  clients[[ws$request$uuid]]$request$USER <<- name
}
#' @plumber
function(pr) {
  pr$websocket(
    function(ws) {
      ws$request$uuid <- UUIDgenerate()
      ws$request$USER <- "Unknown"
      addClient(ws)
      print("New user connected!")
      ws$onMessage(function(binary, message) {
        if (grepl("My name is=", message)) {
          addClientUserName(ws, strsplit(message, "=")[[1]][2])
        } else {
          for (client in clients) {
            client$send(paste(clients[[ws$request$uuid]]$request$USER, ":", message))
          }
        }
      })
      ws$onClose(function() {
        removeClient(ws$request$uuid)
      })
    }
  )
}

The first function, addClient(), stores each client’s WebSocket connection in a list so that I can track them. It is called when a user connects, right after a unique id is added to the request object with ws$request$uuid <- UUIDgenerate(). With this in place, I can send messages to specific (or all) clients by looping through the clients list.

The second function, removeClient(), removes the connection information of clients that close the connection. It is triggered in ws$onClose().

The third function, addClientUserName(), gets triggered from ws$onMessage() when a user sends a message containing “My name is=<name>”, and associates <name> with that user’s connection in the list of connections. This is just a dummy way to assign usernames and have some kind of identification in this very simple chat application.

The rest of the code enables WebSockets in our R plumber application and defines what happens on each event. For example, when ws$onMessage() receives any message that does not contain “My name is=”, it loops through all the connections stored in the clients list and sends the message, prefixed with the sender’s username, to each client.
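
To make the routing rule concrete, here is a minimal TypeScript sketch of the same logic, detached from plumber and from any real socket. The names ChatClient, routeMessage and room are hypothetical and exist only for this illustration; each "client" is just an object that records what it would have been sent.

```typescript
// Hypothetical stand-in for a WebSocket connection: it only records what it was sent.
interface ChatClient {
  user: string;
  inbox: string[];
}

const NAME_PREFIX = "My name is=";

// Mirrors the onMessage branch of the R server: a message containing the prefix
// renames the sender; anything else is broadcast to every connected client.
function routeMessage(
  clients: Record<string, ChatClient>,
  senderId: string,
  message: string
): void {
  const sender = clients[senderId];
  if (sender === undefined) return; // unknown connection: nothing to do
  if (message.indexOf(NAME_PREFIX) !== -1) {
    // Same idea as strsplit(message, "=")[[1]][2]: the text after the first "=".
    // Assumption of this sketch: keep the old name if nothing follows the "=".
    sender.user = message.split("=")[1] ?? sender.user;
    return;
  }
  for (const id of Object.keys(clients)) {
    // R's paste() joins its arguments with spaces, hence "user : message".
    clients[id].inbox.push(`${sender.user} : ${message}`);
  }
}

// Usage: two connected clients, one of which introduces itself and then talks.
const room: Record<string, ChatClient> = {
  a: { user: "Unknown", inbox: [] },
  b: { user: "Unknown", inbox: [] },
};
routeMessage(room, "a", "My name is=John");
routeMessage(room, "a", "How are you everyone?");
// Both inboxes now hold exactly one entry: "John : How are you everyone?"
```

Note that the naming message itself is never broadcast, and that the sender also receives their own chat messages, just as in the R loop over clients.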

To start the server, run:

library(plumber)
source("plumberWebSocket.R")
PlumberWebSocket$new("plumber_chat.R")$run(port = 8080)
# Running plumber API at http://127.0.0.1:8080
# Running swagger Docs at http://127.0.0.1:8080/__docs__/

Now, let’s chat using R and the websocket client library. The client-side code looks like:

library(websocket)
ws <- websocket::WebSocket$new("ws://127.0.0.1:8080/")
ws$onMessage(function(event) {
  cat(event$data, "\n")
})
ws$send("My name is=John")
ws$send("How are you everyone?")

In the code above, I am creating a WebSocket connection to the server. I first send a message with “My name is=” so that the server adds a name to the connection object. Then, I can send messages around.

Connecting from two terminals would look something like:

Example 2: creating a real-time polling application with R plumber and Angular

In this second example I use WebSockets in R plumber and an Angular frontend to create a simple polling application. In this application, users connect to the WebSocket and can cast a vote for a color (red, green or blue) by sending a message after pressing a button in the frontend. The frontend also includes a plotly barplot with the overall voting results that gets updated in real time as new votes are cast.

Just as in the previous example, the backend code looks like:

# plumber_poll.R
library(uuid)
clients <- list()
results <- data.frame(
  color = c("red", "blue", "green"),
  votes = c(0, 0, 0))
addClient <- function(ws) {
  clients[[ws$request$uuid]] <<- ws
  print(clients)
  return(clients)
}
removeClient <- function(uuid) {
  clients[[uuid]] <<- NULL
}
updateVoteCount <- function(color) {
  results[results$color == color, "votes"] <<- results[results$color == color, "votes"] + 1
}
#* @filter logger
function(req, res) {
  cat(as.character(Sys.time()), "-",
      req$REQUEST_METHOD, req$PATH_INFO, "-",
      req$HTTP_USER_AGENT, "@", req$REMOTE_ADDR, "\n")
  plumber::forward()
}
#* @filter cors
cors <- function(res) {
  res$setHeader("Access-Control-Allow-Origin", "*")
  plumber::forward()
}
#' @get /poll-results
function() {
  return(results)
}
#' @plumber
function(pr) {
  pr$websocket(
    function(ws) {
      ws$request$uuid <- UUIDgenerate()
      addClient(ws)
      print("New user connected!")
      ws$onMessage(function(binary, message) {
        updateVoteCount(jsonlite::fromJSON(message))
        for (client in clients) {
          client$send(message)
        }
      })
      ws$onClose(function() {
        removeClient(ws$request$uuid)
      })
    }
  )
}

Again, I create a clients list to keep track of the connections. I also create a results dataframe that will contain the voting results.

The @filter logger is just a “filter” with a function that allows me to log incoming HTTP requests (totally optional). The @filter cors is useful during development and allows Cross-Origin Resource Sharing (i.e. allows a frontend served from another origin to communicate with the API).

The endpoint @get /poll-results returns the full poll results to the users through a GET request.

Under @plumber I then enable the WebSocket and define the callback function for ws$onMessage(), just as in the previous example. However, here I use the messages as votes: once a message is received, updateVoteCount() is triggered and adds the new vote to the results dataframe. I also send the latest vote to all the clients (see the for loop).

That’s everything for the backend. The frontend is also very simple. As it is hard to show all the code here, I will only show the code that interacts with our plumber API below.

export class VotingFormComponent implements OnInit, OnDestroy {
  ws = webSocket(environment.WS_ENDPOINT);
  latestVote: any;
  constructor(private ps: PollService) { }
  ngOnInit(): void {
    this.ws.subscribe(newVote => {
      this.latestVote = newVote;
      this.ps.updateResults(newVote);
    });
  }
  vote(color: string) {
    this.ws.next(color)
  }
  ngOnDestroy() {
    this.ws.complete();
  }
}

This is the voting component. Here I am connecting to the WebSocket at environment.WS_ENDPOINT, which in my case was ‘ws://127.0.0.1:8080’, and subscribing to the stream. Once a new vote is cast by anyone connected to it, this.latestVote gets the value and this.ps.updateResults() (see below) is triggered.

The function vote() is in charge of casting votes, and this.ws.complete() closes the connection when the component is destroyed.
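
One detail worth spelling out is what actually travels over the socket. By default, RxJS’s webSocket() passes outgoing values through JSON.stringify and parses incoming frames with JSON.parse, which is why the backend calls jsonlite::fromJSON(message) before counting the vote. The sketch below reproduces only that round trip with plain functions; serialize and deserialize are hypothetical stand-ins for the library defaults, not RxJS APIs.

```typescript
// Stand-ins for the default serializer/deserializer used by RxJS's webSocket().
const serialize = (value: unknown): string => JSON.stringify(value);
const deserialize = (data: string): unknown => JSON.parse(data);

// What the server receives when the component calls this.ws.next("red"):
// a JSON string literal, i.e. the word red wrapped in double quotes.
const onTheWire = serialize("red");

// What a subscriber gets back after the server re-sends the same frame.
const backAgain = deserialize(onTheWire);
```

So the frame is the five-character text "red" including the quotes, and both sides have to decode it as JSON to recover the bare color name.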

export class PollService {
  API_ENDPOINT = environment.API_ENDPOINT;
  fullPollResults = new Subject<any>();
  pollResults: any;
  constructor(private http: HttpClient) {}
  getFullResults() {
    return this.http.get(`${this.API_ENDPOINT}/poll-results`).subscribe(pollResults => {
      this.pollResults = pollResults;
      this.fullPollResults.next(pollResults)
    })
  }
  updateResults(latestVote: any) {
    this.pollResults = this.pollResults.map((e:any) => {
      if(e.color == latestVote) {
        e.votes++
      }
      return e
    })
    this.fullPollResults.next(this.pollResults)
  }
}

The PollService contains getFullResults(), which makes an HTTP GET request to the API endpoint /poll-results and receives the full polling results (see backend code). This function is necessary so that all new users can get the current poll results when they first connect.

The function updateResults() updates the fullPollResults as soon as a new vote is cast (through the WebSocket) by just adding the latestVote.
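
The counting step inside updateResults() can also be written as a small pure function, which makes it easy to check in isolation. This is only a sketch under my own assumptions: ColorCount and applyVote are hypothetical names, and unlike the service method this version returns new objects instead of incrementing the existing ones.

```typescript
// One row of the poll results, matching the color/votes columns from the backend.
interface ColorCount {
  color: string;
  votes: number;
}

// Returns a new array with one extra vote for `latestVote`.
// Rows for other colors are passed through; an unknown color changes nothing.
function applyVote(results: ColorCount[], latestVote: string): ColorCount[] {
  return results.map((entry) =>
    entry.color === latestVote
      ? { color: entry.color, votes: entry.votes + 1 }
      : entry
  );
}

// Usage: start from the snapshot returned by /poll-results, then apply each
// vote that arrives over the WebSocket.
const snapshot: ColorCount[] = [
  { color: "red", votes: 2 },
  { color: "blue", votes: 0 },
  { color: "green", votes: 1 },
];
const afterVotes = applyVote(applyVote(snapshot, "red"), "blue");
// afterVotes holds red: 3, blue: 1, green: 1; snapshot itself is unchanged.
```

Keeping the update free of side effects means the snapshot and the live tally never share mutated rows, at the cost of allocating a new object per vote.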

Finally, the PollResultsComponent includes a plotly barplot that gets updated whenever the fullPollResults Subject emits new data.

export class PollResultsComponent implements OnInit, OnDestroy {
  constructor(private ps: PollService) { }
  pollResultsSubs!: Subscription;
  data = [
    {
      x: [],
      y: [],
      type: 'bar'
    }
  ];
  ngOnInit(): void {
    this.pollResultsSubs = this.ps.fullPollResults.subscribe(results => {
      this.data = [
        {
          x: results.map((e:any) => e.color),
          y: results.map((e:any) => e.votes),
          type: 'bar'
        }
      ];
    });
    this.ps.getFullResults();
  }
  ngOnDestroy() {
    this.pollResultsSubs.unsubscribe();
  }
}

So, all in all, the workflow is as follows:

  1. A user opens the application and connects to the WebSocket
  2. The user retrieves the current full poll-results through a GET request to the /poll-results endpoint
  3. Every time a vote is cast through the WebSocket, the client’s copy of the poll results is updated by adding the latestVote to the matching color’s count on the client side.

The final result looks like this in two browser sessions:

In case anyone finds this useful, the code has been added to this repo.
