curator cache and watch
The Apache Curator API is pretty slick. It is written in a “fluent” style making it nice and readable. Also the use of “anonymous functions” aka lamdas make it fun to work with.
Let’s examine the PathChildrenCache
class. A PathChildrenCache
is a construct that does local mirroring of a ZooKeeper path. We can set up an “event handler” for the cache, which will be called when an event takes place on a watched zNode. There’s an ENUM which contains all possible events, and our event handler will match on the type of event. Holy crap I’m having JavaScript flash backs here…
requirements:
// build.sbt
libraryDependencies ++= Seq(
"org.apache.zookeeper" % "zookeeper" % "3.4.10",
"org.apache.curator" % "curator-framework" % "2.12.0",
"org.apache.curator" % "curator-recipes" % "2.12.0"
)
// imports
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry
The basic setup:
val zkConnectString = sys.env("ZOOKEEPER_CONNECTION_STRING")
val path = "/little"
val retryPolicy = new ExponentialBackoffRetry(1000, 3)
val client = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy)
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
*/
val cache = new PathChildrenCache(client, path, true)
Now we add a “callback” in the form of a PathChildrenCacheListener
(and a helper just to print stuff out for now).
To use the PathChildrenCacheListener
interface we need to implement a childEvent
method:
private def prettyPrintEvent(event: PathChildrenCacheEvent): (String, String) = {
event.getData.getPath -> new String(event.getData.getData, "UTF-8")
}
cache.getListenable.addListener(new PathChildrenCacheListener {
override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
event.getType match {
case PathChildrenCacheEvent.Type.CHILD_ADDED => println(s"Child added: ${prettyPrintEvent(event)}")
case PathChildrenCacheEvent.Type.CHILD_REMOVED => println(s"Child removed: ${prettyPrintEvent(event)}")
case PathChildrenCacheEvent.Type.CHILD_UPDATED => println(s"Child updated: ${prettyPrintEvent(event)}")
case _ => println(event.getType)
}
}
})
Now just add some basic driver code and watch the output to see the PathChildrenCacheListener
in action!
client.start
cache.start
val nestedPath1 = path + "/dragon"
val nestedPath2 = path + "/louis"
val data1 = "first bit of data"
val data2 = "second bit"
val data3 = "third"
val data4 = "fourth"
List(path, nestedPath1, nestedPath2).foreach(path => {
if (client.checkExists.creatingParentContainersIfNeeded.forPath(path) == null)
try { client.create.creatingParentContainersIfNeeded.forPath(path) }
catch { case e:NodeExistsException => println(e.getMessage)}
Thread.sleep(500)
})
client.setData.forPath(nestedPath1, data1.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath2, data2.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath1, data3.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath2, data4.getBytes)
Thread.sleep(500)
cache.close
client.close
// output.....
// CONNECTION_RECONNECTED
// Child added: (/little/dragon,192.168.1.12)
// Child added: (/little/louis,192.168.1.12)
// Child updated: (/little/dragon,first bit of data)
// Child updated: (/little/louis,second bit)
// Child updated: (/little/dragon,third)
// Child updated: (/little/louis,fourth)
//
Why all the Thread.sleep
’s? Good question. These Curator methods all seem to run as background processes, meaning that synchronous execution cannot be guaranteed. It is even more confusing to me as the SyncBuilder
part of the library is “always in the background” which is the opposite of how other frameworks like Node.js implements sync
. More research will be needed.