public class DefaultMessageSender extends java.lang.Object implements MessageSender
DefaultMessageSender handles transmission of messages to a client via WebSockets.
Messages are first processed and compressed in a separate worker thread. Processed messages are then handed off to another (separate) worker thread that delivers the messages via the WebSocket.
Each worker thread is backed by a blocking queue. One queue and thread provide message processing. The other queue and thread provide message transmission via the WebSocket.
Queuing (and worker threads) and compression can be disabled. Without queuing, DefaultMessageSender does not use worker threads and instead executes all tasks in the calling thread.
Only message types specified in the queuedMessageTypes configuration are placed in the queues. All other message types are processed and transmitted in the calling thread.
Queued message processing and transmission can be paused and resumed. When paused, the worker threads simply remove tasks from the queue and throw them away.
Compression is done with gzip. The configuration parameter minMessageLengthForCompression specifies the minimum message size for compression. Messages smaller than this size are not compressed.
WebsocketConnection loads the configuration via Spring from app-config.xml.
Modifier and Type | Class and Description |
---|---|
private class |
DefaultMessageSender.BinaryMessageSender |
private class |
DefaultMessageSender.Preprocessor |
private class |
DefaultMessageSender.TextMessageSender |
Modifier and Type | Field and Description |
---|---|
private boolean |
compressionEnabled
If true then compress messages.
|
private boolean |
discardMessagesIfQueueFull
If true and a queue is full then discard the oldest task to make room for the new task.
|
private java.util.Set<MessageSenderListener> |
listeners |
private static Log |
logger |
private int |
maxQueueSize
The maximum size of a processing or transmission queue.
|
private int |
minMessageLengthForCompression
The minimum message size for compression.
|
private PausableThreadPoolExecutor |
preprocessorExecutor |
private java.util.concurrent.ArrayBlockingQueue<java.lang.Runnable> |
preprocessorQueue |
private java.util.Set<OutboundMessages> |
queuedMessageTypes
Message types that should be queued - and thus handled across multiple threads.
|
private boolean |
queuingEnabled
If true then use worker threads for processing and transmission.
|
private PausableThreadPoolExecutor |
senderExecutor |
private java.util.concurrent.ArrayBlockingQueue<java.lang.Runnable> |
senderQueue |
private WsOutbound |
wsOutbound |
Constructor and Description |
---|
DefaultMessageSender() |
Modifier and Type | Method and Description |
---|---|
void |
addListener(MessageSenderListener listener) |
boolean |
getDiscardMessagesIfQueueFull() |
int |
getMaxQueueSize() |
int |
getMinMessageLengthForCompression() |
java.util.Set<OutboundMessages> |
getQueuedMessageTypes() |
void |
initialize(WsOutbound wsOutbound) |
boolean |
isCompressionEnabled() |
private boolean |
isQueuedMessageType(OutboundMessages messageType) |
boolean |
isQueuingEnabled() |
private void |
notifyListeners(MessageSenderEvent.Type eventType) |
void |
pause()
Pause queued message transmission.
|
private void |
preprocessAndSendMessage(java.lang.String requestID,
OutboundMessages messageType,
java.lang.String update) |
private java.lang.String |
preprocessMessage(java.lang.String requestId,
OutboundMessages type,
java.lang.String update) |
private void |
preprocessMessageAndEnqueue(java.lang.String requestId,
OutboundMessages messageType,
java.lang.String update) |
void |
removeListener(MessageSenderListener listener) |
void |
reset() |
void |
resume() |
private void |
sendBinaryMessage(byte[] message,
OutboundMessages messageType,
int uncompressedMessageSize,
boolean fromQueue) |
void |
sendFile(java.nio.file.Path path) |
void |
sendMessage(java.lang.String requestID,
OutboundMessages messageType,
java.lang.String update) |
private void |
sendTextMessage(java.lang.String message,
OutboundMessages messageType) |
void |
setCompressionEnabled(boolean compressionEnabled) |
void |
setDiscardMessagesIfQueueFull(boolean discardMessagesIfQueueFull) |
void |
setMaxQueueSize(int maxQueueSize) |
void |
setMinMessageLengthForCompression(int minMessageLengthForCompression) |
void |
setQueuedMessageTypes(java.util.Set<OutboundMessages> queuedMessageTypes) |
void |
setQueuingEnabled(boolean queuingEnabled) |
void |
shutdown() |
private void |
submitTask(java.util.concurrent.ThreadPoolExecutor executor,
java.lang.Runnable task) |
private boolean queuingEnabled
private int maxQueueSize
The maximum size of a processing or transmission queue. If a queue is full and discardMessagesIfQueueFull is true then the oldest item is removed from the queue to make space for the new item. If discardMessagesIfQueueFull is false then the calling thread runs the task itself.
private boolean discardMessagesIfQueueFull
private boolean compressionEnabled
private int minMessageLengthForCompression
private java.util.Set<OutboundMessages> queuedMessageTypes
private java.util.concurrent.ArrayBlockingQueue<java.lang.Runnable> preprocessorQueue
private PausableThreadPoolExecutor preprocessorExecutor
private java.util.concurrent.ArrayBlockingQueue<java.lang.Runnable> senderQueue
private PausableThreadPoolExecutor senderExecutor
private WsOutbound wsOutbound
private java.util.Set<MessageSenderListener> listeners
private static final Log logger
public void initialize(WsOutbound wsOutbound)
public void shutdown()
shutdown
in interface MessageSender
public void pause()
pause
in interface MessageSender
public void resume()
resume
in interface MessageSender
public void reset()
reset
in interface MessageSender
public void addListener(MessageSenderListener listener)
addListener
in interface MessageSender
public void removeListener(MessageSenderListener listener)
removeListener
in interface MessageSender
private void notifyListeners(MessageSenderEvent.Type eventType)
public void sendMessage(java.lang.String requestID, OutboundMessages messageType, java.lang.String update)
sendMessage
in interface MessageSender
public void sendFile(java.nio.file.Path path)
sendFile
in interface MessageSender
private void preprocessAndSendMessage(java.lang.String requestID, OutboundMessages messageType, java.lang.String update) throws java.io.IOException
java.io.IOException
private void preprocessMessageAndEnqueue(java.lang.String requestId, OutboundMessages messageType, java.lang.String update)
private java.lang.String preprocessMessage(java.lang.String requestId, OutboundMessages type, java.lang.String update)
private void submitTask(java.util.concurrent.ThreadPoolExecutor executor, java.lang.Runnable task) throws java.lang.InterruptedException
java.lang.InterruptedException
private void sendTextMessage(java.lang.String message, OutboundMessages messageType)
private void sendBinaryMessage(byte[] message, OutboundMessages messageType, int uncompressedMessageSize, boolean fromQueue)
private boolean isQueuedMessageType(OutboundMessages messageType)
public boolean isCompressionEnabled()
public void setCompressionEnabled(boolean compressionEnabled)
public boolean isQueuingEnabled()
public void setQueuingEnabled(boolean queuingEnabled)
public int getMaxQueueSize()
public void setMaxQueueSize(int maxQueueSize)
public boolean getDiscardMessagesIfQueueFull()
public void setDiscardMessagesIfQueueFull(boolean discardMessagesIfQueueFull)
public int getMinMessageLengthForCompression()
public void setMinMessageLengthForCompression(int minMessageLengthForCompression)
public java.util.Set<OutboundMessages> getQueuedMessageTypes()
public void setQueuedMessageTypes(java.util.Set<OutboundMessages> queuedMessageTypes)