Commit fc74cc71 authored by John E. Vincent's avatar John E. Vincent

working through the agent process more

parent e3fc2cc3
......@@ -6,6 +6,7 @@ require 'optparse'
require 'em-hiredis'
require 'thin'
require 'noah'
require 'json'
LOGGER = Logger.new(STDOUT)
......@@ -20,11 +21,27 @@ class EventMachine::NoahAgent
# Some logic to spawn protocol specific agents
# i.e. http, amqp, xmpp, redis, whatever
if EventMachine.reactor_running?
@http_channel = EventMachine::Channel.new
@redis_channel = EventMachine::Channel.new
@log_channel = EventMachine::Channel.new
@http_worker = EM.spawn {|msg, ep|
logger = LOGGER
logger.debug("spawning http processor")
logger.info("got ep on http worker: #{ep}")
logger.info("got msg on http worker: #{msg}")
}
@redis_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning redis worker")
logger.info("got msg on redis worker: #{msg}")
}
@log_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning logger worker: #{msg}")
}
@amqp_worker = EM.spawn {|msg|
logger = LOGGER
logger.debug("spawning amqp worker: #{msg}")
}
else
log.fatal("Must be inside a reactor!")
logger.fatal("Must be inside a reactor!")
end
end
......@@ -37,11 +54,16 @@ class EventMachine::NoahAgent
@logger.debug("Current watch count: #{@@watchers.size}")
@@watchers = Noah::Watchers.all
@logger.debug("New watch count: #{@@watchers.size}")
@logger.debug(@@watchers)
end
def process_message(msg)
def broker(msg)
# This is just for testing for now
@logger.info(msg)
@logger.info(msg.class)
@logger.warn(msg)
e,m = msg.split("\t")
@http_worker.notify m, e
end
end
......@@ -49,18 +71,18 @@ EventMachine.run do
logger = LOGGER
noah = EventMachine::NoahAgent.new
# Passing messages...like a boss
channel = EventMachine::Channel.new
master_channel = EventMachine::Channel.new
r = EventMachine::Hiredis::Client.connect
logger.debug("Starting up")
r.psubscribe("//noah/*")
r.on(:pmessage) do |pattern, event, message|
noah.reread_watchers if event =~ /^\/\/noah\/watcher\/.*/
channel.push "[#{event}][#{message}]"
master_channel.push "#{event}\t#{message}"
logger.debug("Publishing [#{event}]")
end
sub = channel.subscribe {|msg|
noah.process_message(msg) unless noah.watchers == 0
sub = master_channel.subscribe {|msg|
noah.broker(msg) unless noah.watchers == 0
}
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment