/ Spring Cloud  

Spring Cloud 11: Message-driven development - Stream

Message-driven development has become one of the essential development methods for microservices architecture. This is because, 1. interface call development in traditional monolithic architecture development does not exist under the microservices architecture; 2. the development of microservices architecture requires reducing the direct dependency coupling of each microservice. If one microservice directly calls another one, then the two microservices will have a strong coupling through dependencies; 3. microservice’s autonomy principle also strongly requires that microservices cannot call each other. Therefore, message-driven development has become an inevitable trend in the development of microservice architectures.

Let’s look at a scenario in the example project:

  • Product-Service-Consumer microservices need to be able to achieve autonomy and minimize dependence on Procuct-Service;
  • Product-Service-Consumer microservices need to ensure the efficiency of the service, the development team decided to cache the product data, so that only when the first load the product microservice is required to be called. When the user requests the product next time, the cache can be retrieved, thereby improving the efficiency of the service (as for the use of memory or Redis to implement caching, this is up to you).

If it is implemented according to the above scenario, the system can work stably in most cases. What should we do once the product is modified, we will never call the Product-Service-Consumer microservice in the Procuct-Service microservice. Is this makes the coupling tighter? Well, yes, now we can let the messages go at this time.

By introducing the message, the system architecture of our example project will become as shown in the following figure:



Spring Cloud have provided a very good set of components for the message development-Stream.

Example project

We need to modify the example project, which includes the following three steps:

  1. Install Kafka server;
  2. Change Product-Service microservices to send product messages;
  3. Change Product-Service-Consumer microservice to subscribe product messages.

Install Kafka server

Our example will use Kafka as the message broker. Kafka message broker is very light and efficient. If you don’t want to use Kafka then you can complete the docking with RabbitMQ as well, and the specific implementation of the business code does not need to be changed.

How to install and run the Kafka server is not described in detail here, I will have another post describing it. There are also very good documents on the Internet and the official, for example, the official documentation.

Product-Service

1. Add Stream dependency

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2. application.properties

1
2
3
4
5
6
7
# =====================================================================================================================
# == stream / kafka ==
# =====================================================================================================================
spring.cloud.stream.bindings.output.destination=product-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

Here we mainly set the address of the Kafka server, and which topic to send product messages. Here it is set to product-topic. However, we do not need to configure these, if not configured, then Stream will connect to Kafka server and create the corresponding topic according to the default configuration. However, the prerequisite for not configuring is that you did not change the port when installing the Kakfa server, and that the Kafka server and the Product-Service microservice are on the same server.

3. Construct product message

When the product configuration changes, such as: modification, deletion, etc., a product message needs to be constructed, and then the message can be sent to the corresponding listening microservice for processing through Kafka. Therefore, the product message code to be constructed is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ProductMessage {
/** Message type:update, value: {@value} */
public static final String MA_UPDATE = "update";
/** Message type:delete, value: {@value} */
public static final String MA_DELETE = "delete";

// ========================================================================
// fields =================================================================
private String action;
private String itemCode;

public ProductMessage() {
}

public ProductMessage(String action, String itemCode) {
this.action = action;
this.itemCode = itemCode;
}

@Override
public String toString() {
return "action: " + this.action + " itemCode: " + this.itemCode;
}

public String getAction() {
return action;
}

public void setAction(String action) {
this.action = action;
}

public String getItemCode() {
return itemCode;
}

public void setItemCode(String itemCode) {
this.itemCode = itemCode;
}
}

The product message is very simple and contains only two fields: action and itemCode. The meaning represented is as follows:

  • action: indicates what the message is, such as a product update message or a product delete message;
  • itemCode: the item number (or product ID) of the item that was changed or deleted.

You may be wondering here, why the product message only contains these two fields, is it enough? In general, these two fields are enough for the message, but in the formal production environment, we will add other fields, which will not be discussed here. In addition, when the listener listens to the message, it can perform related processing according to the message type and the item code of the product. For example, the Product-Service-Consunmer microservice will re-load the product information by remotely requesting the product microservice according to the product item code.

4. Implement message sending

When the user changes or deletes the product in the Product-Service microservice, the above product message needs to be constructed and sent. The corresponding code is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Service
public class ProductService {
private Logger logger = LoggerFactory.getLogger(ProductService.class);

private Source source;
private List<ProductDto> productList;

@Autowired
public ProductService(Source source) {
this.source = source;
this.productList = this.buildProducts();
}

/**
* Get product list
*/
public List<ProductDto> findAll() {
return this.productList;
}

/**
* get product by item
*/
public ProductDto findOne(String itemCode) {
for (ProductDto productDto : this.productList) {
if (productDto.getItemCode().equalsIgnoreCase(itemCode))
return productDto;
}

return null;
}

/**
* update or save product
*/
public ProductDto save(ProductDto productDto) {
for (ProductDto sourceProductDto : this.productList) {
if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
sourceProductDto.setName(sourceProductDto.getName() + "-new");
sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
productDto = sourceProductDto;
break;
}
}

// send message
this.sendMsg(ProductMessage.MA_UPDATE, productDto.getItemCode());

return productDto;
}

