Class: DRbQS::MessageServer

Inherits:
Object
  • Object
show all
Includes:
HistoryUtils
Defined in:
lib/drbqs/message.rb

Instance Method Summary collapse

Methods included from HistoryUtils

time_to_string

Constructor Details

#initialize(message, logger = nil) ⇒ MessageServer

Returns a new instance of MessageServer.



7
8
9
10
11
# File 'lib/drbqs/message.rb', line 7

def initialize(message, logger = nil)
  @message = message
  @node_list = NodeList.new
  @logger = logger
end

Instance Method Details

#check_connectionObject



47
48
49
50
51
52
53
54
55
# File 'lib/drbqs/message.rb', line 47

def check_connection
  deleted = @node_list.delete_not_alive
  @logger.info("IDs of deleted nodes") { deleted } if deleted.size > 0 && @logger
  @node_list.each do |id, str|
    @message.write([id, :alive_p])
  end
  @node_list.set_check_connection
  deleted
end

#get_messageObject



13
14
15
16
17
18
19
20
# File 'lib/drbqs/message.rb', line 13

def get_message
  begin
    mes = @message.take([:server, Symbol, nil], 0)
    manage_message(*mes[1..2])
  rescue Rinda::RequestExpiredError
    nil
  end
end

#node_not_exist?Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/drbqs/message.rb', line 95

def node_not_exist?
  @node_list.empty?
end

#send_exitObject



64
65
66
# File 'lib/drbqs/message.rb', line 64

def send_exit
  send_signal_to_all_nodes(:exit)
end

#send_finalization(task) ⇒ Object



68
69
70
71
# File 'lib/drbqs/message.rb', line 68

def send_finalization(task)
  set_finalization(task)
  send_signal_to_all_nodes(:finalize)
end

#send_status(calculating_task_id) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/drbqs/message.rb', line 73

def send_status(calculating_task_id)
  s = ''
  @node_list.history.each do |node_id, events|
    if events.size == 0 || events.size > 2
      raise "Invalid history of nodes: #{events.inspect}"
    end
    connect = events[0]
    s << sprintf("%4d %s\t", node_id, connect[2])
    if disconnect = events[1]
      s << "disconnected: (#{time_to_string(connect[0])} - #{time_to_string(disconnect[0])})\n"
    else
      task_ids = calculating_task_id[node_id]
      s << "task: #{task_ids.map { |num| num.to_s }.join(', ')} (#{time_to_string(connect[0])})\n"
    end
  end
  begin
    @message.take([:status, nil], 0)
  rescue Rinda::RequestExpiredError
  end
  @message.write([:status, s])
end

#set_finalization(task) ⇒ Object



114
115
116
# File 'lib/drbqs/message.rb', line 114

def set_finalization(task)
  set_special_task(:finalization, task)
end

#set_initialization(task) ⇒ Object

If the task has already set, the method overwrite old task of initialization by new task.



110
111
112
# File 'lib/drbqs/message.rb', line 110

def set_initialization(task)
  set_special_task(:initialize, task)
end