Posts Tagged: scala


29
Nov 11

Our experience with distributed computing using Gridgain

Intro

I’ve been doing distributed computing in various forms for many years. Some at the higher level, some at the lower level and can tell you that with all the research advances and toolkits out there, it’s getting easier, but it’s still not as straight forward as it should be. Similar issues exist in multi-[process|thread] implementations. Although abstraction toolkits exist and they definitely make it easier to perform such actions without knowing much about implementing distributed algorithms, they are still leaky abstractions that in most non-trivial cases lead to having to have knowledge of the memory model, synchronization semantics, mutability, network topologies, etc… I’m not saying this is bad, I’m just saying that we haven’t yet reached the point where distributed or multi-[process|thread] computing is a cross cutting application concern. We have to actively bake it into our applications. I’m not arguing that abstractions should make developers ignorant of the underlying mechanisms, it’s just that they should be introduced at different levels of abstraction. It’s good to know what makes things tick (i.e. algorithms, data structures, etc…). Just look at the ORM world. The promise of not having to know SQL and just programming using OO took the OO world by storm. The naive thought that if you didn’t know the latest/greatest in ORM, you weren’t worthy. Years later, it turned out to be just a fad. Most have now turned back to SQL or some abstraction that’s flexible enough to allow you to work as low level or as high level as needed. In some cases, people are turning away from SQL data stores completely, but that’s another story.

A Little History

About 3 years ago, I was in the process of starting a company with my partners and we had a dilemma. We needed to process large amounts of data in near real-time. We also needed to be able to scale this horizontally in a “near” linear fashion. Data throughput was not temporally predictable, if predictable at all. After doing some searching and trying to fight the urge to implement things from scratch, we came upon the tuple-space programming model. It’s similar to the blackboard system for those that have an AI background. This is exactly what we needed, some shared distributed memory model that we didn’t have to think about (it presented itself as a single unified tuple space), and a programming API that allowed us to distribute jobs to work on the data stored in that model. Javaspaces is the java specification for Tuple Space Model. At the time, the only company that implemented this spec was GigaSpaces. We took their toolkit for a spin and it worked. The model was pleasant to program to and things were great. That’s until they didn’t work. Debugging was difficult, it leaked distributed abstraction. Deployment was also not very straightforward. None of that was the limitation of the Tuple Space Model, rather it was the implementation. I’m not saying GigaSpaces didn’t have a good implementation. I actually think it was rather nice at the time and am sure it’s way better now. At one point, we wrote an intermediary home-brewed rewrite of the system, so that we didn’t have to rush with the main implementation and can flush it out without harsh time constraints. In a few months, we ended up folding the plans to use GigaSpaces not because of the software, but rather because the company [GigaSpaces] had financial difficulties and their software, not being open source, was in flux in our opinion and we didn’t want to bet the success of our company on a commercial product of a company that looked like they were going to fold. Years later, they are still in business, great for them, but I don’t particularly regret our decision, especially looking at today’s landscape.

Most of our backend software is written in Java, Scala, and Python. Our web CRUD front end is written in PHP. The front end has a model that reflects our business domain, though it already encapsulates all of the business rules for our relational backend data store. We have a calculation process that utilizes these business rules to perform a bunch of operations and in the process reads/writes to the database. This process is very involved as it crunches hundreds of thousands of data points and will go up to millions in the next few months. It was written in PHP. We want to rewrite it using the model of distributing the data and computation using data affinity (collocating data and computations together). We’ve done it before and it works. So we’re happy to do it again, but we want to do this right and that might take some time. In the meantime, we wanted to take an intermediary step of distributing the workload amongst multiple servers (brought up and down as needed). I’ve been looking at numerous distributed toolkits for a while, from Hadoop to Akka to Gridgain. One that always stood out in the crowd has been Gridgain and in the last 6 months I’ve tried to find some place where it would pay its dividends. This project was it. We had a distributed scheduling service running on ec2 within a week, not bad, being that I had to learn the APIs and various ec2 deployment ins and outs.

