public class JmsSpout extends BaseRichSpout
A Spout implementation that listens to a JMS topic or queue and outputs tuples based on the messages it receives.
JmsSpout instances rely on JmsProvider implementations to obtain the JMS ConnectionFactory and Destination objects necessary to connect to a JMS topic/queue.
When a JmsSpout receives a JMS message, it delegates to an internal JmsTupleProducer instance to create a Storm tuple from the incoming message.
Typically, developers will supply a custom JmsTupleProducer implementation appropriate for the expected message content.
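The message-to-tuple delegation can be illustrated with a small, dependency-free model. This is only a sketch: `TupleProducerSketch` and `KeyValueProducer` are hypothetical stand-in types, a plain `String` stands in for a JMS message, and the "key=value" message format is assumed for illustration. None of these names come from Storm or JMS.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple producer: NOT the Storm JmsTupleProducer interface.
interface TupleProducerSketch {
    // Convert one incoming message body into the values of one tuple.
    List<Object> toTuple(String messageBody);

    // Names of the fields, in the same order as the values from toTuple.
    List<String> fieldNames();
}

// Illustrative producer for message bodies of the assumed form "key=value".
class KeyValueProducer implements TupleProducerSketch {
    @Override
    public List<Object> toTuple(String messageBody) {
        int eq = messageBody.indexOf('=');
        if (eq < 0) {
            throw new IllegalArgumentException("expected key=value, got: " + messageBody);
        }
        // Split at the first '=' into a two-value tuple.
        return Arrays.<Object>asList(messageBody.substring(0, eq), messageBody.substring(eq + 1));
    }

    @Override
    public List<String> fieldNames() {
        return Arrays.asList("key", "value");
    }
}
```

A custom producer of this kind keeps the knowledge of the message layout in one place, so the spout itself only needs to know that each message yields one tuple.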
Constructor and Description |
---|
JmsSpout() |
Modifier and Type | Method and Description |
---|---|
void | ack(Object msgId): Ack a successfully handled message by the matching JmsMessageID. |
void | close(): Close the session and connection. |
void | declareOutputFields(OutputFieldsDeclarer declarer): Use the tupleProducer to determine which fields are about to be emitted. |
void | fail(Object msgId): Fail an unsuccessfully handled message by its JmsMessageID. |
Map<String,Object> | getComponentConfiguration(): Declare configuration specific to this component. |
int | getJmsAcknowledgeMode(): Returns the JMS Session acknowledgement mode for the JMS session associated with this spout. |
protected javax.jms.Session | getSession(): Returns the currently active session. |
boolean | isDistributed(): Returns if the spout is distributed. |
void | nextTuple(): Generate the next tuple from a message. |
void | open(Map<String,Object> conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector): ISpout implementation. |
void | setDistributed(boolean isDistributed): Sets the "distributed" mode of this spout. |
void | setIndividualAcks(): Set if JMS vendor supports ack-ing individual messages. |
void | setJmsAcknowledgeMode(int mode): Sets the JMS Session acknowledgement mode for the JMS session. |
void | setJmsProvider(JmsProvider provider): Set jmsProvider. |
void | setJmsTupleProducer(JmsTupleProducer producer): Set the JmsTupleProducer implementation that will convert javax.jms.Message object to org.apache.storm.tuple.Values objects to be emitted. |
Methods inherited from class BaseRichSpout: activate, deactivate
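public void setJmsAcknowledgeMode(int mode)
Sets the JMS Session acknowledgement mode for the JMS session.
Possible values:
- Session.AUTO_ACKNOWLEDGE
- Session.CLIENT_ACKNOWLEDGE
- Session.DUPS_OK_ACKNOWLEDGE
- Session.SESSION_TRANSACTED
Any other vendor specific modes are not supported.
Parameters:
mode - JMS Session Acknowledgement mode
public int getJmsAcknowledgeMode()
Returns the JMS Session acknowledgement mode for the JMS session associated with this spout.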
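The restriction to the four standard javax.jms.Session modes can be sketched as a small validation helper. `AckModes` and `requireSupported` are hypothetical names, the constants mirror the integer values defined on javax.jms.Session so the block compiles without a JMS dependency, and rejecting other values with an exception is an assumption (the text only says they are not supported).

```java
// Hypothetical helper; the constants mirror the values defined on javax.jms.Session.
final class AckModes {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;

    private AckModes() { }

    // Returns the mode unchanged if it is one of the four standard modes,
    // otherwise throws (rejecting by exception is an assumption of this sketch).
    static int requireSupported(int mode) {
        switch (mode) {
            case SESSION_TRANSACTED:
            case AUTO_ACKNOWLEDGE:
            case CLIENT_ACKNOWLEDGE:
            case DUPS_OK_ACKNOWLEDGE:
                return mode;
            default:
                throw new IllegalArgumentException("Unsupported acknowledge mode: " + mode);
        }
    }
}
```

Validating at configuration time means a vendor-specific mode is caught before the spout ever opens a session.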
public void setJmsProvider(JmsProvider provider)
Set jmsProvider.
Set the JmsProvider implementation that this Spout will use to connect to a JMS javax.jms.Destination.
Parameters:
provider - the provider to use
public void setJmsTupleProducer(JmsTupleProducer producer)
Set the JmsTupleProducer implementation that will convert a javax.jms.Message object to org.apache.storm.tuple.Values objects to be emitted.
Parameters:
producer - the producer instance to use
public void setIndividualAcks()
Set if the JMS vendor supports ack-ing individual messages. See setJmsAcknowledgeMode(int).
public void open(Map<String,Object> conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector)
ISpout implementation.
Connects the JMS spout to the configured JMS destination topic/queue.
Parameters:
conf - The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
context - This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
spoutOutputCollector - The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
public void close()
Close the session and connection.
When overridden, should always call super to finalize the active connections.
Specified by: close in interface ISpout
Overrides: close in class BaseRichSpout
public void nextTuple()
Generate the next tuple from a message.
This method polls the queue that's being filled asynchronously by the JMS connection, every POLL_INTERVAL_MS milliseconds.
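The polling pattern described for nextTuple() can be sketched as follows. This is a hypothetical model, not the JmsSpout source: `PollingSketch` is an invented name, Strings stand in for JMS messages, the interval is treated as milliseconds (as the constant's name suggests), and its value of 50 is assumed because the text does not give one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the polling loop; Strings stand in for JMS messages.
class PollingSketch {
    // Assumed value for illustration; the real POLL_INTERVAL_MS is not given in the text.
    static final long POLL_INTERVAL_MS = 50;

    // Filled by the messaging thread, drained by the spout thread.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stands in for the output collector.
    final List<String> emitted = new ArrayList<>();

    // Called from the messaging thread whenever a message arrives.
    void onMessage(String message) {
        queue.offer(message);
    }

    // Called repeatedly by the spout thread.
    // Returns true if a message was emitted, false if the queue was empty.
    boolean nextTuple() {
        String message = queue.poll(); // non-blocking
        if (message == null) {
            try {
                Thread.sleep(POLL_INTERVAL_MS); // back off before the next poll
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return false;
        }
        emitted.add(message);
        return true;
    }
}
```

Decoupling delivery from emission through a thread-safe queue lets the JMS connection push messages at its own pace while the spout thread emits only when it is asked for the next tuple.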
public void ack(Object msgId)
Ack a successfully handled message by the matching JmsMessageID.
Acking means removing the message from the pending messages collections and, if it was the oldest pending message, acknowledging it to the message queue as well, so that it is the only one acked.
Will only be called if the session is transacted or its acknowledgement mode is not AUTO_ACKNOWLEDGE.
Specified by: ack in interface ISpout
Overrides: ack in class BaseRichSpout
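The "oldest pending message" rule for ack(Object) can be modelled in plain Java. `AckSketch` is a hypothetical class: long ids stand in for JmsMessageID, a sorted map stands in for the pending collections, and a list records what would be acknowledged to the broker instead of calling one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of the ack bookkeeping; long ids stand in for JmsMessageID.
class AckSketch {
    // Pending messages ordered by id, so firstKey() is the oldest one.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    // Ids acknowledged to the broker, recorded here instead of calling a real broker.
    final List<Long> ackedToBroker = new ArrayList<>();

    // Register a message as emitted and awaiting an ack.
    void emitted(long id, String message) {
        pending.put(id, message);
    }

    void ack(long id) {
        if (!pending.containsKey(id)) {
            return; // unknown or already handled id
        }
        boolean wasOldest = pending.firstKey() == id;
        pending.remove(id);
        if (wasOldest) {
            ackedToBroker.add(id); // only the oldest pending message reaches the broker
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With ids 1, 2 and 3 pending, acking 2 first only removes it from the pending set; acking 1 afterwards is forwarded to the broker because 1 was the oldest pending id at that moment.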
public void fail(Object msgId)
Fail an unsuccessfully handled message by its JmsMessageID.
Failing means dropping all pending messages and queueing a recovery attempt.
Will only be called if the session is transacted or its acknowledgement mode is not AUTO_ACKNOWLEDGE.
Specified by: fail in interface ISpout
Overrides: fail in class BaseRichSpout
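The failure handling described for fail(Object) can be sketched in the same spirit. `FailSketch` and `runRecoveryIfQueued` are hypothetical names, and how the real spout schedules and performs recovery is not stated in the text; here a boolean flag stands in for the queued recovery attempt.

```java
import java.util.TreeMap;

// Hypothetical model of failure handling; long ids stand in for JmsMessageID.
class FailSketch {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private boolean recoveryQueued = false;

    // Register a message as emitted and awaiting an ack.
    void emitted(long id, String message) {
        pending.put(id, message);
    }

    // Drop everything still pending and request a recovery; repeated failures
    // before the recovery runs do not queue a second attempt (an assumption).
    void fail(long id) {
        pending.clear();
        recoveryQueued = true;
    }

    // Called later from the polling loop; returns true if a recovery was due.
    // A real spout would ask the JMS session to redeliver at this point.
    boolean runRecoveryIfQueued() {
        boolean due = recoveryQueued;
        recoveryQueued = false;
        return due;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Dropping all pending messages on a single failure trades some duplicate deliveries for a simple invariant: after recovery, everything not yet acknowledged to the broker is delivered again.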
public void declareOutputFields(OutputFieldsDeclarer declarer)
Use the tupleProducer to determine which fields are about to be emitted.
Note that nextTuple() always emits to the default stream, and thus only fields declared for this stream are used.
Parameters:
declarer - this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
public boolean isDistributed()
Returns whether the spout is distributed (the value of distributed).
public void setDistributed(boolean isDistributed)
Sets the "distributed" mode of this spout.
If true, multiple instances of this spout may be created across the cluster (depending on the "parallelism_hint" in the topology configuration).
Setting this value to false essentially means this spout will run as a singleton within the cluster ("parallelism_hint" will be ignored).
In general, this should be set to false if the underlying JMS destination is a topic, and true if it is a JMS queue.
Parameters:
isDistributed - true if should be distributed, false otherwise.
protected javax.jms.Session getSession()
Returns the currently active session.
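public Map<String,Object> getComponentConfiguration()
Description copied from interface: IComponent
Declare configuration specific to this component.
See also: TopologyBuilder
Specified by: getComponentConfiguration in interface IComponent
Overrides: getComponentConfiguration in class BaseComponent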
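One plausible way for the singleton behaviour of setDistributed(false) to surface in the component configuration is a cap on task parallelism. The sketch below assumes this mechanism; the text does not state it. `ComponentConfigSketch` is a hypothetical class, and "topology.max.task.parallelism" is the Storm configuration key for limiting a component's task count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the JmsSpout source.
class ComponentConfigSketch {
    // Storm's configuration key for capping the number of tasks of a component.
    static final String MAX_TASK_PARALLELISM = "topology.max.task.parallelism";

    // Builds the component-specific configuration for the given distributed flag.
    static Map<String, Object> componentConfiguration(boolean distributed) {
        Map<String, Object> conf = new HashMap<>();
        if (!distributed) {
            conf.put(MAX_TASK_PARALLELISM, 1); // force a single instance
        }
        return conf;
    }
}
```

Under this assumption a non-distributed spout ignores any larger parallelism hint, which matches the guidance to use a single instance when the JMS destination is a topic.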
Copyright © 2023 The Apache Software Foundation. All rights reserved.