#!/usr/bin/env ruby
require 'base64'
require 'socket'
require 'fileutils'
require 'securerandom'
# UnixSocketForker is an experiment in inter-process communication that uses
# plain unix sockets to communicate between forked worker processes and the
# parent process. The same thing can also be done with IO.pipe. In this
# experiment, the jobs are simply random arrays whose sums are calculated in
# the forked worker processes.
class UnixSocketForker
  # Lifecycle:
  #   1. The parent binds a UNIXServer at the socket path.
  #   2. The workers are forked. They are forked *before* the dispatcher
  #      thread exists, so a child never inherits an accepted connection.
  #   3. A dispatcher thread multiplexes the listener and all worker
  #      connections with IO.select: it hands queued jobs to writable workers
  #      and prints every result line a worker sends back.
  #   4. At process exit the parent waits until the queue is drained, closes
  #      the connections (workers see EOF and exit) and reaps the workers.
  #
  # Wire format: one job per line, Marshal-dumped and base64-encoded without
  # line breaks; one plain-text result line per job in the other direction.

  # Default filesystem path of the listening socket.
  UNIX_SOCKET_FILE = '/tmp/sock'
  # Default number of worker processes to fork.
  NUM_PROCESSES = 5
  # Seconds the dispatcher and the exit hook wait between checks while idle.
  POLL_INTERVAL = 0.05

  # Binds the server socket, forks the workers and starts the dispatcher.
  #
  # socket_path   - path to bind the listening socket to; whatever already
  #                 exists at that path is removed first.
  # num_processes - number of worker processes to fork.
  def initialize(socket_path: UNIX_SOCKET_FILE, num_processes: NUM_PROCESSES)
    @socket_path = socket_path
    @num_processes = num_processes
    @jobs = []
    # connection => number of jobs sent on it that have not produced a result
    @in_flight = Hash.new(0)
    # guards @jobs and @in_flight, which are shared with the dispatcher thread
    @job_mutex = Mutex.new
    open_server
    spawn_workers
    start_dispatcher
  end

  # Queues a job (an array of numbers) for the next worker that can accept
  # one. Safe to call from any thread. Returns the internal job queue.
  def submit(job)
    @job_mutex.synchronize { @jobs << job }
  end

  private

  # Binds the listening socket, replacing any stale file left at the path.
  def open_server
    # rm_f is a no-op when nothing exists at the path, so no existence check
    # (and no call to File.exists?, which Ruby 3.2 removed) is needed.
    FileUtils.rm_f(@socket_path)
    @server = UNIXServer.new(@socket_path)
    @read_sockets = [@server]
    @write_sockets = []
  end

  # Forks the workers and registers the exit hook that shuts them down.
  def spawn_workers
    owner = Process.pid
    pids = Array.new(@num_processes) { fork { run_worker } }
    # Processes forked later inherit this hook; only the process that created
    # the workers may run it.
    at_exit { shutdown(pids) if Process.pid == owner }
  end

  # Body of a worker process: reads jobs line by line until the parent closes
  # the connection, answering each with a one-line result.
  def run_worker
    # Drop the inherited copy of the listener so that it is really closed once
    # the parent closes its own copy.
    @server.close
    socket = UNIXSocket.new(@socket_path)
    while (line = socket.gets)
      numbers = decode_job(line.chomp)
      # Array#sum yields 0 for an empty job, where inject(:+) would yield nil.
      socket.puts("Process #{Process.pid} executed sum: #{numbers.sum}")
    end
  rescue Errno::ENOENT, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
    # The parent has already shut the server down; there is nothing left to do.
  ensure
    socket&.close
  end

  # Starts the thread that accepts workers, hands out jobs and prints results.
  def start_dispatcher
    @dispatcher = Thread.new do
      loop do
        # Idle worker sockets are always writable, so only ask about
        # writability while there is something to send; otherwise select
        # would return immediately and this loop would spin.
        writers = jobs_queued? ? @write_sockets : []
        ready = IO.select(@read_sockets, writers, nil, POLL_INTERVAL)
        next unless ready # timed out: re-check the queue

        handle_readables(ready[0])
        handle_writeables(ready[1])
      end
    end
  end

  # Accepts new workers and prints results from sockets that became readable.
  def handle_readables(sockets)
    sockets.each do |socket|
      if socket == @server
        conn = socket.accept
        @read_sockets << conn
        @write_sockets << conn
      elsif (line = read_result(socket))
        puts line
        @job_mutex.synchronize { @in_flight[socket] -= 1 }
      else
        drop_connection(socket) # EOF: the worker went away
      end
    end
  end

  # Reads one result line; returns nil when the worker has gone away.
  def read_result(socket)
    socket.gets
  rescue Errno::ECONNRESET
    nil
  end

  # Sends one queued job to each writable worker until the queue runs dry.
  def handle_writeables(sockets)
    sockets.each do |socket|
      next if socket.closed? # dropped by handle_readables in this same pass

      job = nil
      taken = @job_mutex.synchronize do
        next false if @jobs.empty?

        @in_flight[socket] += 1
        job = @jobs.shift
        true
      end
      break unless taken

      # Written outside the lock: a large payload can block for a while and
      # must not stall callers of #submit.
      deliver(socket, job)
    end
  end

  # Writes a job to a worker; if the worker is gone the job is re-queued.
  def deliver(socket, job)
    socket.puts(encode_job(job))
  rescue Errno::EPIPE, Errno::ECONNRESET
    @job_mutex.synchronize do
      @jobs.unshift(job)
      @in_flight[socket] -= 1
    end
    drop_connection(socket)
  end

  # Forgets and closes a worker connection.
  def drop_connection(socket)
    @read_sockets.delete(socket)
    @write_sockets.delete(socket)
    lost = @job_mutex.synchronize { @in_flight.delete(socket) }.to_i
    warn("worker connection closed with #{lost} unfinished job(s)") if lost.positive?
    socket.close
  end

  # True while at least one job is waiting to be sent.
  def jobs_queued?
    @job_mutex.synchronize { !@jobs.empty? }
  end

  # True once every submitted job has been sent and answered.
  def drained?
    @job_mutex.synchronize { @jobs.empty? && @in_flight.each_value.all?(&:zero?) }
  end

  # Exit hook: waits for outstanding work, then stops the dispatcher and
  # reaps the workers.
  def shutdown(pids)
    alive = pids.dup
    # Stop waiting when the work is done, when no worker is left to do it, or
    # when the dispatcher has died and nothing will make progress any more.
    until alive.empty? || drained? || !@dispatcher.alive?
      alive.reject! { |pid| Process.wait(pid, Process::WNOHANG) }
      sleep(POLL_INTERVAL)
    end
    stop_dispatcher
    alive.each { |pid| Process.wait(pid) }
  end

  # Stops the dispatcher thread and closes every socket, which makes the
  # connected workers read EOF and exit.
  def stop_dispatcher
    @dispatcher.kill
    begin
      @dispatcher.join
    rescue StandardError => e
      # join re-raises the error of a thread that had already crashed.
      warn("dispatcher thread had crashed: #{e.message}")
    end
    @read_sockets.each(&:close) # the listener and all worker connections
    FileUtils.rm_f(@socket_path)
  end

  # Serializes a job to a single line of text.
  def encode_job(job)
    # 'm0' is base64 without the line breaks that plain 'm' inserts, so the
    # result is safe to send as one line.
    [Marshal.dump(job)].pack('m0')
  end

  # Inverse of #encode_job.
  def decode_job(job)
    # NOTE: Marshal.load must only ever see trusted data. That holds here
    # because the payload is produced by this class in the parent process.
    Marshal.load(job.unpack1('m0'))
  end
end
if __FILE__ == $PROGRAM_NAME
  # Demo run, executed only when this file is the program being run:
  # build a forker and queue 100 jobs, each the array 1..k for a random
  # k below one million.
  pool = UnixSocketForker.new
  1.upto(100) do
    upper_bound = rand(1_000_000)
    pool.submit((1..upper_bound).to_a)
  end
end