首页新闻动态正文

springCloud学习(Spring-Cloud-Stream事件驱动)【黑马java培训】

更新时间:2019年07月26日 11时18分30秒 来源:黑马程序员论坛

想想平常生活中做饭的场景,在用电饭锅做饭的同时,我们可以洗菜、切菜,等待电饭锅发出饭做好的提示我们回去拔下电饭锅电源(或者什么也不知让它处于保温状态),反正这个时候我们知道饭做好了,接下来可以炒菜了。从这里可以看出我们在日常生活中与世界的互动并不是同步的、线性的,不是简单的请求--响应模型。它是事件驱动的,我们不断的发送消息、接受消息、处理消息。
  同样在软件世界中也不全是请求--响应模型,也会需要进行异步的消息通信。使用消息实现事件通信的概念被称为消息驱动架构(Event Driven Architecture,EDA),也被称为消息驱动架构(Message Driven Architecture,MDA)。使用这类架构可以构建高度解耦的系统,该系统能够对变化做出响应,且不需要与特定的库或者服务紧密耦合。
  在 Spring Cloud 项目中可以使用Spirng Cloud Stream轻而易举的构建基于消息传递的解决方案。
为什么使用消息传递
  要解答这个问题,让我们从一个例子开始,之前一直使用的两个服务:许可证服务和组织服务。每次对许可证服务进行请求,许可证服务都要通过 http 请求到组织服务上查询组织信息。显而易见这次额外的 http 请求会花费较长的时间。如果能够将缓存组织数据的读操作,将会大幅提高许可证服务的响应时间。但是缓存数据有如下 2 个要求:
  • 缓存的数据需要在许可证服务的所有实例之间保存一致——这意味着不能将数据缓存到服务实例的内存中。
  • 在更新或者删除一个组织数据时,许可证服务缓存的数据需要失效——避免读取到过期数据,需要尽早让过时数据失效并删除。
  要实现上面的要求,现在有两种办法。
  • 使用同步请求--响应模型来实现。组织服务在组织数据变化时调用许可证服务的接口通知组织服务已经变化,或者直接操作许可证服务的缓存。
  • 使用事件驱动。组织服务发出一个异步消息。许可证服务收到该消息后清除对应的缓存。

同步请求-响应方式
  许可证服务在 redis 中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务同步 http 请求通知许可证服务数据过期。这种方式有以下几个问题:
  • 组织服务和许可证服务紧密耦合
  • 这种方式不够灵活,如果要为组织服务添加新的消费者,必须修改组织服务代码,以让其通知新的服务数据变动。
使用消息传递方式
  同样的许可证服务在 redis 中缓存从组织服务中查询到的服务信息,当组织数据更新时,组织服务将更新信息写入到队列中。许可证服务监听消息队列。使用消息传递有一下 4 个好处:
  • 松耦合性:将服务间的依赖,变成了服务对队列的依赖,依赖关系变弱了。
  • 耐久性:即使服务消费者已经关闭了,也可以继续往里发送消息,等消费者开启后处理
  • 可伸缩性: 消息发送者不用等待消息消费者的响应,它们可以继续做各自的工作
  • 灵活性:消息发送者不用知道谁会消费这个消息,因此在有新的消息消费者时无需修改消息发送代码
