SpringCloud 添加消息处理程序,构建并运行
修改com.example.loggingconsumer.LoggingConsumerApplication
类,如下所示:
@SpringBootApplication @EnableBinding(Sink.class) public class LoggingConsumerApplication { public static void main(String[] args) { SpringApplication.run(LoggingConsumerApplication.class, args); } @StreamListener(Sink.INPUT) public void handle(Person person) { System.out.println("Received: " + person); } public static class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } public String toString() { return this.name; } } }
从前面的清单中可以看到:
- 我们已经使用
@EnableBinding(Sink.class)
启用了Sink
绑定(输入无输出)。这样做会向框架发出信号,以启动对消息传递中间件的绑定,在该消息传递中间件自动创建绑定到Sink.INPUT
通道的目的地(即队列,主题和其他)。 - 我们添加了一个
handler
方法来接收类型为Person
的传入消息。这样做可以使您看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型Person
。
您现在有了一个功能齐全的Spring Cloud Stream应用程序,该应用程序确实侦听消息。为了简单起见,我们从这里开始,假设您在第一步中选择了RabbitMQ 。假设已经安装并运行了RabbitMQ,则可以通过在IDE中运行其main
方法来启动应用程序。
您应该看到以下输出:
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672] --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . . . . . --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg . . . --- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
转到RabbitMQ管理控制台或任何其他RabbitMQ客户端,然后向input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
发送消息。anonymous.CbMIwdkJSBO1ZoPDOtHtCg
部分代表组名并已生成,因此在您的环境中它一定是不同的。对于更可预测的内容,可以通过设置spring.cloud.stream.bindings.input.group=hello
(或您喜欢的任何名称)来使用显式组名。
消息的内容应为Person
类的JSON表示形式,如下所示:
{"name":"Sam Spade"}
然后,在控制台中,您应该看到:
Received: Sam Spade
您还可以将应用程序生成并打包到引导jar中(使用./mvnw clean install
),并使用java -jar
命令运行生成的JAR。
现在,您有了一个正在运行的(尽管非常基础的)Spring Cloud Stream应用程序。
更多建议: