| Title: | Connecting Stream Mining Components Using Sockets and Web Services |
|---|---|
| Description: | Adds functionality to connect stream mining components from package stream using sockets and Web services. The package can be used create distributed workflows and create plumber-based Web services which can be deployed on most common cloud services. |
| Authors: | Michael Hahsler [aut, cre, cph] (ORCID: <https://orcid.org/0000-0003-2716-1405>) |
| Maintainer: | Michael Hahsler <[email protected]> |
| License: | GPL-3 |
| Version: | 0.0-6.1 |
| Built: | 2026-05-13 06:37:26 UTC |
| Source: | https://github.com/mhahsler/streamConnect |
Provides a DSC front-end for a clusterer running as a web service. The methods
nclusters(), get_center(), get_weights() are supported. The request is
retried with httr::RETRY() if it fails the first time.
DSC_WebService(url, verbose = FALSE, ...)DSC_WebService(url, verbose = FALSE, ...)
url |
endpoint URI address in the format |
verbose |
logical; display connection information. |
... |
further arguments are passed on to |
A stream::DSC object.
Other WebService:
DSD_ReadWebService(),
publish_DSC_via_WebService(),
publish_DSD_via_WebService()
Other dsc:
publish_DSC_via_WebService()
# find a free port port <- httpuv::randomPort() port # deploy a clustering process listening for data on the port rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port) rp1 # get a local DSC interface dsc <- DSC_WebService(paste0("http://localhost", ":", port), verbose = TRUE, config = httr::verbose(info = TRUE)) dsc # cluster dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05) update(dsc, dsd, 500) get_centers(dsc) get_weights(dsc) plot(dsc) # kill the background clustering process. rp1$kill() rp1# find a free port port <- httpuv::randomPort() port # deploy a clustering process listening for data on the port rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port) rp1 # get a local DSC interface dsc <- DSC_WebService(paste0("http://localhost", ":", port), verbose = TRUE, config = httr::verbose(info = TRUE)) dsc # cluster dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05) update(dsc, dsd, 500) get_centers(dsc) get_weights(dsc) plot(dsc) # kill the background clustering process. rp1$kill() rp1
Creates a DSD_ReadStream that reads from a port.
DSD_ReadSocket(host = "localhost", port, retry_args = NULL, ...)DSD_ReadSocket(host = "localhost", port, retry_args = NULL, ...)
host |
hostname. |
port |
host port. |
retry_args |
a list with arguments for |
... |
further arguments are passed on to |
A stream::DSD object.
Other Socket:
publish_DSD_via_Socket()
Other dsd:
DSD_ReadWebService(),
publish_DSD_via_Socket(),
publish_DSD_via_WebService()
# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port) rp1 # create a DSD that connects to the socket. Note that we need to # specify the column names of the stream dsd <- DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class")) dsd get_points(dsd, n = 10) plot(dsd) close_stream(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. if (rp1$is_alive()) rp1$kill() rp1# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port) rp1 # create a DSD that connects to the socket. Note that we need to # specify the column names of the stream dsd <- DSD_ReadSocket(port = port, col.names = c("x", "y", "z", ".class")) dsd get_points(dsd, n = 10) plot(dsd) close_stream(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. if (rp1$is_alive()) rp1$kill() rp1
Reads from a web service that published an operation called
get_points which takes a parameter n and returns n data points in CSV or json
format. The request is
retried with httr::RETRY() if it fails the first time.
DSD_ReadWebService(url, verbose = FALSE, ...)DSD_ReadWebService(url, verbose = FALSE, ...)
url |
endpoint URI address in the format |
verbose |
logical; display connection information. |
... |
further arguments are passed on to |
A stream::DSD object.
Other WebService:
DSC_WebService(),
publish_DSC_via_WebService(),
publish_DSD_via_WebService()
Other dsd:
DSD_ReadSocket(),
publish_DSD_via_Socket(),
publish_DSD_via_WebService()
# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port) ## use json for the transport layer instead of csv # rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", # port = port, serialize = "json") rp1 # create a DSD that connects to the web service dsd <- DSD_ReadWebService(paste0("http://localhost", ":", port)) dsd get_points(dsd, n = 10) plot(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port) ## use json for the transport layer instead of csv # rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", # port = port, serialize = "json") rp1 # create a DSD that connects to the web service dsd <- DSD_ReadWebService(paste0("http://localhost", ":", port)) dsd get_points(dsd, n = 10) plot(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1
Uses the package plumber to publish a data stream task as a web service.
publish_DSC_via_WebService( dsc, port, task_file = NULL, serializer = "csv", serve = TRUE, background = TRUE, debug = FALSE )publish_DSC_via_WebService( dsc, port, task_file = NULL, serializer = "csv", serve = TRUE, background = TRUE, debug = FALSE )
dsc |
A character string that creates a DSC. |
port |
port used to serve the task. |
task_file |
name of the plumber task script file. |
serializer |
method used to serialize the data. By default |
serve |
if |
background |
logical; start a background process? |
debug |
if |
The function writes a plumber task script file and starts the web server to serve the content of the stream using the endpoints
GET /info
POST /update requires the data to be uploaded as a file in csv format (see Examples section).
GET /get_centers with parameter type (see stream::get_centers()).
GET /get_weights with parameter type (see stream::get_weights()).
Supported serializers are csv (default), json, and rds.
APIs generated using plumber can be easily deployed. See: Hosting. By setting a task_file and serve = FALSE a plumber
task script file is generated that can deployment.
a processx::process object created with callr::r_bg() which runs the plumber server
in the background. The process can be stopped with rp$kill() or by killing the process
using the operating system with the appropriate PID. rp$get_result() can
be used to check for errors in the server process (e.g., when it terminates
unexpectedly).
Other WebService:
DSC_WebService(),
DSD_ReadWebService(),
publish_DSD_via_WebService()
Other dsc:
DSC_WebService()
# find a free port port <- httpuv::randomPort() port # Deploy a clustering process listening for data on the port rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port) rp1 # look at ? DSC_WebService for a convenient interface. # Here we we show how to connect to the port and send data manually. library(httr) # the info verb returns some basic information about the clusterer. resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) d <- content(resp, show_col_types = FALSE) d # create a local data stream and send it to the clusterer using the update verb. dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05) tmp <- tempfile() stream::write_stream(dsd, tmp, n = 500, header = TRUE) resp <- POST(paste0("http://localhost:", port, "/update"), body = list(upload = upload_file(tmp))) unlink(tmp) resp # retrieve the cluster centers using the get_centers verb resp <- GET(paste0("http://localhost:", port, "/get_centers")) d <- content(resp, show_col_types = FALSE) head(d) plot(dsd, n = 100) points(d, col = "red", pch = 3, lwd = 3) # kill the process. rp1$kill() rp1 # Debug the interface (run the service and start a web interface) if (interactive()) publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port, debug = TRUE)# find a free port port <- httpuv::randomPort() port # Deploy a clustering process listening for data on the port rp1 <- publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port) rp1 # look at ? DSC_WebService for a convenient interface. # Here we we show how to connect to the port and send data manually. library(httr) # the info verb returns some basic information about the clusterer. resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) d <- content(resp, show_col_types = FALSE) d # create a local data stream and send it to the clusterer using the update verb. dsd <- DSD_Gaussians(k = 3, d = 2, noise = 0.05) tmp <- tempfile() stream::write_stream(dsd, tmp, n = 500, header = TRUE) resp <- POST(paste0("http://localhost:", port, "/update"), body = list(upload = upload_file(tmp))) unlink(tmp) resp # retrieve the cluster centers using the get_centers verb resp <- GET(paste0("http://localhost:", port, "/get_centers")) d <- content(resp, show_col_types = FALSE) head(d) plot(dsd, n = 100) points(d, col = "red", pch = 3, lwd = 3) # kill the process. rp1$kill() rp1 # Debug the interface (run the service and start a web interface) if (interactive()) publish_DSC_via_WebService("DSC_DBSTREAM(r = .05)", port = port, debug = TRUE)
Creates a socket server connection to send steam data.
publish_DSD_via_Socket(dsd, port, blocksize = 1024L, background = TRUE, ...)publish_DSD_via_Socket(dsd, port, blocksize = 1024L, background = TRUE, ...)
dsd |
A DSD object. |
port |
port used to serve the DSD. |
blocksize |
number of data points pushed on the buffer at once. |
background |
logical; start a background process? |
... |
further arguments are passed on to |
Creates a server socket with socketConnection()
and then uses a stream::write_stream() to write data to a socket connection.
This method does not provide a header for the data.
a processx::process object created with callr::r_bg() which runs the plumber server
in the background. The process can be stopped with rp$kill() or by killing the process
using the operating system with the appropriate PID. rp$get_result() can
be used to check for errors in the server process (e.g., when it terminates
unexpectedly).
Other Socket:
DSD_ReadSocket()
Other dsd:
DSD_ReadSocket(),
DSD_ReadWebService(),
publish_DSD_via_WebService()
# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port) rp1 # connect to the port (retry waits for the socket to establish) con <- retry(socketConnection(port = port, open = 'r')) dsd <- retry(DSD_ReadStream(con, col.names = c("x", "y", "z", ".class"))) get_points(dsd, n = 10) plot(dsd) # close connection close_stream(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- DSD_Gaussians(k = 3, d = 3) %>% publish_DSD_via_Socket(port = port) rp1 # connect to the port (retry waits for the socket to establish) con <- retry(socketConnection(port = port, open = 'r')) dsd <- retry(DSD_ReadStream(con, col.names = c("x", "y", "z", ".class"))) get_points(dsd, n = 10) plot(dsd) # close connection close_stream(dsd) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1
Uses the package plumber to publish a data stream as a web service.
publish_DSD_via_WebService( dsd, port, task_file = NULL, serializer = "csv", serve = TRUE, background = TRUE, debug = FALSE )publish_DSD_via_WebService( dsd, port, task_file = NULL, serializer = "csv", serve = TRUE, background = TRUE, debug = FALSE )
dsd |
A character string that creates a DSD. |
port |
port used to serve the DSD. |
task_file |
name of the plumber task script file. |
serializer |
method used to serialize the data. By default |
serve |
if |
background |
logical; start a background process? |
debug |
if |
The function writes a plumber task script file and starts the web server to serve the content of the stream using the endpoints
http://localhost:port/get_points?n=100 and
http://localhost:port/info.
APIs generated using plumber can be easily deployed. See: Hosting. By setting a task_file and serve = FALSE a plumber
task script file is generated that can be deployment.
A convenient reader for stream data over web services is available as DSD_ReadWebService.
a processx::process object created with callr::r_bg() which runs the plumber server
in the background. The process can be stopped with rp$kill() or by killing the process
using the operating system with the appropriate PID. rp$get_result() can
be used to check for errors in the server process (e.g., when it terminates
unexpectedly).
Other WebService:
DSC_WebService(),
DSD_ReadWebService(),
publish_DSC_via_WebService()
Other dsd:
DSD_ReadSocket(),
DSD_ReadWebService(),
publish_DSD_via_Socket()
# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port) rp1 # connect to the port and read manually. See DSD_ReadWebService for # a more convenient way to connect to the WebService in R. library("httr") # we use RETRY to give the server time to spin up resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) d <- content(resp, show_col_types = FALSE) d # example: Get 100 points and plot them resp <- GET(paste0("http://localhost:", port, "/get_points?n=100")) d <- content(resp, show_col_types = FALSE) head(d) dsd <- DSD_Memory(d) dsd plot(dsd, n = -1) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1 # Publish using json rp2 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port, serializer = "json") rp2 # connect to the port and read # we use RETRY to give the server time to spin up resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) content(resp, as = "text") resp <- GET(paste0("http://localhost:", port, "/get_points?n=5")) content(resp, as = "text") # cleanup rp2$kill() rp2 # Debug the interface (run the service and start a web interface) if (interactive()) publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port, debug = TRUE)# find a free port port <- httpuv::randomPort() port # create a background DSD process sending data to the port rp1 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port) rp1 # connect to the port and read manually. See DSD_ReadWebService for # a more convenient way to connect to the WebService in R. library("httr") # we use RETRY to give the server time to spin up resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) d <- content(resp, show_col_types = FALSE) d # example: Get 100 points and plot them resp <- GET(paste0("http://localhost:", port, "/get_points?n=100")) d <- content(resp, show_col_types = FALSE) head(d) dsd <- DSD_Memory(d) dsd plot(dsd, n = -1) # end the DSD process. Note: that closing the connection above # may already kill the process. rp1$kill() rp1 # Publish using json rp2 <- publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port, serializer = "json") rp2 # connect to the port and read # we use RETRY to give the server time to spin up resp <- RETRY("GET", paste0("http://localhost:", port, "/info")) content(resp, as = "text") resp <- GET(paste0("http://localhost:", port, "/get_points?n=5")) content(resp, as = "text") # cleanup rp2$kill() rp2 # Debug the interface (run the service and start a web interface) if (interactive()) publish_DSD_via_WebService("DSD_Gaussians(k = 3, d = 3)", port = port, debug = TRUE)
Retries and expression that fails. This is mainly used to retry establishing a connection.
retry(f, times = 5, wait = 1, verbose = FALSE, operation = NULL)retry(f, times = 5, wait = 1, verbose = FALSE, operation = NULL)
f |
expression |
times |
integer; number of times |
wait |
number of seconds to wait in between tries. |
verbose |
logical; show progress and errors. |
operation |
name of the operation used in the error message. |
the result of the expression f
retry(1)retry(1)