Asynchronous Programming with Goroutines in Ruby

I’ve been programming on and off in Go for a little while now, and I came across a gem that makes it possible to use channels and threaded functions for sequential processing in Ruby. The gem is called “agent” and was originally written by Ilya Grigorik. It provides a number of extensions to Kernel, such as go! and channel!.

Overview

What is a channel?

Channels aren’t really a new idea; they came out of a research paper [1] called Communicating Sequential Processes. They became really popular with the Go programming language, in which channels are a core construct and really the way most of your work is done.

Channels hold units of data, called messages, which other interested parties can listen for. A channel has two “ports”: one for pushing data in and one for pulling data out. Channel operations are also atomic, so there is no need to worry about a read or write resulting in something inconsistent, such as getting the same message twice.
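The “no message twice” property can be sketched without agent at all. The example below is only an analogue: it uses Queue from Ruby's core library in place of an agent channel, and the method name consume_exactly_once and the :stop marker are made up for illustration.

```ruby
# Analogue only: Queue is Ruby's built-in thread-safe queue, not an agent channel.
def consume_exactly_once(message_count, consumer_count)
  channel  = Queue.new
  received = Queue.new

  # Fill the queue, then add one :stop marker per consumer.
  message_count.times { |i| channel << i }
  consumer_count.times { channel << :stop }

  # Several consumers compete for messages from the same queue.
  consumers = Array.new(consumer_count) do
    Thread.new do
      while (msg = channel.pop) != :stop
        received << msg
      end
    end
  end
  consumers.each(&:join)

  # Collect everything that was received, in sorted order.
  seen = []
  seen << received.pop until received.empty?
  seen.sort
end

p consume_exactly_once(10, 3) == (0...10).to_a # => true
```

Every message shows up exactly once, no matter which consumer happened to grab it.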

In agent a channel can optionally be buffered. A buffered channel lets you write a message to it without blocking, as long as the buffer still has room. Otherwise a channel has a buffer size of zero, which means that writing to the channel blocks until someone reads from it. Knowing how your channels work is extremely important, as covered in “Ugh Deadlock”.

require 'agent'

# Create an unbuffered channel
unbuff = channel!(String)

# Create a buffered channel
buff = channel!(String, 1)

# Write to the channels
buff << "Hello Unblocking World"
# Execution blocks until someone reads from it
unbuff << "Hello Blocking World"
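To get a feel for what “the buffer is full” means, here is a rough analogue using SizedQueue from Ruby's core library. It is not agent code, and SizedQueue cannot model a zero-sized buffer, so this only illustrates the buffered case. The method name buffered_demo is hypothetical.

```ruby
# Analogue only: SizedQueue stands in for a buffered channel of size 1.
def buffered_demo
  buff = SizedQueue.new(1)          # room for exactly one message
  buff << "Hello Unblocking World"  # fits in the buffer, returns at once

  begin
    # Passing true (non_block) raises ThreadError instead of blocking
    # when the buffer is already full.
    buff.push("one too many", true)
    overflowed = false
  rescue ThreadError
    overflowed = true
  end

  [buff.pop, overflowed]
end

p buffered_demo # => ["Hello Unblocking World", true]
```

Without the non_block flag, the second push would simply wait until a reader made room, which is the blocking behaviour described above.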

What does a goroutine do?

In the Go programming language a goroutine is “the execution of a function call as an independent concurrent thread of control, […], within the same address space”. You can think of them as threads but don’t need to worry too much about the details. Also if the function provided is a lambda, you get the benefits of closures.

In Ruby the benefits are very much the same: we pass a block of code to the go! method. This method takes care of all the gross bits of threading for us, though if you hold onto the result of the go! call you’ll see that we aren’t working with any magic language construct, just a thread.

goroutine = go! { sleep(5); }
puts goroutine.inspect
# #<Thread:0x007fbf66ed5508 sleep>

In practice though, you don’t need to worry about this. You’ll invoke your goroutine and carry on your merry way.
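Since go! hands back a thread, the closure behaviour can be sketched with a plain Thread. This is an assumption-laden stand-in rather than agent's implementation, and shout_in_background is a made-up name.

```ruby
# Sketch with a plain Thread; go! is assumed to behave roughly like this.
def shout_in_background(greeting)
  # The block closes over `greeting`, just like a closure passed to go!.
  worker = Thread.new { greeting.upcase }

  # Thread#value waits for the thread to finish and returns the block's result.
  [worker.class, worker.value]
end

p shout_in_background("hello from a closure")
# => [Thread, "HELLO FROM A CLOSURE"]
```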

How can the two be used together?

Goroutines and channels become very powerful because they allow us to communicate asynchronously. There are lots of ways to handle work that might be slow, such as jobs that create other jobs in a system like Sidekiq. While this approach works, it results in systems that are difficult to understand because the logic ends up spread across separate jobs.

Instead we can use channels as a tool to let other aspects of our pipeline know when they can start the next stage of processing. Since the channel becomes the method of communication, we can write our code in a way that makes it look sequential even though when we actually execute it, it will be running asynchronously. This takes a huge load off of our brains, which get really fuzzy when dealing with asynchronicity.
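As a tiny illustration of code that reads sequentially but runs concurrently, here is a two-stage pipeline. It is a sketch built on Ruby's core Queue and Thread rather than agent, and the names run_pipeline, raw, doubled and the :done marker are invented for the example.

```ruby
# Sketch only: Queue and Thread stand in for agent's channel! and go!.
def run_pipeline(inputs)
  raw     = Queue.new
  doubled = Queue.new

  # Stage 1: push every input, then signal that nothing more is coming.
  producer = Thread.new do
    inputs.each { |n| raw << n }
    raw << :done
  end

  # Stage 2: starts work as soon as stage 1 has produced something.
  doubler = Thread.new do
    while (n = raw.pop) != :done
      doubled << n * 2
    end
    doubled << :done
  end

  # Final stage runs on the calling thread and collects the results.
  results = []
  while (n = doubled.pop) != :done
    results << n
  end
  [producer, doubler].each(&:join)
  results
end

p run_pipeline([1, 2, 3]) # => [2, 4, 6]
```

Each stage only knows about the queue it reads from and the queue it writes to, which is what keeps the code easy to follow from top to bottom.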

Consuming a channel

Let’s say we are building a system that makes strawberry rhubarb pies. If we were to make a pie on our own it would look something like this:

  1. Cut together flour and shortening to make the dough
  2. Clean strawberries and rhubarb removing all dirt
  3. Quarter strawberries and chop rhubarb into cubes
  4. Roll out dough and place enough to cover the bottom of your pie tin. Trim the edges
  5. Measure out 2 cups of strawberries, 2 cups of rhubarb and 1 cup of white sugar
  6. Mix measured strawberries, rhubarb and sugar together until even
  7. Pour mixture on top of dough
  8. Cover pie with other piece of dough and make some cuts to let steam out
  9. Place in oven

For a family friendly baking day, doing this synchronously is no big deal. You have a good time, make some pie and all is fabulous. But if we are looking to crank out pies as quickly as possible, this just won’t do. We need to split this up into a bunch of separate pieces. Looking at the process we can make a few optimizations:

  1. A person makes the dough, and prepares the tins as well as the pie cover
  2. A person cleans, cuts and mixes the fruits into the required portions. Let’s say they place them in bowls that someone can just grab.
  3. A person prepares the pies and places them in the ovens. For simplicity we are going to assume that we have unlimited space in our ovens.

# Value objects passed between the stages
PreparedTin = Struct.new(:bottom, :top)
MixedFruits = Struct.new(:mixture)
RawPie      = Struct.new(:bottom, :mixture, :top)

class Pie
  def initialize(slices=6)
    @slices = slices
  end

  def take_slice
    if slices?
      @slices -= 1
      "slice o pie"
    end
  end

  def slices?
    @slices > 0
  end
end

class Oven
  def initialize(chan)
    @completed = chan
  end

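  def bake(pie)
    # Baking is simulated: wait, then send a finished Pie down the channel.
    # The raw pie itself is not inspected in this example.
    go! do
      sleep(10)
      @completed << Pie.new
    end
  end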
end

prepared_tins  = channel!(PreparedTin)
mixed_fruits   = channel!(MixedFruits)
completed_pies = channel!(Pie)
oven           = Oven.new(completed_pies)

# Prepare our Pie Tins
go! do
  loop do
    dough = mix_flour_and_shortening
    dough = roll_out_dough(dough)
    top, bottom = cut_dough(dough)
    # Struct members are (:bottom, :top), so pass them in that order
    prepared_tins << PreparedTin.new(bottom, top)
  end
end

# Prepare our Mixed Fruits
go! do
  loop do
    strawberries   = clean(Strawberry.new)
    rhubarb        = clean(Rhubarb.new)
    sliced_berries = quarter(strawberries)
    cubed_rhubarb  = cube(rhubarb)
    sugar = measure(Sugar.new)
    mixture = mix(sliced_berries, cubed_rhubarb, sugar)
    mixed_fruits << MixedFruits.new(mixture)
  end
end

# Bake our pies
go! do
  loop do
    # Receiving from an agent channel returns a pair
    # so if we were to receive and not call first
    # we'd get something like [#PreparedTin, true]
    #
    tin = prepared_tins.receive.first
    fruits = mixed_fruits.receive.first
    raw_pie = RawPie.new(tin.bottom, fruits.mixture, tin.top)
    oven.bake(raw_pie)
  end
end

# Eat our pies
go! do
  pie = nil
  loop do
    pie ||= completed_pies.receive.first
    if pie.slices?
      puts "Om nom nom a #{pie.take_slice}"
    else
      # Throw out the tin, this pie is dead to us
      pie = nil
    end
  end
end

So here we’ve created four workers: one prepares the pie tins, another prepares the pie filling, a third puts the pies together and bakes them, and finally one consumes all those pies. The code can be read from top to bottom, and we really don’t need to worry about concurrency all that much.

Ugh Deadlock

We need to be very careful when working with channels because we run the risk of deadlocking our system. Luckily the library has built-in protections that will raise exceptions when a deadlock is detected.

f = channel!(String)
go! { f << "Hello World" }
f.receive # ["Hello World", true]
f.receive # Raises Exception
# #<Class:0x007fefc38dc768>: No live threads left. Deadlock?

Sometimes these can catch us off guard, but do not fret: there are plenty of ways to get around this. We can use select!, which is based on a similar construct in the Go language. select! acts much like a case statement; the big difference is that its cases are communication operations, which allows us to conditionally act on messages sent to different channels.

In agent we are also able to use a timeout, which allows our select! to stop blocking on channels. For example, we can use select! with a timeout that will ensure we don’t deadlock.

select! do |s|
  s.case(f, :receive) { |result| puts result }
  s.timeout(0.1) { puts "Exceeded 100ms timeout!" }
end
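The same “give up after a while” idea can be sketched with the standard library alone. This is not how agent implements select!; it is just an analogue using Queue and the Timeout module, and receive_with_timeout is a hypothetical helper name.

```ruby
require 'timeout'

# Analogue only: wait for a message, but stop blocking after `seconds`.
def receive_with_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  :timed_out
end

inbox = Queue.new
p receive_with_timeout(inbox, 0.1) # => :timed_out

inbox << "Hello World"
p receive_with_timeout(inbox, 0.1) # => "Hello World"
```

Returning a marker such as :timed_out lets the caller decide what to do next instead of hanging forever on an empty queue.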

Caveats

In the Go programming language you can put whatever you want on a channel. Because the system is statically typed it knows how big each message is going to be. In agent this isn’t the case and there appears to be some marshalling going on under the covers. I came across this problem while trying to send objects that wrapped Nokogiri on some channels.

require 'agent'
require 'nokogiri'
require 'rest-client'

pages = ["http://github.com",
         "http://bitbucket.org",
         "http://sourceforge.net"]

class Result
  def initialize(data)
    @doc = Nokogiri::HTML(data)
  end

  def title
    @doc.css('title').first
  end
end

results = channel!(Result, 3)

pages.each do |page|
  response = RestClient.get(page)
  results << Result.new(response.body) # Raises error
  # agent/push.rb:8:in `dump': no _dump_data is defined for class Nokogiri::HTML::Document (TypeError)
end

processing = true
while processing do
  select! do |s|
    s.case(results, :receive) { |r| puts r }
    s.timeout(5) { processing = false }
  end
end
puts "done"

After looking around a bit I figured out that these objects simply cannot be marshalled. I was caught off guard by the issue, but I was able to come up with a workaround by simply using a Hash instead. I haven’t tried doing anything with value objects yet, but I don’t see why those would cause any problems either.
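The workaround can be sketched without Nokogiri or agent. In the example below a Proc plays the role of the unmarshallable document, because Marshal refuses to dump Procs in the same way; PageResult, to_h and marshallable? are names made up for illustration.

```ruby
# Sketch only: a Proc stands in for the Nokogiri document, since neither
# can be dumped by Marshal.
class PageResult
  def initialize(title)
    @title  = title
    @parser = proc { |html| html } # unmarshallable member
  end

  # Plain data only, safe to send across a channel that marshals messages.
  def to_h
    { title: @title }
  end
end

def marshallable?(object)
  Marshal.dump(object)
  true
rescue TypeError
  false
end

result = PageResult.new("GitHub")
p marshallable?(result)      # => false
p marshallable?(result.to_h) # => true
```

Extracting the values you care about into a Hash before writing to the channel sidesteps the problem, at the cost of losing the object's methods on the receiving side.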

In Conclusion

The agent library was an excellent way to approach the asynchronous programming problem in Ruby. It’s got some rough edges that might cut you, but with the right understanding of the limitations you can easily work around them. Other than that it’s a fabulous way to learn about this alternative approach to programming. I’d even go as far as to say this could be a gateway into the Go programming language, since it helps you establish the basics of using goroutines and channels to communicate.