Implementation

Our implementation has a scheduler that decides what computations need to be performed. We then schedule these computations by pushing a job to a distributed worker node. Because our job is run as a shell script (invoking PHP) and outputs statistics after it successfully runs, we run the job using java’s ProcessBuilder class. We then run the process, and capture its output (in json). The output is then returned to the scheduling node, evaluated, and logged. The scheduler then knows that this job can run again (we have a need to ensure the job is a singleton when it comes to running in a grid).

Our implementation is in Scala. Gridgain has a rather nice Scala DSL. We used it as much as we could, but in some cases resorted to java API for reasons I’ll explain later.

First, here is our simple task that scheduler (once it figures out a job needs to run), pushes to remote nodes…

import org.gridgain.grid.GridClosureCallMode._
import org.gridgain.scalar.scalar
import scalar._
import java.io.File
import io.Source
import net.liftweb.json._
import net.liftweb.json.JsonDSL._
import java.util.{Date, Collections}
import java.util.concurrent.{Callable, TimeUnit, Executors, ConcurrentHashMap}


class GridTask(scriptCommand:String, scriptEnv:String, scriptDir:String) extends Callable[JValue] {

  def call() = {
    val proc = new ProcessBuilder(scriptCommand.split(" "): _*)
    proc.command()
    proc.directory(new File(scriptDir))
    proc.environment().put("APPLICATION_ENV", scriptEnv);
    val p = proc.start()
    p.waitFor();
    val retString = Source.fromInputStream(p.getInputStream).getLines()
    val errString = Source.fromInputStream(p.getErrorStream).getLines()
    if (p.exitValue == 0 && !retString.isEmpty) {
      parse(retString.mkString("\n"))
    } else {
      ("event_id" -> 53) ~
        ("start_time" -> new Date().getTime) ~
        ("error" -> errString)
    }
  }
}

The above is pretty self explanatory. I kept of bunch of irrelevant things around, like inferring the return of the process and parsing/returning json.

Our scheduler is more complex, so I won’t show it, but it’s all custom business logic, nothing that has to do with scheduling a job on the grid. To scheduler the job, all you have to do is…

val retVal = grid.remoteProjection().call(
      UNICAST, 
      new GridTask(scriptCommand, scriptEnv, scriptDir))

retVal is now the JValue instance returned from the remote job. If you get rid of the custom business logic in the GridTask implementation, the whole thing is a few lines of code. Talk about an “abstraction”! Also, one mention is that don’t let simplicity in their examples fool you. Their API is full blown and gives you the level of granularity you need, jus ask, and ye shall have. For example, grid.remoteProjection() returns a projection of all remote nodes (not including the current node, which is local). This is important for us because we didn’t want the local node (scheduler) doing any computations as it’s running on a box not able to support it.

Deployment

One great thing about Gridgain, is that it works the same on a single node as it does on multiple nodes (same physical box), as it does on multiple physical nodes. You can also start multiple nodes within a single JVM. When I first heard this, I thought to myself, sounds great, but why? Nikita mentioned debugging and then a light came on. I remembered debugging GigaSpaces stuff and what worked on a single node, sometimes didn’t on multiple nodes. Mind, it was almost always my mistake, but debugging it was not very easy.

Our infrastructure runs on EC2. Gridgain provides EC2 images, but besides the fact that they run CentOS I believe, which I’ve grown to dislike, I’m also a control freak when it comes to my servers. I want them clean and mean:-). I prefer debian/ubuntu boxes, though I opted to create my own AMI. Installing Gridgain was easy, configuring is also a 2 minute task. It took me a few hours to figure it out and with the help of the forum, the configuration was a few lines of XML. We’re using the community edition, which comes with rudimentary IP discovery SPI. They have much more robust discovery SPIs available in their enterprise edition. One which I think makes the most sense on EC2 is S3 discovery. Basically, it uses S3 to write node information, and all nodes communicate using a S3 bucket. Makes sense. We weren’t ready to dish any money out for enterprise version yet, so I had to settle for IP discovery. In our case, it wasn’t hard. Basically, the scheduler in a single node that runs behind an elastic IP address that never changes. That means that the other boxes only have to know the IP address of the scheduler to make the initial communication. Once it can connect to one node, it joins the full grid. Because we have a single scheduler, if the scheduler goes down, the workers are no longer a part of the grid until the scheduler comes back up. This is OK for us, since due to some domain details, we can only have a single scheduler at this time and we’re OK with that single point of failure, especially being that we can bring it back up in no time and the worker nodes patiently retry the join operation at an interval and then rejoin the grid once the scheduler is back up. This is out topology. Gridgain supports pretty much what ever you want, including P2P no single point of failure topology. Below are the relevant configurations for our stuff…

Scheduler

<property name="discoverySpi">
    <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
        <property name="localAddress" value="10.1.1.1"/>
        <property name="ipFinder">
            <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder">
                <property name="shared" value="true"/>
                <property name="addresses">
                    <list>
                        <value>10.1.1.1:47500</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="heartbeatFrequency" value="2000"/>
    </bean>
</property>

The 10.1.1.1 is the local IP address of the scheduler box. The ip is repeated in the “addresses” section, telling gridgain that this can be the sole server and it doesn’t have to join a grid topology before it goes live. Also, shared=”true” is important, as it tells gridgain to share configurations amongst the boxes in the grid. Without it, you’ll have an “order of operations” issue, where a master must be started first before the worker. With it, that issue is moot and you can start/stop things as you please. I wish they would make this the default.

Right now, Gridgain cannot bind to a wildcard, though you have to specify the private IP address. If it changes (box reboots), you have to change it too. They promised a solution in their next release, which will allow to listen on private IP and communicate over public IP. This will help in other NAT topologies. Being able to listen to a wildcard will also help in having a config you never have to change. But even with this caveat, this is quite a breeze.

The worker config is similar, except it only needs to know about the scheduler and does not need to operate until it has joined a topology…

<property name="discoverySpi">
    <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
        <property name="localAddress" value="10.1.1.2"/>
        <property name="ipFinder">
            <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder">
                <property name="shared" value="true"/>
                <property name="addresses">
                    <list>
                        <value>scheduler.node.com:47500</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="heartbeatFrequency" value="2000"/>
    </bean>
</property>

Caveats

There are small caveats I found, none of which created much of a hurdle.

First is serialization. In my case, I’m was using logback for logging, gridgain uses log4j. We both use slf4j, which takes the first in the classpath. If you’re going to distributed a job that references something that clashes with classpaths, you have to do some classpath mangling. Removing log4j from gridgain’s lib directory would fix the issue, but I didn’t want to customize the install. I was originally using a Scala closure as a job unit, which had no references to the log object. In theory, if that’s the unit that gets serialized and sent over the wire, the other end should not have to worry about any logback references, since they aren’t a part of the serialized closure references. In my case, that didn’t work. Somehow the serialization decided to serialize logback related stuff, because the top level class where the closure was being created used the logger. I’m not sure if this is a problem with serialization or a leaky abstraction of the JVM and the fact that functions aren’t first class citizens. I think the lowest level of serialization is at the class level, though I had to extract it to a class and implement a GridTask instead. Because GridTask class extension didn’t reference any logger object, it was serialized as needed and sent over without causing a classpath conflict. I haven’t had the time yet to figure out whether it’s the fault of Gridgain’s optimized serializer or whether this is a side effect of the JVM (as I mentioned above). I’ll try to find some time to test this later.

Second, Gridgain Community Edition has discovery that works great in a homogeneous topology, but for EC2 (NAT, ephemeral private IP addresses, etc…), configuration is transient in terms of if any of the things I listed changes, the config must change. This can be remedied by startup scripts, but Nikita said they’ll add better support for it in the next version (public IP communication would be a good first step, binding to wildcard interfaces would be a great second).

Conclusion

Overall we had an awesome experience with Gridgain. The grid application ran flawlessly during our busiest time of the year (Thanksgiving weekend). It ran so flawlessly, that this morning, I forgot which boxes it was physically running on.

I plan on using Gridgain in the future and hopefully utilize their data grid to rewrite our computation system to utilize in memory data/compute collocation (data affinity).

Nikita, thanks for all the help getting things sorted out in the first few days.


21
Oct 10

On JVM, languages, platforms, and frameworks

Today apple announced through their Java update that their support for the JVM is now deprecated and will possibly be removed from the future OS releases. The blogosphere is flaming, mostly with Java supporters who are either pissed off at Apple, worried about the future of their investment in the Java platform, or both. I don’t think the future of the Java platform should be in question due to any apple decisions, at the end of the day, there aren’t many production Java deployments on OS X, but it is a fact that a large portion of Java developers utilize OS X as their primary development platform. These developers, without proper support for their environments, will move to Linux and maybe even Windows. This move alone will probably not hurt apple in the short run, but their behavior towards isolating different developer groups, will eventually come back on them. Developers from any environments will cautiously approach, as the tamed Leopard and eventually Lion might bite them in the ass at the least expected moment.

It might be that Apple wants Oracle to take the charge in maintaining the OS X port and deprecating support might be a way to negotiate this without face-to-face negotiations. I’m fine with that, frankly I could care less who provides the JVM, as long as one is provided and is relatively actively supported. Until that announcement happens, this is yet another bump in the future of the JVM. First the Oracle purchase of Sun, then the lawsuit, and now the decision by Apple, definitely creates unneeded distractions for this platform’s developers all over the globe.

So I started this not to gripe any more about Apple’s decision, I’m sure there are enough posts out there flooding your RSS streams to keep you busy, rather I wanted to question the future of languages/industry in regards to the language/platform of the future.

Last week I attended the StrangeLoop conference in St. Louis. It was a gem of a conference, definitely the best I’ve been too in a long time. Alex, besides seeming like an overall awesome guy, has some extraordinary “brilliant people herding” abilities. How he managed to bring together a group of brilliant speakers and then convince another group of awesome developers to attend, is beyond me. The conference had some great talks and panels about the latest/greatest and bleeding edge tech stuff. One of the best panels was about the future of programming languages. The panel consisted of (Guy Steele, Alex Payne, Josh Bloch, Bruce Tate, and Douglas Crockford), all whom I have great respect for. One prevailing factor in most discussions in this panel as well as throughout the conference, has been concurrency. In the mutli-core/cpu world, what language/platform will allow for this paradigm transition to happen seamlessly. The fact is that although there are some awesome innovations/implementations going on in this area, STM, Actors, fork/join, and various others, none have yet abstracted the concurrency model away from the developer, as seamless as memory management and garbage collection is done in today’s runtime environments. But this is an exciting time to be in, many ideas are flowing around and something will appear on the horizon sooner or later. This something, as Guy Steele pointed out, will most likely be a model that will allow for divide/conquer, map/reduce operations to happen through language idioms and possible seamless abstractions. Accumulators are evil :-)

There are many languages/platforms out there today, but none have been as predominant and as overall polished as Java and the JVM. From the language perspective Java’s getting stagnant and to some, boring, but the fact that it has an ecosystem of wonderful libraries and products is hard to ignore. The fact that all of these are bytecode compatible is even more to rant about, as with the advent of numerous great languages built on top of the JVM, it makes the transition to a different language and programming paradigm, much easier. It is truly hard to think of any current platform/VM that’s more prevalent and better suited for large scale enterprise development than the JVM. .NET comes to mind, but I doubt anyone from the non-Microsoft camp will be switching :-) . There are other platforms, most notably Python and Ruby, but although both are credible, the presence of GIL on both, make the choice of using them in a concurrency model very difficult. You can architect and deploy your system as multi-process vs. multi-thread and arguably that model has its benefits, mostly by getting rid of the shared state model concurrency issues, but we (at least I do) like to have a choice. This decision shouldn’t be shoved down our throats because the language development camp doesn’t want, doesn’t think one is necessary, or [add your own excuse here] to produce a thread-safe non-GIL thread model.

The other issue with most of these languages/platforms, as well as the other ones I like, is the deployment options. They suck! From providing modular builds to deploying production applications, they just aren’t as polished and in most cases as stable/supported as the JVM ones. Common Lisp, one of my favorites as of late, for example, is an awesome language with numerous compilers/interpreters. Lisp doesn’t have a good packaging, dependency resolution, and build story, but even if you can get past that with some of the available half-baked solutions, then when it’s time to build/compile/deploy your app, you’re fucked, unless you want to build one yourself. I enjoy such challenges on Friday/Saturday nights, but not when time is limited and milestones are due (which is most of the time).

Ruby and Python for example, have a decent package managers gem and easy_install/pip respectively, but two problems lurk. First, lots of modules are written in C and in many cases, in my experience, are a big pain in the ass to compile, especially with today’s heterogeneous architectures i386, x86_64, etc… Lots of incompatibilities arise, forcing more time away from doing what I should be doing. Somehow my milestones never include the 2+ day derailments due to such issues. Maybe that’s what’s left of my optimism. The second problem only applies if you’re writing a web app and if you are, then you know the issues. Where are those stable/supported app servers? WSGI and rack should provide answers soon, for now, there are many options and none are without major issues as well. Some are a pain to install/deploy, some aren’t actively maintained. I mean, am I just being anal and asking for way too much or am I eternally spoiled by the JVM. Is it too much to ask to bundle the application into some archive or directory structure and just drop it in somewhere or point your server config towards it. Either way, even if they ease the pain of deploying webapps, the fact that [Python/Ruby] are not suitable in multi-core/cpu environments where threads are needed, is a show stopper for lots of apps I write. I know I can architect around the issues, but again, why should I have to program to the platform vs. the other way around. Give me the choices and trust me to make the best decision.

The next things is native GUI development. It is true that lots of interesting apps today are developed and deployed as web apps, but that doesn’t discount the fact that there is still a need for a native GUI in lots of use cases. Swing provides a good and in some instances really good, cross platform GUI library which allows to deploy your GUI across most popular platforms with 95% or more cross platform consistency. That sounds pretty good to me.

There are other toolkits, wxWidgets, QT, etc…, which also have bindings to python and ruby, but again, with today’s multicore, it would be a shame to not be able to utilize these cores simultaneously due to GIL. The bindings in languages that due provide a better concurrency story, work great, but these languages still suffer from the other pain points I mentioned before (i.e. deployment, build, package management, etc…). It’s a Catch-22.

So maybe I’m missing something here, but I think the JVM is the best option we have at this time that allows for multiple platforms, languages, paradigms, and comes with a great success story in the enterprise (build tools, deployment/modularity, enterprise grade servers, etc…). Languages implemented on top of the JVM benefit from this quite successful ecosystem. Ah, and might I mention that great libraries exist for about anything you’re trying to do. This is also true of Python, but I can’t say the same for Ruby. Ruby has numerous gems for most tasks, but they all seem half-baked at most. There are frameworks like Rails and Sinatra, which are great and fully supported with active communities, though as long as you don’t venture too far off the traditional path.

JVM has it’s own set of issue, the fact that it was written with static languages in mind and lacks support for dynamic bindings, tail call optimizations, and other things that make writing languages on top of it more difficult. It’s future is now also in question due to the new Oracle stewardship and the legal obstacles it chose to pursue rather than spend that time and money on the platform. Nevertheless, the ecosystem is still flourishing, kept afloat but tons of great developers and supporting companies who care about the platform and greatly benefit from it. JVM allows us to program in different languages while being concentrated on the task at hand, not peripheral issues like compiling for different architectures, battling the deployment inadequacies, not being able to utilize cores efficiently, and a variety of other issues. JVM ecosystem might not have the most ideal solutions to these problems, but they are far better than anything out there right now. If people that spend their time bashing the JVM platform would spend as much time making their platform better, maybe we’d have other choices.

