activemq-cpp-3.9.5
ActiveMQConnection.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
19#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
20
41
42#include <string>
43#include <memory>
44
45namespace activemq {
46namespace core {
47
48 using decaf::lang::Pointer;
49
50 class ActiveMQSession;
51 class ConnectionConfig;
52 class PrefetchPolicy;
53 class RedeliveryPolicy;
54
61 class AMQCPP_API ActiveMQConnection : public virtual cms::EnhancedConnection,
63 private:
64
65 ConnectionConfig* config;
66
70 std::auto_ptr<cms::ConnectionMetaData> connectionMetaData;
71
76
82
88
93
94 private:
95
96 ActiveMQConnection(const ActiveMQConnection&);
97 ActiveMQConnection& operator=(const ActiveMQConnection&);
98
99 public:
100
110 const Pointer<decaf::util::Properties> properties);
111
113
123
133
143
149 virtual void removeProducer(const Pointer<commands::ProducerId>& producerId);
150
157 virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher);
158
164 virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer);
165
176 virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout);
177
182 bool isClosed() const {
183 return this->closed.get();
184 }
185
190 bool isStarted() const {
191 return this->started.get();
192 }
193
198 bool isTransportFailed() const {
199 return this->transportFailed.get();
200 }
201
220 virtual void destroyDestination(const commands::ActiveMQDestination* destination);
221
240 virtual void destroyDestination(const cms::Destination* destination);
241
253
263
271
272 public: // Connection Interface Methods
273
277 virtual const cms::ConnectionMetaData* getMetaData() const {
278 return connectionMetaData.get();
279 }
280
285
289 virtual std::string getClientID() const;
290
294 virtual void setClientID(const std::string& clientID);
295
300
304 virtual void close();
305
309 virtual void start();
310
314 virtual void stop();
315
320
325
330
335
340
341 public: // Configuration Options
342
347 void setUsername(const std::string& username);
348
354 const std::string& getUsername() const;
355
360 void setPassword(const std::string& password);
361
367 const std::string& getPassword() const;
368
373 void setDefaultClientId(const std::string& clientId);
374
380 void setBrokerURL(const std::string& brokerURL);
381
387 const std::string& getBrokerURL() const;
388
398
405
415
422
426 bool isDispatchAsync() const;
427
436 void setDispatchAsync(bool value);
437
443 bool isAlwaysSyncSend() const;
444
450 void setAlwaysSyncSend(bool value);
451
456 bool isUseAsyncSend() const;
457
462 void setUseAsyncSend(bool value);
463
468 bool isUseCompression() const;
469
476 void setUseCompression(bool value);
477
487 void setCompressionLevel(int value);
488
495
500 unsigned int getSendTimeout() const;
501
507 void setSendTimeout(unsigned int timeout);
508
513 unsigned int getConnectResponseTimeout() const;
514
521 void setConnectResponseTimeout(unsigned int connectResponseTimeout);
522
527 unsigned int getCloseTimeout() const;
528
533 void setCloseTimeout(unsigned int timeout);
534
542 unsigned int getProducerWindowSize() const;
543
550 void setProducerWindowSize(unsigned int windowSize);
551
557
566
572
578
586
594 void setWatchTopicAdvisories(bool value);
595
604 int getAuditDepth() const;
605
615 void setAuditDepth(int auditDepth);
616
623
630 void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
631
645
659 void setCheckForDuplicates(bool checkForDuplicates);
660
669
678 void setTransactedIndividualAck(bool transactedIndividualAck);
679
687
696 void setNonBlockingRedelivery(bool nonBlockingRedelivery);
697
704
712
717
724 void setOptimizeAcknowledge(bool optimizeAcknowledge);
725
732
739 void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut);
740
750
760 void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
761
768
777 void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
778
785
793 void setExclusiveConsumer(bool exclusiveConsumer);
794
801 bool isSendAcksAsync() const;
802
810 void setSendAcksAsync(bool sendAcksAsync);
811
816
823 void setAlwaysSessionAsync(bool alwaysSessionAsync);
824
829
837 void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
838
843
844 public: // TransportListener
845
858
868
874 virtual void onCommand(const Pointer<commands::Command> command);
875
880 virtual void onException(const decaf::lang::Exception& ex);
881
885 virtual void transportInterrupted();
886
890 virtual void transportResumed();
891
892 public:
893
901
909
916
923
930 std::string getResourceManagerId() const;
931
936 void cleanup();
937
949
965
979
984 virtual void fire(const exceptions::ActiveMQException& ex);
985
991
1000
1007
1015
1023
1029 void checkClosed() const;
1030
1037
1042
1047
1056
1065
1076
1084
1092
1100
1101 protected:
1102
1107
1108 // Sends a oneway disconnect message to the broker.
1109 void disconnect(long long lastDeliveredSequenceId);
1110
1111 // Waits for all Consumers to handle the Transport Interrupted event.
1113
1114 // Marks processing complete for a single caller when interruption processing completes.
1116
1117 // Allow subclasses to access the original Properties object for this connection.
1119
1120 // Process the WireFormatInfo command
1122
1123 // Process the ControlCommand command
1125
1126 // Process the ConnectionControl command
1128
1129 // Process the ConsumerControl command
1131
1132 };
1133
1134}}
1135
1136#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/
#define AMQCPP_API
Definition Config.h:30
Definition ActiveMQDestination.h:39
Definition ConnectionId.h:51
Definition ConnectionInfo.h:49
Definition ConsumerInfo.h:51
Pointer< commands::Response > syncRequest(Pointer< commands::Command > command, unsigned int timeout=0)
Sends a synchronous request and returns the response from the broker.
const commands::ConnectionId & getConnectionId() const
Gets the ConnectionId for this Object, if the Connection is not open than this method throws an excep...
void setTransportInterruptionProcessingComplete()
Indicates that a Connection resource that is processing the transportInterrupted event has completed.
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Set an MessageTransformer instance that is passed on to all Session objects created from this Connect...
virtual Pointer< commands::SessionId > getNextSessionId()
virtual void addSession(Pointer< activemq::core::kernels::ActiveMQSessionKernel > session)
Adds the session resources for the given session instance.
void addTempDestination(Pointer< commands::ActiveMQTempDestination > destination)
Adds the given Temporary Destination to this Connections collection of known Temporary Destinations.
void setAuditDepth(int auditDepth)
Set the audit depth for Messages for consumers when using a fault tolerant transport.
void onControlCommand(Pointer< commands::Command > command)
void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval)
Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that ha...
void setConsumerFailoverRedeliveryWaitPeriod(long long value)
Sets the delay period for a consumer redelivery.
void setSendTimeout(unsigned int timeout)
Sets the send timeout to use when sending Message objects, this will cause all messages to be sent us...
void setRedeliveryPolicy(RedeliveryPolicy *policy)
Sets the RedeliveryPolicy instance that this factory should use when it creates new Connection instan...
void setWatchTopicAdvisories(bool value)
Sets whether this Connection is listening for advisory messages regarding temporary destination creat...
unsigned int getProducerWindowSize() const
Gets the configured producer window size for Producers that are created from this connector.
void setTransactedIndividualAck(bool transactedIndividualAck)
when true, submit individual transacted acks immediately rather than with transaction completion.
unsigned int getSendTimeout() const
Gets the assigned send timeout for this Connector.
unsigned int getConnectResponseTimeout() const
Gets the assigned connect response timeout for this Connector.
void setDispatchAsync(bool value)
Should messages be dispatched synchronously or asynchronously from the producer thread for non-durabl...
void checkClosedOrFailed() const
Check for Closed State and Failed State and Throw an exception if either is true.
virtual const cms::ConnectionMetaData * getMetaData() const
Gets the metadata for this connection.the connection MetaData pointer ( caller does not own it )....
Definition ActiveMQConnection.h:277
bool isDuplicate(Dispatcher *dispatcher, Pointer< commands::Message > message)
Allows Consumers to check if an incoming Message is a Duplicate.
void setMessagePrioritySupported(bool value)
Set whether or not this factory should create Connection objects with the Message priority support fu...
void ensureConnectionInfoSent()
If its not been sent, then send the ConnectionInfo to the Broker.
void checkClosed() const
Check for Closed State and Throw an exception if true.
bool isCheckForDuplicates() const
Gets the value of the configured Duplicate Message detection feature.
virtual void addDispatcher(const Pointer< commands::ConsumerId > &consumer, Dispatcher *dispatcher)
Adds a dispatcher for a consumer.
virtual void fire(const exceptions::ActiveMQException &ex)
Notify the exception listener.
void setAuditMaximumProducerNumber(int auditMaximumProducerNumber)
The number of Producers that will be audited.
virtual void sendPullRequest(const commands::ConsumerInfo *consumer, long long timeout)
If supported sends a message pull request to the service provider asking for the delivery of a new me...
void setUseRetroactiveConsumer(bool useRetroactiveConsumer)
Sets whether or not retroactive consumers are enabled.
bool isDeleted(Pointer< commands::ActiveMQTempDestination > destination) const
Determines whether the supplied Temporary Destination has already been deleted from the Broker.
virtual void removeSession(Pointer< activemq::core::kernels::ActiveMQSessionKernel > session)
Removes the session resources for the given session instance.
int getCompressionLevel() const
Gets the currently configured Compression level for Message bodies.
void oneway(Pointer< commands::Command > command)
Sends a message without request that the broker send a response to indicate that it was received.
void deleteTempDestination(Pointer< commands::ActiveMQTempDestination > destination)
Removes the given Temporary Destination to this Connections collection of known Temporary Destination...
void setCheckForDuplicates(bool checkForDuplicates)
Gets the value of the configured Duplicate Message detection feature.
void cleanup()
Clean up this connection object, reseting it back to a state that mirrors what a newly created Active...
ActiveMQConnection(const Pointer< transport::Transport > transport, const Pointer< decaf::util::Properties > properties)
Constructor.
const commands::ConnectionInfo & getConnectionInfo() const
Gets the ConnectionInfo for this Object, if the Connection is not open than this method throws an exc...
int getAuditDepth() const
Get the audit depth for Messages for consumers when using a fault tolerant transport.
long long getOptimizeAcknowledgeTimeOut() const
Gets the time between optimized ack batches in milliseconds.
void onConsumerControl(Pointer< commands::Command > command)
bool isNonBlockingRedelivery() const
Returns true if non-blocking redelivery of Messages is configured for Consumers that are rolled back ...
void cleanUpTempDestinations()
Removes any TempDestinations that this connection has cached, ignoring any exceptions generated becau...
void setUseCompression(bool value)
Sets whether Message body compression is enabled.
Pointer< threads::Scheduler > getScheduler() const
Gets a reference to the Connection objects built in Scheduler instance.
void setCloseTimeout(unsigned int timeout)
Sets the close timeout to use when sending the disconnect request.
virtual void stop()
Stops this service.
void setPassword(const std::string &password)
Sets the password that should be used when creating a new connection.
bool isExclusiveConsumer() const
Should all created consumers be exclusive.
virtual std::string getClientID() const
Get the Client Id for this session, the client Id is provider specific and is either assigned by the ...
void setOptimizeAcknowledge(bool optimizeAcknowledge)
Sets if Consumers are configured to use Optimized Acknowledge by default.
bool isAlwaysSyncSend() const
Gets if the Connection should always send things Synchronously.
decaf::util::concurrent::ExecutorService * getExecutor() const
void addTransportListener(transport::TransportListener *transportListener)
Adds a transport listener so that a client can be notified of events in the underlying transport,...
virtual void setExceptionListener(cms::ExceptionListener *listener)
Sets the registered Exception Listener for this connection.
virtual void onCommand(const Pointer< commands::Command > command)
Event handler for the receipt of a non-response command from the transport.
void setConnectResponseTimeout(unsigned int connectResponseTimeout)
Sets the connect response timeout to use when sending Message objects, this will protect clients usin...
virtual void start()
Starts the service.
bool isTransportFailed() const
Checks if the Connection's Transport has failed.
Definition ActiveMQConnection.h:198
void setPrefetchPolicy(PrefetchPolicy *policy)
Sets the PrefetchPolicy instance that this factory should use when it creates new Connection instance...
std::string getResourceManagerId() const
Returns the Id of the Resource Manager that this client will use should it be entered into an XA Tran...
long long getOptimizedAckScheduledAckInterval() const
Gets the configured time interval that is used to force all MessageConsumers that have optimizedAckno...
void setCompressionLevel(int value)
Sets the Compression level used when Message body compression is enabled, a value of -1 causes the Co...
bool isStarted() const
Check if this connection has been started.
Definition ActiveMQConnection.h:190
void setDefaultClientId(const std::string &clientId)
Sets the Client Id.
void setProducerWindowSize(unsigned int windowSize)
Sets the size in Bytes of messages that a producer can send before it is blocked to await a ProducerA...
void setSendAcksAsync(bool sendAcksAsync)
Sets whether Message acknowledgments are sent asynchronously meaning no response is required from the...
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this Connection.the pointer to the currently set...
PrefetchPolicy * getPrefetchPolicy() const
Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory.
bool isWatchTopicAdvisories() const
Is the Connection configured to watch for advisory messages to maintain state of temporary destinatio...
void removeAuditedDispatcher(Dispatcher *dispatcher)
Removes the Audit information stored for a given MessageConsumer.
virtual cms::Session * createSession(cms::Session::AcknowledgeMode ackMode)
Creates a new Session to work for this Connection using the specified acknowledgment mode.
virtual void addProducer(Pointer< kernels::ActiveMQProducerKernel > producer)
Adds an active Producer to the Set of known producers.
transport::Transport & getTransport() const
Gets a reference to this object's Transport instance.
decaf::util::ArrayList< Pointer< activemq::core::kernels::ActiveMQSessionKernel > > getSessions() const
Returns an ArrayList that contains a copy of all Sessions that are currently active in the Connection...
void setAlwaysSyncSend(bool value)
Sets if the Connection should always send things Synchronously.
RedeliveryPolicy * getRedeliveryPolicy() const
Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory.
void removeTransportListener(transport::TransportListener *transportListener)
Removes a registered TransportListener from the Connection's set of Transport listeners,...
bool isClosed() const
Checks if this connection has been closed.
Definition ActiveMQConnection.h:182
const decaf::util::Properties & getProperties() const
virtual void removeDispatcher(const Pointer< commands::ConsumerId > &consumer)
Removes the dispatcher for a consumer.
void setBrokerURL(const std::string &brokerURL)
Sets the Broker URL that should be used when creating a new connection instance.
long long getConsumerFailoverRedeliveryWaitPeriod() const
Gets the delay period for a consumer redelivery.
const std::string & getBrokerURL() const
Gets the Broker URL that this factory will use when creating a new connection instance.
const std::string & getPassword() const
Gets the password that this factory will use when creating a new connection instance.
virtual void destroyDestination(const cms::Destination *destination)
Requests that the Broker removes the given Destination.
virtual cms::ExceptionListener * getExceptionListener() const
Gets the registered Exception Listener for this connection.pointer to an exception listener or NULL
virtual void transportResumed()
The transport has resumed after an interruption.
void onWireFormatInfo(Pointer< commands::Command > command)
int getAuditMaximumProducerNumber() const
The number of Producers that will be audited.
void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut)
The max time in milliseconds between optimized ack batches.
decaf::lang::Exception * getFirstFailureError() const
Gets the pointer to the first exception that caused the Connection to become failed.
void onClientInternalException(const decaf::lang::Exception &ex)
Handles async client internal exceptions which don't usually affect the connection itself.
unsigned int getCloseTimeout() const
Gets the assigned close timeout for this Connector.
virtual cms::DestinationSource * getDestinationSource()
Returns the DestinationSource} object which can be used to listen to destinations being created or de...
void setUsername(const std::string &username)
Sets the username that should be used when creating a new connection.
const std::string & getUsername() const
Gets the username that this factory will use when creating a new connection instance.
bool isUseAsyncSend() const
Gets if the useAsyncSend option is set.
long long getNextTempDestinationId()
Get the Next Temporary Destination Id.
void onConnectionControl(Pointer< commands::Command > command)
virtual void removeProducer(const Pointer< commands::ProducerId > &producerId)
Removes an active Producer to the Set of known producers.
void setNonBlockingRedelivery(bool nonBlockingRedelivery)
When true a MessageConsumer will not stop Message delivery before re-delivering Messages from a rolle...
void rollbackDuplicate(Dispatcher *dispatcher, Pointer< commands::Message > message)
Mark message as received.
void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled)
Configures whether this consumer will perform message expiration processing on all incoming messages.
virtual void destroyDestination(const commands::ActiveMQDestination *destination)
Requests that the Broker removes the given Destination.
void removeTempDestination(Pointer< commands::ActiveMQTempDestination > destination)
Removes the given Temporary Destination to this Connections collection of known Temporary Destination...
bool isUseRetroactiveConsumer() const
Should all created consumers be retroactive.
virtual void setClientID(const std::string &clientID)
Sets the client identifier for this connection.The preferred way to assign a CMS client's client iden...
void onAsyncException(const decaf::lang::Exception &ex)
Event handler for dealing with async exceptions.
void setExclusiveConsumer(bool exclusiveConsumer)
Enables or disables whether or not queue consumers should be exclusive or not for example to preserve...
virtual void close()
Closes this connection as well as any Sessions created from it (and those Sessions' consumers and pro...
void setFirstFailureError(decaf::lang::Exception *error)
Sets the pointer to the first exception that caused the Connection to become failed.
bool isSendAcksAsync() const
Returns whether Message acknowledgments are sent asynchronously meaning no response is required from ...
bool isUseCompression() const
Gets if the Connection is configured for Message body compression.
long long getNextLocalTransactionId()
Get the Next Temporary Destination Id.
virtual cms::Session * createSession()
Creates an AUTO_ACKNOWLEDGE Session.
virtual void transportInterrupted()
The transport has suffered an interruption from which it hopes to recover.
void asyncRequest(Pointer< commands::Command > command, cms::AsyncCallback *onComplete)
Sends a synchronous request and returns the response from the broker.
void setAlwaysSessionAsync(bool alwaysSessionAsync)
If this flag is not set then a separate thread is not used for dispatching messages for each Session ...
virtual void onException(const decaf::lang::Exception &ex)
Event handler for an exception from a command transport.
void disconnect(long long lastDeliveredSequenceId)
bool isTransactedIndividualAck() const
when true, submit individual transacted acks immediately rather than with transaction completion.
void setUseAsyncSend(bool value)
Sets the useAsyncSend option.
Definition ActiveMQSession.h:42
Interface for an object responsible for dispatching messages to consumers.
Definition Dispatcher.h:32
Interface for a Policy object that controls message Prefetching on various destination types in Activ...
Definition PrefetchPolicy.h:34
Interface for a RedeliveryPolicy object that controls how message Redelivery is handled in ActiveMQ-C...
Definition RedeliveryPolicy.h:34
Definition ActiveMQException.h:35
Interface for a transport layer for command objects.
Definition Transport.h:60
A listener of asynchronous exceptions from a command transport object.
Definition TransportListener.h:38
Asynchronous event interface for CMS asynchronous operations.
Definition AsyncCallback.h:37
A ConnectionMetaData object provides information describing the Connection object.
Definition ConnectionMetaData.h:31
A Destination object encapsulates a provider-specific address.
Definition Destination.h:39
Provides an object that will provide a snapshot view of Destinations that exist on the Message provid...
Definition DestinationSource.h:38
An enhanced CMS Connection instance that provides additional features above the default required feat...
Definition EnhancedConnection.h:33
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition ExceptionListener.h:37
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition MessageTransformer.h:37
A Session object is a single-threaded context for producing and consuming messages.
Definition Session.h:105
AcknowledgeMode
Definition Session.h:108
Definition Exception.h:38
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
Definition ArrayList.h:39
Java-like properties class for mapping string names to string values.
Definition Properties.h:53
An Executor that provides methods to manage termination and methods that can produce a Future for tra...
Definition ExecutorService.h:56
A boolean value that may be updated atomically.
Definition AtomicBoolean.h:34
bool get() const
Gets the current value of this AtomicBoolean.
Definition AtomicBoolean.h:63
Definition ActiveMQTempDestination.h:29
Definition AbstractTransportFactory.h:30
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition CachedConsumer.h:24