public class StatsUtil extends Object
Modifier and Type | Field and Description |
---|---|
static int |
TEN_MIN_IN_SECONDS |
static String |
TEN_MIN_IN_SECONDS_STR |
static String |
TYPE |
Constructor and Description |
---|
StatsUtil() |
Modifier and Type | Method and Description |
---|---|
static Map<String,Object> |
aggBoltExecWinStats(Map<String,Object> accStats,
Map<String,Object> newStats,
boolean includeSys)
aggregate windowed stats from a bolt executor stats with a Map of accumulated stats.
|
static Map<String,Number> |
aggBoltLatAndCount(Map<List<String>,Double> id2execAvg,
Map<List<String>,Double> id2procAvg,
Map<List<String>,Long> id2numExec)
Aggregates number executed, process latency, and execute latency across all streams.
|
static <K> Map<K,Map> |
aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg,
Map<K,Double> id2procAvg,
Map<K,Long> id2numExec)
aggregate number executed and process & execute latencies.
|
static ComponentPageInfo |
aggCompExecsStats(Map exec2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
String window,
boolean includeSys,
String topologyId,
StormTopology topology,
String componentId)
aggregate component executor stats.
|
static Map<String,Object> |
aggCompExecStats(String window,
boolean includeSys,
Map<String,Object> accStats,
Map<String,Object> beat,
String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as
specified.
|
static Map<String,Object> |
aggPreMergeCompPageBolt(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component page bolt stats from an executor heartbeat 1.
|
static Map<String,Object> |
aggPreMergeCompPageSpout(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component page spout stats from an executor heartbeat 1.
|
static <K,V extends Number> |
aggPreMergeTopoPageBolt(Map<String,Object> beat,
String window,
boolean includeSys)
pre-merge component stats of specified bolt id.
|
static <K,V extends Number> |
aggPreMergeTopoPageSpout(Map<String,Object> m,
String window,
boolean includeSys)
pre-merge component stats of specified spout id and returns { comp id -> comp-stats }.
|
static <K> Map<String,Map<K,Double>> |
aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq,
List<Map<String,Map<K,Long>>> countSeq)
compute an weighted average from a list of average maps and a corresponding count maps extracted from a list of ExecutorSummary.
|
static <K> Map<String,Double> |
aggregateAvgStreams(Map<String,Map<K,Double>> avgs,
Map<String,Map<K,Long>> counts)
aggregate weighted average of all streams.
|
static <T> Map<String,Map> |
aggregateBoltStats(List<ExecutorSummary> statsSeq,
boolean includeSys)
aggregate bolt stats.
|
static Map<String,Map> |
aggregateBoltStreams(Map<String,Map> stats)
aggregate all bolt streams.
|
static <T> Map<String,Map<String,Map<T,Long>>> |
aggregateCommonStats(List<ExecutorSummary> statsSeq)
aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats.
|
static Map<String,Object> |
aggregateCompStats(String window,
boolean includeSys,
List<Map<String,Object>> beats,
String compType)
Aggregate the stats for a component over a given window of time.
|
static <T> Map<String,Map<T,Long>> |
aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
aggregate a list of count maps into one map.
|
static <K,V extends Number> |
aggregateCountStreams(Map<String,Map<K,V>> stats)
aggregate count streams by window.
|
static Map<String,Map> |
aggregateSpoutStats(List<ExecutorSummary> statsSeq,
boolean includeSys)
aggregate spout stats.
|
static Map<String,Map> |
aggregateSpoutStreams(Map<String,Map> stats)
aggregate all spout streams.
|
static Map<String,Object> |
aggSpoutExecWinStats(Map<String,Object> accStats,
Map<String,Object> beat,
boolean includeSys)
aggregate windowed stats from a spout executor stats with a Map of accumulated stats.
|
static Map<String,Number> |
aggSpoutLatAndCount(Map<String,Double> id2compAvg,
Map<String,Long> id2numAcked)
aggregate number acked and complete latencies across all streams.
|
static <K> Map<K,Map> |
aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg,
Map<K,Long> id2acked)
Aggregates number acked and complete latencies.
|
static TopologyPageInfo |
aggTopoExecsStats(String topologyId,
Map exec2nodePort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
StormTopology topology,
String window,
boolean includeSys,
IStormClusterState clusterState)
aggregate topo executors stats.
|
static Map<String,Object> |
aggTopoExecStats(String window,
boolean includeSys,
Map<String,Object> accStats,
Map<String,Object> beat,
String compType)
A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
|
static List<WorkerSummary> |
aggWorkerStats(String stormId,
String stormName,
Map<Integer,String> task2Component,
Map<List<Integer>,Map<String,Object>> beats,
Map<List<Long>,List<Object>> exec2NodePort,
Map<String,String> nodeHost,
Map<WorkerSlot,WorkerResources> worker2Resources,
boolean includeSys,
boolean userAuthorized,
String filterSupervisor,
String owner)
aggregate statistics per worker for a topology.
|
static Map<String,Map> |
boltStreamsStats(List<ExecutorSummary> summs,
boolean includeSys)
aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value}.
|
static String |
componentType(StormTopology topology,
String compId)
Get the coponenet type for a give id.
|
static double |
computeBoltCapacity(List<ExecutorSummary> executorSumms)
computes max bolt capacity.
|
static double |
computeExecutorCapacity(ExecutorSummary summary)
Compute the capacity of a executor.
|
static Map<List<Integer>,Map<String,Object>> |
convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
convert thrift executor heartbeats into a java HashMap.
|
static Map<List<Integer>,ExecutorStats> |
convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
convert executors stats into a HashMap, note that ExecutorStats are remained unchanged.
|
static Map<String,Object> |
convertExecutorStats(ExecutorStats stats)
convert thrift ExecutorStats structure into a java HashMap.
|
static Map<List<Integer>,Map<String,Object>> |
convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
convert
SupervisorWorkerHeartbeat to nimbus local report executor heartbeats. |
static Map<String,Object> |
convertZkExecutorHb(ExecutorBeat beat)
convert thrift ExecutorBeat into a java HashMap.
|
static Map<String,Object> |
convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
convert a thrift worker heartbeat into a java HashMap.
|
static String |
errorSubset(String errorStr) |
static List<Map<String,Object>> |
extractDataFromHb(Map executor2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
boolean includeSys,
StormTopology topology)
extracts a list of executor data from heart beats.
|
static List<Map<String,Object>> |
extractDataFromHb(Map executor2hostPort,
Map task2component,
Map<List<Integer>,Map<String,Object>> beats,
boolean includeSys,
StormTopology topology,
String compId)
extracts a list of executor data from heart beats.
|
static List<Map<String,Object>> |
extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort,
Map<Integer,String> task2component,
boolean includeSys,
String compId)
extract a list of host port info for specified component.
|
static String |
floatStr(Double n)
Convert a float to a string for display.
|
static List<ExecutorSummary> |
getFilledStats(List<ExecutorSummary> summs)
filter ExecutorSummary whose stats is null.
|
static Map<String,Object> |
mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats,
Map<String,Object> boltStats)
merge accumulated bolt stats with pre-merged component stats.
|
static Map<String,Object> |
mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats,
Map<String,Object> spoutStats)
merge accumulated bolt stats with pre-merged component stats.
|
static Map<String,Object> |
mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats,
Map<String,Object> boltStats)
merge accumulated bolt stats with new bolt stats.
|
static Map<String,Object> |
mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats,
Map<String,Object> spoutStats)
merge accumulated bolt stats with new bolt stats.
|
static Map<String,Object> |
postAggregateCompStats(Map<String,Object> compStats)
post aggregate component stats: 1.
|
static <T> Map<String,Map<String,Map<T,Long>>> |
preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary,
boolean includeSys)
filter system streams of aggregated spout/bolt stats if necessary.
|
static Map<String,Map> |
spoutStreamsStats(List<ExecutorSummary> summs,
boolean includeSys)
aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value}.
|
static ExecutorStats |
thriftifyExecutorStats(Map stats)
Convert Executor stats to thrift data structure.
|
static SupervisorWorkerHeartbeat |
thriftifyRpcWorkerHb(String stormId,
List<Long> executorId)
Used for local test.
|
static <K> Map |
windowSetConverter(Map stats,
org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc) |
public static final String TYPE
public static final int TEN_MIN_IN_SECONDS
public static final String TEN_MIN_IN_SECONDS_STR
public static Map<String,Number> aggBoltLatAndCount(Map<List<String>,Double> id2execAvg, Map<List<String>,Double> id2procAvg, Map<List<String>,Long> id2numExec)
id2execAvg
- { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313}id2procAvg
- { global stream id -> proc avg value }id2numExec
- { global stream id -> executed }public static Map<String,Number> aggSpoutLatAndCount(Map<String,Double> id2compAvg, Map<String,Long> id2numAcked)
public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K,Double> id2execAvg, Map<K,Double> id2procAvg, Map<K,Long> id2numExec)
public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K,Double> id2compAvg, Map<K,Long> id2acked)
public static Map<String,Object> aggPreMergeCompPageBolt(Map<String,Object> beat, String window, boolean includeSys)
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static Map<String,Object> aggPreMergeCompPageSpout(Map<String,Object> beat, String window, boolean includeSys)
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String,Object> beat, String window, boolean includeSys)
beat
- executor heartbeat datawindow
- specified windowincludeSys
- whether to include system streamspublic static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String,Object> m, String window, boolean includeSys)
public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
accBoltStats
- accumulated bolt statsboltStats
- pre-merged component statspublic static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String,Object> accBoltStats, Map<String,Object> boltStats)
accBoltStats
- accumulated bolt statsboltStats
- new input bolt statspublic static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String,Object> accSpoutStats, Map<String,Object> spoutStats)
public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
topologyId
- topology idexec2nodePort
- executor -> host+porttask2component
- task -> componentbeats
- executor[start, end] -> executor heartbeattopology
- storm topologywindow
- the window to be aggregatedincludeSys
- whether to include system streamsclusterState
- cluster statepublic static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
statsSeq
- a seq of ExecutorStatsincludeSys
- whether to include system streamspublic static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
statsSeq
- a seq of ExecutorStatsincludeSys
- whether to include system streamspublic static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String,Map<String,Map<T,Long>>> streamSummary, boolean includeSys)
public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String,Map<K,V>> stats)
stats
- a Map of value: {win -> stream -> value}public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String,Map<K,Double>>> avgSeq, List<Map<String,Map<K,Long>>> countSeq)
avgSeq
- a list of {win -> global stream id -> avg value}countSeq
- a list of {win -> global stream id -> count value}public static <K> Map<String,Double> aggregateAvgStreams(Map<String,Map<K,Double>> avgs, Map<String,Map<K,Long>> counts)
avgs
- a Map of {win -> stream -> average value}counts
- a Map of {win -> stream -> count value}public static Map<String,Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
public static Map<String,Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys)
public static Map<String,Map> aggregateSpoutStreams(Map<String,Map> stats)
stats
- a Map of {metric -> win -> stream id -> value}public static Map<String,Map> aggregateBoltStreams(Map<String,Map> stats)
stats
- a Map of {metric -> win -> stream id -> value}public static Map<String,Object> aggBoltExecWinStats(Map<String,Object> accStats, Map<String,Object> newStats, boolean includeSys)
public static Map<String,Object> aggSpoutExecWinStats(Map<String,Object> accStats, Map<String,Object> beat, boolean includeSys)
public static <T> Map<String,Map<T,Long>> aggregateCounts(List<Map<String,Map<T,Long>>> countsSeq)
countsSeq
- a seq of {win -> GlobalStreamId -> value}public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String,Object>> beats, String compType)
public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String,Object> accStats, Map<String,Object> beat, String compType)
public static Map<String,Object> postAggregateCompStats(Map<String,Object> compStats)
compStats
- accumulated comp statspublic static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
exec2hostPort
- a Map of {executor -> host+port}task2component
- a Map of {task id -> component}beats
- a converted HashMap of executor heartbeats, {executor -> heartbeat}window
- specified windowincludeSys
- whether to include system streamstopologyId
- topology idtopology
- storm topologycomponentId
- component idpublic static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer,String> task2Component, Map<List<Integer>,Map<String,Object>> beats, Map<List<Long>,List<Object>> exec2NodePort, Map<String,String> nodeHost, Map<WorkerSlot,WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
stormId
- topology idstormName
- storm topologytask2Component
- a Map of {task id -> component}beats
- a converted HashMap of executor heartbeats, {executor -> heartbeat}exec2NodePort
- a Map of {executor -> host+port}includeSys
- whether to include system streamsuserAuthorized
- whether the user is authorized to view topology infofilterSupervisor
- if not null, only return WorkerSummaries for that supervisorowner
- owner of the topologypublic static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo,ExecutorBeat> beats)
public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
SupervisorWorkerHeartbeat
to nimbus local report executor heartbeats.public static Map<String,Object> convertZkExecutorHb(ExecutorBeat beat)
public static Map<String,Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb)
public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo,ExecutorStats> stats)
public static Map<String,Object> convertExecutorStats(ExecutorStats stats)
public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>,List<Object>> exec2hostPort, Map<Integer,String> task2component, boolean includeSys, String compId)
exec2hostPort
- {executor -> host+port}task2component
- {task id -> component}includeSys
- whether to include system streamscompId
- component idpublic static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology)
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>,Map<String,Object>> beats, boolean includeSys, StormTopology topology, String compId)
public static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
executorSumms
- a list of ExecutorSummarypublic static double computeExecutorCapacity(ExecutorSummary summary)
summary
- the stats for the executor.public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
summs
- a list of ExecutorSummarypublic static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
public static ExecutorStats thriftifyExecutorStats(Map stats)
stats
- the stats in the form of a map.public static String componentType(StormTopology topology, String compId)
topology
- the topology this is a part of.compId
- the id of the component.public static String floatStr(Double n)
n
- the value to format.Copyright © 2023 The Apache Software Foundation. All rights reserved.