Class: Bunny::Queue09
In: lib/bunny/queue09.rb
Parent: Qrack::Queue
Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.
    # File lib/bunny/queue09.rb, line 14
    def initialize(client, name, opts = {})
      # check connection to server
      raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

      @client = client
      @opts = opts
      @delivery_tag = nil

      # Queues without a given name are named by the server and are generally
      # bound to the process that created them.
      if !name
        opts = {
          :passive => false,
          :durable => false,
          :exclusive => true,
          :auto_delete => true,
          :deprecated_ticket => 0
        }.merge(opts)
      end

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Queue::Declare.new({ :queue => name || '', :nowait => false, :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Queue::DeclareOk, "Error declaring queue #{name}")

      @name = method.queue
      client.queues[@name] = self
    end
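The option handling in the constructor can be tried out in isolation. The following is a minimal sketch in plain Ruby that does not load Bunny or talk to a server; `declare_options` and `SERVER_NAMED_DEFAULTS` are hypothetical names introduced here for illustration. Unlike the constructor, the sketch copies the caller's hash before modifying it.

```ruby
# Hypothetical helper (not part of Bunny) that mirrors how the constructor
# builds the arguments for the queue declaration.
SERVER_NAMED_DEFAULTS = {
  :passive => false,
  :durable => false,
  :exclusive => true,
  :auto_delete => true,
  :deprecated_ticket => 0
}.freeze

def declare_options(name, opts = {})
  # Copy so the caller's hash is left untouched (an assumption of this sketch).
  opts = opts.dup

  # Unnamed queues get the server-named defaults; caller options still win.
  opts = SERVER_NAMED_DEFAULTS.merge(opts) unless name

  # :nowait is always discarded, as in the constructor.
  opts.delete(:nowait)

  { :queue => name || '', :nowait => false, :deprecated_ticket => 0 }.merge(opts)
end
```

Calling `declare_options(nil, :durable => true)` yields an empty queue name with `:exclusive` and `:auto_delete` set, while `declare_options('jobs')` carries only the three base arguments.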
Acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
    # File lib/bunny/queue09.rb, line 67
    def ack(opts={})
      # Set delivery tag
      if delivery_tag.nil? and opts[:delivery_tag].nil?
        raise Bunny::AcknowledgementError, "No delivery tag received"
      else
        self.delivery_tag = opts[:delivery_tag] if delivery_tag.nil?
      end

      client.send_frame(
        Qrack::Protocol09::Basic::Ack.new({:delivery_tag => delivery_tag, :multiple => false}.merge(opts))
      )

      # reset delivery tag
      self.delivery_tag = nil
    end
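A typical pattern is to fetch a message with acknowledgements enabled and call ack only after the message has been handled. The sketch below assumes a queue object that responds to `pop(:ack => true)` and `ack` as shown in this class; `process_one` is a hypothetical helper, not part of Bunny.

```ruby
# Hypothetical helper: handles at most one message and acknowledges it
# only after the block has returned without raising.
def process_one(queue)
  msg = queue.pop(:ack => true)

  # An empty queue is signalled by the :queue_empty payload.
  return false if msg[:payload] == :queue_empty

  yield msg[:payload]

  # pop(:ack => true) stored the delivery tag on the queue, so no
  # :delivery_tag option is needed here.
  queue.ack
  true
end
```

If the block raises, ack is never sent, so the message stays unacknowledged on the server.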
Binds a queue to an exchange. Until a queue is bound it will not receive any messages. Queues are bound to the direct exchange '' (the default exchange, whose name is the empty string) by default. If an error occurs, a Bunny::ProtocolError is raised.
Returns :bind_ok if successful.
    # File lib/bunny/queue09.rb, line 100
    def bind(exchange, opts = {})
      exchange = exchange.respond_to?(:name) ? exchange.name : exchange

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Queue::Bind.new({ :queue => name,
                                             :exchange => exchange,
                                             :routing_key => opts.delete(:key),
                                             :nowait => false,
                                             :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Queue::BindOk,
        "Error binding queue: #{name} to exchange: #{exchange}")

      # return message
      :bind_ok
    end
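Two details of bind are easy to miss: the exchange may be given either as an object that responds to `name` or as a plain string, and the `:key` option is translated into `:routing_key`. The following sketch reproduces just that argument handling in plain Ruby; `FakeExchange` and `bind_arguments` are hypothetical names used for illustration, and the sketch copies the options hash instead of modifying the caller's.

```ruby
# Hypothetical stand-in for an exchange; any object with #name would do.
FakeExchange = Struct.new(:name)

# Hypothetical helper mirroring how bind assembles its frame arguments.
def bind_arguments(queue_name, exchange, opts = {})
  opts = opts.dup # assumption of this sketch: leave the caller's hash alone

  # Accept either an exchange-like object or an exchange name.
  exchange = exchange.respond_to?(:name) ? exchange.name : exchange

  opts.delete(:nowait)

  # :key is removed from opts here, so it does not reappear in the merge.
  { :queue => queue_name,
    :exchange => exchange,
    :routing_key => opts.delete(:key),
    :nowait => false,
    :deprecated_ticket => 0 }.merge(opts)
end
```

Both `bind_arguments('jobs', 'logs', :key => 'error')` and `bind_arguments('jobs', FakeExchange.new('logs'), :key => 'error')` produce the same argument hash.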
Requests that a queue be deleted from the broker/server. When a queue is deleted, any pending messages are sent to a dead-letter queue if this is defined in the server configuration. Removes the queue's reference from the client's queues if successful. If an error occurs, raises Bunny::ProtocolError.
Returns :delete_ok if successful.
    # File lib/bunny/queue09.rb, line 147
    def delete(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Queue::Delete.new({ :queue => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Queue::DeleteOk, "Error deleting queue #{name}")

      client.queues.delete(name)

      # return confirmation
      :delete_ok
    end
Gets a message from a queue synchronously. If an error occurs, raises Bunny::ProtocolError.
Returns a hash {:header, :payload, :delivery_details}. :delivery_details is a hash {:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}.
If the queue is empty, the returned hash will contain the values:
    :header => nil
    :payload => :queue_empty
    :delivery_details => nil
N.B. If a block is provided, the hash is passed into the block and, in the code as listed, pop returns whatever the block returns rather than the hash.
    # File lib/bunny/queue09.rb, line 195
    def pop(opts = {}, &blk)

      # do we want to have to provide an acknowledgement?
      ack = opts.delete(:ack)

      client.send_frame(
        Qrack::Protocol09::Basic::Get.new({ :queue => name,
                                            :consumer_tag => name,
                                            :no_ack => !ack,
                                            :nowait => true,
                                            :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      if method.is_a?(Qrack::Protocol09::Basic::GetEmpty) then
        queue_empty = true
      elsif !method.is_a?(Qrack::Protocol09::Basic::GetOk)
        raise Bunny::ProtocolError, "Error getting message from queue #{name}"
      end

      if !queue_empty
        # get delivery tag to use for acknowledge
        self.delivery_tag = method.delivery_tag if ack

        header = client.next_payload

        # If maximum frame size is smaller than message payload body then message
        # will have a message header and several message bodies
        msg = ''
        while msg.length < header.size
          msg += client.next_payload
        end

        msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments}

      else
        msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil}
      end

      # Pass message hash to block or return message hash
      blk ? blk.call(msg_hash) : msg_hash

    end
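Because an empty queue is reported through the :queue_empty payload rather than an exception or nil, callers need to check for it explicitly. As a rough sketch, assuming only a queue object whose `pop` returns the hash described above, a loop that reads until the queue is empty might look like this; `drain` is a hypothetical helper, not part of Bunny.

```ruby
# Hypothetical helper: pops messages until the queue reports it is empty
# and returns the collected payloads in arrival order.
def drain(queue)
  payloads = []
  loop do
    msg = queue.pop

    # Real payloads are strings, so the symbol cannot collide with one.
    break if msg[:payload] == :queue_empty

    payloads << msg[:payload]
  end
  payloads
end
```

Each iteration costs one synchronous round trip to the server, so this style suits occasional polling better than sustained consumption.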
Removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal "undo" mechanism. If an error occurs, raises Bunny::ProtocolError.
Returns :purge_ok if successful.
    # File lib/bunny/queue09.rb, line 256
    def purge(opts = {})
      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Queue::Purge.new({ :queue => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts))
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Queue::PurgeOk, "Error purging queue #{name}")

      # return confirmation
      :purge_ok

    end
Returns a hash {:message_count, :consumer_count}.
    # File lib/bunny/queue09.rb, line 282
    def status
      client.send_frame(
        Qrack::Protocol09::Queue::Declare.new({ :queue => name, :passive => true, :deprecated_ticket => 0 })
      )
      method = client.next_method
      {:message_count => method.message_count, :consumer_count => method.consumer_count}
    end
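The status hash lends itself to simple health checks. The sketch below assumes only an object whose `status` returns the two counts described above; `unattended_backlog?` and its threshold parameter are hypothetical and not part of Bunny.

```ruby
# Hypothetical helper: true when messages are piling up and no consumer
# is attached to take them.
def unattended_backlog?(queue, threshold)
  stats = queue.status
  stats[:message_count] > threshold && stats[:consumer_count].zero?
end
```

Since status issues a passive declare on every call, the counts are a snapshot and may be stale by the time they are acted on.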
    # File lib/bunny/queue09.rb, line 290
    def subscribe(opts = {}, &blk)
      # Create subscription
      s = Bunny::Subscription09.new(client, self, opts)
      s.start(&blk)

      # Reset when subscription finished
      @subscription = nil
    end
Removes a queue binding from an exchange. If an error occurs, a Bunny::ProtocolError is raised.
Returns :unbind_ok if successful.
    # File lib/bunny/queue09.rb, line 316
    def unbind(exchange, opts = {})
      exchange = exchange.respond_to?(:name) ? exchange.name : exchange

      # ignore the :nowait option if passed, otherwise program will hang waiting for a
      # response that will not be sent by the server
      opts.delete(:nowait)

      client.send_frame(
        Qrack::Protocol09::Queue::Unbind.new({ :queue => name,
                                               :exchange => exchange,
                                               :routing_key => opts.delete(:key),
                                               :nowait => false,
                                               :deprecated_ticket => 0 }.merge(opts)
        )
      )

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Queue::UnbindOk, "Error unbinding queue #{name}")

      # return message
      :unbind_ok
    end
Cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer.
Returns :unsubscribe_ok if successful.
    # File lib/bunny/queue09.rb, line 358
    def unsubscribe(opts = {})
      # Default consumer_tag from subscription if not passed in
      consumer_tag = subscription ? subscription.consumer_tag : opts[:consumer_tag]

      # Must have consumer tag to tell server what to unsubscribe
      raise Bunny::UnsubscribeError,
        "No consumer tag received" if !consumer_tag

      # Cancel consumer
      client.send_frame( Qrack::Protocol09::Basic::Cancel.new(:consumer_tag => consumer_tag,
                                                              :nowait => false))

      method = client.next_method

      client.check_response(method, Qrack::Protocol09::Basic::CancelOk,
        "Error unsubscribing from queue #{name}")

      # Reset subscription
      @subscription = nil

      # Return confirmation
      :unsubscribe_ok

    end
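The way unsubscribe picks its consumer tag is worth spelling out: an active subscription always takes precedence, and the `:consumer_tag` option is consulted only when there is no subscription. The following plain-Ruby sketch isolates that rule; `consumer_tag_for` and `FakeSubscription` are hypothetical names, and ArgumentError stands in for Bunny::UnsubscribeError so that the example runs without the library.

```ruby
# Hypothetical stand-in for a subscription; only #consumer_tag is needed.
FakeSubscription = Struct.new(:consumer_tag)

# Hypothetical helper mirroring the tag lookup in unsubscribe.
def consumer_tag_for(subscription, opts = {})
  # The subscription wins whenever one exists, even if opts carries a tag.
  tag = subscription ? subscription.consumer_tag : opts[:consumer_tag]

  # ArgumentError is a placeholder for Bunny::UnsubscribeError.
  raise ArgumentError, 'No consumer tag received' unless tag

  tag
end
```

One consequence is that a subscription without a consumer tag causes an error even when `:consumer_tag` is supplied in the options.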