Scala streaming API
A few days ago I posted about my experimentation with the Kafka Streams API. While I struggled a bit with the Scala/Java interop, I have to say I am generally a fan of the API they created. The concept of a “topology” was pretty cool: the programmer is given an API to create “processor nodes”, which are basically the steps the data goes through as it moves from the source topic to the sink topic.
I thought it might be fun to try to implement a basic streaming API on my own. In this example, I am reading data from a “source” file, mapping the data through several node steps, then writing it to a “sink” file.
My chosen data points are the lyrics from the Little Dragon song Constant Surprises. The API looks like this:
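(A quick sketch of the shape I was going for; the file names and map steps below are just placeholders, and the full topology appears at the end of the post.)

```scala
// Illustrative only: read from a source file, transform each line, write to a sink file.
StreamInitializer
  .source("source.txt")
  .map(_.toUpperCase)
  .map(line => s">> $line")
  .sink("sink.txt")
```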
Here are the components.
Our basic Message case class:
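A minimal sketch; giving it an id for the sequence number and a value for the text is my assumption about the fields:

```scala
// Carries one unit of data through the stream: a sequence ID plus the text itself.
case class Message(id: Int, value: String)
```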
A wrapper around our Message class, allowing for access and transformations:
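Here is a sketch of that wrapper; I am assuming it is the Node case class mentioned below, holding a single Message and exposing a map that transforms its value:

```scala
// Wraps a single Message; map produces a new Node with the transformed value.
case class Node(message: Message) {
  def value: String = message.value

  def map(f: String => String): Node =
    Node(message.copy(value = f(message.value)))
}
```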
When text is read in from a file, Scala will provide an Iterator[String]. So for the purposes of my API construction, I am wrapping the entire Iterator in a class called “StreamWrapper” that has a map operation included. It also has a sink operation, which we will get to soon.
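A sketch of that class, assuming it wraps an Iterator[Node] and that sink writes each line to a file prefixed with its sequence ID (the prefix is my own choice):

```scala
import java.io.{File, PrintWriter}

class StreamWrapper(nodes: Iterator[Node]) {
  // Builds up the topology: apply a transformation to every node, lazily.
  def map(f: String => String): StreamWrapper =
    new StreamWrapper(nodes.map(_.map(f)))

  // Terminal step: drain the iterator and write each node to the sink file.
  def sink(path: String): Unit = {
    val writer = new PrintWriter(new File(path))
    try nodes.foreach(node => writer.println(s"${node.message.id}: ${node.value}"))
    finally writer.close()
  }
}
```

Because it wraps an Iterator, nothing actually flows until sink is called; the map steps just stack up lazily, which is roughly the topology idea.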
Finally, we need a way to start the stream. We will wrap that operation in a StreamInitializer object.
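A sketch of the initializer; the source method name and the file-handling details are my assumptions:

```scala
import java.io.FileInputStream

object StreamInitializer {
  // Stand-in for counterF() from the earlier counter post:
  // a closure that hands out the next sequence ID on every call.
  private val counterF: () => Int = {
    var count = 0
    () => { count += 1; count }
  }

  // Read the source file, wrap each line in a Node, and hand the
  // whole iterator to a StreamWrapper.
  def source(path: String): StreamWrapper = {
    val lines = scala.io.Source.fromInputStream(new FileInputStream(path)).getLines()
    new StreamWrapper(lines.map(line => Node(Message(counterF(), line))))
  }
}
```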
The Scala way to read the file is scala.io.Source.fromInputStream(file).getLines, which returns an Iterator[String]. Each String is wrapped in a Node case class, then all the nodes are put into a StreamWrapper.
Note: counterF() is the counter I wrote about yesterday; I am using it to give each message a sequence ID.
Now we are able to chain together a bunch of “nodes” and create a streaming topology:
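Here is a sketch of what that could look like over the lyrics file; the individual steps (stripping punctuation, upper-casing, tagging) and the file names are just my illustrative choices:

```scala
object ConstantSurprisesStream extends App {
  // Each map call acts as a processor node in the topology.
  val stripPunctuation: String => String = _.replaceAll("""[^\w\s]""", "")
  val upperCase: String => String = _.toUpperCase
  val tagLyric: String => String = line => s"lyric: $line"

  StreamInitializer
    .source("constant_surprises.txt")
    .map(stripPunctuation)
    .map(upperCase)
    .map(tagLyric)
    .sink("constant_surprises_out.txt")
}
```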
For the future, it would be cool to have the process tail the source file and send new lines into the stream whenever they are added (for now it just reads the whole file and dumps it through in one quick pass).