18#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
62 using decaf::lang::Pointer;
63 using decaf::util::concurrent::atomic::AtomicBoolean;
129 ActiveMQSessionKernel(
const ActiveMQSessionKernel&);
130 ActiveMQSessionKernel& operator=(
const ActiveMQSessionKernel&);
205 const std::string& selector);
208 const std::string& selector,
212 const std::string& name,
213 const std::string& selector,
214 bool noLocal =
false);
280 cms::Message* message,
int deliveryMode,
int priority,
long long timeToLive,
316 return *( this->sessionInfo );
326 return *( this->sessionInfo->getSessionId() );
333 return this->connection;
347 return this->lastDeliveredSequenceId;
357 this->lastDeliveredSequenceId = value;
448 return this->transaction;
603 long long getNextProducerSequenceId() {
608 void checkClosed()
const;
623 std::string createTemporaryDestinationName();
#define AMQCPP_API
Definition Config.h:30
Definition ActiveMQTempDestination.h:36
Definition SessionId.h:51
Definition SessionInfo.h:48
Concrete connection used for all connectors to the ActiveMQ broker.
Definition ActiveMQConnection.h:62
Definition ActiveMQConsumer.h:40
Definition ActiveMQProducer.h:36
Delegate dispatcher for a single session.
Definition ActiveMQSessionExecutor.h:44
Interface for an object responsible for dispatching messages to consumers.
Definition Dispatcher.h:32
Definition MessageDispatchChannel.h:34
Definition ActiveMQProducerKernel.h:44
SessionConfig * config
Definition ActiveMQSessionKernel.h:74
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition ActiveMQSessionKernel.h:120
void send(kernels::ActiveMQProducerKernel *producer, Pointer< commands::ActiveMQDestination > destination, cms::Message *message, int deliveryMode, int priority, long long timeToLive, util::MemoryUsage *producerWindow, long long sendTimeout, cms::AsyncCallback *onComplete)
Sends a message from the specified Producer using this session's connection; the message will be sent ...
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this Session.
virtual void redispatch(MessageDispatchChannel &unconsumedMessages)
Redispatches the given set of unconsumed messages to the consumers.
virtual void doStartTransaction()
Starts a Transaction for this Session if one has not already been started.
void oneway(Pointer< commands::Command > command)
Sends a Command to the broker without requesting any Response be returned.
virtual cms::TextMessage * createTextMessage()
Creates a new TextMessage.
virtual cms::BytesMessage * createBytesMessage(const unsigned char *bytes, int bytesSize)
Creates a BytesMessage and sets the payload to the passed value.
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:125
virtual bool isAutoAcknowledge() const
Definition ActiveMQSessionKernel.h:163
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:356
Pointer< commands::ConsumerId > getNextConsumerId()
Get the Next available Consumer Id.
void fire(const exceptions::ActiveMQException &ex)
Fires the given exception to the exception listener of the connection.
virtual void commit()
Commits all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination)
Creates a MessageConsumer for the specified destination.
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const
Returns the acknowledgment mode of the session.
void acknowledge()
Request that the Session inform all its consumers to acknowledge all Messages that have been receive...
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition ActiveMQSessionKernel.h:100
virtual cms::StreamMessage * createStreamMessage()
Creates a new StreamMessage.
virtual int getHashCode() const
Returns a Hash Code for this Session based on its SessionId.
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session; if the session is closed then this method throw...
Definition ActiveMQSessionKernel.h:314
void sendAck(decaf::lang::Pointer< commands::MessageAck > ack, bool async=false)
Sends the given MessageAck command to the Broker either via Synchronous call or an Asynchronous call ...
virtual void recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged ...
virtual void stop()
Stops asynchronous message delivery.
bool isStarted() const
Indicates whether or not the session is currently in the started state.
virtual bool isDupsOkAcknowledge() const
Definition ActiveMQSessionKernel.h:167
Pointer< threads::Scheduler > getScheduler() const
Gets a Pointer to this Session's Scheduler instance.
virtual cms::TemporaryQueue * createTemporaryQueue()
Creates a TemporaryQueue object.
virtual cms::Topic * createTopic(const std::string &topicName)
Creates a topic identity given a topic name.
void checkMessageListener() const
Checks if any MessageConsumer owned by this Session has a set MessageListener and throws an exception...
virtual cms::MessageConsumer * createDurableConsumer(const cms::Topic *destination, const std::string &name, const std::string &selector, bool noLocal=false)
Creates a durable subscriber to the specified topic, using a Message selector.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue, const std::string &selector)
Creates a new QueueBrowser to peek at Messages on the given Queue.
void setSessionAsyncDispatch(bool sessionAsyncDispatch)
Configures asynchronous message dispatch to this session's consumers.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector, bool noLocal)
Creates a MessageConsumer for the specified destination, using a message selector.
void dispose()
Cleans up the Session object's resources without attempting to send the Remove command to the broker,...
virtual cms::TemporaryTopic * createTemporaryTopic()
Creates a TemporaryTopic object.
AtomicBoolean closed
Indicates that this connection has been closed; it is no longer usable after this becomes true.
Definition ActiveMQSessionKernel.h:95
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition ActiveMQSessionKernel.h:332
decaf::util::ArrayList< Pointer< ActiveMQConsumerKernel > > getConsumers() const
Returns an ArrayList containing a copy of all consumers currently in use on this Session.
Pointer< commands::ProducerId > getNextProducerId()
Get the Next available Producer Id.
void removeConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Dispose of a MessageConsumer from this session.
void addConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Adds a MessageConsumerKernel to this session, registering it with the Connection and storing a reference...
Pointer< ActiveMQProducerKernel > lookupProducerKernel(Pointer< commands::ProducerId > id)
virtual bool isTransacted() const
Returns whether this Session is a Transacted Session.
virtual void start()
Starts asynchronous message delivery.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue)
Creates a new QueueBrowser to peek at Messages on the given Queue.
cms::ExceptionListener * getExceptionListener()
This method gets any registered exception listener of this session's connection and returns it.
virtual bool isClientAcknowledge() const
Definition ActiveMQSessionKernel.h:171
virtual cms::MessageProducer * createProducer(const cms::Destination *destination)
Creates a MessageProducer to send messages to the specified destination.
virtual cms::Queue * createQueue(const std::string &queueName)
Creates a queue identity given a Queue name.
virtual void dispatch(const Pointer< MessageDispatch > &message)
Dispatches a message to a particular consumer.
void clearMessagesInProgress(decaf::lang::Pointer< decaf::util::concurrent::atomic::AtomicInteger > transportsInterrupted)
Request that this Session inform all of its consumers to clear all messages that are currently in pro...
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition ActiveMQSessionKernel.h:115
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Sets a MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer objec...
void close(Pointer< commands::ConsumerId > id)
Close the specified consumer if present in this Session.
virtual cms::Message * createMessage()
Creates a new Message.
virtual void rollback()
Rolls back all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector)
Creates a MessageConsumer for the specified destination, using a message selector.
void addProducer(Pointer< ActiveMQProducerKernel > producer)
Adds a MessageProducer to this session, registering it with the Connection and storing a reference to it...
virtual ~ActiveMQSessionKernel()
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition ActiveMQSessionKernel.h:110
virtual void unsubscribe(const std::string &name)
Unsubscribes a durable subscription that has been created by a client.
bool isInUse(Pointer< commands::ActiveMQDestination > destination)
Checks if the given destination is currently in use by any consumers in this Session.
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session; if the session is closed then this method throws an exce...
Definition ActiveMQSessionKernel.h:324
void doClose()
Performs the actual Session close operations.
virtual cms::MapMessage * createMapMessage()
Creates a new MapMessage.
ActiveMQConnection * connection
Connection.
Definition ActiveMQSessionKernel.h:89
Pointer< commands::Response > syncRequest(Pointer< commands::Command > command, unsigned int timeout=0)
Sends a synchronous request and returns the response from the broker.
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition ActiveMQSessionKernel.h:79
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition ActiveMQSessionKernel.h:84
void removeProducer(Pointer< ActiveMQProducerKernel > producer)
Dispose of a MessageProducer from this session.
virtual cms::TextMessage * createTextMessage(const std::string &text)
Creates a new TextMessage and set the text to the value given.
void deliverAcks()
Request that this Session inform all of its consumers to deliver their pending acks.
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition ActiveMQSessionKernel.h:447
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:346
cms::Session::AcknowledgeMode ackMode
This Session's acknowledgment mode.
Definition ActiveMQSessionKernel.h:105
void wakeup()
Causes the Session to wake up its executor and ensure all messages are dispatched.
Pointer< ActiveMQConsumerKernel > lookupConsumerKernel(Pointer< commands::ConsumerId > id)
virtual cms::BytesMessage * createBytesMessage()
Creates a BytesMessage.
ActiveMQSessionKernel(ActiveMQConnection *connection, const Pointer< commands::SessionId > &id, cms::Session::AcknowledgeMode ackMode, const decaf::util::Properties &properties)
void setPrefetchSize(Pointer< commands::ConsumerId > id, int prefetch)
Set the prefetch level for the given consumer if it exists in this Session to the value specified.
bool isSessionAsyncDispatch() const
Returns true if this session is dispatching messages to its consumers asynchronously.
bool iterateConsumers()
Gives each consumer a chance to dispatch messages that have been enqueued by calling each consumers i...
virtual void close()
Closes this session as well as any active child consumers or producers.
virtual bool isIndividualAcknowledge() const
Definition ActiveMQSessionKernel.h:175
Definition ActiveMQException.h:35
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition LongSequenceGenerator.h:32
long long getNextSequenceId()
Definition MemoryUsage.h:28
Asynchronous event interface for CMS asynchronous operations.
Definition AsyncCallback.h:37
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition BytesMessage.h:66
A Destination object encapsulates a provider-specific address.
Definition Destination.h:39
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition ExceptionListener.h:37
A MapMessage object is used to send a set of name-value pairs.
Definition MapMessage.h:71
A client uses a MessageConsumer to receive messages from a destination.
Definition MessageConsumer.h:63
Root of all messages.
Definition Message.h:88
A client uses a MessageProducer object to send messages to a Destination.
Definition MessageProducer.h:60
This class implements an interface for browsing the messages in a Queue without removing them.
Definition QueueBrowser.h:53
An interface encapsulating a provider-specific queue name.
Definition Queue.h:37
A Session object is a single-threaded context for producing and consuming messages.
Definition Session.h:105
AcknowledgeMode
Definition Session.h:108
@ INDIVIDUAL_ACKNOWLEDGE
Message will be acknowledged individually.
Definition Session.h:146
@ AUTO_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition Session.h:117
@ DUPS_OK_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition Session.h:128
@ CLIENT_ACKNOWLEDGE
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition Session.h:134
Interface for a StreamMessage.
Definition StreamMessage.h:61
Defines a Temporary Queue based Destination.
Definition TemporaryQueue.h:39
Defines a Temporary Topic based Destination.
Definition TemporaryTopic.h:39
Interface for a text message.
Definition TextMessage.h:41
An interface encapsulating a provider-specific topic name.
Definition Topic.h:36
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
Definition ArrayList.h:39
Java-like properties class for mapping string names to string values.
Definition Properties.h:53
A boolean value that may be updated atomically.
Definition AtomicBoolean.h:34
Definition ActiveMQQueueBrowser.h:37
Definition ActiveMQTempDestination.h:29
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition CachedConsumer.h:24