Class: Concord::Computation
- Inherits: Object
  - Object
    - Concord::Computation
- Defined in: lib/concord.rb
Overview
Thrift service definition. Wraps a user-defined computation.
Instance Attribute Summary
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
-
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
Class Method Summary
-
.serve(computation) ⇒ Object
Initialize a new `Computation` and start serving it.
Instance Method Summary
-
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
The user-defined computation metadata.
-
#boltProcessRecords(records) ⇒ Object
Process records from upstream.
-
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy.
-
#destroy ⇒ Object
The class destructor. Use this method to perform any cleanup before the proxy kills the process in which this instance resides.
- #enrich_stream(stream) ⇒ Object
-
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state.
-
#init ⇒ Object
The initialization function, called when the framework is ready to start sending records to the computation.
-
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
constructor
Initialize a new `Computation` with the given handler and proxy address.
- #register_with_scheduler ⇒ Object
-
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state.
Constructor Details
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
Initialize a new `Computation` with the given handler and proxy address. The constructor only stores its arguments; registration with the proxy is done by #register_with_scheduler.
# File 'lib/concord.rb', line 87
def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
  self.handler = handler
  self.proxy_host = proxy_host
  self.proxy_port = proxy_port
end
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
# File 'lib/concord.rb', line 81
def handler
  @handler
end
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
# File 'lib/concord.rb', line 81
def proxy_host
  @proxy_host
end
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
# File 'lib/concord.rb', line 81
def proxy_port
  @proxy_port
end
Class Method Details
.serve(computation) ⇒ Object
Initialize a new `Computation` and start serving it. This is the only method directly called by users.
# File 'lib/concord.rb', line 102
def self.serve(computation)
  listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
  proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
  listen_host, listen_port = listen_address.split(':')
  proxy_host, proxy_port = proxy_address.split(':')

  handler = self.new(handler: computation,
                     proxy_host: proxy_host,
                     proxy_port: Integer(proxy_port))
  processor = Thrift::ComputationService::Processor.new(handler)
  transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
  transport_factory = ::Thrift::FramedTransportFactory.new
  protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
  # Client computations MUST use a simple blocking server: process_timer
  # and process_record both execute as callbacks in the work thread pool,
  # so a threaded server could deliver two callbacks at once and make the
  # code multi-threaded. We guarantee a single thread for each callback.
  server = ::Thrift::SimpleServer.new(processor,
                                      transport,
                                      transport_factory,
                                      protocol_factory)
  # Register with the localhost proxy. Note that this method is `oneway`,
  # which means it finishes after the final TCP 'ack'.
  handler.register_with_scheduler
  server.serve
end
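As a rough illustration of the address handling at the top of `serve`, the sketch below splits `host:port` strings in the same way. The variable names `EXAMPLE_LISTEN_ADDR` and `EXAMPLE_PROXY_ADDR` and the helper `split_address` are hypothetical stand-ins; the real keys come from the `Concord::Thrift` constants used in `serve`.

```ruby
# Hypothetical helper mirroring the split-and-convert step in `serve`.
def split_address(address)
  host, port = address.split(':')
  # Integer() raises ArgumentError on a non-numeric port and
  # TypeError when the port is missing (nil), so bad input fails fast.
  [host, Integer(port)]
end

# Placeholder variable names, not the real Concord keys.
env = {
  'EXAMPLE_LISTEN_ADDR' => '127.0.0.1:9090',
  'EXAMPLE_PROXY_ADDR'  => '127.0.0.1:9091'
}

listen_host, listen_port = split_address(env['EXAMPLE_LISTEN_ADDR'])
proxy_host, proxy_port   = split_address(env['EXAMPLE_PROXY_ADDR'])
puts "listen on #{listen_host}:#{listen_port}, proxy at #{proxy_host}:#{proxy_port}"
```

If either environment variable is unset, `ENV[...]` returns `nil` and the `split` call raises `NoMethodError`, so `serve` relies on both variables being present when the process starts.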
Instance Method Details
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
Returns the user-defined computation metadata.
# File 'lib/concord.rb', line 178
def boltMetadata
  metadata = nil
  log_failure do
    metadata = handler.metadata
  end
  …(metadata)
end
#boltProcessRecords(records) ⇒ Object
Process records from upstream. Wraps the user method in transactions, which are returned to the proxy upon completion.
# File 'lib/concord.rb', line 134
def boltProcessRecords(records)
  txs = []
  records.each do |record|
    ctx = ComputationContext.new(self)
    log_failure do
      handler.process_record(ctx, record)
    end
    txs.push(ctx.transaction)
  end
  txs
end
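The one-context-per-record pattern above can be tried without a proxy. This is a minimal sketch under stated assumptions: `SketchContext`, its `produce` method, and `UpcaseHandler` are hypothetical stand-ins, not Concord classes, and the `log_failure` wrapper is left out because its behavior is not shown on this page.

```ruby
# Stand-in for ComputationContext: collects what the handler emits.
class SketchContext
  attr_reader :transaction

  def initialize
    @transaction = []
  end

  # Hypothetical emit method; the real context API is not shown here.
  def produce(stream, key)
    @transaction << [stream, key]
  end
end

# Hypothetical handler with the same process_record(ctx, record) shape.
class UpcaseHandler
  def process_record(ctx, record)
    ctx.produce('upcased', record.upcase)
  end
end

# A fresh context per record, so each record yields its own transaction.
def process_records(handler, records)
  records.map do |record|
    ctx = SketchContext.new
    handler.process_record(ctx, record)
    ctx.transaction
  end
end

p process_records(UpcaseHandler.new, %w[a b])
```

Because every record gets its own context, the returned list always has one transaction per input record, in input order.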
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.
# File 'lib/concord.rb', line 150
def boltProcessTimer(key, time)
  ctx = ComputationContext.new(self)
  log_failure do
    handler.process_timer(ctx, key, time)
  end
  ctx.transaction
end
#destroy ⇒ Object
The class destructor. Use this method to perform any cleanup before the proxy kills the process in which this instance resides.
# File 'lib/concord.rb', line 171
def destroy
  log_failure do
    handler.destroy
  end
end
#enrich_stream(stream) ⇒ Object
# File 'lib/concord.rb', line 230
def enrich_stream(stream)
  sm = ::Concord::Thrift::StreamMetadata.new
  if stream.is_a?(Array)
    sm.name = stream.first
    sm.grouping = stream.last
  else
    sm.name = stream
  end
  sm
end
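To make the two accepted input shapes concrete, here is a small sketch of the same branching. `SketchStreamMetadata` is a hypothetical `Struct` standing in for `Concord::Thrift::StreamMetadata`, and `:group_by_key` is a placeholder grouping value, not a confirmed Concord constant.

```ruby
# Stand-in for Concord::Thrift::StreamMetadata (name and grouping only).
SketchStreamMetadata = Struct.new(:name, :grouping)

# Same branching as enrich_stream: an Array is read as [name, grouping],
# anything else is taken to be the stream name itself.
def enrich_stream_sketch(stream)
  sm = SketchStreamMetadata.new
  if stream.is_a?(Array)
    sm.name = stream.first
    sm.grouping = stream.last
  else
    sm.name = stream
  end
  sm
end

plain   = enrich_stream_sketch('words')
grouped = enrich_stream_sketch(['words', :group_by_key])
puts plain.inspect
puts grouped.inspect
```

In this sketch a bare name leaves `grouping` as `nil`; the real Thrift struct may supply its own default instead.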
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state.
# File 'lib/concord.rb', line 189
def get_state(key)
  proxy.getState(key)
end
#init ⇒ Object
The initialization function, called when the framework is ready to start sending records to the computation. Wraps the user method in a transaction, which is returned to the proxy upon completion.
# File 'lib/concord.rb', line 161
def init
  ctx = ComputationContext.new(self)
  log_failure do
    handler.init(ctx)
  end
  ctx.transaction
end
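The wrapper methods on this page delegate to `init`, `process_record`, `process_timer`, and `destroy` on the handler. The sketch below shows a handler with those method shapes. `WordCounter` is hypothetical, the context is a bare placeholder object because the context API is not documented here, and the call order is an assumption about the lifecycle rather than something this page states.

```ruby
# Hypothetical handler exposing the methods the wrapper delegates to.
class WordCounter
  attr_reader :counts, :events

  def initialize
    @counts = Hash.new(0)
    @events = []
  end

  def init(_ctx)
    @events << :init
  end

  def process_record(_ctx, record)
    @counts[record] += 1
  end

  def process_timer(_ctx, key, time)
    @events << [:timer, key, time]
  end

  def destroy
    @events << :destroy
  end
end

handler = WordCounter.new
ctx = Object.new # placeholder; the real context API is not shown here
handler.init(ctx)
%w[a b a].each { |word| handler.process_record(ctx, word) }
handler.process_timer(ctx, 'flush', 1_000)
handler.destroy
p handler.counts
```

An object of this shape is what would be handed to `Concord::Computation.serve` in a real deployment.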
#register_with_scheduler ⇒ Object
# File 'lib/concord.rb', line 93
def register_with_scheduler
  log_failure do
    proxy.registerWithScheduler(boltMetadata)
  end
end
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state.
# File 'lib/concord.rb', line 196
def set_state(key, value)
  proxy.setState(key, value)
end
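Since `set_state` and `get_state` deal in binary blobs, structured values have to be serialized first. The sketch below shows one way to do that round trip. `InMemoryProxy` is a hypothetical stand-in for the proxy client that models only the two calls used above, and the choice of `Marshal` is an assumption; any serializer that produces a String would work.

```ruby
# In-memory stand-in for the proxy client; only the two state calls
# used by get_state and set_state are modeled.
class InMemoryProxy
  def initialize
    @state = {}
  end

  def setState(key, value)
    @state[key] = value
    nil
  end

  def getState(key)
    @state[key]
  end
end

proxy = InMemoryProxy.new
# Marshal turns a Ruby object into a binary String and back.
proxy.setState('counts', Marshal.dump({ 'hello' => 3 }))
counts = Marshal.load(proxy.getState('counts'))
puts counts['hello']
```

This stand-in returns `nil` for a key that was never stored; how the real proxy responds to a missing key is not documented on this page.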