Class: Pione::Notification::TaskWorkerBrokerRecipient

Inherits:
Recipient
  • Object
show all
Defined in:
lib/pione/notification/task-worker-broker-recipient.rb

Overview

TaskWorkerBrokerRecipient is a recipient for the task worker broker agent.

Instance Method Summary collapse

Methods inherited from Recipient

#notify

Constructor Details

#initialize(model, front_uri, listener_uri) ⇒ TaskWorkerBrokerRecipient

Returns a new instance of TaskWorkerBrokerRecipient.

Parameters:

  • model (TaskWorkerBrokerModel)

    task worker broker model

  • front_uri (URI)

    URI of command front

  • listener_uri (URI)

    URI of notification listener



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 11

# Create the recipient and start its background maintenance thread.
#
# @param model [TaskWorkerBrokerModel]
#   task worker broker model
# @param front_uri [URI]
#   URI of command front
# @param listener_uri [URI]
#   URI of notification listener
def initialize(model, front_uri, listener_uri)
  super(front_uri, listener_uri)

  @model = model
  @lock = Mutex.new   # guards @tuple_space
  @tuple_space = {}   # table filled by receive_tuple_space

  # once a second, drop expired entries and hand the remaining tuple
  # spaces to the broker model
  @thread = Thread.new do
    loop do
      sleep 1
      clean
      update_broker
    end
  end
end

Instance Method Details

#clean ⇒ Object

Clean tuple space table.



68
69
70
71
72
73
74
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 68

# Clean tuple space table: remove every entry whose :last_time lies further
# in the past than Global.tuple_space_disconnection_time.
#
# @return [Hash]
#   the tuple space table after expired entries were removed
def clean
  @lock.synchronize do
    checked_at = Time.now
    limit = Global.tuple_space_disconnection_time
    @tuple_space.delete_if do |_uri, entry|
      elapsed = checked_at - entry[:last_time]
      elapsed > limit
    end
  end
end

#get_tuple_space(uri) ⇒ Object

Get a tuple space from front server at the URI.



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 53

# Get a tuple space from front server at the URI.
#
# @param uri [Object]
#   URI of the provider front, passed to DRb::DRbObject.new_with_uri
#   (presumably a druby URI string; confirm against the sender)
# @return [Object, nil]
#   the tuple space reference, or nil when the front did not answer within
#   3 seconds or the connection failed
def get_tuple_space(uri)
  # build a reference to provider front
  front = DRb::DRbObject.new_with_uri(uri)

  # return the tuple space reference
  Timeout.timeout(3) {front.tuple_space}
rescue Timeout::Error
  # Log the URI we were given: asking the front proxy for its URI here would
  # mean talking again to the peer that has just failed to answer.
  Log::Debug.notification do
    'tuple_space notfication ignored the provider "%s" that seems to be something bad' % uri
  end
  # explicit nil so the logger's return value is never mistaken for a tuple space
  nil
rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
  Log::Debug.notification('The tuple space at "%s" disconnected: %s' % [uri, e.message])
  nil
end

#receive_tuple_space(message) ⇒ Object

Receive a "tuple space" message.



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 35

# Receive a "tuple space" message.
#
# If the sender is already in the table, its :last_time is refreshed;
# otherwise its tuple space is fetched with get_tuple_space and, when that
# succeeds, stored as a new entry.
#
# @param message [#[]]
#   notification message; message["front"] is used as the table key and as
#   the URI passed to get_tuple_space
# @return [Object, nil]
#   the refreshed time or the newly stored entry; nil when no tuple space
#   could be obtained
def receive_tuple_space(message)
  uri = message["front"]

  # Look up and refresh inside one critical section: the cleaner thread may
  # delete the entry at any moment, so a check made outside the lock could be
  # stale by the time the entry is touched.
  refreshed_at = @lock.synchronize do
    holder = @tuple_space[uri]
    holder[:last_time] = Time.now if holder
  end
  return refreshed_at if refreshed_at

  # fetch outside the lock; get_tuple_space may wait on the remote front
  tuple_space = get_tuple_space(uri)
  return unless tuple_space

  @lock.synchronize do
    @tuple_space[uri] = {:last_time => Time.now, :tuple_space => tuple_space}
  end
end

#terminate ⇒ Object

Terminate the recipient.



29
30
31
32
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 29

# Terminate the recipient: run the inherited termination first, then stop
# the background thread held in @thread.
#
# @return [Thread]
#   the stopped thread
def terminate
  super
  @thread.kill
end

#update_broker ⇒ Object

Update the tuple space list of broker.



77
78
79
80
81
82
# File 'lib/pione/notification/task-worker-broker-recipient.rb', line 77

# Update the tuple space list of broker: collect the :tuple_space of every
# table entry and pass the list to the model while holding the lock.
#
# @return [Object]
#   whatever @model.update_tuple_spaces returns
def update_broker
  @lock.synchronize do
    current = @tuple_space.each_value.map { |entry| entry[:tuple_space] }
    @model.update_tuple_spaces(current)
  end
end