
I’ve been programming on and off in Go for a little while now, and I came across a gem that makes it possible to use channels and threaded functions for sequential processing. The gem is called “agent” and was originally written by Ilya Grigorik. It provides a number of extensions to Kernel, such as go! and channel!.
Channels aren’t really a new idea; they came out of a research paper called Communicating Sequential Processes. They became really popular with the Go programming language, in which channels are a core construct and really the way most of your work is done.
Channels hold units of data, called messages, which other interested parties can listen for. A channel has two “ports”: one for pushing data in and one for pulling data out. Channel operations are also atomic, so there is no need to worry about a read or write resulting in something inconsistent, such as getting the same message twice.
In agent a channel can optionally be buffered. A buffered channel lets you write a message without blocking immediately, as long as the buffer has room. Otherwise a channel has a buffer size of zero, which means that writing to the channel blocks until someone reads from it. Knowing how your channels work is extremely important, as we’ll see in “Ugh Deadlock”.
require 'agent'
# Create an unbuffered channel
unbuff = channel!(String)
# Create a buffered channel
buff = channel!(String, 1)
# Write to the channels
buff << "Hello Unblocking World"
# Execution blocks until someone reads from it
unbuff << "Hello Blocking World"
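If you don't have the gem installed, the buffered behaviour can be approximated with Ruby's standard library. This is only a rough analogue: it assumes SizedQueue as a stand-in for a buffered channel, it is not how agent implements channels, and buffered_write_order is a hypothetical helper name.

```ruby
# Rough stdlib analogue of a buffered channel with capacity 1.
# SizedQueue is an assumption standing in for channel!(String, 1).
def buffered_write_order
  buff = SizedQueue.new(1)
  events = []

  buff << "first"              # fits in the buffer, returns immediately
  events << :first_written

  writer = Thread.new do
    buff << "second"           # buffer is full, so this blocks until a read
    events << :second_written
  end

  events << :reading
  buff.pop                     # frees a slot, which unblocks the writer
  writer.join
  events
end

p buffered_write_order
```

The second write can only complete after the read, which is the same trade-off you make when picking a buffer size for a channel.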
In the Go programming language a goroutine is “the execution of a function call as an independent concurrent thread of control, […], within the same address space”. You can think of goroutines as threads without worrying too much about the details. Also, if the function provided is a lambda, you get the benefits of closures.
In Ruby the benefits are very much the same: we pass a block of code to the go! method call. This method takes care of all the gross bits of threading for us, though if you grab onto the result of the go! call you’ll see that we aren’t working with any magic language construct, just a thread.
goroutine = go! { sleep(5) }
puts goroutine.inspect
# #<Thread:0x007fbf66ed5508 sleep>
In practice though, you don’t need to worry about this. You’ll invoke your goroutine and carry on your merry way.
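Since go! hands back an ordinary thread, the closure benefit can be seen with the standard library alone. A minimal sketch, assuming a plain Thread in place of go!; greet_in_background is a made-up name for illustration.

```ruby
# A plain Thread stands in for go! here (an assumption for illustration).
# The block closes over the local variables `greeting` and `name`.
def greet_in_background(name)
  greeting = "Hello"
  worker = Thread.new { "#{greeting}, #{name}!" }
  worker.value   # waits for the thread and returns the block's result
end

puts greet_in_background("gopher")
```

With channels you would normally send the result over a channel rather than calling value on the thread.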
Goroutines and channels become very powerful because they allow us to communicate asynchronously. There are lots of ways to handle work that might be slow, such as jobs that create other jobs in a system like Sidekiq. While this approach works, it results in systems that are difficult to understand because of the separation.
Instead we can use channels as a tool to let other aspects of our pipeline know when they can start the next stage of processing. Since the channel becomes the method of communication, we can write our code in a way that makes it look sequential even though when we actually execute it, it will be running asynchronously. This takes a huge load off of our brains, which get really fuzzy when dealing with asynchronicity.
Let’s say we are building a system that makes strawberry rhubarb pies. If we were to make a pie on our own it would look something like this:
For a family friendly baking day, doing this synchronously is no big deal. You have a good time, make some pie and all is fabulous. But if we are looking to crank out pies as quickly as possible, this just won’t do. We need to split this up into a bunch of separate pieces. Looking at the process we can make a few optimizations:
# The helper methods used below (mix_flour_and_shortening, clean, quarter, ...)
# and the ingredient classes are assumed to be defined elsewhere.
PreparedTin = Struct.new(:bottom, :top)
MixedFruits = Struct.new(:mixture)
RawPie = Struct.new(:bottom, :mixture, :top)

class Pie
  def initialize(slices=6)
    @slices = slices
  end

  def take_slice
    if slices?
      @slices -= 1
      "slice o pie"
    end
  end

  def slices?
    @slices > 0
  end
end

class Oven
  def initialize(chan)
    @completed = chan
  end

  def bake(pie)
    go! do
      sleep(10)
      @completed << Pie.new
    end
  end
end

prepared_tins = channel!(PreparedTin)
mixed_fruits = channel!(MixedFruits)
completed_pies = channel!(Pie)
oven = Oven.new(completed_pies)

# Prepare our Pie Tins
go! do
  loop do
    dough = mix_flour_and_shortening
    dough = roll_out_dough(dough)
    top, bottom = cut_dough(dough)
    # Struct members are (bottom, top), so pass them in that order
    prepared_tins << PreparedTin.new(bottom, top)
  end
end

# Prepare our Mixed Fruits
go! do
  loop do
    strawberries = clean(Strawberry.new)
    rhubarb = clean(Rhubarb.new)
    sliced_berries = quarter(strawberries)
    cubed_rhubarb = cube(rhubarb)
    sugar = measure(Sugar.new)
    mixture = mix(sliced_berries, cubed_rhubarb, sugar)
    mixed_fruits << MixedFruits.new(mixture)
  end
end

# Bake our pies
go! do
  loop do
    # Receiving from an agent channel returns a pair
    # so if we were to receive and not call first
    # we'd get something like [#PreparedTin, true]
    #
    tin = prepared_tins.receive.first
    fruits = mixed_fruits.receive.first
    raw_pie = RawPie.new(tin.bottom, fruits.mixture, tin.top)
    oven.bake(raw_pie)
  end
end

# Eat our pies
go! do
  pie = nil
  loop do
    pie ||= completed_pies.receive.first
    if pie.slices?
      puts "Om nom nom a #{pie.take_slice}"
    else
      # Throw out the tin, this pie is dead to us
      pie = nil
    end
  end
end
So here we’ve gone and created four workers: one prepares the pie tins, another prepares our pie contents, a third puts the pies together and bakes them, and finally someone consumes all those pies. The code can be read from top to bottom, and we really don’t need to worry about concurrency all that much.
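The same four-stage shape can be tried without the gem. The following is a small sketch under stated assumptions: Queue stands in for channel!, Thread.new for go!, strings stand in for the tins, fruit and pies, and bake_pies is a hypothetical name. Unlike an unbuffered agent channel, a Queue never blocks on write, so this only illustrates the hand-off between stages.

```ruby
# Stdlib sketch of the pie pipeline. Queue stands in for channel! and
# Thread.new for go!; all names here are illustrative assumptions.
def bake_pies(count)
  tins   = Queue.new
  fruits = Queue.new
  pies   = Queue.new

  # Stage 1: prepare the tins
  Thread.new { count.times { |i| tins << "tin #{i}" } }

  # Stage 2: prepare the fruit mixtures
  Thread.new { count.times { |i| fruits << "fruit #{i}" } }

  # Stage 3: assemble a pie as soon as both parts are available
  Thread.new do
    count.times { pies << "pie(#{tins.pop}, #{fruits.pop})" }
  end

  # Stage 4: consume the finished pies in the order they arrive
  Array.new(count) { pies.pop }
end

p bake_pies(2)
```

Each stage only knows about the queues it reads from and writes to, which is what lets the code read top to bottom.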
We need to be very careful when working with channels because we run the risk of deadlocking our system. Luckily there are built-in protections that raise an exception when a deadlock is detected.
f = channel!(String)
go! { f << "Hello World" }
f.receive # ["Hello World", true]
f.receive # Raises Exception
# #<Class:0x007fefc38dc768>: No live threads left. Deadlock?
Sometimes these can catch us off guard, but do not fret: there are plenty of ways to get around this. We can use select!, which is based on a similar statement in the Go language. select acts much like a case statement; the big difference is that its cases are communication operations, which allows us to conditionally act on messages sent to different channels.
In agent we are also able to use a timeout which allows our select! to stop blocking on channels. For example, we can use select! with a timeout that will ensure we don’t deadlock.
select! do |s|
  s.case(f, :receive) { |result| puts result }
  s.timeout(0.1) { puts "Exceeded 100ms timeout!" }
end
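The idea of giving up on a receive after a deadline can also be sketched with the standard library. This assumes Queue in place of a channel and the Timeout module in place of s.timeout; receive_or_timeout is a hypothetical helper, and it waits on a single queue only, whereas select! can watch several channels.

```ruby
require 'timeout'

# Waits up to `seconds` for a message on `queue`.
# Returns the message, or :timeout if nothing arrived in time.
def receive_or_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  :timeout
end

inbox = Queue.new
p receive_or_timeout(inbox, 0.1)   # nothing has been sent yet
inbox << "Hello World"
p receive_or_timeout(inbox, 0.1)   # a message is waiting
```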
In the Go programming language you can put whatever you want on a channel. Because the system is statically typed it knows how big each message is going to be. In agent this isn’t the case and there appears to be some marshalling going on under the covers. I came across this problem while trying to send objects that wrapped Nokogiri on some channels.
require 'agent'
require 'nokogiri'
require 'rest-client'

pages = ["http://github.com",
         "http://bitbucket.org",
         "http://sourceforge.net"]

class Result
  def initialize(data)
    @doc = Nokogiri::HTML(data)
  end

  def title
    @doc.css('title').first
  end
end

results = channel!(Result, 3)

pages.each do |page|
  response = RestClient.get(page)
  results << Result.new(response.body) # Raises error
  # agent/push.rb:8:in `dump': no _dump_data is defined for class Nokogiri::HTML::Document (TypeError)
end

processing = true
while processing do
  select! do |s|
    s.case(results, :receive) { |r| puts r }
    s.timeout(5) { processing = false }
  end
end
puts "done"
After looking around a bit I figured out that the objects simply cannot be marshalled. I was caught off guard by the issue, but I was able to come up with a workaround by using a Hash instead. I haven’t tried doing anything with value objects yet, but I don’t see why those would cause any problems either.
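The failure and the workaround can be reproduced with Marshal alone. This is a sketch that assumes agent relies on Marshal.dump, as the `dump` frame in the error suggests; WrappedResult, PageTitle and marshallable? are hypothetical names, and a Proc stands in for the Nokogiri document as something Marshal refuses to dump.

```ruby
# Hypothetical stand-in for an object that wraps something Marshal
# cannot serialise (a Proc here, playing the role of the parsed document).
class WrappedResult
  def initialize(title)
    @title = title
    @parser = -> { title }
  end
end

# A named Struct used as a simple value object.
PageTitle = Struct.new(:text)

# Returns true if Marshal can dump the object, false if it raises TypeError.
def marshallable?(object)
  Marshal.dump(object)
  true
rescue TypeError
  false
end

p marshallable?(WrappedResult.new("GitHub"))   # wrapper holding a Proc
p marshallable?({ title: "GitHub" })           # plain Hash of plain data
p marshallable?(PageTitle.new("GitHub"))       # Struct value object
```

Extracting the plain data you need (the title string, say) before writing to the channel sidesteps the problem, whichever container you put it in.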
The agent library is an excellent way to approach the asynchronous programming problem in Ruby. It’s got some rough edges that might cut you, but with the right understanding of the limitations you can easily work around them. Other than that it’s a fabulous way to learn about this alternative approach to programming. I’d even go as far as to say this could be a gateway into the Go programming language, since it helps you establish the basics of using goroutines and channels to communicate.