Package org.zeromq
Class ZProxy
java.lang.Object
org.zeromq.ZProxy
Implementation of a remotely controlled proxy for 0MQ, using
The goals of this implementation are to delegate the creation of sockets in a background thread via a callback interface to ensure their correct use and to provide ultimately to end-users the following features.
You can then use it like this:
ZActor
.
The goals of this implementation are to delegate the creation of sockets in a background thread via a callback interface to ensure their correct use and to provide ultimately to end-users the following features.
Basic features:
- Remote Control
- Start: if was paused, flushes the pending messages
- Pause: lets the socket queues accumulate messages according to their types
- Stop: Shutdowns the proxy, can be restarted
- Status: Retrieves the status of the proxy
- Cold Restart: Closes and recreates the connections
Hot Restart
: User-defined behavior with custom messagesConfigure
: User-defined behavior with custom messagescommand(String, boolean)
...: Custom commands of your own- Exit: Definitive shutdown of the proxy and its control
- Proxy mechanism ensured by pluggable pumps
- with built-in low-level
ZProxy.ZmqPump
(zmq.ZMQ): useful for performances - with built-in high-level
ZProxy.ZPump
(ZeroMQ): useful formessage transformation
, lower performances - with your own-custom proxy pump implementing a
1-method interface
- with built-in low-level
You can have all the above non-customizable features in about these lines of code:
final ZProxy.Proxy provider = new ZProxy.SimpleProxy()
{
public Socket create(ZContext ctx, ZProxy.Plug place, Object ... args)
{
assert ("TEST".equals(args[0]);
Socket socket = null;
if (place == ZProxy.Plug.FRONT) {
socket = ctx.createSocket(ZMQ.ROUTER);
}
if (place == ZProxy.Plug.BACK) {
socket = ctx.createSocket(ZMQ.DEALER);
}
return socket;
}
public void configure(Socket socket, ZProxy.Plug place, Object ... args)
{
assert ("TEST".equals(args[0]);
int port = -1;
if (place == ZProxy.Plug.FRONT) {
port = socket.bind("tcp://127.0.0.1:6660");
}
if (place == ZProxy.Plug.BACK) {
port = socket.bind("tcp://127.0.0.1:6661");
}
if (place == ZProxy.Plug.CAPTURE && socket != null) {
socket.bind("tcp://127.0.0.1:4263");
}
}
};
ZProxy proxy = ZProxy.newProxy("ProxyOne", provider, "ABRACADABRA", Arrays.asList("TEST"));
Once created, the proxy is not started. You have to perform first a start command on it.
This choice was made because it is easier for a user to start it with one line of code than for the code to internally handle
different possible starting states (after all, someone may want the proxy started but paused at first or configured in a specific way?)
and because the a/sync stuff was funnier. Life is unfair ...
Or maybe an idea is floating in the air?
You can then use it like this:
final boolean async = false, sync = true;
String status = null;
status = proxy.status();
status = proxy.pause(sync);
status = proxy.start(async);
status = proxy.restart(new ZMsg());
status = proxy.status(async);
status = proxy.stop(sync);
boolean here = proxy.sign();
ZMsg cfg = new ZMsg();
msg.add("CONFIG-1");
ZMsg rcvd = proxy.configure(cfg);
proxy.exit();
status = proxy.status(sync);
assert (!proxy.started());
A programmatic interface
with enums is also available.-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic enum
static enum
Possible places for sockets in the proxy.static interface
private static final class
static interface
static enum
private static final class
A specialized transport for better transmission purposes that will send each packets individually instead of the whole message.static class
A pump that reads a message as a whole before transmitting it. -
Field Summary
FieldsModifier and TypeFieldDescriptionprivate final ZAgent
static final String
private static final String
private static final AtomicInteger
private final ZStar.Exit
private static final String
static final String
private static final String
static final String
private static final String
private static final String
static final String
private static final String
private static final String
static final String
-
Constructor Summary
ConstructorsConstructorDescriptionZProxy
(String name, ZAgent.SelectorCreator selector, ZProxy.Proxy creator, String motdelafin, Object... args) Deprecated.ZProxy
(String name, ZProxy.Proxy creator, String motdelafin, Object... args) Creates a new named proxy.ZProxy
(String name, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Creates a new named proxy.ZProxy
(ZAgent.SelectorCreator selector, ZProxy.Proxy creator, String motdelafin, Object... args) Deprecated.useZProxy(Proxy, String, Object...)
instead.ZProxy
(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Deprecated.ZProxy
(ZContext ctx, String name, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Creates a new named proxy.ZProxy
(ZProxy.Proxy creator, String motdelafin, Object... args) Creates a new unnamed proxy. -
Method Summary
Modifier and TypeMethodDescriptionSends a command message to the proxy actor.command
(ZProxy.Command command, boolean sync) Sends a command message to the proxy actor.command
(ZProxy.Command command, ZMsg msg, boolean sync) Sends a command message to the proxy actor.Configures the proxy.exit()
Stops the proxy and exits.exit
(boolean sync) Deprecated.The call is synchronous: the sync parameter is ignored, as it leads to many mistakes in case of a provided ZContext.boolean
Binary inquiry for the status of the proxy.static ZProxy
newProxy
(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, String motdelafin, Object... args) Deprecated.usenewProxy(ZContext, String, Proxy, String, Object...)
instead.static ZProxy
Creates a new low-level proxy for better performances.static ZProxy
newZProxy
(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, String motdelafin, Object... args) Deprecated.static ZProxy
Creates a new proxy in a ZeroMQ way.pause
(boolean sync) Pauses the proxy.private String
Restarts the proxy.start
(boolean sync) Starts the proxy.boolean
started()
Binary inquiry for the status of the proxy.status()
Inquires for the status of the proxy.status
(boolean sync) Inquires for the status of the proxy.stop
(boolean sync) Stops the proxy.
-
Field Details
-
START
-
PAUSE
-
STOP
-
RESTART
-
EXIT
-
STATUS
-
CONFIG
-
STARTED
-
PAUSED
-
STOPPED
-
EXITED
-
ALIVE
-
counter
-
agent
-
exit
-
-
Constructor Details
-
ZProxy
@Deprecated public ZProxy(ZAgent.SelectorCreator selector, ZProxy.Proxy creator, String motdelafin, Object... args) Deprecated.useZProxy(Proxy, String, Object...)
instead.Creates a new unnamed proxy.- Parameters:
selector
- the creator of the selector used for the proxy.creator
- the creator of the sockets of the proxy.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
Creates a new unnamed proxy.- Parameters:
creator
- the creator of the sockets of the proxy.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
@Deprecated public ZProxy(String name, ZAgent.SelectorCreator selector, ZProxy.Proxy creator, String motdelafin, Object... args) Deprecated.useZProxy(String, Proxy, String, Object...)
instead.Creates a new named proxy.- Parameters:
name
- the name of the proxy (used in threads naming).selector
- the creator of the selector used for the proxy.creator
- the creator of the sockets of the proxy.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
Creates a new named proxy.- Parameters:
name
- the name of the proxy (used in threads naming).creator
- the creator of the sockets of the proxy.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
@Deprecated public ZProxy(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Deprecated.Creates a new named proxy.- Parameters:
ctx
- the main context used. If null, a new context will be created and closed at the stop of the operation. If not null, it is the responsibility of the call to close it.name
- the name of the proxy (used in threads naming).selector
- the creator of the selector used for the proxy.sockets
- the creator of the sockets of the proxy.pump
- the pump used for the proxymotdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
public ZProxy(String name, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Creates a new named proxy. A new context will be created and closed at the stop of the operation.- Parameters:
name
- the name of the proxy (used in threads naming).sockets
- the creator of the sockets of the proxy.pump
- the pump used for the proxymotdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
ZProxy
public ZProxy(ZContext ctx, String name, ZProxy.Proxy sockets, ZProxy.Pump pump, String motdelafin, Object... args) Creates a new named proxy.- Parameters:
ctx
- the main context used. If null, a new context will be created and closed at the stop of the operation. If not null, it is the responsibility of the call to close it.name
- the name of the proxy (used in threads naming).sockets
- the creator of the sockets of the proxy.pump
- the pump used for the proxymotdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.
-
-
Method Details
-
newZProxy
@Deprecated public static ZProxy newZProxy(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, String motdelafin, Object... args) Deprecated.Creates a new proxy in a ZeroMQ way. This proxy will be less efficient than thelow-level one
.- Parameters:
ctx
- the context used for the proxy. Possibly null, in this case a new context will be created and automatically destroyed afterwards.name
- the name of the proxy. Possibly null.selector
- the creator of the selector used for the internal polling. Not null.sockets
- the sockets creator of the proxy. Not null.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.- Returns:
- the created proxy.
-
newZProxy
public static ZProxy newZProxy(ZContext ctx, String name, ZProxy.Proxy sockets, String motdelafin, Object... args) Creates a new proxy in a ZeroMQ way. This proxy will be less efficient than thelow-level one
.- Parameters:
ctx
- the context used for the proxy. Possibly null, in this case a new context will be created and automatically destroyed afterwards.name
- the name of the proxy. Possibly null.sockets
- the sockets creator of the proxy. Not null.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.- Returns:
- the created proxy.
-
newProxy
@Deprecated public static ZProxy newProxy(ZContext ctx, String name, ZAgent.SelectorCreator selector, ZProxy.Proxy sockets, String motdelafin, Object... args) Deprecated.usenewProxy(ZContext, String, Proxy, String, Object...)
instead.Creates a new low-level proxy for better performances.- Parameters:
ctx
- the context used for the proxy. Possibly null, in this case a new context will be created and automatically destroyed afterwards.name
- the name of the proxy. Possibly null.selector
- the creator of the selector used for the internal polling. Not null.sockets
- the sockets creator of the proxy. Not null.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.- Returns:
- the created proxy.
-
newProxy
public static ZProxy newProxy(ZContext ctx, String name, ZProxy.Proxy sockets, String motdelafin, Object... args) Creates a new low-level proxy for better performances.- Parameters:
ctx
- the context used for the proxy. Possibly null, in this case a new context will be created and automatically destroyed afterwards.name
- the name of the proxy. Possibly null.sockets
- the sockets creator of the proxy. Not null.motdelafin
- the final word used to mark the end of the proxy. Null to disable this mechanism.args
- an optional array of arguments that will be passed at the creation.- Returns:
- the created proxy.
-
start
Starts the proxy.- Parameters:
sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the read status
-
pause
Pauses the proxy. A paused proxy will cease processing messages, causing them to be queued up and potentially hit the high-water mark on the frontend or backend socket, causing messages to be dropped, or writing applications to block.- Parameters:
sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the read status
-
stop
Stops the proxy.- Parameters:
sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the read status
-
command
Sends a command message to the proxy actor. Can be useful for programmatic interfaces. Does not works with commandsCONFIG
andRESTART
.- Parameters:
command
- the command to execute. Not null.sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the read status
-
command
Sends a command message to the proxy actor. Can be useful for programmatic interfaces. Does not works with commandsCONFIG
andRESTART
.- Parameters:
command
- the command to execute.sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the read state
-
command
Sends a command message to the proxy actor. Can be useful for programmatic interfaces. Works only with commandsCONFIG
andRESTART
.- Parameters:
command
- the command to execute.msg
- the custom message to transmit.sync
- true to read the status in synchronous way, false for asynchronous mode- Returns:
- the response message
-
configure
Configures the proxy. The distant side has to send back one (1) mandatory response message.- Parameters:
msg
- the custom message sent as configuration tip- Returns:
- the mandatory response message of the configuration.
-
restart
Restarts the proxy. Stays alive.- Parameters:
hot
- null to make a cold restart (closing then re-creation of the sockets) or a configuration message to perform a configurable hot restart,
-
exit
Deprecated.The call is synchronous: the sync parameter is ignored, as it leads to many mistakes in case of a provided ZContext.Stops the proxy and exits.- Parameters:
sync
- forced to true to read the status in synchronous way.- Returns:
- the read status.
-
exit
Stops the proxy and exits. The call is synchronous.- Returns:
- the read status.
-
status
Inquires for the status of the proxy. This call is synchronous. -
status
Inquires for the status of the proxy.- Parameters:
sync
- true to read the status in synchronous way, false for asynchronous mode. If false, you get the last cached status of the proxy
-
recvStatus
-
isStarted
public boolean isStarted()Binary inquiry for the status of the proxy. -
started
public boolean started()Binary inquiry for the status of the proxy.
-
ZProxy(String, Proxy, String, Object...)
instead.