activemq-cpp-3.9.5
ActiveMQSessionKernel.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
20
21#include <cms/Session.h>
23
25#include <activemq/util/Usage.h>
42
43#include <decaf/lang/Pointer.h>
48
49#include <string>
50#include <memory>
51
52namespace activemq {
53namespace core {
54
56 class ActiveMQConsumer;
57 class ActiveMQProducer;
59
60namespace kernels {
61
62 using decaf::lang::Pointer;
63 using decaf::util::concurrent::atomic::AtomicBoolean;
64
65 class SessionConfig;
66
67 class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
68 private:
69
71
72 protected:
73
74 SessionConfig* config;
75
80
85
90
96
100 std::auto_ptr<ActiveMQSessionExecutor> executor;
101
106
111
116
121
126
127 private:
128
129 ActiveMQSessionKernel(const ActiveMQSessionKernel&);
130 ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
131
132 public:
133
137 const decaf::util::Properties& properties);
138
140
145 virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
146
150 virtual void start();
151
155 virtual void stop();
156
161 bool isStarted() const;
162
163 virtual bool isAutoAcknowledge() const {
164 return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
165 }
166
167 virtual bool isDupsOkAcknowledge() const {
168 return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
169 }
170
171 virtual bool isClientAcknowledge() const {
172 return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
173 }
174
175 virtual bool isIndividualAcknowledge() const {
176 return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
177 }
178
183
184 public: // Methods from ActiveMQMessageDispatcher
185
190 virtual void dispatch(const Pointer<MessageDispatch>& message);
191
192 public: // Implements Methods
193
194 virtual void close();
195
196 virtual void commit();
197
198 virtual void rollback();
199
200 virtual void recover();
201
203
205 const std::string& selector);
206
208 const std::string& selector,
209 bool noLocal);
210
212 const std::string& name,
213 const std::string& selector,
214 bool noLocal = false);
215
217
219
220 virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
221
222 virtual cms::Queue* createQueue(const std::string& queueName);
223
224 virtual cms::Topic* createTopic(const std::string& topicName);
225
227
229
231
233
234 virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
235
237
239
240 virtual cms::TextMessage* createTextMessage( const std::string& text );
241
243
245
246 virtual bool isTransacted() const;
247
248 virtual void unsubscribe(const std::string& name);
249
250 public: // ActiveMQSessionKernel specific Methods
251
280 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
281 util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
282
292
301
308
315 this->checkClosed();
316 return *( this->sessionInfo );
317 }
318
325 this->checkClosed();
326 return *( this->sessionInfo->getSessionId() );
327 }
328
333 return this->connection;
334 }
335
340
346 long long getLastDeliveredSequenceId() const {
347 return this->lastDeliveredSequenceId;
348 }
349
356 void setLastDeliveredSequenceId(long long value) {
357 this->lastDeliveredSequenceId = value;
358 }
359
370
386
398
409
421
432
440 virtual void doStartTransaction();
441
448 return this->transaction;
449 }
450
456
462
469
473 void wakeup();
474
480
486
493 void doClose();
494
501 void dispose();
502
513
521
528
533
538
548
555
561 virtual int getHashCode() const;
562
573
580
586 void setSessionAsyncDispatch(bool sessionAsyncDispatch);
587
596
597 private:
598
603 long long getNextProducerSequenceId() {
604 return this->producerSequenceIds.getNextSequenceId();
605 }
606
607 // Checks for the closed state and throws if so.
608 void checkClosed() const;
609
610 // Send the Destination Creation Request to the Broker, alerting it
611 // that we've created a new Temporary Destination.
612 // @param tempDestination - The new Temporary Destination
613 void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
614
615 // Send the Destination Destruction Request to the Broker, alerting
616 // it that we've removed an existing Temporary Destination.
617 // @param tempDestination - The Temporary Destination to remove
618 void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
619
620 // Creates a new Temporary Destination name using the connection id
621 // and a rolling count.
622 // @return a unique Temporary Destination name
623 std::string createTemporaryDestinationName();
624
625 };
626
627}}}
628
629#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */
#define AMQCPP_API
Definition Config.h:30
Definition ActiveMQTempDestination.h:36
Definition SessionId.h:51
Definition SessionInfo.h:48
Concrete connection used for all connectors to the ActiveMQ broker.
Definition ActiveMQConnection.h:62
Definition ActiveMQConsumer.h:40
Definition ActiveMQProducer.h:36
Delegate dispatcher for a single session.
Definition ActiveMQSessionExecutor.h:44
Interface for an object responsible for dispatching messages to consumers.
Definition Dispatcher.h:32
Definition MessageDispatchChannel.h:34
Definition ActiveMQProducerKernel.h:44
SessionConfig * config
Definition ActiveMQSessionKernel.h:74
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition ActiveMQSessionKernel.h:120
void send(kernels::ActiveMQProducerKernel *producer, Pointer< commands::ActiveMQDestination > destination, cms::Message *message, int deliveryMode, int priority, long long timeToLive, util::MemoryUsage *producerWindow, long long sendTimeout, cms::AsyncCallback *onComplete)
Sends a message from the Producer specified using this session's connection the message will be sent ...
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this Session.
virtual void redispatch(MessageDispatchChannel &unconsumedMessages)
Redispatches the given set of unconsumed messages to the consumers.
virtual void doStartTransaction()
Starts if not already start a Transaction for this Session.
void oneway(Pointer< commands::Command > command)
Sends a Command to the broker without requesting any Response be returned.
virtual cms::TextMessage * createTextMessage()
Creates a new TextMessage.
virtual cms::BytesMessage * createBytesMessage(const unsigned char *bytes, int bytesSize)
Creates a BytesMessage and sets the payload to the passed value.
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:125
virtual bool isAutoAcknowledge() const
Definition ActiveMQSessionKernel.h:163
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:356
Pointer< commands::ConsumerId > getNextConsumerId()
Get the Next available Consumer Id.
void fire(const exceptions::ActiveMQException &ex)
Fires the given exception to the exception listener of the connection.
virtual void commit()
Commits all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination)
Creates a MessageConsumer for the specified destination.
virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const
Returns the acknowledgment mode of the session.
void acknowledge()
Request that the Session inform all its consumers to Acknowledge all Message's that have been receive...
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition ActiveMQSessionKernel.h:100
virtual cms::StreamMessage * createStreamMessage()
Creates a new StreamMessage.
virtual int getHashCode() const
Returns a Hash Code for this Session based on its SessionId.
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session, if the session is closed than this method throw...
Definition ActiveMQSessionKernel.h:314
void sendAck(decaf::lang::Pointer< commands::MessageAck > ack, bool async=false)
Sends the given MessageAck command to the Broker either via Synchronous call or an Asynchronous call ...
virtual void recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged ...
virtual void stop()
Starts asynchronous message delivery.
bool isStarted() const
Indicates whether or not the session is currently in the started state.
virtual bool isDupsOkAcknowledge() const
Definition ActiveMQSessionKernel.h:167
Pointer< threads::Scheduler > getScheduler() const
Gets a Pointer to this Session's Scheduler instance.
virtual cms::TemporaryQueue * createTemporaryQueue()
Creates a TemporaryQueue object.
virtual cms::Topic * createTopic(const std::string &topicName)
Creates a topic identity given a Queue name.
void checkMessageListener() const
Checks if any MessageConsumer owned by this Session has a set MessageListener and throws an exception...
virtual cms::MessageConsumer * createDurableConsumer(const cms::Topic *destination, const std::string &name, const std::string &selector, bool noLocal=false)
Creates a durable subscriber to the specified topic, using a Message selector.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue, const std::string &selector)
Creates a new QueueBrowser to peek at Messages on the given Queue.
void setSessionAsyncDispatch(bool sessionAsyncDispatch)
Configures asynchronous message dispatch to this session's consumers.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector, bool noLocal)
Creates a MessageConsumer for the specified destination, using a message selector.
void dispose()
Cleans up the Session object's resources without attempting to send the Remove command to the broker,...
virtual cms::TemporaryTopic * createTemporaryTopic()
Creates a TemporaryTopic object.
AtomicBoolean closed
Indicates that this connection has been closed, it is no longer usable after this becomes true.
Definition ActiveMQSessionKernel.h:95
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition ActiveMQSessionKernel.h:332
decaf::util::ArrayList< Pointer< ActiveMQConsumerKernel > > getConsumers() const
Returns an ArrayList containing a copy of all consumers currently in use on this Session.
Pointer< commands::ProducerId > getNextProducerId()
Get the Next available Producer Id.
void removeConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Dispose of a MessageConsumer from this session.
void addConsumer(Pointer< ActiveMQConsumerKernel > consumer)
Adds a MessageConsumerKernel to this session registering it with the Connection and store a reference...
Pointer< ActiveMQProducerKernel > lookupProducerKernel(Pointer< commands::ProducerId > id)
virtual bool isTransacted() const
Gets if the Sessions is a Transacted Session.
virtual void start()
Stops asynchronous message delivery.
virtual cms::QueueBrowser * createBrowser(const cms::Queue *queue)
Creates a new QueueBrowser to peek at Messages on the given Queue.
cms::ExceptionListener * getExceptionListener()
This method gets any registered exception listener of this sessions connection and returns it.
virtual bool isClientAcknowledge() const
Definition ActiveMQSessionKernel.h:171
virtual cms::MessageProducer * createProducer(const cms::Destination *destination)
Creates a MessageProducer to send messages to the specified destination.
virtual cms::Queue * createQueue(const std::string &queueName)
Creates a queue identity given a Queue name.
virtual void dispatch(const Pointer< MessageDispatch > &message)
Dispatches a message to a particular consumer.
void clearMessagesInProgress(decaf::lang::Pointer< decaf::util::concurrent::atomic::AtomicInteger > transportsInterrupted)
Request that this Session inform all of its consumers to clear all messages that are currently in pro...
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition ActiveMQSessionKernel.h:115
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Set an MessageTransformer instance that is passed on to all MessageProducer and MessageConsumer objec...
void close(Pointer< commands::ConsumerId > id)
Close the specified consumer if present in this Session.
virtual cms::Message * createMessage()
Creates a new Message.
virtual void rollback()
Rolls back all messages done in this transaction and releases any locks currently held.
virtual cms::MessageConsumer * createConsumer(const cms::Destination *destination, const std::string &selector)
Creates a MessageConsumer for the specified destination, using a message selector.
void addProducer(Pointer< ActiveMQProducerKernel > producer)
Adds a MessageProducer to this session registering it with the Connection and store a reference to it...
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition ActiveMQSessionKernel.h:110
virtual void unsubscribe(const std::string &name)
Unsubscribes a durable subscription that has been created by a client.
bool isInUse(Pointer< commands::ActiveMQDestination > destination)
Checks if the given destination is currently in use by any consumers in this Session.
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session, if the session is closed than this method throws an exce...
Definition ActiveMQSessionKernel.h:324
void doClose()
Performs the actual Session close operations.
virtual cms::MapMessage * createMapMessage()
Creates a new MapMessage.
ActiveMQConnection * connection
Connection.
Definition ActiveMQSessionKernel.h:89
Pointer< commands::Response > syncRequest(Pointer< commands::Command > command, unsigned int timeout=0)
Sends a synchronous request and returns the response from the broker.
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition ActiveMQSessionKernel.h:79
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition ActiveMQSessionKernel.h:84
void removeProducer(Pointer< ActiveMQProducerKernel > producer)
Dispose of a MessageProducer from this session.
virtual cms::TextMessage * createTextMessage(const std::string &text)
Creates a new TextMessage and set the text to the value given.
void deliverAcks()
Request that this Session inform all of its consumers to deliver their pending acks.
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition ActiveMQSessionKernel.h:447
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition ActiveMQSessionKernel.h:346
cms::Session::AcknowledgeMode ackMode
This Sessions Acknowledgment mode.
Definition ActiveMQSessionKernel.h:105
void wakeup()
Causes the Session to wakeup its executer and ensure all messages are dispatched.
Pointer< ActiveMQConsumerKernel > lookupConsumerKernel(Pointer< commands::ConsumerId > id)
virtual cms::BytesMessage * createBytesMessage()
Creates a BytesMessage.
ActiveMQSessionKernel(ActiveMQConnection *connection, const Pointer< commands::SessionId > &id, cms::Session::AcknowledgeMode ackMode, const decaf::util::Properties &properties)
void setPrefetchSize(Pointer< commands::ConsumerId > id, int prefetch)
Set the prefetch level for the given consumer if it exists in this Session to the value specified.
bool isSessionAsyncDispatch() const
Returns true if this session is dispatching messages to its consumers asynchronously.
bool iterateConsumers()
Gives each consumer a chance to dispatch messages that have been enqueued by calling each consumers i...
virtual void close()
Closes this session as well as any active child consumers or producers.
virtual bool isIndividualAcknowledge() const
Definition ActiveMQSessionKernel.h:175
Definition ActiveMQException.h:35
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition LongSequenceGenerator.h:32
Definition MemoryUsage.h:28
Asynchronous event interface for CMS asynchronous operations.
Definition AsyncCallback.h:37
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition BytesMessage.h:66
A Destination object encapsulates a provider-specific address.
Definition Destination.h:39
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition ExceptionListener.h:37
A MapMessage object is used to send a set of name-value pairs.
Definition MapMessage.h:71
A client uses a MessageConsumer to received messages from a destination.
Definition MessageConsumer.h:63
Root of all messages.
Definition Message.h:88
A client uses a MessageProducer object to send messages to a Destination.
Definition MessageProducer.h:60
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition MessageTransformer.h:37
This class implements in interface for browsing the messages in a Queue without removing them.
Definition QueueBrowser.h:53
An interface encapsulating a provider-specific queue name.
Definition Queue.h:37
A Session object is a single-threaded context for producing and consuming messages.
Definition Session.h:105
AcknowledgeMode
Definition Session.h:108
@ INDIVIDUAL_ACKNOWLEDGE
Message will be acknowledged individually.
Definition Session.h:146
@ AUTO_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition Session.h:117
@ DUPS_OK_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition Session.h:128
@ CLIENT_ACKNOWLEDGE
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition Session.h:134
Interface for a StreamMessage.
Definition StreamMessage.h:61
Defines a Temporary Queue based Destination.
Definition TemporaryQueue.h:39
Defines a Temporary Topic based Destination.
Definition TemporaryTopic.h:39
Interface for a text message.
Definition TextMessage.h:41
An interface encapsulating a provider-specific topic name.
Definition Topic.h:36
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
Definition ArrayList.h:39
Java-like properties class for mapping string names to string values.
Definition Properties.h:53
A boolean value that may be updated atomically.
Definition AtomicBoolean.h:34
Definition ActiveMQQueueBrowser.h:37
Definition ActiveMQTempDestination.h:29
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition CachedConsumer.h:24