Class: Gossiperl::Client::Messaging
- Inherits:
- Resolution
  - Object
  - Resolution
  - Gossiperl::Client::Messaging
- Defined in:
- lib/gossiperl_client/headers.rb
- lib/gossiperl_client/messaging.rb
Constant Summary
Constants inherited from Resolution
Instance Method Summary collapse
- #digest_ack(digest) ⇒ Object
- #digest_exit ⇒ Object
- #digest_forwarded_ack(digest_id) ⇒ Object
- #digest_subscribe(event_types) ⇒ Object
- #digest_unsubscribe(event_types) ⇒ Object
- #get_callback_block ⇒ Object
- #initialize(worker, &block) ⇒ Messaging (constructor): A new instance of Messaging.
- #send(digest) ⇒ Object
- #start ⇒ Object
Methods inherited from Resolution
Constructor Details
#initialize(worker, &block) ⇒ Messaging
Returns a new instance of Messaging.
9 10 11 12 |
# File 'lib/gossiperl_client/messaging.rb', line 9

# Creates the messaging component for a worker.
#
# @param worker [Object] worker handed to the +worker=+ writer
# @param block [Proc, nil] optional block, kept in @callback_block and
#   handed back by #get_callback_block
def initialize worker, &block
  self.worker = worker
  @callback_block = block
end
Instance Method Details
#digest_ack(digest) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/gossiperl_client/messaging.rb', line 72

# Builds a DigestAck in reply to +digest+ and passes it to #send.
#
# The ack carries the client name from the worker options (as a String),
# the current time in epoch seconds as heartbeat, the id of the digest
# being acknowledged and an empty membership list.
#
# @param digest [#id] digest being acknowledged
# @return [Object] whatever #send returns
def digest_ack digest
  ack = ::Gossiperl::Client::Thrift::DigestAck.new
  ack.name = self.worker.options[:client_name].to_s
  ack.heartbeat = Time.now.to_i
  ack.reply_id = digest.id
  ack.membership = []
  self.send ack
end
#digest_exit ⇒ Object
109 110 111 112 113 114 115 116 |
# File 'lib/gossiperl_client/messaging.rb', line 109

# Builds a DigestExit, passes it to #send and then marks the worker as
# no longer working.
#
# The digest carries the client name and client secret from the worker
# options (both as Strings) and the current time in epoch seconds as
# heartbeat.
#
# @return [false] the value assigned to +worker.working+
def digest_exit
  digest = ::Gossiperl::Client::Thrift::DigestExit.new
  digest.name = self.worker.options[:client_name].to_s
  digest.heartbeat = Time.now.to_i
  digest.secret = self.worker.options[:client_secret].to_s
  self.send digest
  # Flag is flipped only after the exit digest has been handed to #send.
  self.worker.working = false
end
#digest_forwarded_ack(digest_id) ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/gossiperl_client/messaging.rb', line 81

# Builds a DigestForwardedAck for a forwarded digest and passes it to #send.
#
# The ack carries the client name and client secret from the worker
# options (both as Strings) and +digest_id+ as its reply id.
#
# @param digest_id [Object] id of the forwarded digest being acknowledged
# @return [Object] whatever #send returns
def digest_forwarded_ack digest_id
  ack = ::Gossiperl::Client::Thrift::DigestForwardedAck.new
  ack.name = self.worker.options[:client_name].to_s
  ack.secret = self.worker.options[:client_secret].to_s
  ack.reply_id = digest_id
  self.send ack
end
#digest_subscribe(event_types) ⇒ Object
89 90 91 92 93 94 95 96 97 |
# File 'lib/gossiperl_client/messaging.rb', line 89

# Builds a DigestSubscribe for the given event types and passes it to #send.
#
# The digest carries the client name and client secret from the worker
# options (both as Strings), a freshly generated UUID as id, the current
# time in epoch seconds as heartbeat, and the event types converted to
# Strings.
#
# @param event_types [Enumerable<#to_s>] event types to subscribe to
# @return [Object] whatever #send returns
def digest_subscribe event_types
  digest = ::Gossiperl::Client::Thrift::DigestSubscribe.new
  digest.name = self.worker.options[:client_name].to_s
  digest.secret = self.worker.options[:client_secret].to_s
  digest.id = SecureRandom.uuid.to_s
  digest.heartbeat = Time.now.to_i
  digest.event_types = event_types.map(&:to_s)
  self.send digest
end
#digest_unsubscribe(event_types) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/gossiperl_client/messaging.rb', line 99

# Builds a DigestUnsubscribe for the given event types and passes it to #send.
#
# The digest carries the client name and client secret from the worker
# options (both as Strings), a freshly generated UUID as id, the current
# time in epoch seconds as heartbeat, and the event types converted to
# Strings.
#
# @param event_types [Enumerable<#to_s>] event types to unsubscribe from
# @return [Object] whatever #send returns
def digest_unsubscribe event_types
  digest = ::Gossiperl::Client::Thrift::DigestUnsubscribe.new
  digest.name = self.worker.options[:client_name].to_s
  digest.secret = self.worker.options[:client_secret].to_s
  digest.id = SecureRandom.uuid.to_s
  digest.heartbeat = Time.now.to_i
  digest.event_types = event_types.map(&:to_s)
  self.send digest
end
#get_callback_block ⇒ Object
14 15 16 |
# File 'lib/gossiperl_client/messaging.rb', line 14

# Returns the block stored in @callback_block by the constructor.
#
# @return [Proc, nil] the stored block, or nil when none was given
def get_callback_block
  @callback_block
end
#send(digest) ⇒ Object
68 69 70 |
# File 'lib/gossiperl_client/messaging.rb', line 68

# Hands +digest+ to the transport's +send+.
#
# NOTE: this definition shadows Object#send for instances of this class,
# so dynamic dispatch on a Messaging object must go through +__send__+
# or +public_send+.
#
# @param digest [Object] digest to pass to the transport
# @return [Object] whatever the transport's +send+ returns
def send digest
  self.transport.send digest
end
#start ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/gossiperl_client/messaging.rb', line 18

# Creates the UDP transport for the worker and starts a thread that
# dispatches everything the transport yields.
#
# When the worker options contain :thrift_window, its value becomes the
# transport's +recv_buf_size+.
#
# Dispatch rules for each item yielded by the transport's +handle+:
# * Hash with :error      -> worker receives a :failed event with that error
# * Hash with :forward    -> worker receives a :forwarded event, then a
#                            forwarded ack is sent for the envelope id
# * any other Hash        -> worker receives a :failed event
#                            (:unsupported_hash_response)
# * Digest                -> replied to with #digest_ack
# * DigestAck             -> passed to the worker state's +receive+
# * DigestEvent           -> worker receives an :event event
# * DigestSubscribeAck    -> worker receives a :subscribed event
# * DigestUnsubscribeAck  -> worker receives an :unsubscribed event
# * DigestForwardedAck    -> worker receives a :forwarded_ack event
# * anything else         -> worker receives a :failed event
#                            (:unsupported_digest)
#
# @return [Thread] the thread running the transport handler
def start
  self.transport = Gossiperl::Client::Transport::Udp.new( self.worker )
  if self.worker.options.has_key?(:thrift_window)
    self.transport.recv_buf_size = self.worker.options[:thrift_window]
  end
  # The messaging object is passed in as a thread argument rather than
  # referenced through self inside the thread body.
  Thread.new(self) do |msg|
    msg.transport.handle do |data|
      if data.kind_of? Hash
        if data.has_key?(:error)
          msg.worker.process_event( { :event => :failed,
                                      :error => data[:error] } )
        elsif data.has_key?(:forward)
          msg.worker.process_event( { :event => :forwarded,
                                      :digest => data[:envelope],
                                      :digest_type => data[:type] } )
          msg.digest_forwarded_ack data[:envelope].id
        else
          msg.worker.process_event( { :event => :failed,
                                      :error => { :unsupported_hash_response => data } } )
        end
      else
        if data.is_a?( Gossiperl::Client::Thrift::Digest )
          msg.digest_ack data
        elsif data.is_a?( Gossiperl::Client::Thrift::DigestAck )
          msg.worker.state.receive data
        elsif data.is_a?( Gossiperl::Client::Thrift::DigestEvent )
          msg.worker.process_event( { :event => :event,
                                      :details => { :type => data.event_type,
                                                    :member => data.event_object,
                                                    :heartbeat => data.heartbeat } } )
        elsif data.is_a?( Gossiperl::Client::Thrift::DigestSubscribeAck )
          msg.worker.process_event( { :event => :subscribed,
                                      :details => { :types => data.event_types.map{|item| item.to_sym},
                                                    :heartbeat => data.heartbeat } } )
        elsif data.is_a?( Gossiperl::Client::Thrift::DigestUnsubscribeAck )
          msg.worker.process_event( { :event => :unsubscribed,
                                      :details => { :types => data.event_types.map{|item| item.to_sym},
                                                    :heartbeat => data.heartbeat } } )
        elsif data.is_a?( Gossiperl::Client::Thrift::DigestForwardedAck )
          msg.worker.process_event( { :event => :forwarded_ack,
                                      :details => { :reply_id => data.reply_id } } )
        else
          msg.worker.process_event( { :event => :failed,
                                      :error => { :unsupported_digest => data } } )
        end
      end
    end
  end
end