Class: OriginatorProtocol

Inherits:
Object
  • Object
show all
Defined in:
lib/gorgon/originator_protocol.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, cluster_id = nil) ⇒ OriginatorProtocol

Returns a new instance of OriginatorProtocol.



8
9
10
11
# File 'lib/gorgon/originator_protocol.rb', line 8

def initialize(logger, cluster_id=nil)
  @job_queue_name = OriginatorProtocol.job_queue_name(cluster_id)
  @logger = logger
end

Class Method Details

.job_queue_name(cluster_id) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/gorgon/originator_protocol.rb', line 13

def self.job_queue_name(cluster_id)
  if cluster_id
    "gorgon.jobs.#{cluster_id}"
  else
    'gorgon.jobs'
  end
end

Instance Method Details

#cancel_jobObject



55
56
57
58
59
# File 'lib/gorgon/originator_protocol.rb', line 55

def cancel_job
  @file_queue.purge if @file_queue
  @channel.fanout("gorgon.worker_managers").publish(cancel_message) if @channel
  @logger.log "Cancel Message sent"
end

#connect(connection_information, options = {}) ⇒ Object



21
22
23
24
25
26
# File 'lib/gorgon/originator_protocol.rb', line 21

def connect connection_information, options={}
  @connection = AMQP.connect(connection_information)
  @channel = AMQP::Channel.new(@connection)
  @connection.on_closed { options[:on_closed].call } if options[:on_closed]
  open_queues
end

#disconnectObject



61
62
63
64
# File 'lib/gorgon/originator_protocol.rb', line 61

def disconnect
  cleanup_queues_and_exchange
  @connection.disconnect if @connection
end

#publish_files(files) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/gorgon/originator_protocol.rb', line 28

def publish_files files
  @file_queue = @channel.queue("file_queue_" + UUIDTools::UUID.timestamp_create.to_s, :auto_delete => true)

  files.each do |file|
    @channel.default_exchange.publish(file, :routing_key => @file_queue.name)
  end
end

#publish_job(job_definition) ⇒ Object



36
37
38
39
40
41
# File 'lib/gorgon/originator_protocol.rb', line 36

def publish_job job_definition
  job_definition.file_queue_name = @file_queue.name
  job_definition.reply_exchange_name = @reply_exchange.name

  @channel.fanout(@job_queue_name).publish(job_definition.to_json)
end

#receive_payloadsObject



49
50
51
52
53
# File 'lib/gorgon/originator_protocol.rb', line 49

def receive_payloads
  @reply_queue.subscribe do |payload|
    yield payload
  end
end

#send_message_to_listeners(type, body = {}) ⇒ Object



43
44
45
46
47
# File 'lib/gorgon/originator_protocol.rb', line 43

def send_message_to_listeners type, body={}
  # TODO: we probably want to use a different exchange for this type of messages
  message = {:type => type, :reply_exchange_name => @reply_exchange.name, :body => body}
  @channel.fanout(@job_queue_name).publish(Yajl::Encoder.encode(message))
end