Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32  from __future__ import absolute_import 
  33   
  34  from cproton import * 
  35  from .wrapper import Wrapper 
  36  from proton import _compat 
  37   
  38  import weakref, socket, sys, threading 
  39   
  40  try: 
  41    import uuid 
42 43 - def generate_uuid():
44 return uuid.uuid4()
45 46 except ImportError: 47 """ 48 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 49 """ 50 import struct
51 - class uuid:
52 - class UUID:
53 - def __init__(self, hex=None, bytes=None):
54 if [hex, bytes].count(None) != 1: 55 raise TypeError("need one of hex or bytes") 56 if bytes is not None: 57 self.bytes = bytes 58 elif hex is not None: 59 fields=hex.split("-") 60 fields[4:5] = [fields[4][:4], fields[4][4:]] 61 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
62
63 - def __cmp__(self, other):
64 if isinstance(other, uuid.UUID): 65 return cmp(self.bytes, other.bytes) 66 else: 67 return -1
68
69 - def __str__(self):
70 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
71
72 - def __repr__(self):
73 return "UUID(%r)" % str(self)
74
75 - def __hash__(self):
76 return self.bytes.__hash__()
77 78 import os, random, time 79 rand = random.Random() 80 rand.seed((os.getpid(), time.time(), socket.gethostname()))
81 - def random_uuid():
82 data = [rand.randint(0, 255) for i in xrange(16)] 83 84 # From RFC4122, the version bits are set to 0100 85 data[6] &= 0x0F 86 data[6] |= 0x40 87 88 # From RFC4122, the top two bits of byte 8 get set to 01 89 data[8] &= 0x3F 90 data[8] |= 0x80 91 return "".join(map(chr, data))
92
93 - def uuid4():
94 return uuid.UUID(bytes=random_uuid())
95
96 - def generate_uuid():
97 return uuid4()
98 99 # 100 # Hacks to provide Python2 <---> Python3 compatibility 101 # 102 try: 103 bytes() 104 except NameError: 105 bytes = str 106 try: 107 long() 108 except NameError: 109 long = int 110 try: 111 unicode() 112 except NameError: 113 unicode = str 114 115 116 VERSION_MAJOR = PN_VERSION_MAJOR 117 VERSION_MINOR = PN_VERSION_MINOR 118 VERSION_POINT = PN_VERSION_POINT 119 VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT) 120 API_LANGUAGE = "C" 121 IMPLEMENTATION_LANGUAGE = "C"
122 123 -class Constant(object):
124
125 - def __init__(self, name):
126 self.name = name
127
128 - def __repr__(self):
129 return self.name
130
131 -class ProtonException(Exception):
132 """ 133 The root of the proton exception hierarchy. All proton exception 134 classes derive from this exception. 135 """ 136 pass
137
138 -class Timeout(ProtonException):
139 """ 140 A timeout exception indicates that a blocking operation has timed 141 out. 142 """ 143 pass
144
145 -class Interrupt(ProtonException):
146 """ 147 An interrupt exception indicaes that a blocking operation was interrupted. 148 """ 149 pass
150
151 -class MessengerException(ProtonException):
152 """ 153 The root of the messenger exception hierarchy. All exceptions 154 generated by the messenger class derive from this exception. 155 """ 156 pass
157
158 -class MessageException(ProtonException):
159 """ 160 The MessageException class is the root of the message exception 161 hierarhcy. All exceptions generated by the Message class derive from 162 this exception. 163 """ 164 pass
165 166 EXCEPTIONS = { 167 PN_TIMEOUT: Timeout, 168 PN_INTR: Interrupt 169 } 170 171 PENDING = Constant("PENDING") 172 ACCEPTED = Constant("ACCEPTED") 173 REJECTED = Constant("REJECTED") 174 RELEASED = Constant("RELEASED") 175 MODIFIED = Constant("MODIFIED") 176 ABORTED = Constant("ABORTED") 177 SETTLED = Constant("SETTLED") 178 179 STATUSES = { 180 PN_STATUS_ABORTED: ABORTED, 181 PN_STATUS_ACCEPTED: ACCEPTED, 182 PN_STATUS_REJECTED: REJECTED, 183 PN_STATUS_RELEASED: RELEASED, 184 PN_STATUS_MODIFIED: MODIFIED, 185 PN_STATUS_PENDING: PENDING, 186 PN_STATUS_SETTLED: SETTLED, 187 PN_STATUS_UNKNOWN: None 188 } 189 190 AUTOMATIC = Constant("AUTOMATIC") 191 MANUAL = Constant("MANUAL")
192 193 -class Messenger(object):
194 """ 195 The L{Messenger} class defines a high level interface for sending 196 and receiving L{Messages<Message>}. Every L{Messenger} contains a 197 single logical queue of incoming messages and a single logical queue 198 of outgoing messages. These messages in these queues may be destined 199 for, or originate from, a variety of addresses. 200 201 The messenger interface is single-threaded. All methods 202 except one (L{interrupt}) are intended to be used from within 203 the messenger thread. 204 205 206 Address Syntax 207 ============== 208 209 An address has the following form:: 210 211 [ amqp[s]:// ] [user[:password]@] domain [/[name]] 212 213 Where domain can be one of:: 214 215 host | host:port | ip | ip:port | name 216 217 The following are valid examples of addresses: 218 219 - example.org 220 - example.org:1234 221 - amqp://example.org 222 - amqps://example.org 223 - example.org/incoming 224 - amqps://example.org/outgoing 225 - amqps://fred:trustno1@example.org 226 - 127.0.0.1:1234 227 - amqps://127.0.0.1:1234 228 229 Sending & Receiving Messages 230 ============================ 231 232 The L{Messenger} class works in conjuction with the L{Message} class. The 233 L{Message} class is a mutable holder of message content. 234 235 The L{put} method copies its L{Message} to the outgoing queue, and may 236 send queued messages if it can do so without blocking. The L{send} 237 method blocks until it has sent the requested number of messages, 238 or until a timeout interrupts the attempt. 239 240 241 >>> message = Message() 242 >>> for i in range(3): 243 ... message.address = "amqp://host/queue" 244 ... message.subject = "Hello World %i" % i 245 ... messenger.put(message) 246 >>> messenger.send() 247 248 Similarly, the L{recv} method receives messages into the incoming 249 queue, and may block as it attempts to receive the requested number 250 of messages, or until timeout is reached. It may receive fewer 251 than the requested number. The L{get} method pops the 252 eldest L{Message} off the incoming queue and copies it into the L{Message} 253 object that you supply. It will not block. 254 255 256 >>> message = Message() 257 >>> messenger.recv(10): 258 >>> while messenger.incoming > 0: 259 ... messenger.get(message) 260 ... print message.subject 261 Hello World 0 262 Hello World 1 263 Hello World 2 264 265 The blocking flag allows you to turn off blocking behavior entirely, 266 in which case L{send} and L{recv} will do whatever they can without 267 blocking, and then return. You can then look at the number 268 of incoming and outgoing messages to see how much outstanding work 269 still remains. 270 """ 271
272 - def __init__(self, name=None):
273 """ 274 Construct a new L{Messenger} with the given name. The name has 275 global scope. If a NULL name is supplied, a UUID based name will 276 be chosen. 277 278 @type name: string 279 @param name: the name of the messenger or None 280 281 """ 282 self._mng = pn_messenger(name) 283 self._selectables = {}
284
285 - def __del__(self):
286 """ 287 Destroy the L{Messenger}. This will close all connections that 288 are managed by the L{Messenger}. Call the L{stop} method before 289 destroying the L{Messenger}. 290 """ 291 if hasattr(self, "_mng"): 292 pn_messenger_free(self._mng) 293 del self._mng
294
295 - def _check(self, err):
296 if err < 0: 297 if (err == PN_INPROGRESS): 298 return 299 exc = EXCEPTIONS.get(err, MessengerException) 300 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 301 else: 302 return err
303 304 @property
305 - def name(self):
306 """ 307 The name of the L{Messenger}. 308 """ 309 return pn_messenger_name(self._mng)
310
311 - def _get_certificate(self):
312 return pn_messenger_get_certificate(self._mng)
313
314 - def _set_certificate(self, value):
315 self._check(pn_messenger_set_certificate(self._mng, value))
316 317 certificate = property(_get_certificate, _set_certificate, 318 doc=""" 319 Path to a certificate file for the L{Messenger}. This certificate is 320 used when the L{Messenger} accepts or establishes SSL/TLS connections. 321 This property must be specified for the L{Messenger} to accept 322 incoming SSL/TLS connections and to establish client authenticated 323 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS 324 connections do not require this property. 325 """) 326
327 - def _get_private_key(self):
328 return pn_messenger_get_private_key(self._mng)
329
330 - def _set_private_key(self, value):
331 self._check(pn_messenger_set_private_key(self._mng, value))
332 333 private_key = property(_get_private_key, _set_private_key, 334 doc=""" 335 Path to a private key file for the L{Messenger's<Messenger>} 336 certificate. This property must be specified for the L{Messenger} to 337 accept incoming SSL/TLS connections and to establish client 338 authenticated outgoing SSL/TLS connection. Non client authenticated 339 SSL/TLS connections do not require this property. 340 """) 341
342 - def _get_password(self):
343 return pn_messenger_get_password(self._mng)
344
345 - def _set_password(self, value):
346 self._check(pn_messenger_set_password(self._mng, value))
347 348 password = property(_get_password, _set_password, 349 doc=""" 350 This property contains the password for the L{Messenger.private_key} 351 file, or None if the file is not encrypted. 352 """) 353
354 - def _get_trusted_certificates(self):
355 return pn_messenger_get_trusted_certificates(self._mng)
356
357 - def _set_trusted_certificates(self, value):
358 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
359 360 trusted_certificates = property(_get_trusted_certificates, 361 _set_trusted_certificates, 362 doc=""" 363 A path to a database of trusted certificates for use in verifying the 364 peer on an SSL/TLS connection. If this property is None, then the peer 365 will not be verified. 366 """) 367
368 - def _get_timeout(self):
369 t = pn_messenger_get_timeout(self._mng) 370 if t == -1: 371 return None 372 else: 373 return millis2secs(t)
374
375 - def _set_timeout(self, value):
376 if value is None: 377 t = -1 378 else: 379 t = secs2millis(value) 380 self._check(pn_messenger_set_timeout(self._mng, t))
381 382 timeout = property(_get_timeout, _set_timeout, 383 doc=""" 384 The timeout property contains the default timeout for blocking 385 operations performed by the L{Messenger}. 386 """) 387
388 - def _is_blocking(self):
389 return pn_messenger_is_blocking(self._mng)
390
391 - def _set_blocking(self, b):
392 self._check(pn_messenger_set_blocking(self._mng, b))
393 394 blocking = property(_is_blocking, _set_blocking, 395 doc=""" 396 Enable or disable blocking behavior during L{Message} sending 397 and receiving. This affects every blocking call, with the 398 exception of L{work}. Currently, the affected calls are 399 L{send}, L{recv}, and L{stop}. 400 """) 401
402 - def _is_passive(self):
403 return pn_messenger_is_passive(self._mng)
404
405 - def _set_passive(self, b):
406 self._check(pn_messenger_set_passive(self._mng, b))
407 408 passive = property(_is_passive, _set_passive, 409 doc=""" 410 When passive is set to true, Messenger will not attempt to perform I/O 411 internally. In this mode it is necessary to use the selectables API to 412 drive any I/O needed to perform requested actions. In this mode 413 Messenger will never block. 414 """) 415
416 - def _get_incoming_window(self):
417 return pn_messenger_get_incoming_window(self._mng)
418
419 - def _set_incoming_window(self, window):
420 self._check(pn_messenger_set_incoming_window(self._mng, window))
421 422 incoming_window = property(_get_incoming_window, _set_incoming_window, 423 doc=""" 424 The incoming tracking window for the messenger. The messenger will 425 track the remote status of this many incoming deliveries after they 426 have been accepted or rejected. Defaults to zero. 427 428 L{Messages<Message>} enter this window only when you take them into your application 429 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} 430 without explicitly accepting or rejecting the oldest message, then the 431 message that passes beyond the edge of the incoming window will be assigned 432 the default disposition of its link. 433 """) 434
435 - def _get_outgoing_window(self):
436 return pn_messenger_get_outgoing_window(self._mng)
437
438 - def _set_outgoing_window(self, window):
439 self._check(pn_messenger_set_outgoing_window(self._mng, window))
440 441 outgoing_window = property(_get_outgoing_window, _set_outgoing_window, 442 doc=""" 443 The outgoing tracking window for the messenger. The messenger will 444 track the remote status of this many outgoing deliveries after calling 445 send. Defaults to zero. 446 447 A L{Message} enters this window when you call the put() method with the 448 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 449 times, status information will no longer be available for the 450 first message. 451 """) 452
453 - def start(self):
454 """ 455 Currently a no-op placeholder. 456 For future compatibility, do not L{send} or L{recv} messages 457 before starting the L{Messenger}. 458 """ 459 self._check(pn_messenger_start(self._mng))
460
461 - def stop(self):
462 """ 463 Transitions the L{Messenger} to an inactive state. An inactive 464 L{Messenger} will not send or receive messages from its internal 465 queues. A L{Messenger} should be stopped before being discarded to 466 ensure a clean shutdown handshake occurs on any internally managed 467 connections. 468 """ 469 self._check(pn_messenger_stop(self._mng))
470 471 @property
472 - def stopped(self):
473 """ 474 Returns true iff a L{Messenger} is in the stopped state. 475 This function does not block. 476 """ 477 return pn_messenger_stopped(self._mng)
478
479 - def subscribe(self, source):
480 """ 481 Subscribes the L{Messenger} to messages originating from the 482 specified source. The source is an address as specified in the 483 L{Messenger} introduction with the following addition. If the 484 domain portion of the address begins with the '~' character, the 485 L{Messenger} will interpret the domain as host/port, bind to it, 486 and listen for incoming messages. For example "~0.0.0.0", 487 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any 488 local interface and listen for incoming messages with the last 489 variant only permitting incoming SSL connections. 490 491 @type source: string 492 @param source: the source of messages to subscribe to 493 """ 494 sub_impl = pn_messenger_subscribe(self._mng, source) 495 if not sub_impl: 496 self._check(pn_error_code(pn_messenger_error(self._mng))) 497 raise MessengerException("Cannot subscribe to %s"%source) 498 return Subscription(sub_impl)
499
500 - def put(self, message):
501 """ 502 Places the content contained in the message onto the outgoing 503 queue of the L{Messenger}. This method will never block, however 504 it will send any unblocked L{Messages<Message>} in the outgoing 505 queue immediately and leave any blocked L{Messages<Message>} 506 remaining in the outgoing queue. The L{send} call may be used to 507 block until the outgoing queue is empty. The L{outgoing} property 508 may be used to check the depth of the outgoing queue. 509 510 When the content in a given L{Message} object is copied to the outgoing 511 message queue, you may then modify or discard the L{Message} object 512 without having any impact on the content in the outgoing queue. 513 514 This method returns an outgoing tracker for the L{Message}. The tracker 515 can be used to determine the delivery status of the L{Message}. 516 517 @type message: Message 518 @param message: the message to place in the outgoing queue 519 @return: a tracker 520 """ 521 message._pre_encode() 522 self._check(pn_messenger_put(self._mng, message._msg)) 523 return pn_messenger_outgoing_tracker(self._mng)
524
525 - def status(self, tracker):
526 """ 527 Gets the last known remote state of the delivery associated with 528 the given tracker. 529 530 @type tracker: tracker 531 @param tracker: the tracker whose status is to be retrieved 532 533 @return: one of None, PENDING, REJECTED, MODIFIED, or ACCEPTED 534 """ 535 disp = pn_messenger_status(self._mng, tracker); 536 return STATUSES.get(disp, disp)
537
538 - def buffered(self, tracker):
539 """ 540 Checks if the delivery associated with the given tracker is still 541 waiting to be sent. 542 543 @type tracker: tracker 544 @param tracker: the tracker whose status is to be retrieved 545 546 @return: true if delivery is still buffered 547 """ 548 return pn_messenger_buffered(self._mng, tracker);
549
550 - def settle(self, tracker=None):
551 """ 552 Frees a L{Messenger} from tracking the status associated with a given 553 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up 554 to the most recent will be settled. 555 """ 556 if tracker is None: 557 tracker = pn_messenger_outgoing_tracker(self._mng) 558 flags = PN_CUMULATIVE 559 else: 560 flags = 0 561 self._check(pn_messenger_settle(self._mng, tracker, flags))
562
563 - def send(self, n=-1):
564 """ 565 This call will block until the indicated number of L{messages<Message>} 566 have been sent, or until the operation times out. If n is -1 this call will 567 block until all outgoing L{messages<Message>} have been sent. If n is 0 then 568 this call will send whatever it can without blocking. 569 """ 570 self._check(pn_messenger_send(self._mng, n))
571
572 - def recv(self, n=None):
573 """ 574 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value 575 for I{n} is supplied, this call will receive as many L{messages<Message>} as it 576 can buffer internally. If the L{Messenger} is in blocking mode, this 577 call will block until at least one L{Message} is available in the 578 incoming queue. 579 """ 580 if n is None: 581 n = -1 582 self._check(pn_messenger_recv(self._mng, n))
583
584 - def work(self, timeout=None):
585 """ 586 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. 587 This will block for the indicated timeout. 588 This method may also do I/O work other than sending and receiving 589 L{messages<Message>}. For example, closing connections after messenger.L{stop}() 590 has been called. 591 """ 592 if timeout is None: 593 t = -1 594 else: 595 t = secs2millis(timeout) 596 err = pn_messenger_work(self._mng, t) 597 if (err == PN_TIMEOUT): 598 return False 599 else: 600 self._check(err) 601 return True
602 603 @property
604 - def receiving(self):
605 return pn_messenger_receiving(self._mng)
606
607 - def interrupt(self):
608 """ 609 The L{Messenger} interface is single-threaded. 610 This is the only L{Messenger} function intended to be called 611 from outside of the L{Messenger} thread. 612 Call this from a non-messenger thread to interrupt 613 a L{Messenger} that is blocking. 614 This will cause any in-progress blocking call to throw 615 the L{Interrupt} exception. If there is no currently blocking 616 call, then the next blocking call will be affected, even if it 617 is within the same thread that interrupt was called from. 618 """ 619 self._check(pn_messenger_interrupt(self._mng))
620
621 - def get(self, message=None):
622 """ 623 Moves the message from the head of the incoming message queue into 624 the supplied message object. Any content in the message will be 625 overwritten. 626 627 A tracker for the incoming L{Message} is returned. The tracker can 628 later be used to communicate your acceptance or rejection of the 629 L{Message}. 630 631 If None is passed in for the L{Message} object, the L{Message} 632 popped from the head of the queue is discarded. 633 634 @type message: Message 635 @param message: the destination message object 636 @return: a tracker 637 """ 638 if message is None: 639 impl = None 640 else: 641 impl = message._msg 642 self._check(pn_messenger_get(self._mng, impl)) 643 if message is not None: 644 message._post_decode() 645 return pn_messenger_incoming_tracker(self._mng)
646
647 - def accept(self, tracker=None):
648 """ 649 Signal the sender that you have acted on the L{Message} 650 pointed to by the tracker. If no tracker is supplied, 651 then all messages that have been returned by the L{get} 652 method are accepted, except those that have already been 653 auto-settled by passing beyond your incoming window size. 654 655 @type tracker: tracker 656 @param tracker: a tracker as returned by get 657 """ 658 if tracker is None: 659 tracker = pn_messenger_incoming_tracker(self._mng) 660 flags = PN_CUMULATIVE 661 else: 662 flags = 0 663 self._check(pn_messenger_accept(self._mng, tracker, flags))
664
665 - def reject(self, tracker=None):
666 """ 667 Rejects the L{Message} indicated by the tracker. If no tracker 668 is supplied, all messages that have been returned by the L{get} 669 method are rejected, except those that have already been auto-settled 670 by passing beyond your outgoing window size. 671 672 @type tracker: tracker 673 @param tracker: a tracker as returned by get 674 """ 675 if tracker is None: 676 tracker = pn_messenger_incoming_tracker(self._mng) 677 flags = PN_CUMULATIVE 678 else: 679 flags = 0 680 self._check(pn_messenger_reject(self._mng, tracker, flags))
681 682 @property
683 - def outgoing(self):
684 """ 685 The outgoing queue depth. 686 """ 687 return pn_messenger_outgoing(self._mng)
688 689 @property
690 - def incoming(self):
691 """ 692 The incoming queue depth. 693 """ 694 return pn_messenger_incoming(self._mng)
695
696 - def route(self, pattern, address):
697 """ 698 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. 699 700 The route procedure may be used to influence how a L{Messenger} will 701 internally treat a given address or class of addresses. Every call 702 to the route procedure will result in L{Messenger} appending a routing 703 rule to its internal routing table. 704 705 Whenever a L{Message} is presented to a L{Messenger} for delivery, it 706 will match the address of this message against the set of routing 707 rules in order. The first rule to match will be triggered, and 708 instead of routing based on the address presented in the message, 709 the L{Messenger} will route based on the address supplied in the rule. 710 711 The pattern matching syntax supports two types of matches, a '%' 712 will match any character except a '/', and a '*' will match any 713 character including a '/'. 714 715 A routing address is specified as a normal AMQP address, however it 716 may additionally use substitution variables from the pattern match 717 that triggered the rule. 718 719 Any message sent to "foo" will be routed to "amqp://foo.com": 720 721 >>> messenger.route("foo", "amqp://foo.com"); 722 723 Any message sent to "foobar" will be routed to 724 "amqp://foo.com/bar": 725 726 >>> messenger.route("foobar", "amqp://foo.com/bar"); 727 728 Any message sent to bar/<path> will be routed to the corresponding 729 path within the amqp://bar.com domain: 730 731 >>> messenger.route("bar/*", "amqp://bar.com/$1"); 732 733 Route all L{messages<Message>} over TLS: 734 735 >>> messenger.route("amqp:*", "amqps:$1") 736 737 Supply credentials for foo.com: 738 739 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 740 741 Supply credentials for all domains: 742 743 >>> messenger.route("amqp://*", "amqp://user:password@$1"); 744 745 Route all addresses through a single proxy while preserving the 746 original destination: 747 748 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 749 750 Route any address through a single broker: 751 752 >>> messenger.route("*", "amqp://user:password@broker/$1"); 753 """ 754 self._check(pn_messenger_route(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
755
756 - def rewrite(self, pattern, address):
757 """ 758 Similar to route(), except that the destination of 759 the L{Message} is determined before the message address is rewritten. 760 761 The outgoing address is only rewritten after routing has been 762 finalized. If a message has an outgoing address of 763 "amqp://0.0.0.0:5678", and a rewriting rule that changes its 764 outgoing address to "foo", it will still arrive at the peer that 765 is listening on "amqp://0.0.0.0:5678", but when it arrives there, 766 the receiver will see its outgoing address as "foo". 767 768 The default rewrite rule removes username and password from addresses 769 before they are transmitted. 770 """ 771 self._check(pn_messenger_rewrite(self._mng, unicode2utf8(pattern), unicode2utf8(address)))
772
773 - def selectable(self):
774 return Selectable.wrap(pn_messenger_selectable(self._mng))
775 776 @property
777 - def deadline(self):
778 tstamp = pn_messenger_deadline(self._mng) 779 if tstamp: 780 return millis2secs(tstamp) 781 else: 782 return None
783
784 -class Message(object):
785 """The L{Message} class is a mutable holder of message content. 786 787 @ivar instructions: delivery instructions for the message 788 @type instructions: dict 789 @ivar annotations: infrastructure defined message annotations 790 @type annotations: dict 791 @ivar properties: application defined message properties 792 @type properties: dict 793 @ivar body: message body 794 @type body: bytes | unicode | dict | list | int | long | float | UUID 795 """ 796 797 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 798
799 - def __init__(self, body=None, **kwargs):
800 """ 801 @param kwargs: Message property name/value pairs to initialise the Message 802 """ 803 self._msg = pn_message() 804 self._id = Data(pn_message_id(self._msg)) 805 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 806 self.instructions = None 807 self.annotations = None 808 self.properties = None 809 self.body = body 810 for k,v in _compat.iteritems(kwargs): 811 getattr(self, k) # Raise exception if it's not a valid attribute. 812 setattr(self, k, v)
813
814 - def __del__(self):
815 if hasattr(self, "_msg"): 816 pn_message_free(self._msg) 817 del self._msg
818
819 - def _check(self, err):
820 if err < 0: 821 exc = EXCEPTIONS.get(err, MessageException) 822 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 823 else: 824 return err
825
826 - def _pre_encode(self):
827 inst = Data(pn_message_instructions(self._msg)) 828 ann = Data(pn_message_annotations(self._msg)) 829 props = Data(pn_message_properties(self._msg)) 830 body = Data(pn_message_body(self._msg)) 831 832 inst.clear() 833 if self.instructions is not None: 834 inst.put_object(self.instructions) 835 ann.clear() 836 if self.annotations is not None: 837 ann.put_object(self.annotations) 838 props.clear() 839 if self.properties is not None: 840 props.put_object(self.properties) 841 body.clear() 842 if self.body is not None: 843 body.put_object(self.body)
844
845 - def _post_decode(self):
846 inst = Data(pn_message_instructions(self._msg)) 847 ann = Data(pn_message_annotations(self._msg)) 848 props = Data(pn_message_properties(self._msg)) 849 body = Data(pn_message_body(self._msg)) 850 851 if inst.next(): 852 self.instructions = inst.get_object() 853 else: 854 self.instructions = None 855 if ann.next(): 856 self.annotations = ann.get_object() 857 else: 858 self.annotations = None 859 if props.next(): 860 self.properties = props.get_object() 861 else: 862 self.properties = None 863 if body.next(): 864 self.body = body.get_object() 865 else: 866 self.body = None
867
868 - def clear(self):
869 """ 870 Clears the contents of the L{Message}. All fields will be reset to 871 their default values. 872 """ 873 pn_message_clear(self._msg) 874 self.instructions = None 875 self.annotations = None 876 self.properties = None 877 self.body = None
878
879 - def _is_inferred(self):
880 return pn_message_is_inferred(self._msg)
881
882 - def _set_inferred(self, value):
883 self._check(pn_message_set_inferred(self._msg, bool(value)))
884 885 inferred = property(_is_inferred, _set_inferred, doc=""" 886 The inferred flag for a message indicates how the message content 887 is encoded into AMQP sections. If inferred is true then binary and 888 list values in the body of the message will be encoded as AMQP DATA 889 and AMQP SEQUENCE sections, respectively. If inferred is false, 890 then all values in the body of the message will be encoded as AMQP 891 VALUE sections regardless of their type. 892 """) 893
894 - def _is_durable(self):
895 return pn_message_is_durable(self._msg)
896
897 - def _set_durable(self, value):
898 self._check(pn_message_set_durable(self._msg, bool(value)))
899 900 durable = property(_is_durable, _set_durable, 901 doc=""" 902 The durable property indicates that the message should be held durably 903 by any intermediaries taking responsibility for the message. 904 """) 905
906 - def _get_priority(self):
907 return pn_message_get_priority(self._msg)
908
909 - def _set_priority(self, value):
910 self._check(pn_message_set_priority(self._msg, value))
911 912 priority = property(_get_priority, _set_priority, 913 doc=""" 914 The priority of the message. 915 """) 916
917 - def _get_ttl(self):
918 return millis2secs(pn_message_get_ttl(self._msg))
919
920 - def _set_ttl(self, value):
921 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
922 923 ttl = property(_get_ttl, _set_ttl, 924 doc=""" 925 The time to live of the message measured in seconds. Expired messages 926 may be dropped. 927 """) 928
929 - def _is_first_acquirer(self):
930 return pn_message_is_first_acquirer(self._msg)
931
932 - def _set_first_acquirer(self, value):
933 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
934 935 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 936 doc=""" 937 True iff the recipient is the first to acquire the message. 938 """) 939
940 - def _get_delivery_count(self):
941 return pn_message_get_delivery_count(self._msg)
942
943 - def _set_delivery_count(self, value):
944 self._check(pn_message_set_delivery_count(self._msg, value))
945 946 delivery_count = property(_get_delivery_count, _set_delivery_count, 947 doc=""" 948 The number of delivery attempts made for this message. 949 """) 950 951
952 - def _get_id(self):
953 return self._id.get_object()
954 - def _set_id(self, value):
955 if type(value) in _compat.INT_TYPES: 956 value = ulong(value) 957 self._id.rewind() 958 self._id.put_object(value)
959 id = property(_get_id, _set_id, 960 doc=""" 961 The id of the message. 962 """) 963
964 - def _get_user_id(self):
965 return pn_message_get_user_id(self._msg)
966
967 - def _set_user_id(self, value):
968 self._check(pn_message_set_user_id(self._msg, value))
969 970 user_id = property(_get_user_id, _set_user_id, 971 doc=""" 972 The user id of the message creator. 973 """) 974
975 - def _get_address(self):
976 return utf82unicode(pn_message_get_address(self._msg))
977
978 - def _set_address(self, value):
979 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
980 981 address = property(_get_address, _set_address, 982 doc=""" 983 The address of the message. 984 """) 985
986 - def _get_subject(self):
987 return utf82unicode(pn_message_get_subject(self._msg))
988
989 - def _set_subject(self, value):
990 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
991 992 subject = property(_get_subject, _set_subject, 993 doc=""" 994 The subject of the message. 995 """) 996
997 - def _get_reply_to(self):
998 return utf82unicode(pn_message_get_reply_to(self._msg))
999
1000 - def _set_reply_to(self, value):
1001 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
1002 1003 reply_to = property(_get_reply_to, _set_reply_to, 1004 doc=""" 1005 The reply-to address for the message. 1006 """) 1007
1008 - def _get_correlation_id(self):
1009 return self._correlation_id.get_object()
1010 - def _set_correlation_id(self, value):
1011 if type(value) in _compat.INT_TYPES: 1012 value = ulong(value) 1013 self._correlation_id.rewind() 1014 self._correlation_id.put_object(value)
1015 1016 correlation_id = property(_get_correlation_id, _set_correlation_id, 1017 doc=""" 1018 The correlation-id for the message. 1019 """) 1020
1021 - def _get_content_type(self):
1022 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
1023
1024 - def _set_content_type(self, value):
1025 self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
1026 1027 content_type = property(_get_content_type, _set_content_type, 1028 doc=""" 1029 The content-type of the message. 1030 """) 1031
1032 - def _get_content_encoding(self):
1033 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
1034
1035 - def _set_content_encoding(self, value):
1036 self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
1037 1038 content_encoding = property(_get_content_encoding, _set_content_encoding, 1039 doc=""" 1040 The content-encoding of the message. 1041 """) 1042
1043 - def _get_expiry_time(self):
1044 return millis2secs(pn_message_get_expiry_time(self._msg))
1045
1046 - def _set_expiry_time(self, value):
1047 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
1048 1049 expiry_time = property(_get_expiry_time, _set_expiry_time, 1050 doc=""" 1051 The expiry time of the message. 1052 """) 1053
1054 - def _get_creation_time(self):
1055 return millis2secs(pn_message_get_creation_time(self._msg))
1056
1057 - def _set_creation_time(self, value):
1058 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
1059 1060 creation_time = property(_get_creation_time, _set_creation_time, 1061 doc=""" 1062 The creation time of the message. 1063 """) 1064
1065 - def _get_group_id(self):
1066 return utf82unicode(pn_message_get_group_id(self._msg))
1067
1068 - def _set_group_id(self, value):
1069 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
1070 1071 group_id = property(_get_group_id, _set_group_id, 1072 doc=""" 1073 The group id of the message. 1074 """) 1075
1076 - def _get_group_sequence(self):
1077 return pn_message_get_group_sequence(self._msg)
1078
1079 - def _set_group_sequence(self, value):
1080 self._check(pn_message_set_group_sequence(self._msg, value))
1081 1082 group_sequence = property(_get_group_sequence, _set_group_sequence, 1083 doc=""" 1084 The sequence of the message within its group. 1085 """) 1086
1087 - def _get_reply_to_group_id(self):
1088 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
1089
1090 - def _set_reply_to_group_id(self, value):
1091 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
1092 1093 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1094 doc=""" 1095 The group-id for any replies. 1096 """) 1097
1098 - def encode(self):
1099 self._pre_encode() 1100 sz = 16 1101 while True: 1102 err, data = pn_message_encode(self._msg, sz) 1103 if err == PN_OVERFLOW: 1104 sz *= 2 1105 continue 1106 else: 1107 self._check(err) 1108 return data
1109
1110 - def decode(self, data):
1111 self._check(pn_message_decode(self._msg, data)) 1112 self._post_decode()
1113
1114 - def send(self, sender, tag=None):
1115 dlv = sender.delivery(tag or sender.delivery_tag()) 1116 encoded = self.encode() 1117 sender.stream(encoded) 1118 sender.advance() 1119 if sender.snd_settle_mode == Link.SND_SETTLED: 1120 dlv.settle() 1121 return dlv
1122
1123 - def recv(self, link):
1124 """ 1125 Receives and decodes the message content for the current delivery 1126 from the link. Upon success it will return the current delivery 1127 for the link. If there is no current delivery, or if the current 1128 delivery is incomplete, or if the link is not a receiver, it will 1129 return None. 1130 1131 @type link: Link 1132 @param link: the link to receive a message from 1133 @return the delivery associated with the decoded message (or None) 1134 1135 """ 1136 if link.is_sender: return None 1137 dlv = link.current 1138 if not dlv or dlv.partial: return None 1139 dlv.encoded = link.recv(dlv.pending) 1140 link.advance() 1141 # the sender has already forgotten about the delivery, so we might 1142 # as well too 1143 if link.remote_snd_settle_mode == Link.SND_SETTLED: 1144 dlv.settle() 1145 self.decode(dlv.encoded) 1146 return dlv
1147
1148 - def __repr2__(self):
1149 props = [] 1150 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1151 "priority", "first_acquirer", "delivery_count", "id", 1152 "correlation_id", "user_id", "group_id", "group_sequence", 1153 "reply_to_group_id", "instructions", "annotations", 1154 "properties", "body"): 1155 value = getattr(self, attr) 1156 if value: props.append("%s=%r" % (attr, value)) 1157 return "Message(%s)" % ", ".join(props)
1158
1159 - def __repr__(self):
1160 tmp = pn_string(None) 1161 err = pn_inspect(self._msg, tmp) 1162 result = pn_string_get(tmp) 1163 pn_free(tmp) 1164 self._check(err) 1165 return result
1166
1167 -class Subscription(object):
1168
1169 - def __init__(self, impl):
1170 self._impl = impl
1171 1172 @property
1173 - def address(self):
1174 return pn_subscription_address(self._impl)
1175 1176 _DEFAULT = object()
1177 1178 -class Selectable(Wrapper):
1179 1180 @staticmethod
1181 - def wrap(impl):
1182 if impl is None: 1183 return None 1184 else: 1185 return Selectable(impl)
1186
1187 - def __init__(self, impl):
1188 Wrapper.__init__(self, impl, pn_selectable_attachments)
1189
1190 - def _init(self):
1191 pass
1192
1193 - def fileno(self, fd = _DEFAULT):
1194 if fd is _DEFAULT: 1195 return pn_selectable_get_fd(self._impl) 1196 elif fd is None: 1197 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) 1198 else: 1199 pn_selectable_set_fd(self._impl, fd)
1200
1201 - def _is_reading(self):
1202 return pn_selectable_is_reading(self._impl)
1203
1204 - def _set_reading(self, val):
1205 pn_selectable_set_reading(self._impl, bool(val))
1206 1207 reading = property(_is_reading, _set_reading) 1208
1209 - def _is_writing(self):
1210 return pn_selectable_is_writing(self._impl)
1211
1212 - def _set_writing(self, val):
1213 pn_selectable_set_writing(self._impl, bool(val))
1214 1215 writing = property(_is_writing, _set_writing) 1216
1217 - def _get_deadline(self):
1218 tstamp = pn_selectable_get_deadline(self._impl) 1219 if tstamp: 1220 return millis2secs(tstamp) 1221 else: 1222 return None
1223
1224 - def _set_deadline(self, deadline):
1225 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
1226 1227 deadline = property(_get_deadline, _set_deadline) 1228
1229 - def readable(self):
1230 pn_selectable_readable(self._impl)
1231
1232 - def writable(self):
1233 pn_selectable_writable(self._impl)
1234
1235 - def expired(self):
1236 pn_selectable_expired(self._impl)
1237
1238 - def _is_registered(self):
1239 return pn_selectable_is_registered(self._impl)
1240
1241 - def _set_registered(self, registered):
1242 pn_selectable_set_registered(self._impl, registered)
1243 1244 registered = property(_is_registered, _set_registered, 1245 doc=""" 1246 The registered property may be get/set by an I/O polling system to 1247 indicate whether the fd has been registered or not. 1248 """) 1249 1250 @property
1251 - def is_terminal(self):
1252 return pn_selectable_is_terminal(self._impl)
1253
1254 - def terminate(self):
1255 pn_selectable_terminate(self._impl)
1256
1257 - def release(self):
1258 pn_selectable_release(self._impl)
1259
1260 -class DataException(ProtonException):
1261 """ 1262 The DataException class is the root of the Data exception hierarchy. 1263 All exceptions raised by the Data class extend this exception. 1264 """ 1265 pass
1266
1267 -class UnmappedType:
1268
1269 - def __init__(self, msg):
1270 self.msg = msg
1271
1272 - def __repr__(self):
1273 return "UnmappedType(%s)" % self.msg
1274
1275 -class ulong(long):
1276
1277 - def __repr__(self):
1278 return "ulong(%s)" % long.__repr__(self)
1279
1280 -class timestamp(long):
1281
1282 - def __repr__(self):
1283 return "timestamp(%s)" % long.__repr__(self)
1284
1285 -class symbol(unicode):
1286
1287 - def __repr__(self):
1288 return "symbol(%s)" % unicode.__repr__(self)
1289
1290 -class char(unicode):
1291
1292 - def __repr__(self):
1293 return "char(%s)" % unicode.__repr__(self)
1294
1295 -class byte(int):
1296
1297 - def __repr__(self):
1298 return "byte(%s)" % int.__repr__(self)
1299
1300 -class short(int):
1301
1302 - def __repr__(self):
1303 return "short(%s)" % int.__repr__(self)
1304
1305 -class int32(int):
1306
1307 - def __repr__(self):
1308 return "int32(%s)" % int.__repr__(self)
1309
1310 -class ubyte(int):
1311
1312 - def __repr__(self):
1313 return "ubyte(%s)" % int.__repr__(self)
1314
1315 -class ushort(int):
1316
1317 - def __repr__(self):
1318 return "ushort(%s)" % int.__repr__(self)
1319
1320 -class uint(long):
1321
1322 - def __repr__(self):
1323 return "uint(%s)" % long.__repr__(self)
1324
1325 -class float32(float):
1326
1327 - def __repr__(self):
1328 return "float32(%s)" % float.__repr__(self)
1329
1330 -class decimal32(int):
1331
1332 - def __repr__(self):
1333 return "decimal32(%s)" % int.__repr__(self)
1334
1335 -class decimal64(long):
1336
1337 - def __repr__(self):
1338 return "decimal64(%s)" % long.__repr__(self)
1339
1340 -class decimal128(bytes):
1341
1342 - def __repr__(self):
1343 return "decimal128(%s)" % bytes.__repr__(self)
1344
1345 -class Described(object):
1346
1347 - def __init__(self, descriptor, value):
1348 self.descriptor = descriptor 1349 self.value = value
1350
1351 - def __repr__(self):
1352 return "Described(%r, %r)" % (self.descriptor, self.value)
1353
1354 - def __eq__(self, o):
1355 if isinstance(o, Described): 1356 return self.descriptor == o.descriptor and self.value == o.value 1357 else: 1358 return False
1359 1360 UNDESCRIBED = Constant("UNDESCRIBED")
1361 1362 -class Array(object):
1363
1364 - def __init__(self, descriptor, type, *elements):
1365 self.descriptor = descriptor 1366 self.type = type 1367 self.elements = elements
1368
1369 - def __iter__(self):
1370 return iter(self.elements)
1371
1372 - def __repr__(self):
1373 if self.elements: 1374 els = ", %s" % (", ".join(map(repr, self.elements))) 1375 else: 1376 els = "" 1377 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1378
1379 - def __eq__(self, o):
1380 if isinstance(o, Array): 1381 return self.descriptor == o.descriptor and \ 1382 self.type == o.type and self.elements == o.elements 1383 else: 1384 return False
1385
1386 -class Data:
1387 """ 1388 The L{Data} class provides an interface for decoding, extracting, 1389 creating, and encoding arbitrary AMQP data. A L{Data} object 1390 contains a tree of AMQP values. Leaf nodes in this tree correspond 1391 to scalars in the AMQP type system such as L{ints<INT>} or 1392 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1393 compound values in the AMQP type system such as L{lists<LIST>}, 1394 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1395 The root node of the tree is the L{Data} object itself and can have 1396 an arbitrary number of children. 1397 1398 A L{Data} object maintains the notion of the current sibling node 1399 and a current parent node. Siblings are ordered within their parent. 1400 Values are accessed and/or added by using the L{next}, L{prev}, 1401 L{enter}, and L{exit} methods to navigate to the desired location in 1402 the tree and using the supplied variety of put_*/get_* methods to 1403 access or add a value of the desired type. 1404 1405 The put_* methods will always add a value I{after} the current node 1406 in the tree. If the current node has a next sibling the put_* method 1407 will overwrite the value on this node. If there is no current node 1408 or the current node has no next sibling then one will be added. The 1409 put_* methods always set the added/modified node to the current 1410 node. The get_* methods read the value of the current node and do 1411 not change which node is current. 1412 1413 The following types of scalar values are supported: 1414 1415 - L{NULL} 1416 - L{BOOL} 1417 - L{UBYTE} 1418 - L{USHORT} 1419 - L{SHORT} 1420 - L{UINT} 1421 - L{INT} 1422 - L{ULONG} 1423 - L{LONG} 1424 - L{FLOAT} 1425 - L{DOUBLE} 1426 - L{BINARY} 1427 - L{STRING} 1428 - L{SYMBOL} 1429 1430 The following types of compound values are supported: 1431 1432 - L{DESCRIBED} 1433 - L{ARRAY} 1434 - L{LIST} 1435 - L{MAP} 1436 """ 1437 1438 NULL = PN_NULL; "A null value." 1439 BOOL = PN_BOOL; "A boolean value." 1440 UBYTE = PN_UBYTE; "An unsigned byte value." 1441 BYTE = PN_BYTE; "A signed byte value." 1442 USHORT = PN_USHORT; "An unsigned short value." 1443 SHORT = PN_SHORT; "A short value." 1444 UINT = PN_UINT; "An unsigned int value." 1445 INT = PN_INT; "A signed int value." 1446 CHAR = PN_CHAR; "A character value." 1447 ULONG = PN_ULONG; "An unsigned long value." 1448 LONG = PN_LONG; "A signed long value." 1449 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1450 FLOAT = PN_FLOAT; "A float value." 1451 DOUBLE = PN_DOUBLE; "A double value." 1452 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1453 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1454 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1455 UUID = PN_UUID; "A UUID value." 1456 BINARY = PN_BINARY; "A binary string." 1457 STRING = PN_STRING; "A unicode string." 1458 SYMBOL = PN_SYMBOL; "A symbolic string." 1459 DESCRIBED = PN_DESCRIBED; "A described value." 1460 ARRAY = PN_ARRAY; "An array value." 1461 LIST = PN_LIST; "A list value." 1462 MAP = PN_MAP; "A map value." 1463 1464 type_names = { 1465 NULL: "null", 1466 BOOL: "bool", 1467 BYTE: "byte", 1468 UBYTE: "ubyte", 1469 SHORT: "short", 1470 USHORT: "ushort", 1471 INT: "int", 1472 UINT: "uint", 1473 CHAR: "char", 1474 LONG: "long", 1475 ULONG: "ulong", 1476 TIMESTAMP: "timestamp", 1477 FLOAT: "float", 1478 DOUBLE: "double", 1479 DECIMAL32: "decimal32", 1480 DECIMAL64: "decimal64", 1481 DECIMAL128: "decimal128", 1482 UUID: "uuid", 1483 BINARY: "binary", 1484 STRING: "string", 1485 SYMBOL: "symbol", 1486 DESCRIBED: "described", 1487 ARRAY: "array", 1488 LIST: "list", 1489 MAP: "map" 1490 } 1491 1492 @classmethod
1493 - def type_name(type): return Data.type_names[type]
1494
1495 - def __init__(self, capacity=16):
1496 if type(capacity) in _compat.INT_TYPES: 1497 self._data = pn_data(capacity) 1498 self._free = True 1499 else: 1500 self._data = capacity 1501 self._free = False
1502
1503 - def __del__(self):
1504 if self._free and hasattr(self, "_data"): 1505 pn_data_free(self._data) 1506 del self._data
1507
1508 - def _check(self, err):
1509 if err < 0: 1510 exc = EXCEPTIONS.get(err, DataException) 1511 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 1512 else: 1513 return err
1514
1515 - def clear(self):
1516 """ 1517 Clears the data object. 1518 """ 1519 pn_data_clear(self._data)
1520
1521 - def rewind(self):
1522 """ 1523 Clears current node and sets the parent to the root node. Clearing the 1524 current node sets it _before_ the first node, calling next() will advance to 1525 the first node. 1526 """ 1527 assert self._data is not None 1528 pn_data_rewind(self._data)
1529
1530 - def next(self):
1531 """ 1532 Advances the current node to its next sibling and returns its 1533 type. If there is no next sibling the current node remains 1534 unchanged and None is returned. 1535 """ 1536 found = pn_data_next(self._data) 1537 if found: 1538 return self.type() 1539 else: 1540 return None
1541
1542 - def prev(self):
1543 """ 1544 Advances the current node to its previous sibling and returns its 1545 type. If there is no previous sibling the current node remains 1546 unchanged and None is returned. 1547 """ 1548 found = pn_data_prev(self._data) 1549 if found: 1550 return self.type() 1551 else: 1552 return None
1553
1554 - def enter(self):
1555 """ 1556 Sets the parent node to the current node and clears the current node. 1557 Clearing the current node sets it _before_ the first child, 1558 call next() advances to the first child. 1559 """ 1560 return pn_data_enter(self._data)
1561
1562 - def exit(self):
1563 """ 1564 Sets the current node to the parent node and the parent node to 1565 its own parent. 1566 """ 1567 return pn_data_exit(self._data)
1568
1569 - def lookup(self, name):
1570 return pn_data_lookup(self._data, name)
1571
1572 - def narrow(self):
1573 pn_data_narrow(self._data)
1574
1575 - def widen(self):
1576 pn_data_widen(self._data)
1577
1578 - def type(self):
1579 """ 1580 Returns the type of the current node. 1581 """ 1582 dtype = pn_data_type(self._data) 1583 if dtype == -1: 1584 return None 1585 else: 1586 return dtype
1587
1588 - def encoded_size(self):
1589 """ 1590 Returns the size in bytes needed to encode the data in AMQP format. 1591 """ 1592 return pn_data_encoded_size(self._data)
1593
1594 - def encode(self):
1595 """ 1596 Returns a representation of the data encoded in AMQP format. 1597 """ 1598 size = 1024 1599 while True: 1600 cd, enc = pn_data_encode(self._data, size) 1601 if cd == PN_OVERFLOW: 1602 size *= 2 1603 elif cd >= 0: 1604 return enc 1605 else: 1606 self._check(cd)
1607
1608 - def decode(self, encoded):
1609 """ 1610 Decodes the first value from supplied AMQP data and returns the 1611 number of bytes consumed. 1612 1613 @type encoded: binary 1614 @param encoded: AMQP encoded binary data 1615 """ 1616 return self._check(pn_data_decode(self._data, encoded))
1617
1618 - def put_list(self):
1619 """ 1620 Puts a list value. Elements may be filled by entering the list 1621 node and putting element values. 1622 1623 >>> data = Data() 1624 >>> data.put_list() 1625 >>> data.enter() 1626 >>> data.put_int(1) 1627 >>> data.put_int(2) 1628 >>> data.put_int(3) 1629 >>> data.exit() 1630 """ 1631 self._check(pn_data_put_list(self._data))
1632
1633 - def put_map(self):
1634 """ 1635 Puts a map value. Elements may be filled by entering the map node 1636 and putting alternating key value pairs. 1637 1638 >>> data = Data() 1639 >>> data.put_map() 1640 >>> data.enter() 1641 >>> data.put_string("key") 1642 >>> data.put_string("value") 1643 >>> data.exit() 1644 """ 1645 self._check(pn_data_put_map(self._data))
1646
1647 - def put_array(self, described, element_type):
1648 """ 1649 Puts an array value. Elements may be filled by entering the array 1650 node and putting the element values. The values must all be of the 1651 specified array element type. If an array is described then the 1652 first child value of the array is the descriptor and may be of any 1653 type. 1654 1655 >>> data = Data() 1656 >>> 1657 >>> data.put_array(False, Data.INT) 1658 >>> data.enter() 1659 >>> data.put_int(1) 1660 >>> data.put_int(2) 1661 >>> data.put_int(3) 1662 >>> data.exit() 1663 >>> 1664 >>> data.put_array(True, Data.DOUBLE) 1665 >>> data.enter() 1666 >>> data.put_symbol("array-descriptor") 1667 >>> data.put_double(1.1) 1668 >>> data.put_double(1.2) 1669 >>> data.put_double(1.3) 1670 >>> data.exit() 1671 1672 @type described: bool 1673 @param described: specifies whether the array is described 1674 @type element_type: int 1675 @param element_type: the type of the array elements 1676 """ 1677 self._check(pn_data_put_array(self._data, described, element_type))
1678
1679 - def put_described(self):
1680 """ 1681 Puts a described value. A described node has two children, the 1682 descriptor and the value. These are specified by entering the node 1683 and putting the desired values. 1684 1685 >>> data = Data() 1686 >>> data.put_described() 1687 >>> data.enter() 1688 >>> data.put_symbol("value-descriptor") 1689 >>> data.put_string("the value") 1690 >>> data.exit() 1691 """ 1692 self._check(pn_data_put_described(self._data))
1693
1694 - def put_null(self):
1695 """ 1696 Puts a null value. 1697 """ 1698 self._check(pn_data_put_null(self._data))
1699
1700 - def put_bool(self, b):
1701 """ 1702 Puts a boolean value. 1703 1704 @param b: a boolean value 1705 """ 1706 self._check(pn_data_put_bool(self._data, b))
1707
1708 - def put_ubyte(self, ub):
1709 """ 1710 Puts an unsigned byte value. 1711 1712 @param ub: an integral value 1713 """ 1714 self._check(pn_data_put_ubyte(self._data, ub))
1715
1716 - def put_byte(self, b):
1717 """ 1718 Puts a signed byte value. 1719 1720 @param b: an integral value 1721 """ 1722 self._check(pn_data_put_byte(self._data, b))
1723
1724 - def put_ushort(self, us):
1725 """ 1726 Puts an unsigned short value. 1727 1728 @param us: an integral value. 1729 """ 1730 self._check(pn_data_put_ushort(self._data, us))
1731
1732 - def put_short(self, s):
1733 """ 1734 Puts a signed short value. 1735 1736 @param s: an integral value 1737 """ 1738 self._check(pn_data_put_short(self._data, s))
1739
1740 - def put_uint(self, ui):
1741 """ 1742 Puts an unsigned int value. 1743 1744 @param ui: an integral value 1745 """ 1746 self._check(pn_data_put_uint(self._data, ui))
1747
1748 - def put_int(self, i):
1749 """ 1750 Puts a signed int value. 1751 1752 @param i: an integral value 1753 """ 1754 self._check(pn_data_put_int(self._data, i))
1755
1756 - def put_char(self, c):
1757 """ 1758 Puts a char value. 1759 1760 @param c: a single character 1761 """ 1762 self._check(pn_data_put_char(self._data, ord(c)))
1763
1764 - def put_ulong(self, ul):
1765 """ 1766 Puts an unsigned long value. 1767 1768 @param ul: an integral value 1769 """ 1770 self._check(pn_data_put_ulong(self._data, ul))
1771
1772 - def put_long(self, l):
1773 """ 1774 Puts a signed long value. 1775 1776 @param l: an integral value 1777 """ 1778 self._check(pn_data_put_long(self._data, l))
1779
1780 - def put_timestamp(self, t):
1781 """ 1782 Puts a timestamp value. 1783 1784 @param t: an integral value 1785 """ 1786 self._check(pn_data_put_timestamp(self._data, t))
1787
1788 - def put_float(self, f):
1789 """ 1790 Puts a float value. 1791 1792 @param f: a floating point value 1793 """ 1794 self._check(pn_data_put_float(self._data, f))
1795
1796 - def put_double(self, d):
1797 """ 1798 Puts a double value. 1799 1800 @param d: a floating point value. 1801 """ 1802 self._check(pn_data_put_double(self._data, d))
1803
1804 - def put_decimal32(self, d):
1805 """ 1806 Puts a decimal32 value. 1807 1808 @param d: a decimal32 value 1809 """ 1810 self._check(pn_data_put_decimal32(self._data, d))
1811
1812 - def put_decimal64(self, d):
1813 """ 1814 Puts a decimal64 value. 1815 1816 @param d: a decimal64 value 1817 """ 1818 self._check(pn_data_put_decimal64(self._data, d))
1819
1820 - def put_decimal128(self, d):
1821 """ 1822 Puts a decimal128 value. 1823 1824 @param d: a decimal128 value 1825 """ 1826 self._check(pn_data_put_decimal128(self._data, d))
1827
1828 - def put_uuid(self, u):
1829 """ 1830 Puts a UUID value. 1831 1832 @param u: a uuid value 1833 """ 1834 self._check(pn_data_put_uuid(self._data, u.bytes))
1835
1836 - def put_binary(self, b):
1837 """ 1838 Puts a binary value. 1839 1840 @type b: binary 1841 @param b: a binary value 1842 """ 1843 self._check(pn_data_put_binary(self._data, b))
1844
1845 - def put_memoryview(self, mv):
1846 """Put a python memoryview object as an AMQP binary value""" 1847 self.put_binary(mv.tobytes())
1848
1849 - def put_buffer(self, buff):
1850 """Put a python buffer object as an AMQP binary value""" 1851 self.put_binary(bytes(buff))
1852
1853 - def put_string(self, s):
1854 """ 1855 Puts a unicode value. 1856 1857 @type s: unicode 1858 @param s: a unicode value 1859 """ 1860 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1861
1862 - def put_symbol(self, s):
1863 """ 1864 Puts a symbolic value. 1865 1866 @type s: string 1867 @param s: the symbol name 1868 """ 1869 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1870
1871 - def get_list(self):
1872 """ 1873 If the current node is a list, return the number of elements, 1874 otherwise return zero. List elements can be accessed by entering 1875 the list. 1876 1877 >>> count = data.get_list() 1878 >>> data.enter() 1879 >>> for i in range(count): 1880 ... type = data.next() 1881 ... if type == Data.STRING: 1882 ... print data.get_string() 1883 ... elif type == ...: 1884 ... ... 1885 >>> data.exit() 1886 """ 1887 return pn_data_get_list(self._data)
1888
1889 - def get_map(self):
1890 """ 1891 If the current node is a map, return the number of child elements, 1892 otherwise return zero. Key value pairs can be accessed by entering 1893 the map. 1894 1895 >>> count = data.get_map() 1896 >>> data.enter() 1897 >>> for i in range(count/2): 1898 ... type = data.next() 1899 ... if type == Data.STRING: 1900 ... print data.get_string() 1901 ... elif type == ...: 1902 ... ... 1903 >>> data.exit() 1904 """ 1905 return pn_data_get_map(self._data)
1906
1907 - def get_array(self):
1908 """ 1909 If the current node is an array, return a tuple of the element 1910 count, a boolean indicating whether the array is described, and 1911 the type of each element, otherwise return (0, False, None). Array 1912 data can be accessed by entering the array. 1913 1914 >>> # read an array of strings with a symbolic descriptor 1915 >>> count, described, type = data.get_array() 1916 >>> data.enter() 1917 >>> data.next() 1918 >>> print "Descriptor:", data.get_symbol() 1919 >>> for i in range(count): 1920 ... data.next() 1921 ... print "Element:", data.get_string() 1922 >>> data.exit() 1923 """ 1924 count = pn_data_get_array(self._data) 1925 described = pn_data_is_array_described(self._data) 1926 type = pn_data_get_array_type(self._data) 1927 if type == -1: 1928 type = None 1929 return count, described, type
1930
1931 - def is_described(self):
1932 """ 1933 Checks if the current node is a described value. The descriptor 1934 and value may be accessed by entering the described value. 1935 1936 >>> # read a symbolically described string 1937 >>> assert data.is_described() # will error if the current node is not described 1938 >>> data.enter() 1939 >>> data.next() 1940 >>> print data.get_symbol() 1941 >>> data.next() 1942 >>> print data.get_string() 1943 >>> data.exit() 1944 """ 1945 return pn_data_is_described(self._data)
1946
1947 - def is_null(self):
1948 """ 1949 Checks if the current node is a null. 1950 """ 1951 return pn_data_is_null(self._data)
1952
1953 - def get_bool(self):
1954 """ 1955 If the current node is a boolean, returns its value, returns False 1956 otherwise. 1957 """ 1958 return pn_data_get_bool(self._data)
1959
1960 - def get_ubyte(self):
1961 """ 1962 If the current node is an unsigned byte, returns its value, 1963 returns 0 otherwise. 1964 """ 1965 return ubyte(pn_data_get_ubyte(self._data))
1966
1967 - def get_byte(self):
1968 """ 1969 If the current node is a signed byte, returns its value, returns 0 1970 otherwise. 1971 """ 1972 return byte(pn_data_get_byte(self._data))
1973
1974 - def get_ushort(self):
1975 """ 1976 If the current node is an unsigned short, returns its value, 1977 returns 0 otherwise. 1978 """ 1979 return ushort(pn_data_get_ushort(self._data))
1980
1981 - def get_short(self):
1982 """ 1983 If the current node is a signed short, returns its value, returns 1984 0 otherwise. 1985 """ 1986 return short(pn_data_get_short(self._data))
1987
1988 - def get_uint(self):
1989 """ 1990 If the current node is an unsigned int, returns its value, returns 1991 0 otherwise. 1992 """ 1993 return uint(pn_data_get_uint(self._data))
1994
1995 - def get_int(self):
1996 """ 1997 If the current node is a signed int, returns its value, returns 0 1998 otherwise. 1999 """ 2000 return int32(pn_data_get_int(self._data))
2001
2002 - def get_char(self):
2003 """ 2004 If the current node is a char, returns its value, returns 0 2005 otherwise. 2006 """ 2007 return char(_compat.unichar(pn_data_get_char(self._data)))
2008
2009 - def get_ulong(self):
2010 """ 2011 If the current node is an unsigned long, returns its value, 2012 returns 0 otherwise. 2013 """ 2014 return ulong(pn_data_get_ulong(self._data))
2015
2016 - def get_long(self):
2017 """ 2018 If the current node is an signed long, returns its value, returns 2019 0 otherwise. 2020 """ 2021 return long(pn_data_get_long(self._data))
2022
2023 - def get_timestamp(self):
2024 """ 2025 If the current node is a timestamp, returns its value, returns 0 2026 otherwise. 2027 """ 2028 return timestamp(pn_data_get_timestamp(self._data))
2029
2030 - def get_float(self):
2031 """ 2032 If the current node is a float, returns its value, raises 0 2033 otherwise. 2034 """ 2035 return float32(pn_data_get_float(self._data))
2036
2037 - def get_double(self):
2038 """ 2039 If the current node is a double, returns its value, returns 0 2040 otherwise. 2041 """ 2042 return pn_data_get_double(self._data)
2043 2044 # XXX: need to convert
2045 - def get_decimal32(self):
2046 """ 2047 If the current node is a decimal32, returns its value, returns 0 2048 otherwise. 2049 """ 2050 return decimal32(pn_data_get_decimal32(self._data))
2051 2052 # XXX: need to convert
2053 - def get_decimal64(self):
2054 """ 2055 If the current node is a decimal64, returns its value, returns 0 2056 otherwise. 2057 """ 2058 return decimal64(pn_data_get_decimal64(self._data))
2059 2060 # XXX: need to convert
2061 - def get_decimal128(self):
2062 """ 2063 If the current node is a decimal128, returns its value, returns 0 2064 otherwise. 2065 """ 2066 return decimal128(pn_data_get_decimal128(self._data))
2067
2068 - def get_uuid(self):
2069 """ 2070 If the current node is a UUID, returns its value, returns None 2071 otherwise. 2072 """ 2073 if pn_data_type(self._data) == Data.UUID: 2074 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 2075 else: 2076 return None
2077
2078 - def get_binary(self):
2079 """ 2080 If the current node is binary, returns its value, returns "" 2081 otherwise. 2082 """ 2083 return pn_data_get_binary(self._data)
2084
2085 - def get_string(self):
2086 """ 2087 If the current node is a string, returns its value, returns "" 2088 otherwise. 2089 """ 2090 return pn_data_get_string(self._data).decode("utf8")
2091
2092 - def get_symbol(self):
2093 """ 2094 If the current node is a symbol, returns its value, returns "" 2095 otherwise. 2096 """ 2097 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
2098
2099 - def copy(self, src):
2100 self._check(pn_data_copy(self._data, src._data))
2101
2102 - def format(self):
2103 sz = 16 2104 while True: 2105 err, result = pn_data_format(self._data, sz) 2106 if err == PN_OVERFLOW: 2107 sz *= 2 2108 continue 2109 else: 2110 self._check(err) 2111 return result
2112
2113 - def dump(self):
2114 pn_data_dump(self._data)
2115
2116 - def put_dict(self, d):
2117 self.put_map() 2118 self.enter() 2119 try: 2120 for k, v in d.items(): 2121 self.put_object(k) 2122 self.put_object(v) 2123 finally: 2124 self.exit()
2125
2126 - def get_dict(self):
2127 if self.enter(): 2128 try: 2129 result = {} 2130 while self.next(): 2131 k = self.get_object() 2132 if self.next(): 2133 v = self.get_object() 2134 else: 2135 v = None 2136 result[k] = v 2137 finally: 2138 self.exit() 2139 return result
2140
2141 - def put_sequence(self, s):
2142 self.put_list() 2143 self.enter() 2144 try: 2145 for o in s: 2146 self.put_object(o) 2147 finally: 2148 self.exit()
2149
2150 - def get_sequence(self):
2151 if self.enter(): 2152 try: 2153 result = [] 2154 while self.next(): 2155 result.append(self.get_object()) 2156 finally: 2157 self.exit() 2158 return result
2159
2160 - def get_py_described(self):
2161 if self.enter(): 2162 try: 2163 self.next() 2164 descriptor = self.get_object() 2165 self.next() 2166 value = self.get_object() 2167 finally: 2168 self.exit() 2169 return Described(descriptor, value)
2170
2171 - def put_py_described(self, d):
2172 self.put_described() 2173 self.enter() 2174 try: 2175 self.put_object(d.descriptor) 2176 self.put_object(d.value) 2177 finally: 2178 self.exit()
2179
2180 - def get_py_array(self):
2181 """ 2182 If the current node is an array, return an Array object 2183 representing the array and its contents. Otherwise return None. 2184 This is a convenience wrapper around get_array, enter, etc. 2185 """ 2186 2187 count, described, type = self.get_array() 2188 if type is None: return None 2189 if self.enter(): 2190 try: 2191 if described: 2192 self.next() 2193 descriptor = self.get_object() 2194 else: 2195 descriptor = UNDESCRIBED 2196 elements = [] 2197 while self.next(): 2198 elements.append(self.get_object()) 2199 finally: 2200 self.exit() 2201 return Array(descriptor, type, *elements)
2202
2203 - def put_py_array(self, a):
2204 described = a.descriptor != UNDESCRIBED 2205 self.put_array(described, a.type) 2206 self.enter() 2207 try: 2208 if described: 2209 self.put_object(a.descriptor) 2210 for e in a.elements: 2211 self.put_object(e) 2212 finally: 2213 self.exit()
2214 2215 put_mappings = { 2216 None.__class__: lambda s, _: s.put_null(), 2217 bool: put_bool, 2218 ubyte: put_ubyte, 2219 ushort: put_ushort, 2220 uint: put_uint, 2221 ulong: put_ulong, 2222 byte: put_byte, 2223 short: put_short, 2224 int32: put_int, 2225 long: put_long, 2226 float32: put_float, 2227 float: put_double, 2228 decimal32: put_decimal32, 2229 decimal64: put_decimal64, 2230 decimal128: put_decimal128, 2231 char: put_char, 2232 timestamp: put_timestamp, 2233 uuid.UUID: put_uuid, 2234 bytes: put_binary, 2235 unicode: put_string, 2236 symbol: put_symbol, 2237 list: put_sequence, 2238 tuple: put_sequence, 2239 dict: put_dict, 2240 Described: put_py_described, 2241 Array: put_py_array 2242 } 2243 # for python 3.x, long is merely an alias for int, but for python 2.x 2244 # we need to add an explicit int since it is a different type 2245 if int not in put_mappings: 2246 put_mappings[int] = put_int 2247 # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both. 2248 try: put_mappings[memoryview] = put_memoryview 2249 except NameError: pass 2250 try: put_mappings[buffer] = put_buffer 2251 except NameError: pass 2252 get_mappings = { 2253 NULL: lambda s: None, 2254 BOOL: get_bool, 2255 BYTE: get_byte, 2256 UBYTE: get_ubyte, 2257 SHORT: get_short, 2258 USHORT: get_ushort, 2259 INT: get_int, 2260 UINT: get_uint, 2261 CHAR: get_char, 2262 LONG: get_long, 2263 ULONG: get_ulong, 2264 TIMESTAMP: get_timestamp, 2265 FLOAT: get_float, 2266 DOUBLE: get_double, 2267 DECIMAL32: get_decimal32, 2268 DECIMAL64: get_decimal64, 2269 DECIMAL128: get_decimal128, 2270 UUID: get_uuid, 2271 BINARY: get_binary, 2272 STRING: get_string, 2273 SYMBOL: get_symbol, 2274 DESCRIBED: get_py_described, 2275 ARRAY: get_py_array, 2276 LIST: get_sequence, 2277 MAP: get_dict 2278 } 2279 2280
2281 - def put_object(self, obj):
2282 putter = self.put_mappings[obj.__class__] 2283 putter(self, obj)
2284
2285 - def get_object(self):
2286 type = self.type() 2287 if type is None: return None 2288 getter = self.get_mappings.get(type) 2289 if getter: 2290 return getter(self) 2291 else: 2292 return UnmappedType(str(type))
2293
2294 -class ConnectionException(ProtonException):
2295 pass
2296
2297 -class Endpoint(object):
2298 2299 LOCAL_UNINIT = PN_LOCAL_UNINIT 2300 REMOTE_UNINIT = PN_REMOTE_UNINIT 2301 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 2302 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 2303 LOCAL_CLOSED = PN_LOCAL_CLOSED 2304 REMOTE_CLOSED = PN_REMOTE_CLOSED 2305
2306 - def _init(self):
2307 self.condition = None
2308
2309 - def _update_cond(self):
2310 obj2cond(self.condition, self._get_cond_impl())
2311 2312 @property
2313 - def remote_condition(self):
2314 return cond2obj(self._get_remote_cond_impl())
2315 2316 # the following must be provided by subclasses
2317 - def _get_cond_impl(self):
2318 assert False, "Subclass must override this!"
2319
2320 - def _get_remote_cond_impl(self):
2321 assert False, "Subclass must override this!"
2322
2323 - def _get_handler(self):
2324 from . import reactor 2325 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2326 if ractor: 2327 on_error = ractor.on_error 2328 else: 2329 on_error = None 2330 record = self._get_attachments() 2331 return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
2332
2333 - def _set_handler(self, handler):
2334 from . import reactor 2335 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 2336 if ractor: 2337 on_error = ractor.on_error 2338 else: 2339 on_error = None 2340 impl = _chandler(handler, on_error) 2341 record = self._get_attachments() 2342 pn_record_set_handler(record, impl) 2343 pn_decref(impl)
2344 2345 handler = property(_get_handler, _set_handler) 2346 2347 @property
2348 - def transport(self):
2349 return self.connection.transport
2350
2351 -class Condition:
2352
2353 - def __init__(self, name, description=None, info=None):
2354 self.name = name 2355 self.description = description 2356 self.info = info
2357
2358 - def __repr__(self):
2359 return "Condition(%s)" % ", ".join([repr(x) for x in 2360 (self.name, self.description, self.info) 2361 if x])
2362
2363 - def __eq__(self, o):
2364 if not isinstance(o, Condition): return False 2365 return self.name == o.name and \ 2366 self.description == o.description and \ 2367 self.info == o.info
2368
2369 -def obj2cond(obj, cond):
2370 pn_condition_clear(cond) 2371 if obj: 2372 pn_condition_set_name(cond, str(obj.name)) 2373 pn_condition_set_description(cond, obj.description) 2374 info = Data(pn_condition_info(cond)) 2375 if obj.info: 2376 info.put_object(obj.info)
2377
2378 -def cond2obj(cond):
2379 if pn_condition_is_set(cond): 2380 return Condition(pn_condition_get_name(cond), 2381 pn_condition_get_description(cond), 2382 dat2obj(pn_condition_info(cond))) 2383 else: 2384 return None
2385
2386 -def dat2obj(dimpl):
2387 if dimpl: 2388 d = Data(dimpl) 2389 d.rewind() 2390 d.next() 2391 obj = d.get_object() 2392 d.rewind() 2393 return obj
2394
2395 -def obj2dat(obj, dimpl):
2396 if obj is not None: 2397 d = Data(dimpl) 2398 d.put_object(obj)
2399
2400 -def secs2millis(secs):
2401 return long(secs*1000)
2402
2403 -def millis2secs(millis):
2404 return float(millis)/1000.0
2405
2406 -def timeout2millis(secs):
2407 if secs is None: return PN_MILLIS_MAX 2408 return secs2millis(secs)
2409
2410 -def millis2timeout(millis):
2411 if millis == PN_MILLIS_MAX: return None 2412 return millis2secs(millis)
2413
2414 -def unicode2utf8(string):
2415 """Some Proton APIs expect a null terminated string. Convert python text 2416 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings. 2417 This method will throw if the string cannot be converted. 2418 """ 2419 if string is None: 2420 return None 2421 if _compat.IS_PY2: 2422 if isinstance(string, unicode): 2423 return string.encode('utf-8') 2424 elif isinstance(string, str): 2425 return string 2426 else: 2427 # decoding a string results in bytes 2428 if isinstance(string, str): 2429 string = string.encode('utf-8') 2430 # fall through 2431 if isinstance(string, bytes): 2432 return string.decode('utf-8') 2433 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
2434
2435 -def utf82unicode(string):
2436 """Covert C strings returned from proton-c into python unicode""" 2437 if string is None: 2438 return None 2439 if isinstance(string, _compat.TEXT_TYPES): 2440 # already unicode 2441 return string 2442 elif isinstance(string, _compat.BINARY_TYPES): 2443 return string.decode('utf8') 2444 else: 2445 raise TypeError("Unrecognized string type")
2446
2447 -class Connection(Wrapper, Endpoint):
2448 """ 2449 A representation of an AMQP connection 2450 """ 2451 2452 @staticmethod
2453 - def wrap(impl):
2454 if impl is None: 2455 return None 2456 else: 2457 return Connection(impl)
2458
2459 - def __init__(self, impl = pn_connection):
2460 Wrapper.__init__(self, impl, pn_connection_attachments)
2461
2462 - def _init(self):
2463 Endpoint._init(self) 2464 self.offered_capabilities = None 2465 self.desired_capabilities = None 2466 self.properties = None
2467
2468 - def _get_attachments(self):
2469 return pn_connection_attachments(self._impl)
2470 2471 @property
2472 - def connection(self):
2473 return self
2474 2475 @property
2476 - def transport(self):
2477 return Transport.wrap(pn_connection_transport(self._impl))
2478
2479 - def _check(self, err):
2480 if err < 0: 2481 exc = EXCEPTIONS.get(err, ConnectionException) 2482 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 2483 else: 2484 return err
2485
2486 - def _get_cond_impl(self):
2487 return pn_connection_condition(self._impl)
2488
2489 - def _get_remote_cond_impl(self):
2490 return pn_connection_remote_condition(self._impl)
2491
2492 - def collect(self, collector):
2493 if collector is None: 2494 pn_connection_collect(self._impl, None) 2495 else: 2496 pn_connection_collect(self._impl, collector._impl) 2497 self._collector = weakref.ref(collector)
2498
2499 - def _get_container(self):
2500 return utf82unicode(pn_connection_get_container(self._impl))
2501 - def _set_container(self, name):
2502 return pn_connection_set_container(self._impl, unicode2utf8(name))
2503 2504 container = property(_get_container, _set_container) 2505
2506 - def _get_hostname(self):
2507 return utf82unicode(pn_connection_get_hostname(self._impl))
2508 - def _set_hostname(self, name):
2509 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
2510 2511 hostname = property(_get_hostname, _set_hostname, 2512 doc=""" 2513 Set the name of the host (either fully qualified or relative) to which this 2514 connection is connecting to. This information may be used by the remote 2515 peer to determine the correct back-end service to connect the client to. 2516 This value will be sent in the Open performative, and will be used by SSL 2517 and SASL layers to identify the peer. 2518 """) 2519
2520 - def _get_user(self):
2521 return utf82unicode(pn_connection_get_user(self._impl))
2522 - def _set_user(self, name):
2523 return pn_connection_set_user(self._impl, unicode2utf8(name))
2524 2525 user = property(_get_user, _set_user) 2526
2527 - def _get_password(self):
2528 return None
2529 - def _set_password(self, name):
2530 return pn_connection_set_password(self._impl, unicode2utf8(name))
2531 2532 password = property(_get_password, _set_password) 2533 2534 @property
2535 - def remote_container(self):
2536 """The container identifier specified by the remote peer for this connection.""" 2537 return pn_connection_remote_container(self._impl)
2538 2539 @property
2540 - def remote_hostname(self):
2541 """The hostname specified by the remote peer for this connection.""" 2542 return pn_connection_remote_hostname(self._impl)
2543 2544 @property
2546 """The capabilities offered by the remote peer for this connection.""" 2547 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
2548 2549 @property
2551 """The capabilities desired by the remote peer for this connection.""" 2552 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
2553 2554 @property
2555 - def remote_properties(self):
2556 """The properties specified by the remote peer for this connection.""" 2557 return dat2obj(pn_connection_remote_properties(self._impl))
2558
2559 - def open(self):
2560 """ 2561 Opens the connection. 2562 2563 In more detail, this moves the local state of the connection to 2564 the ACTIVE state and triggers an open frame to be sent to the 2565 peer. A connection is fully active once both peers have opened it. 2566 """ 2567 obj2dat(self.offered_capabilities, 2568 pn_connection_offered_capabilities(self._impl)) 2569 obj2dat(self.desired_capabilities, 2570 pn_connection_desired_capabilities(self._impl)) 2571 obj2dat(self.properties, pn_connection_properties(self._impl)) 2572 pn_connection_open(self._impl)
2573
2574 - def close(self):
2575 """ 2576 Closes the connection. 2577 2578 In more detail, this moves the local state of the connection to 2579 the CLOSED state and triggers a close frame to be sent to the 2580 peer. A connection is fully closed once both peers have closed it. 2581 """ 2582 self._update_cond() 2583 pn_connection_close(self._impl) 2584 if hasattr(self, '_session_policy'): 2585 # break circular ref 2586 del self._session_policy
2587 2588 @property
2589 - def state(self):
2590 """ 2591 The state of the connection as a bit field. The state has a local 2592 and a remote component. Each of these can be in one of three 2593 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking 2594 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, 2595 REMOTE_ACTIVE and REMOTE_CLOSED. 2596 """ 2597 return pn_connection_state(self._impl)
2598
2599 - def session(self):
2600 """ 2601 Returns a new session on this connection. 2602 """ 2603 ssn = pn_session(self._impl) 2604 if ssn is None: 2605 raise(SessionException("Session allocation failed.")) 2606 else: 2607 return Session(ssn)
2608
2609 - def session_head(self, mask):
2610 return Session.wrap(pn_session_head(self._impl, mask))
2611 2614 2615 @property
2616 - def work_head(self):
2617 return Delivery.wrap(pn_work_head(self._impl))
2618 2619 @property
2620 - def error(self):
2621 return pn_error_code(pn_connection_error(self._impl))
2622
2623 - def free(self):
2624 pn_connection_release(self._impl)
2625
2626 -class SessionException(ProtonException):
2627 pass
2628
2629 -class Session(Wrapper, Endpoint):
2630 2631 @staticmethod
2632 - def wrap(impl):
2633 if impl is None: 2634 return None 2635 else: 2636 return Session(impl)
2637
2638 - def __init__(self, impl):
2639 Wrapper.__init__(self, impl, pn_session_attachments)
2640
2641 - def _get_attachments(self):
2642 return pn_session_attachments(self._impl)
2643
2644 - def _get_cond_impl(self):
2645 return pn_session_condition(self._impl)
2646
2647 - def _get_remote_cond_impl(self):
2648 return pn_session_remote_condition(self._impl)
2649
2650 - def _get_incoming_capacity(self):
2651 return pn_session_get_incoming_capacity(self._impl)
2652
2653 - def _set_incoming_capacity(self, capacity):
2654 pn_session_set_incoming_capacity(self._impl, capacity)
2655 2656 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2657
2658 - def _get_outgoing_window(self):
2659 return pn_session_get_outgoing_window(self._impl)
2660
2661 - def _set_outgoing_window(self, window):
2662 pn_session_set_outgoing_window(self._impl, window)
2663 2664 outgoing_window = property(_get_outgoing_window, _set_outgoing_window) 2665 2666 @property
2667 - def outgoing_bytes(self):
2668 return pn_session_outgoing_bytes(self._impl)
2669 2670 @property
2671 - def incoming_bytes(self):
2672 return pn_session_incoming_bytes(self._impl)
2673
2674 - def open(self):
2675 pn_session_open(self._impl)
2676
2677 - def close(self):
2678 self._update_cond() 2679 pn_session_close(self._impl)
2680
2681 - def next(self, mask):
2682 return Session.wrap(pn_session_next(self._impl, mask))
2683 2684 @property
2685 - def state(self):
2686 return pn_session_state(self._impl)
2687 2688 @property
2689 - def connection(self):
2690 return Connection.wrap(pn_session_connection(self._impl))
2691
2692 - def sender(self, name):
2693 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2694
2695 - def receiver(self, name):
2696 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2697
2698 - def free(self):
2699 pn_session_free(self._impl)
2700
2701 -class LinkException(ProtonException):
2702 pass
2703 2896
2897 -class Terminus(object):
2898 2899 UNSPECIFIED = PN_UNSPECIFIED 2900 SOURCE = PN_SOURCE 2901 TARGET = PN_TARGET 2902 COORDINATOR = PN_COORDINATOR 2903 2904 NONDURABLE = PN_NONDURABLE 2905 CONFIGURATION = PN_CONFIGURATION 2906 DELIVERIES = PN_DELIVERIES 2907 2908 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2909 DIST_MODE_COPY = PN_DIST_MODE_COPY 2910 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2911 2912 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK 2913 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION 2914 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION 2915 EXPIRE_NEVER = PN_EXPIRE_NEVER 2916
2917 - def __init__(self, impl):
2918 self._impl = impl
2919
2920 - def _check(self, err):
2921 if err < 0: 2922 exc = EXCEPTIONS.get(err, LinkException) 2923 raise exc("[%s]" % err) 2924 else: 2925 return err
2926
2927 - def _get_type(self):
2928 return pn_terminus_get_type(self._impl)
2929 - def _set_type(self, type):
2930 self._check(pn_terminus_set_type(self._impl, type))
2931 type = property(_get_type, _set_type) 2932
2933 - def _get_address(self):
2934 """The address that identifies the source or target node""" 2935 return utf82unicode(pn_terminus_get_address(self._impl))
2936 - def _set_address(self, address):
2937 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2938 address = property(_get_address, _set_address) 2939
2940 - def _get_durability(self):
2941 return pn_terminus_get_durability(self._impl)
2942 - def _set_durability(self, seconds):
2943 self._check(pn_terminus_set_durability(self._impl, seconds))
2944 durability = property(_get_durability, _set_durability) 2945
2946 - def _get_expiry_policy(self):
2947 return pn_terminus_get_expiry_policy(self._impl)
2948 - def _set_expiry_policy(self, seconds):
2949 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2950 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2951
2952 - def _get_timeout(self):
2953 return pn_terminus_get_timeout(self._impl)
2954 - def _set_timeout(self, seconds):
2955 self._check(pn_terminus_set_timeout(self._impl, seconds))
2956 timeout = property(_get_timeout, _set_timeout) 2957
2958 - def _is_dynamic(self):
2959 """Indicates whether the source or target node was dynamically 2960 created""" 2961 return pn_terminus_is_dynamic(self._impl)
2962 - def _set_dynamic(self, dynamic):
2963 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2964 dynamic = property(_is_dynamic, _set_dynamic) 2965
2966 - def _get_distribution_mode(self):
2967 return pn_terminus_get_distribution_mode(self._impl)
2968 - def _set_distribution_mode(self, mode):
2969 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2970 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2971 2972 @property
2973 - def properties(self):
2974 """Properties of a dynamic source or target.""" 2975 return Data(pn_terminus_properties(self._impl))
2976 2977 @property
2978 - def capabilities(self):
2979 """Capabilities of the source or target.""" 2980 return Data(pn_terminus_capabilities(self._impl))
2981 2982 @property
2983 - def outcomes(self):
2984 return Data(pn_terminus_outcomes(self._impl))
2985 2986 @property
2987 - def filter(self):
2988 """A filter on a source allows the set of messages transfered over 2989 the link to be restricted""" 2990 return Data(pn_terminus_filter(self._impl))
2991
2992 - def copy(self, src):
2993 self._check(pn_terminus_copy(self._impl, src._impl))
2994
2995 -class Sender(Link):
2996 """ 2997 A link over which messages are sent. 2998 """ 2999
3000 - def offered(self, n):
3001 pn_link_offered(self._impl, n)
3002
3003 - def stream(self, data):
3004 """ 3005 Send specified data as part of the current delivery 3006 3007 @type data: binary 3008 @param data: data to send 3009 """ 3010 return self._check(pn_link_send(self._impl, data))
3011
3012 - def send(self, obj, tag=None):
3013 """ 3014 Send specified object over this sender; the object is expected to 3015 have a send() method on it that takes the sender and an optional 3016 tag as arguments. 3017 3018 Where the object is a Message, this will send the message over 3019 this link, creating a new delivery for the purpose. 3020 """ 3021 if hasattr(obj, 'send'): 3022 return obj.send(self, tag=tag) 3023 else: 3024 # treat object as bytes 3025 return self.stream(obj)
3026
3027 - def delivery_tag(self):
3028 if not hasattr(self, 'tag_generator'): 3029 def simple_tags(): 3030 count = 1 3031 while True: 3032 yield str(count) 3033 count += 1
3034 self.tag_generator = simple_tags() 3035 return next(self.tag_generator)
3036
3037 -class Receiver(Link):
3038 """ 3039 A link over which messages are received. 3040 """ 3041
3042 - def flow(self, n):
3043 """Increases the credit issued to the remote sender by the specified number of messages.""" 3044 pn_link_flow(self._impl, n)
3045
3046 - def recv(self, limit):
3047 n, binary = pn_link_recv(self._impl, limit) 3048 if n == PN_EOS: 3049 return None 3050 else: 3051 self._check(n) 3052 return binary
3053
3054 - def drain(self, n):
3055 pn_link_drain(self._impl, n)
3056
3057 - def draining(self):
3058 return pn_link_draining(self._impl)
3059
3060 -class NamedInt(int):
3061 3062 values = {} 3063
3064 - def __new__(cls, i, name):
3065 ni = super(NamedInt, cls).__new__(cls, i) 3066 cls.values[i] = ni 3067 return ni
3068
3069 - def __init__(self, i, name):
3070 self.name = name
3071
3072 - def __repr__(self):
3073 return self.name
3074
3075 - def __str__(self):
3076 return self.name
3077 3078 @classmethod
3079 - def get(cls, i):
3080 return cls.values.get(i, i)
3081
3082 -class DispositionType(NamedInt):
3083 values = {}
3084
3085 -class Disposition(object):
3086 3087 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") 3088 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") 3089 REJECTED = DispositionType(PN_REJECTED, "REJECTED") 3090 RELEASED = DispositionType(PN_RELEASED, "RELEASED") 3091 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") 3092
3093 - def __init__(self, impl, local):
3094 self._impl = impl 3095 self.local = local 3096 self._data = None 3097 self._condition = None 3098 self._annotations = None
3099 3100 @property
3101 - def type(self):
3102 return DispositionType.get(pn_disposition_type(self._impl))
3103
3104 - def _get_section_number(self):
3105 return pn_disposition_get_section_number(self._impl)
3106 - def _set_section_number(self, n):
3107 pn_disposition_set_section_number(self._impl, n)
3108 section_number = property(_get_section_number, _set_section_number) 3109
3110 - def _get_section_offset(self):
3111 return pn_disposition_get_section_offset(self._impl)
3112 - def _set_section_offset(self, n):
3113 pn_disposition_set_section_offset(self._impl, n)
3114 section_offset = property(_get_section_offset, _set_section_offset) 3115
3116 - def _get_failed(self):
3117 return pn_disposition_is_failed(self._impl)
3118 - def _set_failed(self, b):
3119 pn_disposition_set_failed(self._impl, b)
3120 failed = property(_get_failed, _set_failed) 3121
3122 - def _get_undeliverable(self):
3123 return pn_disposition_is_undeliverable(self._impl)
3124 - def _set_undeliverable(self, b):
3125 pn_disposition_set_undeliverable(self._impl, b)
3126 undeliverable = property(_get_undeliverable, _set_undeliverable) 3127
3128 - def _get_data(self):
3129 if self.local: 3130 return self._data 3131 else: 3132 return dat2obj(pn_disposition_data(self._impl))
3133 - def _set_data(self, obj):
3134 if self.local: 3135 self._data = obj 3136 else: 3137 raise AttributeError("data attribute is read-only")
3138 data = property(_get_data, _set_data) 3139
3140 - def _get_annotations(self):
3141 if self.local: 3142 return self._annotations 3143 else: 3144 return dat2obj(pn_disposition_annotations(self._impl))
3145 - def _set_annotations(self, obj):
3146 if self.local: 3147 self._annotations = obj 3148 else: 3149 raise AttributeError("annotations attribute is read-only")
3150 annotations = property(_get_annotations, _set_annotations) 3151
3152 - def _get_condition(self):
3153 if self.local: 3154 return self._condition 3155 else: 3156 return cond2obj(pn_disposition_condition(self._impl))
3157 - def _set_condition(self, obj):
3158 if self.local: 3159 self._condition = obj 3160 else: 3161 raise AttributeError("condition attribute is read-only")
3162 condition = property(_get_condition, _set_condition)
3163
3164 -class Delivery(Wrapper):
3165 """ 3166 Tracks and/or records the delivery of a message over a link. 3167 """ 3168 3169 RECEIVED = Disposition.RECEIVED 3170 ACCEPTED = Disposition.ACCEPTED 3171 REJECTED = Disposition.REJECTED 3172 RELEASED = Disposition.RELEASED 3173 MODIFIED = Disposition.MODIFIED 3174 3175 @staticmethod
3176 - def wrap(impl):
3177 if impl is None: 3178 return None 3179 else: 3180 return Delivery(impl)
3181
3182 - def __init__(self, impl):
3183 Wrapper.__init__(self, impl, pn_delivery_attachments)
3184
3185 - def _init(self):
3186 self.local = Disposition(pn_delivery_local(self._impl), True) 3187 self.remote = Disposition(pn_delivery_remote(self._impl), False)
3188 3189 @property
3190 - def tag(self):
3191 """The identifier for the delivery.""" 3192 return pn_delivery_tag(self._impl)
3193 3194 @property
3195 - def writable(self):
3196 """Returns true for an outgoing delivery to which data can now be written.""" 3197 return pn_delivery_writable(self._impl)
3198 3199 @property
3200 - def readable(self):
3201 """Returns true for an incoming delivery that has data to read.""" 3202 return pn_delivery_readable(self._impl)
3203 3204 @property
3205 - def updated(self):
3206 """Returns true if the state of the delivery has been updated 3207 (e.g. it has been settled and/or accepted, rejected etc).""" 3208 return pn_delivery_updated(self._impl)
3209
3210 - def update(self, state):
3211 """ 3212 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. 3213 """ 3214 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 3215 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 3216 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 3217 pn_delivery_update(self._impl, state)
3218 3219 @property
3220 - def pending(self):
3221 return pn_delivery_pending(self._impl)
3222 3223 @property
3224 - def partial(self):
3225 """ 3226 Returns true for an incoming delivery if not all the data is 3227 yet available. 3228 """ 3229 return pn_delivery_partial(self._impl)
3230 3231 @property
3232 - def local_state(self):
3233 """Returns the local state of the delivery.""" 3234 return DispositionType.get(pn_delivery_local_state(self._impl))
3235 3236 @property
3237 - def remote_state(self):
3238 """ 3239 Returns the state of the delivery as indicated by the remote 3240 peer. 3241 """ 3242 return DispositionType.get(pn_delivery_remote_state(self._impl))
3243 3244 @property
3245 - def settled(self):
3246 """ 3247 Returns true if the delivery has been settled by the remote peer. 3248 """ 3249 return pn_delivery_settled(self._impl)
3250
3251 - def settle(self):
3252 """ 3253 Settles the delivery locally. This indicates the aplication 3254 considers the delivery complete and does not wish to receive any 3255 further events about it. Every delivery should be settled locally. 3256 """ 3257 pn_delivery_settle(self._impl)
3258 3259 @property
3260 - def work_next(self):
3261 return Delivery.wrap(pn_work_next(self._impl))
3262 3263 @property 3269 3270 @property
3271 - def session(self):
3272 """ 3273 Returns the session over which the delivery was sent or received. 3274 """ 3275 return self.link.session
3276 3277 @property
3278 - def connection(self):
3279 """ 3280 Returns the connection over which the delivery was sent or received. 3281 """ 3282 return self.session.connection
3283 3284 @property
3285 - def transport(self):
3286 return self.connection.transport
3287
3288 -class TransportException(ProtonException):
3289 pass
3290
3291 -class TraceAdapter:
3292
3293 - def __init__(self, tracer):
3294 self.tracer = tracer
3295
3296 - def __call__(self, trans_impl, message):
3297 self.tracer(Transport.wrap(trans_impl), message)
3298
3299 -class Transport(Wrapper):
3300 3301 TRACE_OFF = PN_TRACE_OFF 3302 TRACE_DRV = PN_TRACE_DRV 3303 TRACE_FRM = PN_TRACE_FRM 3304 TRACE_RAW = PN_TRACE_RAW 3305 3306 CLIENT = 1 3307 SERVER = 2 3308 3309 @staticmethod
3310 - def wrap(impl):
3311 if impl is None: 3312 return None 3313 else: 3314 return Transport(_impl=impl)
3315
3316 - def __init__(self, mode=None, _impl = pn_transport):
3317 Wrapper.__init__(self, _impl, pn_transport_attachments) 3318 if mode == Transport.SERVER: 3319 pn_transport_set_server(self._impl) 3320 elif mode is None or mode==Transport.CLIENT: 3321 pass 3322 else: 3323 raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
3324
3325 - def _init(self):
3326 self._sasl = None 3327 self._ssl = None
3328
3329 - def _check(self, err):
3330 if err < 0: 3331 exc = EXCEPTIONS.get(err, TransportException) 3332 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) 3333 else: 3334 return err
3335
3336 - def _set_tracer(self, tracer):
3337 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
3338
3339 - def _get_tracer(self):
3340 adapter = pn_transport_get_pytracer(self._impl) 3341 if adapter: 3342 return adapter.tracer 3343 else: 3344 return None
3345 3346 tracer = property(_get_tracer, _set_tracer, 3347 doc=""" 3348 A callback for trace logging. The callback is passed the transport and log message. 3349 """) 3350
3351 - def log(self, message):
3352 pn_transport_log(self._impl, message)
3353
3354 - def require_auth(self, bool):
3355 pn_transport_require_auth(self._impl, bool)
3356 3357 @property
3358 - def authenticated(self):
3359 return pn_transport_is_authenticated(self._impl)
3360
3361 - def require_encryption(self, bool):
3362 pn_transport_require_encryption(self._impl, bool)
3363 3364 @property
3365 - def encrypted(self):
3366 return pn_transport_is_encrypted(self._impl)
3367 3368 @property
3369 - def user(self):
3370 return pn_transport_get_user(self._impl)
3371
3372 - def bind(self, connection):
3373 """Assign a connection to the transport""" 3374 self._check(pn_transport_bind(self._impl, connection._impl))
3375
3376 - def unbind(self):
3377 """Release the connection""" 3378 self._check(pn_transport_unbind(self._impl))
3379
3380 - def trace(self, n):
3381 pn_transport_trace(self._impl, n)
3382
3383 - def tick(self, now):
3384 """Process any timed events (like heartbeat generation). 3385 now = seconds since epoch (float). 3386 """ 3387 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
3388
3389 - def capacity(self):
3390 c = pn_transport_capacity(self._impl) 3391 if c >= PN_EOS: 3392 return c 3393 else: 3394 return self._check(c)
3395
3396 - def push(self, binary):
3397 n = self._check(pn_transport_push(self._impl, binary)) 3398 if n != len(binary): 3399 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
3400
3401 - def close_tail(self):
3402 self._check(pn_transport_close_tail(self._impl))
3403
3404 - def pending(self):
3405 p = pn_transport_pending(self._impl) 3406 if p >= PN_EOS: 3407 return p 3408 else: 3409 return self._check(p)
3410
3411 - def peek(self, size):
3412 cd, out = pn_transport_peek(self._impl, size) 3413 if cd == PN_EOS: 3414 return None 3415 else: 3416 self._check(cd) 3417 return out
3418
3419 - def pop(self, size):
3420 pn_transport_pop(self._impl, size)
3421
3422 - def close_head(self):
3423 self._check(pn_transport_close_head(self._impl))
3424 3425 @property
3426 - def closed(self):
3427 return pn_transport_closed(self._impl)
3428 3429 # AMQP 1.0 max-frame-size
3430 - def _get_max_frame_size(self):
3431 return pn_transport_get_max_frame(self._impl)
3432
3433 - def _set_max_frame_size(self, value):
3434 pn_transport_set_max_frame(self._impl, value)
3435 3436 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 3437 doc=""" 3438 Sets the maximum size for received frames (in bytes). 3439 """) 3440 3441 @property
3442 - def remote_max_frame_size(self):
3443 return pn_transport_get_remote_max_frame(self._impl)
3444
3445 - def _get_channel_max(self):
3446 return pn_transport_get_channel_max(self._impl)
3447
3448 - def _set_channel_max(self, value):
3449 if pn_transport_set_channel_max(self._impl, value): 3450 raise SessionException("Too late to change channel max.")
3451 3452 channel_max = property(_get_channel_max, _set_channel_max, 3453 doc=""" 3454 Sets the maximum channel that may be used on the transport. 3455 """) 3456 3457 @property
3458 - def remote_channel_max(self):
3459 return pn_transport_remote_channel_max(self._impl)
3460 3461 # AMQP 1.0 idle-time-out
3462 - def _get_idle_timeout(self):
3463 return millis2secs(pn_transport_get_idle_timeout(self._impl))
3464
3465 - def _set_idle_timeout(self, sec):
3466 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
3467 3468 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 3469 doc=""" 3470 The idle timeout of the connection (float, in seconds). 3471 """) 3472 3473 @property
3474 - def remote_idle_timeout(self):
3475 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
3476 3477 @property
3478 - def frames_output(self):
3479 return pn_transport_get_frames_output(self._impl)
3480 3481 @property
3482 - def frames_input(self):
3483 return pn_transport_get_frames_input(self._impl)
3484
3485 - def sasl(self):
3486 return SASL(self)
3487
3488 - def ssl(self, domain=None, session_details=None):
3489 # SSL factory (singleton for this transport) 3490 if not self._ssl: 3491 self._ssl = SSL(self, domain, session_details) 3492 return self._ssl
3493 3494 @property
3495 - def condition(self):
3496 return cond2obj(pn_transport_condition(self._impl))
3497 3498 @property
3499 - def connection(self):
3500 return Connection.wrap(pn_transport_connection(self._impl))
3501
3502 -class SASLException(TransportException):
3503 pass
3504
3505 -class SASL(Wrapper):
3506 3507 OK = PN_SASL_OK 3508 AUTH = PN_SASL_AUTH 3509 SYS = PN_SASL_SYS 3510 PERM = PN_SASL_PERM 3511 TEMP = PN_SASL_TEMP 3512 3513 @staticmethod
3514 - def extended():
3515 return pn_sasl_extended()
3516
3517 - def __init__(self, transport):
3518 Wrapper.__init__(self, transport._impl, pn_transport_attachments) 3519 self._sasl = pn_sasl(transport._impl)
3520
3521 - def _check(self, err):
3522 if err < 0: 3523 exc = EXCEPTIONS.get(err, SASLException) 3524 raise exc("[%s]" % (err)) 3525 else: 3526 return err
3527 3528 @property
3529 - def user(self):
3530 return pn_sasl_get_user(self._sasl)
3531 3532 @property
3533 - def mech(self):
3534 return pn_sasl_get_mech(self._sasl)
3535 3536 @property
3537 - def outcome(self):
3538 outcome = pn_sasl_outcome(self._sasl) 3539 if outcome == PN_SASL_NONE: 3540 return None 3541 else: 3542 return outcome
3543
3544 - def allowed_mechs(self, mechs):
3545 pn_sasl_allowed_mechs(self._sasl, mechs)
3546
3547 - def _get_allow_insecure_mechs(self):
3548 return pn_sasl_get_allow_insecure_mechs(self._sasl)
3549
3550 - def _set_allow_insecure_mechs(self, insecure):
3551 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
3552 3553 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs, 3554 doc=""" 3555 Allow unencrypted cleartext passwords (PLAIN mech) 3556 """) 3557
3558 - def done(self, outcome):
3559 pn_sasl_done(self._sasl, outcome)
3560
3561 - def config_name(self, name):
3562 pn_sasl_config_name(self._sasl, name)
3563
3564 - def config_path(self, path):
3565 pn_sasl_config_path(self._sasl, path)
3566
3567 -class SSLException(TransportException):
3568 pass
3569
3570 -class SSLUnavailable(SSLException):
3571 pass
3572
3573 -class SSLDomain(object):
3574 3575 MODE_CLIENT = PN_SSL_MODE_CLIENT 3576 MODE_SERVER = PN_SSL_MODE_SERVER 3577 VERIFY_PEER = PN_SSL_VERIFY_PEER 3578 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 3579 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 3580
3581 - def __init__(self, mode):
3582 self._domain = pn_ssl_domain(mode) 3583 if self._domain is None: 3584 raise SSLUnavailable()
3585
3586 - def _check(self, err):
3587 if err < 0: 3588 exc = EXCEPTIONS.get(err, SSLException) 3589 raise exc("SSL failure.") 3590 else: 3591 return err
3592
3593 - def set_credentials(self, cert_file, key_file, password):
3594 return self._check( pn_ssl_domain_set_credentials(self._domain, 3595 cert_file, key_file, 3596 password) )
3597 - def set_trusted_ca_db(self, certificate_db):
3598 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 3599 certificate_db) )
3600 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3601 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 3602 verify_mode, 3603 trusted_CAs) )
3604
3605 - def allow_unsecured_client(self):
3606 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3607
3608 - def __del__(self):
3609 pn_ssl_domain_free(self._domain)
3610
3611 -class SSL(object):
3612 3613 @staticmethod
3614 - def present():
3615 return pn_ssl_present()
3616
3617 - def _check(self, err):
3618 if err < 0: 3619 exc = EXCEPTIONS.get(err, SSLException) 3620 raise exc("SSL failure.") 3621 else: 3622 return err
3623
3624 - def __new__(cls, transport, domain, session_details=None):
3625 """Enforce a singleton SSL object per Transport""" 3626 if transport._ssl: 3627 # unfortunately, we've combined the allocation and the configuration in a 3628 # single step. So catch any attempt by the application to provide what 3629 # may be a different configuration than the original (hack) 3630 ssl = transport._ssl 3631 if (domain and (ssl._domain is not domain) or 3632 session_details and (ssl._session_details is not session_details)): 3633 raise SSLException("Cannot re-configure existing SSL object!") 3634 else: 3635 obj = super(SSL, cls).__new__(cls) 3636 obj._domain = domain 3637 obj._session_details = session_details 3638 session_id = None 3639 if session_details: 3640 session_id = session_details.get_session_id() 3641 obj._ssl = pn_ssl( transport._impl ) 3642 if obj._ssl is None: 3643 raise SSLUnavailable() 3644 if domain: 3645 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3646 transport._ssl = obj 3647 return transport._ssl
3648
3649 - def cipher_name(self):
3650 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3651 if rc: 3652 return name 3653 return None
3654
3655 - def protocol_name(self):
3656 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3657 if rc: 3658 return name 3659 return None
3660 3661 SHA1 = PN_SSL_SHA1 3662 SHA256 = PN_SSL_SHA256 3663 SHA512 = PN_SSL_SHA512 3664 MD5 = PN_SSL_MD5 3665 3666 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME 3667 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE 3668 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY 3669 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME 3670 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT 3671 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME 3672
3673 - def get_cert_subject_subfield(self, subfield_name):
3674 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name) 3675 return subfield_value
3676
3677 - def get_cert_subject(self):
3678 subject = pn_ssl_get_remote_subject(self._ssl) 3679 return subject
3680
3682 # Pass in an unhandled enum 3683 return self.get_cert_subject_subfield(10)
3684 3685 # Convenience functions for obtaining the subfields of the subject field.
3686 - def get_cert_common_name(self):
3688
3689 - def get_cert_organization(self):
3691
3692 - def get_cert_organization_unit(self):
3694
3695 - def get_cert_locality_or_city(self):
3697
3698 - def get_cert_country(self):
3700
3701 - def get_cert_state_or_province(self):
3703
3704 - def get_cert_fingerprint(self, fingerprint_length, digest_name):
3705 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name) 3706 if rc == PN_OK: 3707 return fingerprint_str 3708 return None
3709 3710 # Convenience functions for obtaining fingerprint for specific hashing algorithms
3712 return self.get_cert_fingerprint(41, 10)
3713
3714 - def get_cert_fingerprint_sha1(self):
3715 return self.get_cert_fingerprint(41, SSL.SHA1)
3716
3718 # sha256 produces a fingerprint that is 64 characters long 3719 return self.get_cert_fingerprint(65, SSL.SHA256)
3720
3722 # sha512 produces a fingerprint that is 128 characters long 3723 return self.get_cert_fingerprint(129, SSL.SHA512)
3724
3725 - def get_cert_fingerprint_md5(self):
3726 return self.get_cert_fingerprint(33, SSL.MD5)
3727 3728 @property
3729 - def remote_subject(self):
3730 return pn_ssl_get_remote_subject( self._ssl )
3731 3732 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3733 RESUME_NEW = PN_SSL_RESUME_NEW 3734 RESUME_REUSED = PN_SSL_RESUME_REUSED 3735
3736 - def resume_status(self):
3737 return pn_ssl_resume_status( self._ssl )
3738
3739 - def _set_peer_hostname(self, hostname):
3740 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3741 - def _get_peer_hostname(self):
3742 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3743 self._check(err) 3744 return utf82unicode(name)
3745 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3746 doc=""" 3747 Manage the expected name of the remote peer. Used to authenticate the remote. 3748 """)
3749
3750 3751 -class SSLSessionDetails(object):
3752 """ Unique identifier for the SSL session. Used to resume previous session on a new 3753 SSL connection. 3754 """ 3755
3756 - def __init__(self, session_id):
3757 self._session_id = session_id
3758
3759 - def get_session_id(self):
3760 return self._session_id
3761 3762 3763 wrappers = { 3764 "pn_void": lambda x: pn_void2py(x), 3765 "pn_pyref": lambda x: pn_void2py(x), 3766 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)), 3767 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), 3768 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), 3769 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), 3770 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)), 3771 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) 3772 }
3773 3774 -class Collector:
3775
3776 - def __init__(self):
3777 self._impl = pn_collector()
3778
3779 - def put(self, obj, etype):
3780 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3781
3782 - def peek(self):
3783 return Event.wrap(pn_collector_peek(self._impl))
3784
3785 - def pop(self):
3786 ev = self.peek() 3787 pn_collector_pop(self._impl)
3788
3789 - def __del__(self):
3790 pn_collector_free(self._impl) 3791 del self._impl
3792 3793 if "TypeExtender" not in globals():
3794 - class TypeExtender:
3795 - def __init__(self, number):
3796 self.number = number
3797 - def next(self):
3798 try: 3799 return self.number 3800 finally: 3801 self.number += 1
3802
3803 -class EventType(object):
3804 3805 _lock = threading.Lock() 3806 _extended = TypeExtender(10000) 3807 TYPES = {} 3808
3809 - def __init__(self, name=None, number=None, method=None):
3810 if name is None and number is None: 3811 raise TypeError("extended events require a name") 3812 try: 3813 self._lock.acquire() 3814 if name is None: 3815 name = pn_event_type_name(number) 3816 3817 if number is None: 3818 number = self._extended.next() 3819 3820 if method is None: 3821 method = "on_%s" % name 3822 3823 self.name = name 3824 self.number = number 3825 self.method = method 3826 3827 self.TYPES[number] = self 3828 finally: 3829 self._lock.release()
3830
3831 - def __repr__(self):
3832 return self.name
3833
3834 -def dispatch(handler, method, *args):
3835 m = getattr(handler, method, None) 3836 if m: 3837 return m(*args) 3838 elif hasattr(handler, "on_unhandled"): 3839 return handler.on_unhandled(method, *args)
3840
3841 -class EventBase(object):
3842
3843 - def __init__(self, clazz, context, type):
3844 self.clazz = clazz 3845 self.context = context 3846 self.type = type
3847
3848 - def dispatch(self, handler):
3849 return dispatch(handler, self.type.method, self)
3850
3851 -def _none(x): return None
3852 3853 DELEGATED = Constant("DELEGATED")
3854 3855 -def _core(number, method):
3856 return EventType(number=number, method=method)
3857
3858 -class Event(Wrapper, EventBase):
3859 3860 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") 3861 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") 3862 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") 3863 3864 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") 3865 3866 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") 3867 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound") 3868 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound") 3869 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") 3870 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") 3871 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") 3872 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") 3873 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final") 3874 3875 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init") 3876 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open") 3877 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") 3878 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") 3879 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") 3880 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final") 3881 3882 LINK_INIT = _core(PN_LINK_INIT, "on_link_init") 3883 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open") 3884 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close") 3885 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach") 3886 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open") 3887 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") 3888 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") 3889 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow") 3890 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final") 3891 3892 DELIVERY = _core(PN_DELIVERY, "on_delivery") 3893 3894 TRANSPORT = _core(PN_TRANSPORT, "on_transport") 3895 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error") 3896 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") 3897 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") 3898 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") 3899 3900 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") 3901 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") 3902 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") 3903 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") 3904 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") 3905 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") 3906 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") 3907 3908 @staticmethod
3909 - def wrap(impl, number=None):
3910 if impl is None: 3911 return None 3912 3913 if number is None: 3914 number = pn_event_type(impl) 3915 3916 event = Event(impl, number) 3917 3918 # check for an application defined ApplicationEvent and return that. This 3919 # avoids an expensive wrap operation invoked by event.context 3920 if pn_event_class(impl) == PN_PYREF and \ 3921 isinstance(event.context, EventBase): 3922 return event.context 3923 else: 3924 return event
3925
3926 - def __init__(self, impl, number):
3927 Wrapper.__init__(self, impl, pn_event_attachments) 3928 self.__dict__["type"] = EventType.TYPES[number]
3929
3930 - def _init(self):
3931 pass
3932
3933 - def copy(self):
3934 copy = pn_event_copy(self._impl) 3935 return Event.wrap(copy)
3936 3937 @property
3938 - def clazz(self):
3939 cls = pn_event_class(self._impl) 3940 if cls: 3941 return pn_class_name(cls) 3942 else: 3943 return None
3944 3945 @property
3946 - def root(self):
3947 return WrappedHandler.wrap(pn_event_root(self._impl))
3948 3949 @property
3950 - def context(self):
3951 """Returns the context object associated with the event. The type of this depend on the type of event.""" 3952 return wrappers[self.clazz](pn_event_context(self._impl))
3953
3954 - def dispatch(self, handler, type=None):
3955 type = type or self.type 3956 if isinstance(handler, WrappedHandler): 3957 pn_handler_dispatch(handler._impl, self._impl, type.number) 3958 else: 3959 result = dispatch(handler, type.method, self) 3960 if result != DELEGATED and hasattr(handler, "handlers"): 3961 for h in handler.handlers: 3962 self.dispatch(h, type)
3963 3964 3965 @property
3966 - def reactor(self):
3967 """Returns the reactor associated with the event.""" 3968 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3969
3970 - def __getattr__(self, name):
3971 r = self.reactor 3972 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name: 3973 return r 3974 else: 3975 return super(Event, self).__getattr__(name)
3976 3977 @property
3978 - def transport(self):
3979 """Returns the transport associated with the event, or null if none is associated with it.""" 3980 return Transport.wrap(pn_event_transport(self._impl))
3981 3982 @property
3983 - def connection(self):
3984 """Returns the connection associated with the event, or null if none is associated with it.""" 3985 return Connection.wrap(pn_event_connection(self._impl))
3986 3987 @property
3988 - def session(self):
3989 """Returns the session associated with the event, or null if none is associated with it.""" 3990 return Session.wrap(pn_event_session(self._impl))
3991 3992 @property 3996 3997 @property
3998 - def sender(self):
3999 """Returns the sender link associated with the event, or null if 4000 none is associated with it. This is essentially an alias for 4001 link(), that does an additional checkon the type of the 4002 link.""" 4003 l = self.link 4004 if l and l.is_sender: 4005 return l 4006 else: 4007 return None
4008 4009 @property
4010 - def receiver(self):
4011 """Returns the receiver link associated with the event, or null if 4012 none is associated with it. This is essentially an alias for 4013 link(), that does an additional checkon the type of the link.""" 4014 l = self.link 4015 if l and l.is_receiver: 4016 return l 4017 else: 4018 return None
4019 4020 @property
4021 - def delivery(self):
4022 """Returns the delivery associated with the event, or null if none is associated with it.""" 4023 return Delivery.wrap(pn_event_delivery(self._impl))
4024
4025 - def __repr__(self):
4026 return "%s(%s)" % (self.type, self.context)
4027
4028 -class LazyHandlers(object):
4029 - def __get__(self, obj, clazz):
4030 if obj is None: 4031 return self 4032 ret = [] 4033 obj.__dict__['handlers'] = ret 4034 return ret
4035
4036 -class Handler(object):
4037 handlers = LazyHandlers() 4038
4039 - def on_unhandled(self, method, *args):
4040 pass
4041
4042 -class _cadapter:
4043
4044 - def __init__(self, handler, on_error=None):
4045 self.handler = handler 4046 self.on_error = on_error
4047
4048 - def dispatch(self, cevent, ctype):
4049 ev = Event.wrap(cevent, ctype) 4050 ev.dispatch(self.handler)
4051
4052 - def exception(self, exc, val, tb):
4053 if self.on_error is None: 4054 _compat.raise_(exc, val, tb) 4055 else: 4056 self.on_error((exc, val, tb))
4057
4058 -class WrappedHandlersChildSurrogate:
4059 - def __init__(self, delegate):
4060 self.handlers = [] 4061 self.delegate = weakref.ref(delegate)
4062
4063 - def on_unhandled(self, method, event):
4064 delegate = self.delegate() 4065 if delegate: 4066 dispatch(delegate, method, event)
4067
4068 4069 -class WrappedHandlersProperty(object):
4070 - def __get__(self, obj, clazz):
4071 if obj is None: 4072 return None 4073 return self.surrogate(obj).handlers
4074
4075 - def __set__(self, obj, value):
4076 self.surrogate(obj).handlers = value
4077
4078 - def surrogate(self, obj):
4079 key = "_surrogate" 4080 objdict = obj.__dict__ 4081 surrogate = objdict.get(key, None) 4082 if surrogate is None: 4083 objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj) 4084 obj.add(surrogate) 4085 return surrogate
4086
4087 -class WrappedHandler(Wrapper):
4088 4089 handlers = WrappedHandlersProperty() 4090 4091 @classmethod
4092 - def wrap(cls, impl, on_error=None):
4093 if impl is None: 4094 return None 4095 else: 4096 handler = cls(impl) 4097 handler.__dict__["on_error"] = on_error 4098 return handler
4099
4100 - def __init__(self, impl_or_constructor):
4101 Wrapper.__init__(self, impl_or_constructor) 4102 if list(self.__class__.__mro__).index(WrappedHandler) > 1: 4103 # instantiate the surrogate 4104 self.handlers.extend([])
4105
4106 - def _on_error(self, info):
4107 on_error = getattr(self, "on_error", None) 4108 if on_error is None: 4109 _compat.raise_(info[0], info[1], info[2]) 4110 else: 4111 on_error(info)
4112
4113 - def add(self, handler):
4114 if handler is None: return 4115 impl = _chandler(handler, self._on_error) 4116 pn_handler_add(self._impl, impl) 4117 pn_decref(impl)
4118
4119 - def clear(self):
4120 pn_handler_clear(self._impl)
4121
4122 -def _chandler(obj, on_error=None):
4123 if obj is None: 4124 return None 4125 elif isinstance(obj, WrappedHandler): 4126 impl = obj._impl 4127 pn_incref(impl) 4128 return impl 4129 else: 4130 return pn_pyhandler(_cadapter(obj, on_error))
4131
4132 -class Url(object):
4133 """ 4134 Simple URL parser/constructor, handles URLs of the form: 4135 4136 <scheme>://<user>:<password>@<host>:<port>/<path> 4137 4138 All components can be None if not specifeid in the URL string. 4139 4140 The port can be specified as a service name, e.g. 'amqp' in the 4141 URL string but Url.port always gives the integer value. 4142 4143 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps' 4144 @ivar user: Username 4145 @ivar password: Password 4146 @ivar host: Host name, ipv6 literal or ipv4 dotted quad. 4147 @ivar port: Integer port. 4148 @ivar host_port: Returns host:port 4149 """ 4150 4151 AMQPS = "amqps" 4152 AMQP = "amqp" 4153
4154 - class Port(int):
4155 """An integer port number that can be constructed from a service name string""" 4156
4157 - def __new__(cls, value):
4158 """@param value: integer port number or string service name.""" 4159 port = super(Url.Port, cls).__new__(cls, cls._port_int(value)) 4160 setattr(port, 'name', str(value)) 4161 return port
4162
4163 - def __eq__(self, x): return str(self) == x or int(self) == x
4164 - def __ne__(self, x): return not self == x
4165 - def __str__(self): return str(self.name)
4166 4167 @staticmethod
4168 - def _port_int(value):
4169 """Convert service, an integer or a service name, into an integer port number.""" 4170 try: 4171 return int(value) 4172 except ValueError: 4173 try: 4174 return socket.getservbyname(value) 4175 except socket.error: 4176 # Not every system has amqp/amqps defined as a service 4177 if value == Url.AMQPS: return 5671 4178 elif value == Url.AMQP: return 5672 4179 else: 4180 raise ValueError("Not a valid port number or service name: '%s'" % value)
4181
4182 - def __init__(self, url=None, defaults=True, **kwargs):
4183 """ 4184 @param url: URL string to parse. 4185 @param defaults: If true, fill in missing default values in the URL. 4186 If false, you can fill them in later by calling self.defaults() 4187 @param kwargs: scheme, user, password, host, port, path. 4188 If specified, replaces corresponding part in url string. 4189 """ 4190 if url: 4191 self._url = pn_url_parse(unicode2utf8(str(url))) 4192 if not self._url: raise ValueError("Invalid URL '%s'" % url) 4193 else: 4194 self._url = pn_url() 4195 for k in kwargs: # Let kwargs override values parsed from url 4196 getattr(self, k) # Check for invalid kwargs 4197 setattr(self, k, kwargs[k]) 4198 if defaults: self.defaults()
4199
4200 - class PartDescriptor(object):
4201 - def __init__(self, part):
4202 self.getter = globals()["pn_url_get_%s" % part] 4203 self.setter = globals()["pn_url_set_%s" % part]
4204 - def __get__(self, obj, type=None): return self.getter(obj._url)
4205 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
4206 4207 scheme = PartDescriptor('scheme') 4208 username = PartDescriptor('username') 4209 password = PartDescriptor('password') 4210 host = PartDescriptor('host') 4211 path = PartDescriptor('path') 4212
4213 - def _get_port(self):
4214 portstr = pn_url_get_port(self._url) 4215 return portstr and Url.Port(portstr)
4216
4217 - def _set_port(self, value):
4218 if value is None: pn_url_set_port(self._url, None) 4219 else: pn_url_set_port(self._url, str(Url.Port(value)))
4220 4221 port = property(_get_port, _set_port) 4222
4223 - def __str__(self): return pn_url_str(self._url)
4224
4225 - def __repr__(self): return "Url(%r)" % str(self)
4226
4227 - def __eq__(self, x): return str(self) == str(x)
4228 - def __ne__(self, x): return not self == x
4229
4230 - def __del__(self):
4231 pn_url_free(self._url); 4232 del self._url
4233
4234 - def defaults(self):
4235 """ 4236 Fill in missing values (scheme, host or port) with defaults 4237 @return: self 4238 """ 4239 self.scheme = self.scheme or self.AMQP 4240 self.host = self.host or '0.0.0.0' 4241 self.port = self.port or self.Port(self.scheme) 4242 return self
4243 4244 __all__ = [ 4245 "API_LANGUAGE", 4246 "IMPLEMENTATION_LANGUAGE", 4247 "ABORTED", 4248 "ACCEPTED", 4249 "AUTOMATIC", 4250 "PENDING", 4251 "MANUAL", 4252 "REJECTED", 4253 "RELEASED", 4254 "MODIFIED", 4255 "SETTLED", 4256 "UNDESCRIBED", 4257 "Array", 4258 "Collector", 4259 "Condition", 4260 "Connection", 4261 "Data", 4262 "Delivery", 4263 "Disposition", 4264 "Described", 4265 "Endpoint", 4266 "Event", 4267 "EventType", 4268 "Handler", 4269 "Link", 4270 "Message", 4271 "MessageException", 4272 "Messenger", 4273 "MessengerException", 4274 "ProtonException", 4275 "VERSION_MAJOR", 4276 "VERSION_MINOR", 4277 "Receiver", 4278 "SASL", 4279 "Sender", 4280 "Session", 4281 "SessionException", 4282 "SSL", 4283 "SSLDomain", 4284 "SSLSessionDetails", 4285 "SSLUnavailable", 4286 "SSLException", 4287 "Terminus", 4288 "Timeout", 4289 "Interrupt", 4290 "Transport", 4291 "TransportException", 4292 "Url", 4293 "char", 4294 "dispatch", 4295 "symbol", 4296 "timestamp", 4297 "ulong", 4298 "byte", 4299 "short", 4300 "int32", 4301 "ubyte", 4302 "ushort", 4303 "uint", 4304 "float32", 4305 "decimal32", 4306 "decimal64", 4307 "decimal128" 4308 ] 4309