Commit f7e41c63 authored by John E. Vincent's avatar John E. Vincent

I'm pretty happy with this watcher implementation

parent f15ebc40
......@@ -13,32 +13,24 @@ LOGGER = Logger.new(STDOUT)
class EventMachine::NoahAgent
include EM::Deferrable
@@watchers = Noah::Watchers.all
@@watchers = Noah::Watcher.watch_list
def initialize
@logger = LOGGER
@logger.debug("Initializing with #{@@watchers.size} registered watches")
# Some logic to spawn protocol specific agents
# i.e. http, amqp, xmpp, redis, whatever
if EventMachine.reactor_running?
@http_worker = EM.spawn {|msg, ep|
@worker = EM.spawn {|event, message, watch_list|
logger = LOGGER
logger.debug("spawning http processor")
logger.info("got ep on http worker: #{ep}")
logger.info("got msg on http worker: #{msg}")
}
@redis_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning redis worker")
logger.info("got msg on redis worker: #{msg}")
}
@log_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning logger worker: #{msg}")
}
@amqp_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning amqp worker: #{msg}")
logger.debug("Worker initiated")
logger.info("got event on http worker: #{event}")
logger.info("got message on http worker: #{message}")
matches = watch_list.find_all{|w| event =~ /^#{Base64.decode64(w)}/}
logger.debug("Found #{matches.size} matches for #{event}")
EM::Iterator.new(matches).each do |watch, iter|
p, ep = Base64.decode64(watch).split("|")
logger.info("Sending message to: #{ep} for pattern: #{p}")
iter.next
end
}
else
logger.fatal("Must be inside a reactor!")
......@@ -52,18 +44,18 @@ class EventMachine::NoahAgent
def reread_watchers
@logger.debug("Found new watches")
@logger.debug("Current watch count: #{@@watchers.size}")
@@watchers = Noah::Watchers.all
@@watchers = Noah::Watcher.watch_list
@logger.debug("New watch count: #{@@watchers.size}")
@logger.debug(@@watchers)
#@logger.debug(@@watchers)
end
def broker(msg)
# This is just for testing for now
@logger.info(msg.class)
@logger.warn(msg)
e,m = msg.split("\t")
@http_worker.notify m, e
e,m = msg.split("|")
be = Base64.encode64(e).gsub("\n","")
@logger.info("Encoded event: #{be}")
@worker.notify e, m, @@watchers.clone
end
end
......@@ -78,11 +70,12 @@ EventMachine.run do
r.psubscribe("//noah/*")
r.on(:pmessage) do |pattern, event, message|
noah.reread_watchers if event =~ /^\/\/noah\/watcher\/.*/
master_channel.push "#{event}\t#{message}"
master_channel.push "#{event}|#{message}"
logger.debug("Publishing [#{event}]")
end
sub = master_channel.subscribe {|msg|
# We short circuit if we have no watchers
noah.broker(msg) unless noah.watchers == 0
}
end
......@@ -69,7 +69,7 @@ module Noah
self.name.nil? ? name=@deleted_name : name=self.name
# Pulling out dbnum for now. Need to rethink it
#pub_category = "#{db}:noah.#{self.class.to_s}[#{name}].#{meth}"
pub_category = "#{self.patternize_me}.#{meth}"
pub_category = "#{self.patternize_me}"
Ohm.redis.publish(pub_category, self.to_hash.merge({"action" => meth, "pubcategory" => pub_category}).to_json)
# The following provides a post post-action hook. It allows a class to provide it's own handling after the fact
......
......@@ -21,7 +21,7 @@ module Noah
end
def name
@name = Digest::SHA1.hexdigest "#{endpoint}#{pattern}"
@name = Base64.encode64("#{pattern}|#{endpoint}").gsub("\n","")
end
def to_hash
......@@ -30,19 +30,10 @@ module Noah
end
def self.watch_list
hsh = Hash.new
watch_list = self.all.sort_by(:pattern)
watch_list.each do |watch|
p = watch.pattern.to_sym
e = watch.endpoint
if hsh.has_key?(p)
hsh[p].push(watch.endpoint)
else
hsh[p] = Array.new
hsh[p].push(watch.endpoint)
end
end
hsh
arr = []
watches = self.all.sort_by(:pattern)
watches.each {|w| arr << w.name}
arr
end
private
......
......@@ -9,6 +9,10 @@ module Noah
self.instance_of?(Noah::Watcher) ? (assert endpoint_overrides?, error) : (assert false, "Validation not applicable")
end
def assert_valid_watch(error = [:pattern, :invalid_format])
self.instance_of?(Noah::Watcher) ? (assert pattern_valid?, error) : (assert false, "Validation not applicable")
end
private
def endpoint_covered?
watches = Watcher.all.find(:endpoint => self.endpoint).sort
......@@ -32,5 +36,13 @@ module Noah
return false
end
def pattern_valid?
unless self.pattern.match(/^\/\/noah\/.*\/$/)
return false
end
rescue ArgumentError
return false
end
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment