Package org.apache.storm.stats
Class StatsUtil
java.lang.Object
org.apache.storm.stats.StatsUtil
-
Field Summary
-
Constructor Summary
-
Method Summary
Modifier and Type, Method, and Description:
static Map<String,Object> aggBoltExecWinStats(Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys)
    Aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
static Map<String,Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, Map<List<String>, Double> id2procAvg, Map<List<String>, Long> id2numExec)
    Aggregates number executed, process latency, and execute latency across all streams.
static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, Map<K, Double> id2procAvg, Map<K, Long> id2numExec)
    Aggregates number executed and process and execute latencies.
static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
    Aggregate component executor stats.
static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
    Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
static Map<String,Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys)
    Pre-merge component page bolt stats from an executor heartbeat: 1. computes component capacity 2. converts map keys of stats 3. filters streams if necessary.
static Map<String,Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys)
    Pre-merge component page spout stats from an executor heartbeat: 1. computes component capacity 2. converts map keys of stats 3. filters streams if necessary.
static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String, Object> beat, String window, boolean includeSys)
    Pre-merge component stats of specified bolt id.
static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String, Object> m, String window, boolean includeSys)
    Pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq)
    Compute a weighted average from a list of average maps and a corresponding list of count maps extracted from a list of ExecutorSummary.
static <K> Map<String,Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts)
    Aggregate weighted average of all streams.
static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
    Aggregate bolt stats.
aggregateBoltStreams(Map<String, Map> stats)
    Aggregate all bolt streams.
static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
    Aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String, Object>> beats, String compType)
    Aggregate the stats for a component over a given window of time.
aggregateCounts
    Aggregate a list of count maps into one map.
static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String, Map<K, V>> stats)
    Aggregate count streams by window.
static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
    Aggregate spout stats.
aggregateSpoutStreams(Map<String, Map> stats)
    Aggregate all spout streams.
static Map<String,Object> aggSpoutExecWinStats(Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys)
    Aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
static Map<String,Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked)
    Aggregates number acked and complete latencies across all streams.
static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, Map<K, Long> id2acked)
    Aggregates number acked and complete latencies.
static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
    Aggregate topo executors stats.
static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
    A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
    Aggregate statistics per worker for a topology.
boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
    Aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
static String componentType(StormTopology topology, String compId)
    Get the component type for a given id.
static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
    Computes max bolt capacity.
static double computeExecutorCapacity(ExecutorSummary summary)
    Compute the capacity of an executor: an approximation of the % of time spent doing real work.
static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats)
    Convert thrift executor heartbeats into a java HashMap.
static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats)
    Convert executors stats into a HashMap; note that the ExecutorStats remain unchanged.
convertExecutorStats
    Convert thrift ExecutorStats structure into a java HashMap.
static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
    Convert SupervisorWorkerHeartbeat to nimbus local report executor heartbeats.
convertZkExecutorHb
    Convert thrift ExecutorBeat into a java HashMap.
convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
    Convert a thrift worker heartbeat into a java HashMap.
static String errorSubset(String errorStr)
static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology)
    Extracts a list of executor data from heart beats.
static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId)
    Extracts a list of executor data from heart beats.
static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId)
    Extract a list of host port info for specified component.
static String floatStr
    Convert a float to a string for display.
static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
    Filter ExecutorSummary whose stats is null.
static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
    Merge accumulated bolt stats with pre-merged component stats.
static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
    Merge accumulated spout stats with pre-merged component stats.
static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
    Merge accumulated bolt stats with new bolt stats.
static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
    Merge accumulated spout stats with new spout stats.
postAggregateCompStats(Map<String, Object> compStats)
    Post aggregate component stats: 1. computes execute-latency/process-latency from execute/process latency total 2. computes windowed weighted averages 3. transforms Map keys.
static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys)
    Filter system streams of aggregated spout/bolt stats if necessary.
spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
    Aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
static ExecutorStats thriftifyExecutorStats(Map stats)
    Convert Executor stats to thrift data structure.
static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
    Used for local test.
static <K> Map windowSetConverter(Map stats, org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc)
-
Field Details
-
TYPE
-
TEN_MIN_IN_SECONDS
public static final int TEN_MIN_IN_SECONDS
-
TEN_MIN_IN_SECONDS_STR
-
-
Constructor Details
-
StatsUtil
public StatsUtil()
-
-
Method Details
-
aggBoltLatAndCount
public static Map<String,Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, Map<List<String>, Double> id2procAvg, Map<List<String>, Long> id2numExec)
Aggregates number executed, process latency, and execute latency across all streams.
- Parameters:
id2execAvg - { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313}
id2procAvg - { global stream id -> proc avg value }
id2numExec - { global stream id -> executed }
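One way to read "aggregates across all streams" is as a count-weighted accumulation: each stream's average latency is multiplied by its executed count and the products are summed. The sketch below illustrates only that reading. The class name, the method name, and the result keys (executed, execLatencyTotal, procLatencyTotal) are hypothetical and are not taken from StatsUtil.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BoltLatAndCountSketch {
    // Hypothetical helper: sums executed counts and count-weighted latency totals over all streams.
    public static Map<String, Number> aggregate(Map<List<String>, Double> id2execAvg,
                                                Map<List<String>, Double> id2procAvg,
                                                Map<List<String>, Long> id2numExec) {
        long executed = 0L;
        double execTotal = 0.0;
        double procTotal = 0.0;
        for (Map.Entry<List<String>, Long> e : id2numExec.entrySet()) {
            long n = e.getValue();
            executed += n;
            // Assumption: a stream with no recorded average contributes nothing.
            execTotal += id2execAvg.getOrDefault(e.getKey(), 0.0) * n;
            procTotal += id2procAvg.getOrDefault(e.getKey(), 0.0) * n;
        }
        Map<String, Number> result = new HashMap<>();
        result.put("executed", executed);
        result.put("execLatencyTotal", execTotal);
        result.put("procLatencyTotal", procTotal);
        return result;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("split", "default");
        Map<List<String>, Double> execAvg = new HashMap<>();
        Map<List<String>, Double> procAvg = new HashMap<>();
        Map<List<String>, Long> numExec = new HashMap<>();
        execAvg.put(stream, 0.5);
        procAvg.put(stream, 0.25);
        numExec.put(stream, 8L);
        Map<String, Number> r = aggregate(execAvg, procAvg, numExec);
        // prints "8 4.0 2.0"
        System.out.println(r.get("executed") + " " + r.get("execLatencyTotal") + " " + r.get("procLatencyTotal"));
    }
}
```

Keeping totals rather than averages makes the result safe to add to another executor's result; an average can be recovered later by dividing a total by the executed count.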
-
aggSpoutLatAndCount
public static Map<String,Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked)
Aggregates number acked and complete latencies across all streams.
-
aggBoltStreamsLatAndCount
public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, Map<K, Double> id2procAvg, Map<K, Long> id2numExec)
Aggregates number executed and process and execute latencies.
-
aggSpoutStreamsLatAndCount
public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, Map<K, Long> id2acked)
Aggregates number acked and complete latencies.
-
aggPreMergeCompPageBolt
public static Map<String,Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys)
Pre-merge component page bolt stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
- Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
- Returns:
pre-merged stats
-
aggPreMergeCompPageSpout
public static Map<String,Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys)
Pre-merge component page spout stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
- Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
- Returns:
pre-merged stats
-
aggPreMergeTopoPageBolt
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String, Object> beat, String window, boolean includeSys)
Pre-merge component stats of specified bolt id.
- Parameters:
beat - executor heartbeat data
window - specified window
includeSys - whether to include system streams
- Returns:
{ comp id -> comp-stats }
-
aggPreMergeTopoPageSpout
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String, Object> m, String window, boolean includeSys)
Pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
-
mergeAggCompStatsCompPageBolt
public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
Merge accumulated bolt stats with pre-merged component stats.
- Parameters:
accBoltStats - accumulated bolt stats
boltStats - pre-merged component stats
- Returns:
merged stats
-
mergeAggCompStatsCompPageSpout
public static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
Merge accumulated spout stats with pre-merged component stats.
-
mergeAggCompStatsTopoPageBolt
public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
Merge accumulated bolt stats with new bolt stats.
- Parameters:
accBoltStats - accumulated bolt stats
boltStats - new input bolt stats
- Returns:
merged bolt stats
-
mergeAggCompStatsTopoPageSpout
public static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
Merge accumulated spout stats with new spout stats.
-
aggTopoExecStats
public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
-
aggTopoExecsStats
public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
Aggregate topo executors stats.
- Parameters:
topologyId - topology id
exec2nodePort - executor -> host+port
task2component - task -> component
beats - executor[start, end] -> executor heartbeat
topology - storm topology
window - the window to be aggregated
includeSys - whether to include system streams
clusterState - cluster state
- Returns:
TopologyPageInfo thrift structure
-
aggregateBoltStats
public static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregate bolt stats.
- Parameters:
statsSeq - a seq of ExecutorStats
includeSys - whether to include system streams
- Returns:
aggregated bolt stats: {metric -> win -> global stream id -> value}
-
aggregateSpoutStats
public static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregate spout stats.
- Parameters:
statsSeq - a seq of ExecutorStats
includeSys - whether to include system streams
- Returns:
aggregated spout stats: {metric -> win -> global stream id -> value}
-
aggregateCommonStats
public static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
Aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
-
preProcessStreamSummary
public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys)
Filter system streams of aggregated spout/bolt stats if necessary.
-
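The filtering that preProcessStreamSummary describes can be pictured as dropping system stream ids from the innermost maps unless includeSys is set. The following is a minimal sketch, not the StatsUtil implementation: it fixes the key type to String for simplicity and assumes that a stream id counts as a system stream when it starts with "__". Both the class and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SystemStreamFilterSketch {
    // Assumption: a stream id is a system stream when it starts with "__".
    public static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    // Input and output shape: {metric -> win -> stream id -> count}.
    public static Map<String, Map<String, Map<String, Long>>> filter(
            Map<String, Map<String, Map<String, Long>>> summary, boolean includeSys) {
        if (includeSys) {
            return summary; // nothing to remove
        }
        Map<String, Map<String, Map<String, Long>>> result = new HashMap<>();
        summary.forEach((metric, wins) -> {
            Map<String, Map<String, Long>> winsOut = new HashMap<>();
            wins.forEach((win, streams) -> {
                Map<String, Long> kept = new HashMap<>();
                streams.forEach((id, n) -> {
                    if (!isSystemId(id)) {
                        kept.put(id, n);
                    }
                });
                winsOut.put(win, kept);
            });
            result.put(metric, winsOut);
        });
        return result;
    }
}
```

The sketch copies into new maps rather than mutating its argument, so the caller's unfiltered summary stays available.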
aggregateCountStreams
public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String, Map<K, V>> stats)
Aggregate count streams by window.
- Parameters:
stats - a Map of value: {win -> stream -> value}
- Returns:
a Map of value: {win -> value}
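The input and output shapes above suggest a per-window sum over streams. A minimal sketch of that reduction follows; the class and method names are hypothetical, and the code illustrates the shape transformation rather than reproducing StatsUtil.

```java
import java.util.HashMap;
import java.util.Map;

class CountStreamsSketch {
    // Hypothetical helper: collapses {win -> stream -> value} into {win -> value} by summing.
    public static <K, V extends Number> Map<String, Long> sumByWindow(Map<String, Map<K, V>> stats) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> win : stats.entrySet()) {
            long total = 0L;
            for (V value : win.getValue().values()) {
                total += value.longValue();
            }
            result.put(win.getKey(), total);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> streams = new HashMap<>();
        streams.put("default", 10L);
        streams.put("errors", 2L);
        Map<String, Map<String, Long>> stats = new HashMap<>();
        stats.put("600", streams);
        System.out.println(sumByWindow(stats)); // prints {600=12}
    }
}
```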
-
aggregateAverages
public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq)
Compute a weighted average from a list of average maps and a corresponding list of count maps extracted from a list of ExecutorSummary.
- Parameters:
avgSeq - a list of {win -> global stream id -> avg value}
countSeq - a list of {win -> global stream id -> count value}
- Returns:
a Map of {win -> global stream id -> weighted avg value}
-
aggregateAvgStreams
public static <K> Map<String,Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts)
Aggregate weighted average of all streams.
- Parameters:
avgs - a Map of {win -> stream -> average value}
counts - a Map of {win -> stream -> count value}
- Returns:
a Map of {win -> aggregated value}
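A weighted average over streams can be written as sum(avg * count) / sum(count) within each window. The sketch below shows that formula under two assumptions that the description does not state: a stream missing from counts is weighted as zero, and a window whose total count is zero yields 0.0. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class WeightedAvgSketch {
    // Hypothetical helper: per window, sum(avg * count) / sum(count) over all streams.
    public static <K> Map<String, Double> weightedAvgByWindow(Map<String, Map<K, Double>> avgs,
                                                              Map<String, Map<K, Long>> counts) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> win : avgs.entrySet()) {
            Map<K, Long> winCounts = counts.getOrDefault(win.getKey(), Collections.emptyMap());
            double weightedSum = 0.0;
            long total = 0L;
            for (Map.Entry<K, Double> stream : win.getValue().entrySet()) {
                long n = winCounts.getOrDefault(stream.getKey(), 0L); // assumption: missing count = 0
                weightedSum += stream.getValue() * n;
                total += n;
            }
            // Assumption: report 0.0 rather than NaN when nothing was counted.
            result.put(win.getKey(), total == 0 ? 0.0 : weightedSum / total);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Double> avg = new HashMap<>();
        avg.put("a", 2.0);
        avg.put("b", 4.0);
        Map<String, Long> cnt = new HashMap<>();
        cnt.put("a", 1L);
        cnt.put("b", 3L);
        Map<String, Map<String, Double>> avgs = new HashMap<>();
        Map<String, Map<String, Long>> counts = new HashMap<>();
        avgs.put("600", avg);
        counts.put("600", cnt);
        System.out.println(weightedAvgByWindow(avgs, counts)); // prints {600=3.5}
    }
}
```

Weighting by count matters because a plain mean of 2.0 and 4.0 would give 3.0, even though three quarters of the samples came from the slower stream.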
-
spoutStreamsStats
Aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
-
boltStreamsStats
Aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
-
aggregateSpoutStreams
Aggregate all spout streams.
- Parameters:
stats - a Map of {metric -> win -> stream id -> value}
- Returns:
a Map of {metric -> win -> aggregated value}
-
aggregateBoltStreams
Aggregate all bolt streams.
- Parameters:
stats - a Map of {metric -> win -> stream id -> value}
- Returns:
a Map of {metric -> win -> aggregated value}
-
aggBoltExecWinStats
public static Map<String,Object> aggBoltExecWinStats(Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys)
Aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
-
aggSpoutExecWinStats
public static Map<String,Object> aggSpoutExecWinStats(Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys)
Aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
-
aggregateCounts
Aggregate a list of count maps into one map.
- Parameters:
countsSeq - a seq of {win -> GlobalStreamId -> value}
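Merging a list of nested count maps amounts to adding values that share the same window and stream key. The sketch below assumes the result has the same {win -> stream -> value} shape as each input, which the description above does not spell out. The class and method names are hypothetical, and the key type is left generic in place of GlobalStreamId.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AggregateCountsSketch {
    // Hypothetical helper: adds up counts that share the same window and stream key.
    public static <K> Map<String, Map<K, Long>> merge(List<Map<String, Map<K, Long>>> countsSeq) {
        Map<String, Map<K, Long>> result = new HashMap<>();
        for (Map<String, Map<K, Long>> counts : countsSeq) {
            for (Map.Entry<String, Map<K, Long>> win : counts.entrySet()) {
                // Create the per-window map on first sight of this window.
                Map<K, Long> target = result.computeIfAbsent(win.getKey(), w -> new HashMap<>());
                for (Map.Entry<K, Long> stream : win.getValue().entrySet()) {
                    target.merge(stream.getKey(), stream.getValue(), Long::sum);
                }
            }
        }
        return result;
    }
}
```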
-
aggregateCompStats
public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String, Object>> beats, String compType)
Aggregate the stats for a component over a given window of time.
-
aggCompExecStats
public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
-
postAggregateCompStats
Post aggregate component stats:
1. computes execute-latency/process-latency from execute/process latency total
2. computes windowed weighted averages
3. transforms Map keys
- Parameters:
compStats - accumulated comp stats
-
aggCompExecsStats
public static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
Aggregate component executor stats.
- Parameters:
exec2hostPort - a Map of {executor -> host+port}
task2component - a Map of {task id -> component}
beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
window - specified window
includeSys - whether to include system streams
topologyId - topology id
topology - storm topology
componentId - component id
- Returns:
ComponentPageInfo thrift structure
-
aggWorkerStats
public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
Aggregate statistics per worker for a topology, optionally filtering on specific supervisors.
- Parameters:
stormId - topology id
stormName - storm topology
task2Component - a Map of {task id -> component}
beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
exec2NodePort - a Map of {executor -> host+port}
includeSys - whether to include system streams
userAuthorized - whether the user is authorized to view topology info
filterSupervisor - if not null, only return WorkerSummaries for that supervisor
owner - owner of the topology
-
convertExecutorBeats
public static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats)
Convert thrift executor heartbeats into a java HashMap.
-
convertWorkerBeats
public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
Convert SupervisorWorkerHeartbeat to nimbus local report executor heartbeats.
-
convertZkExecutorHb
Convert thrift ExecutorBeat into a java HashMap.
-
convertZkWorkerHb
Convert a thrift worker heartbeat into a java HashMap.
-
convertExecutorsStats
public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats)
Convert executors stats into a HashMap; note that the ExecutorStats remain unchanged.
-
convertExecutorStats
Convert thrift ExecutorStats structure into a java HashMap.
-
extractNodeInfosFromHbForComp
public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId)
Extract a list of host port info for specified component.
- Parameters:
exec2hostPort - {executor -> host+port}
task2component - {task id -> component}
includeSys - whether to include system streams
compId - component id
- Returns:
a list of host+port
-
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology)
Extracts a list of executor data from heart beats.
-
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId)
Extracts a list of executor data from heart beats.
-
computeBoltCapacity
Computes max bolt capacity.
- Parameters:
executorSumms - a list of ExecutorSummary
- Returns:
max bolt capacity
-
computeExecutorCapacity
Compute the capacity of an executor: an approximation of the % of time spent doing real work.
- Parameters:
summary - the stats for the executor.
- Returns:
the capacity of the executor.
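"Approximation of the % of time spent doing real work" can be made concrete as busy time divided by elapsed time. The sketch below assumes busy time is the executed tuple count multiplied by the average execute latency in milliseconds, measured over a window given in seconds. This formula is an assumption for illustration; the description above does not give one. The class, method, and parameter names are hypothetical.

```java
class CapacitySketch {
    // Ten minutes expressed in seconds, used here as the example window.
    static final long WINDOW_SECS = 600;

    // Hypothetical helper: fraction of the window spent executing tuples.
    // Assumption: busy time = executed * executeLatencyMs.
    public static double capacity(long executed, double executeLatencyMs, long windowSecs) {
        if (windowSecs <= 0) {
            return 0.0; // no elapsed time, nothing to report
        }
        return (executed * executeLatencyMs) / (1000.0 * windowSecs);
    }

    public static void main(String[] args) {
        // 30000 tuples at 10 ms each is 300 s of work in a 600 s window.
        System.out.println(capacity(30000, 10.0, WINDOW_SECS)); // prints 0.5
    }
}
```

Under this reading, a value close to 1.0 means the executor spent nearly the whole window executing tuples.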
-
getFilledStats
Filter ExecutorSummary whose stats is null.
- Parameters:
summs - a list of ExecutorSummary
- Returns:
filtered summs
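The filter described above keeps only summaries that carry stats. The sketch below shows the idea with a hypothetical Summary class standing in for ExecutorSummary, so that the example stays self-contained; it does not use the real thrift type or its accessors.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FilledStatsSketch {
    // Hypothetical stand-in for ExecutorSummary: an id plus nullable stats.
    static final class Summary {
        final String id;
        final Map<String, Object> stats;

        Summary(String id, Map<String, Object> stats) {
            this.id = id;
            this.stats = stats;
        }
    }

    // Keeps only the summaries whose stats are present.
    public static List<Summary> filled(List<Summary> summs) {
        return summs.stream()
                .filter(s -> s.stats != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Summary> in = Arrays.asList(
                new Summary("exec-1", Map.of("emitted", 10L)),
                new Summary("exec-2", null));
        System.out.println(filled(in).size()); // prints 1
    }
}
```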
-
thriftifyRpcWorkerHb
Used for local test.
-
thriftifyExecutorStats
Convert Executor stats to thrift data structure.
- Parameters:
stats - the stats in the form of a map.
- Returns:
the thrift structure for the stats.
-
componentType
Get the component type for a given id.
- Parameters:
topology - the topology this is a part of.
compId - the id of the component.
- Returns:
the type as a String "BOLT" or "SPOUT".
-
floatStr
Convert a float to a string for display.
- Parameters:
n - the value to format.
- Returns:
the string ready for display.
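The description does not say how many digits are kept, so the following sketch only illustrates the general idea of a display formatter. It assumes three decimal places and a "0" fallback for a null input; both choices are assumptions, as are the class and method names.

```java
import java.util.Locale;

class FloatStrSketch {
    // Hypothetical formatter. Assumptions: three decimal places, "0" for null.
    public static String format(Double n) {
        if (n == null) {
            return "0";
        }
        // Locale.ROOT keeps the decimal separator a dot on every machine.
        return String.format(Locale.ROOT, "%.3f", n);
    }

    public static void main(String[] args) {
        System.out.println(format(0.44313)); // prints 0.443
    }
}
```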
-
errorSubset
-
windowSetConverter
-