I’d love to hear other’s thoughts on this topic. What do you think about the JVM and what’s your language/platform of choice. How do you build, deploy, distribute your applications? What concurrency options are available on that platform and how they compare to others? I’m familiar with most JVM options, especially Clojure and Scala, so I’m mostly asking for anything outside of the JVM ecosystem. I hope to someday compile a list of these and present them in an objective manner, for now, all I have is my empirical opinions.


19
Feb 10

Extension-based content negotiation and nested routes with Restlet

I’ve been working with Restlet to expose a RESTful api interface to the data model for one of my projects. Restlet is a super flexible library allowing one to configure and access all the properties of HTTP through a REST-oriented API.

The application bootstrapping options are also super flexible, allowing one to configure how routes are resolved and nest routes for cascading capabilities. I ran into a small caveat when I tried to configure extension based content negotiation. Basically, the idea of extension based content negotiation, is that instead of using “Accept” headers, one can append a mime extension to their request URI to request a particular return format. Say, we have a http://localhost/api/resource uri, one can request xml or json formats by simply doing http://localhost/api/resource.xml or http://localhost/api/resource.json. Of course your resource has to support these formats. The documentation on this type of content negotiation is non-existent. I had to scour a bunch of users group messages and javadocs before I figured it out. I figured I’ll shared if someone else is interested.

My applications is written in Scala, so examples will be provided as such. I’m sure any experienced developer can easily discern the java equivalent.

First, in your application bootstrapping, you must turn on the extensionsTunnel option. Here is my code, which also demonstrates nested routes. Then, in your resource you must conditionally infer the MediaType provided and emit the representation of this resource based on it.

import org.restlet.{Restlet, Application => RestletApplication}
import scala.xml._
//... other imports excluded

class TestApplication extends RestletApplication {
  override def createInboundRoot: Restlet = {

    val apiRouter = new Router(getContext)
    apiRouter.attach("/test", classOf[TestResource])

    val rootRouter = new Router(getContext)
    rootRouter.attach("/api/v1", apiRouter).getTemplate.setMatchingMode(Template.MODE_STARTS_WITH)

    getTunnelService.setExtensionsTunnel(true)

    return rootRouter
  }
}

class TestResource extends ServerResource {

  @Get("xml|json")
  def represent(v:Variant):String = {
    return v.getMediaType match {
          case MediaType.TEXT_XML | MediaType.APPLICATION_XML => <response><message>Hello from Restlet!</message></response>.toString
          case MediaType.APPLICATION_JSON => "{\"message\": \"Hello from Restlet\"}"
        }
  }
}

First, the root router’s matching mode must be set to Template.MODE_STARTS_WITH, otherwise it will try to match based on full absolute uri path and not find any nested resources. So the matching mode is very important in the case where you’re working with nested resources.

Second, you set the extensions tunnel property to true: getTunnelService.setExtensionsTunnel(true). This will turn on the extension tunneling service and perform content negotiation based on the URI’s extension. Note: if an extension is not provided, it will resort to first available representation supported by the resource. It can get more complicated I believe based on other configurations, but this is what happens in the most simple scenario.

Now, with content negotiation on, the resource has to conditionally infer the proper MediaType requested and provide its representation for the MediaType. In Scala this is very elegantly done using the super flexible match/case construct. This construct can be used as Java’s switch statement, but it is way more powerful and allows for advanced pattern matching. As you can see, I check for both xml and json media types and provide the proper representation. The supported media types are handled through @Get annotation. For more info, see Restlet’s annotations and Resource documentation.

Now, accessing the resources yields the following results:

  $ curl http://localhost:8080/api/v1/test.xml
  Hello from Restlet

  $ curl http://localhost:8080/api/v1/test.json
  {"message": "Hello from Restlet"}

