public class JmsSpout extends BaseRichSpout implements javax.jms.MessageListener
A Storm Spout
implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
JmsSpout
instances rely on JmsProducer
implementations to obtain the JMS ConnectionFactory
and Destination
objects necessary to connect to a JMS topic/queue.
When a JmsSpout
receives a JMS message, it delegates to an internal JmsTupleProducer
instance to create a Storm tuple from the incoming message.
Typically, developers will supply a custom JmsTupleProducer
implementation appropriate for the expected message content.
Constructor and Description |
---|
JmsSpout() |
Modifier and Type | Method and Description |
---|---|
void |
ack(Object msgId)
Ack a successfully handled message by the matching
JmsMessageID . |
void |
close()
Close the
session and connection . |
void |
declareOutputFields(OutputFieldsDeclarer declarer)
Use the
tupleProducer to determine which fields are about to be emitted. |
void |
fail(Object msgId)
Fail an unsuccessfully handled message by its
JmsMessageID . |
int |
getJmsAcknowledgeMode()
Returns the JMS Session acknowledgement mode for the JMS session associated with this spout.
|
protected javax.jms.Session |
getSession() |
boolean |
hasFailures()
Returns
true if the spout has received failures from which it has not yet recovered. |
boolean |
isDistributed() |
void |
nextTuple()
Generate the next tuple from a message.
|
void |
onMessage(javax.jms.Message msg)
javax.jms.MessageListener implementation. |
void |
open(Map conf,
TopologyContext context,
SpoutOutputCollector collector)
ISpout implementation. |
protected void |
recovered()
Marks a healthy session state.
|
void |
setDistributed(boolean isDistributed)
Sets the “distributed” mode of this spout.
|
void |
setJmsAcknowledgeMode(int mode)
Sets the JMS Session acknowledgement mode for the JMS session.
|
void |
setJmsProvider(JmsProvider provider)
Set
jmsProvider . |
void |
setJmsTupleProducer(JmsTupleProducer producer)
Set the
JmsTupleProducer implementation that will convert javax.jms.Message object to org.apache.storm.tuple.Values objects to be emitted. |
void |
setRecoveryPeriodMs(long period)
Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
|
activate, deactivate
getComponentConfiguration
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
getComponentConfiguration
public void setJmsAcknowledgeMode(int mode)
Sets the JMS Session acknowledgement mode for the JMS session.
Possible values:
mode
- JMS Session Acknowledgement modeIllegalArgumentException
- if the mode is not recognized.public int getJmsAcknowledgeMode()
Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. Can be either of:
Session.AUTO_ACKNOWLEDGE
Session.CLIENT_ACKNOWLEDGE
Session.DUPS_OK_ACKNOWLEDGE
Session.SESSION_TRANSACTED
public void setJmsProvider(JmsProvider provider)
Set jmsProvider
.
Set the JmsProvider
implementation that this Spout will use to connect to a JMS javax.jms.Desination
provider
- the provider to usepublic void setJmsTupleProducer(JmsTupleProducer producer)
Set the JmsTupleProducer
implementation that will convert javax.jms.Message
object to org.apache.storm.tuple.Values
objects to be emitted.
producer
- the producer instance to usepublic void onMessage(javax.jms.Message msg)
javax.jms.MessageListener
implementation.
Stored the JMS message in an internal queue for processing by the nextTuple()
method.
onMessage
in interface javax.jms.MessageListener
msg
- the message to handlepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
ISpout
implementation.
Connects the JMS spout to the configured JMS destination topic/queue.
open
in interface ISpout
conf
- The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.context
- This object can be used to get information about this task’s place within the topology, including the task id and component id of this task, input and output information, etc.collector
- The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.public void close()
Close the session
and connection
.
When overridden, should always call super
to finalize the active connections.
close
in interface ISpout
close
in class BaseRichSpout
public void nextTuple()
Generate the next tuple from a message.
This method polls the queue that’s being filled asynchronously by the jms connection, every POLL_INTERVAL_MS
seconds. When a message arrives, a Values
(tuple) is generated using tupleProducer
. It is emitted, and the message is saved to toCommit
and pendingMessages
for later handling.
public void ack(Object msgId)
Ack a successfully handled message by the matching JmsMessageID
.
Acking means removing the message from the pending messages collections, and if it was the oldest pending message - ack it to the mq as well, so that it’s the only one acked.
Will only be called if we’re transactional or not AUTO_ACKNOWLEDGE.
ack
in interface ISpout
ack
in class BaseRichSpout
public void fail(Object msgId)
Fail an unsuccessfully handled message by its JmsMessageID
.
Failing means dropping all pending messages and queueing a recovery attempt.
Will only be called if we’re transactional or not AUTO_ACKNOWLEDGE
fail
in interface ISpout
fail
in class BaseRichSpout
public void declareOutputFields(OutputFieldsDeclarer declarer)
Use the tupleProducer
to determine which fields are about to be emitted.
Note that nextTuple()
always emits to the default stream, and thus only fields declared for this stream are used.
declareOutputFields
in interface IComponent
declarer
- this is used to declare output stream ids, output fields, and whether or not each output stream is a direct streampublic boolean hasFailures()
Returns true
if the spout has received failures from which it has not yet recovered.
true
if there were failures, false
otherwise.protected void recovered()
Marks a healthy session state.
public void setRecoveryPeriodMs(long period)
Sets the periodicity of the timer task that checks for failures and recovers the JMS session.
period
- desired wait periodpublic boolean isDistributed()
distributed
.public void setDistributed(boolean isDistributed)
Sets the “distributed” mode of this spout.
If true
multiple instances of this spout may be created across the cluster (depending on the “parallelism_hint” in the topology configuration).
Setting this value to false
essentially means this spout will run as a singleton within the cluster (“parallelism_hint” will be ignored).
In general, this should be set to false
if the underlying JMS destination is a topic, and true
if it is a JMS queue.
isDistributed
- true
if should be distributed, false
otherwise.protected javax.jms.Session getSession()
Copyright © 2022 The Apache Software Foundation. All Rights Reserved.