/ Spring Cloud  

Spring Cloud 12: Message bus

Before we start talking about Spring Cloud Bus, let’s look at another IT term: ESB (Enterprise Service Bus). ESB is described in Wikipedia (https://en.wikipedia.org/wiki/Enterprise_service_bus):

An enterprise service bus (ESB) implements a communication system between mutually interacting software applications in a service-oriented architecture (SOA). It represents a software architecture for distributed computing, and is a special variant of the more general client-server model, wherein any application may behave as server or client. ESB promotes agility and flexibility with regard to high-level protocol communication between applications. Its primary use is in enterprise application integration (EAI) of heterogeneous and complex service landscapes.

Enterprise service bus usually provides an abstraction layer on the enterprise message system, so that the integration architect can use the value of the message to complete the integration without coding. In simple terms, the enterprise service bus is another abstraction layer that is built on top of the message middleware, so that we can complete the processing of business logic without caring about message-related processing.

At this point, have you suddenly understood the relationship between Spring Cloud Bus and Spring Cloud Stream? When you first know these two components, most of you will be confused about what is the difference between the two? What is the connection between them? Stream abstracts and encapsulates the message middleware to provide a unified interface for us to send and listen to messages, while Bus is abstracted and encapsulated again on the basis of Stream, so that we can complete the processing of business logic without understanding the basics of message sending and listening.

So how does Spring Cloud Bus do it for us? In a word, it is the event mechanism.

Spring’s event mechanism

There is an event mechanism in the Spring framework, which is an implementation of the observer pattern. Observer pattern establishes a kind of object-to-object relationship. When an object(called: observation target) changes, it will automatically notify other objects(called: observer), and these observers will make corresponding reaction. An observation target can correspond to multiple observers, and there is no mutual connection between these observers. You can add and delete observers as needed, making the system easier to expand. The following purposes can be achieved through the Spring event mechanism:

  • Decoupling between application modules;
  • You can define multiple processing methods for the same event according to your needs;
  • Not disturbing the main line application is an excellent Open and Close Principle (OCP) practice.

When we introduce event mechanism in our application, we need to use the following interfaces or abstract classes in Spring:

  • ApplicationEventPublisher: This is an interface for publishing an event;
  • ApplicationEvent: This is an abstract class used to define an event;
  • ApplicationListener <E extends ApplicationEvent>: This is an interface that implements event listening.

The context of the Spring application, ApplicationContext, implements the ApplicationEventPublisher interface by default, so when publishing events, we can directly use the ApplicationContext.publishEvent() method.

A typical Spring event sending and listening code is as follows:

Define event

For example, we define a user event:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class UserEvent extends ApplicationEvent {
/** Message type */
public static final String ET_UPDATE = "update";

// ========================================================================
// fields =================================================================
private String action;
private User user;

// ========================================================================
// constructor ============================================================
public UserEvent(User user) {
super(user);
this.user = user;
}

public UserEvent(User user, String action) {
super(user);
this.action = action;
this.user = user;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("action", this.getAction())
.add("user", this.getUser()).toString();
}

// ==================================================================
// setter/getter ====================================================
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}

public User getUser() {
return user;
}
public void setUser(User user) {
this.user = user;
}
}

Define listener

We define a user event listener and handle the event when the user changes:

1
2
3
4
5
6
7
8
9
10
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Override
public void onApplicationEvent(UserEvent userEvent) {
this.logger.debug("Received user event: {} ", userEvent);
// TODO: detail business logic
}
}

Event listening is relatively simple. You only need to implement the ApplicationListener interface and handle it accordingly.

Publish event

We can implement it directly in Event class. For example, we change the above UserEvent to the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class UserEvent extends ApplicationEvent {
// ignore previous code

/**
* Pubblish event
*/
public void fire() {
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
logger.debug("Publish event:{}", this);
context.publishEvent(this);
}else{
logger.warn("Can't obtain application context");
}
}
}

Then we can publish events with the following code where needed:

1
new UserEvent(user, UserEvent.ET_UPDATE).fire();

Spring Cloud Bus mechanism

We learned about Spring‘s event mechanism above, so how does Spring Cloud Bus combine the event mechanism with Stream? In summary, the mechanism is as follows:

  1. Add the @RemoteApplicationEventScan annotation to the application that needs to publish or listen to events. With this annotation we can start the binding of the message channel mentioned in the Stream;
  2. For event publishing, you need to extend the ApplicationEvent extension class-RemoteApplicationEvent. When this type of event is published through ApplicationContext.publishEvent(), Spring Cloud Bus will wrap the event, form a message we are familiar with, and then send it to the message broker through the default springCloudBus message channel;
  3. For event listeners, you don’t need to make any changes, and you can still listen to messages in the same way as above. However, it should be noted that the events defined in step 2 must also be defined in the consumer microservices project, and the entire class names need to be consistent (if they are inconsistent, a little extra work is needed).

With Spring Cloud Bus, we can develop like writing a monolithic application without having to deal with a lot of concepts such as message broker, topics, messages, channels, and so on.

Let’s look at how we can modify the Stream demo to incooperate Spring Cloud Bus.

Refactor Spring Cloud Stream demo

Refactor Product-Service

1. Add bus dependency

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

2. Create Product Event

We change the product message to an event with the following code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ProductEvent extends RemoteApplicationEvent {
/** Message type:update, value: {@value} */
public static final String ET_UPDATE = "update";
/** Message type:delete, value: {@value} */
public static final String ET_DELETE = "delete";

// ========================================================================
// fields =================================================================
private String action;
private String itemCode;

public ProductEvent() {
super();
}

public ProductEvent(Object source, String originService, String destinationService, String action, String itemCode) {
super(source, originService, destinationService);
this.action = action;
this.itemCode = itemCode;
}

@Override
public String toString() {
return "action: " + this.action + " itemCode: " + this.itemCode;
}

public String getAction() {
return action;
}

public void setAction(String action) {
this.action = action;
}

public String getItemCode() {
return itemCode;
}

public void setItemCode(String itemCode) {
this.itemCode = itemCode;
}
}

Here the difference in constructor is that you need to specify the originService and destinationService when constructing an event. For event publishers, originService is itself, and destinationService refers to those microservice instances that need to publish this event. The format of the destinationService configuration is: {serviceId}: {appContextId}. During configuration, serviceId and appContextId can use wildcards. If both variables use wildcards (*:**), the event will be published to all microservice instances. If only the appContextId is omitted, the event will only be published to all instances of the specified microservice. For example: userservice:**, the event will only be published to the userservice microservice.

3. Implement event publishing

We change the code in ProductService as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@Service
public class ProductService {
private Logger logger = LoggerFactory.getLogger(ProductService.class);

private List<ProductDto> productList;

@Autowired
BusProperties busProperties;

@Autowired
public ProductService() {
this.productList = this.buildProducts();
}

/**
* Get product list
*/
public List<ProductDto> findAll() {
return this.productList;
}

/**
* get product by item
*/
public ProductDto findOne(String itemCode) {
for (ProductDto productDto : this.productList) {
if (productDto.getItemCode().equalsIgnoreCase(itemCode))
return productDto;
}

return null;
}

/**
* update or save product
*/
public ProductDto save(ProductDto productDto) {
for (ProductDto sourceProductDto : this.productList) {
if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
sourceProductDto.setName(sourceProductDto.getName() + "-new");
sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
productDto = sourceProductDto;
break;
}
}

// publish event
this.fireEvent(ProductEvent.ET_UPDATE, productDto);

return productDto;
}

/**
* Implement publish event
*/
protected void fireEvent(String eventAction, ProductDto productDto) {
ProductEvent productEvent = new ProductEvent(productDto,
busProperties.getId(),
"*:**",
eventAction,
productDto.getItemCode());
this.logger.info("Publish event:{} ", productEvent);

// Publish
RemoteApplicationEventPublisher.publishEvent(productEvent);
}

protected List<ProductDto> buildProducts() {
List<ProductDto> products = new ArrayList<>();
products.add(new ProductDto("item-1", "test-1", "brand1", 100));
products.add(new ProductDto("item-2", "test-2", "brand2", 200));
products.add(new ProductDto("item-3", "test-3", "brand3", 300));
products.add(new ProductDto("item-4", "test-4", "brand4", 400));
products.add(new ProductDto("item-5", "test-5", "brand5", 500));
products.add(new ProductDto("item-6", "test-6", "brand6", 600));

return products;
}
}

