hw日常 HW日报

 网络   2022-10-12 17:01   31
import com.xfgg.demo.channel.ShopChannel;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource; RestControllerpublic class ShopService { Resource(name ShopChannel.SHOP_OUTPUT) private MessageChannel sendShopMessageChannel; GetMapping( /sendMsg ) public String sendShopMessage(String content) { boolean isSendSuccess sendShopMessageChannel. send(MessageBuilder.withPayload(content).build()); return isSendSuccess ? 发送乐成 : 发送退步 ; StreamListener(ShopChannel.SHOP_INPUT) public void receive(Message String message) { System.out.println(message.getPayload());

定义煽动类

package com.xfgg.demo;import com.xfgg.demo.channel.ShopChannel;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.stream.annotation.EnableBinding; SpringBootApplication EnableBinding(ShopChannel.class)public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args);

定义application.yml文件

spring: application: name: shop-server cloud: stream: bindings: #配置自身定义的通道与哪其中间件交互 shop_input: #ShopChannel里Input以及Output的值 destination: xfgg #目的主旨 shop_output: destination: xfgg default-binder: kafka #默认的binder是kafka kafka: bootstrap-servers: localhost:9092 #kafka办事地方 consumer: group-id: consumer1 producer: key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer client-id: producer1server: port: 8100

正在办事上就把这些配置参加到办事C中就也许了 乐成的束缚了今天的残留课题 动机图

改动办事C来完结kafka纪录入参 把ShopChannel Sink Source迁徙到办事C中

hw日常 HW日报

改动UserAdminController

Resource(name ShopChannel.SHOP_OUTPUT) private MessageChannel sendShopMessageChannel; RequestMapping(value /getUser/{id} ) public User getUserById( PathVariable(value id ) int id,String content){ User user userService.getUserById(id); System.out.println(user); boolean isSendSuccess sendShopMessageChannel.send(MessageBuilder.withPayload(content).build()); return isSendSuccess?userService.getUserById(id) : null;

发明报错了 这个bug卡了半天 shit 搞了半天发明是kafka的课题 由于我以前也弄了一个不异配置的办事 因而会产生辩论导致这一个连贯没有上kafka 没法孕育损耗者以及破费者 这是kafka的课题 则休止kafka 移除kafka并煽动kafka 而且配置文件也写错了 最坑爹的是 写成.号 由于是从yml文件变换到properties文件 新的配置文件

server.port 8092spring.application.name turbine-ceureka.instance.prefer-ip-address trueeureka.instance.hostname ${spring.cloud.client.ip-address}eureka.instance.instance-id ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}eureka.client.service-url.defaultZone http://${eureka.instance.hostname}:8761/eureka/management.endpoints.web.exposure.include *feign.hystrix.enabled true#数据源相干配置spring.datasource.driver-class-name com.mysql.cj.jdbc.Driverspring.datasource.url jdbc:mysql://localhost:3306/kgc?useUnicode true characterEncoding UTF-8 zeroDateTimeBehavior convertToNull allowMultiQueries truespring.datasource.username rootspring.datasource.password 123456#映照xml文件mybatis.mapper-locations mapper/*.xmlmybatis.type-aliases-package com.xfgg.demo#redis配置#数据库索引spring.redis.database 0#办事器地方spring.redis.host 127.0.0.1#办事器端口spring.redis.port 6379#办事器连贯明码spring.redis.password spring.redis.jedis.pool.max-active 20spring.redis.jedis.pool.max-wait -1spring.redis.jedis.pool.max-idle 10spring.redis.jedis.pool.min-idle 0spring.redis.timeout 1000#kafka配置spring.kafka.bootstrap-servers localhost:9092spring.kafka.consumer.group-id consumer1spring.kafka.producer.key-serializer org.apache.kafka.common.serialization.ByteArraySerializerspring.kafka.producer.value-serializer org.apache.kafka.common.serialization.ByteArraySerializerspring.kafka.producer.client-id producer1spring.cloud.stream.bindings.shop_input.destination xfggspring.cloud.stream.bindings.shop_output.destination xfggspring.cloud.stream.default-binder kafka

kafka的配置详解

kafka损耗者配置项

kafka: producer: max-request-size: 10485760 bootstrap-servers: 10.80.111.214:9092 request-required-acks: 1 retries: 5 batch-size: 16384 linger: 1 buffer-memory: 134217728 block-on-buffer-full: false key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

kafka损耗者配置到spring容器

Value( ${kafka.producer.max-request-size} ) private String maxRequestSize; Value( ${kafka.producer.bootstrap-servers} ) private String servers; Value( ${kafka.producer.request-required-acks} ) private String requiredAcks; Value( ${kafka.producer.retries} ) private String retries; Value( ${kafka.producer.batch-size} ) private String batchSize; Value( ${kafka.producer.linger} ) private String linger; Value( ${kafka.producer.buffer-memory} ) private String bufferMemory; Value( ${kafka.producer.key-serializer} ) private String key; Value( ${kafka.producer.value-serializer} ) private String value; Bean( kafkaTemplate ) public KafkaTemplate String, String kafkaTemplate() { return new KafkaTemplate String, String (producerFactory()); public ProducerFactory String, String producerFactory() { Map String, Object properties new HashMap ();

参数的全部意思 //重试恳求 假设恳求退步损耗者会主动重试 假设起用重试 则会有反复动态的大概性 properties.put(ProducerConfig.RETRIES_CONFIG,retries) //最大动态巨细 properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); /** * Server告竣 producer request 前须要确认的数目。 acks 0时 producer没有会等待确认 直接推广到socket等待发送 * acks 1时 等待leader写到local log就行 acks all或acks -1时 等待isr中一切副本确认 留神 确认都是 broker * 领受到动态放入内存就直接前往确认 没有是须要等待数据写入磁盘后才前往确认 这也是kafka快的缘由 / properties.put(ProducerConfig.ACKS_CONFIG, requiredAcks); / * Producer也许将发往统一个Partition的数据做成一个Produce * Request发送恳求 即Batch批处置 以削减恳求次数 该值即为每次批处置的巨细。 * 其它每个Request恳求蕴含多个Batch 每个Batch对于应一个Partition 且一个Request发送的想法Broker均为这些partition的leader副本。 * 若将该值设为0 则没有会施行批处置 **/ properties.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);

/** * 默认缓冲可马上发送 即遍缓冲空间还没有满 不过 假设你想削减恳求的数目 也许树立linger.ms大于0。 * 这将差遣损耗者发送恳求以前等待一段时光 指望更多的动态增添到未满的批中。这一致于TCP的算法 比如下面的代码段 * 大概100条动态正在一个恳求发送 由于咱们树立了linger(迁延)时光为1毫秒 然后 假设咱们没有填满缓冲区 * 这个树立将推广1毫秒的迟延恳求以等待更多的动态。 须要留神的是 正在高负载下 相近的时光普通也会组成批 即使是 * linger.ms 0。正在没有处于高负载的状况下 假设树立比0大 以少许的迟延价值调换更少的 更无效的恳求。 / properties.put(ProducerConfig.LINGER_MS_CONFIG, linger); / * 掌握损耗者可用的缓存总量 假设动态发送速率比其传输到办事器的快 将会耗尽这个缓存空间。 * 当缓存空间耗尽 其他发送挪用将被阻滞 阻滞时光的阈值经过max.block.ms设定 之后它将抛出一个TimeoutException。 */ properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, key); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, value); return new DefaultKafkaProducerFactory String, String (properties

收起 进展全文
本文地址:http://yz.ziyouea.com/p/30271.html
版权声明:本站文章来自网络,如有违规侵权请联系我们下架。