SpringCloud 用法示例

2023-11-28 16:27 更新

在本节中,我们将说明针对特定方案使用前面的属性。

示例:将autoCommitOffset设置为false并依靠手动进行

此示例说明了如何在用户应用程序中手动确认偏移。

本示例要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false在您的示例中使用相应的输入通道名称。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:安全配置

Apache Kafka 0.9支持客户端和代理之间的安全连接。要利用此功能,请遵循Apache Kafka文档中的准则以及Confluent文档中的Kafka 0.9 安全准则使用spring.cloud.stream.kafka.binder.configuration选项为活页夹创建的所有客户端设置安全性属性。

例如,要将security.protocol设置为SASL_SSL,请设置以下属性:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

可以以类似方式设置所有其他安全属性。

使用Kerberos时,请遵循参考文档中的说明来创建和引用JAAS配置。

Spring Cloud Stream支持通过使用JAAS配置文件并使用Spring Boot属性将JAAS配置信息传递到应用程序。

使用JAAS配置文件

可以使用系统属性为Spring Cloud Stream应用程序设置JAAS和(可选)krb5文件位置。以下示例显示如何通过使用JAAS配置文件使用SASL和Kerberos启动Spring Cloud Stream应用程序:

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用Spring Boot Properties

作为使用JAAS配置文件的替代方法,Spring Cloud Stream提供了一种通过使用Spring Boot属性为Spring Cloud Stream应用程序设置JAAS配置的机制。

以下属性可用于配置Kafka客户端的登录上下文:

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。

默认值:required

spring.cloud.stream.kafka.binder.jaas.options

使用包含登录模块选项的键/值对进行映射。

默认值:空地图。

以下示例显示如何使用Spring Boot配置属性使用SASL和Kerberos启动Spring Cloud Stream应用程序:

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下JAAS文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需的主题已经存在于代理上或将由管理员创建,则可以关闭自动创建,仅需要发送客户端JAAS属性。

 请勿在同一应用程序中混合使用JAAS配置文件和Spring Boot属性。如果-Djava.security.auth.login.config系统属性已经存在,则Spring Cloud Stream将忽略Spring Boot属性。

 autoCreateTopicsautoAddPartitions与Kerberos一起使用时要小心。通常,应用程序可能使用在Kafka和Zookeeper中没有管理权限的主体。因此,依靠Spring Cloud Stream创建/修改主题可能会失败。在安全的环境中,强烈建议您使用Kafka工具创建主题并以管理方式管理ACL。

示例:暂停和恢复使用方

如果希望暂停使用但不引起分区重新平衡,则可以暂停并恢复使用方。通过将Consumer作为参数添加到@StreamListener中,可以简化此操作。要恢复,需要为ListenerContainerIdleEvent实例使用ApplicationListener事件的发布频率由idleEventInterval属性控制。由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。

以下简单的应用程序显示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}
以上内容是否对您有帮助:
在线笔记
App下载
App下载

扫描二维码

下载编程狮App

公众号
微信公众号

编程狮公众号