A ideia do projeto é só incorporar coisas conforme a necessidade. No caso, estou precisando da algo melhor para paralelizar workers. Seguindo a sugestão do Mereghost, dei uma olhanda no ZeroMQ a fiquei agradevelmente impressionado.
Ao invés de um sistema "faz tudo" de mensageria, encontrei uma biblioteca que se propõe a ser apenas um socket on steroids para facilitar a comunicação inter-processos (e inter-threads). Para o que eu precisava, é perfeito (e rápido que dá medo)!
O maior problema é que precisava de algo que avisasse os workers que o trabalho acabou, mas que não os matasse no meio do processamento. Procurei aqui e ali, mas não achei um modelo que servisse. Mãos à massa e comecei a experimentar coisas, cheguei ao resultado aí embaixo.
A coisa toda ainda precisa de mais testes e acho que a ideia também carece de um certo amadurecimento, mas achei que ficou legal, e ainda por cima, funciona! :D
Esse é o cara que vai receber as mensagens. Ele verifica se a mensagem recebida é uma string "EOF", indicando que o sender já enviou tudo. Se sim, ele pára e avisa o sender que já terminou (usando outro canal).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rbczmq' | |
class Worker | |
def initialize(sender_port = 5555, commander_port = 5556) | |
@ctx = ZMQ::Context.new | |
@receiver = @ctx.socket(:PULL) | |
@receiver.connect("tcp://localhost:#{sender_port}") | |
@commander = @ctx.socket(:PUSH) | |
@commander.connect("tcp://localhost:#{commander_port}") | |
end | |
def work | |
@commander.send("START localhost #{Process.pid}") | |
loop do | |
msg = @receiver.recv_nonblock | |
next if msg.nil? | |
break if msg == 'EOF' | |
yield msg | |
end | |
ensure | |
@commander.send("FINISH localhost #{Process.pid}") | |
@receiver.close | |
@commander.close | |
@ctx.destroy | |
end | |
end | |
delay = ARGV[0].to_f | |
puts '=' * 20 | |
puts delay.inspect | |
w = Worker.new | |
counter = 0 | |
begin | |
w.work do |msg| | |
counter += 1 | |
sleep delay if delay > 0 | |
end | |
ensure | |
puts "Processed: #{counter}" | |
end |
Esse é o cara que envia as mensagens. Depois de mandar tudo ele envia uma string "EOF" para cada worker que disse que trabalharia com ele. Como não sabe quem é quem, ele manda um "EOF" e espera alguém responder, quando recebe a resposta ele desmarca esse worker da lista e manda outro "EOF". Assim continua até a lista de workers estar vazia.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rbczmq' | |
class WorkerCommander | |
def initialize(sender_port = 5555, commander_port = 5556) | |
@ctx = ZMQ::Context.new | |
@sender = @ctx.socket(:PUSH) | |
@sender.bind("tcp://*:#{sender_port}") | |
@commander = @ctx.socket(:PULL) | |
@commander.bind("tcp://*:#{commander_port}") | |
@workers = [] | |
if block_given? | |
begin | |
yield self | |
ensure | |
wait_workers_finish | |
end | |
end | |
end | |
def wait_workers_start(how_many = 1) | |
process(@commander.recv) while @workers.size < how_many | |
end | |
def send(msg) | |
@sender.send(msg) | |
process(@commander.recv_nonblock) | |
end | |
def wait_workers_finish | |
until @workers.empty? | |
@sender.send('EOF') | |
process(@commander.recv) | |
end | |
ensure | |
@sender.close | |
@commander.close | |
@ctx.destroy | |
end | |
private | |
def process(msg) | |
puts msg if msg | |
case msg | |
when /^START\s(.*)$/ | |
@workers << $1 | |
when /^FINISH\s(.*)$/ | |
@workers.delete($1) | |
end | |
end | |
end | |
message = 'x' * 5_000 | |
WorkerCommander.new do |c| | |
c.wait_workers_start(2) | |
5_000_000.times do |n| | |
c.send(message) | |
end | |
end |
Funciona bastante bem, a coisa só não é melhor porque o ZeroMQ manda um bolo da mensagens de cada vez, então, se por algum motivo um dos workers está mais lento que o resto, ele empaca todo mundo até terminar sua cota de mensagens.
Pelo que testei, quanto maior a mensagem, menor esse problema pois o "bolo" que ele manda contém menos mensagens. Mas, convenhamos, esse não é um grande problema.
O ponto mais grave é que se um worker morrer (ou for interrompido), as mensagens que estavam com ele vão pras cucuias.
Acredito que tem como contornar essas coisas, mas por enquanto, ainda não achei jeito :)
Nenhum comentário:
Postar um comentário