---
title: "stream: Working With Data Streams using Connections and Web Services"
author: "Michael Hahsler"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{stream: Working With Data Streams using Connections and Web Services}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```

# Introduction

Data streams are often processed in a distributed manner using multiple machines or multiple processes. For example, a data stream may be produced by a sensor attached to a remote machine, or multiple clustering algorithms may run in parallel in several R processes. Another application is to connect to other software components in a stream mining pipeline.

First, we show how socket connections together with the package `stream` can be used to connect multiple processes or machines.

Then we give examples of how package `streamConnect` makes connecting stream mining components more convenient by providing an interface to connect stream processing using _sockets_ or _web services_. While sockets are only used to connect data stream generating processes, web services are more versatile and can also be used to create data stream clustering processes as a service.

The final section of this paper shows how to deploy the server/web service.

# Using Sockets Directly to Publish a Data Stream

The function `write_stream()` and the class `DSD_ReadStream` provided in package `stream` can be used to communicate via connections (files, sockets, URLs, etc.). In the first example, we manually set up the connection. The example is useful to understand how sockets work, especially for users interested in implementing their own components using other programming languages or connecting with other data stream software. A more convenient way to do this using package `streamConnect` is described later in this paper.

First, we find an available port.

```{r}
port <- httpuv::randomPort()
port
```

## Server: Serving a Data Stream

The server serves data from a data stream. We use the package `callr` to create a separate R process that serves a data stream over a socket connection, creating 10 points every second. Alternatively, you can put the body of the function passed to `r_bg()` in a file called `server.R` and run it (potentially on a different machine) with `R CMD BATCH server.R` from the command line.

```{r eval = TRUE}
library(stream)
library(callr)

rp1 <- r_bg(function(port) {
  library(stream)
  stream <- DSD_Gaussians(k = 3, d = 3)
  blocksize <- 10

  # open a server socket and wait for a client to connect
  con <- socketConnection(port = port, server = TRUE)

  # write one block of points per second until the process is killed
  while (TRUE) {
    write_stream(stream, con, n = blocksize, close = FALSE)
    Sys.sleep(1)
  }

  # not reached: the loop only ends when the process is stopped
  close(con)
}, args = list(port = port))

rp1
```

## Client: Reading from the Stream

The client consumes the data stream. Opening the connection starts the data generating process. Note that `streamConnect` is not needed here; for convenience, we only use its helper `retry()` to make sure the connection to the server is established.

```{r eval = TRUE}
con <- streamConnect::retry(socketConnection(port = port, open = 'r'))
con

dsd <- streamConnect::retry(DSD_ReadStream(con))
```

We poll all available data (`n = -1`) several times. The first request should yield 10 points, the second none, and the third, issued after waiting 2 seconds, should yield 20 points.

```{r eval = TRUE}
get_points(dsd, n = -1)
get_points(dsd, n = -1)

Sys.sleep(2)
get_points(dsd, n = -1)

close(con)
```

## Server: Stopping the Server Process

Here we stop the `callr` process. Note that the socket connection is still active and will serve the data in the connection buffer as long as the reading process keeps the connection open.

```{r eval = TRUE}
rp1$kill()
```

# streamConnect Sockets

`streamConnect` provides a more convenient way to set up a connection using sockets.
`publish_DSD_via_Socket()` creates a socket broadcasting the data stream and `DSD_ReadSocket` creates a `DSD` object reading from that socket.

We will use an available port.

```{r}
port <- httpuv::randomPort()
port
```

## Server: Publish Data

We create a DSD process sending data to the port.

```{r eval = TRUE}
library(streamConnect)

rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port)
rp1
```

## Client: Connect to the Data Stream

Next, we create a DSD that connects to the socket. `DSD_ReadSocket()` already performs retries internally.

```{r eval = TRUE}
library(streamConnect)

dsd <- DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class"))
dsd

get_points(dsd, n = 10)

plot(dsd)

close_stream(dsd)
```

## Server: Stopping the Server Process

Closing the stream on the client also closes the connection, which may already have terminated the serving process.

```{r eval = TRUE}
if (rp1$is_alive()) rp1$kill()
```

# streamConnect Web Services

Web services are more versatile: they can be used to deploy data stream sources using `publish_DSD_via_WebService()`/`DSD_ReadWebService` or data stream tasks using `publish_DSC_via_WebService()`/`DSC_WebService`. Here we only show how to deploy a clusterer, but a DSD can be published in a similar manner. Larger workflows can be created using `DST_Runner` from `stream`.

`streamConnect` uses the package `plumber` to manage web services. The data is transmitted in serialized form. The default serialization format is `csv` (comma-separated values). Other formats are `json` and `rds` (see `plumber::serializer_csv`).

We will use an available port.

```{r}
port <- httpuv::randomPort()
port
```

## Server: Create a Web Service

We create a clustering web service process listening for data on the port.

```{r eval = TRUE}
library(streamConnect)

rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port)
rp1
```

## Client: Connect to the Web Service

Connect to the web service with a local DSC interface.

```{r eval = TRUE}
library(streamConnect)

dsc <- DSC_WebService(paste0("http://localhost", ":", port),
  verbose = TRUE, config = httr::verbose(info = TRUE))
dsc
```

Note that the verbose output can help with debugging connection issues.

Cluster some data.

```{r eval = TRUE}
dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05)

update(dsc, dsd, 500)
dsc

get_centers(dsc)
get_weights(dsc)

plot(dsc)
```

## Server: Stop the Web Service

Kill the web service process.

```{r eval = TRUE}
rp1$kill()
```

# Deploying the Server/Web Service

Web services and the socket-based server can be easily deployed to any server or cloud system, including containers. Make sure R, the package `streamConnect`, and all dependencies are installed. Create a short R script to start the server/service and deploy it.

```{r, eval = FALSE}
library(streamConnect)

port = 8001

publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port,
                           background = FALSE)
```

Web services can also be deployed using a plumber task file. The following call does not create a server, but returns the name of the task file.

```{r, eval = FALSE}
publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", serve = FALSE)
```

Open the file in RStudio to deploy it or read the [plumber Hosting vignette](https://www.rplumber.io/articles/hosting.html).
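
When the start script runs in a container or on a cloud host, the port is often supplied by the environment rather than hard-coded. The following is a minimal sketch of such a script, not part of `streamConnect`: the helper names `get_port()` and `start_service()` and the environment variable `STREAM_PORT` are assumptions made for illustration. The call to `publish_DSC_via_WebService()` is the same one used in the start script shown earlier in this section.

```{r, eval = FALSE}
# Hypothetical helper: read the port from the environment variable
# STREAM_PORT (an assumed name) and fall back to a default if the
# variable is unset, not a number, or outside the valid port range.
get_port <- function(default = 8001L) {
  port <- suppressWarnings(as.integer(Sys.getenv("STREAM_PORT", unset = NA)))
  if (is.na(port) || port < 1L || port > 65535L) default else port
}

# Hypothetical wrapper: start the clustering web service in the foreground
# using the same call as in the deployment script of this section.
start_service <- function(port = get_port()) {
  streamConnect::publish_DSC_via_WebService(
    "DSC_DBSTREAM(r = .05)",
    port = port,
    background = FALSE
  )
}

# In the deployed script, the last line would start the service:
# start_service()
```

With this sketch, the same script can be reused across hosts by setting `STREAM_PORT` before launching R; if the variable is missing or invalid, the service falls back to port 8001.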