Module: Tapyrus::Network::MessageHandler
- Included in:
- Connection
- Defined in:
- lib/tapyrus/network/message_handler.rb
Overview
P2P message handler used by peer connection class.
Instance Method Summary collapse
-
#defer_handle_command(command, payload) ⇒ Object
handle command with EM#defer.
-
#handle(message) ⇒ Object
handle p2p message.
- #handle_command(command, payload) ⇒ Object
- #handshake_done ⇒ Object
- #on_addr(addr) ⇒ Object
- #on_block(block) ⇒ Object
- #on_cmpct_block(cmpct_block) ⇒ Object
- #on_fee_filter(fee_filter) ⇒ Object
- #on_get_addr ⇒ Object
- #on_get_data(get_data) ⇒ Object
- #on_get_headers(headers) ⇒ Object
- #on_headers(headers) ⇒ Object
- #on_inv(inv) ⇒ Object
- #on_mem_pool ⇒ Object
- #on_merkle_block(merkle_block) ⇒ Object
- #on_not_found(not_found) ⇒ Object
- #on_ping(ping) ⇒ Object
- #on_pong(pong) ⇒ Object
- #on_reject(reject) ⇒ Object
- #on_send_cmpct(cmpct) ⇒ Object
- #on_send_headers ⇒ Object
- #on_tx(tx) ⇒ Object
- #on_ver_ack ⇒ Object
- #on_version(version) ⇒ Object
- #parse(message) ⇒ Object
- #parse_header ⇒ Object
- #send_message(msg) ⇒ Object
Instance Method Details
#defer_handle_command(command, payload) ⇒ Object
handle command with EM#defer
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/tapyrus/network/message_handler.rb', line 47 def defer_handle_command(command, payload) operation = proc { handle_command(command, payload) } callback = proc { |result| } errback = proc do |e| logger.error("error occurred. #{e.message}") logger.error(e.backtrace) peer.handle_error(e) end EM.defer(operation, callback, errback) end |
#handle(message) ⇒ Object
handle p2p message.
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/tapyrus/network/message_handler.rb', line 8 def handle() peer.last_recv = Time.now.to_i peer.bytes_recv += .bytesize begin parse() rescue Tapyrus::Message::Error => e logger.error("invalid header magic. #{e.message}") close end end |
#handle_command(command, payload) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/tapyrus/network/message_handler.rb', line 59 def handle_command(command, payload) logger.info("[#{addr}] process command #{command}.") case command when Tapyrus::Message::Version::COMMAND on_version(Tapyrus::Message::Version.parse_from_payload(payload)) when Tapyrus::Message::VerAck::COMMAND on_ver_ack when Tapyrus::Message::GetAddr::COMMAND on_get_addr when Tapyrus::Message::Addr::COMMAND on_addr(Tapyrus::Message::Addr.parse_from_payload(payload)) when Tapyrus::Message::SendHeaders::COMMAND on_send_headers when Tapyrus::Message::FeeFilter::COMMAND on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload)) when Tapyrus::Message::Ping::COMMAND on_ping(Tapyrus::Message::Ping.parse_from_payload(payload)) when Tapyrus::Message::Pong::COMMAND on_pong(Tapyrus::Message::Pong.parse_from_payload(payload)) when Tapyrus::Message::GetHeaders::COMMAND on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload)) when Tapyrus::Message::Headers::COMMAND on_headers(Tapyrus::Message::Headers.parse_from_payload(payload)) when Tapyrus::Message::Block::COMMAND on_block(Tapyrus::Message::Block.parse_from_payload(payload)) when Tapyrus::Message::Tx::COMMAND on_tx(Tapyrus::Message::Tx.parse_from_payload(payload)) when Tapyrus::Message::NotFound::COMMAND on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload)) when Tapyrus::Message::MemPool::COMMAND on_mem_pool when Tapyrus::Message::Reject::COMMAND on_reject(Tapyrus::Message::Reject.parse_from_payload(payload)) when Tapyrus::Message::SendCmpct::COMMAND on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload)) when Tapyrus::Message::Inv::COMMAND on_inv(Tapyrus::Message::Inv.parse_from_payload(payload)) when Tapyrus::Message::MerkleBlock::COMMAND on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload)) when Tapyrus::Message::CmpctBlock::COMMAND on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload)) when Tapyrus::Message::GetData::COMMAND on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload)) else logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}") close("with command #{command}") end end |
#handshake_done ⇒ Object
116 117 118 119 120 121 |
# File 'lib/tapyrus/network/message_handler.rb', line 116 def handshake_done return unless @incomming_handshake && @outgoing_handshake logger.info "handshake finished." @connected = true post_handshake end |
#on_addr(addr) ⇒ Object
142 143 144 145 |
# File 'lib/tapyrus/network/message_handler.rb', line 142 def on_addr(addr) logger.info("receive addr message. #{addr.build_json}") # TODO end |
#on_block(block) ⇒ Object
182 183 184 185 |
# File 'lib/tapyrus/network/message_handler.rb', line 182 def on_block(block) logger.info("receive block message.") # TODO end |
#on_cmpct_block(cmpct_block) ⇒ Object
235 236 237 |
# File 'lib/tapyrus/network/message_handler.rb', line 235 def on_cmpct_block(cmpct_block) logger.info("receive cmpct_block message. #{cmpct_block.build_json}") end |
#on_fee_filter(fee_filter) ⇒ Object
152 153 154 155 |
# File 'lib/tapyrus/network/message_handler.rb', line 152 def on_fee_filter(fee_filter) logger.info("receive feefilter message. #{fee_filter.build_json}") @fee_rate = fee_filter.fee_rate end |
#on_get_addr ⇒ Object
137 138 139 140 |
# File 'lib/tapyrus/network/message_handler.rb', line 137 def on_get_addr logger.info("receive getaddr message.") peer.send_addrs end |
#on_get_data(get_data) ⇒ Object
239 240 241 |
# File 'lib/tapyrus/network/message_handler.rb', line 239 def on_get_data(get_data) logger.info("receive get data message. #{get_data.build_json}") end |
#on_get_headers(headers) ⇒ Object
172 173 174 175 |
# File 'lib/tapyrus/network/message_handler.rb', line 172 def on_get_headers(headers) logger.info("receive getheaders message.") # TODO end |
#on_headers(headers) ⇒ Object
177 178 179 180 |
# File 'lib/tapyrus/network/message_handler.rb', line 177 def on_headers(headers) logger.info("receive headers message.") peer.handle_headers(headers) end |
#on_inv(inv) ⇒ Object
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/tapyrus/network/message_handler.rb', line 212 def on_inv(inv) logger.info("receive inv message.") blocks = [] txs = [] inv.inventories.each do |i| case i.identifier when Tapyrus::Message::Inventory::MSG_TX txs << i.hash when Tapyrus::Message::Inventory::MSG_BLOCK blocks << i.hash else logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}") end end logger.info("receive block= #{blocks.size}, txs: #{txs.size}") peer.handle_block_inv(blocks) unless blocks.empty? end |
#on_mem_pool ⇒ Object
197 198 199 200 |
# File 'lib/tapyrus/network/message_handler.rb', line 197 def on_mem_pool logger.info("receive mempool message.") # TODO return mempool tx end |
#on_merkle_block(merkle_block) ⇒ Object
230 231 232 233 |
# File 'lib/tapyrus/network/message_handler.rb', line 230 def on_merkle_block(merkle_block) logger.info("receive merkle block message. #{merkle_block.build_json}") peer.handle_merkle_block(merkle_block) end |
#on_not_found(not_found) ⇒ Object
192 193 194 195 |
# File 'lib/tapyrus/network/message_handler.rb', line 192 def on_not_found(not_found) logger.info("receive notfound message. #{not_found.build_json}") # TODO end |
#on_ping(ping) ⇒ Object
157 158 159 160 |
# File 'lib/tapyrus/network/message_handler.rb', line 157 def on_ping(ping) logger.info("receive ping message. #{ping.build_json}") (ping.to_response) end |
#on_pong(pong) ⇒ Object
162 163 164 165 166 167 168 169 170 |
# File 'lib/tapyrus/network/message_handler.rb', line 162 def on_pong(pong) logger.info("receive pong message. #{pong.build_json}") if pong.nonce == peer.last_ping_nonce peer.last_ping_nonce = nil peer.last_pong = Time.now.to_i else logger.debug "The remote peer sent the wrong nonce (#{pong.nonce})." end end |
#on_reject(reject) ⇒ Object
202 203 204 205 |
# File 'lib/tapyrus/network/message_handler.rb', line 202 def on_reject(reject) logger.warn("receive reject message. #{reject.build_json}") # TODO end |
#on_send_cmpct(cmpct) ⇒ Object
207 208 209 210 |
# File 'lib/tapyrus/network/message_handler.rb', line 207 def on_send_cmpct(cmpct) logger.info("receive sendcmpct message. #{cmpct.build_json}") # TODO if mode is high and version is 1, relay block with cmpctblock message end |
#on_send_headers ⇒ Object
147 148 149 150 |
# File 'lib/tapyrus/network/message_handler.rb', line 147 def on_send_headers logger.info("receive sendheaders message.") @sendheaders = true end |
#on_tx(tx) ⇒ Object
187 188 189 190 |
# File 'lib/tapyrus/network/message_handler.rb', line 187 def on_tx(tx) logger.info("receive tx message. #{tx.build_json}") peer.handle_tx(tx) end |
#on_ver_ack ⇒ Object
131 132 133 134 135 |
# File 'lib/tapyrus/network/message_handler.rb', line 131 def on_ver_ack logger.info("receive verack message.") @outgoing_handshake = true handshake_done end |
#on_version(version) ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/tapyrus/network/message_handler.rb', line 123 def on_version(version) logger.info("receive version message. #{version.build_json}") @version = version (Tapyrus::Message::VerAck.new) @incomming_handshake = true handshake_done end |
#parse(message) ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/tapyrus/network/message_handler.rb', line 19 def parse() += command, payload, rest = parse_header return unless command defer_handle_command(command, payload) = "" parse(rest) if rest && rest.bytesize > 0 end |
#parse_header ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/tapyrus/network/message_handler.rb', line 29 def parse_header head_magic = Tapyrus.chain_params.magic_head return if .nil? || .size < MESSAGE_HEADER_SIZE magic, command, length, checksum = .unpack("a4A12Va4") raise Tapyrus::Message::Error, "invalid header magic. #{magic.bth}" unless magic.bth == head_magic payload = [MESSAGE_HEADER_SIZE...(MESSAGE_HEADER_SIZE + length)] return if payload.size < length unless Tapyrus.double_sha256(payload)[0...4] == checksum raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}" end rest = [(MESSAGE_HEADER_SIZE + length)..-1] [command, payload, rest] end |
#send_message(msg) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/tapyrus/network/message_handler.rb', line 108 def (msg) logger.info "send message #{msg.class::COMMAND}" pkt = msg.to_pkt peer.last_send = Time.now.to_i peer.bytes_sent = pkt.bytesize send_data(pkt) end |