Class Tap::Joins::Sync
In: lib/tap/joins/sync.rb
Parent: Join

Sync works the same as Join, but passes the collected results of the inputs (ie an array) to the outputs. The results will not be passed until all of inputs have returned. A collision results if a single input completes twice before the group completes as a whole.

Methods

call   join   new   reset  

Classes and Modules

Class Tap::Joins::Sync::Callback
Class Tap::Joins::Sync::SynchronizeError

Constants

NIL_VALUE = Object.new   NIL_VALUE is used to mark empty slots (nil itself cannot be used because it is a valid result value).

Attributes

results  [R]  An array holding results until the batch is ready to execute.

Public Class methods

[Source]

    # File lib/tap/joins/sync.rb, line 20
20:       def initialize(config={}, app=Tap::App.current)
21:         super
22:         @results = nil
23:       end

Public Instance methods

Call is called by a Callback and stores the result at the specified index in results. If the results have all been set, then they are sent to each output.

[Source]

    # File lib/tap/joins/sync.rb, line 57
57:       def call(result, index)
58:         if result == NIL_VALUE
59:           raise "NIL_VALUE cannot be passed as a result"
60:         end
61:         
62:         unless results[index] == NIL_VALUE
63:           raise SynchronizeError, "already got a result for: #{inputs[index]}"
64:         end
65:         results[index] = result
66:         
67:         unless results.include?(NIL_VALUE)
68:           outputs.each {|output| exe(output, results) }
69:           reset
70:         end
71:       end

A synchronized join sets a Callback as the join of each input. The callback is responsible for setting the result of each input into the correct ‘results’ slot.

[Source]

    # File lib/tap/joins/sync.rb, line 34
34:       def join(inputs, outputs)
35:         @inputs.each do |input|
36:           input.joins.delete_if do |join|
37:             join.kind_of?(Callback) && join.join == self
38:           end
39:         end if @inputs
40: 
41:         @inputs = inputs
42: 
43:         index = 0
44:         inputs.each do |input|
45:           input.joins << Callback.new(self, index)
46:           index += 1
47:         end if inputs
48:         reset
49:         
50:         @outputs = outputs
51:         self
52:       end

Resets results. Normally there is no reason to call this method as it will shuffle the arguments being passed through self.

[Source]

    # File lib/tap/joins/sync.rb, line 27
27:       def reset
28:         @results = inputs ? Array.new(inputs.length, NIL_VALUE) : nil
29:       end

[Validate]