Class: Raft::Goliath::HttpJsonRpcProvider

Inherits:
RpcProvider
  • Object
show all
Defined in:
lib/raft/goliath.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri_generator) ⇒ HttpJsonRpcProvider

Returns a new instance of HttpJsonRpcProvider.



107
108
109
# File 'lib/raft/goliath.rb', line 107

def initialize(uri_generator)
  @uri_generator = uri_generator
end

Instance Attribute Details

#uri_generatorObject (readonly)

Returns the value of attribute uri_generator.



105
106
107
# File 'lib/raft/goliath.rb', line 105

def uri_generator
  @uri_generator
end

Instance Method Details

#append_entries(request, cluster, &block) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/raft/goliath.rb', line 139

def append_entries(request, cluster, &block)
  deferred_calls = []
  EM.synchrony do
    cluster.node_ids.each do |node_id|
      next if node_id == request.leader_id
      deferred_calls << create_append_entries_to_follower_request(request, node_id, &block)
    end
  end
  deferred_calls.each do |http|
    EM::Synchrony.sync http
  end
end

#append_entries_to_follower(request, node_id, &block) ⇒ Object



152
153
154
155
156
# File 'lib/raft/goliath.rb', line 152

def append_entries_to_follower(request, node_id, &block)
#        EM.synchrony do
    create_append_entries_to_follower_request(request, node_id, &block)
#        end
end

#command(request, node_id) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/raft/goliath.rb', line 181

def command(request, node_id)
  sent_hash = HashMarshalling.object_to_hash(request, %w(command))
  sent_json = MultiJson.dump(sent_hash)
  http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'command')).apost(
      :body => sent_json,
      :head => { 'Content-Type' => 'application/json' })
  http = EM::Synchrony.sync(http)
  if http.response_header.status == 200
    received_hash = MultiJson.load(http.response)
    HashMarshalling.hash_to_object(received_hash, Raft::CommandResponse)
  else
    Raft::Goliath.log("command failed for node '#{node_id}' with code #{http.response_header.status}")
    CommandResponse.new(false)
  end
end

#create_append_entries_to_follower_request(request, node_id, &block) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/raft/goliath.rb', line 158

def create_append_entries_to_follower_request(request, node_id, &block)
  sent_hash = HashMarshalling.object_to_hash(request, %w(term leader_id prev_log_index prev_log_term entries commit_index))
  sent_hash['entries'] = sent_hash['entries'].map {|obj| HashMarshalling.object_to_hash(obj, %w(term index command))}
  sent_json = MultiJson.dump(sent_hash)
  raise "replicating to self!" if request.leader_id == node_id
  #STDOUT.write("\nleader #{request.leader_id} replicating entries to #{node_id}: #{sent_hash.pretty_inspect}\n")#"\t#{caller[0..4].join("\n\t")}")

  http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'append_entries')).apost(
      :body => sent_json,
      :head => { 'Content-Type' => 'application/json' })
  http.callback do
    #STDOUT.write("\nleader #{request.leader_id} calling back to #{node_id} to append entries\n")
    if http.response_header.status == 200
      received_hash = MultiJson.load(http.response)
      response = HashMarshalling.hash_to_object(received_hash, Raft::AppendEntriesResponse)
      yield node_id, response
    else
      Raft::Goliath.log("append_entries failed for node '#{node_id}' with code #{http.response_header.status}")
    end
  end
  http
end

#request_votes(request, cluster, &block) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/raft/goliath.rb', line 111

def request_votes(request, cluster, &block)
  sent_hash = HashMarshalling.object_to_hash(request, %w(term candidate_id last_log_index last_log_term))
  sent_json = MultiJson.dump(sent_hash)
  deferred_calls = []
  EM.synchrony do
    cluster.node_ids.each do |node_id|
      next if node_id == request.candidate_id
      http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'request_vote')).apost(
          :body => sent_json,
          :head => { 'Content-Type' => 'application/json' })
      http.callback do
        if http.response_header.status == 200
          received_hash = MultiJson.load(http.response)
          response = HashMarshalling.hash_to_object(received_hash, Raft::RequestVoteResponse)
          #STDOUT.write("\n\t#{node_id} responded #{response.vote_granted} to #{request.candidate_id}\n\n")
          yield node_id, request, response
        else
          Raft::Goliath.log("request_vote failed for node '#{node_id}' with code #{http.response_header.status}")
        end
      end
      deferred_calls << http
    end
  end
  deferred_calls.each do |http|
    EM::Synchrony.sync http
  end
end