10
Feb 10

Avoid using nulls in Scala

Scala’s handling of null’s mixed with implicit casting is quite tricky. I learned the hard way today and it took hours to figure out what was going on. First I thought it was a bug, but then someone pointed out how implicit casting effect null method parameters.

The bottom line is: DO NOT USE NULLs unless you are utilizing java libraries and have no choice. Use Option instead, with Some() or None().

The problem is best described with code…

  def checkNullOrEmpty(v:Seq[Any]):Boolean = {
    println("Class:"+v.getClass)
    return (v != null) && !v.isEmpty
  }

  case class Race(val event:String, val protocol:String) {
    println("Event:"+event+", protocol:"+protocol)
    assert(checkNullOrEmpty(event))
    assert(checkNullOrEmpty(protocol))
  }

  val t = new Race(null, null)

Pasting the above into REPL yields the following result…

Event:null, protocol:null
Class:class scala.collection.immutable.WrappedString
java.lang.NullPointerException
    at scala.Proxy$class.toString(Proxy.scala:29)
    at scala.collection.immutable.WrappedString.toString(WrappedString.scala:22)
    at scala.collection.immutable.StringLike$class.length(StringLike.scala:48)
    at scala.collection.immutable.WrappedString.length(WrappedString.scala:22)
    at scala.collection.IndexedSeqLike$class.isEmpty(IndexedSeqLike.scala:81)
    at scala.collection.immutable.WrappedString.isEmpty(WrappedString.scala:22)
    at .checkNullOrEmpty(<console>:6)
    .....

So why is NPE thrown at this breakpoint return (v != null) && !v.isEmpty?

So let’s look further into the output. When a Race instance is created, the constructor values are initialized to null. Inside the constructor we print this out and verify that values are in fact null. When We get to checkNullOrEmpty method call, the class of v is WrappedString and though the object is no longer null. In Java the call to getClass would fail, as v would be null, in Scala it’s casted (converted) to WrappedString.

This happens through Scala’s implicit conversions. The checkNullOrempty method expects a Seq. Although Seq is not a superclass or interface of String. So when we create the Race instance and specify both event and protocol as null, they are still String types with a null reference. Using say event or protocol as an argument to checkNullOrEmpty yields an implicit conversion. Why? Well, in Java the compilation would fail, since the Seq trait is not a part of the inheritance hierarchy of String, but in Scala, it succeeds, as Scala finds an implicit conversion method to convert String to WrappedString. This method is defined in Predef object. We know the Predef is imported by default into all Scala classes. Predef extends LowPriorityImplicits class, which defines this implicit conversion implicit def wrapString(s: String): WrappedString = new WrappedString(s). So basically Scala decides that the best way to convert the String type into Seq[Any] is by using this implicit conversion. So it wraps the null value with the WrappedString.

This causes two issues… First, the not null check no longer works, as the object is not null, due to the fact that it’s an instance of WrappedString, so (v != null) is true. Since that passes, it then executes the RHS of && operator and then tries to infer on !v.isEmpty, which throws the NPE, as the underlying String value wrapped is null.

I’m not necessarily sure whether this is a bug, feature, or maybe there is no real consensus on how null should be handled, but as you see this causes issues and should either be avoided through avoiding null and using Option instead. If you are using java libs that return nulls, wrapping the return value in Option might be a good idea, before proceeding any further.


6
Feb 10

Implementing bloom filter with a murmur hash function

Recently I read a blog post by Jonathan Ellis about bloom filters. Jonathan works on Cassandra and though had lots of empirical recommendations on its implementation. Cassandra uses bloom filter extensively for performance optimization. Bloom filter is a space-efficient probabilistic data structure used to test whether an element belongs to a set. It’s similar to a standard hashtable, except that its space efficient and doesn’t store the actual value, rather it hashes the keys in a bit array. The reason it’s a probabilistic data structure, is that it allows false positives, but not false negatives. This means, that to answer the question whether A is a subset of B (A ⊆ B), a bloom filter returns true or false. A false (doesn’t exist) is guaranteed to be accurate, but true (exists), has a probability of being false positive.

