Class: Raft::Goliath::HttpJsonRpcProvider
- Inherits:
-
RpcProvider
- Object
- RpcProvider
- Raft::Goliath::HttpJsonRpcProvider
- Defined in:
- lib/raft/goliath.rb
Instance Attribute Summary collapse
-
#uri_generator ⇒ Object
readonly
Returns the value of attribute uri_generator.
Instance Method Summary collapse
- #append_entries(request, cluster, &block) ⇒ Object
- #append_entries_to_follower(request, node_id, &block) ⇒ Object
- #command(request, node_id) ⇒ Object
- #create_append_entries_to_follower_request(request, node_id, &block) ⇒ Object
-
#initialize(uri_generator) ⇒ HttpJsonRpcProvider
constructor
A new instance of HttpJsonRpcProvider.
- #request_votes(request, cluster, &block) ⇒ Object
Constructor Details
#initialize(uri_generator) ⇒ HttpJsonRpcProvider
Returns a new instance of HttpJsonRpcProvider.
107 108 109 |
# File 'lib/raft/goliath.rb', line 107 def initialize(uri_generator) @uri_generator = uri_generator end |
Instance Attribute Details
#uri_generator ⇒ Object (readonly)
Returns the value of attribute uri_generator.
105 106 107 |
# File 'lib/raft/goliath.rb', line 105 def uri_generator @uri_generator end |
Instance Method Details
#append_entries(request, cluster, &block) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/raft/goliath.rb', line 139 def append_entries(request, cluster, &block) deferred_calls = [] EM.synchrony do cluster.node_ids.each do |node_id| next if node_id == request.leader_id deferred_calls << create_append_entries_to_follower_request(request, node_id, &block) end end deferred_calls.each do |http| EM::Synchrony.sync http end end |
#append_entries_to_follower(request, node_id, &block) ⇒ Object
152 153 154 155 156 |
# File 'lib/raft/goliath.rb', line 152 def append_entries_to_follower(request, node_id, &block) # EM.synchrony do create_append_entries_to_follower_request(request, node_id, &block) # end end |
#command(request, node_id) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/raft/goliath.rb', line 181 def command(request, node_id) sent_hash = HashMarshalling.object_to_hash(request, %w(command)) sent_json = MultiJson.dump(sent_hash) http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'command')).apost( :body => sent_json, :head => { 'Content-Type' => 'application/json' }) http = EM::Synchrony.sync(http) if http.response_header.status == 200 received_hash = MultiJson.load(http.response) HashMarshalling.hash_to_object(received_hash, Raft::CommandResponse) else Raft::Goliath.log("command failed for node '#{node_id}' with code #{http.response_header.status}") CommandResponse.new(false) end end |
#create_append_entries_to_follower_request(request, node_id, &block) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/raft/goliath.rb', line 158 def create_append_entries_to_follower_request(request, node_id, &block) sent_hash = HashMarshalling.object_to_hash(request, %w(term leader_id prev_log_index prev_log_term entries commit_index)) sent_hash['entries'] = sent_hash['entries'].map {|obj| HashMarshalling.object_to_hash(obj, %w(term index command))} sent_json = MultiJson.dump(sent_hash) raise "replicating to self!" if request.leader_id == node_id #STDOUT.write("\nleader #{request.leader_id} replicating entries to #{node_id}: #{sent_hash.pretty_inspect}\n")#"\t#{caller[0..4].join("\n\t")}") http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'append_entries')).apost( :body => sent_json, :head => { 'Content-Type' => 'application/json' }) http.callback do #STDOUT.write("\nleader #{request.leader_id} calling back to #{node_id} to append entries\n") if http.response_header.status == 200 received_hash = MultiJson.load(http.response) response = HashMarshalling.hash_to_object(received_hash, Raft::AppendEntriesResponse) yield node_id, response else Raft::Goliath.log("append_entries failed for node '#{node_id}' with code #{http.response_header.status}") end end http end |
#request_votes(request, cluster, &block) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/raft/goliath.rb', line 111 def request_votes(request, cluster, &block) sent_hash = HashMarshalling.object_to_hash(request, %w(term candidate_id last_log_index last_log_term)) sent_json = MultiJson.dump(sent_hash) deferred_calls = [] EM.synchrony do cluster.node_ids.each do |node_id| next if node_id == request.candidate_id http = EventMachine::HttpRequest.new(uri_generator.call(node_id, 'request_vote')).apost( :body => sent_json, :head => { 'Content-Type' => 'application/json' }) http.callback do if http.response_header.status == 200 received_hash = MultiJson.load(http.response) response = HashMarshalling.hash_to_object(received_hash, Raft::RequestVoteResponse) #STDOUT.write("\n\t#{node_id} responded #{response.vote_granted} to #{request.candidate_id}\n\n") yield node_id, request, response else Raft::Goliath.log("request_vote failed for node '#{node_id}' with code #{http.response_header.status}") end end deferred_calls << http end end deferred_calls.each do |http| EM::Synchrony.sync http end end |