Class: Concord::Computation
- Inherits:
-
Object
- Object
- Concord::Computation
- Defined in:
- lib/concord.rb
Overview
Thrift service definition. Wraps a user-defined computation.
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
-
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
Class Method Summary collapse
-
.serve(computation) ⇒ Object
Initialize a new
Computationand start serving it.
Instance Method Summary collapse
-
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
The user-defined computation metadata.
-
#boltProcessRecord(record) ⇒ Object
Process an upstream record.
-
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy.
-
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state.
-
#init ⇒ Object
The initialization function, called when the framework is ready to start sending the computation records.
-
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
constructor
Initialize a new
Computationand register it with the proxy. - #register_with_scheduler ⇒ Object
-
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state.
Constructor Details
#initialize(handler: nil, proxy_host: nil, proxy_port: nil) ⇒ Computation
Initialize a new Computation and register it with the proxy
87 88 89 90 91 |
# File 'lib/concord.rb', line 87 def initialize(handler: nil, proxy_host: nil, proxy_port: nil) self.handler = handler self.proxy_host = proxy_host self.proxy_port = proxy_port end |
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
81 82 83 |
# File 'lib/concord.rb', line 81 def handler @handler end |
#proxy_host ⇒ Object
Returns the value of attribute proxy_host.
81 82 83 |
# File 'lib/concord.rb', line 81 def proxy_host @proxy_host end |
#proxy_port ⇒ Object
Returns the value of attribute proxy_port.
81 82 83 |
# File 'lib/concord.rb', line 81 def proxy_port @proxy_port end |
Class Method Details
.serve(computation) ⇒ Object
Initialize a new Computation and start serving it. This is the only method directly called by users.
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/concord.rb', line 102 def self.serve(computation) listen_address = ENV[::Concord::Thrift::KConcordEnvKeyClientListenAddr] proxy_address = ENV[::Concord::Thrift::KConcordEnvKeyClientProxyAddr] listen_host, listen_port = listen_address.split(':') proxy_host, proxy_port = proxy_address.split(':') handler = self.new(handler: computation, proxy_host: proxy_host, proxy_port: Integer(proxy_port)) processor = Thrift::ComputationService::Processor.new(handler) transport = ::Thrift::ServerSocket.new(listen_host, Integer(listen_port)) transport_factory = ::Thrift::FramedTransportFactory.new protocol_factory = ::Thrift::BinaryProtocolAcceleratedFactory.new server = ::Thrift::ThreadedServer.new(processor, transport, transport_factory, protocol_factory) t = Thread.new{ server.serve } handler.register_with_scheduler t.join end |
Instance Method Details
#boltMetadata ⇒ Concord::Thrift::ComputationMetadata
Returns The user-defined computation metadata.
162 163 164 165 166 167 168 |
# File 'lib/concord.rb', line 162 def boltMetadata = nil log_failure do = handler. end () end |
#boltProcessRecord(record) ⇒ Object
Process an upstream record. Wraps the user method in a transaction, which is returned to the proxy upon completion.
130 131 132 133 134 135 136 |
# File 'lib/concord.rb', line 130 def boltProcessRecord(record) ctx = ComputationContext.new(self) log_failure do handler.process_record(ctx, record) end ctx.transaction end |
#boltProcessTimer(key, time) ⇒ Object
Process a timer callback from the proxy. Wraps the user method in a transaction, which is returned to the proxy upon completion.
142 143 144 145 146 147 148 |
# File 'lib/concord.rb', line 142 def boltProcessTimer(key, time) ctx = ComputationContext.new(self) log_failure do handler.process_timer(ctx, key, time) end ctx.transaction end |
#get_state(key) ⇒ String
Retrieve a binary blob stored in the proxy state
173 174 175 |
# File 'lib/concord.rb', line 173 def get_state(key) proxy.getState(key) end |
#init ⇒ Object
The initialization function, called when the framework is ready to start sending the computation records. Wraps the user method in a transaction, which is returned to the proxy upon completion.
153 154 155 156 157 158 159 |
# File 'lib/concord.rb', line 153 def init ctx = ComputationContext.new(self) log_failure do handler.init(ctx) end ctx.transaction end |
#register_with_scheduler ⇒ Object
93 94 95 96 97 |
# File 'lib/concord.rb', line 93 def register_with_scheduler log_failure do proxy.registerWithScheduler(boltMetadata) end end |
#set_state(key, value) ⇒ Object
Store a binary blob, identified by a key, in the proxy state
180 181 182 |
# File 'lib/concord.rb', line 180 def set_state(key, value) proxy.setState(key, value) end |