So why would one use such an algorithm? Say you store records on disk. Someone requests a particular record and you proceed to seek this record. This is usually an expensive operation for high throughput or systems with limited resources, though before invoking such an expensive operation you can find out if the record exists. If record does exist, you can then proceed to retrieve it through the more intensive operation. Because there is a small probability of this being a false positive, you might still find (through the resource intensive operation) that it doesn’t exist. So in an environment where you’re servicing many requests which might not exist, you can reduce the amount of expensive operations and answer such request in constant time O(1).

Jonathan’s blog post provides some great information about how to implement a very effective bloom filter. One of the most important considerations is the hash function. To lower the probability of false positives, one must use a hash function which effectively distributes the hashes across the hash-space. Jonathan recommends the use of murmur hash algorithm, which is one of the most efficient and effective hash functions, which has great performance and low collisions rate.

Another thing done to reduce hash collisions and in turn false positives, is the fact that you don’t just turn on the bits of a single hash function result, rather, you do so numerous times. (5 times is referred to a lot in literature and seems like a sweet spot). This means, that you take a key, calculate 5 hashes (using 5 different hash algorithms or a single hash algorithm strategy I’ll discuss below) and set the bit for each one of these hashes in the bit array. Answering the question of whether the key exists, does the reverse. Calculate 5 hashes and check to make sure they are all set in the bit array. If any of the 5 aren’t set, then you can be assured it doesn’t exist.

So let’s look at some code. Below is the implementation of bloom filter in Scala. It relies on a murmur hash implementation which I won’t list, but you can view/download it here.

  import scala.collection.mutable.BitSet

  class BloomFilter(capacity:Int, hashable:Hashable, hashCount:Int = 5) {

    private val buckets:BitSet = { new BitSet(capacity) }
    private val hashFunc = hashable.hashes(hashCount)(capacity) _

    def addValue(value:String) {
      hashFunc(value).foreach( buckets += _ )
    }

    def exists_?(value:String):Boolean = {
      for ( i <- hashFunc(value) ) if (!buckets.contains(i)) return false
      return true
    }
  }

  trait Hashable {
    def hashes(hashCount:Int)(max:Int)(value:String):Array[Int]
  }

  class MurmurHashable extends Hashable {
    import com.cobrio.algorithms.{MurmurHash => MH}
    def hashes(hashCount:Int)(max:Int)(value:String):Array[Int] = {
      val hash1 = MH.hash(value.getBytes, 0)
      val hash2 = MH.hash(value.getBytes, hash1)
      ( for ( i <- 0 until hashCount) yield Math.abs((hash1 + i * hash2) % max) ).toArray
    }
  }

The code above should be pretty self explanatory, but let’s just take a look at the hashing strategy. We calculate 5 hashes (default) on the key being stored, although we only ever invoke the murmur algorithm twice. Look at the highlighted lines above. Adam Kirsch and Michael Mitzenmacher wrote a paper titled, Less Hashing, Same Performance…, which shows that using a particular hashing technique which simulates additional hash functions beyond two, can increase performance of bloom filters without any significant loss in the false positive probability. To summarize the math in the paper, this is the formula: gi(x) = h1(x) + ih2(x) mod m, where m is the number of buckets in the bloom filter, h1 and h2 are the two calculated hashes respectively, and i will range from 0 up to k – 1 where k is the number of hashes we want to generate.

Here is how you’d use the above bloom filter…

  val bloom = new BloomFilter(2000, new MurmurHashable())
  bloom.addValue("Ilya Sterin")
  bloom.addValue("Elijah Sterin")

  assert(bloom.exists_?("Ilya Sterin"))
  assert(bloom.exists_?("Elijah Sterin"))
  assert(!bloom.exists_?("Don't Exist"))