RabbitMQ Topic Exchange Spring Boot Example

loveprogramming wrote on 01/03/2021

https://loizenai.com/rabbitmq-topic-exchange-spring-boot/

  • Tutorial: "RabbitMQ Topic Exchange Spring Boot Example" + Github Sourcecode

A topic exchange is powerful and can behave like other exchange types. In this tutorial, I show you how to build a RabbitMQ topic exchange example with Spring Boot, with the source code on GitHub.

  • Technologies:
    – Java 8
    – Maven
    – Spring Tool Suite
    – Spring Boot
    – RabbitMQ

RabbitMQ Topic Exchange

The routing_key of a message sent to a topic exchange must be a list of words delimited by dots, for example sys.dev.info. A binding key has the same form and may also contain the wildcards * and #, for example:

  • #.error

  • *.prod.*

  • sys.#

Note:
* (star) matches exactly one word.
# (hash) matches zero or more words.
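
To make the two wildcard rules concrete, here is a minimal sketch in plain Java of how a binding key could be compared with a routing key. It is not RabbitMQ's actual implementation. The class name TopicMatcher and the method matches are hypothetical names chosen for illustration, and the sketch assumes well-formed keys with no empty words.

```java
// Hypothetical helper, for illustration only: not part of RabbitMQ or Spring AMQP.
final class TopicMatcher {

    /** Returns true if the dot-delimited routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern is used up: the routing key must be used up as well.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it).
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Pattern still expects a word, but the routing key has none left.
            return false;
        }
        // '*' accepts any single word; any other pattern word must match exactly.
        return (p[i].equals("*") || p[i].equals(k[j]))
                && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"sys.dev.info", "app.prod.error", "sys.test.error"};
        String[] patterns = {"sys.#", "#.error", "*.prod.*"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " ~ " + pattern + " : " + matches(pattern, key));
            }
        }
    }
}
```

With this sketch, sys.test.error matches both sys.# and #.error, while sys.dev.info matches only sys.#.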

[Figure: Spring Boot RabbitMQ topic exchange architecture]

With the above topic exchange design,

  • when we send a message with routing key sys.dev.info, it is delivered only to Q1.
  • when we send a message with routing key app.prod.error, it is delivered only to Q2.
  • when we send a message with routing key sys.test.error, it is delivered to both queues {Q1, Q2}.

A topic exchange is a flexible tool that can act like other exchange types:

  • When a queue is bound with the "#" (hash) binding key, it receives every message regardless of routing key, as with a fanout exchange.
  • When the binding keys contain neither * nor #, the exchange behaves like a direct exchange.

Practice - RabbitMQ Topic Exchange Spring Boot Example

In this tutorial, we create 2 Spring Boot projects as shown below:

[Figure: Spring Boot RabbitMQ topic project structures]

  • Steps to do:
  • Create SpringBoot projects
  • Define data model
  • Implement RabbitMq Producer
  • Implement RabbitMq consumer
  • Run and check results

Create SpringBoot Projects

Using Spring Tool Suite, create 2 Spring Boot projects, then add the required dependency spring-boot-starter-amqp to each:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Define Java Data Model

Create Log data model for both projects:


package com.loizenai.rabbitmq.model;

public class Log {
    private String content;
    private String routingKey;
    
    // No-arg constructor, used by Jackson when deserializing the JSON payload
    public Log(){}
    
    public Log(String content, String routingKey){
        this.content = content;
        this.routingKey = routingKey;
    }
    
    public String getContent(){
        return this.content;
    }
    
    public void setContent(String content){
        this.content = content;
    }
    
    public String getRoutingKey(){
        return this.routingKey;
    }
    
    public void setRoutingKey(String routingKey){
        this.routingKey = routingKey;
    }
    
    @Override
    public String toString() {
        return String.format("{content = %s, routingKey = %s}", content, routingKey);
    }
}

Configure SpringBoot RabbitMQ Producer - RabbitMQ Topic Exchange Spring Boot Example


package com.loizenai.rabbitmq.config;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public AmqpTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.exchange=jsa.exchange.logs
jsa.rabbitmq.queue=jsa.queue
jsa.rabbitmq.routingkey=jsa.routingkey

Implement SpringBoot RabbitMq Producer - RabbitMQ Topic Exchange Spring Boot Example


package com.loizenai.rabbitmq.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.loizenai.rabbitmq.model.Log;

@Component
public class Producer {
    
    @Autowired
    private AmqpTemplate amqpTemplate;
    
    @Value("${jsa.rabbitmq.exchange}")
    private String exchange;
    
    public void produce(Log logs){
        String routingKey = logs.getRoutingKey();
        amqpTemplate.convertAndSend(exchange, routingKey, logs);
        System.out.println("Send msg = " + logs);
    }
}

Implement SpringBoot RabbitMQ Producer Client


package com.loizenai.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.loizenai.rabbitmq.model.Log;
import com.loizenai.rabbitmq.producer.Producer;

@SpringBootApplication
public class SpringRabbitMqProducerApplication  implements CommandLineRunner{

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitMqProducerApplication.class, args);
    }
    
    @Autowired
    Producer producer;

    @Override
    public void run(String... args) throws Exception {
        
        /**
         *  1
         */
        String content = "2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52";
        String routingKey = "sys.dev.info";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
        
        /**
         *  2
         */
        content = "2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]]";
        routingKey = "sys.test.error";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
        
        /**
         *  3
         */
        content = "2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception";
        routingKey = "app.prod.error";
        
        // send to RabbitMQ
        producer.produce(new Log(content, routingKey));
    }
}

Configure SpringBoot RabbitMq Consumer


package com.loizenai.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
    }
}

Open application.properties, add configuration:


spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
jsa.rabbitmq.queue=jsa.logs.sys
#jsa.rabbitmq.queue=jsa.logs.prod.error

Implement Spring Boot Consumer


package com.loizenai.rabbitmq.consumer;

import com.loizenai.rabbitmq.model.Log;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {
    
    @RabbitListener(queues="${jsa.rabbitmq.queue}", containerFactory="jsaFactory")
    public void recievedMessage(Log logs) {
        System.out.println("Recieved Message: " + logs);
    }
}

Run and Check Results

  • Set up the RabbitMQ exchange and queues:

Enable the rabbitmq_management plugin with the command: rabbitmq-plugins enable rabbitmq_management --online

Then go to http://localhost:15672 and log in with user/password guest/guest.

[Figure: Spring Boot RabbitMQ topic exchange connection]

  • Add exchange:

Go to http://localhost:15672/#/exchanges and add an exchange named jsa.exchange.logs with type topic.

[Figure: Spring Boot RabbitMQ topic exchange - create exchange]

  • Add Queue:

Go to http://localhost:15672/#/queues, add 2 queues: jsa.logs.sys, jsa.logs.prod.error.

[Figure: Spring Boot RabbitMQ topic - the 2 queues]

  • Bind the queues to the exchange above:

[Figure: Spring Boot RabbitMQ topic exchange binding]
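
The exchange, queues, and bindings created by hand above could also be declared from code. The following is a sketch, assuming Spring Boot's auto-configured AmqpAdmin declares Exchange, Queue, and Binding beans when the application first connects to the broker. TopologyConfig and the bean method names are hypothetical, and the binding keys (sys.# for jsa.logs.sys; #.error and *.prod.* for jsa.logs.prod.error) are an assumption inferred from the patterns in the topic exchange section and from the results shown in this tutorial.

```java
package com.loizenai.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class: not part of the original projects.
@Configuration
public class TopologyConfig {

    @Bean
    public TopicExchange logsExchange() {
        // Durable topic exchange with the name used in application.properties
        return new TopicExchange("jsa.exchange.logs");
    }

    @Bean
    public Queue sysQueue() {
        // Durable queue, same name as the one added in the management UI
        return new Queue("jsa.logs.sys");
    }

    @Bean
    public Queue prodErrorQueue() {
        return new Queue("jsa.logs.prod.error");
    }

    // Binding keys below are assumed, see the note above.
    @Bean
    public Binding sysBinding() {
        return BindingBuilder.bind(sysQueue()).to(logsExchange()).with("sys.#");
    }

    @Bean
    public Binding errorBinding() {
        return BindingBuilder.bind(prodErrorQueue()).to(logsExchange()).with("#.error");
    }

    @Bean
    public Binding prodBinding() {
        return BindingBuilder.bind(prodErrorQueue()).to(logsExchange()).with("*.prod.*");
    }
}
```

Declaring the topology this way keeps it under version control with the application; the management UI steps remain a reasonable choice for a quick local test.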

Run & Check Results

  • Run SpringBoot-RabbitMQ-Producer with the command mvn spring-boot:run.

  • Console's logs:


Send msg = {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Send msg = {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Send msg = {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

  • See queues' status:

[Figure: Spring Boot RabbitMQ topic message queues after sending]

  • Run Spring-Boot-RabbitMQ-Consumer, listening to the jsa.logs.sys queue, with configuration jsa.rabbitmq.queue=jsa.logs.sys:

  • Console's logs:


Recieved Message: {content = 2014-03-05 10:58:51.1  INFO 45469 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52, routingKey = sys.dev.info}
Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}

  • See queues' status:

[Figure: Spring Boot RabbitMQ topic queues after consuming from jsa.logs.sys]

  • Run Spring-Boot-RabbitMQ-Consumer, listening to the jsa.logs.prod.error queue, with configuration jsa.rabbitmq.queue=jsa.logs.prod.error:

  • Console's Logs:


Recieved Message: {content = 2017-10-10 10:57:51.10 ERROR in ch.qos.logback.core.joran.spi.Interpreter@4:71 - no applicable action for [springProperty], current ElementPath is [[configuration][springProperty]], routingKey = sys.test.error}
Recieved Message: {content = 2017-10-10 10:57:51.112  ERROR java.lang.Exception: java.lang.Exception, routingKey = app.prod.error}

  • See queues' status:

[Figure: Spring Boot RabbitMQ topic queues after consuming from jsa.logs.prod.error]
