Class: Concord::Computation
- Inherits:
-
Object
- Object
- Concord::Computation
- Defined in:
- lib/concord.rb
Overview
Thrift service definition. Wraps a user-defined computation.
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
-
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
Class Method Summary collapse
-
.serve(computation) ⇒ Object
Initialize a new
Computationand start serving it.
Instance Method Summary collapse
-
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
The user-defined computation metadata.
-
#boltProcessRecord(record) ⇒ Object
Process an upstream record.
-
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy.
- #enrich_stream(stream) ⇒ Object
-
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state.
-
#init ⇒ Object
The initialization function, called when the framework is ready to start sending the computation records.
-
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
constructor
Initialize a new
Computationand register it with the proxy. - #register_with_scheduler ⇒ Object
-
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state.
Constructor Details
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
Initialize a new Computation and register it with the proxy
87 88 89 90 91 |
# File 'lib/concord.rb', line 87 def initialize(handler: nil, proxy_host: nil, proxy_port: nil) self.handler = handler self.proxy_host = proxy_host self.proxy_port = proxy_port end |
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
81 82 83 |
# File 'lib/concord.rb', line 81 def handler @handler end |
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
81 82 83 |
# File 'lib/concord.rb', line 81 def proxy_host @proxy_host end |
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
81 82 83 |
# File 'lib/concord.rb', line 81 def proxy_port @proxy_port end |
Class Method Details
.serve(computation) ⇒ Object
Initialize a new Computation and start serving it. This is the only method directly called by users.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/concord.rb', line 102 def self.serve(computation) listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr] proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr] listen_host, listen_port = listen_address.split(':') proxy_host, proxy_port = proxy_address.split(':') handler = self.new(handler: computation, proxy_host: proxy_host, proxy_port: Integer(proxy_port)) processor = Thrift::ComputationService::Processor.new(handler) transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port)) transport_factory = ::Thrift::FramedTransportFactory.new protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new # The reason the client computations MUST use a simple blocking server # is that we have process_timer and process_record both which exec as # a callback in the work thread pool which means that you might get # 2 callbacks whichs makes the code multi threaded - we guarantee single # thread for each callback server = ::Thrift::SimpleServer.new(processor, transport, transport_factory, protocol_factory) # Register with localhost proxy. Note that this method is `oneway' # which means after final TCP 'ack' it finishes. handler.register_with_scheduler server.serve end |
Instance Method Details
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
Returns The user-defined computation metadata.
166 167 168 169 170 171 172 |
# File 'lib/concord.rb', line 166 def boltMetadata = nil log_failure do = handler. end () end |
#boltProcessRecord(record) ⇒ Object
Process an upstream record. Wraps the user method in a transaction, which is returned to the proxy upon completion.
134 135 136 137 138 139 140 |
# File 'lib/concord.rb', line 134 def boltProcessRecord(record) ctx = ComputationContext.new(self) log_failure do handler.process_record(ctx, record) end ctx.transaction end |
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.
146 147 148 149 150 151 152 |
# File 'lib/concord.rb', line 146 def boltProcessTimer(key, time) ctx = ComputationContext.new(self) log_failure do handler.process_timer(ctx, key, time) end ctx.transaction end |
#enrich_stream(stream) ⇒ Object
218 219 220 221 222 223 224 225 226 227 |
# File 'lib/concord.rb', line 218 def enrich_stream(stream) sm = ::Concord::Thrift::StreamMetadata.new if stream.is_a?(Array) sm.name = stream.first sm.grouping = stream.last else sm.name = stream end sm end |
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state
177 178 179 |
# File 'lib/concord.rb', line 177 def get_state(key) proxy.getState(key) end |
#init ⇒ Object
The initialization function, called when the framework is ready to start sending the computation records. Wraps the user method in a transaction, which is returned to the proxy upon completion.
157 158 159 160 161 162 163 |
# File 'lib/concord.rb', line 157 def init ctx = ComputationContext.new(self) log_failure do handler.init(ctx) end ctx.transaction end |
#register_with_scheduler ⇒ Object
93 94 95 96 97 |
# File 'lib/concord.rb', line 93 def register_with_scheduler log_failure do proxy.registerWithScheduler(boltMetadata) end end |
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state
184 185 186 |
# File 'lib/concord.rb', line 184 def set_state(key, value) proxy.setState(key, value) end |