Spring Kafka and Spring Boot Configuration
Author: --     Published: 2019-11-20

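In this tutorial we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot configures Spring Kafka with sensible defaults, and we override those defaults using the application.yml properties file.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

We previously learned how to create a Kafka consumer and producer by configuring them manually. In this example we let Spring Boot configure them with sensible defaults.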

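Download and Install Apache Kafka

To download and install Apache Kafka, please read the official documentation ( https://kafka.apache.org/quickstart ). This tutorial assumes that the server is started with the default configuration and that no server ports have been changed.

Maven Dependencies

This project uses Apache Maven to manage its dependencies. Make sure the following dependencies are on the classpath. The contents of the pom.xml file are shown below -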

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.h3.spring.kafka</groupId>
    <artifactId>springboot-config</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://www.h3.com</url>
    <description>Spring Kafka Spring Boot</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <defaultGoal>compile</defaultGoal>
    </build>

</project>

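The project consists of pom.xml, the Java classes Sender, Receiver, and SpringKafkaApplication in the com.h3.kafka package, and an application.yml file under src/main/resources.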

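Sending Spring Kafka Messages with Spring Boot

Spring Boot auto-configures and initializes a KafkaTemplate based on the properties set in the application.yml properties file. Annotating the Sender class with @Service makes it eligible for discovery by the Spring container through component scanning.

The code of Sender.java is shown below -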

package com.h3.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Topic name, read from the app.topic.foo property in application.yml
    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
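
To make the @Value("${app.topic.foo}") placeholder less mysterious, here is a minimal sketch of how a ${...} placeholder could be resolved against a set of properties. It is an illustration only, not Spring's actual implementation; the class name PlaceholderSketch and the resolve method are hypothetical names introduced for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Conceptual sketch only: this is NOT how Spring implements placeholder resolution.
final class PlaceholderSketch {

    // Matches ${some.key} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in the text with the value found in the properties map.
    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder: " + key);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Mirrors the app.topic.foo entry used in this tutorial's application.yml.
        Map<String, String> properties = new HashMap<>();
        properties.put("app.topic.foo", "foo.t");
        System.out.println(resolve("${app.topic.foo}", properties)); // prints foo.t
    }
}
```

In the Sender class the same idea applies: the topic field ends up holding whatever value the app.topic.foo property has at startup.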

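Receiving Kafka Messages with Spring Boot

The ConcurrentKafkaListenerContainerFactory and KafkaMessageListenerContainer beans are also auto-configured by Spring Boot. They can optionally be configured through the application.yml properties file.

When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it.

The code of Receiver.java is shown below -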

package com.h3.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    // Invoked for every record that arrives on the topic named by app.topic.foo
    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        LOG.info("received message='{}'", message);
        // Log each message header (offset, partition, topic, timestamp, ...)
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }

}
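
As a rough mental model of what a message listener container does for an annotated method, the following sketch polls pending records and hands each one to a listener callback. It is an in-memory illustration only and does not use or reproduce Spring Kafka's classes; ListenerContainerSketch, publish, and pollOnce are hypothetical names, and the queue merely stands in for a topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Conceptual sketch only: an in-memory stand-in, not Spring Kafka's listener container.
final class ListenerContainerSketch {

    private final Queue<String> records = new ArrayDeque<>(); // stands in for a topic
    private final Consumer<String> listener;                  // stands in for the @KafkaListener method

    ListenerContainerSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    // Adds a record to the in-memory "topic".
    void publish(String record) {
        records.add(record);
    }

    // One poll cycle: hands every pending record to the listener and returns how many were delivered.
    int pollOnce() {
        int delivered = 0;
        String record;
        while ((record = records.poll()) != null) {
            listener.accept(record);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        ListenerContainerSketch container = new ListenerContainerSketch(received::add);
        container.publish("hello");
        container.pollOnce();
        System.out.println(received); // prints [hello]
    }
}
```

The real container runs this kind of loop on its own thread against a Kafka consumer, which is why the Receiver class only needs to declare the method that handles each message.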

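Configuring the Application with application.yml

Spring Boot tries to auto-configure the application based on the dependencies declared in the pom.xml file, and sets sensible defaults. Since we have not configured any Consumer, Producer, or KafkaTemplate beans here, Spring Boot auto-configures them with its defaults. These values can be overridden in the application.yml properties file. More information on the available Spring Boot Kafka properties can be found in the Spring Boot reference documentation.

We also create an application.yml properties file in the src/main/resources folder. Spring Boot injects these properties into the configuration classes.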

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.h3: DEBUG
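
The relationship between defaults and the values in application.yml can be pictured as a simple merge in which explicitly configured values win. The sketch below illustrates that idea only and is not how Spring Boot binds properties internally; EffectiveConfigSketch and merge are hypothetical names. The two defaults shown are assumptions stated for illustration: localhost:9092 for spring.kafka.bootstrap-servers, and latest as the Kafka client default for auto.offset.reset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual sketch only: illustrates "defaults, overridden by application.yml".
final class EffectiveConfigSketch {

    // Starts from the defaults and lets every explicitly configured value replace them.
    static Map<String, String> merge(Map<String, String> defaults, Map<String, String> overrides) {
        Map<String, String> effective = new LinkedHashMap<>(defaults);
        effective.putAll(overrides);
        return effective;
    }

    public static void main(String[] args) {
        // Assumed defaults, listed here for illustration.
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("spring.kafka.bootstrap-servers", "localhost:9092");
        defaults.put("spring.kafka.consumer.auto-offset-reset", "latest");

        // Values taken from this tutorial's application.yml.
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("spring.kafka.consumer.group-id", "foo");
        overrides.put("spring.kafka.consumer.auto-offset-reset", "earliest");

        merge(defaults, overrides).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that the application.yml in this tutorial never sets the broker address, which is why the example relies on a Kafka server listening on localhost port 9092.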

Running the Application

Before running the project, ZooKeeper and Kafka need to be started, as shown below -

Start the ZooKeeper service -

d:\software\kafka_2.12-1.0.1\bin\windows> zookeeper-server-start.bat d:\software\kafka_2.12-1.0.1\config\zookeeper.properties

Start the Kafka service -

d:\software\kafka_2.12-1.0.1\bin\windows> kafka-server-start.bat d:\software\kafka_2.12-1.0.1\config\server.properties

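Finally, we write a simple Spring Boot application to demonstrate the setup. For the demo to work, a Kafka server must be running on localhost on port 9092, which is Kafka's default configuration.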

package com.h3.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    // Called once by Spring Boot after the application context has started
    @Override
    public void run(String... strings) throws Exception {
        sender.send("spring kafka and spring boot configuration example");
    }
}

Example

When the application runs, it should produce the following output.


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

2018-03-14 11:22:25.177  INFO 2892 --- [           main] com.h3.kafka.SpringKafkaApplication  : Starting SpringKafkaApplication on my-pc with PID 2892 (f:\worksp\spring-kafka\springboot-config\target\classes started by administrator in f:\worksp\spring-kafka\springboot-config)
2018-03-14 11:22:25.181 DEBUG 2892 --- [           main] com.h3.kafka.SpringKafkaApplication  : Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
2018-03-14 11:22:25.182  INFO 2892 --- [           main] com.h3.kafka.SpringKafkaApplication  : No active profile set, falling back to default profiles: default
2018-03-14 11:22:26.869  INFO 2892 --- [           main] com.h3.kafka.SpringKafkaApplication  : Started SpringKafkaApplication in 2.208 seconds (JVM running for 2.751)
2018-03-14 11:22:26.871  INFO 2892 --- [           main] com.h3.kafka.Sender                  : sending message='spring kafka and spring boot configuration example' to topic='foo.t'
... ...
2018-03-14 11:22:36.035  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.156  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.163  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 12 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.433  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:36.436  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 14 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559  WARN 2892 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=foo] Error while fetching metadata with correlation id 16 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:38.559  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:40.028  WARN 2892 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {foo.t=LEADER_NOT_AVAILABLE}
2018-03-14 11:22:56.203  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : received message='spring kafka and spring boot configuration example'
2018-03-14 11:22:56.205  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_offset: 0
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@68cba188
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_timestampType: CREATE_TIME
2018-03-14 11:22:56.206  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_receivedMessageKey: null
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_receivedPartitionId: 0
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_receivedTopic: foo.t
2018-03-14 11:22:56.207  INFO 2892 --- [ntainer#0-0-C-1] com.h3.kafka.Receiver                : kafka_receivedTimestamp: 1520997760772
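
The received timestamp header in the output above is a Kafka record timestamp, expressed in milliseconds since the Unix epoch. The short sketch below converts the logged value into a readable instant. The class name ReceivedTimestampSketch is hypothetical, and the UTC+8 offset is an assumption made only to compare against the local times in the log.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Sketch: interpreting a Kafka record timestamp (epoch milliseconds).
final class ReceivedTimestampSketch {

    // Converts epoch milliseconds to an Instant (always UTC).
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Views the same moment at a fixed offset from UTC, given in whole hours.
    static OffsetDateTime atOffsetHours(long epochMillis, int hours) {
        return toInstant(epochMillis).atOffset(ZoneOffset.ofHours(hours));
    }

    public static void main(String[] args) {
        long receivedTimestamp = 1520997760772L; // value from the log output above
        System.out.println(toInstant(receivedTimestamp));        // 2018-03-14T03:22:40.772Z
        // Assumption: the machine that produced the log runs at UTC+8.
        System.out.println(atOffsetHours(receivedTimestamp, 8)); // 2018-03-14T11:22:40.772+08:00
    }
}
```

Under that assumption the record was created at about 11:22:40 local time, which lines up with the last producer metadata warning in the log.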

