29
Nov 11

Our experience with distributed computing using Gridgain

Intro

I’ve been doing distributed computing in various forms for many years, some at a higher level, some at a lower level, and I can tell you that with all the research advances and toolkits out there, it’s getting easier, but it’s still not as straightforward as it should be. Similar issues exist in multi-[process|thread] implementations. Abstraction toolkits exist, and they definitely make it easier to perform such actions without knowing much about implementing distributed algorithms, but they are still leaky abstractions that in most non-trivial cases require knowledge of the memory model, synchronization semantics, mutability, network topologies, etc… I’m not saying this is bad; I’m just saying that we haven’t yet reached the point where distributed or multi-[process|thread] computing is a cross-cutting application concern. We have to actively bake it into our applications. I’m not arguing that abstractions should make developers ignorant of the underlying mechanisms, just that those mechanisms should be introduced at different levels of abstraction. It’s good to know what makes things tick (i.e. algorithms, data structures, etc…). Just look at the ORM world. The promise of not having to know SQL and just programming using OO took the OO world by storm. The naive thought was that if you didn’t know the latest and greatest in ORM, you weren’t worthy. Years later, it turned out to be just a fad. Most have now turned back to SQL or some abstraction that’s flexible enough to allow you to work as low level or as high level as needed. In some cases, people are turning away from SQL data stores completely, but that’s another story.

A Little History

About 3 years ago, I was in the process of starting a company with my partners and we had a dilemma. We needed to process large amounts of data in near real-time. We also needed to be able to scale this horizontally in a “near” linear fashion. Data throughput was not temporally predictable, if predictable at all. After doing some searching and trying to fight the urge to implement things from scratch, we came upon the tuple-space programming model. It’s similar to the blackboard system for those that have an AI background. This was exactly what we needed: a shared distributed memory model that we didn’t have to think about (it presented itself as a single unified tuple space), and a programming API that allowed us to distribute jobs to work on the data stored in that model. Javaspaces is the Java specification for the Tuple Space Model. At the time, the only company that implemented this spec was GigaSpaces. We took their toolkit for a spin and it worked. The model was pleasant to program to and things were great. That is, until they didn’t work. Debugging was difficult, as the distributed abstraction leaked. Deployment was also not very straightforward. None of that was a limitation of the Tuple Space Model; rather, it was the implementation. I’m not saying GigaSpaces didn’t have a good implementation. I actually think it was rather nice at the time and am sure it’s way better now. At one point, we wrote an intermediary home-brewed rewrite of the system, so that we didn’t have to rush the main implementation and could flesh it out without harsh time constraints. In a few months, we ended up folding the plans to use GigaSpaces, not because of the software, but because the company [GigaSpaces] had financial difficulties and their software, not being open source, was in flux in our opinion. We didn’t want to bet the success of our company on a commercial product from a company that looked like it was going to fold. Years later, they are still in business, great for them, but I don’t particularly regret our decision, especially looking at today’s landscape.

Most of our backend software is written in Java, Scala, and Python. Our web CRUD front end is written in PHP. The front end has a model that reflects our business domain, so it already encapsulates all of the business rules for our relational backend data store. We have a calculation process that utilizes these business rules to perform a bunch of operations and in the process reads/writes to the database. This process is very involved, as it crunches hundreds of thousands of data points and will go up to millions in the next few months. It was written in PHP. We want to rewrite it using the model of distributing the data and computation using data affinity (collocating data and computations together). We’ve done it before and it works. So we’re happy to do it again, but we want to do this right and that might take some time. In the meantime, we wanted to take an intermediary step of distributing the workload amongst multiple servers (brought up and down as needed). I’ve been looking at numerous distributed toolkits for a while, from Hadoop to Akka to Gridgain. One that always stood out in the crowd has been Gridgain, and in the last 6 months I’ve tried to find some place where it would pay dividends. This project was it. We had a distributed scheduling service running on EC2 within a week. Not bad, given that I had to learn the APIs and various EC2 deployment ins and outs.

Implementation

Our implementation has a scheduler that decides what computations need to be performed. We then schedule these computations by pushing a job to a distributed worker node. Because our job is run as a shell script (invoking PHP) and outputs statistics after it successfully runs, we run the job using java’s ProcessBuilder class. We then run the process, and capture its output (in json). The output is then returned to the scheduling node, evaluated, and logged. The scheduler then knows that this job can run again (we have a need to ensure the job is a singleton when it comes to running in a grid).

Our implementation is in Scala. Gridgain has a rather nice Scala DSL. We used it as much as we could, but in some cases resorted to the Java API for reasons I’ll explain later.

First, here is the simple task that the scheduler (once it figures out a job needs to run) pushes to remote nodes…

import org.gridgain.grid.GridClosureCallMode._
import org.gridgain.scalar.scalar
import scalar._
import java.io.File
import io.Source
import net.liftweb.json._
import net.liftweb.json.JsonDSL._
import java.util.{Date, Collections}
import java.util.concurrent.{Callable, TimeUnit, Executors, ConcurrentHashMap}


class GridTask(scriptCommand:String, scriptEnv:String, scriptDir:String) extends Callable[JValue] {

  def call() = {
    val proc = new ProcessBuilder(scriptCommand.split(" "): _*)
    proc.directory(new File(scriptDir))
    proc.environment().put("APPLICATION_ENV", scriptEnv)
    val p = proc.start()
    // Drain the streams before waiting, so the script cannot block on a full stdout pipe.
    // (stdout is read to the end first; very large stderr output could still stall the script.)
    val retString = Source.fromInputStream(p.getInputStream).getLines().mkString("\n")
    val errString = Source.fromInputStream(p.getErrorStream).getLines().mkString("\n")
    p.waitFor()
    if (p.exitValue == 0 && !retString.isEmpty) {
      parse(retString)
    } else {
      // On failure, return a JSON object that carries the script's stderr
      ("event_id" -> 53) ~
        ("start_time" -> new Date().getTime) ~
        ("error" -> errString)
    }
  }
}

The above is pretty self-explanatory. I kept a bunch of irrelevant things around, like inferring the return of the process and parsing/returning JSON.

Our scheduler is more complex, so I won’t show it, but it’s all custom business logic, nothing that has to do with scheduling a job on the grid. To schedule the job, all you have to do is…

val retVal = grid.remoteProjection().call(
      UNICAST, 
      new GridTask(scriptCommand, scriptEnv, scriptDir))

retVal is now the JValue instance returned from the remote job. If you get rid of the custom business logic in the GridTask implementation, the whole thing is a few lines of code. Talk about an “abstraction”! Also, don’t let the simplicity of their examples fool you. Their API is full blown and gives you the level of granularity you need; just ask, and ye shall have. For example, grid.remoteProjection() returns a projection of all remote nodes (not including the current node, which is local). This is important for us because we didn’t want the local node (the scheduler) doing any computations, as it’s running on a box not able to support it.

Deployment

One great thing about Gridgain is that it works the same on a single node as it does on multiple nodes on the same physical box, as it does on multiple physical boxes. You can also start multiple nodes within a single JVM. When I first heard this, I thought to myself, sounds great, but why? Nikita mentioned debugging and then a light came on. I remembered debugging GigaSpaces stuff, where what worked on a single node sometimes didn’t on multiple nodes. Mind, it was almost always my mistake, but debugging it was not very easy.

Our infrastructure runs on EC2. Gridgain provides EC2 images, but besides the fact that they run CentOS I believe, which I’ve grown to dislike, I’m also a control freak when it comes to my servers. I want them clean and mean:-). I prefer debian/ubuntu boxes, so I opted to create my own AMI. Installing Gridgain was easy, and configuring it is also a 2 minute task once you know how. It took me a few hours to figure it out, and with the help of the forum, the configuration was a few lines of XML. We’re using the community edition, which comes with a rudimentary IP discovery SPI. They have much more robust discovery SPIs available in their enterprise edition. One which I think makes the most sense on EC2 is S3 discovery. Basically, it uses S3 to write node information, and all nodes communicate using an S3 bucket. Makes sense. We weren’t ready to dish out any money for the enterprise version yet, so I had to settle for IP discovery. In our case, it wasn’t hard. Basically, the scheduler is a single node that runs behind an elastic IP address that never changes. That means that the other boxes only have to know the IP address of the scheduler to make the initial communication. Once a worker can connect to one node, it joins the full grid. Because we have a single scheduler, if the scheduler goes down, the workers are no longer a part of the grid until the scheduler comes back up. This is OK for us: due to some domain details, we can only have a single scheduler at this time, and we’re OK with that single point of failure, especially since we can bring it back up in no time and the worker nodes patiently retry the join operation at an interval and then rejoin the grid once the scheduler is back up. This is our topology. Gridgain supports pretty much whatever you want, including a P2P topology with no single point of failure. Below are the relevant configurations for our stuff…

Scheduler

<property name="discoverySpi">
    <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
        <property name="localAddress" value="10.1.1.1"/>
        <property name="ipFinder">
            <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder">
                <property name="shared" value="true"/>
                <property name="addresses">
                    <list>
                        <value>10.1.1.1:47500</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="heartbeatFrequency" value="2000"/>
    </bean>
</property>

The 10.1.1.1 is the local IP address of the scheduler box. The IP is repeated in the “addresses” section, telling Gridgain that this can be the sole server and it doesn’t have to join a grid topology before it goes live. Also, shared="true" is important, as it tells Gridgain to share configurations amongst the boxes in the grid. Without it, you’ll have an “order of operations” issue, where a master must be started before the worker. With it, that issue is moot and you can start/stop things as you please. I wish they would make this the default.

Right now, Gridgain cannot bind to a wildcard, so you have to specify the private IP address. If it changes (box reboots), you have to change it too. They promised a solution in their next release, which will allow listening on the private IP and communicating over the public IP. This will help in other NAT topologies. Being able to listen on a wildcard will also help in having a config you never have to change. But even with this caveat, this is quite a breeze.

The worker config is similar, except it only needs to know about the scheduler and does not go live until it has joined a topology…

<property name="discoverySpi">
    <bean class="org.gridgain.grid.spi.discovery.tcp.GridTcpDiscoverySpi">
        <property name="localAddress" value="10.1.1.2"/>
        <property name="ipFinder">
            <bean class="org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.GridTcpDiscoveryVmIpFinder">
                <property name="shared" value="true"/>
                <property name="addresses">
                    <list>
                        <value>scheduler.node.com:47500</value>
                    </list>
                </property>
            </bean>
        </property>
        <property name="heartbeatFrequency" value="2000"/>
    </bean>
</property>

Caveats

There are small caveats I found, none of which created much of a hurdle.

First is serialization. In my case, I was using logback for logging, while Gridgain uses log4j. We both use slf4j, which binds to the first implementation it finds on the classpath. If you’re going to distribute a job that references something that clashes on the classpath, you have to do some classpath mangling. Removing log4j from Gridgain’s lib directory would fix the issue, but I didn’t want to customize the install. I was originally using a Scala closure as a job unit, which had no references to the log object. In theory, if that’s the unit that gets serialized and sent over the wire, the other end should not have to worry about any logback references, since they aren’t a part of the serialized closure references. In my case, that didn’t work. Somehow the serialization decided to serialize logback related stuff, because the top level class where the closure was being created used the logger. I’m not sure if this is a problem with serialization or a leaky abstraction of the JVM and the fact that functions aren’t first class citizens. I think the lowest level of serialization is at the class level, so I had to extract it to a class and implement a GridTask instead. Because the GridTask class didn’t reference any logger object, it was serialized as needed and sent over without causing a classpath conflict. I haven’t had the time yet to figure out whether it’s the fault of Gridgain’s optimized serializer or whether this is a side effect of the JVM (as I mentioned above). I’ll try to find some time to test this later.

Second, Gridgain Community Edition has discovery that works great in a homogeneous topology, but on EC2 (NAT, ephemeral private IP addresses, etc…) the configuration is fragile: if any of the things I listed changes, the config must change too. This can be remedied by startup scripts, but Nikita said they’ll add better support for it in the next version (public IP communication would be a good first step, binding to wildcard interfaces would be a great second).
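
As a rough idea of what such a startup script could look like, here is a minimal Ruby sketch. It is not what we run. The helper names (private_ipv4, with_local_address) are made up for illustration, and it assumes the config contains a localAddress property written exactly as in the XML above.

```ruby
require 'socket'

# Hypothetical helper: first private IPv4 address of this box, or nil if none.
def private_ipv4
  addr = Socket.ip_address_list.find { |a| a.ipv4_private? }
  addr && addr.ip_address
end

# Hypothetical helper: returns config_xml with the value of the
# "localAddress" property replaced by ip. Other properties are untouched.
def with_local_address(config_xml, ip)
  config_xml.sub(/(<property name="localAddress" value=")[^"]*(")/) do
    "#{$1}#{ip}#{$2}"
  end
end

# Example on an inline fragment; a real script would read and rewrite the
# Gridgain config file before starting the node.
sample = '<property name="localAddress" value="10.1.1.1"/>'
puts with_local_address(sample, "10.1.1.7")
```

On boot, the script would call with_local_address with the result of private_ipv4, write the file back, and only then start the node.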

Conclusion

Overall we had an awesome experience with Gridgain. The grid application ran flawlessly during our busiest time of the year (Thanksgiving weekend). It ran so flawlessly, that this morning, I forgot which boxes it was physically running on.

I plan on using Gridgain in the future and hopefully utilize their data grid to rewrite our computation system to utilize in memory data/compute collocation (data affinity).

Nikita, thanks for all the help getting things sorted out in the first few days.


31
Oct 11

Distributed locking made easy

There are various situations when one would use a simple mutex to ensure mutual exclusion on a shared resource. This is usually very simple to accomplish using your favorite language library, but that constrains you to mutual exclusion within a single process. Even single machine mutual exclusion is rather straightforward, usually just locking a resource (i.e. a file) and waiting for the lock to be released. You can use that for an IPC mutex.
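
For the single machine case, a file lock is usually enough. Below is a minimal sketch using Ruby's File#flock. The method name with_file_lock and the polling interval are my own choices for illustration, and the lock is advisory, so it only protects against other processes that take the same lock.

```ruby
# Runs the block while holding an exclusive advisory lock on lock_path.
# Returns the block's value, or nil if the lock could not be acquired
# within timeout seconds.
def with_file_lock(lock_path, timeout = 5)
  File.open(lock_path, File::RDWR | File::CREAT, 0644) do |f|
    deadline = Time.now + timeout
    # LOCK_NB makes flock return false instead of blocking when the
    # lock is held through another file handle.
    until f.flock(File::LOCK_EX | File::LOCK_NB)
      return nil if Time.now > deadline
      sleep 0.1
    end
    begin
      yield
    ensure
      f.flock(File::LOCK_UN)
    end
  end
end
```

Any process on the box that calls with_file_lock with the same path will wait its turn, which is all an IPC mutex needs to do.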

But what if one needs a distributed mutex to allow mutual exclusion amongst distributed clients? In that case, the mutex has to offer various guarantees, as with any shared state. We know shared state is hard to reason about and leads to a lot of bug-prone software. Shared distributed state is much harder, requiring guaranteed distributed consensus, all while operating in an unreliable network environment. There are various distributed consensus algorithms, Paxos being one of the more widely used ones.

But you can deploy your own distributed locking service without having to implement one yourself. Apache Zookeeper offers distributed synchronization and group services. Mutex/locking is just one of the things you can do with Zookeeper. Zookeeper is rather low level, so implementing a distributed lock, although trivial, requires some boilerplate.

Recently we needed a distributed lock service to ensure only one person in our organization is performing a particular systems activity at any given point in time. We implemented it on top of a homegrown tool written in Ruby. The example below is in Ruby, though the API calls would translate to any language…

require 'zookeeper'

class Lock
  def initialize(host, root="/my-app")
    @zk = Zookeeper.new(host)
    @root = root
  end
  
  def with_lock(app, timeout, timeout_callback, &block)
    new_lock_res = @zk.create(:path => "#{@root}/#{app}-", :sequence => true, :ephemeral => true)
    unique_lock_path = new_lock_res[:path]
    begin
      if get_lock(unique_lock_path, timeout)
        yield
      else
        timeout_callback.call
      end
    ensure
      # Always remove our lock node, even on timeout or error,
      # so that clients queued behind us are not left waiting on it
      @zk.delete(:path => unique_lock_path)
    end
  end

  private
  def get_lock(unique_lock_path, timeout)
    lock_key = unique_lock_path.gsub(/^#{Regexp.quote(@root)}\//, '')

    (0..4).each do
      children = @zk.get_children(:path => @root)[:children].sort
      watcher = Zookeeper::WatcherCallback.new {}
      if (children.first == lock_key)
        return true
      else
        less_than_path_idx = (children.index {|p| p == lock_key}) - 1
        stat_res = @zk.stat(:path => "#{@root}/#{children[less_than_path_idx]}",
                           :watcher => watcher)
        if stat_res[:stat].exists
          success = wait_until(timeout) { watcher.completed? }
          if !success
            return false
          end
        end
      end
    end
    return false
  end

  def wait_until(timeout=10, &block)
    time_to_stop = Time.now + timeout
    until yield do
      if Time.now > time_to_stop
        return false
      end
      sleep 0.1
    end
    return true
  end
end

The usage of this Lock class is such:

    Lock.new("localhost:2181").with_lock(
      "myapp",
      5, ## Timeout in seconds
      lambda { ## Timeout callback
        abort("Couldn't acquire lock.  Timeout.") }
    ) do
      ## Do whatever you want here
    end

The details of the algorithm are outlined here.
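
The core decision that get_lock makes on each pass can be shown without a Zookeeper server. The sketch below is a simplified illustration, not part of the Lock class: lock_decision is a hypothetical name, and it assumes all children under the root share the same prefix so that sorting by name orders them by sequence number.

```ruby
# Given the child node names under the lock root and our own node name,
# returns :acquired if ours sorts first, otherwise the name of the node
# directly ahead of us (the only one we need to watch).
def lock_decision(children, my_node)
  sorted = children.sort
  idx = sorted.index(my_node)
  raise ArgumentError, "#{my_node} is not among the children" if idx.nil?
  idx.zero? ? :acquired : sorted[idx - 1]
end

children = ["myapp-0000000003", "myapp-0000000001", "myapp-0000000002"]
lock_decision(children, "myapp-0000000001")  # => :acquired
lock_decision(children, "myapp-0000000003")  # => "myapp-0000000002"
```

Watching only the node directly ahead, rather than the lowest one, means that a release wakes up a single waiter instead of every client in the queue.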

Of course, before you use it, you must install Zookeeper and create the root path (/my-app by default in the class above).

Also, please note, I have removed the access control part from the example. In order to use this in production, I strongly encourage you to read this.


24
Sep 11

jconsole/jvisualvm rmi on ec2

I finally figured this out, thanks to Google of course. No single post or documentation solved the issue, but after a 2 hour battle and various options, I finally have it working.

If you are running a Java app on EC2 and want to remotely connect to it using jconsole or jvisualvm, you need to start your Java app with a few options. Here is my configuration. Note that disabling authentication opens this up to everyone. Not good, so don’t do this in production or on a box that matters. Also, this doesn’t work with a restrictive firewall. Since the RMI port is chosen randomly, you must have a rather loose firewall policy. There are ways around it, with ssh tunneling I believe, but this post won’t cover that at this point; I might do it at some point later.

First you need a policy file. Again this can be fine tuned, the example below shows a dangerously loose one…

grant {
  permission java.security.AllPermission;
};

Place this file in some directory. In my example it’s sitting in my home dir and is named .java.policy

java -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9001 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Djava.security.policy=.java.policy \
  -Dcom.sun.management.jmxremote.local.only=false \
  -Djava.rmi.server.hostname=your.public.hostname.com \
  -jar test.jar Runner

This starts the app and a JMX agent whose RMI registry listens on port 9001.

In jvisualvm, you now can connect to your.public.hostname.com:9001. You can tune your parameters as needed, but two are crucial in my experience: com.sun.management.jmxremote.local.only and java.rmi.server.hostname. I had to specify these in order to make things work. Your mileage may vary.
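
If the connection fails, it can help to first rule out the firewall for the registry port. Here is a small Ruby sketch for that; port_open? is a made-up helper name. Keep in mind it only checks the port you give it (9001 above), not the randomly chosen RMI port, so a successful check is necessary but not sufficient.

```ruby
require 'socket'
require 'timeout'

# Returns true if a TCP connection to host:port can be opened within
# timeout seconds, false otherwise.
def port_open?(host, port, timeout = 3)
  Timeout.timeout(timeout) do
    TCPSocket.new(host, port).close
    true
  end
rescue SystemCallError, SocketError, Timeout::Error
  false
end

# Example (hostname as in the java command above):
# port_open?("your.public.hostname.com", 9001)
```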


30
Aug 11

Netcat TCP/IP sniffing (proxy)

I find myself lately doing a lot of raw TCP/IP stuff and had a need today to look at a protocol. I know there is Wireshark and similar utilities, but I needed to do something on a remote server, by creating a reverse proxy in order to learn the ins and outs of a custom protocol.

The below command creates a proxy and writes the protocol to files…

mknod /tmp/backpipe p

nc -l 61610 0</tmp/backpipe | \
tee -a /tmp/inflow | \
nc localhost 61611 | \
tee -a /tmp/outflow 1>/tmp/backpipe

In this case the server listens on port 61610 and then forwards the incoming packets to localhost:61611. You should modify the ports and forward host/port to whatever suits your need. Now, if you point the connection of any device to your server’s 61610 port, you can tail /tmp/inflow and /tmp/outflow to see the protocol communications. You can tail both together with…

tail -f /tmp/inflow /tmp/outflow

If you are on Mac OS X, in order to create a fifo file, you should replace the mknod command with…

mkfifo /tmp/backpipe
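
If netcat isn't available on the box, the same idea can be sketched in a few lines of Ruby. This is an illustration of the technique rather than a drop-in replacement: the names pump and proxy_once are hypothetical, it handles one connection at a time, and the ports and log paths in the usage comment simply mirror the nc example above.

```ruby
require 'socket'

# Copies bytes from src to dst until src hits EOF,
# appending everything it sees to log_path.
def pump(src, dst, log_path)
  File.open(log_path, 'ab') do |log|
    begin
      loop do
        data = src.readpartial(4096)
        log.write(data)
        log.flush
        dst.write(data)
      end
    rescue EOFError, IOError, SystemCallError
      # One side closed the connection; stop copying.
    ensure
      # Signal end-of-stream to the other side, ignoring already-closed sockets.
      dst.close_write rescue nil
    end
  end
end

# Accepts a single client on server, forwards it to target_host:target_port,
# and logs client->target traffic to inflow_log, target->client to outflow_log.
def proxy_once(server, target_host, target_port, inflow_log, outflow_log)
  client = server.accept
  upstream = TCPSocket.new(target_host, target_port)
  threads = [
    Thread.new { pump(client, upstream, inflow_log) },
    Thread.new { pump(upstream, client, outflow_log) }
  ]
  threads.each(&:join)
ensure
  [client, upstream].compact.each { |s| s.close unless s.closed? }
end

# Usage, mirroring the nc pipeline:
# server = TCPServer.new(61610)
# loop { proxy_once(server, "localhost", 61611, "/tmp/inflow", "/tmp/outflow") }
```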

10
Feb 11

On startups and innovation

Throughout my career I’ve worked at numerous organizations, large and small. I’ve spent the last 5 years doing startups; the previous years were spent at large companies. I prefer small organizations, specifically startup environments, for reasons I’ll discuss later. There is something that can be learned, good and bad, from both types of organizations. I’ll start with some positives.

Startups

I’m going to use the word “startup” to not only refer to companies that are just launching or haven’t been around for a while. In my opinion startup is now synonymous with a company that operates like one. They are always in launch mode, always looking for new adventures and opportunities, and always innovating.

Not all small companies are startups. Many small companies have been around long enough to amass some or many startup antipatterns, which in most cases are rather detrimental to their growth. Large organizations can bear some of these antipatterns for years, as they lurk and hide behind large bureaucracies and/or residual revenues. Large organizations also have different priorities.

Startups are the innovation backbone in this country and others. Their beginnings usually involve individuals who have knowledge and experience in a particular area and/or see a need to fill a gap. Sometimes it’s not a gap in terms of a complete void; rather, it’s a gap in terms of doing something better. These individuals are motivated by their idea and the vision of making the world (as they see it) a better place, glory, recognition, etc… There is also monetary motivation, but the product or service motivation is usually higher (at least in the ones that succeed). Also, there is drive, and the drive has to be strong. I mean, in lots of cases, these individuals give up good paying jobs and benefits, put their quality of life and possibly the quality of life of others in jeopardy, without any guarantees, all for this dream. A lot of these folks have worked at other companies and they’ve decided that they can form a successful team and eventually reap the fruits of their labor (again, not only in monetary compensation). Wow, that’s inspirational and also… “scary”.

During the initial stages of the startup, there is no time for bullshit. The bureaucracy and overhead that plague established companies have to go. They have to get shit done, they have to do it well, and they have to do it fast. They don’t want to rush and sacrifice quality, since this is the reason they formed this alliance: to create a “quality” product, better than their competitors’. This is also the time when creativity is at its highest. How else can you stay in business when you’re competing with a company that has 10 times as many employees, has been adding features for years, has an established brand, and has more money?

Now the more technical side (I’m mostly writing about tech startups here, as this is what I’m more familiar with).

There are many things to do: design, architecture, programming, testing, deployment, etc… Choices need to be made, and these choices have different priorities than in large companies. Stuff has to be done fast, so there is no time for the “Enterprise” stuff. Whatever that word at one point signified, it’s now become synonymous with “clueless pointy-haired bosses”. There is no time for commercial product evaluation, negotiations, and life cycles. Open Source Software usually rules in successful startups. It allows them to benefit from quality code with the ability to resolve bugs, add features and augment the software as they wish. They often build software using open source platforms and frameworks and themselves greatly contribute to open source. Many times the tools that they build, that aren’t specific to their business and don’t contain any intellectual property, are released to benefit the community. Now others can use those tools in their own endeavors. The cycle continues.

I’m going to refrain from discussing specific examples of what I’d consider sensible choices. Every business and motivation has different priorities and in many cases what I would consider a suboptimal choice, would actually be the best choice for a task/dilemma at hand. One thing’s for sure though, open source software rules and beyond the brilliant people that make things happen, is a big contributor to the success of these companies.

Because people are smart, they have to get things done, and they put their blood and sweat into it, they take their time to create fun and productive environments. I mean, who wants to work a lot and not have fun doing it? Or who wants to give up the security of a full time job to fail?

Large companies

So I actually won’t talk about large companies here. I use the name mostly to refer to practices that are prevalent in larger organizations. I’m mostly going to look at the shift from an efficient, fun startup to a bureaucratic, mind-draining organization that has lost its way.

This happens all the time. The talent loses interest and usually scatters to other companies or leaves to start their own. The company is left with a product that’s aging and a leadership team that’s focused on maximizing the profit from the product it has. Nothing wrong with maximizing the profit, but not at the cost of stagnation. The talent drain prevails, and the leadership, for lack of better judgment, blames this on the lack of a process. “We’ve lost our way, we’re not productive any more. We need to put a process in place that will get us going,” they say. I’m not going to judge these folks at this point; I mean, their core competency isn’t innovation, it’s stability. The two are polar opposites.

Then havoc ensues. Process after process is established; feature after feature thoughtlessly gets added to the product; documentation and processes take priority over common sense and productivity; the code base grows unmanageable; more time is spent talking about doing than doing. Sometimes more and more people get hired, with fewer and fewer qualifications, or for qualifications that aren’t critical to success. Sound familiar?

This happens all the time. The bottom line is that most innovators don’t spend their lifetime chasing an idea (especially these days). They innovate and then when they don’t see any more room for innovation and fun, they leave. Yes, they have what some would call ADHD when it comes to stability and work ethics. They probably do, but in a good form, they are chasing their dreams and refuse to waste time on something that won’t make them happy.

This is not to say that innovators are better than “stabilizers” (I just made that one up). There is room for both, but stabilizers need to understand how to maintain the level of competitiveness and innovative spirit in their organizations. They should recruit and motivate the brightest: not the workaholics per se, not the conformists, but free-spirited minds. These are the folks that make shit happen. These folks of course have to be kept in check by the stabilizers, but not by getting in their way. Actually, the total opposite: by getting out of their way. Give them room to do what they do best.

Conclusion

I don’t think it’s ever too late for a company to change its ways. Get back to a startup environment. Large organizations might have a hard time doing this. They have too many people and they can’t just start over. Small companies are great for this. They can start afresh. They can retain talent, pay them well, make them happy, and most of all, I’ll say it again, get out of their way. If you hired a development team to create a product, let them do it. The more shackles you put on them, the less likely they’ll succeed and the more likely they’ll leave ASAP. Intellectuals are always in demand and it really bothers me to see companies treat such employees as sweat shop workers. We pay you, you do as you’re told. That’s a recipe for disaster.

If your developers are more productive with language x and platform y, unless their choice is completely ridiculous, let them use it. They’ll thank you later, with a better quality product, more productivity and generally a more positive attitude.