Commit f46e9723 authored by John E. Vincent's avatar John E. Vincent

adding watcher client library. moving models

parent eb532835
require File.join(File.dirname(__FILE__), 'configurations')
module Noah
class Application < Model
collection :configurations, Configuration
def to_hash
arr = []
configurations.sort.each {|c| arr << c.to_hash}
super.merge(:name => name, :created_at => created_at, :updated_at => updated_at, :configurations => arr)
end
class << self
def find_or_create(opts = {})
begin
find(opts).first.nil? ? (app = create(opts)) : (app = find(opts).first)
if app.valid?
app.save
end
app
rescue Exception => e
e.message
end
end
end
end
class Applications
def self.all(options = {})
options.empty? ? Application.all.sort : Application.find(options).sort
end
end
end
module Noah
class Configuration < Model
attribute :format
attribute :body
attribute :new_record
reference :application, Application
index :format
index :body
def validate
super
assert_present :format
assert_present :body
assert_present :application_id
assert_unique [:name, :application_id]
end
def to_hash
Application[application_id].nil? ? app_name=nil : app_name=Application[application_id].name
super.merge(:name => name, :format => format, :body => body, :created_at => created_at, :updated_at => updated_at, :application => app_name)
end
class << self
def find_or_create(opts={})
begin
if find(opts).first.nil?
conf = create(opts)
else
conf = find(opts).first
end
rescue Exception => e
e.message
end
end
end
end
class Configurations
def self.all(options = {})
options.empty? ? Configuration.all.sort : Configuration.find(options).sort
end
end
end
require File.join(File.dirname(__FILE__), 'services')
module Noah
class Host < Model
# Host model
# @return {Host} a {Host} object
attribute :status
collection :services, Service
index :status
def validate
super
assert_present :status
assert_unique :name
assert_member :status, ["up","down","pending"]
end
# @return [Hash] A hash representation of a {Host}
def to_hash
arr = []
services.sort.each {|s| arr << s.to_hash}
h = {:name => name, :status => status, :created_at => created_at, :updated_at => updated_at, :services => arr}
super.merge(h)
end
class << self
def find_or_create(opts = {})
begin
# exclude requested status from lookup
h = find(opts.reject{|key,value| key == :status}).first
host = h.nil? ? create(opts) : h
host.status = opts[:status]
if host.valid?
host.save
end
host
rescue Exception => e
e.message
end
end
end
end
class Hosts
# @param [Hash] optional filters for results
# @return [Array] Array of {Host} objects
def self.all(options = {})
options.empty? ? Noah::Host.all.sort : Noah::Host.find(options).sort
end
end
end
module Noah
class Service < Model
attribute :status
reference :host, Host
index :status
def validate
super
assert_present :status
assert_present :host_id
assert_unique [:name, :host_id]
assert_member :status, ["up", "down", "pending"]
end
def to_hash
Host[host_id].nil? ? host_name=nil : host_name=Host[host_id].name
super.merge(:name => name, :status => status, :updated_at => updated_at, :host => host_name)
end
class << self
def find_or_create(opts = {})
begin
# convert passed host object to host_id if passed
if opts.has_key?(:host)
opts.merge!({:host_id => opts[:host].id})
opts.reject!{|key, value| key == :host}
end
# exclude requested status from lookup
s = find(opts.reject{|key,value| key == :status}).first
service = s.nil? ? create(opts) : s
service.status = opts[:status]
if service.valid?
service.save
end
service
rescue Exception => e
e.message
end
end
end
end
class Services
def self.all(options = {})
options.empty? ? Service.all.sort : Service.find(options).sort
end
end
end
module Noah
class Watcher < Model #NYI
attribute :client
attribute :endpoint
attribute :event
attribute :action
index :client
index :event
def validate
assert_present :client, :endpoint, :event, :action
assert_unique [:client, :endpoint, :event, :action]
end
end
class Watchers
def self.all(options = {})
options.empty? ? Watcher.all.sort : Watcher.find(options).sort
end
end
end
module Noah
module Passthrough
def passthrough(*methods)
methods.each do |method|
raise ArgumentError if ! method.is_a?(Symbol)
meth = method.to_s
self.class_eval("def #{meth}(*args); self.class.#{meth}(*args); end")
end
end
end
end
require 'eventmachine'
require 'uri'
require File.join(File.dirname(__FILE__), 'passthrough')
require File.join(File.dirname(__FILE__), '..','vendor','em-hiredis','lib','em-hiredis')
module Noah
class Watcher
extend Passthrough
passthrough :redis_host, :pattern, :destination, :run!, :run_watcher
attr_accessor :my_pattern, :my_destination, :my_redis
def self.watch(&blk)
watcher = Noah::Watcher.new
watcher.instance_eval(&blk) if block_given?
watcher
end
def initialize
@my_redis ||= ENV['REDIS_URL']
@my_pattern ||= 'noah.*'
end
def self.redis_host(host)
@my_redis = host
end
def self.pattern(pattern)
@my_pattern = pattern
end
def self.destination(destination)
@my_destination = destination
end
def self.run!
@my_destination.nil? ? (raise ArgumentError) : run_watcher(@my_destination)
end
private
def self.run_watcher(dest)
redis_url = URI.parse(@my_redis)
db = redis_url.path.gsub(/\//,'')
EventMachine.run do
channel = EventMachine::Channel.new
r = EventMachine::Hiredis::Client.connect(redis_url.host, redis_url.port)
r.psubscribe("#{db}:#{@my_pattern}")
r.on(:pmessage) do |pattern, event, message|
channel.push "#{message}"
end
sub = channel.subscribe {|msg| dest.call(msg)}
end
end
end
end
source "http://rubygems.org"
# Specify your gem's dependencies in em-hiredis.gemspec
gemspec
Getting started
===============
Connect to redis
redis_client = EM::Hiredis.connect
The client is a deferrable which succeeds when the underlying connection is established so you can bind to this. This isn't necessary however - any commands sent before the connection is established (or while reconnecting) will be sent to redis on connect.
redis_client.callback { puts "Redis now connected" }
All redis commands are available without any remapping of names
redis.set('foo', 'bar').callback {
redis.get('foo').callback { |value|
p [:returned, value]
}
}
As a shortcut, if you're only interested in binding to the success case you can simply provide a block to any command
redis.get('foo') { |value|
p [:returned, value]
}
Handling failure
----------------
All commands return a deferrable. In the case that redis replies with an error (for example you called a hash operation against a set), or in the case that the redis connection is broken before the command returns, the deferrable will fail. If you care about the failure case you should bind to the errback - for example:
redis.sadd('aset', 'member').callback {
response_deferrable = redis.hget('aset', 'member')
response_deferrable.errback { |e|
p e # => #<RuntimeError: ERR Operation against a key holding the wrong kind of value>
}
}
Pubsub
------
This example should explain things. Once a redis connection is in a pubsub state, you must make sure you only send pubsub commands.
redis = EM::Hiredis::Client.connect
subscriber = EM::Hiredis::Client.connect
subscriber.subscribe('bar.0')
subscriber.psubscribe('bar.*')
subscriber.on(:message) { |channel, message|
p [:message, channel, message]
}
subscriber.on(:pmessage) { |key, channel, message|
p [:pmessage, key, channel, message]
}
EM.add_periodic_timer(1) {
redis.publish("bar.#{rand(2)}", "hello").errback { |e|
p [:publisherror, e]
}
}
require 'bundler'
Bundler::GemHelper.install_tasks
# -*- encoding: utf-8 -*-
$:.push File.expand_path("../lib", __FILE__)
require "em-hiredis/version"
Gem::Specification.new do |s|
s.name = "em-hiredis"
s.version = EM::Hiredis::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["Martyn Loughran"]
s.email = ["me@mloughran.com"]
s.homepage = "http://github.com/mloughran/em-hiredis"
s.summary = %q{Eventmachine redis client}
s.description = %q{Eventmachine redis client using hiredis native parser}
s.add_dependency 'hiredis', '~> 0.2.0'
s.rubyforge_project = "em-hiredis"
s.files = `git ls-files`.split("\n")
s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
s.require_paths = ["lib"]
end
require 'eventmachine'
module EM
module Hiredis
class << self
attr_writer :logger
def logger
@logger ||= begin
require 'logger'
log = Logger.new(STDOUT)
log.level = Logger::WARN
log
end
end
end
end
end
require 'em-hiredis/event_emitter'
require 'em-hiredis/connection'
require 'em-hiredis/client'
module EM::Hiredis
class Client
PUBSUB_MESSAGES = %w{message pmessage}.freeze
include EM::Hiredis::EventEmitter
include EM::Deferrable
def self.connect(host = 'localhost', port = 6379)
new(host, port)
end
def initialize(host, port)
@host, @port = host, port
@subs, @psubs = [], []
@defs = []
@connection = EM.connect(host, port, Connection, host, port)
@connection.on(:closed) {
if @connected
@defs.each { |d| d.fail("Redis disconnected") }
@defs = []
@deferred_status = nil
@connected = false
@reconnecting = true
reconnect
else
EM.add_timer(1) { reconnect }
end
}
@connection.on(:connected) {
@connected = true
select(@db) if @db
@subs.each { |s| method_missing(:subscribe, s) }
@psubs.each { |s| method_missing(:psubscribe, s) }
succeed
if @reconnecting
@reconnecting = false
emit(:reconnected)
end
}
@connection.on(:message) { |reply|
if RuntimeError === reply
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.fail(reply) if deferred
else
if reply && PUBSUB_MESSAGES.include?(reply[0]) # reply can be nil
kind, subscription, d1, d2 = *reply
case kind.to_sym
when :message
emit(:message, subscription, d1)
when :pmessage
emit(:pmessage, subscription, d1, d2)
end
else
raise "Replies out of sync: #{reply.inspect}" if @defs.empty?
deferred = @defs.shift
deferred.succeed(reply) if deferred
end
end
}
@connected = false
@reconnecting = false
end
# Indicates that commands have been sent to redis but a reply has not yet
# been received.
#
# This can be useful for example to avoid stopping the
# eventmachine reactor while there are outstanding commands
#
def pending_commands?
@connected && @defs.size > 0
end
def subscribe(channel)
@subs << channel
method_missing(:subscribe, channel)
end
def unsubscribe(channel)
@subs.delete(channel)
method_missing(:unsubscribe, channel)
end
def psubscribe(channel)
@psubs << channel
method_missing(:psubscribe, channel)
end
def punsubscribe(channel)
@psubs.delete(channel)
method_missing(:punsubscribe, channel)
end
def select(db)
@db = db
method_missing(:select, db)
end
def method_missing(sym, *args)
deferred = EM::DefaultDeferrable.new
# Shortcut for defining the callback case with just a block
deferred.callback { |result| yield(result) } if block_given?
if @connected
@connection.send_command(sym, *args)
@defs.push(deferred)
else
callback {
@connection.send_command(sym, *args)
@defs.push(deferred)
}
end
return deferred
end
private
def reconnect
EM::Hiredis.logger.debug("Trying to reconnect to Redis")
@connection.reconnect @host, @port
end
end
end
require 'hiredis/reader'
module EM::Hiredis
class Connection < EM::Connection
include EM::Hiredis::EventEmitter
def initialize(host, port)
super
@host, @port = host, port
end
def connection_completed
EM::Hiredis.logger.info("Connected to Redis")
@reader = ::Hiredis::Reader.new
emit(:connected)
end
def receive_data(data)
@reader.feed(data)
until (reply = @reader.gets) == false
emit(:message, reply)
end
end
def unbind
EM::Hiredis.logger.info("Disconnected from Redis")
emit(:closed)
end
def send_command(sym, *args)
send_data(command(sym, *args))
end
protected
COMMAND_DELIMITER = "\r\n"
def command(*args)
command = []
command << "*#{args.size}"
args.each do |arg|
arg = arg.to_s
command << "$#{string_size arg}"
command << arg
end
command.join(COMMAND_DELIMITER) + COMMAND_DELIMITER
end
if "".respond_to?(:bytesize)
def string_size(string)
string.to_s.bytesize
end
else
def string_size(string)
string.to_s.size
end
end
end
end
module EM::Hiredis
module EventEmitter
def on(event, &listener)
_listeners[event] << listener
end
def emit(event, *args)
_listeners[event].each { |l| l.call(*args) }
end
def remove_listener(event, &listener)
_listeners[event].delete(listener)
end
def remove_all_listeners(event)
_listeners.delete(event)
end
def listeners(event)
_listeners[event]
end
private
def _listeners
@_listeners ||= Hash.new { |h,k| h[k] = [] }
end
end
end
module EM
module Hiredis
VERSION = "0.0.1"
end
end
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment