Commit 55d07a58 authored by John E. Vincent's avatar John E. Vincent

refactor agents for easier plugin support

parent f1500c66
......@@ -28,17 +28,16 @@ EventMachine.run do
EM.error_handler do |e|
LOGGER.warn(e)
end
logger = LOGGER
trap("INT") { logger.debug("Shutting down. Watches will not be fired");EM.stop }
trap("INT") { LOGGER.debug("Shutting down. Watches will not be fired");EM.stop }
noah = Noah::Agent.new
noah.errback{|x| logger.error("Errback: #{x}")}
noah.callback{|y| logger.info("Callback: #{y}")}
noah.errback{|x| LOGGER.error("Errback: #{x}")}
noah.callback{|y| LOGGER.info("Callback: #{y}")}
# Passing messages...like a boss
#master_channel = EventMachine::Channel.new
r = EventMachine::Hiredis::Client.connect
r.errback{|x| logger.error("Unable to connect to redis: #{x}")}
logger.debug("Starting up")
r.errback{|x| LOGGER.error("Unable to connect to redis: #{x}")}
LOGGER.info("Attaching to Redis Pubsub")
r.psubscribe("*")
r.on(:pmessage) do |pattern, event, message|
noah.reread_watchers if event =~ /^\/\/noah\/watchers\/.*/
......
log :level => :all
rack "/home/jvincent/development/noah/examples/lb.ru" do
listen 3000
end
require 'sinatra'
require './sinatra-load-test-endpoint.rb'
app = NoahPostDemo
app.run
require 'sinatra/base'
class NoahPostDemo < Sinatra::Base
configure do
set :app_file, __FILE__
set :server, %w[thin Kirk]
set :logging, true
set :run, true
set :port, 3000
end
post '/*' do
x = request.body.read
end
end
......@@ -27,9 +27,10 @@ module Noah
@logger.debug("Initializing with #{@@watchers.size} registered watches")
@logger.debug("#{@@agents} agents registered")
if EventMachine.reactor_running?
self.succeed("Succeed callback")
#instantiate_agents!
@logger.info("Started up!")
else
logger.fatal("Must be inside a reactor!")
@logger.fatal("Must be inside a reactor!")
end
end
......@@ -37,12 +38,8 @@ module Noah
@@watchers.size
end
def http_worker
@http_worker
end
def reread_watchers
@logger.debug("Found new watches")
@logger.info("Found new watches")
@logger.debug("Current watch count: #{@@watchers.size}")
@@watchers = Noah::Watcher.watch_list
@logger.debug("New watch count: #{@@watchers.size}")
......@@ -50,15 +47,32 @@ module Noah
def broker(msg)
e,m = msg.split("|")
# Below isn't being used right now
#be = Base64.encode64(e).gsub("\n","")
EM::Iterator.new(@@agents, @@agents.size).each do |agent, iter|
agent.send(:notify, e, m, @@watchers.clone)
iter.next
#a = agent.to_s.gsub(/::/,'_').downcase
x = agent.send(:new)
begin
#self.instance_variable_get("@#{a}").send(:notify, e, m, @@watchers)
x.notify(e, m, @@watchers.clone)
iter.next
rescue Exception => e
@logger.error("#{agent.to_s} invocation failed with #{e.message}")
end
end
end
private
protected
def instantiate_agents!
@@agents.each do |agent|
# Convert Noah::Agents::HttpAgent to
# noah_agents_httpagent
a = agent.to_s.gsub(/::/,'_').downcase
@logger.debug("#{a}")
# Create instance variable of a
self.class.send :attr_accessor, a.to_sym
# Set the instance variable "a" to instance of agent
self.instance_variable_set(:"@#{a}", agent.send(:new))
end
end
def find_and_register_agents
candidates = []
Gem.source_index.find_all {|g| candidates << g[1].name if g[1].name =~ /^noah-agent-.*/}
......
module Noah::Agents
module Base
class <<self
PREFIX = "base"
NAME = "base-agent"
end
class Base
PREFIX = "base"
NAME = "base-agent"
DEFAULT_CONCURRENCY = 1
def self.included(base)
def self.inherited(base)
Noah::Watchers.register_agent(base)
base.send :include, EM::Deferrable
base.send :extend, AgentClassMethods
end
end
module AgentClassMethods
def logger
Noah::Log.logger.progname = self.name
Noah::Log.logger
end
def find_watched_patterns!(watchlist)
watched_patterns = []
watchlist.find_all do |w|
p, ep = Base64.decode64(w).split('|')
watched_patterns << "#{p}|#{ep}" if ep =~ /^#{self.const_get("PREFIX")}/
def notify(event, message, watch_list)
logger.info("#{self.class} worker initiated")
worklist = []
watch_list.select{|w| worklist << w[:endpoint] if (w[:endpoint] =~ /^#{self.class::PREFIX}/ && event =~ /^#{w[:pattern]}/) }
if worklist.size >= 1
logger.info("Dispatching message to #{worklist.size} #{self.class.to_s} endpoints")
EM::Iterator.new(worklist, self.class::DEFAULT_CONCURRENCY).each do |ep, iter|
work!(ep, message)
iter.next
end
logger.info("Dispatched message to #{worklist.size} #{self.class.to_s} endpoints")
else
logger.info("No work to do")
end
watched_patterns
rescue Exception => e
logger.fatal("Exectution of notify failed with #{e.message}")
end
def notify(event, message, watch_list)
logger.info("Worker Initiated")
logger.debug("got event - #{event}")
watched_patterns = find_watched_patterns!(watch_list)
matches = watched_patterns.find_all {|w| event =~ /^#{w}/}
logger.debug("Found #{matches.size} matches for #{event}")
self.callback!(matches, message)
def logger
Noah::Log.logger.progname = self.class.to_s
Noah::Log.logger
end
end
end
require File.join(File.dirname(__FILE__), 'base_agent')
module Noah::Agents
class DummyAgent
include Noah::Agents::Base
class DummyAgent < Base
PREFIX = "dummy://"
NAME = self.name
NAME = self.class.to_s
DEFAULT_CONCURRENCY = 10
def self.callback!(matches, message)
EM::Iterator.new(matches).each do |watch, iter|
p, ep = watch.split("|")
logger.info("Sending message to: #{ep} for pattern: #{p}")
logger.debug("message received: #{message}")
iter.next
end
def work!(ep, message)
logger.info("Sending message to: #{ep}")
logger.info("Dummy message received: #{message}")
end
end
......
require File.join(File.dirname(__FILE__), 'base_agent')
module Noah::Agents
class HttpAgent
include Noah::Agents::Base
class HttpAgent < Base
PREFIX = "http://"
NAME = self.name
NAME = self.class.to_s
DEFAULT_CONCURRENCY = 500
def self.callback!(matches, message)
EM::Iterator.new(matches, 100).each do |watch, iter|
p, ep = watch.split("|")
logger.info("Sending message to (#{ep}) for pattern (#{p})")
http = EM::HttpRequest.new(ep, :connection_timeout => 2, :inactivity_timeout => 4).post :body => message
def work!(ep, message)
logger.info("Sending message to (#{ep})")
http = EM::HttpRequest.new(ep, :connection_timeout => 2, :inactivity_timeout => 2).post :body => message
http.callback {
logger.info("Message posted to #{ep} successfully")
}
http.errback {
logger.error("Something went wrong with #{ep}")
}
iter.next
end
end
end
......
......@@ -37,7 +37,7 @@ module Noah
def watch_list
arr = []
watches = self.all.sort_by(:pattern)
watches.each {|w| arr << w.name}
watches.each {|w| arr << w.to_hash}
arr
end
end
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment