54 CHEN

RabbitMQ Java Client API Explained

AMQP

Quick introduction to the basic concepts

Java client API

Connecting

// Open a connection to the broker, then create a channel on it.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostName);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();

Consumer thread pool

// Supply a custom thread pool; consumer callbacks run on its threads.
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
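
A thread pool passed to newConnection in this way is owned by the caller. As far as I know, the client does not shut it down when the connection closes, so the application has to do that itself, otherwise the pool's threads can keep the JVM alive. Below is a minimal sketch of that cleanup using only java.util.concurrent. The class name ConsumerPoolLifecycle and the helper shutdownPool are made up for illustration and are not part of the RabbitMQ client.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the RabbitMQ client library.
final class ConsumerPoolLifecycle {
    private ConsumerPoolLifecycle() {}

    // Stops accepting new tasks, waits up to timeoutSeconds for running ones,
    // then forces a stop. Returns true if the pool has fully terminated.
    static boolean shutdownPool(ExecutorService es, long timeoutSeconds) {
        es.shutdown();
        try {
            if (es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            es.shutdownNow(); // interrupt tasks that are still running
            return es.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(20);
        // Connection conn = factory.newConnection(es);  // as in the snippet above
        // ... consume messages, then conn.close() ...
        boolean terminated = shutdownPool(es, 5);
        System.out.println("pool terminated: " + terminated);
    }
}
```

Call the helper only after the connection has been closed, so that no consumer callback is still being dispatched to the pool.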

Address array

// Candidate broker addresses; the client tries them until one connects.
Address[] addrArr = new Address[] {
    new Address(hostname1, portnumber1),
    new Address(hostname2, portnumber2)
};
Connection conn = factory.newConnection(addrArr);

Declaring an exchange and a queue

// Declare a durable direct exchange and a server-named queue, then bind them.
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

Publishing a message

// Publish the body to the exchange; null means no message properties are set.
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
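
A message body is just a byte array, so the producer and the consumer have to agree on a charset. Calling getBytes() with no argument uses the JVM's default charset, which can differ between machines. Below is a minimal sketch that pins the charset to UTF-8 on both sides. MessageBodyCodec is a hypothetical helper written for this post, not something the RabbitMQ client provides.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the RabbitMQ client library.
final class MessageBodyCodec {
    private MessageBodyCodec() {}

    // Encode text as UTF-8 regardless of the JVM's default charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a body received by a consumer with the same charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] messageBodyBytes = encode("Hello, world!");
        // messageBodyBytes can be handed to channel.basicPublish(...) as shown above.
        System.out.println(decode(messageBodyBytes));
    }
}
```

On the consuming side, the same decode call can be applied to the body argument of handleDelivery.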

Thread safety

The simplest way to consume messages

boolean autoAck = false;  // acknowledge manually after processing
channel.basicConsume(queueName, autoAck, "myConsumerTag",
    new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
            throws IOException
        {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            long deliveryTag = envelope.getDeliveryTag();
            // (process the message components here ...)
            // Acknowledge only this delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        }
    });

Miscellaneous

This is an original article. If you repost it, please credit the source: reposted from 五四陈科学院 [http://www.54chen.com]

Posted by 54chen amqp
