Data streams are often processed in a distributed manner using multiple machines or multiple processes. For example, a data stream may be produced by a sensor attached to a remote machine, or multiple clustering algorithms may run in parallel in several R processes. Another application is connecting to other software components in a stream mining pipeline.
First, we show how socket connections together with the package stream can be used to connect multiple processes or machines. Then we give examples of how the package streamConnect makes connecting stream mining components more convenient by providing an interface to connect stream processing components using sockets or web services. While sockets are only used to connect data stream generating processes, web services are more versatile and can also be used to create data stream clustering processes as a service.
The final section of this paper shows how to deploy the server/web service.
The function write_stream() and the class DSD_ReadStream provided in package stream can be used to communicate via connections (files, sockets, URLs, etc.). In the first example, we set up the connection manually. The example is useful for understanding how sockets work, especially for users interested in implementing their own components in other programming languages or connecting with other data stream software. A more convenient way to do this using package streamConnect is described later in this paper.
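Before moving to sockets, the same write/read pair can be tried on a plain file, which is the simplest kind of connection. The following is a minimal sketch, assuming the package stream is installed and that write_stream() keeps its defaults of writing no header and no class information; the temporary file name is only for illustration.

```r
library(stream)

# a small synthetic stream, as used in the rest of this paper
src <- DSD_Gaussians(k = 3, d = 3)

# write 10 points as comma separated values to a temporary file
tmp <- tempfile(fileext = ".csv")
write_stream(src, tmp, n = 10)

# read the points back through DSD_ReadStream
dsd <- DSD_ReadStream(tmp)
pts <- get_points(dsd, n = 10)
close_stream(dsd)
unlink(tmp)

pts
```

Replacing the file name with an open socket connection gives the setup used in the following example.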
First, we find an available port.
## [1] 19650
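The code that picked the port is not shown above. One way to find a free port with base R only is sketched below; it assumes R 4.0.0 or later (for serverSocket()), and the helper name find_free_port() is hypothetical. If the package httpuv is installed, httpuv::randomPort() is an alternative.

```r
# Hypothetical helper: try a few random candidate ports and return the first
# one on which a server socket can be opened.
find_free_port <- function(candidates = sample(1024:49151, 20)) {
  for (p in candidates) {
    # serverSocket() signals an error if the port cannot be opened
    srv <- tryCatch(suppressWarnings(serverSocket(p)),
      error = function(e) NULL)
    if (!is.null(srv)) {
      close(srv)   # release the port again so the real server can use it
      return(p)
    }
  }
  stop("no free port found among the candidates")
}

port <- find_free_port()
port
```

Note that another process could still claim the port between this check and the start of the server, so this is a convenience for examples rather than a guarantee.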
The server serves data from a data stream. We use the package callr to create a separate R process that serves a data stream over a socket connection, producing 10 points every second. You can also put the code passed to r_bg() in a file called server.R and run it (potentially on a different machine) with R CMD BATCH server.R from the command line.
library(callr)
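rp1 <- r_bg(function(port) {
  library(stream)
  stream <- DSD_Gaussians(k = 3, d = 3)
  blocksize <- 10

  # server = TRUE: wait for a client to connect to the port
  con <- socketConnection(port = port, server = TRUE)

  # write a block of 10 points every second and keep the socket open
  while (TRUE) {
    write_stream(stream, con, n = blocksize, close = FALSE)
    Sys.sleep(1)
  }

  # not reached: the loop runs until the process is stopped
  close(con)
},
  args = list(port = port))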
rp1
## PROCESS 'R', running, pid 4260.
The client consumes the data stream. We open the connection, which starts the data generating process. Note that streamConnect is not needed for the connection itself. For convenience, we only use the helper retry() defined in streamConnect to make sure the connection to the server is established.
## A connection with
## description "->localhost:19650"
## class "sockconn"
## mode "r"
## text "text"
## opened "opened"
## can read "yes"
## can write "yes"
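The idea behind retrying can be sketched in base R. The helper below is hypothetical (retry_connect() is not part of streamConnect, and the actual retry() may work differently); it calls a function that opens a connection until the call succeeds or a maximum number of attempts is reached.

```r
# Hypothetical helper: call open_fun() until it succeeds or we give up.
retry_connect <- function(open_fun, times = 5, wait = 1) {
  for (i in seq_len(times)) {
    # a failed attempt signals an error, which we turn into NULL
    con <- tryCatch(suppressWarnings(open_fun()), error = function(e) NULL)
    if (!is.null(con)) return(con)
    Sys.sleep(wait)   # give the server time to start listening
  }
  stop("could not connect after ", times, " attempts")
}

# Usage, assuming `port` holds the port the server listens on:
# con <- retry_connect(function() socketConnection(port = port, open = "r"))
```

Retrying is useful here because the server runs in a separate process and may not be listening yet when the client first tries to connect.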
We poll all available data (n = -1) several times. The first request should yield 10 points, the second none, and the third 20 points (after waiting 2 seconds). The actual counts depend on timing; in the output below, the first request already returned 20 points.
## V1 V2 V3
## 1 0.9351033 0.9755215 0.325833302
## 2 0.9410035 0.9665466 0.325980752
## 3 0.2777504 0.7818957 0.404477852
## 4 0.9561333 1.0475067 0.269723464
## 5 0.5474651 0.8035274 0.005818664
## 6 0.5130138 0.7942215 -0.001873796
## 7 0.9427160 1.0643578 0.219448365
## 8 1.0147507 0.9532129 0.352208698
## 9 0.9539514 1.0014433 0.319349422
## 10 0.2945968 0.7559638 0.394759189
## 11 0.1987487 0.8653632 0.400777137
## 12 0.5560994 0.7828004 0.025728864
## 13 0.2848937 0.7239691 0.398225350
## 14 0.9577190 0.9349404 0.316577305
## 15 0.5801526 0.8185967 0.055585563
## 16 0.9682857 1.0124763 0.234119456
## 17 0.9553973 0.9950655 0.304588605
## 18 0.9669106 0.9703764 0.309561276
## 19 0.9719432 1.0819497 0.278362267
## 20 0.5873637 0.8666131 0.100781575
## [1] V1 V2 V3
## <0 rows> (or 0-length row.names)
## V1 V2 V3
## 1 0.8914968 0.9983011 0.297256784
## 2 1.0003102 0.9668791 0.282004048
## 3 1.0105414 0.9714505 0.314209775
## 4 0.6030828 0.8808783 0.034868288
## 5 0.6006558 0.8310528 0.061972897
## 6 0.9253175 1.0152939 0.261015377
## 7 0.9174193 0.9081097 0.423757525
## 8 0.3106659 0.6947775 0.367518242
## 9 0.3565637 0.6738449 0.387914885
## 10 0.3065113 0.6998869 0.418941537
## 11 0.9587662 0.9739466 0.339657046
## 12 0.2650925 0.7007781 0.392534884
## 13 0.2647784 0.7936385 0.472286896
## 14 0.5195622 0.8997765 -0.031224975
## 15 0.5625908 0.8309857 0.017831605
## 16 0.3131279 0.7159313 0.384087725
## 17 0.9201286 0.9224198 0.360238501
## 18 0.5462375 0.8738420 -0.001452378
## 19 0.9831290 0.9778791 0.335385922
## 20 0.9852742 0.9487940 0.299067494
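The polling code itself is not shown above. A minimal sketch of such a polling loop follows, assuming the package stream is installed and that con is an open, readable socket connection to the server; the function name poll_stream() and its arguments are hypothetical.

```r
# Hypothetical helper: wrap an open connection in a DSD and poll it a few times.
poll_stream <- function(con, polls = 3, wait = 2) {
  dsd <- stream::DSD_ReadStream(con)
  on.exit(stream::close_stream(dsd))   # also closes the connection when done

  batches <- vector("list", polls)
  for (i in seq_len(polls)) {
    # n = -1 requests all points that are currently available
    batches[[i]] <- stream::get_points(dsd, n = -1)
    if (i < polls) Sys.sleep(wait)
  }
  batches
}

# Usage, assuming `con` was opened as described above:
# batches <- poll_stream(con)
# sapply(batches, nrow)
```

Each element of the returned list is a data frame with the points that had arrived since the previous request, so an empty data frame simply means that the server has not written a new block yet.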
streamConnect provides a more convenient way to set up a connection using sockets. publish_DSD_via_Socket() creates a socket broadcasting the data stream and DSD_ReadSocket creates a DSD object reading from that socket.
We will use an available port.
## [1] 24830
We create a DSD process sending data to the port.
library(streamConnect)
rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port)
rp1
## PROCESS 'R', running, pid 4315.
Next, we create a DSD that connects to the socket. DSD_ReadSocket() already performs retries internally.
library(streamConnect)
dsd <- DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class"))
dsd
## Data Stream from Connection (d = 3, k = NA)
## Class: DSD_ReadStream, DSD_R, DSD
## connection: ->localhost:24830 (opened)
## x y z .class
## 1 0.94375431 0.7014603 0.5113278 3
## 2 0.04397319 0.3920436 0.8797119 2
## 3 0.06250808 0.4045761 0.9008937 2
## 4 0.97751861 0.8018427 0.5981745 3
## 5 0.07623114 0.3617491 0.8926938 2
## 6 0.95528487 0.4120564 0.9650456 1
## 7 1.04581897 0.4445584 1.0309124 1
## 8 0.00854794 0.3763124 0.9053326 2
## 9 1.03487398 0.4474087 0.9366040 1
## 10 -0.02515803 0.4268451 0.9680768 2
Web services are more versatile: they can be used to deploy data stream sources using publish_DSD_via_WebService()/DSD_ReadWebservice or data stream tasks using publish_DSC_via_WebService()/DSC_WebService. Here we only show how to deploy a clusterer, but a DSD can be published in a similar manner. Larger workflows can be created using DST_Runner from stream.
streamConnect uses the package plumber to manage web services. The data is transmitted in serialized form. The default serialization format is csv (comma separated values). Other formats are json and rds (see plumber::serializer_csv).
We will use an available port.
## [1] 40841
Creating a clustering web service process listening for data on the port.
## PROCESS 'R', running, pid 4370.
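The call that started the service is not shown above. A plausible sketch, based on the deployment script at the end of this paper, is given below. It assumes that publish_DSC_via_WebService() starts the service in a background process unless background = FALSE is given; the wrapper name start_clusterer_service() is hypothetical.

```r
# Hypothetical wrapper: start a DBSTREAM clusterer as a web service on `port`.
# The clusterer is passed as a string so that it is created inside the
# service process.
start_clusterer_service <- function(port) {
  streamConnect::publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)",
    port = port)
}

# Usage, assuming `port` is an available port:
# rp1 <- start_clusterer_service(port)
# rp1
```

The returned object is a handle to the process running the service, which is what the output above displays.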
Connect to the web service with a local DSC interface.
library(streamConnect)
dsc <- DSC_WebService(paste0("http://localhost", ":", port),
  verbose = TRUE, config = httr::verbose(info = TRUE))
## Connecting to DSC Web service at http://localhost:40841
## Success
## Web Service Data Stream Clusterer: DBSTREAM
## Served from: http://localhost:40841
## Class: DSC_WebService, DSC_R, DSC
## Number of micro-clusters: 0
## Number of macro-clusters: 0
Note that the verbose output can help with debugging connection issues.
Cluster some data.
## Web Service Data Stream Clusterer: DBSTREAM
## Served from: http://localhost:40841
## Class: DSC_WebService, DSC_R, DSC
## Number of micro-clusters: 27
## Number of macro-clusters: 4
## # A tibble: 27 × 2
## X1 X2
## <dbl> <dbl>
## 1 0.370 0.0863
## 2 0.0894 0.813
## 3 0.0837 0.653
## 4 0.184 0.757
## 5 0.404 0.859
## 6 0.447 0.839
## 7 0.415 0.0695
## 8 0.459 0.0449
## 9 0.141 0.774
## 10 0.0929 0.766
## # ℹ 17 more rows
## [1] 66.973790 35.837456 3.224949 35.058181 63.044611 60.021095 68.369653
## [8] 35.225759 87.085785 57.134669 31.654292 3.289640 38.635385 38.830618
## [15] 41.187040 22.897197 37.166141 5.158245 5.935806 5.308434 11.047012
## [22] 11.228603 11.729318 4.557810 19.425439 21.626282 8.476491
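The calls that produced the output above are not shown. Since DSC_WebService is a local DSC interface, clustering presumably uses the regular DSC functions from stream. The sketch below illustrates this workflow with a local DSC_DBSTREAM object as a stand-in for the web service clusterer, so that it runs without a server; the number of points and the 2-dimensional data source are assumptions.

```r
library(stream)

# 2-dimensional example data (an assumption; the original data source is not shown)
dsd <- DSD_Gaussians(k = 3, d = 2)

# local stand-in; with a running service this would be the DSC_WebService object
dsc <- DSC_DBSTREAM(r = .05)

# feed 500 points from the stream into the clusterer
update(dsc, dsd, n = 500)
dsc

# inspect the cluster centers and their weights
centers <- get_centers(dsc)
weights <- get_weights(dsc)
centers
weights
```

With the web service, the data is sent to the server for clustering, and the centers and weights are retrieved from it in serialized form.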
Web services and the socket-based server can be easily deployed to any server or cloud system, including containers. Make sure R, the package streamConnect, and all dependencies are installed. Create a short R script to start the server/service and deploy it.
library(streamConnect)
port <- 8001
publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port,
  background = FALSE)
Web services can also be deployed using a plumber task file. The following call does not create a server, but returns the name of the task file.
Open the file in RStudio to deploy it, or read the plumber Hosting vignette.