Commit e3fc2cc3 authored by John E. Vincent's avatar John E. Vincent

more watcher agent work

parent e0159253
......@@ -7,38 +7,60 @@ require 'em-hiredis'
require 'thin'
require 'noah'
LOGGER = Logger.new(STDOUT)
class EventMachine::NoahAgent
include EM::Deferrable
@@watchers = Noah::Watchers.all
def initialize
@logger = LOGGER
@logger.debug("Initializing with #{@@watchers.size} registered watches")
# Some logic to spawn protocol specific agents
# i.e. http, amqp, xmpp, redis, whatever
if EventMachine.reactor_running?
@http_channel = EventMachine::Channel.new
@redis_channel = EventMachine::Channel.new
@log_channel = EventMachine::Channel.new
else
log.fatal("Must be inside a reactor!")
end
end
def watchers
@@watchers.size
end
def reread_watchers
@logger.debug("Found new watches")
@logger.debug("Current watch count: #{@@watchers.size}")
@@watchers = Noah::Watchers.all
@logger.debug("New watch count: #{@@watchers.size}")
end
def process_message(msg)
# This is just for testing for now
@logger.info(msg)
end
end
EventMachine.run do
@logger = Logger.new(STDOUT)
logger = LOGGER
noah = EventMachine::NoahAgent.new
# Passing messages...like a boss
@channel = EventMachine::Channel.new
channel = EventMachine::Channel.new
r = EventMachine::Hiredis::Client.connect
logger.debug("Starting up")
r.psubscribe("//noah/*")
r.on(:pmessage) do |pattern, event, message|
@channel.push "[#{event}] [#{message}]"
@logger.debug("Publishing [#{event}]")
noah.reread_watchers if event =~ /^\/\/noah\/watcher\/.*/
channel.push "[#{event}][#{message}]"
logger.debug("Publishing [#{event}]")
end
sub = @channel.subscribe {|msg|
puts msg
sub = channel.subscribe {|msg|
noah.process_message(msg) unless noah.watchers == 0
}
# EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 3001) do |ws|
# ws.onopen {
# sub = @channel.subscribe { |msg|
# ws.send msg
# }
#
# @channel.push "#{sub} connected and waiting...."
#
# ws.onmessage { |msg|
# @channel.push "<#{sub}>: #{msg}"
# }
#
# ws.onclose {
# @channel.unsubscribe(sub)
# }
# }
# end
end
......@@ -21,6 +21,11 @@ module Noah
@name = Digest::SHA1.hexdigest "#{endpoint}#{pattern}"
end
def to_hash
h = {:pattern => pattern, :endpoint => endpoint, :created_at => created_at, :updated_at => updated_at}
super.merge(h)
end
private
# Not sure about these next two.
# Could get around patterns changing due to namespace changes
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment