Class: Cloudist::Listener

Inherits:
Object show all
Includes:
ActiveSupport::Callbacks
Defined in:
lib/cloudist/listener.rb

Direct Known Subclasses

GenericListener

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#job_queue_name ⇒ Object (readonly)

Returns the value of attribute job_queue_name.



6
7
8
# File 'lib/cloudist/listener.rb', line 6

# Reader for the @job_queue_name instance variable.
#
# @return [Object] the current value of @job_queue_name (nil if never assigned)
def job_queue_name
  @job_queue_name
end

#payload ⇒ Object (readonly)

Returns the value of attribute payload.



6
7
8
# File 'lib/cloudist/listener.rb', line 6

# Reader for the @payload instance variable, which #handle_request assigns
# from +request.payload+ before dispatching.
#
# @return [Object] the current value of @payload (nil if never assigned)
def payload
  @payload
end

Class Method Details

.after(*args, &block) ⇒ Object



30
31
32
# File 'lib/cloudist/listener.rb', line 30

# Registers an +:after+ callback on the +:call+ callback chain, i.e. the
# chain that #call runs around each dispatched listener method.
#
# All arguments and the optional block are forwarded untouched to
# +set_callback+.
#
# @param args [Array] arguments forwarded to +set_callback+
# @return [Object] whatever +set_callback+ returns
def after(*args, &block)
  set_callback :call, :after, *args, &block
end

.before(*args, &block) ⇒ Object



26
27
28
# File 'lib/cloudist/listener.rb', line 26

# Registers a +:before+ callback on the +:call+ callback chain, i.e. the
# chain that #call runs around each dispatched listener method.
#
# All arguments and the optional block are forwarded untouched to
# +set_callback+.
#
# @param args [Array] arguments forwarded to +set_callback+
# @return [Object] whatever +set_callback+ returns
def before(*args, &block)
  set_callback :call, :before, *args, &block
end

.listen_to(*job_queue_names) ⇒ Object



10
11
12
# File 'lib/cloudist/listener.rb', line 10

# Declares the job queues this listener class is interested in.
#
# Each given name is passed through +Utils.reply_prefix+ and the resulting
# list is stored via the +job_queue_names=+ writer.
#
# @param job_queue_names [Array] one or more job queue names
# @return [Array] the prefixed queue names that were stored
def listen_to(*job_queue_names)
  prefixed_names = job_queue_names.map do |queue|
    Utils.reply_prefix(queue)
  end

  self.job_queue_names = prefixed_names
end

.subscribe(queue_name) ⇒ Object

Raises:

  • (RuntimeError)


14
15
16
17
18
19
20
21
22
23
24
# File 'lib/cloudist/listener.rb', line 14

# Subscribes this listener class to the reply queue for +queue_name+.
#
# For every request yielded by the reply queue, the listener instance stored
# in +Cloudist.listener_instances+ under +queue_name+ is looked up (and
# created with +new+ on first use), then asked to handle the request.
#
# @param queue_name [Object] name handed to +Cloudist::ReplyQueue.new+
# @return [Object] +queue_name+, unchanged
# @raise [RuntimeError] when +EM.reactor_running?+ is falsy
def subscribe(queue_name)
  unless EM.reactor_running?
    raise RuntimeError, "You can't subscribe until EM is running"
  end

  Cloudist::ReplyQueue.new(queue_name).subscribe do |incoming|
    # One listener instance per queue name, created lazily and then reused.
    listener = (Cloudist.listener_instances[queue_name] ||= new)
    listener.handle_request(incoming)
  end

  queue_name
end

Instance Method Details

#call(meth, args) ⇒ Object



98
99
100
101
102
103
104
105
106
# File 'lib/cloudist/listener.rb', line 98

# Invokes the listener method +meth+ inside the +:call+ callback chain.
#
# @param meth [Symbol, String] name of the method to invoke via +send+
# @param args [Array] arguments for the method; when empty the method is
#   invoked without arguments
# @return [Object] whatever +run_callbacks+ returns
def call(meth, args)
  run_callbacks(:call) do
    args.empty? ? send(meth) : send(meth, *args)
  end
end

#data ⇒ Object



56
57
58
# File 'lib/cloudist/listener.rb', line 56

# Body of the payload currently held by this listener.
#
# @return [Object] whatever +payload.body+ returns
def data
  payload.body
end

#error(e) ⇒ Object



124
125
126
# File 'lib/cloudist/listener.rb', line 124

# Default handler for +error+ messages; deliberately does nothing.
#
# #handle_key routes +error+ messages here with a Hashie::Mash built from
# the payload body. Presumably subclasses override this hook.
#
# @param e [Object] the error details (ignored here)
# @return [nil]
def error(e)
  nil
end

#event(type) ⇒ Object



116
117
118
# File 'lib/cloudist/listener.rb', line 116

# Default handler for +event+ messages; deliberately does nothing.
#
# #handle_key falls back to this hook when the listener has no public method
# named after the event itself. Presumably subclasses override it.
#
# @param type [Object] the event information supplied by #handle_key (ignored here)
# @return [nil]
def event(type)
  nil
end

#handle_key(key) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/cloudist/listener.rb', line 60

# Translates a routing key into the listener method to invoke and the
# arguments to offer it.
#
# The key has the form "type" or "type:rest" and is split on the first colon
# only. The message type selects which values are collected:
#
# * +event+    - the rest of the key; a public method named after the event
#                replaces the generic +event+ hook as the target
# * +progress+ - +payload.progress+ and +payload.description+
# * +runtime+  - +payload.runtime+
# * +reply+, +update+ - no arguments
# * +error+    - a Hashie::Mash built from +payload.body+
# * +log+      - +payload.message+ and +payload.level+
# * any other  - +data+, but only when a method of that name exists and
#                takes exactly one argument
#
# @param key [String] routing key
# @return [Array] +[method_name, *args]+, or +[nil, nil]+ for an empty key
def handle_key(key)
  parts = key.split(':', 2)
  return [nil, nil] if parts.empty?

  method_and_args = [parts.shift.to_sym]
  case method_and_args[0]
  when :event
    # A public method named after the event takes over from the generic hook.
    # Note the name stays a String in that case.
    if !parts.empty? && respond_to?(parts.first)
      method_and_args = [parts.shift]
    end
    # The remainder of the key (possibly empty) is offered as ONE array argument.
    method_and_args << parts

  when :progress
    method_and_args << payload.progress
    method_and_args << payload.description

  when :runtime
    method_and_args << payload.runtime

  when :reply, :update
    # No arguments for these message types.

  when :error
    # method_and_args << Cloudist::SafeError.new(payload)
    method_and_args << Hashie::Mash.new(payload.body)

  when :log
    method_and_args << payload.message
    method_and_args << payload.level

  else
    # Kernel#method raises NameError for an undefined name, so check first:
    # an unrecognised message type must fall through to the caller (which
    # skips methods the listener does not respond to) instead of blowing up.
    # Private methods are included to mirror what Kernel#method can see.
    name = method_and_args[0]
    method_and_args << data if respond_to?(name, true) && method(name).arity == 1
  end

  method_and_args
end

#handle_request(request) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/cloudist/listener.rb', line 37

# Stores the request's payload and dispatches it to the matching listener
# method.
#
# The routing key is "<message_type>" or "<message_type>:<event header>".
# #handle_key turns it into a method name plus candidate arguments. If the
# listener has no public method of that name, the request is ignored.
#
# Candidate arguments are trimmed to what the target method can accept:
# required plus optional positional parameters, or all of them when the
# method takes a splat. This allows callbacks such as
# +def progress(pct, description = nil)+, whose negative arity previously
# made +Array#first+ raise "negative array size".
#
# @param request [#payload] incoming request
# @return [Object, nil] the result of #call, or nil when nothing was dispatched
# @raise [ArgumentError] when fewer arguments are available than the target
#   method requires
def handle_request(request)
  @payload = request.payload
  key = [payload.message_type.to_s, payload.headers[:event]].compact.join(':')

  meth, *args = handle_key(key)
  return unless meth && respond_to?(meth)

  params = method(meth).parameters
  required = params.count { |kind, _name| kind == :req }
  if args.size < required
    raise ArgumentError, "Unable to fire callback (#{meth}) because we don't have enough args"
  end

  # Without a splat the method has a hard upper bound on positional arguments.
  unless params.any? { |kind, _name| kind == :rest }
    optional = params.count { |kind, _name| kind == :opt }
    args = args.first(required + optional)
  end

  call(meth, args)
end

#id ⇒ Object



52
53
54
# File 'lib/cloudist/listener.rb', line 52

# Identifier of the payload currently held by this listener.
#
# @return [Object] whatever +payload.id+ returns
def id
  payload.id
end

#log(message, level) ⇒ Object



120
121
122
# File 'lib/cloudist/listener.rb', line 120

# Default handler for +log+ messages; deliberately does nothing.
#
# #handle_key supplies +payload.message+ and +payload.level+ for this hook.
# Presumably subclasses override it.
#
# @param message [Object] the log message (ignored here)
# @param level [Object] the log level (ignored here)
# @return [nil]
def log(message, level)
  nil
end

#progress(pct) ⇒ Object



108
109
110
# File 'lib/cloudist/listener.rb', line 108

# Default handler for +progress+ messages; deliberately does nothing.
#
# #handle_key offers +payload.progress+ and +payload.description+ for this
# hook; this default only declares the first. Presumably subclasses override it.
#
# @param pct [Object] the reported progress (ignored here)
# @return [nil]
def progress(pct)
  nil
end

#runtime(seconds) ⇒ Object



112
113
114
# File 'lib/cloudist/listener.rb', line 112

# Default handler for +runtime+ messages; deliberately does nothing.
#
# #handle_key supplies +payload.runtime+ for this hook. Presumably subclasses
# override it.
#
# @param seconds [Object] the reported runtime (ignored here)
# @return [nil]
def runtime(seconds)
  nil
end