Class: OriginatorProtocol

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon/originator_protocol.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, cluster_id = nil) ⇒ OriginatorProtocol

Returns a new instance of OriginatorProtocol.



8
9
10
11
12
# File 'lib/gorgon/originator_protocol.rb', line 8

def initialize(logger, cluster_id=nil)
  @originator_exchange_name = OriginatorProtocol.originator_exchange_name(cluster_id)
  @job_exchange_name = OriginatorProtocol.job_exchange_name(cluster_id)
  @logger = logger
end

Class Method Details

.job_exchange_name(cluster_id) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/gorgon/originator_protocol.rb', line 22

def self.job_exchange_name(cluster_id)
  if cluster_id
    "gorgon.jobs.#{cluster_id}"
  else
    'gorgon.jobs'
  end
end

.originator_exchange_name(cluster_id) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/gorgon/originator_protocol.rb', line 14

def self.originator_exchange_name(cluster_id)
  if cluster_id
    "gorgon.originators.#{cluster_id}"
  else
    "gorgon.originators"
  end
end

Instance Method Details

#append_protocol_information_to_job_definition(job_definition) ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/gorgon/originator_protocol.rb', line 55

def append_protocol_information_to_job_definition job_definition
  job_definition = job_definition.dup

  job_definition.file_queue_name = @file_queue.name
  job_definition.reply_exchange_name = @reply_exchange.name

  return job_definition
end

#cancel_jobObject



82
83
84
85
86
# File 'lib/gorgon/originator_protocol.rb', line 82

def cancel_job
  @file_queue.purge if @file_queue
  @channel.fanout("gorgon.worker_managers").publish(cancel_message) if @channel
  @logger.log "Cancel Message sent"
end

#connect(connection_information, options = {}) ⇒ Object



30
31
32
33
34
35
# File 'lib/gorgon/originator_protocol.rb', line 30

def connect connection_information, options={}
  @connection = AMQP.connect(connection_information)
  @channel = AMQP::Channel.new(@connection)
  @connection.on_closed { options[:on_closed].call } if options[:on_closed]
  open_queues
end

#disconnectObject



88
89
90
91
# File 'lib/gorgon/originator_protocol.rb', line 88

def disconnect
  cleanup_queues_and_exchange
  @connection.disconnect if @connection
end

#publish_files(files) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/gorgon/originator_protocol.rb', line 37

def publish_files files
  @file_queue = @channel.queue("file_queue_" + UUIDTools::UUID.timestamp_create.to_s, :auto_delete => true)

  files.each do |file|
    @channel.default_exchange.publish(file, :routing_key => @file_queue.name)
  end
end

#publish_job_to_all(job_definition) ⇒ Object



45
46
47
48
# File 'lib/gorgon/originator_protocol.rb', line 45

def publish_job_to_all job_definition
  job_definition = append_protocol_information_to_job_definition(job_definition)
  @channel.fanout(@job_exchange_name).publish(job_definition.to_json)
end

#publish_job_to_one(job_definition, listener_queue_name) ⇒ Object



50
51
52
53
# File 'lib/gorgon/originator_protocol.rb', line 50

def publish_job_to_one job_definition, listener_queue_name
  job_definition = append_protocol_information_to_job_definition(job_definition)
  @channel.default_exchange.publish(job_definition.to_json, :routing_key => listener_queue_name)
end

#receive_new_listener_notificationsObject



76
77
78
79
80
# File 'lib/gorgon/originator_protocol.rb', line 76

def receive_new_listener_notifications
  @originator_queue.subscribe do |payload|
    yield payload
  end
end

#receive_payloadsObject



70
71
72
73
74
# File 'lib/gorgon/originator_protocol.rb', line 70

def receive_payloads
  @reply_queue.subscribe do |payload|
    yield payload
  end
end

#send_message_to_listeners(type, body = {}) ⇒ Object



64
65
66
67
68
# File 'lib/gorgon/originator_protocol.rb', line 64

def send_message_to_listeners type, body={}
  # TODO: we probably want to use a different exchange for this type of messages
  message = {:type => type, :reply_exchange_name => @reply_exchange.name, :body => body}
  @channel.fanout(@job_exchange_name).publish(Yajl::Encoder.encode(message))
end