spring cloud 中使用消息传递
  spring cloud 项目中可以通过 spring cloud stream 框架来轻松集成消息传递。该框架最大的特点是抽象了消息传递平台的细节,因此可以在支持的消息队列中随意切换(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 架构
  spring cloud stream 中有 4 个组件涉及到消息发布和消息消费,分别为:
  • 发射器

      当一个服务准备发送消息时,它将使用发射器发布消息。发射器是一个 Spring 注解接口,它接收一个普通 Java 对象,表示要发布的消息。发射器接收消息,然后序列化(默认序列化为 JSON)后发布到通道中。
  • 通道

      通道是对队列的一个抽象。通道名称是与目标队列名称相关联的。但是队列名称并不会直接公开在代码中,代码永远只会使用通道名。
  • 绑定器

      绑定器是 spring cloud stream 框架的一部分,它是与特定消息平台对话的 Spring 代码。通过绑定器,使得开发人员不必依赖于特定平台的库和 API 来发布和消费消息。
  • 接收器

      服务通过接收器来从队列中接收消息,并将消息反序列化。

处理逻辑如下:
实战
  继续使用之前的项目,在许可证服务中缓存组织数据到 redis 中。
建立 redis 服务
  为方便起见,使用 docker 创建 redis,建立脚本如下:
docker run -itd --name redis --net host redis:建立 kafka 服务在组织服务中编写消息生产者
  首先在 organization 服务中引入 spring cloud stream 和 kafka 的依赖。
<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream</artifactId></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
  然后在 events 类中编写SimpleSouce类,用于组织数据修改,产生一条消息到队列中。代码如下:
@EnableBinding(Source.class)public class SimpleSource {    private Logger logger = LoggerFactory.getLogger(SimpleSource.class);    private Source source;    @Autowired    public SimpleSource(Source source) {        this.source = source;    }    public void publishOrChange(String action, String orgId) {        logger.info("在请求:{}中,发送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);        OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);        source.output().send(MessageBuilder.withPayload(change).build());    }}
这里使用的是默认通道,Source 类定义的 output 通道发消息。后面通过 Sink 定义的 input 通道收消息。
  然后在OrganizationController类中定义一个 delete 方法,并注入 SimpleSouce 类,代码如下:
@Autowiredprivate SimpleSource simpleSource;@DeleteMapping(value = "/organization/{orgId}")public void deleteOne(@PathVariable("orgId") String id) {    logger.debug("删除了组织:{}", id);    simpleSource.publishOrChange("delete", id);}
  最后在配置文件中加入消息队列的配置:
# 省略了其他配置spring:  cloud:    stream:      bindings:        output:          destination: orgChangeTopic          content-type: application/json      kafka:        binder:          # 替换为部署kafka的ip和端口          zk-nodes: 192.168.226.5:2181          brokers: 192.168.226.5:9092
  现在我们可以测试下访问localhost:5555/apis/org/organization/12,可以看到控制台打印消息生成的日志。
在许可证服务中编写消息消费者
  集成 redis 的方法,参看。这里不作说明。
  首先引入依赖,依赖项同上面组织服务。
  然后在 event 包下创建OrgChange的类,代码如下:
@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息public class OrgChange {    private Logger logger = LoggerFactory.getLogger(OrgChange.class);    @StreamListener(Sink.INPUT)    public void loggerSink(OrganizationChange change){        logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId());        //删除失效缓存        RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));    }}//下面两个都在util包下//RedisKeyUtils.java代码如下public class RedisKeyUtils {    private static final String  ORG_CACHE_PREFIX = "orgCache_";    public static String getOrgCacheKey(String orgId){        return ORG_CACHE_PREFIX+orgId;    }}//RedisUtils.java代码如下@Component@SuppressWarnings("all")public class RedisUtils {    public static RedisTemplate redisTemplate;    @Autowired    public void setRedisTemplate(RedisTemplate redisTemplate) {        RedisUtils.redisTemplate = redisTemplate;    }    public static boolean setObj(String key,Object value){        return setObj(key,value,0);    }    /**     * Description:     *     * @author fanxb     * @date 2019/2/21 15:21     * @param key 键     * @param value 值     * @param time 过期时间,单位ms     * @return boolean 是否成功     */    public static boolean setObj(String key,Object value,long time){        try{            if(time<=0){                redisTemplate.opsForValue().set(key,value);            }else{                redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);            }            return true;        }catch (Exception e){            e.printStackTrace();            return false;        }    }    public static Object get(String key){        if(key==null){            return null;        }        try{            Object obj = redisTemplate.opsForValue().get(key);            return obj;        }catch (Exception e){            e.printStackTrace();            return null;        }    }    public static void del(String... key){        if(key!=null && key.length>0){            redisTemplate.delete(CollectionUtils.arrayToList(key));        }    }}
  上面用到的是 Sink.INPUT 通道,这个和之前的 Source.OUTPUT 通道刚好一队,一个负责收,一个负责发。
  然后修改OrganizationByRibbonService.java文件中的getOrganizationWithRibbon方法:
    public Organization getOrganizationWithRibbon(String id) {        String key = RedisKeyUtils.getOrgCacheKey(id);        //先从redis缓存取数据        Object res = RedisUtils.get(key);        if (res == null) {            logger.info("当前数据无缓存:{}", id);            try{            ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",                    HttpMethod.GET, null, Organization.class, id);            res = responseEntity.getBody();            RedisUtils.setObj(key, res);            }catch (Exception e){                e.printStackTrace();            }        } else {            logger.info("当前数据为缓存数据:{}", id);        }        return (Organization) res;    }
  最后修改配置文件,为 input 通道指定 topic,配置如下:
spring:  cloud:    stream:      bindings:        input:          destination: orgChangeTopic          content-type: application/json          # 定义将要消费消息的消费者组的名称          # 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的          # 一个副本会被该组的某个实例所消费          group: licensingGroup      kafka:        binder:          zk-nodes: 192.168.226.5:2181          brokers: 192.168.226.5:9092
基本和发送的配置相同,只是这里是为input通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。
  现在来多次访问localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制台打印数据从缓存中读取,如下所示:
然后再以 delete 访问localhost:5555/apis/org/organization/12清除缓存,再次访问 licensingservice 服务,结果如下:
自定义通道
  上面用的是Spring Cloud Stream自带的 input/output 通道,那么要如何自定义通道呢?下面以自定义customInput/customOutput通道为例。
自定义发数据通道public interface CustomOutput {    @Output("customOutput")    MessageChannel out();}
  对于每个自定义的发数据通道,需使用@OutPut 注解标记的返回 MessageChannel 类的方法。
自定义收数据通道public interface CustomInput {    @Input("customInput")    SubscribableChannel in();}
  同上,对应自定义的收数据通道,需要使用@Input 注解标记的返回 SubscribableChannel 类的方法。
结束
  看完本篇你应该已经能够在 Spring Cloud 中集成 Spring Cloud Stream 消息队列了,貌似这个也能用到普通的 spring boot 项目中,比直接集成 mq 更加的优雅。

————————————

有任何问题欢迎随时在文章下方留言

推荐了解热门学科

java培训 Python人工智能 Web前端培训 PHP培训
区块链培训 影视制作培训 C++培训 产品经理培训
UI设计培训 新媒体培训 产品经理培训 Linux运维
大数据培训 智能机器人软件开发




传智播客是一家致力于培养高素质软件开发人才的科技公司“黑马程序员”是传智播客旗下高端IT教育品牌。自“黑马程序员”成立以来,教学研发团队一直致力于打造精品课程资源,不断在产、学、研3个层面创新自己的执教理念与教学方针,并集中“黑马程序员”的优势力量,针对性地出版了计算机系列教材50多册,制作教学视频数+套,发表各类技术文章数百篇。

传智播客从未停止思考

传智播客副总裁毕向东在2019IT培训行业变革大会提到,“传智播客意识到企业的用人需求已经从初级程序员升级到中高级程序员,具备多领域、多行业项目经验的人才成为企业用人的首选。”

中级程序员和初级程序员的差别在哪里?
项目经验。毕向东表示,“中级程序员和初级程序员最大的差别在于中级程序员比初级程序员多了三四年的工作经验,从而多出了更多的项目经验。“为此,传智播客研究院引进曾在知名IT企业如阿里、IBM就职的高级技术专家,集中研发面向中高级程序员的课程,用以满足企业用人需求,尽快补全IT行业所需的人才缺口。

何为中高级程序员课程?

传智播客进行了定义。中高级程序员课程,是在当前主流的初级程序员课程的基础上,增加多领域多行业的含金量项目,从技术的广度和深度上进行拓展“我们希望用5年的时间,打造上百个高含金量的项目,覆盖主流的32个行业。”传智播客课程研发总监于洋表示。




黑马程序员热门视频教程【点击播放】

Python入门教程完整版(懂中文就能学会) 零起点打开Java世界的大门
C++| 匠心之作 从0到1入门学编程 PHP|零基础入门开发者编程核心技术
Web前端入门教程_Web前端html+css+JavaScript 软件测试入门到精通


在线咨询 我要报名
和我们在线交谈!