Class: Raft::Goliath::HttpJsonRpcProvider

Inherits:
RpcProvider
  • Object
show all
Defined in:
lib/raft/goliath.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri_generator) ⇒ HttpJsonRpcProvider



104
105
106
# File 'lib/raft/goliath.rb', line 104

# Builds a provider that remembers the given URI generator.
#
# @param uri_generator [#call] stored as-is; other methods of this class invoke
#   it as uri_generator.call(node_id, action) and hand the result to
#   EventMachine::HttpRequest.new
def initialize(uri_generator)
  @uri_generator = uri_generator
end

Instance Attribute Details

#uri_generator ⇒ Object (readonly)

Returns the value of attribute uri_generator.



102
103
104
# File 'lib/raft/goliath.rb', line 102

# Reader for the URI generator that was passed to the constructor.
#
# @return [Object] the current value of @uri_generator
def uri_generator
  @uri_generator
end

Instance Method Details

#append_entries(request, cluster, &block) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/raft/goliath.rb', line 136

# Fans an append_entries request out to every node of the cluster except the
# one whose id equals request.leader_id, then waits on each started call.
#
# @param request [#leader_id] request handed to each per-follower call
# @param cluster [#node_ids] supplies the ids of the nodes to contact
# @yield forwarded untouched to create_append_entries_to_follower_request
# @return [Array] the per-follower calls, in the order they were started
def append_entries(request, cluster, &block)
  pending = []
  EM.synchrony do
    cluster.node_ids.each do |follower_id|
      unless follower_id == request.leader_id
        pending.push(create_append_entries_to_follower_request(request, follower_id, &block))
      end
    end
  end
  # Wait on every started call, one after another.
  pending.each { |call| EM::Synchrony.sync(call) }
end

#append_entries_to_follower(request, node_id, &block) ⇒ Object



149
150
151
152
153
# File 'lib/raft/goliath.rb', line 149

# Issues an append_entries request to a single follower.
#
# Delegates straight to create_append_entries_to_follower_request and returns
# its result without waiting on it.
# NOTE(review): an EM.synchrony wrapper around this call was commented out in
# the original source — confirm that callers already provide that context.
#
# @param request [Object] passed through unchanged
# @param node_id [Object] passed through unchanged
# @yield forwarded to the delegate
# @return [Object] whatever the delegate returns
def append_entries_to_follower(request, node_id, &block)
  create_append_entries_to_follower_request(request, node_id, &block)
end

#command(request, node_id) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/raft/goliath.rb', line 178

# Posts a command request to one node as JSON and waits for the reply.
#
# @param request [Object] converted with HashMarshalling.object_to_hash using
#   the attribute list %w(command)
# @param node_id [Object] passed to uri_generator (with 'command') to build the
#   target URL; also used in the failure log message
# @return [Raft::CommandResponse] the reply decoded from the response body when
#   the HTTP status is 200; otherwise Raft::CommandResponse.new(false), after
#   the failure has been logged through Raft::Goliath.log
def command(request, node_id)
  sent_hash = HashMarshalling.object_to_hash(request, %w(command))
  sent_json = MultiJson.dump(sent_hash)
  http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'command')).apost(
      :body => sent_json,
      :head => { 'Content-Type' => 'application/json' })
  # Wait for the asynchronous request before inspecting its status.
  http = EM::Synchrony.sync(http)
  status = http.response_header.status
  if status == 200
    received_hash = MultiJson.load(http.response)
    HashMarshalling.hash_to_object(received_hash, Raft::CommandResponse)
  else
    Raft::Goliath.log("command failed for node '#{node_id}' with code #{status}")
    # Fully qualified, like the success branch, so the constant resolves the
    # same way regardless of the lexical nesting this method is defined in.
    Raft::CommandResponse.new(false)
  end
end

#create_append_entries_to_follower_request(request, node_id, &block) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/raft/goliath.rb', line 155

# Starts an asynchronous append_entries POST to one follower and returns the
# in-flight request without waiting for it.
#
# When the request completes with HTTP 200 the body is decoded into a
# Raft::AppendEntriesResponse and yielded together with the node id. Any other
# status, and any failure reported through the request's errback, is logged via
# Raft::Goliath.log.
#
# @param request [#leader_id] converted with HashMarshalling.object_to_hash;
#   each element of its 'entries' value is converted the same way
# @param node_id [Object] target node; passed to uri_generator with
#   'append_entries'
# @yield [node_id, response] on an HTTP 200 reply
# @return [Object] the request object returned by apost
# @raise [RuntimeError] if node_id equals request.leader_id
def create_append_entries_to_follower_request(request, node_id, &block)
  # Fail fast: reject self-replication before doing any serialisation work.
  raise "replicating to self!" if request.leader_id == node_id

  sent_hash = HashMarshalling.object_to_hash(request, %w(term leader_id prev_log_index prev_log_term entries commit_index))
  sent_hash['entries'] = sent_hash['entries'].map { |obj| HashMarshalling.object_to_hash(obj, %w(term index command)) }
  sent_json = MultiJson.dump(sent_hash)

  http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'append_entries')).apost(
      :body => sent_json,
      :head => { 'Content-Type' => 'application/json' })

  # Shared by the non-200 branch and the errback so both report identically.
  log_failure = lambda do
    Raft::Goliath.log("append_entries failed for node '#{node_id}' with code #{http.response_header.status}")
  end

  http.callback do
    if http.response_header.status == 200
      received_hash = MultiJson.load(http.response)
      response = HashMarshalling.hash_to_object(received_hash, Raft::AppendEntriesResponse)
      yield node_id, response
    else
      log_failure.call
    end
  end
  # Without this, requests that fail through the errback path went unreported.
  http.errback { log_failure.call }
  http
end

#request_votes(request, cluster, &block) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/raft/goliath.rb', line 108

# Sends a request_vote POST to every node of the cluster except the one whose
# id equals request.candidate_id, then waits on each started call.
#
# For each HTTP 200 reply the body is decoded into a Raft::RequestVoteResponse
# and yielded. Any other status, and any failure reported through a request's
# errback, is logged via Raft::Goliath.log.
#
# @param request [#candidate_id] converted once with
#   HashMarshalling.object_to_hash and sent to every contacted node
# @param cluster [#node_ids] supplies the ids of the nodes to contact
# @yield [node_id, request, response] for each HTTP 200 reply
# @return [Array] the started requests, in the order they were created
def request_votes(request, cluster, &block)
  sent_hash = HashMarshalling.object_to_hash(request, %w(term candidate_id last_log_index last_log_term))
  sent_json = MultiJson.dump(sent_hash)
  deferred_calls = []
  EM.synchrony do
    cluster.node_ids.each do |node_id|
      next if node_id == request.candidate_id
      http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'request_vote')).apost(
          :body => sent_json,
          :head => { 'Content-Type' => 'application/json' })

      # Shared by the non-200 branch and the errback so both report identically.
      log_failure = lambda do
        Raft::Goliath.log("request_vote failed for node '#{node_id}' with code #{http.response_header.status}")
      end

      http.callback do
        if http.response_header.status == 200
          received_hash = MultiJson.load(http.response)
          response = HashMarshalling.hash_to_object(received_hash, Raft::RequestVoteResponse)
          yield node_id, request, response
        else
          log_failure.call
        end
      end
      # Without this, requests that fail through the errback path went unreported.
      http.errback { log_failure.call }
      deferred_calls << http
    end
  end
  deferred_calls.each do |call|
    EM::Synchrony.sync call
  end
end