The code of RemoteApplicationEventPublisher:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RemoteApplicationEventPublisher {
protected static Logger logger = LoggerFactory.getLogger(RemoteApplicationEventPublisher.class);

public static void publishEvent(RemoteApplicationEvent event){
ApplicationContext context = ApplicationContextHolder.getApplicationContext();
if(null != context) {
context.publishEvent(event);
logger.info("Publish:{}", event);
}else{
logger.warn("Unable to get application context");
}
}
}

4. Main application

Finally, modify the main class and add the @RemoteApplicationEventScan annotation:

1
2
3
4
5
6
7
8
@EnableDiscoveryClient
@RemoteApplicationEventScan
@SpringBootApplication
public class ProductServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ProductServiceApplication.class, args);
}
}

Note: the remote event must be defined in a subpackage of the class annotated by @RemoteApplicationEventScan annotation, otherwise remote event publishing cannot be achieved

Refactor Product-Service-Consumer

1. Add bus dependency

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

2. Copy ProductEvent to this project

3. Implement event listening

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class ProductEventListener implements ApplicationListener<ProductEvent> {
protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Qualifier("productServiceFallback")
@Autowired
protected ProductService productService;

@Override
public void onApplicationEvent(ProductEvent productEvent) {
if (ProductEvent.ET_UPDATE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Received update event itemCode: {}", productEvent.getItemCode());
// get new product info
Product productDto = this.productService.loadByItemCode(productEvent.getItemCode());
if (null != productDto)
this.logger.debug("Update product info:{}", productDto);
else
this.logger.debug("itemCode:{} no exist", productEvent.getItemCode());
} else if (ProductEvent.ET_DELETE.equalsIgnoreCase(productEvent.getAction())) {
this.logger.debug("Received delete event itemCode: {}", productEvent.getItemCode());
} else {
this.logger.debug("Unknown product event: {}", productEvent);
}
}
}

4. Main class

As with Product-Service, remote message scanning needs to be enabled for both event publishing and event monitoring. Add the @RemoteApplicationEventScan annotation directly to the main class

1
2
3
4
5
6
7
8
9
10
@EnableFeignClients
@EnableDiscoveryClient
@EnableCircuitBreaker
@RemoteApplicationEventScan
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}

Testing

Start service in the following order:

  1. Kafka server;
  2. Service-discovery;
  3. Product-Service microservice;
  4. Product-Service-Consumer microservices.

POST a HTTP request to: http://localhost:2100/products/item-2. In the console of the Product-Service microservice, you can see output similar to the following:

1
2
2020-03-06 23:40:26.498 DEBUG             82437 --- [nio-2100-exec-1] o.s.web.servlet.DispatcherServlet        : POST "/products/item-2", parameters={}
2020-03-06 23:40:26.517 INFO 82437 --- [nio-2100-exec-1] s.productservice.service.ProductService : Publish event:action: update itemCode: item-2

From the output log, you can see that the product event has been published. If at this time we look at the console of the Product-Service-Consumer microservice, we can see the output of the following:

1
2
2020-03-06 23:40:33.722  INFO             82441 --- [container-0-C-1] s.mall.event.ProductEventListener        : Received update event itemCode: item-2
2020-03-06 23:40:33.723 INFO 82441 --- [container-0-C-1] s.mall.event.ProductEventListener : Update product info:springclouddemo.mall.entity.Product@14a7134

From the log output, you can see that the Product-Service-Consumer microservice has been able to correctly receive the product change event and handle it accordingly.

Conclusion

It is indeed easier to understand and easier to use Bus from the refactored code. This is very good for simple applications, such as broadcasting. A typical application is configuration refresh in Config. When both Config and Bus are introduced into a project, configuration changes can be broadcasted via the /bus/refresh endpoint, allowing the corresponding microservice to reload configuration data.

Of course, another layer of the simplicity of Bus is that it is not flexible enough, so whether you use Bus or directly use Streams in your project depends on your needs.

Check out the source code here: Bus demo