Class: Concord::Computation
- Inherits: Object (Concord::Computation < Object)
- Defined in: lib/concord.rb
Overview
Thrift service definition. Wraps a user-defined computation.
Instance Attribute Summary
- #handler ⇒ Object
  Returns the value of attribute handler.
- #proxy_host ⇒ Object
  Returns the value of attribute proxy_host.
- #proxy_port ⇒ Object
  Returns the value of attribute proxy_port.
Class Method Summary
- .serve(computation) ⇒ Object
  Initialize a new Computation and start serving it.
Instance Method Summary
- #boltMetadata ⇒ Concord::Thrift::ComputationMetadata
  The user-defined computation metadata.
- #boltProcessRecords(records) ⇒ Object
  Process records from upstream.
- #boltProcessTimer(key, time) ⇒ Object
  Process a timer callback from the proxy.
- #enrich_stream(stream) ⇒ Object
- #get_state(key) ⇒ String
  Retrieve a binary blob stored in the proxy state.
- #init ⇒ Object
  The initialization function, called when the framework is ready to start sending records to the computation.
- #initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation (constructor)
  Initialize a new Computation. Registration with the proxy happens in #register_with_scheduler.
- #register_with_scheduler ⇒ Object
- #set_state(key, value) ⇒ Object
  Store a binary blob, identified by a key, in the proxy state.
Constructor Details
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
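Initialize a new Computation with the given handler and proxy address. The constructor only stores these attributes; registration with the proxy happens in #register_with_scheduler, which .serve calls.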
    # File 'lib/concord.rb', line 87
    def initialize(handler: nil, proxy_host: nil, proxy_port: nil)
      self.handler = handler
      self.proxy_host = proxy_host
      self.proxy_port = proxy_port
    end
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
    # File 'lib/concord.rb', line 81
    def handler
      @handler
    end
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
    # File 'lib/concord.rb', line 81
    def proxy_host
      @proxy_host
    end
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
    # File 'lib/concord.rb', line 81
    def proxy_port
      @proxy_port
    end
Class Method Details
.serve(computation) ⇒ Object
Initialize a new Computation and start serving it. This is the only method directly called by users.
    # File 'lib/concord.rb', line 102
    def self.serve(computation)
      listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr]
      proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr]
      listen_host, listen_port = listen_address.split(':')
      proxy_host, proxy_port = proxy_address.split(':')
      handler = self.new(handler: computation,
                         proxy_host: proxy_host,
                         proxy_port: Integer(proxy_port))
      processor = Thrift::ComputationService::Processor.new(handler)
      transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port))
      transport_factory = ::Thrift::FramedTransportFactory.new
      protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new
      # Client computations MUST use a simple blocking server: process_timer
      # and process_record both run as callbacks in the work thread pool, so
      # two callbacks could arrive at once and make the code multi-threaded.
      # We guarantee a single thread for each callback.
      server = ::Thrift::SimpleServer.new(processor, transport, transport_factory, protocol_factory)
      # Register with localhost proxy. Note that this method is `oneway',
      # which means it finishes after the final TCP 'ack'.
      handler.register_with_scheduler
      server.serve
    end
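The address handling in .serve can be tried in isolation. The following is a minimal sketch, assuming both environment variables hold a "host:port" string; split_address is a hypothetical helper written for illustration and is not part of Concord.

```ruby
# Hypothetical helper mirroring how .serve splits a "host:port" string.
def split_address(address)
  host, port = address.split(':')
  # Integer() raises ArgumentError on non-numeric input and TypeError on nil,
  # so a malformed address fails loudly. String#to_i would return 0 instead.
  [host, Integer(port)]
end

host, port = split_address('127.0.0.1:9000')
puts "listening on #{host}, port #{port}"
```

Because .serve calls split on the raw environment value, an unset variable would raise NoMethodError on nil before any port conversion happens.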
Instance Method Details
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
Returns the user-defined computation metadata.
    # File 'lib/concord.rb', line 170
    def boltMetadata
      = nil
      log_failure do
        = handler.
      end
      ()
    end
#boltProcessRecords(records) ⇒ Object
Process records from upstream. Wraps the user method in transactions, which are returned to the proxy upon completion.
    # File 'lib/concord.rb', line 134
    def boltProcessRecords(records)
      txs = []
      records.each do |record|
        ctx = ComputationContext.new(self)
        log_failure do
          handler.process_record(ctx, record)
        end
        txs.push(ctx.transaction)
      end
      txs
    end
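The per-record pattern in #boltProcessRecords (one fresh context per record, one transaction collected per record) can be sketched without Thrift. Everything below is a stand-in written for illustration: SketchContext, its produce method, the Upcaser handler, and this version of log_failure are assumptions, and the real log_failure may behave differently.

```ruby
# Stand-in for ComputationContext: collects what the handler produces.
class SketchContext
  attr_reader :transaction

  def initialize
    @transaction = []
  end

  # Hypothetical method; the real context API is not shown in this page.
  def produce(value)
    @transaction << value
  end
end

# Assumed behaviour for log_failure: report the error and keep going.
def log_failure
  yield
rescue StandardError => e
  warn "handler failed: #{e.message}"
end

# One context per record; a transaction is returned even when the handler fails.
def process_records(handler, records)
  records.map do |record|
    ctx = SketchContext.new
    log_failure { handler.process_record(ctx, record) }
    ctx.transaction
  end
end

# Hypothetical user handler.
class Upcaser
  def process_record(ctx, record)
    raise ArgumentError, 'empty record' if record.empty?

    ctx.produce(record.upcase)
  end
end

txs = process_records(Upcaser.new, ['a', '', 'b'])
puts txs.inspect
```

Under these assumptions a failing record still contributes a transaction, an empty one here, so the result has the same length as the input.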
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.
    # File 'lib/concord.rb', line 150
    def boltProcessTimer(key, time)
      ctx = ComputationContext.new(self)
      log_failure do
        handler.process_timer(ctx, key, time)
      end
      ctx.transaction
    end
#enrich_stream(stream) ⇒ Object
    # File 'lib/concord.rb', line 222
    def enrich_stream(stream)
      sm = ::Concord::Thrift::StreamMetadata.new
      if stream.is_a?(Array)
        sm.name = stream.first
        sm.grouping = stream.last
      else
        sm.name = stream
      end
      sm
    end
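#enrich_stream accepts either a bare stream name or an array whose first element is the name and whose last element is the grouping. A minimal sketch of that branching, using a Struct as a stand-in for ::Concord::Thrift::StreamMetadata; the :group_by value is a hypothetical placeholder, since the valid grouping values are not listed on this page.

```ruby
# Stand-in for ::Concord::Thrift::StreamMetadata; only the two fields used here.
StreamMetadataSketch = Struct.new(:name, :grouping)

def enrich_stream(stream)
  sm = StreamMetadataSketch.new
  if stream.is_a?(Array)
    # Array form: first element is the name, last element is the grouping.
    sm.name = stream.first
    sm.grouping = stream.last
  else
    # Bare form: only the name is set; grouping stays nil in this stand-in.
    sm.name = stream
  end
  sm
end

plain = enrich_stream('words')
grouped = enrich_stream(['words', :group_by])
puts plain.inspect
puts grouped.inspect
```

Note that a one-element array sets both name and grouping to the same value, because first and last return the same element.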
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state.
    # File 'lib/concord.rb', line 181
    def get_state(key)
      proxy.getState(key)
    end
#init ⇒ Object
The initialization function, called when the framework is ready to start sending records to the computation. Wraps the user method in a transaction, which is returned to the proxy upon completion.
    # File 'lib/concord.rb', line 161
    def init
      ctx = ComputationContext.new(self)
      log_failure do
        handler.init(ctx)
      end
      ctx.transaction
    end
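The wrappers #init, #boltProcessRecords and #boltProcessTimer call handler.init(ctx), handler.process_record(ctx, record) and handler.process_timer(ctx, key, time). A handler therefore needs to respond to those three methods. The sketch below is a hypothetical handler that only tracks internal state; it ignores the context, because the ComputationContext API is not shown on this page, and it uses plain strings where the real framework passes record objects.

```ruby
# Hypothetical handler exposing the methods the wrappers call.
class CountingHandler
  attr_reader :counts, :timers_fired

  # Called once, before any records arrive.
  def init(_ctx)
    @counts = Hash.new(0)
    @timers_fired = []
  end

  # Called once per upstream record.
  def process_record(_ctx, record)
    @counts[record] += 1
  end

  # Called when a timer identified by key fires at the given time.
  def process_timer(_ctx, key, time)
    @timers_fired << [key, time]
  end
end

handler = CountingHandler.new
handler.init(nil)
%w[a b a].each { |record| handler.process_record(nil, record) }
handler.process_timer(nil, 'flush', 1_000)
puts handler.counts.inspect
```

In a real deployment an object like this would be passed to Concord::Computation.serve, which the page describes as the only method users call directly.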
#register_with_scheduler ⇒ Object
    # File 'lib/concord.rb', line 93
    def register_with_scheduler
      log_failure do
        proxy.registerWithScheduler(boltMetadata)
      end
    end
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state.
    # File 'lib/concord.rb', line 188
    def set_state(key, value)
      proxy.setState(key, value)
    end
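#set_state and #get_state delegate to the proxy's setState and getState, and the stored value is a binary blob. A minimal round-trip sketch, assuming an in-memory stand-in for the proxy (InMemoryProxy is hypothetical; the real proxy is reached over Thrift) and using String#pack/unpack as one possible way to encode the blob.

```ruby
# In-memory stand-in for the proxy, keeping blobs in a Hash.
class InMemoryProxy
  def initialize
    @state = {}
  end

  def setState(key, value)
    @state[key] = value
  end

  def getState(key)
    @state[key]
  end
end

proxy = InMemoryProxy.new

# Encode two 32-bit big-endian unsigned integers as a binary String.
blob = [42, 7].pack('N2')
proxy.setState('offsets', blob)

# Decode the blob back into integers after reading it from the state.
restored = proxy.getState('offsets').unpack('N2')
puts restored.inspect
```

The encoding is the caller's choice; the state store only sees an opaque string identified by its key.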