intro to kafka streams
I sat down to follow the Write a Streams Application tutorial just to get a taste of the Streams API. Boy was I in for a headache!
The API is implemented using some modern Java (JDK 8) and I found it did not play well with Scala AT ALL. It was a pure bash-my-head-into-the-wall session until I got it working.
The problem was with the type-casting interop between Java and Scala. It is not a clean interop. Here’s the code I used to convert part of that tutorial I mentioned above to Scala. Once my headache goes away I may try to get a little further.
in build.gradle
def scalaVersion = "2.11.8"
def log4jVersion = "2.9.1"
def kafkaVersion = "1.0.0"
dependencies {
compile "org.scala-lang:scala-library:$scalaVersion"
compile "org.scala-lang:scala-reflect:$scalaVersion"
compile "org.apache.kafka:kafka-streams:$kafkaVersion"
compile "org.apache.logging.log4j:log4j-api:$log4jVersion"
runtime "org.apache.logging.log4j:log4j-core:$log4jVersion"
runtime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
}
scala code:
package com.lombardo
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
class TransformString {
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe2")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091")
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass())
val builder = new StreamsBuilder
val source: KStream[String, String] = builder.stream("readings")
// ugly, ugly, ugly hacking here. need to find a better way to implement this
source.mapValues[String](new ValueMapper[String, String] {
override def apply(value: String): String = {
value.toUpperCase
}
}).to("my-output-topic")
val topology = builder.build
val streams = new KafkaStreams(topology, props)
streams.start
}
Since the Kafka Producer
and Consumer
APIs are so clear and easy to use I would much rather build an application using those, compared to this streaming business. perhaps that will change as I get more experience. But for now, I’d prefer to keep it straightforward with the old school ways.