Storm's acker tracks completion of each tupletree with a checksum hash: each time a tuple is sent, its value is XORed into the checksum, and each time a tuple is acked its value is XORed in again. If all tuples have been successfully acked, the checksum will be zero (the odds that the checksum will be zero otherwise are vanishingly small).
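The XOR bookkeeping can be illustrated with a minimal Python sketch. This is not Storm's code: the function name checksum_after and the use of plain integers as tuple ids are assumptions made for illustration. It only shows that XORing an id in once on send and once on ack cancels it out, in any order.

```python
from typing import Iterable


def checksum_after(sent: Iterable[int], acked: Iterable[int]) -> int:
    """Return the running checksum after the given sends and acks.

    Hypothetical helper: every id is XORed in once when sent and once
    when acked, so a fully acked set of ids always leaves zero.
    """
    checksum = 0
    for tuple_id in sent:
        checksum ^= tuple_id   # tuple sent: enter it into the checksum
    for tuple_id in acked:
        checksum ^= tuple_id   # tuple acked: the same id cancels itself
    return checksum


# All three tuples acked, in a different order than they were sent.
print(checksum_after([1, 2, 4], [4, 1, 2]))  # 0

# Tuple 4 never acked: its id is left behind in the checksum.
print(checksum_after([1, 2, 4], [1, 2]))     # 4
```

The example uses tiny ids for readability. The "vanishingly small" odds in the text depend on ids being drawn from a large random space, which this sketch does not model.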
You can read a bit more about the reliability mechanism elsewhere on the wiki -- this page explains the internal details.
execute()
The acker is actually a regular bolt, with its execute method defined within mk-acker-bolt. When a new tupletree is born, the spout sends the XORed edge-ids of each tuple recipient, which the acker records in its pending ledger. Every time an executor acks a tuple, the acker receives a partial checksum that is the XOR of the tuple's own edge-id (clearing it from the ledger) and the edge-id of each downstream tuple the executor emitted (thus entering them into the ledger).
This is accomplished as follows.
On a tick tuple, just advance pending tupletree checksums towards death and return. Otherwise, update or create the record for this tupletree:
Next, put the record into the RotatingMap (thus resetting its countdown to expiry) and take action:
Finally, pass on an ack of our own.
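The flow described above can be sketched in Python as follows. This is an illustration under stated assumptions, not the body of mk-acker-bolt: the class and field names, the message kinds "init", "ack" and "fail", and the notification tuples are all hypothetical, a plain dict stands in for the RotatingMap, and tick handling is left out.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Record:
    """Per-tupletree ledger entry (field names are assumptions)."""
    checksum: int = 0
    spout_task: Optional[str] = None
    failed: bool = False


class AckerSketch:
    def __init__(self) -> None:
        # Plain dict standing in for the RotatingMap; nothing expires here.
        self.pending: Dict[int, Record] = {}
        # (spout_task, outcome, root_id) messages the acker would send back.
        self.notifications: List[Tuple[str, str, int]] = []

    def execute(self, kind: str, root_id: int, value: int = 0,
                spout_task: Optional[str] = None) -> None:
        # Update or create the record for this tupletree.
        record = self.pending.get(root_id)
        if record is None:
            record = Record()
        if kind == "init":        # assumed message kind: tupletree is born
            record.checksum ^= value
            record.spout_task = spout_task
        elif kind == "ack":       # assumed message kind: partial checksum
            record.checksum ^= value
        elif kind == "fail":      # assumed message kind: explicit failure
            record.failed = True
        else:
            raise ValueError(f"unknown message kind: {kind!r}")

        # Put the record back; with a RotatingMap this resets its expiry.
        self.pending[root_id] = record

        # Act only once the owning spout is known (an assumption here).
        if record.spout_task is not None:
            if record.checksum == 0:
                self._finish(root_id, record, "ack")
            elif record.failed:
                self._finish(root_id, record, "fail")
        # A real bolt would now ack its own input tuple.

    def _finish(self, root_id: int, record: Record, outcome: str) -> None:
        del self.pending[root_id]
        self.notifications.append((record.spout_task, outcome, root_id))


# Worked trace: the spout emits edge A; the first bolt emits B and C, then acks.
A, B, C = 0x1, 0x2, 0x4
acker = AckerSketch()
acker.execute("init", root_id=42, value=A, spout_task="spout-1")
acker.execute("ack", root_id=42, value=A ^ B ^ C)  # clears A, enters B and C
acker.execute("ack", root_id=42, value=B)          # clears B
acker.execute("ack", root_id=42, value=C)          # clears C, checksum is 0
print(acker.notifications)  # [('spout-1', 'ack', 42)]
```

In the trace, the checksum goes 1, 6, 4, 0: each ack clears the acked edge and enters any edges emitted downstream, so it reaches zero only when no edge is outstanding.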
RotatingMap
The acker stores pending tuples in a RotatingMap, a simple device used in several places within Storm to efficiently time-expire a process.
The RotatingMap behaves as a HashMap, and offers the same O(1) access guarantees.
Internally, it holds several HashMaps ('buckets') of its own, each holding a cohort of records that will expire at the same time. Let's call the longest-lived bucket death row, and the most recent the nursery. Whenever a value is .put() to the RotatingMap, it is relocated to the nursery -- and removed from any other bucket it might have been in (effectively resetting its death clock).
Whenever its owner calls .rotate(), the RotatingMap advances each cohort one step further towards expiration. (Typically, Storm objects call rotate on every receipt of a system tick stream tuple.) If there are any key-value pairs in the former death row bucket, the RotatingMap invokes a callback (given in the constructor) for each key-value pair, letting its owner take appropriate action (e.g., failing a tuple).
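The bucket mechanics can be sketched in a few lines of Python. This is an illustrative model only, not Storm's RotatingMap class: the name RotatingMapSketch, the default of three buckets, and the on_expire parameter are assumptions chosen for the example.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, Hashable, Optional


class RotatingMapSketch:
    """Map whose entries expire after num_buckets rotations without a put."""

    def __init__(self, num_buckets: int = 3,
                 on_expire: Optional[Callable[[Hashable, Any], None]] = None) -> None:
        if num_buckets < 2:
            raise ValueError("need at least two buckets")
        # Leftmost bucket is the nursery, rightmost is death row.
        self._buckets: Deque[Dict[Hashable, Any]] = deque(
            {} for _ in range(num_buckets))
        self._on_expire = on_expire

    def put(self, key: Hashable, value: Any) -> None:
        # Remove the key from any older bucket, then place it in the nursery.
        for bucket in self._buckets:
            bucket.pop(key, None)
        self._buckets[0][key] = value

    def get(self, key: Hashable, default: Any = None) -> Any:
        # One lookup per bucket; the bucket count is a small constant.
        for bucket in self._buckets:
            if key in bucket:
                return bucket[key]
        return default

    def remove(self, key: Hashable, default: Any = None) -> Any:
        for bucket in self._buckets:
            if key in bucket:
                return bucket.pop(key)
        return default

    def rotate(self) -> Dict[Hashable, Any]:
        # Drop death row, age every other cohort by one step, open a new nursery.
        expired = self._buckets.pop()
        self._buckets.appendleft({})
        if self._on_expire is not None:
            for key, value in expired.items():
                self._on_expire(key, value)
        return expired


expired_log = []
m = RotatingMapSketch(3, on_expire=lambda k, v: expired_log.append((k, v)))
m.put("a", 1)
m.put("b", 2)
m.rotate()
m.put("a", 10)   # re-putting "a" moves it back to the nursery
m.rotate()
m.rotate()       # "b" has now aged out of death row
print(expired_log)  # [('b', 2)]
print(m.get("a"))   # 10
```

Because expiry is handled a whole bucket at a time, no per-entry timers are needed; the cost is that an entry's lifetime is only accurate to within one rotation interval.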