pom.xml
<!-- Inherit plugin and dependency management from the Spring Boot parent POM. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <!-- Referenced by the spring-cloud-dependencies import in dependencyManagement below. -->
    <spring-cloud.version>Dalston.SR1</spring-cloud.version>
</properties>

<dependencies>
    <!-- No version here: it is expected to come from the imported spring-cloud-dependencies POM. -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
yml文件
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        # Input and output bindings share one destination, so this application
        # receives the messages it sends.
        issueInPutMessage:
          destination: devops-cca
          content-type: application/json
          group: default
        issueOutPutMessage:
          destination: devops-cca
          content-type: application/json
          producer:
            # The partition key is taken from the payload's id property.
            partitionKeyExpression: payload.id
            # Also read by MainController through @Value.
            partitionCount: 2
      kafka:
        binder:
          brokers: 10.40.64.53
          zkNodes: 10.40.64.53
          autoAddPartitions: true
server:
  port: 9050
SenderApplication.java
/**
 * Application entry point; hands control to Spring Boot with this class as the
 * primary configuration source.
 */
@SpringBootApplication
public class SenderApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class, args);
    }
}
MainController.java
/**
 * REST controller that publishes {@link IssueMessage}s on the output binding and
 * prints every message received on the input binding.
 *
 * <p>Requesting {@code "/"} sends a single message and answers {@code "ok"}.
 */
@RestController
@EnableBinding({IssueInputMessage.class, IssueOutputMessage.class})
public class MainController {

    @Autowired
    IssueOutputMessage issueOutputMessage;

    /** Producer partition count, injected as text from the binding configuration. */
    @Value("${spring.cloud.stream.bindings.issueOutPutMessage.producer.partitionCount}")
    private String partitionCount;

    /**
     * Draws a pseudo-random message id.
     *
     * <p>A random value in {@code [0, 10^d]} (where {@code d} is the number of
     * characters in the configured partition count) is rounded and then divided,
     * using integer division, by the partition count.
     *
     * @return the generated id
     * @throws NumberFormatException if the configured partition count is not an integer
     * @throws ArithmeticException if the configured partition count is zero
     */
    private int randomId() {
        long scaled = Math.round(Math.random() * Math.pow(10, partitionCount.length()));
        // The quotient never exceeds the scaled value, which stays within int range
        // for any partition count that Integer.parseInt accepts.
        return (int) (scaled / Integer.parseInt(partitionCount));
    }

    /**
     * Builds an {@link IssueMessage} and sends it on the output channel.
     *
     * @param id   id to store on the message
     * @param text message text
     */
    private void publish(int id, String text) {
        IssueMessage<String> chatMessage = new IssueMessage<>();
        chatMessage.setMessage(text);
        chatMessage.setId(id);
        issueOutputMessage.issueOutPutMessage().send(MessageBuilder.withPayload(chatMessage).build());
    }

    /** Sends ten messages, "message 0" through "message 9", each with its own random id. */
    private void sendMore() {
        for (int i = 0; i < 10; i++) {
            publish(randomId(), String.format("message %s", i));
        }
    }

    /** Sends one message whose text carries a random number between 0 and 10. */
    private void sendOnlyOne() {
        publish(randomId(), String.format("message %s", Math.round(Math.random() * 10)));
    }

    /**
     * Sends a single message.
     *
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/")
    public String index() {
        sendOnlyOne();
        return "ok";
    }

    /**
     * Prints the message content of every payload received on the input binding
     * to standard output.
     *
     * @param message the incoming message wrapping an {@link IssueMessage}
     */
    @StreamListener(IssueInputMessage.ISSUE_INPUT_MESSAGE)
    public void analyzingCancel(Message<IssueMessage<?>> message) {
        IssueMessage<?> chatMessage = message.getPayload();
        System.out.println(chatMessage.getMessage());
    }
}
/**
 * Binding contract for the inbound channel.
 */
public interface IssueInputMessage {

    /** Channel name; referenced by the {@code @StreamListener} in {@code MainController}. */
    String ISSUE_INPUT_MESSAGE = "issueInPutMessage";

    /**
     * Returns the channel on which incoming messages can be subscribed to.
     *
     * @return the subscribable input channel
     */
    @Input
    SubscribableChannel issueInPutMessage();
}
/**
 * Mutable bean pairing an integer id with a message of type {@code T}.
 *
 * @param <T> type of the carried message
 */
public class IssueMessage<T> {

    private int id;
    private T message;

    /**
     * @return the id, {@code 0} until one has been set
     */
    public int getId() {
        return this.id;
    }

    /**
     * @param id the id to store
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * @return the message, {@code null} until one has been set
     */
    public T getMessage() {
        return this.message;
    }

    /**
     * @param message the message to store
     */
    public void setMessage(T message) {
        this.message = message;
    }
}
/**
 * Binding contract for the outbound channel.
 */
public interface IssueOutputMessage {

    /**
     * Returns the channel to which outgoing messages are sent.
     *
     * @return the output message channel
     */
    @Output
    MessageChannel issueOutPutMessage();
}
相关推荐
spring cloud stream kafka 消息驱动集成
先启动消费者(kafka、kafka1),再启动 kafka2。请自行安装好 Kafka 及 ZooKeeper;有什么疑问,可以一起交流交流。
Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 SpringCloud系列教程、SpringBoot、 Stream、Kafka、案例教程
卡夫卡春天云流 Apache Kafka的Spring Cloud Stream展示
spring-cloud-stream-app-descriptor-Celsius.SR3.stream-apps-kafka-10-docker
spring-cloud-stream-kafka:Spring Cloud Streams Kafka Avro
主要介绍了spring-cloud-stream结合kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
Spring Cloud Finchley Eureka discovery、message 服务,注册到注册中心,集成 Spring Cloud Stream Kafka,提供可供其他服务访问的接口,并且划分为不同的模块,欢迎下载学习和交流。
1.使用Spring Cloud搭建服务注册中心 2.使用Spring Cloud搭建高可用服务注册中心 ...28.Spring Cloud Bus整合Kafka 29.Spring Cloud Stream初窥 30.Spring Cloud Stream使用细节 31.Spring Cloud系列勘误
包含内容: Spring Cloud系列教程 Spring Boot Spring Cloud Stream...springcloud-config-oracle-bus-kafka.zip springcloud-feign.zip springcloud-producer.zip springcloud-producer-consumer.zip springcloudstudy.
BUS、Turbine、Zipkin、Cache、Spring Cloud Admin、API Gateway、ELK、Spring Cloud Security、Spring Cloud Stream Component: RocketMQ、Kafka、MongoDB、OSS、Redis、Swagger、Zuul、Label、BASE、Charts、...
Spring Cloud系列教程 | 第十一篇:Spring Boot Spring Cloud Stream 和 Kafka案例教程-附件资源
但是目前SpringCloudStream只支持RabbitMQ和Kafka的自动化配置。首先,我们通过一个简单的示例对SpringCloudStream有一个初步的认识。我们中间件使用RabbitMQ,创建spring-cloud-stream模块编辑pom.xml文件,引入...
Spring Cloud Stream示例应用程序该存储库包含使用Spring Cloud Stream编写的应用程序的集合。 所有的应用程序都是独立的。 它们可以与Kafka或RabbitMQ中间件技术一起运行。 您可以选择针对Kafka和Rabbit的本地或...
spring-cloud
SpringCloud Demo实例。 Demo中包含高可用Eureka/Zuul/Spring Config/Feign/Ribbon/Spring Cloud Stream(kafka)相关Demo
camunda-bpm-spring-boot Camunda BPM Spring Boot 集成 有用的资源 如何导入 REST 类(而不是 war 文件) 如果您想添加 Camunda BPM Rest API,您必须将以下依赖项添加到您的 Maven 构建中。 < groupId>org....
spring-cloud-sleuth-master/spring-cloud-security-master/spring-cloud-cluster-master/spring-cloud-stream-master...kafka-master/spring-cloud-stream-binder-rabbit-master/spring-cloud-stream-starters-master/