RabbitMQ Topic Exchange Spring Boot Example

loveprogramming wrote on 01/03/2021

https://loizenai.com/rabbitmq-topic-exchange-spring-boot/

  • Tutorial: "RabbitMQ Topic Exchange Spring Boot Example" + GitHub source code

A topic exchange is powerful: depending on its bindings, it can behave like other exchange types. In this tutorial, I show you how to build a "RabbitMQ Topic Exchange Spring Boot Example", with the source code on GitHub.

  • Technologies:
    – Java 8
    – Maven
    – Spring Tool Suite
    – Spring Boot
    – RabbitMQ

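RabbitMQ Topic Exchange

The routing_key of a message sent to a topic exchange must be a list of words delimited by dots, for example sys.dev.info. Queues are bound to the exchange with binding keys of the same form, which may contain wildcards, for example:

  • #.error

  • *.prod.*

  • sys.#

Note:
* (star) matches exactly one word.
# (hash) matches zero or more words.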

[Figure: Spring Boot RabbitMQ Topic Exchange Architecture]

With the above topic exchange design:

  • a message sent with routing key sys.dev.info is delivered only to Q1.
  • a message sent with routing key app.prod.error is delivered only to Q2.
  • a message sent with routing key sys.test.error is delivered to both queues {Q1, Q2}.

A topic exchange is a flexible tool and can act like other exchange types:

  • When a queue is bound with the "#" (hash) binding key, it receives every message regardless of the routing key, as with a fanout exchange.
  • When the bindings use neither * nor #, the topic exchange behaves like a direct exchange.
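
The wildcard rules and the delivery examples above can be checked with a small, broker-free sketch in plain Java. It is not how RabbitMQ implements matching internally; the class name TopicMatcher is hypothetical, and the bindings for Q1 and Q2 are assumptions inferred from the delivery results described above (the actual bindings appear only in the architecture figure).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that mimics topic-exchange matching for illustration only.
public class TopicMatcher {

    /** Returns true if the dot-delimited routing key matches the binding key. */
    public static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // binding key used up: routing key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words (skip it) or one more word (stay on '#')
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // words left in the binding key, none left in the routing key
        }
        // '*' accepts any single word; any other word must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    /** Names of the queues that have at least one binding matching the routing key. */
    public static List<String> route(Map<String, List<String>> bindings, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : bindings.entrySet()) {
            for (String bindingKey : entry.getValue()) {
                if (matches(bindingKey, routingKey)) {
                    queues.add(entry.getKey());
                    break; // a queue receives a message once, however many bindings match
                }
            }
        }
        return queues;
    }

    /** Assumed bindings, inferred from the delivery results in the text. */
    public static Map<String, List<String>> assumedBindings() {
        Map<String, List<String>> bindings = new LinkedHashMap<>();
        bindings.put("Q1", Arrays.asList("sys.#"));
        bindings.put("Q2", Arrays.asList("#.error", "*.prod.*"));
        return bindings;
    }

    public static void main(String[] args) {
        Map<String, List<String>> bindings = assumedBindings();
        for (String key : Arrays.asList("sys.dev.info", "app.prod.error", "sys.test.error")) {
            System.out.println(key + " -> " + route(bindings, key));
        }
        // prints:
        // sys.dev.info -> [Q1]
        // app.prod.error -> [Q2]
        // sys.test.error -> [Q1, Q2]

        System.out.println(matches("#", "any.routing.key"));          // true: fanout-like
        System.out.println(matches("sys.dev.info", "sys.dev.error")); // false: direct-like
    }
}
```

Note that "sys.#" also matches the single-word key sys, because # may stand for zero words, while "*.prod.*" requires exactly three words.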

Practice - RabbitMQ Topic Exchange Spring Boot Example

In the tutorial, we create 2 Spring Boot projects as below:

[Figure: Spring Boot RabbitMQ Topic Project Structures]

Steps to do:

  • Create the Spring Boot projects
  • Define the data model
  • Implement the RabbitMQ producer
  • Implement the RabbitMQ consumer
  • Run and check results

Create Spring Boot Projects

Using Spring Tool Suite, create 2 Spring Boot projects, then add the required dependency spring-boot-starter-amqp to each:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Define Java Data Model

Create the Log data model in both projects:


package com.loizenai.rabbitmq.model;

public class Log {
    private String content;
    private String routingKey;
    
    // No-argument constructor, used by Jackson when converting JSON back to a Log
    public Log() {}
    
    public Log(String content, String routingKey){
        this.content = content;
        this.routingKey = routingKey;
    }
    
    public String getContent(){
        return this.content;
    }
    
    public void setContent(String content){
        this.content = content;
    }
    
    public String getRoutingKey(){
        return this.routingKey;
    }
    
    public void setRoutingKey(String routingKey){
        this.routingKey = routingKey;
    }
    
    @Override
    public String toString() {
        return String.format("{content = %s, routingKey = %s}", content, routingKey);
    }
}

Configure SpringBoot RabbitMQ Producer - RabbitMQ Topic Exchange Spring Boot Example


package com.loizenai.rabbitmq.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
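    // @Bean is required; without it Spring never calls this method.
    // Declared as RabbitTemplate so that it replaces Spring Boot's default template.
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }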
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
jsa.rabbitmq.queue=jsa.queue
jsa.rabbitmq.routingkey=jsa.routingkey

Implement SpringBoot RabbitMq Producer - RabbitMQ Topic Exchange Spring Boot Example


package com.loizenai.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.loizenai.rabbitmq.model.Log;

@Component
public class Producer {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    @Value("${jsa.rabbitmq.exchange}")
    private String exchange;
    
    public void produce(Log logs){
        String routingKey = logs.getRoutingKey();
        amqpTemplate.convertAndSend(exchange, routingKey, logs);
        System.out.println("Send msg = " + logs);
    }
}

Implement SpringBoot RabbitMQ Producer Client


package com.loizenai.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.loizenai.rabbitmq.model.Log;
import com.loizenai.rabbitmq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqProducerApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
    }
    
    @Autowired
    Producer producer;

    @Override
    public void run(String... args) throws Exception {
        
        /**
         *  1
         */
        String content = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
        String routingKey = "sys.dev.info";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
        
        /**
         *  2
         */
        content = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
        routingKey = "sys.test.error";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
        
        /**
         *  3
         */
        content = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
        routingKey = "app.prod.error";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
    }
}

Configure SpringBoot RabbitMq Consumer


package com.loizenai.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.logs.sys
#jsa.rabbitmq.queue=jsa.logs.prod.error

Implement Spring Boot Consumer


package com.loizenai.rabbitmq.consumer;

// The tutorial's own Log model, not org.apache.commons.logging.Log
import com.loizenai.rabbitmq.model.Log;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
    
    @RabbitListener(queues="${jsa.rabbitmq.queue}", containerFactory="jsaFactory")
    public void recievedMessage(Log logs) {
        System.out.println("Recieved Message: " + logs);
    }
}

Run and Check Results

  • Set up the RabbitMQ exchange and queues:

Enable the rabbitmq_management plugin with the command: rabbitmq-plugins enable rabbitmq_management --online

Then go to http://localhost:15672 and log in with user/password: guest/guest

[Figure: Spring Boot RabbitMQ Topic Exchange Connection]

  • Add exchange:

Go to http://localhost:15672/#/exchanges and add an exchange of type topic named jsa.exchange.logs

[Figure: Spring Boot RabbitMQ Topic Exchange - Create Exchange]

  • Add Queue:

Go to http://localhost:15672/#/queues, add 2 queues: jsa.logs.sys, jsa.logs.prod.error.

[Figure: Spring Boot RabbitMQ Topic - the 2 queues]

  • Bind the queues to the exchange above:

[Figure: Spring Boot RabbitMQ Topic Exchange Binding]
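
As an alternative to clicking through the management UI, the same exchange, queues and bindings could be declared from code. The configuration fragment below is a sketch, not part of the original projects: the class name TopologyConfig is hypothetical, and the binding keys are assumptions inferred from the delivery results in this tutorial (the actual keys are shown only in the binding screenshot). It relies on Spring AMQP's Declarables, available from Spring AMQP 2.1.

```java
package com.loizenai.rabbitmq.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names of exchange and queues come from the tutorial.
@Configuration
public class TopologyConfig {

    @Bean
    public Declarables logsTopology() {
        TopicExchange exchange = new TopicExchange("jsa.exchange.logs");
        Queue sysQueue = new Queue("jsa.logs.sys");
        Queue prodErrorQueue = new Queue("jsa.logs.prod.error");

        // Binding keys below are assumed, not taken from the original screenshots.
        return new Declarables(
                exchange,
                sysQueue,
                prodErrorQueue,
                BindingBuilder.bind(sysQueue).to(exchange).with("sys.#"),
                BindingBuilder.bind(prodErrorQueue).to(exchange).with("#.error"),
                BindingBuilder.bind(prodErrorQueue).to(exchange).with("*.prod.*"));
    }
}
```

With spring-boot-starter-amqp on the classpath, the auto-configured admin component should declare these objects on the broker when the application first opens a connection. If the exchange or queues already exist with different settings (for example, non-durable), the broker rejects the redeclaration, so use either the UI or the code, not a mix of conflicting definitions.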

Run & Check Results

  • Run SpringBoot-RabbitMQ-Producer with the command line: mvn spring-boot:run

  • Console logs:


Send msg = {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Send msg = {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Send msg = {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

  • See the queues' status:

[Figure: Spring Boot RabbitMQ Topic - message queues after sending]

  • Run Spring-Boot-RabbitMQ-Consumer, which listens to the jsa.logs.sys queue, with the configuration jsa.rabbitmq.queue=jsa.logs.sys:

  • Console logs:


Recieved Message: {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}

  • See the queues' status:

[Figure: Spring Boot RabbitMQ Topic - queue status after consuming from jsa.logs.sys]

  • Run Spring-Boot-RabbitMQ-Consumer, which listens to the jsa.logs.prod.error queue, with the configuration jsa.rabbitmq.queue=jsa.logs.prod.error:

  • Console logs:


Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Recieved Message: {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

  • See the queues' status:

[Figure: Spring Boot RabbitMQ Topic - queue status after consuming from jsa.logs.prod.error]
