Commit f8c435c1 authored by John E. Vincent's avatar John E. Vincent

fixing examples and renaming watcher.rb to custom_watcher.rb

parent 9553c5bd
......@@ -14,3 +14,4 @@ Gemfile.lock
doc
.yardoc/*
examples/log/*
examples/tmp/*
......@@ -31,9 +31,8 @@ class EventMachine::NoahAgent
if EventMachine.reactor_running?
@worker = EM.spawn {|event, message, watch_list|
logger = LOGGER
logger.debug("Worker initiated")
logger.info("got event on http worker: #{event}")
logger.info("got message on http worker: #{message}")
logger.info("Worker initiated")
logger.debug("got event on http worker: #{event}")
matches = watch_list.find_all{|w| event =~ /^#{Base64.decode64(w)}/}
logger.debug("Found #{matches.size} matches for #{event}")
EM::Iterator.new(matches).each do |watch, iter|
......@@ -41,16 +40,15 @@ class EventMachine::NoahAgent
logger.info("Sending message to: #{ep} for pattern: #{p}")
http = EM::HttpRequest.new(ep, :connection_timeout => 2, :inactivity_timeout => 4).post :body => message
http.callback {
LOGGER.debug("Message posted to #{ep} successfully")
#iter.next
logger.info("Message posted to #{ep} successfully")
}
http.errback {
LOGGER.debug("Something went wrong")
#iter.net
logger.error("Something went wrong")
}
iter.next
end
}
self.succeed("Succeed callback")
else
logger.fatal("Must be inside a reactor!")
end
......@@ -70,22 +68,26 @@ class EventMachine::NoahAgent
def broker(msg)
# This is just for testing for now
@logger.warn(msg)
e,m = msg.split("|")
be = Base64.encode64(e).gsub("\n","")
@logger.info("Encoded event: #{be}")
@worker.notify e, m, @@watchers.clone
end
end
EventMachine.run do
EM.error_handler do |e|
Logger.new(STDOUT).warn(e)
end
logger = LOGGER
trap("INT") { logger.debug("Shutting down. Watches will not be fired");EM.stop }
noah = EventMachine::NoahAgent.new
noah.errback{|x| logger.error("Errback: #{x}")}
noah.callback{|y| logger.info("Callback: #{y}")}
# Passing messages...like a boss
master_channel = EventMachine::Channel.new
r = EventMachine::Hiredis::Client.connect
r.errback{|x| logger.error("Unable to connect to redis: #{x}")}
logger.debug("Starting up")
r.psubscribe("//noah/*")
r.on(:pmessage) do |pattern, event, message|
......
#!/usr/bin/env ruby
require File.join(File.dirname(__FILE__), '..','lib','noah','watcher')
require File.join(File.dirname(__FILE__), '..','lib','noah','custom_watcher')
class StdoutWatcher < Noah::Watcher
class StdoutWatcher < Noah::CustomWatcher
redis_host "redis://127.0.0.1:6379/0"
pattern "//noah/configuration/redis_server"
pattern "//noah/configuration/redis"
destination Proc.new {|x| puts x}
run!
end
#!/usr/bin/env ruby
require File.join(File.dirname(__FILE__), '..','lib','noah','watcher')
require 'excon'
require File.join(File.dirname(__FILE__), '..','lib','noah','custom_watcher')
require 'em-http-request'
class HttpPostWatch < Noah::Watcher
class HttpPostWatch < Noah::CustomWatcher
redis_host "redis://127.0.0.1:6379/0"
pattern "//noah/configuration"
destination Proc.new {|x| ::Excon.post("http://localhost:4567/webhook", :headers => {"Content-Type" => "application/json"}, :body => x)}
destination Proc.new {|x| ::EM::HttpRequest.new('http://localhost:4567/webhook', :connection_timeout => 2, :inactivity_timeout => 4).post :body => x}
run!
end
#!/usr/bin/env ruby
require File.join(File.dirname(__FILE__), '..','lib','noah','watcher')
require File.join(File.dirname(__FILE__), '..','lib','noah','custom_watcher')
require 'logger'
class LoggingWatcher < Noah::Watcher
redis_host "redis://127.0.0.1:6379/5"
pattern "//noah/application"
class LoggingWatcher < Noah::CustomWatcher
redis_host "redis://127.0.0.1:6379/0"
pattern "//noah"
destination Proc.new {|x| log = Logger.new(STDOUT); log.debug(x)}
run!
end
#!/usr/bin/env ruby
require File.join(File.dirname(__FILE__), '..','lib','noah','watcher')
require 'excon'
require File.join(File.dirname(__FILE__), '..','lib','noah','custom_watcher')
require 'em-http-request'
class HttpPostWatch < Noah::Watcher
class HttpPostWatch < Noah::CustomWatcher
redis_host "redis://127.0.0.1:6379/0"
pattern "//noah/configuration/redis_server"
destination Proc.new {|x| puts x; ::Excon.put("http://localhost:4567/webhook", :headers => {"Content-Type" => "application/json"}, :body => x)}
destination Proc.new {|x| ::EM::HttpRequest.new('http://localhost:4567/webhook', :connection_timeout => 2, :inactivity_timeout => 4).post :body => x}
run!
end
#!/usr/bin/env ruby
require 'sinatra'
require 'ohm'
require 'open-uri'
......
......@@ -8,7 +8,7 @@
$(document).ready(function(){
function debug(str){ $("#debug").append("<p>" + str); };
ws = new WebSocket("ws://localhost:3001/");
ws = new WebSocket("ws://localhost:3009/");
ws.onmessage = function(evt) { $("#msg").append("<p>"+evt.data+"</p>"); };
ws.onclose = function() { debug("socket closed"); };
ws.onopen = function() {
......
......@@ -20,7 +20,7 @@ EventMachine.run do
@channel.push "(#{event}) #{message}"
end
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 3001) do |ws|
EventMachine::WebSocket.start(:host => "0.0.0.0", :port => 3009) do |ws|
ws.onopen {
sub = @channel.subscribe { |msg|
ws.send msg
......
......@@ -10,6 +10,7 @@ require 'haml'
require 'yaml'
require 'sinatra/base'
require File.join(File.dirname(__FILE__), 'noah', 'custom_watcher')
require File.join(File.dirname(__FILE__), 'noah','validations')
require File.join(File.dirname(__FILE__), 'noah','models')
require File.join(File.dirname(__FILE__), 'noah','app')
require 'eventmachine'
require 'uri'
require 'logger'
begin
require 'em-hiredis'
rescue LoadError
puts "Please install: em-hiredis"
end
@log = Logger.new(STDOUT)
@log.level = Logger::DEBUG
require File.join(File.dirname(__FILE__), 'passthrough')
require File.join(File.dirname(__FILE__), '..','vendor','em-hiredis','lib','em-hiredis')
module Noah
class Watcher
class CustomWatcher
extend Passthrough
passthrough :redis_host, :pattern, :destination, :run!, :run_watcher
......
......@@ -21,10 +21,9 @@ class Noah::App
put '/e/*' do
raise("Data too large") if request.body.size > 512
d = request.body.read || nil
e = Noah::Ephemeral.find_or_create(:path => "/#{params[:splat][0]}")
e.data = d
opts = {:path => "/#{params[:splat][0]}", :data => d}
e = Noah::Ephemeral.find_or_create(opts)
if e.valid?
e.save
action = e.is_new? ? "create" : "update"
r = {"action" => action, "result" => "success", "id" => e.id, "path" => e.path, "data" => e.data}
r.to_json
......
module Noah
class Ephemeral < Model #NYI
class Ephemeral < Model
attribute :path
attribute :data
......@@ -18,14 +18,16 @@ module Noah
end
def to_hash
h = {:path => path, :data => data, :created_at => created_at, :updated_at => :updated_at}
h = {:path => path, :data => data, :created_at => created_at, :updated_at => updated_at}
super.merge(h)
end
class << self
def find_or_create(opts = {})
begin
find(opts).first.nil? ? (eph = create(opts)) : (eph = find(opts).first)
path, data = opts[:path], opts[:data]
find(:path => path).first.nil? ? (eph = new(:path => path)) : (eph = find(:path => path).first)
eph.data = data
if eph.valid?
eph.save
end
......
source "http://rubygems.org"
# Specify your gem's dependencies in em-hiredis.gemspec
gemspec
Getting started
===============
Connect to redis
redis_client = EM::Hiredis.connect
The client is a deferrable which succeeds when the underlying connection is established so you can bind to this. This isn't necessary however - any commands sent before the connection is established (or while reconnecting) will be sent to redis on connect.
redis_client.callback { puts "Redis now connected" }
All redis commands are available without any remapping of names
redis.set('foo', 'bar').callback {
redis.get('foo').callback { |value|
p [:returned, value]
}
}
As a shortcut, if you're only interested in binding to the success case you can simply provide a block to any command
redis.get('foo') { |value|
p [:returned, value]
}
Handling failure
----------------
All commands return a deferrable. In the case that redis replies with an error (for example you called a hash operation against a set), or in the case that the redis connection is broken before the command returns, the deferrable will fail. If you care about the failure case you should bind to the errback - for example:
redis.sadd('aset', 'member').callback {
response_deferrable = redis.hget('aset', 'member')
response_deferrable.errback { |e|
p e # => #<RuntimeError: ERR Operation against a key holding the wrong kind of value>
}
}
Pubsub
------
This example should explain things. Once a redis connection is in a pubsub state, you must make sure you only send pubsub commands.
redis = EM::Hiredis::Client.connect
subscriber = EM::Hiredis::Client.connect
subscriber.subscribe('bar.0')
subscriber.psubscribe('bar.*')
subscriber.on(:message) { |channel, message|
p [:message, channel, message]
}
subscriber.on(:pmessage) { |key, channel, message|
p [:pmessage, key, channel, message]
}
EM.add_periodic_timer(1) {
redis.publish("bar.#{rand(2)}", "hello").errback { |e|
p [:publisherror, e]
}
}
require 'bundler'
Bundler::GemHelper.install_tasks
# -*- encoding: utf-8 -*-
$:.push File.expand_path("../lib", __FILE__)
require "em-hiredis/version"
Gem::Specification.new do |s|
s.name = "em-hiredis"
s.version = EM::Hiredis::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["Martyn Loughran"]
s.email = ["me@mloughran.com"]
s.homepage = "http://github.com/mloughran/em-hiredis"
s.summary = %q{Eventmachine redis client}
s.description = %q{Eventmachine redis client using hiredis native parser}
s.add_dependency 'hiredis', '~> 0.2.0'
s.rubyforge_project = "em-hiredis"
s.files = `git ls-files`.split("\n")
s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
s.require_paths = ["lib"]
end
require 'eventmachine'
module EM
module Hiredis
class << self
attr_writer :logger
def logger
@logger ||= begin
require 'logger'
log = Logger.new(STDOUT)
log.level = Logger::WARN
log
end
end
end
end
end
require 'em-hiredis/event_emitter'
require 'em-hiredis/connection'
require 'em-hiredis/client'
module EM::Hiredis
class Client
PUBSUB_MESSAGES = %w{message pmessage}.freeze
include EM::Hiredis::EventEmitter
include EM::Deferrable
def self.connect(host = 'localhost', port = 6379)
new(host, port)
end
def initialize(host, port)
@host, @port = host, port
@subs, @psubs = [], []
@defs = []
@connection = EM.connect(host, port, Connection, host, port)
@connection.on(:closed) {
if @connected
@defs.each { |d| d.fail("Redis disconnected") }
@defs = []
@deferred_status = nil
@connected = false
@reconnecting = true
reconnect
else
EM.add_timer(1) { reconnect }
end
}
@connection.on(:connected) {
@connected = true
select(@db) if @db
@subs.each { |s| method_missing(:subscribe, s) }
@psubs.each { |s| method_missing(:psubscribe, s) }
succeed
if @reconnecting
@reconnecting = false
emit(:reconnected)
end
}
@connection.on(:message) { |reply|
if RuntimeError === reply
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.fail(reply) if deferred
else
if reply && PUBSUB_MESSAGES.include?(reply[0]) # reply can be nil
kind, subscription, d1, d2 = *reply
case kind.to_sym
when :message
emit(:message, subscription, d1)
when :pmessage
emit(:pmessage, subscription, d1, d2)
end
else
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.succeed(reply) if deferred
end
end
}
@connected = false
@reconnecting = false
end
# Indicates that commands have been sent to redis but a reply has not yet
# been received.
#
# This can be useful for example to avoid stopping the
# eventmachine reactor while there are outstanding commands
#
def pending_commands?
@connected && @defs.size > 0
end
def subscribe(channel)
@subs << channel
method_missing(:subscribe, channel)
end
def unsubscribe(channel)
@subs.delete(channel)
method_missing(:unsubscribe, channel)
end
def psubscribe(channel)
@psubs << channel
method_missing(:psubscribe, channel)
end
def punsubscribe(channel)
@psubs.delete(channel)
method_missing(:punsubscribe, channel)
end
def select(db)
@db = db
method_missing(:select, db)
end
def method_missing(sym, *args)
deferred = EM::DefaultDeferrable.new
# Shortcut for defining the callback case with just a block
deferred.callback { |result| yield(result) } if block_given?
if @connected
@connection.send_command(sym, *args)
@defs.push(deferred)
else
callback {
@connection.send_command(sym, *args)
@defs.push(deferred)
}
end
return deferred
end
private
def reconnect
EM::Hiredis.logger.debug("Trying to reconnect to Redis")
@connection.reconnect @host, @port
end
end
end
require 'hiredis/reader'
module EM::Hiredis
class Connection < EM::Connection
include EM::Hiredis::EventEmitter
def initialize(host, port)
super
@host, @port = host, port
end
def connection_completed
EM::Hiredis.logger.info("Connected to Redis")
@reader = ::Hiredis::Reader.new
emit(:connected)
end
def receive_data(data)
@reader.feed(data)
until (reply = @reader.gets) == false
emit(:message, reply)
end
end
def unbind
EM::Hiredis.logger.info("Disconnected from Redis")
emit(:closed)
end
def send_command(sym, *args)
send_data(command(sym, *args))
end
protected
COMMAND_DELIMITER = "\r\n"
def command(*args)
command = []
command << "*#{args.size}"
args.each do |arg|
arg = arg.to_s
command << "$#{string_size arg}"
command << arg
end
command.join(COMMAND_DELIMITER) + COMMAND_DELIMITER
end
if "".respond_to?(:bytesize)
def string_size(string)
string.to_s.bytesize
end
else
def string_size(string)
string.to_s.size
end
end
end
end
module EM::Hiredis
module EventEmitter
def on(event, &listener)
_listeners[event] << listener
end
def emit(event, *args)
_listeners[event].each { |l| l.call(*args) }
end
def remove_listener(event, &listener)
_listeners[event].delete(listener)
end
def remove_all_listeners(event)
_listeners.delete(event)
end
def listeners(event)
_listeners[event]
end
private
def _listeners
@_listeners ||= Hash.new { |h,k| h[k] = [] }
end
end
end
module EM
module Hiredis
VERSION = "0.0.1"
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment