I’ve been doing distributed computing in various forms for many years. Some at the higher level, some at the lower level and can tell you that with all the research advances and toolkits out there, it’s getting easier, but it’s still not as straight forward as it should be. Similar issues exist in multi-[process|thread] implementations. Although abstraction toolkits exist and they definitely make it easier to perform such actions without knowing much about implementing distributed algorithms, they are still leaky abstractions that in most non-trivial cases lead to having to have knowledge of the memory model, synchronization semantics, mutability, network topologies, etc… I’m not saying this is bad, I’m just saying that we haven’t yet reached the point where distributed or multi-[process|thread] computing is a cross cutting application concern. We have to actively bake it into our applications. I’m not arguing that abstractions should make developers ignorant of the underlying mechanisms, it’s just that they should be introduced at different levels of abstraction. It’s good to know what makes things tick (i.e. algorithms, data structures, etc…). Just look at the ORM world. The promise of not having to know SQL and just programming using OO took the OO world by storm. The naive thought that if you didn’t know the latest/greatest in ORM, you weren’t worthy. Years later, it turned out to be just a fad. Most have now turned back to SQL or some abstraction that’s flexible enough to allow you to work as low level or as high level as needed. In some cases, people are turning away from SQL data stores completely, but that’s another story.
A Little History
About 3 years ago, I was in the process of starting a company with my partners and we had a dilemma. We needed to process large amounts of data in near real-time. We also needed to be able to scale this horizontally in a “near” linear fashion. Data throughput was not temporally predictable, if predictable at all. After doing some searching and trying to fight the urge to implement things from scratch, we came upon the tuple-space programming model. It’s similar to the blackboard system for those that have an AI background. This is exactly what we needed, some shared distributed memory model that we didn’t have to think about (it presented itself as a single unified tuple space), and a programming API that allowed us to distribute jobs to work on the data stored in that model. Javaspaces is the java specification for Tuple Space Model. At the time, the only company that implemented this spec was GigaSpaces. We took their toolkit for a spin and it worked. The model was pleasant to program to and things were great. That’s until they didn’t work. Debugging was difficult, it leaked distributed abstraction. Deployment was also not very straightforward. None of that was the limitation of the Tuple Space Model, rather it was the implementation. I’m not saying GigaSpaces didn’t have a good implementation. I actually think it was rather nice at the time and am sure it’s way better now. At one point, we wrote an intermediary home-brewed rewrite of the system, so that we didn’t have to rush with the main implementation and can flush it out without harsh time constraints. In a few months, we ended up folding the plans to use GigaSpaces not because of the software, but rather because the company [GigaSpaces] had financial difficulties and their software, not being open source, was in flux in our opinion and we didn’t want to bet the success of our company on a commercial product of a company that looked like they were going to fold. Years later, they are still in business, great for them, but I don’t particularly regret our decision, especially looking at today’s landscape.
Most of our backend software is written in Java, Scala, and Python. Our web CRUD front end is written in PHP. The front end has a model that reflects our business domain, though it already encapsulates all of the business rules for our relational backend data store. We have a calculation process that utilizes these business rules to perform a bunch of operations and in the process reads/writes to the database. This process is very involved as it crunches hundreds of thousands of data points and will go up to millions in the next few months. It was written in PHP. We want to rewrite it using the model of distributing the data and computation using data affinity (collocating data and computations together). We’ve done it before and it works. So we’re happy to do it again, but we want to do this right and that might take some time. In the meantime, we wanted to take an intermediary step of distributing the workload amongst multiple servers (brought up and down as needed). I’ve been looking at numerous distributed toolkits for a while, from Hadoop to Akka to Gridgain. One that always stood out in the crowd has been Gridgain and in the last 6 months I’ve tried to find some place where it would pay its dividends. This project was it. We had a distributed scheduling service running on ec2 within a week, not bad, being that I had to learn the APIs and various ec2 deployment ins and outs.
Our implementation has a scheduler that decides what computations need to be performed. We then schedule these computations by pushing a job to a distributed worker node. Because our job is run as a shell script (invoking PHP) and outputs statistics after it successfully runs, we run the job using java’s ProcessBuilder class. We then run the process, and capture its output (in json). The output is then returned to the scheduling node, evaluated, and logged. The scheduler then knows that this job can run again (we have a need to ensure the job is a singleton when it comes to running in a grid).
Our implementation is in Scala. Gridgain has a rather nice Scala DSL. We used it as much as we could, but in some cases resorted to java API for reasons I’ll explain later.
First, here is our simple task that scheduler (once it figures out a job needs to run), pushes to remote nodes…
The above is pretty self explanatory. I kept of bunch of irrelevant things around, like inferring the return of the process and parsing/returning json.
Our scheduler is more complex, so I won’t show it, but it’s all custom business logic, nothing that has to do with scheduling a job on the grid. To scheduler the job, all you have to do is…
val retVal = grid.remoteProjection().call( UNICAST, new GridTask(scriptCommand, scriptEnv, scriptDir))
retVal is now the JValue instance returned from the remote job. If you get rid of the custom business logic in the GridTask implementation, the whole thing is a few lines of code. Talk about an “abstraction”! Also, one mention is that don’t let simplicity in their examples fool you. Their API is full blown and gives you the level of granularity you need, jus ask, and ye shall have. For example, grid.remoteProjection() returns a projection of all remote nodes (not including the current node, which is local). This is important for us because we didn’t want the local node (scheduler) doing any computations as it’s running on a box not able to support it.
One great thing about Gridgain, is that it works the same on a single node as it does on multiple nodes (same physical box), as it does on multiple physical nodes. You can also start multiple nodes within a single JVM. When I first heard this, I thought to myself, sounds great, but why? Nikita mentioned debugging and then a light came on. I remembered debugging GigaSpaces stuff and what worked on a single node, sometimes didn’t on multiple nodes. Mind, it was almost always my mistake, but debugging it was not very easy.
Our infrastructure runs on EC2. Gridgain provides EC2 images, but besides the fact that they run CentOS I believe, which I’ve grown to dislike, I’m also a control freak when it comes to my servers. I want them clean and mean:-). I prefer debian/ubuntu boxes, though I opted to create my own AMI. Installing Gridgain was easy, configuring is also a 2 minute task. It took me a few hours to figure it out and with the help of the forum, the configuration was a few lines of XML. We’re using the community edition, which comes with rudimentary IP discovery SPI. They have much more robust discovery SPIs available in their enterprise edition. One which I think makes the most sense on EC2 is S3 discovery. Basically, it uses S3 to write node information, and all nodes communicate using a S3 bucket. Makes sense. We weren’t ready to dish any money out for enterprise version yet, so I had to settle for IP discovery. In our case, it wasn’t hard. Basically, the scheduler in a single node that runs behind an elastic IP address that never changes. That means that the other boxes only have to know the IP address of the scheduler to make the initial communication. Once it can connect to one node, it joins the full grid. Because we have a single scheduler, if the scheduler goes down, the workers are no longer a part of the grid until the scheduler comes back up. This is OK for us, since due to some domain details, we can only have a single scheduler at this time and we’re OK with that single point of failure, especially being that we can bring it back up in no time and the worker nodes patiently retry the join operation at an interval and then rejoin the grid once the scheduler is back up. This is out topology. Gridgain supports pretty much what ever you want, including P2P no single point of failure topology. Below are the relevant configurations for our stuff…
The 10.1.1.1 is the local IP address of the scheduler box. The ip is repeated in the “addresses” section, telling gridgain that this can be the sole server and it doesn’t have to join a grid topology before it goes live. Also, shared=”true” is important, as it tells gridgain to share configurations amongst the boxes in the grid. Without it, you’ll have an “order of operations” issue, where a master must be started first before the worker. With it, that issue is moot and you can start/stop things as you please. I wish they would make this the default.
Right now, Gridgain cannot bind to a wildcard, though you have to specify the private IP address. If it changes (box reboots), you have to change it too. They promised a solution in their next release, which will allow to listen on private IP and communicate over public IP. This will help in other NAT topologies. Being able to listen to a wildcard will also help in having a config you never have to change. But even with this caveat, this is quite a breeze.
The worker config is similar, except it only needs to know about the scheduler and does not need to operate until it has joined a topology…
There are small caveats I found, none of which created much of a hurdle.
First is serialization. In my case, I’m was using logback for logging, gridgain uses log4j. We both use slf4j, which takes the first in the classpath. If you’re going to distributed a job that references something that clashes with classpaths, you have to do some classpath mangling. Removing log4j from gridgain’s lib directory would fix the issue, but I didn’t want to customize the install. I was originally using a Scala closure as a job unit, which had no references to the log object. In theory, if that’s the unit that gets serialized and sent over the wire, the other end should not have to worry about any logback references, since they aren’t a part of the serialized closure references. In my case, that didn’t work. Somehow the serialization decided to serialize logback related stuff, because the top level class where the closure was being created used the logger. I’m not sure if this is a problem with serialization or a leaky abstraction of the JVM and the fact that functions aren’t first class citizens. I think the lowest level of serialization is at the class level, though I had to extract it to a class and implement a GridTask instead. Because GridTask class extension didn’t reference any logger object, it was serialized as needed and sent over without causing a classpath conflict. I haven’t had the time yet to figure out whether it’s the fault of Gridgain’s optimized serializer or whether this is a side effect of the JVM (as I mentioned above). I’ll try to find some time to test this later.
Second, Gridgain Community Edition has discovery that works great in a homogeneous topology, but for EC2 (NAT, ephemeral private IP addresses, etc…), configuration is transient in terms of if any of the things I listed changes, the config must change. This can be remedied by startup scripts, but Nikita said they’ll add better support for it in the next version (public IP communication would be a good first step, binding to wildcard interfaces would be a great second).
Overall we had an awesome experience with Gridgain. The grid application ran flawlessly during our busiest time of the year (Thanksgiving weekend). It ran so flawlessly, that this morning, I forgot which boxes it was physically running on.
I plan on using Gridgain in the future and hopefully utilize their data grid to rewrite our computation system to utilize in memory data/compute collocation (data affinity).
Nikita, thanks for all the help getting things sorted out in the first few days.