Package com.rabbitmq.client
Class RpcClient
- java.lang.Object
-
- com.rabbitmq.client.RpcClient
-
- All Implemented Interfaces:
java.lang.AutoCloseable
- Direct Known Subclasses:
JsonRpcClient
public class RpcClient extends java.lang.Object implements java.lang.AutoCloseable
Convenience class which manages simple RPC-style communication. The class is agnostic about the format of RPC arguments / return values. It simply provides a mechanism for sending a message to an exchange with a given routing key, and waiting for a response.
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
- private static class RpcClient.IncrementingCorrelationIdSupplier
- static class RpcClient.Response
  The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
-
Field Summary
Fields (Modifier and Type, Field, Description)
- private Channel _channel
  Channel we are communicating on
- private DefaultConsumer _consumer
  Consumer attached to our reply queue
- private java.util.Map<java.lang.String,BlockingCell<java.lang.Object>> _continuationMap
  Map from request correlation ID to continuation BlockingCell
- private java.util.function.Supplier<java.lang.String> _correlationIdSupplier
  Generates correlation ID for each request.
- private java.lang.String _exchange
  Exchange to send requests to
- private java.util.function.Function<java.lang.Object,RpcClient.Response> _replyHandler
- private java.lang.String _replyTo
  Queue where the server should put the reply
- private java.lang.String _routingKey
  Routing key to use for requests
- private int _timeout
  timeout to use on call responses
- private boolean _useMandatory
  Whether to publish RPC requests with the mandatory flag or not.
- private java.util.concurrent.atomic.AtomicBoolean closed
  closed flag
- static java.util.function.Function<java.lang.Object,RpcClient.Response> DEFAULT_REPLY_HANDLER
- private java.lang.String lastCorrelationId
- private static org.slf4j.Logger LOGGER
- protected static int NO_TIMEOUT
  NO_TIMEOUT value must match convention on BlockingCell.uninterruptibleGet(int)
-
Constructor Summary
Constructors (Constructor, Description)
- RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey)
  Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
- RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, int timeout)
  Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
- RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, java.lang.String replyTo)
  Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
- RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, java.lang.String replyTo, int timeout)
  Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
- RpcClient(RpcClientParams params)
  Construct a RpcClient with the passed-in RpcClientParams.
-
Method Summary
Methods (Modifier and Type, Method, Description)
- private void checkNotClosed()
  Private API - ensures the RpcClient is correctly open.
- void close()
  Public API - cancels the consumer, thus deleting the temporary queue, and marks the RpcClient as closed.
- RpcClient.Response doCall(AMQP.BasicProperties props, byte[] message)
- RpcClient.Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
- Channel getChannel()
  Retrieve the channel.
- Consumer getConsumer()
  Retrieve the consumer.
- java.util.Map<java.lang.String,BlockingCell<java.lang.Object>> getContinuationMap()
  Retrieve the continuation map.
- int getCorrelationId()
  Retrieve the last correlation id used.
- java.lang.String getExchange()
  Retrieve the exchange.
- java.lang.String getRoutingKey()
  Retrieve the routing key.
- static java.util.function.Supplier<java.lang.String> incrementingCorrelationIdSupplier()
  Creates correlation IDs as a sequence of integers.
- static java.util.function.Supplier<java.lang.String> incrementingCorrelationIdSupplier(java.lang.String prefix)
  Creates correlation IDs as a sequence of integers, with the provided prefix.
- java.util.Map<java.lang.String,java.lang.Object> mapCall(java.lang.Object[] keyValuePairs)
  Perform an AMQP wire-protocol-table based RPC roundtrip, first constructing the table from an array of alternating keys (in even-numbered elements, starting at zero) and values (in odd-numbered elements, starting at one). Restrictions on value arguments apply as in mapCall(Map).
- java.util.Map<java.lang.String,java.lang.Object> mapCall(java.util.Map<java.lang.String,java.lang.Object> message)
  Perform an AMQP wire-protocol-table based RPC roundtrip. There are some restrictions on the values appearing in the table: they must be of type String, LongString, Integer, BigDecimal, Date, or (recursively) a Map of the enclosing type.
- byte[] primitiveCall(byte[] message)
  Perform a simple byte-array-based RPC roundtrip.
- byte[] primitiveCall(AMQP.BasicProperties props, byte[] message)
- byte[] primitiveCall(AMQP.BasicProperties props, byte[] message, int timeout)
- void publish(AMQP.BasicProperties props, byte[] message)
- RpcClient.Response responseCall(byte[] message)
  Perform a simple byte-array-based RPC roundtrip. Useful if you need to get at more than just the body of the message.
- RpcClient.Response responseCall(byte[] message, int timeout)
  Perform a simple byte-array-based RPC roundtrip. Useful if you need to get at more than just the body of the message.
- protected DefaultConsumer setupConsumer()
  Registers a consumer on the reply queue.
- java.lang.String stringCall(java.lang.String message)
  Perform a simple string-based RPC roundtrip.
-
-
-
Field Detail
-
LOGGER
private static final org.slf4j.Logger LOGGER
-
_channel
private final Channel _channel
Channel we are communicating on
-
_exchange
private final java.lang.String _exchange
Exchange to send requests to
-
_routingKey
private final java.lang.String _routingKey
Routing key to use for requests
-
_replyTo
private final java.lang.String _replyTo
Queue where the server should put the reply
-
_timeout
private final int _timeout
timeout to use on call responses
-
NO_TIMEOUT
protected static final int NO_TIMEOUT
NO_TIMEOUT value must match convention on BlockingCell.uninterruptibleGet(int)
- See Also:
- Constant Field Values
-
_useMandatory
private final boolean _useMandatory
Whether to publish RPC requests with the mandatory flag or not.
-
closed
private final java.util.concurrent.atomic.AtomicBoolean closed
closed flag
-
DEFAULT_REPLY_HANDLER
public static final java.util.function.Function<java.lang.Object,RpcClient.Response> DEFAULT_REPLY_HANDLER
-
_replyHandler
private final java.util.function.Function<java.lang.Object,RpcClient.Response> _replyHandler
-
_continuationMap
private final java.util.Map<java.lang.String,BlockingCell<java.lang.Object>> _continuationMap
Map from request correlation ID to continuation BlockingCell
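The role of this map can be illustrated with a stand-alone sketch. It is not the library's implementation: `ContinuationMapSketch` and its methods are hypothetical names, and `CompletableFuture` stands in for `BlockingCell`. Each outgoing request registers a cell under its correlation ID, and an incoming reply completes the cell with the matching ID, so replies may arrive in any order.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of correlation-ID bookkeeping; not RpcClient's code. */
final class ContinuationMapSketch {
    // Correlation ID -> pending reply (CompletableFuture stands in for BlockingCell).
    private final Map<String, CompletableFuture<Object>> continuations = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Registers a pending request and returns its correlation ID. */
    String register() {
        String id = Integer.toString(nextId.incrementAndGet());
        continuations.put(id, new CompletableFuture<>());
        return id;
    }

    /** Delivers a reply; returns false if no request is waiting under this ID. */
    boolean complete(String correlationId, Object reply) {
        CompletableFuture<Object> cell = continuations.get(correlationId);
        return cell != null && cell.complete(reply);
    }

    /** Waits for the reply to the given request, then forgets the request. */
    Object await(String correlationId, long timeoutMillis) {
        CompletableFuture<Object> cell = continuations.get(correlationId);
        if (cell == null) {
            throw new IllegalStateException("unknown correlation ID: " + correlationId);
        }
        try {
            return cell.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for correlation ID " + correlationId, e);
        } finally {
            continuations.remove(correlationId);
        }
    }

    public static void main(String[] args) {
        ContinuationMapSketch sketch = new ContinuationMapSketch();
        String first = sketch.register();
        String second = sketch.register();
        // Replies arrive out of order; each one still reaches its own request.
        sketch.complete(second, "reply to second");
        sketch.complete(first, "reply to first");
        System.out.println(sketch.await(first, 100));
        System.out.println(sketch.await(second, 100));
    }
}
```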
-
_correlationIdSupplier
private final java.util.function.Supplier<java.lang.String> _correlationIdSupplier
Generates correlation ID for each request.
- Since:
- 5.9.0
-
lastCorrelationId
private java.lang.String lastCorrelationId
-
_consumer
private final DefaultConsumer _consumer
Consumer attached to our reply queue
-
-
Constructor Detail
-
RpcClient
public RpcClient(RpcClientParams params) throws java.io.IOException
Construct a RpcClient with the passed-in RpcClientParams.
- Parameters:
params
- Throws:
java.io.IOException
- Since:
- 5.6.0
- See Also:
RpcClientParams
-
RpcClient
@Deprecated public RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, java.lang.String replyTo, int timeout) throws java.io.IOException
Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key. Causes the creation of a temporary private autodelete queue. The name of this queue can be specified.
- Parameters:
channel - the channel to use for communication
exchange - the exchange to connect to
routingKey - the routing key
replyTo - the queue where the server should put the reply
timeout - milliseconds before timing out on wait for response
- Throws:
java.io.IOException - if an error is encountered
-
RpcClient
@Deprecated public RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, java.lang.String replyTo) throws java.io.IOException
Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key. Causes the creation of a temporary private autodelete queue. The name of the queue can be provided (only relevant for RabbitMQ servers that do not support Direct Reply-to). Waits forever for responses (that is, no timeout).
- Parameters:
channel - the channel to use for communication
exchange - the exchange to connect to
routingKey - the routing key
replyTo - the queue where the server should put the reply
- Throws:
java.io.IOException - if an error is encountered
-
RpcClient
@Deprecated public RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey) throws java.io.IOException
Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key. Direct Reply-to will be used for response propagation. Waits forever for responses (that is, no timeout).
- Parameters:
channel - the channel to use for communication
exchange - the exchange to connect to
routingKey - the routing key
- Throws:
java.io.IOException - if an error is encountered
-
RpcClient
@Deprecated public RpcClient(Channel channel, java.lang.String exchange, java.lang.String routingKey, int timeout) throws java.io.IOException
Deprecated. Use RpcClient(RpcClientParams) instead; will be removed in 6.0.0.
Construct a new RpcClient that will communicate on the given channel, sending requests to the given exchange with the given routing key.
Causes the creation of a temporary private autodelete queue. The name of this queue will be "amq.rabbitmq.reply-to".
- Parameters:
channel - the channel to use for communication
exchange - the exchange to connect to
routingKey - the routing key
timeout - milliseconds before timing out on wait for response
- Throws:
java.io.IOException - if an error is encountered
-
-
Method Detail
-
checkNotClosed
private void checkNotClosed() throws java.io.IOException
Private API - ensures the RpcClient is correctly open.
- Throws:
java.io.IOException - if an error is encountered
-
close
public void close() throws java.io.IOException
Public API - cancels the consumer, thus deleting the temporary queue, and marks the RpcClient as closed.
- Specified by:
close in interface java.lang.AutoCloseable
- Throws:
java.io.IOException - if an error is encountered
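How a closed flag, a `checkNotClosed()` guard and `AutoCloseable` can fit together is sketched below. This is a stand-alone illustration under assumptions, not the RpcClient source: `ClosableClientSketch`, its `call` method and the exception message are hypothetical, and the cleanup counter stands in for cancelling the consumer.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a closed flag guarding calls; not RpcClient's code. */
final class ClosableClientSketch implements AutoCloseable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int cleanups = 0;

    /** Rejects any use of the client after close(). */
    private void checkNotClosed() throws IOException {
        if (closed.get()) {
            throw new IOException("client is closed");
        }
    }

    /** Stand-in for an RPC call: only allowed while the client is open. */
    String call(String request) throws IOException {
        checkNotClosed();
        return "echo:" + request;
    }

    @Override
    public void close() {
        // compareAndSet makes close() idempotent: the cleanup runs at most once.
        if (closed.compareAndSet(false, true)) {
            cleanups++;
        }
    }

    int cleanups() {
        return cleanups;
    }

    public static void main(String[] args) throws IOException {
        // try-with-resources calls close() automatically at the end of the block.
        try (ClosableClientSketch client = new ClosableClientSketch()) {
            System.out.println(client.call("ping"));
        }
    }
}
```

Because the class implements `AutoCloseable`, a try-with-resources block is enough to guarantee the cleanup, even when a call throws.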
-
setupConsumer
protected DefaultConsumer setupConsumer() throws java.io.IOException
Registers a consumer on the reply queue.
- Returns:
- the newly created and registered consumer
- Throws:
java.io.IOException - if an error is encountered
-
publish
public void publish(AMQP.BasicProperties props, byte[] message) throws java.io.IOException
- Throws:
java.io.IOException
-
doCall
public RpcClient.Response doCall(AMQP.BasicProperties props, byte[] message) throws java.io.IOException, java.util.concurrent.TimeoutException
- Throws:
java.io.IOException
java.util.concurrent.TimeoutException
-
doCall
public RpcClient.Response doCall(AMQP.BasicProperties props, byte[] message, int timeout) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
- Throws:
java.io.IOException
ShutdownSignalException
java.util.concurrent.TimeoutException
-
primitiveCall
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
- Throws:
java.io.IOException
ShutdownSignalException
java.util.concurrent.TimeoutException
-
primitiveCall
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message, int timeout) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
- Throws:
java.io.IOException
ShutdownSignalException
java.util.concurrent.TimeoutException
-
primitiveCall
public byte[] primitiveCall(byte[] message) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform a simple byte-array-based RPC roundtrip.
- Parameters:
message - the byte array request message to send
- Returns:
- the byte array response received
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a response is not received within the configured timeout
-
responseCall
public RpcClient.Response responseCall(byte[] message) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform a simple byte-array-based RPC roundtrip. Useful if you need to get at more than just the body of the message.
- Parameters:
message - the byte array request message to send
- Returns:
- The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a response is not received within the configured timeout
-
responseCall
public RpcClient.Response responseCall(byte[] message, int timeout) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform a simple byte-array-based RPC roundtrip. Useful if you need to get at more than just the body of the message.
- Parameters:
message - the byte array request message to send
timeout - milliseconds before timing out on wait for response
- Returns:
- The response object is an envelope that contains all of the data provided to the `handleDelivery` consumer
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a response is not received within the configured timeout
-
stringCall
public java.lang.String stringCall(java.lang.String message) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform a simple string-based RPC roundtrip.
- Parameters:
message - the string request message to send
- Returns:
- the string response received
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a timeout occurs before the response is received
-
mapCall
public java.util.Map<java.lang.String,java.lang.Object> mapCall(java.util.Map<java.lang.String,java.lang.Object> message) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform an AMQP wire-protocol-table based RPC roundtrip.
There are some restrictions on the values appearing in the table: they must be of type String, LongString, Integer, BigDecimal, Date, or (recursively) a Map of the enclosing type.
- Parameters:
message - the table to send
- Returns:
- the table received
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a timeout occurs before a response is received
-
mapCall
public java.util.Map<java.lang.String,java.lang.Object> mapCall(java.lang.Object[] keyValuePairs) throws java.io.IOException, ShutdownSignalException, java.util.concurrent.TimeoutException
Perform an AMQP wire-protocol-table based RPC roundtrip, first constructing the table from an array of alternating keys (in even-numbered elements, starting at zero) and values (in odd-numbered elements, starting at one).
Restrictions on value arguments apply as in mapCall(Map).
- Parameters:
keyValuePairs - alternating {key, value, key, value, ...} data to send
- Returns:
- the table received
- Throws:
ShutdownSignalException - if the connection dies during our wait
java.io.IOException - if an error is encountered
java.util.concurrent.TimeoutException - if a timeout occurs before a response is received
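The alternating key/value layout described above can be illustrated by a small stand-alone conversion. This is a sketch only: `KeyValuePairsSketch.toTable` is a hypothetical helper, and rejecting an odd-length array is an assumption made here, not documented behaviour of `mapCall`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of turning {key, value, key, value, ...} into a table. */
final class KeyValuePairsSketch {
    static Map<String, Object> toTable(Object[] keyValuePairs) {
        // Assumption of this sketch: an odd-length array is rejected up front.
        if (keyValuePairs.length % 2 != 0) {
            throw new IllegalArgumentException("expected an even number of elements");
        }
        Map<String, Object> table = new HashMap<>();
        // Keys sit at even indexes (0, 2, ...), values at the following odd indexes.
        for (int i = 0; i < keyValuePairs.length; i += 2) {
            table.put((String) keyValuePairs[i], keyValuePairs[i + 1]);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Object> table = toTable(new Object[] {"operation", "add", "x", 1, "y", 2});
        System.out.println(table);
    }
}
```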
-
getChannel
public Channel getChannel()
Retrieve the channel.
- Returns:
- the channel to which this client is connected
-
getExchange
public java.lang.String getExchange()
Retrieve the exchange.
- Returns:
- the exchange to which this client is connected
-
getRoutingKey
public java.lang.String getRoutingKey()
Retrieve the routing key.
- Returns:
- the routing key for messages to this client
-
getContinuationMap
public java.util.Map<java.lang.String,BlockingCell<java.lang.Object>> getContinuationMap()
Retrieve the continuation map.
- Returns:
- the map of objects to blocking cells for this client
-
getCorrelationId
public int getCorrelationId()
Retrieve the last correlation id used.
Note that as of 5.9.0, correlation IDs may not always be integers (by default, they are). This method tries to parse the last correlation ID string as an integer, so it may throw a NumberFormatException if the correlation ID supplier provided by RpcClientParams.correlationIdSupplier(Supplier) does not generate integer IDs.
- Returns:
- the most recently used correlation id
- See Also:
RpcClientParams.correlationIdSupplier(Supplier)
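The parsing caveat above can be seen with plain `Integer.parseInt`. The stand-alone sketch below does not call RpcClient; `CorrelationIdParseSketch.parseOrDefault` is a hypothetical helper showing how code that receives string IDs might guard the conversion.

```java
/** Hypothetical sketch: integer parsing of a correlation ID string. */
final class CorrelationIdParseSketch {
    /** Returns the ID as an int, or the fallback when the ID is not a plain integer. */
    static int parseOrDefault(String correlationId, int fallback) {
        try {
            return Integer.parseInt(correlationId);
        } catch (NumberFormatException e) {
            // Reached for IDs such as "req-42" that a custom supplier might generate.
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));
        System.out.println(parseOrDefault("req-42", -1));
    }
}
```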
-
getConsumer
public Consumer getConsumer()
Retrieve the consumer.
- Returns:
- an interface to the client's consumer object
-
incrementingCorrelationIdSupplier
public static java.util.function.Supplier<java.lang.String> incrementingCorrelationIdSupplier()
Creates correlation IDs as a sequence of integers.
- Returns:
- a supplier of correlation IDs
- Since:
- 5.9.0
- See Also:
RpcClientParams.correlationIdSupplier(Supplier)
-
incrementingCorrelationIdSupplier
public static java.util.function.Supplier<java.lang.String> incrementingCorrelationIdSupplier(java.lang.String prefix)
Creates correlation IDs as a sequence of integers, with the provided prefix.
- Parameters:
prefix - the prefix to prepend to each generated ID
- Returns:
- a supplier of correlation IDs
- Since:
- 5.9.0
- See Also:
RpcClientParams.correlationIdSupplier(Supplier)
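A supplier of prefixed, incrementing IDs could look like the stand-alone sketch below. It is not the library's implementation: `IncrementingIdSketch` is a hypothetical name, and both the starting value of 1 and the use of `AtomicInteger` for thread safety are assumptions of this sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of a prefixed, incrementing ID supplier. */
final class IncrementingIdSketch {
    static Supplier<String> incrementing(String prefix) {
        // Each supplier owns its counter; the first ID uses 1 (an assumption of this sketch).
        AtomicInteger counter = new AtomicInteger();
        return () -> prefix + counter.incrementAndGet();
    }

    public static void main(String[] args) {
        Supplier<String> ids = incrementing("rpc-");
        System.out.println(ids.get());
        System.out.println(ids.get());
    }
}
```

With an empty prefix the generated IDs are plain integers, which keeps them parseable by an integer-based accessor such as getCorrelationId().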
-
-