Commit cd29b035 authored by John E. Vincent's avatar John E. Vincent

breaking out to modular agents. need to DRY up

parent cb98e067
......@@ -12,6 +12,8 @@ begin
require 'eventmachine'
require 'em-http-request'
require 'noah'
require 'noah/agents/http_agent'
require 'noah/agents/dummy_agent'
require 'json'
rescue LoadError
puts HELP
......@@ -23,31 +25,16 @@ LOGGER = Logger.new(STDOUT)
class EventMachine::NoahAgent
include EM::Deferrable
Noah::Agents::HttpAgent.register
Noah::Agents::DummyAgent.register
@@watchers = Noah::Watcher.watch_list
@@agents = Noah::Watchers.agents
def initialize
@logger = LOGGER
@logger.debug("Initializing with #{@@watchers.size} registered watches")
@logger.debug("#{@@agents} agents registered")
if EventMachine.reactor_running?
@worker = EM.spawn {|event, message, watch_list|
logger = LOGGER
logger.info("Worker initiated")
logger.debug("got event on http worker: #{event}")
matches = watch_list.find_all{|w| event =~ /^#{Base64.decode64(w)}/}
logger.debug("Found #{matches.size} matches for #{event}")
EM::Iterator.new(matches).each do |watch, iter|
p, ep = Base64.decode64(watch).split("|")
logger.info("Sending message to: #{ep} for pattern: #{p}")
http = EM::HttpRequest.new(ep, :connection_timeout => 2, :inactivity_timeout => 4).post :body => message
http.callback {
logger.info("Message posted to #{ep} successfully")
}
http.errback {
logger.error("Something went wrong")
}
iter.next
end
}
self.succeed("Succeed callback")
else
logger.fatal("Must be inside a reactor!")
......@@ -58,19 +45,24 @@ class EventMachine::NoahAgent
@@watchers.size
end
def http_worker
@http_worker
end
def reread_watchers
@logger.debug("Found new watches")
@logger.debug("Current watch count: #{@@watchers.size}")
@@watchers = Noah::Watcher.watch_list
@logger.debug("New watch count: #{@@watchers.size}")
#@logger.debug(@@watchers)
end
def broker(msg)
# This is just for testing for now
e,m = msg.split("|")
be = Base64.encode64(e).gsub("\n","")
@worker.notify e, m, @@watchers.clone
EM::Iterator.new(@@agents).each do |agent, iter|
agent.send(:notify, e, m, @@watchers.clone)
iter.next
end
end
end
......@@ -93,7 +85,6 @@ EventMachine.run do
r.on(:pmessage) do |pattern, event, message|
noah.reread_watchers if event =~ /^\/\/noah\/watcher\/.*/
master_channel.push "#{event}|#{message}"
logger.debug("Saw[#{event}]")
end
sub = master_channel.subscribe {|msg|
......
module Noah::Agents
class DummyAgent
include EM::Deferrable
PREFIX = "dummy"
NAME = "dummy"
def self.register
Noah::Watchers.register_agent(self)
end
def self.notify(event, message, watch_list)
logger = LOGGER
logger.info("#{NAME}: Worker initiated")
logger.debug("#{NAME}: got event - #{event}")
matches = watch_list.find_all{|w| event =~ /^#{Base64.decode64(w)}/}
logger.debug("#{NAME}: Found #{matches.size} possible matches for #{event}")
EM::Iterator.new(matches).each do |watch, iter|
p, ep = Base64.decode64(watch).split("|")
if ep =~ /^#{PREFIX}/
logger.info("#{NAME}: Sending message to: #{ep} for pattern: #{p}")
logger.debug("#{NAME}: message received: #{message}")
end
iter.next
end
end
end
end
require 'logger'
module Noah::Agents
class HttpAgent
include EM::Deferrable
PREFIX = "http"
NAME = "http"
def self.register
Noah::Watchers.register_agent(self)
end
def self.notify(event, message, watch_list)
logger = LOGGER
logger.info("#{NAME}: Worker initiated")
logger.debug("#{NAME}: got event - #{event}")
matches = watch_list.find_all{|w| event =~ /^#{Base64.decode64(w)}/}
logger.debug("#{PREFIX}: Found #{matches.size} possible matches for #{event}")
EM::Iterator.new(matches).each do |watch, iter|
p, ep = Base64.decode64(watch).split("|")
if ep =~ /^#{PREFIX}/
logger.info("#{NAME}: Sending message to (#{ep}) for pattern (#{p})")
http = EM::HttpRequest.new(ep, :connection_timeout => 2, :inactivity_timeout => 4).post :body => message
http.callback {
logger.info("#{NAME}: Message posted to #{ep} successfully")
}
http.errback {
logger.error("#{NAME}: Something went wrong with #{ep}")
}
iter.next
end
end
end
end
end
......@@ -55,8 +55,17 @@ module Noah
end
class Watchers
@@agents = []
def self.all(options = {})
options.empty? ? Watcher.all.sort : Watcher.find(options).sort
end
def self.register_agent(agent_class)
@@agents << agent_class
end
def self.agents
@@agents
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment