activemq-cpp-3.9.5
ActiveMQConsumerKernel.h
Go to the documentation of this file.
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
18#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
19
20#include <cms/MessageConsumer.h>
21#include <cms/MessageListener.h>
23#include <cms/Message.h>
24#include <cms/CMSException.h>
25
34
36#include <decaf/lang/Pointer.h>
38
39namespace activemq {
40namespace core {
41namespace kernels {
42
43 using decaf::lang::Pointer;
44 using decaf::util::concurrent::atomic::AtomicBoolean;
45
47 class ActiveMQConsumerKernelConfig;
48
49 class AMQCPP_API ActiveMQConsumerKernel : public cms::MessageConsumer, public Dispatcher {
50 private:
51
55 ActiveMQConsumerKernelConfig* internal;
56
60 ActiveMQSessionKernel* session;
61
66
67 private:
68
69 ActiveMQConsumerKernel(const ActiveMQConsumerKernel&);
70 ActiveMQConsumerKernel& operator=(const ActiveMQConsumerKernel&);
71
72 public:
73
77 const std::string& name,
78 const std::string& selector,
79 int prefetch,
80 int maxPendingMessageCount,
81 bool noLocal,
82 bool browser,
83 bool dispatchAsync,
84 cms::MessageListener* listener);
85
87
88 public: // Interface Implementation
89
90 virtual void start();
91
92 virtual void stop();
93
94 virtual void close();
95
97
98 virtual cms::Message* receive(int millisecs);
99
101
103
105
107
109
110 virtual std::string getMessageSelector() const;
111
113
115
116 public: // Dispatcher Methods
117
118 virtual void dispatch( const Pointer<MessageDispatch>& message );
119
120 virtual int getHashCode() const;
121
122 public: // ActiveMQConsumerKernel Methods
123
130
137
144
150 void commit();
151
157 void rollback();
158
164 void doClose();
165
171 void dispose();
172
178
184
188 bool isClosed() const;
189
195
201
207 bool iterate();
208
215
220
226
232 long long getLastDeliveredSequenceId() const;
233
240
248
255
263 void setFailoverRedeliveryWaitPeriod(long long value);
264
271 void setLastDeliveredSequenceId(long long value);
272
277
288
296
304
312
317 void setPrefetchSize(int prefetchSize);
318
325
333
343
348
355 void setOptimizeAcknowledge(bool value);
356
361
369 void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
370
380
381 protected:
382
399
405
412
413 private:
414
416
417 void applyDestinationOptions(Pointer<commands::ConsumerInfo> info);
418
419 void sendPullRequest(long long timeout);
420
421 void checkClosed() const;
422
423 void checkMessageListener() const;
424
425 void ackLater(Pointer<commands::MessageDispatch> message, int ackType);
426
427 void immediateIndividualTransactedAck(Pointer<commands::MessageDispatch> dispatch);
428
429 Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
430
431 bool isAutoAcknowledgeEach() const;
432
433 bool isAutoAcknowledgeBatch() const;
434
435 void registerSync();
436
437 void clearDeliveredList();
438
439 };
440
441}}}
442
443#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_ */
#define AMQCPP_API
Definition Config.h:30
Interface for an object responsible for dispatching messages to consumers.
Definition Dispatcher.h:32
Interface for a RedeliveryPolicy object that controls how message Redelivery is handled in ActiveMQ-C...
Definition RedeliveryPolicy.h:34
bool isSynchronizationRegistered() const
Returns whether this Consumer's Transaction Synchronization has been added to the transaction.
decaf::lang::Exception * getFailureError() const
Gets the error that caused this Consumer to be in a Failed state, or NULL if there is no Error.
virtual cms::Message * receive(int millisecs)
Synchronously Receive a Message, time out after defined interval.
bool iterate()
Deliver any pending messages to the registered MessageListener if there is one, return true if not al...
bool isInUse(Pointer< commands::ActiveMQDestination > destination) const
Checks if the given destination is the Destination that this Consumer is subscribed to.
virtual std::string getMessageSelector() const
Gets this message consumer's message selector expression.
void dispose()
Cleans up this object's internal resources.
virtual void setMessageTransformer(cms::MessageTransformer *transformer)
Set a MessageTransformer instance that is applied to all cms::Message objects before they are dispat...
virtual cms::MessageListener * getMessageListener() const
Gets the MessageListener that this class will send new Message notification events to.
bool isTransactedIndividualAck() const
Returns whether Messages in a transaction will be acknowledged using the Individual Acknowledge mode.
long long setFailoverRedeliveryWaitPeriod() const
Returns the delay after a failover before Message redelivery starts.
void commit()
Called to Commit the current set of messages in this Transaction.
void setOptimizedAckScheduledAckInterval(long long value)
Sets the time in Milliseconds to schedule an automatic acknowledge of outstanding messages when optim...
virtual void setMessageAvailableListener(cms::MessageAvailableListener *listener)
Sets the MessageAvailableListener that this class will send events to if the consumer is in synchrono...
virtual void setMessageListener(cms::MessageListener *listener)
Sets the MessageListener that this class will send notifications to.
virtual cms::MessageTransformer * getMessageTransformer() const
Gets the currently configured MessageTransformer for this MessageConsumer.
void setFailoverRedeliveryWaitPeriod(long long value)
Sets the time in milliseconds to delay after failover before starting message redelivery.
void doClose()
Performs the actual close operation on this consumer.
virtual int getHashCode() const
HashCode method allowing Dispatcher instances to be used in HashMap etc.
const Pointer< commands::ConsumerInfo > & getConsumerInfo() const
Get the Consumer information for this consumer.
void afterMessageIsConsumed(Pointer< commands::MessageDispatch > dispatch, bool messageExpired)
Post-consume processing.
virtual cms::Message * receive()
Synchronously Receive a Message.
void setPrefetchSize(int prefetchSize)
Sets the current prefetch size for the consumer as indicated by a Broker ConsumerControl command.
void setOptimizeAcknowledge(bool value)
Enable or disable optimized acknowledge for this consumer.
void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled)
Configures whether this consumer will perform message expiration processing on all incoming messages.
virtual cms::Message * receiveNoWait()
Receive a Message, does not wait if there isn't a new message to read, returns NULL if nothing read.
bool isRedeliveryExpectedInCurrentTransaction(Pointer< commands::MessageDispatch > dispatch) const
Returns true if the given MessageDispatch is expected to be redelivered in the currently open transac...
void setRedeliveryPolicy(RedeliveryPolicy *policy)
Sets the RedeliveryPolicy this Consumer should use when a rollback is performed on a transacted Consu...
RedeliveryPolicy * getRedeliveryPolicy() const
Gets a pointer to this Consumer's Redelivery Policy object, the Consumer retains ownership of this po...
void acknowledge(Pointer< commands::MessageDispatch > dispatch)
Method called to acknowledge the Message contained in the given MessageDispatch.
void acknowledge()
Method called to acknowledge all messages that have been received so far.
void setFailureError(decaf::lang::Exception *error)
Sets the Exception that has caused this Consumer to be in a failed state.
void deliverAcks()
Forces this consumer to send all pending acks to the broker.
void acknowledge(Pointer< commands::MessageDispatch > dispatch, int ackType)
Method called to acknowledge the Message contained in the given MessageDispatch.
virtual cms::MessageAvailableListener * getMessageAvailableListener() const
Gets the MessageAvailableListener that this class will send new Message notification events to.
const Pointer< commands::ConsumerId > & getConsumerId() const
Get the Consumer Id for this consumer.
void clearMessagesInProgress()
Called on a Failover to clear any pending messages.
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
void setSynchronizationRegistered(bool value)
Sets the Synchronization Registered state of this consumer.
void rollback()
Called to Roll back the current set of messages in this Transaction.
virtual void start()
Starts the service.
long long getOptimizedAckScheduledAckInterval() const
Time in Milliseconds before an automatic acknowledge is done for any outstanding delivered Messages.
void setTransactedIndividualAck(bool value)
Sets whether Messages in a transaction will be acknowledged using the Individual Acknowledge mode.
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
virtual void close()
Closes this object and deallocates the appropriate resources.
ActiveMQConsumerKernel(ActiveMQSessionKernel *session, const Pointer< commands::ConsumerId > &id, const Pointer< commands::ActiveMQDestination > &destination, const std::string &name, const std::string &selector, int prefetch, int maxPendingMessageCount, bool noLocal, bool browser, bool dispatchAsync, cms::MessageListener *listener)
Pointer< MessageDispatch > dequeue(long long timeout)
Used by synchronous receive methods to wait for messages to come in.
virtual void stop()
Stops this service.
void inProgressClearRequired()
Signals that a Failure occurred and that anything in-progress in the consumer should be cleared.
void beforeMessageIsConsumed(Pointer< commands::MessageDispatch > dispatch)
Pre-consume processing.
virtual void dispatch(const Pointer< MessageDispatch > &message)
Dispatches a message to a particular consumer.
Definition ActiveMQSessionKernel.h:67
A listener interface similar to the MessageListener interface.
Definition MessageAvailableListener.h:33
A client uses a MessageConsumer to receive messages from a destination.
Definition MessageConsumer.h:63
Root of all messages.
Definition Message.h:88
A MessageListener object is used to receive asynchronously delivered messages.
Definition MessageListener.h:33
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition MessageTransformer.h:37
Definition Exception.h:38
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition Pointer.h:53
Definition ActiveMQQueueBrowser.h:37
Definition ActiveMQTempDestination.h:29
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
Definition CachedConsumer.h:24