public class StormClusterStateImpl extends Object implements IStormClusterState
Constructor and Description |
---|
StormClusterStateImpl(IStateStorage stateStorage,
ILocalAssignmentsBackend assignmentsassignmentsBackend,
ClusterStateContext context,
boolean shouldCloseStateStorageOnDisconnect) |
Modifier and Type | Method and Description |
---|---|
void |
activateStorm(String stormId,
StormBase stormBase,
Map<String,Object> topoConf) |
List<String> |
activeKeys() |
List<String> |
activeStorms() |
void |
addNimbusHost(String nimbusId,
NimbusSummary nimbusSummary) |
void |
addPrivateWorkerKey(WorkerTokenServiceType type,
String topologyId,
long keyVersion,
PrivateWorkerKey key)
Store a new version of a private key.
|
Assignment |
assignmentInfo(String stormId,
Runnable callback)
Get the assignment based on storm id from local backend.
|
VersionedData<Assignment> |
assignmentInfoWithVersion(String stormId,
Runnable callback) |
List<String> |
assignments(Runnable callback) |
Map<String,Assignment> |
assignmentsInfo()
Get all the topologies assignments mapping stormId -> Assignment from local backend.
|
Integer |
assignmentVersion(String stormId,
Runnable callback) |
List<String> |
backpressureTopologies()
Get backpressure topologies.
|
List<String> |
blobstore(Runnable callback) |
List<String> |
blobstoreInfo(String blobKey) |
Credentials |
credentials(String stormId,
Runnable callback) |
void |
deleteTopologyProfileRequests(String stormId,
ProfileRequest profileRequest) |
void |
disconnect() |
List<ErrorInfo> |
errors(String stormId,
String componentId) |
List<String> |
errorTopologies() |
Map<ExecutorInfo,ExecutorBeat> |
executorBeats(String stormId,
Map<List<Long>,NodeInfo> executorNodePort)
need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock
overrides all the timestamps.
|
NimbusInfo |
getLeader(Runnable callback)
Get leader info from state store, which was written when a master gains leadership.
|
long |
getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type,
String topologyId)
Get the next key version number that should be used for this topology id.
|
PrivateWorkerKey |
getPrivateWorkerKey(WorkerTokenServiceType type,
String topologyId,
long keyVersion)
Get a private key used to validate a token is correct.
|
List<ProfileRequest> |
getTopologyProfileRequests(String stormId) |
ClusterWorkerHeartbeat |
getWorkerHeartbeat(String stormId,
String node,
Long port) |
List<ProfileRequest> |
getWorkerProfileRequests(String stormId,
NodeInfo nodeInfo) |
List<String> |
heartbeatStorms() |
Set<String> |
idsOfTopologiesWithPrivateWorkerKeys()
Get a list of all topologyIds that currently have private worker keys stored, of any kind.
|
boolean |
isAssignmentsBackendSynchronized()
Flag to indicate if the assignments synced successfully, see
IStormClusterState.syncRemoteAssignments(Map) . |
boolean |
isPacemakerStateStore()
Flag to indicate if the Pacameker is backend store.
|
protected void |
issueCallback(AtomicReference<Runnable> cb) |
protected void |
issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap,
String key) |
ErrorInfo |
lastError(String stormId,
String componentId) |
List<NimbusSummary> |
nimbuses() |
Assignment |
remoteAssignmentInfo(String stormId,
Runnable callback)
Get the assignment based on storm id from remote state store, eg: ZK.
|
void |
removeAllPrivateWorkerKeys(String topologyId)
Remove all of the worker keys for a given topology.
|
void |
removeBackpressure(String stormId)
Remove backpressure.
|
void |
removeBlobstoreKey(String blobKey) |
void |
removeExpiredPrivateWorkerKeys(String topologyId)
Remove all keys for the given topology that have expired.
|
void |
removeKeyVersion(String blobKey) |
void |
removeStorm(String stormId) |
void |
removeStormBase(String stormId) |
void |
removeWorkerBackpressure(String stormId,
String node,
Long port)
Remove worker backpressure.
|
void |
removeWorkerHeartbeat(String stormId,
String node,
Long port) |
void |
reportError(String stormId,
String componentId,
String node,
Long port,
Throwable error) |
void |
setAssignment(String stormId,
Assignment info,
Map<String,Object> topoConf) |
void |
setAssignmentsBackendSynchronized()
Mark the assignments as synced successfully, see
IStormClusterState.isAssignmentsBackendSynchronized() . |
void |
setCredentials(String stormId,
Credentials creds,
Map<String,Object> topoConf) |
void |
setTopologyLogConfig(String stormId,
LogConfig logConfig,
Map<String,Object> topoConf) |
void |
setupBackpressure(String stormId,
Map<String,Object> topoConf)
Setup backpressure.
|
void |
setupBlob(String key,
NimbusInfo nimbusInfo,
Integer versionInfo) |
void |
setupErrors(String stormId,
Map<String,Object> topoConf) |
void |
setupHeatbeats(String stormId,
Map<String,Object> topoConf) |
void |
setWorkerProfileRequest(String stormId,
ProfileRequest profileRequest) |
StormBase |
stormBase(String stormId,
Runnable callback)
Get a storm base for a topology.
|
String |
stormId(String stormName)
Get storm id from passed name, null if the name doesn't exist on cluster.
|
void |
supervisorHeartbeat(String supervisorId,
SupervisorInfo info) |
SupervisorInfo |
supervisorInfo(String supervisorId) |
List<String> |
supervisors(Runnable callback) |
void |
syncRemoteAssignments(Map<String,byte[]> remote)
Sync the remote state store assignments to local backend, used when master gains leadership, see
LeaderListenerCallback . |
void |
syncRemoteIds(Map<String,String> remote)
Sync all the active storm ids of the cluster, used now when master gains leadership.
|
void |
teardownHeartbeats(String stormId) |
void |
teardownTopologyErrors(String stormId) |
boolean |
topologyBackpressure(String stormId,
long timeoutMs,
Runnable callback)
Check whether a topology is in throttle-on status or not: if the backpresure/storm-id dir is not empty, this topology has
throttle-on, otherwise throttle-off.
|
LogConfig |
topologyLogConfig(String stormId,
Runnable cb) |
void |
updateStorm(String stormId,
StormBase newElems)
To update this function due to APersistentMap/APersistentSet is clojure's structure.
|
void |
workerHeartbeat(String stormId,
String node,
Long port,
ClusterWorkerHeartbeat info) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
allSupervisorInfo, allSupervisorInfo, getTopoId, topologyBases
public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception
Exception
protected void issueCallback(AtomicReference<Runnable> cb)
protected void issueMapCallback(ConcurrentHashMap<String,Runnable> callbackConcurrentHashMap, String key)
public List<String> assignments(Runnable callback)
assignments
in interface IStormClusterState
public Assignment assignmentInfo(String stormId, Runnable callback)
IStormClusterState
assignmentInfo
in interface IStormClusterState
stormId
- topology idcallback
- callback functionAssignment
public Assignment remoteAssignmentInfo(String stormId, Runnable callback)
IStormClusterState
remoteAssignmentInfo
in interface IStormClusterState
stormId
- topology idcallback
- callback functionAssignment
public Map<String,Assignment> assignmentsInfo()
IStormClusterState
assignmentsInfo
in interface IStormClusterState
public void syncRemoteAssignments(Map<String,byte[]> remote)
IStormClusterState
LeaderListenerCallback
.syncRemoteAssignments
in interface IStormClusterState
remote
- assigned assignments for a specific IStormClusterState
instance, usually a supervisor/node.public boolean isAssignmentsBackendSynchronized()
IStormClusterState
IStormClusterState.syncRemoteAssignments(Map)
.isAssignmentsBackendSynchronized
in interface IStormClusterState
public boolean isPacemakerStateStore()
IStormClusterState
isPacemakerStateStore
in interface IStormClusterState
public void setAssignmentsBackendSynchronized()
IStormClusterState
IStormClusterState.isAssignmentsBackendSynchronized()
.setAssignmentsBackendSynchronized
in interface IStormClusterState
public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback)
assignmentInfoWithVersion
in interface IStormClusterState
public Integer assignmentVersion(String stormId, Runnable callback) throws Exception
assignmentVersion
in interface IStormClusterState
Exception
public List<String> blobstoreInfo(String blobKey)
blobstoreInfo
in interface IStormClusterState
public List<NimbusSummary> nimbuses()
nimbuses
in interface IStormClusterState
public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary)
addNimbusHost
in interface IStormClusterState
public List<String> activeStorms()
activeStorms
in interface IStormClusterState
public StormBase stormBase(String stormId, Runnable callback)
IStormClusterState
stormBase
in interface IStormClusterState
stormId
- the id of the topologycallback
- something to call if the data changes (best effort)public String stormId(String stormName)
IStormClusterState
stormId
in interface IStormClusterState
stormName
- storm namepublic void syncRemoteIds(Map<String,String> remote)
IStormClusterState
syncRemoteIds
in interface IStormClusterState
remote
- stormName -> stormId mappingpublic ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port)
getWorkerHeartbeat
in interface IStormClusterState
public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo)
getWorkerProfileRequests
in interface IStormClusterState
public List<ProfileRequest> getTopologyProfileRequests(String stormId)
getTopologyProfileRequests
in interface IStormClusterState
public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest)
setWorkerProfileRequest
in interface IStormClusterState
public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest)
deleteTopologyProfileRequests
in interface IStormClusterState
public Map<ExecutorInfo,ExecutorBeat> executorBeats(String stormId, Map<List<Long>,NodeInfo> executorNodePort)
executorBeats
in interface IStormClusterState
stormId
- topology idexecutorNodePort
- executor id -> node + portpublic List<String> supervisors(Runnable callback)
supervisors
in interface IStormClusterState
public SupervisorInfo supervisorInfo(String supervisorId)
supervisorInfo
in interface IStormClusterState
public void setupHeatbeats(String stormId, Map<String,Object> topoConf)
setupHeatbeats
in interface IStormClusterState
public void teardownHeartbeats(String stormId)
teardownHeartbeats
in interface IStormClusterState
public void teardownTopologyErrors(String stormId)
teardownTopologyErrors
in interface IStormClusterState
public NimbusInfo getLeader(Runnable callback)
IStormClusterState
Caution: it can not be used for fencing and is only for informational purposes because we use ZK as our backend now, which could have a overdue info of nodes.
getLeader
in interface IStormClusterState
callback
- callback funcNimbusInfo
public List<String> backpressureTopologies()
IStormClusterState
backpressureTopologies
in interface IStormClusterState
public List<String> heartbeatStorms()
heartbeatStorms
in interface IStormClusterState
public List<String> errorTopologies()
errorTopologies
in interface IStormClusterState
public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String,Object> topoConf)
setTopologyLogConfig
in interface IStormClusterState
public LogConfig topologyLogConfig(String stormId, Runnable cb)
topologyLogConfig
in interface IStormClusterState
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info)
workerHeartbeat
in interface IStormClusterState
public void removeWorkerHeartbeat(String stormId, String node, Long port)
removeWorkerHeartbeat
in interface IStormClusterState
public void supervisorHeartbeat(String supervisorId, SupervisorInfo info)
supervisorHeartbeat
in interface IStormClusterState
public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback)
topologyBackpressure
in interface IStormClusterState
stormId
- The topology IdtimeoutMs
- How long until the backpressure znode is invalid.callback
- The callback functionpublic void setupBackpressure(String stormId, Map<String,Object> topoConf)
IStormClusterState
setupBackpressure
in interface IStormClusterState
public void removeBackpressure(String stormId)
IStormClusterState
removeBackpressure
in interface IStormClusterState
public void removeWorkerBackpressure(String stormId, String node, Long port)
IStormClusterState
removeWorkerBackpressure
in interface IStormClusterState
public void activateStorm(String stormId, StormBase stormBase, Map<String,Object> topoConf)
activateStorm
in interface IStormClusterState
public void updateStorm(String stormId, StormBase newElems)
updateStorm
in interface IStormClusterState
public void removeStormBase(String stormId)
removeStormBase
in interface IStormClusterState
public void setAssignment(String stormId, Assignment info, Map<String,Object> topoConf)
setAssignment
in interface IStormClusterState
public void setupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo)
setupBlob
in interface IStormClusterState
public List<String> activeKeys()
activeKeys
in interface IStormClusterState
public List<String> blobstore(Runnable callback)
blobstore
in interface IStormClusterState
public void removeStorm(String stormId)
removeStorm
in interface IStormClusterState
public void removeBlobstoreKey(String blobKey)
removeBlobstoreKey
in interface IStormClusterState
public void removeKeyVersion(String blobKey)
removeKeyVersion
in interface IStormClusterState
public void setupErrors(String stormId, Map<String,Object> topoConf)
setupErrors
in interface IStormClusterState
public void reportError(String stormId, String componentId, String node, Long port, Throwable error)
reportError
in interface IStormClusterState
public List<ErrorInfo> errors(String stormId, String componentId)
errors
in interface IStormClusterState
public ErrorInfo lastError(String stormId, String componentId)
lastError
in interface IStormClusterState
public void setCredentials(String stormId, Credentials creds, Map<String,Object> topoConf)
setCredentials
in interface IStormClusterState
public Credentials credentials(String stormId, Runnable callback)
credentials
in interface IStormClusterState
public void disconnect()
disconnect
in interface IStormClusterState
public PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion)
IStormClusterState
getPrivateWorkerKey
in interface IStormClusterState
type
- the type of service the key is for.topologyId
- the topology id the key is for.keyVersion
- the version of the key this is for.public void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key)
IStormClusterState
addPrivateWorkerKey
in interface IStormClusterState
type
- the type of service this key is for.topologyId
- the topology this key is forkeyVersion
- the version of the key this is for.key
- the key to store.public long getNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId)
IStormClusterState
getNextPrivateWorkerKeyVersion
in interface IStormClusterState
type
- the type of service this is for.topologyId
- the topology id this is for.public void removeExpiredPrivateWorkerKeys(String topologyId)
IStormClusterState
removeExpiredPrivateWorkerKeys
in interface IStormClusterState
topologyId
- the id of the topology to scan.public void removeAllPrivateWorkerKeys(String topologyId)
IStormClusterState
removeAllPrivateWorkerKeys
in interface IStormClusterState
topologyId
- the topology to clean up after.public Set<String> idsOfTopologiesWithPrivateWorkerKeys()
IStormClusterState
idsOfTopologiesWithPrivateWorkerKeys
in interface IStormClusterState
Copyright © 2023 The Apache Software Foundation. All rights reserved.