Class: Ruote::StorageHistory

Inherits:
Object
Defined in:
lib/ruote/log/storage_history.rb

Overview

Logs the ruote engine history to the storage underlying the worker.

Warning: don’t use this history implementation when the storage is a HashStorage, since it will fill up your memory. Keeping history for a transient ruote is a bit overkill (IMHO).

using the StorageHistory

engine.add_service(
  'history', 'ruote/log/storage_history', 'Ruote::StorageHistory')

# ...

process_history = engine.history.by_wfid(wfid0)

final note

By default, the history is an in-memory history (see Ruote::DefaultHistory). An in-memory history is worthless when there are multiple workers.

Constant Summary

DATE_REGEX =
/!(\d{4}-\d{2}-\d{2})!/

Instance Method Summary

Constructor Details

#initialize(context, options = {}) ⇒ StorageHistory

Returns a new instance of StorageHistory.



# File 'lib/ruote/log/storage_history.rb', line 54

def initialize(context, options={})

  @context = context
  @options = options

  @context.storage.add_type('history')
end

Instance Method Details

#by_date(date) ⇒ Object

Returns all the history events for a given day.

Accepts any argument whose string form (to_s) can be parsed by Time.parse, for example a Date, a Time, or a date string.



# File 'lib/ruote/log/storage_history.rb', line 105

def by_date(date)

  date = Time.parse(date.to_s).strftime('%Y-%m-%d')

  @context.storage.get_many('history', /!#{date}!/)
end
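
The following is a minimal sketch of the date normalization that by_date performs, run against plain strings instead of a real storage. The helper name history_date_pattern and the sample ids are hypothetical and only serve the illustration; the id layout is an assumption, the method itself only relies on a !YYYY-MM-DD! segment being present.

```ruby
require 'time' # Time.parse comes from the 'time' standard library
require 'date'

# Hypothetical helper mirroring by_date: anything whose #to_s is
# understood by Time.parse is reduced to a 'YYYY-MM-DD' day, which is
# then wrapped into the regex handed to storage.get_many.
def history_date_pattern(date)
  day = Time.parse(date.to_s).strftime('%Y-%m-%d')
  /!#{day}!/
end

# Made-up history ids (assumed layout, for illustration only).
sample_ids = [
  '1234!5678!2011-03-14!1300060800.000001!000!20110314-bakasa',
  '1234!5678!2011-03-15!1300147200.000002!001!20110315-kodesu'
]

pattern = history_date_pattern(Date.new(2011, 3, 14))
matching = sample_ids.select { |id| id =~ pattern }
```

A Date, a Time, or a string such as '2011-03-14 10:30:00' all end up as the same pattern, since only the day part survives the strftime call.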

#by_process(wfid) ⇒ Object Also known as: by_wfid

Returns all the msgs for a given wfid (process instance id).



# File 'lib/ruote/log/storage_history.rb', line 77

def by_process(wfid)

  @context.storage.get_many('history', wfid)
end

#clear! ⇒ Object

The history system doesn’t implement purge!, so calling purge! on the engine does not clear the history.

Call this dangerous clear! method to remove every history document from the storage.



# File 'lib/ruote/log/storage_history.rb', line 121

def clear!

  @context.storage.purge_type!('history')
end

#on_msg(msg) ⇒ Object

This method is called by the worker via the context. Successfully processed msgs are passed here.



# File 'lib/ruote/log/storage_history.rb', line 129

def on_msg(msg)

  return unless accept?(msg)

  msg = msg.dup
    # a shallow copy is sufficient

  si = if fei = msg['fei']
    Ruote::FlowExpressionId.to_storage_id(fei)
  else
    msg['wfid'] || 'no_wfid'
  end

  _id = msg['_id']
  msg['original_id'] = _id
  msg['_id'] = "#{_id}!#{si}"

  msg['type'] = 'history'
  msg['original_put_at'] = msg['put_at']

  msg.delete('_rev')

  @context.storage.put(msg)
end
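
To make the rewriting done by on_msg easier to follow, here is a stand-alone sketch that applies the same steps to a plain hash, without any storage or worker. It only covers the branch where the msg has a 'wfid' and no 'fei'. The helper name to_history_doc and the sample msg are hypothetical.

```ruby
# Hypothetical helper reproducing the steps of on_msg for a msg that
# carries a 'wfid' but no 'fei'. Nothing is written to a storage.
def to_history_doc(msg)
  doc = msg.dup # shallow copy, the caller's hash is left untouched

  si = doc['wfid'] || 'no_wfid'

  doc['original_id'] = doc['_id']     # keep the msg id as it was
  doc['_id'] = "#{doc['_id']}!#{si}"  # history id = msg id + '!' + wfid
  doc['type'] = 'history'
  doc['original_put_at'] = doc['put_at']
  doc.delete('_rev')                  # stored as a brand new document

  doc
end

# Made-up msg, for illustration only.
msg = {
  '_id' => '1234!5678!2011-03-14!1300060800.000001!000',
  '_rev' => 0,
  'type' => 'msgs',
  'action' => 'launch',
  'wfid' => '20110314-bakasa',
  'put_at' => '2011-03-14 00:00:00.000001 UTC'
}

doc = to_history_doc(msg)
```

Because the wfid is appended as the last '!'-separated segment of the id, methods such as wfids can recover it from the id alone.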

#range ⇒ Object

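Returns an array [ first, last ] of Time instances: midnight UTC of the day found in the first history id, and midnight UTC plus 24 hours of the day found in the last history id.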



# File 'lib/ruote/log/storage_history.rb', line 85

def range

  ids = @context.storage.ids('history')

  #p ids.sort == ids

  fm = DATE_REGEX.match(ids.first)[1]
  lm = DATE_REGEX.match(ids.last)[1]

  first = Time.parse("#{fm} 00:00:00 UTC")
  last = Time.parse("#{lm} 00:00:00 UTC") + 24 * 3600

  [ first, last ]
end
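
The computation in range can be tried on a plain array of ids. The sketch below is an illustration under assumptions: the helper name history_range, the empty-list guard (the original method has no such guard) and the sample ids are all hypothetical.

```ruby
require 'time' # Time.parse comes from the 'time' standard library

# Hypothetical helper mirroring range. The regex is the same as
# DATE_REGEX, passed as a default argument to keep the sketch
# self-contained.
def history_range(ids, date_regex = /!(\d{4}-\d{2}-\d{2})!/)
  return nil if ids.empty? # assumption: not present in the original

  fm = date_regex.match(ids.first)[1]
  lm = date_regex.match(ids.last)[1]

  first = Time.parse("#{fm} 00:00:00 UTC")
  last = Time.parse("#{lm} 00:00:00 UTC") + 24 * 3600 # end of that day

  [first, last]
end

# Made-up history ids (assumed layout, for illustration only).
sample_ids = [
  '1234!5678!2011-03-14!1300060800.000001!000!20110314-bakasa',
  '1234!5678!2011-03-15!1300147200.000002!001!20110315-kodesu'
]

first, last = history_range(sample_ids)
```

Adding 24 hours to the second value makes the pair cover the whole of the last day rather than stopping at its first second.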

#wfids ⇒ Object

Returns all the wfids for which there are history items (msgs) stored.



# File 'lib/ruote/log/storage_history.rb', line 64

def wfids

  wfids = @context.storage.ids('history').collect { |id|
    id.split('!').last
  }.uniq.sort

  wfids.delete('no_wfid')

  wfids
end
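
As a small illustration of how wfids derives its result from the history ids alone, the sketch below runs the same steps on a plain array. The helper name wfids_from and the sample ids are hypothetical; the only property relied upon is that the wfid (or 'no_wfid') is the last '!'-separated segment of each id.

```ruby
# Hypothetical helper mirroring wfids: take the last '!'-separated
# segment of every id, deduplicate, sort, and drop the 'no_wfid'
# placeholder used for msgs that belong to no process.
def wfids_from(ids)
  wfids = ids.collect { |id| id.split('!').last }.uniq.sort
  wfids.delete('no_wfid')
  wfids
end

# Made-up history ids (assumed layout, for illustration only).
sample_ids = [
  '1234!5678!2011-03-15!1300147200.000002!001!20110315-kodesu',
  '1234!5678!2011-03-14!1300060800.000001!000!20110314-bakasa',
  '1234!5678!2011-03-14!1300060800.000003!002!20110314-bakasa',
  '1234!5678!2011-03-14!1300060800.000004!003!no_wfid'
]

result = wfids_from(sample_ids)
```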