curator cache and watch
The Apache Curator API is pretty slick. It is written in a âfluentâ style making it nice and readable. Also the use of âanonymous functionsâ aka lamdas make it fun to work with.
Letâs examine the PathChildrenCache class. A PathChildrenCache is a construct that does local mirroring of a ZooKeeper path. We can set up an âevent handlerâ for the cache, which will be called when an event takes place on a watched zNode. Thereâs an ENUM which contains all possible events, and our event handler will match on the type of event. Holy crap Iâm having JavaScript flash backs hereâŚ
requirements:
// build.sbt
libraryDependencies ++= Seq(
"org.apache.zookeeper" % "zookeeper" % "3.4.10",
"org.apache.curator" % "curator-framework" % "2.12.0",
"org.apache.curator" % "curator-recipes" % "2.12.0"
)
// imports
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetryThe basic setup:
val zkConnectString = sys.env("ZOOKEEPER_CONNECTION_STRING")
val path = "/little"
val retryPolicy = new ExponentialBackoffRetry(1000, 3)
val client = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy)
/**
* @param client the client
* @param path path to watch
* @param cacheData if true, node contents are cached in addition to the stat
*/
val cache = new PathChildrenCache(client, path, true) Now we add a âcallbackâ in the form of a PathChildrenCacheListener (and a helper just to print stuff out for now).
To use the PathChildrenCacheListener interface we need to implement a childEvent method:
private def prettyPrintEvent(event: PathChildrenCacheEvent): (String, String) = {
event.getData.getPath -> new String(event.getData.getData, "UTF-8")
}
cache.getListenable.addListener(new PathChildrenCacheListener {
override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
event.getType match {
case PathChildrenCacheEvent.Type.CHILD_ADDED => println(s"Child added: ${prettyPrintEvent(event)}")
case PathChildrenCacheEvent.Type.CHILD_REMOVED => println(s"Child removed: ${prettyPrintEvent(event)}")
case PathChildrenCacheEvent.Type.CHILD_UPDATED => println(s"Child updated: ${prettyPrintEvent(event)}")
case _ => println(event.getType)
}
}
})Now just add some basic driver code and watch the output to see the PathChildrenCacheListener in action!
client.start
cache.start
val nestedPath1 = path + "/dragon"
val nestedPath2 = path + "/louis"
val data1 = "first bit of data"
val data2 = "second bit"
val data3 = "third"
val data4 = "fourth"
List(path, nestedPath1, nestedPath2).foreach(path => {
if (client.checkExists.creatingParentContainersIfNeeded.forPath(path) == null)
try { client.create.creatingParentContainersIfNeeded.forPath(path) }
catch { case e:NodeExistsException => println(e.getMessage)}
Thread.sleep(500)
})
client.setData.forPath(nestedPath1, data1.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath2, data2.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath1, data3.getBytes)
Thread.sleep(500)
client.setData.forPath(nestedPath2, data4.getBytes)
Thread.sleep(500)
cache.close
client.close
// output.....
// CONNECTION_RECONNECTED
// Child added: (/little/dragon,192.168.1.12)
// Child added: (/little/louis,192.168.1.12)
// Child updated: (/little/dragon,first bit of data)
// Child updated: (/little/louis,second bit)
// Child updated: (/little/dragon,third)
// Child updated: (/little/louis,fourth)
//Why all the Thread.sleepâs? Good question. These Curator methods all seem to run as background processes, meaning that synchronous execution cannot be guaranteed. It is even more confusing to me as the SyncBuilder part of the library is âalways in the backgroundâ which is the opposite of how other frameworks like Node.js implements sync. More research will be needed.