/**
* Implement send message
*/
protected void sendMsg(String msgAction, String itemCode) {
ProductMessage productMsg = new ProductMessage(msgAction, itemCode);
this.logger.debug("Send message:{} ", productMsg);

// 发送消息
this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}

protected List<ProductDto> buildProducts() {
List<ProductDto> products = new ArrayList<>();
products.add(new ProductDto("item-1", "test-1", "brand1", 100));
products.add(new ProductDto("item-2", "test-2", "brand2", 200));
products.add(new ProductDto("item-3", "test-3", "brand3", 300));
products.add(new ProductDto("item-4", "test-4", "brand4", 400));
products.add(new ProductDto("item-5", "test-5", "brand5", 500));
products.add(new ProductDto("item-6", "test-6", "brand6", 600));

return products;
}
}

Sending a message is very simple, we just need to call the source.output().send() method to send the message. You may be a little confused here, what is the source, and where does it pop out. Don’t worry, now you just need to understand that this is an abstract message sending interface provided by Spring Cloud Stream. You can get a message sending channel through output() in this interface, and then you can send() the message to that channel. We will talk about the specific principle later.

5. Add message sending endpoint

We need to add an endpoint to simulate user saving/updating product information. In the above code, we can know that when we save/update the product information, the product message will be sent, so the new endpoint only needs to call this method. The specific code is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@RestController
@RequestMapping("/products")
public class ProductEndpoint {
private static final Logger LOGGER = LoggerFactory.getLogger(ProductEndpoint.class);

@Autowired
ProductService productService;

@Value("${server.port}")
private int serverPort = 0;

@RequestMapping(method = RequestMethod.GET)
public List<ProductDto> list() {
LOGGER.info("Server port {}", serverPort);

return productService.findAll();
}

@RequestMapping(value = "/{itemCode}", method = RequestMethod.GET)
public ProductDto detail(@PathVariable String itemCode) {
LOGGER.info("Server port {}", serverPort);

return productService.findOne(itemCode);
}

@RequestMapping(value = "/{itemCode}", method = RequestMethod.POST)
public ProductDto save(@PathVariable String itemCode) {
ProductDto productDto = this.productService.findOne(itemCode);

if (null != productDto) {
this.productService.save(productDto);
}

return productDto;
}
}

6. Binding Message Channel

Lastly, we need to bind Kafka message broker when the microservice starts. The implementation code is as follows:

1
2
3
4
5
6
7
8
@EnableDiscoveryClient
@EnableBinding(Source.class)
@SpringBootApplication
public class ProductServiceApplication {
public static void main(String[] args) {
SpringApplication.run(ProductServiceApplication.class, args);
}
}

Just add a @EnableBinding(Source.class) annotation to the application main class.

Ok, now we have implemented the message sending of the Product-Service microservice. Let’s complete the message subscribbing in the Product-Service-Consumer microservice.

Product-Service-Consumer

1. Add Stream dependency

This is the same as Product-Service.

2. application.properties

1
2
3
4
5
6
7
8
# =====================================================================================================================
# == stream / kafka ==
# =====================================================================================================================
spring.cloud.stream.bindings.input.destination=product-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup
spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092

This configuration is similar to the Product-Service microservice, but we need to change the output to input, indicating that the message input channel is configured here.

In addition, we also added a group configuration attribute. The specific meaning of this attribute will be explained later.

3. Copy ProductMessage to this project

4. Implement message subscribbing

Using Stream, we only need to add the @StreamListener annotation to the corresponding listening method. The specific code is as follows:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@EnableBinding(Sink.class)
public class ProductMessageListener {
protected Logger logger = LoggerFactory.getLogger(this.getClass());

@Qualifier("PRODUCT-SERVICE")
@Autowired
protected ProductService productService;

@StreamListener(Sink.INPUT)
public void onProductMessage(ProductMessage productMsg) {
if (ProductMessage.MA_UPDATE.equalsIgnoreCase(productMsg.getAction())) {
this.logger.debug("Receive product update message, item code: {}", productMsg.getItemCode());
// request new product info
Product productDto = this.productService.loadByItemCode(productMsg.getItemCode());
if (null != productDto)
this.logger.debug("Update product info:{}", productDto);
else
this.logger.debug("Item code:{} doesn't exist", productMsg.getItemCode());
} else if (ProductMessage.MA_DELETE.equalsIgnoreCase(productMsg.getAction())) {
this.logger.debug("Receive product delete message: {}", productMsg.getItemCode());
} else {
this.logger.debug("Unknown product info: {}", productMsg);
}
}
}

This code is very simple to implement a product message monitoring method onProductMsg, and in this method different processing according to the message type.

Similarly, for Sink, I won’t go into details here. You just need to understand that this is an abstract message listening interface provided by Spring Cloud Stream. When the interface class is added the @StreamListener annotation, Stream will send Kafka to add a message subscription. The subscribed message topic is the product-topic that we specified in the configuration file. When there is a message in the topic, Stream will deserialize the message in the topic into ProductMessage, and then execute the specific message listening method.

Testing

Start the service in order:

  1. Kafka
  2. Service-discovery
  3. Product-Service
  4. Product-Service-Consumer

In the console of the Product-Service microservice, you can see output similar to the following:

1
2
2020-03-01 13:46:07.599 DEBUG             44575 --- [nio-2100-exec-1] o.s.web.servlet.DispatcherServlet        : POST "/products/item-2", parameters={}
2020-03-01 13:46:07.656 INFO 44575 --- [nio-2100-exec-1] s.productservice.service.ProductService : Send message:action: update itemCode: item-2

From the output log, you can see that the product message has been sent to the message broker. If we look at the console of the Product-Service-Consumer microservice, we can see the output of the following figure:

1
2
2020-03-01 13:46:08.005  INFO             44577 --- [container-0-C-1] istener$$EnhancerBySpringCGLIB$$cb550e3b : Receive product update message, item code: item-2
2020-03-01 13:46:08.005 INFO 44577 --- [container-0-C-1] istener$$EnhancerBySpringCGLIB$$cb550e3b : Update product info:springclouddemo.mall.entity.Product@57708467

It can be seen from the log output that the Product-Service-Consumer microservice has been able to correctly receive the product message, and then re-requested and obtained the latest information of the product.

Details of Spring Cloud Stream

From the core principle of Spring Cloud Stream, Stream provides an abstraction layer for sending and receiving messages with message broker. Through this abstraction layer, the direct coupling between messaging in the business and the middleware actually used is stripped, so that we can easily interface with various message broker, and it is also very easy to replace the message broker. This is the same as using the ORM framework, which can smoothly switch between multiple databases.

Stream application model

Stream provides the following models



There are several core concepts on the model diagram:

  • Source: When you need to send a message, you need to go through the Source. The Source will serialize the message (POJO object) we want to send (the default is converted to a JSON format string), and then send these data to the Channel;
  • Sink: When we need to listen to the message, we need to use Sink. Sink is responsible for obtaining the message from the message Channel, deserializing the message into a message object (POJO object), and then handing it to the specific message monitoring process for processing;
  • Channel: message Channel is one of the abstractions of Stream. Usually we need to specify the topic queue name when sending or listening to the message broker, but once we need to change the topic name, we need to modify the code of message sending or message monitoring, but through Channel abstraction, our code only needs to use the Channel. The specific Channel corresponds to the topic, which can be specified in the configuration file, so that when the topic changes, we don’t need to make any changes to the code, so as to achieve the decoupling.
  • Binder: Another abstraction layer in Stream. Integration with different message broker can be achieved through different Binders. For example, in the above example, we are using Binder for Kafka. Binder provides a unified messaging interface, so that we can deploy different message broker according to actual needs, or adjust our configuration based on the messaging broker deployed in actual production.

Stream application principle

From the above, we understand the application model of Stream, and we are clear about the message sending logic and process. So how do we operate during the actual message sending and listening?

In use, Stream provides the following three annotations:

  • @Input: create a message input channel for message monitoring;
  • @Output: create a message output channel for message sending;
  • @EnableBinding: Establish a binding to the message channel.

When using it, we can create multiple channels through @Input and @Output. Using these two annotations to create channels is very simple. You only need to annotate them to the corresponding method of the interface, and you don’t need to implement the annotation specifically. . When the Stream framework is started, the corresponding implementation is generated through dynamic code generation technology based on these two annotations, and injected into the Spring application context, so that we can use it directly in the code.

Output annotation

For the @Output annotation, the return value of the annotated method must be MessageChannel, and MessageChannel is the channel for sending specific messages. For example, the following code:

1
2
3
4
5
6
7
public interface ProductSource {
@Output
MessageChannel hotProducts();

@Output
MessageChannel selectedProducts();
}

We can send messages through the message channel created by ProductSource.

Input annotation

For the @Input annotation, the return value of the annotated method must be a SubscribableChannel, and the SubscribableChannel is the channel for message monitoring. For example, the following code:

1
2
3
4
public interface ProductSink {
@Input
SubscribableChannel productOrders();
}

We can listen to messages through the message channel created by ProductSink.

About Input, Output out of the box

Maybe you are a bit confused. Before in the example, we used Source and Sink in the code. So what is the relationship between these two classes and the annotations above? Let’s take a look at the source code of these two interfaces:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Source
public interface Source {

String OUTPUT = "output";

@Output(Source.OUTPUT)
MessageChannel output();
}

// Sink
public interface Sink {

String INPUT = "input";

@Input(Sink.INPUT)
SubscribableChannel input();
}

@Input and @Output are annotations of the core application of Stream, while Source and Sink are just two interfaces provided by Stream for us out of the box. With or without these interfaces, we can use Stream normally.

In addition, Stream also provides an out-of-the-box interface Processor, the source code is as follows:

1
2
public interface Processor extends Source, Sink {
}

That is to say, Processor can be used as message sending and message listening at the same time. This interface will be very useful when we develop message pipeline type applications.

Custom message channel name

Earlier, we talk abbout message Channel is an abstraction of Stream. Through this abstraction, you can avoid coupling with the specific topic of message broker. So what is going on? From the Source and Sink source code, you can see that the annotated @Output and @Input annotations have a parameter, which is output and input respectively. At this time, you can observe our previous configuration:

1
2
3
4
5
6
7
8
# prodcut-service
spring.cloud.stream.bindings.output.destination=product-topic
spring.cloud.stream.bindings.output.content-type=application/json

# prodcut-service-consumer
spring.cloud.stream.bindings.input.destination=product-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=mallWebGroup

From the configuration, you can see the configuration of the destination property, which specifies the output and inout, respectively, the name of the message channel used in the Stream. Therefore, we can set the name of the message channel through these two annotations, such as:

1
2
3
4
5
6
7
public interface ProductProcessor {
@Output("pmsoutput")
MessageChannel productOutput();

@Input("pmsinput")
SubscribableChannel input();
}

In this way, when we use the ProductProcessor interface to implement message sending and listening, we need to configure it in the configuration file as follows:

1
2
3
4
5
6
7
8
# Send
spring.cloud.stream.bindings.pmsoutput.destination=product-topic
spring.cloud.stream.bindings.pmsoutput.content-type=application/json

# Subscribe
spring.cloud.stream.bindings.pmsinput.destination=product-topic
spring.cloud.stream.bindings.pmsinput.content-type=application/json
spring.cloud.stream.bindings.pmsinput.group=mallWebGroup

Binding

Now that both the message sending channel and the listening channel have been created, you can connect them to specific message broker to complete the sending and monitoring of messages, and the @EnableBinding annotation is used to implement this function. The specific usage is as follows:

1
2
3
4
5
6
7
8
9
10
11
// Send binding
@EnableBinding(Source.class)
public class Application {

}

// Listening
@EnableBinding(Sink.class)
public class ProductMsgListener {

}

It should be noted that @EnableBinding can bind multiple interfaces at the same time, as follows:

1
@EnableBinding(value={ProductSource.class, ProductSink.class})

Use channels directly

The code we sent message previously is as follows:

1
2
3
4
5
6
7
protected void sendMsg(String msgAction, String itemCode) {
ProductMessage productMsg = new ProductMessage(msgAction, itemCode);
this.logger.info("Send message:{} ", productMsg);

// Send
this.source.output().send(MessageBuilder.withPayload(productMsg).build());
}

Since the MessageChannel provided by @Output is used when the message is sent, can we use it directly? Indeed, this is possible. We can change the above code to the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class ProductService {
protected Logger logger = LoggerFactory.getLogger(ProductService.class);

private MessageChannel output;
private List<ProductDto> productList;

@Autowired
public ProductService(MessageChannel output) {
this.output = output;
this.productList = this.buildProducts();
}

// Ignore other code
protected void sendMsg(String msgAction, String itemCode) {
ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
this.logger.debug("Send message:{} ", productMsg);

// 发送消息
this.output.send(MessageBuilder.withPayload(productMsg).build());
}
}

By default the Id of the MessageChannelBean created by Stream is the method name, but if we add a name definition to the @Output annotation:

1
2
3
4
public interface ProductSource {
@Output("pmsoutput")
MessageChannel output();
}

Then at this time Stream will use pmsoutput as the Bean’s Id, and our code needs to be as follows:

1
2
3
4
5
@Autowired
public ProductService(@Qualifier("pmsoutput") MessageChannel output) {
this.output = output;
this.productList = this.buildProducts();
}

Check out the source code here: Stream demo