博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【BCVP】实现基于 Redis 的消息队列
阅读量:4033 次
发布时间:2019-05-24

本文共 5949 字,大约阅读时间需要 19 分钟。

聆听自己的声音

如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。

话说上回书我们说到了,Redis的使用修改《》,通过异步的时候,基本上会解决StackExRedis组件使用过程中,可能在并发的时候遇到的问题,而且该组件也是微软官方推荐的(参考微软微服务框架eShopOnContainers),如果一定要抬杠说不好用,其实是没必要的。那今天我们继续往下说,简单说下如何基于Redis实现消息队列。

目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等这几种。当然常见的还是基于RabbitMQ来实现的,Redis份额稍微小了一点,但是因为Redis的仓储、缓存等多个方面的好处,使得Redis也是很火。

1

什么是消息队列

这个其实我今天不打算重点讲,因为我详细每个人能看这篇文章,肯定都知道消息队列的相关内容,但是为了不那么突兀,我就从网上粘贴几块基本概念,了解一二:

基本概念:

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

最终可以实现解耦的目的。

下面通过一个简单的架构模型来解释:

  • Producer:消息生产者,负责产生和发送消息到Broker。

  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。

  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。

有哪些优缺点:

从上边的定义中,我们可以看出来,优点主要是三块:

异步、流量削峰与流控、解耦

这三个优点在高并发等三高场景还是很有必要的,甚至说是十分必要的。

典型的广播模式,一个消息可以发布到多个消费者;

消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息;

比如我们某宝下订单,或某6抢车票,那都是放到队列里缓冲的,要是都用服务端等待,可能早就崩了,当然实际上比这个复杂的多。

而且,通过订阅发布的模式,异步执行,这样就会大大缓解时间压力。

但是,随之而来的弊端也是有的:

比如为了异步,就是接收者必须轮询消息队列,才能收到最近的消息。然后还有就是不能达到实时性,说白了就是用空间换时间,从而降低瓶颈。

消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。不能保证每个消费者接收的时间是一致的。若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。

五种常见模式

简单模式Hello World

功能:一个生产者P发送消息到队列Q,一个消费者C接收

工作队列模式Work Queue

功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

发布/订阅模式Publish/Subscribe

功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

路由模式Routing

说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

通配符(主题)模式Topic

说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

更多具体的内容呢,自己感兴趣多去搜索下吧,肯定还是有很多其他问题的,我这里就不铺开了讲了,下边咱们就说说,如何在Blog.Core里添加队列吧。

2

订阅发布相关配置案例

案例有很多,自己可以根据情况自定义。

那既然要讲东西,肯定不能随便放一个算法,肯定是需要一个小demo,一个应用场景,这样更有助于初学者去理解,之前考虑了很多,一直没有想好在BlogCore里边使用什么案例场景来说一说消息队列,最后实在是没办法,只能说日志了,万事不决就说日志,好像软件开发都是这么举例的。

这里说一下,假设我们自定义了一个日志记录的方法,就是在txt里写数据,其实我现在也是这么用的,平时肯定会一边查一边写,如果并发高一下,肯定就会出现死锁或者异常的出现,那我们就可以把写日志放到消息队列里,缓冲一下,然后在写一个订阅者,专门来“盯着”队列,一有消息传过来,就写到日志文件里,这样就能很好的实现相应的目的。如果不缓冲下,有时候日志可能高达几万条,瞬间爆炸。

那说了这个小场景,接下来就简单的模拟一下吧。

1、定义消息队列操作类与接口

既然要发布和订阅消息,肯定就需要有相应的操作方法,在上一篇文章中,我新建了一个RedisBasketRepository.cs的操作类,那我们还继续在这个类文件里写吧,注意,这个实现类和接口,已经注册到服务容器了,如果你第一次操作,可以参考文章开头上篇文章内容:

///  /// 根据key获取RedisValue ///  /// 
/// ///
public async Task
ListRangeAsync(string redisKey) { return await _database.ListRangeAsync(redisKey); } ///
/// 在列表头部插入值。如果键不存在,先创建再插入值 /// ///
///
///
public async Task
ListLeftPushAsync(string redisKey, string redisValue, int db = -1) { return await _database.ListLeftPushAsync(redisKey, redisValue); } ///
/// 在列表尾部插入值。如果键不存在,先创建再插入值 /// ///
///
///
public async Task
ListRightPushAsync(string redisKey, string redisValue, int db = -1) { return await _database.ListRightPushAsync(redisKey, redisValue); } ///
/// 移除并返回存储在该键列表的第一个元素 反序列化 /// ///
///
public async Task
ListLeftPopAsync
(string redisKey, int db = -1) where T : class { return JsonConvert.DeserializeObject
(await _database.ListLeftPopAsync(redisKey)); }

我这里只是简单的Copy出来几个做例子,总的一共有12个,当然你也可以自定义增加或删除某些不必要的,核心的可以看出来,都是根据redisKey来操作的

Task
ListRangeAsync(string redisKey); Task
ListLeftPushAsync(string redisKey, string redisValue, int db = -1); Task
ListRightPushAsync(string redisKey, string redisValue, int db = -1); Task
ListRightPushAsync(string redisKey, IEnumerable
redisValue, int db = -1); Task
ListLeftPopAsync
(string redisKey, int db = -1) where T : class; Task
ListRightPopAsync
(string redisKey, int db = -1) where T : class; Task
ListLeftPopAsync(string redisKey, int db = -1); Task
ListRightPopAsync(string redisKey, int db = -1); Task
ListLengthAsync(string redisKey, int db = -1); Task
> ListRangeAsync(string redisKey, int db = -1); Task
> ListRangeAsync(string redisKey, int start, int stop, int db = -1); Task
ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);  Task ListClearAsync(string redisKey, int db = -1);

2、如何发布消息与接收消息

上边定义好了相应的操作方法以后,就很简单了,我们来发布一条消息来试试:

[HttpGet] [AllowAnonymous] public async Task RedisMq() {     var msg = "这里是一条日志";     await _redisBasketRepository.ListLeftPushAsync(RedisMqKey.Loging, msg); }

就是这么简单,构造函数注入以后,直接调用相应的方法,就把消息msg推送到了队列里了,这里的redisKey,我用了常量定义,具体可操作Blog.Core源代码。

现在是发布消息特别简单,只需要一行接口,那如何去获取呢,在上边的获取方法中,我们定义的是:

Task
ListRangeAsync(string redisKey);

这个方法也是可以的,只不过我们需要对其进行转换,毕竟存的msg是字符串string类型的,但是这里的返回类型的RedisValue[],所以需要劈里啪啦转化一下。

但是这里有一个问题,就是如何去定时获取呢,也就是如何设计一个订阅者进行消费消息呢,这需要思考下,当然比较简单的就是while(true){},可能平时就是这么使用的,不过还是不是那么爽快,可以写一个组件来处理,简单快捷,正好,有一个大佬已经封装好了,我们可以直接拿来用,如果你有什么问题,可以给他提issue。

3、InitQ组件来订阅消息

在nuget中,可以直接安装组个组件:

他的开源地址是:

https://github.com/wmowm/Initq

使用方法很简单,可以参考他的README里的介绍:

1、先添加服务

/// /// Redis 消息队列 启动服务/// public static class RedisInitMqSetup{    public static void AddRedisInitMqSetup(this IServiceCollection services)    {        if (services == null) throw new ArgumentNullException(nameof(services));        services.AddInitQ(m =>        {            //时间间隔            m.SuspendTime = 5000;            //redis服务器地址            m.ConnectionString = "127.0.0.1:6379";            //对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象            m.ListSubscribe = new List
() { new RedisSubscribe()}; //显示日志 m.ShowLog = false; }); }}

2、定义订阅者

public class RedisSubscribe : IRedisSubscribe {     [Subscribe(RedisMqKey.Loging)]     private async Task SubRedisLoging(string msg)     {         Console.WriteLine($"队列{RedisMqKey.Loging} 消费到/接受到 消息:{msg}");         await Task.CompletedTask;     } }

整体很简单,继承接口,然后添加上特性,这个特性里的参数,就是我们消息发布的时候的那个key,然后方法的参数,就是对应的消息msg,是不是很简单。

当然这里你可以传递一个日志的对象实例,这样就把日志信息分流到了队列里,然后队列走到这个订阅者里,由这里进行缓冲,然后把日志填充到日志文件,从而达到减峰的目的。

最终的效果可以看看:

好啦,今天的redis消息队列已经说完了,还是很简单的,其中重点还是那五种模式要自己好好了解下,然后整体过程自己把握把握,至于RabbitMQ,这个以后再说吧。

END

扫码关注

老张的哲学

更多精彩等着你

转载地址:http://ugzdi.baihongyu.com/

你可能感兴趣的文章
Intellij IDEA启动优化,让开发的感觉飞起来
查看>>
使用 Springboot 对 Kettle 进行调度开发
查看>>
Kettle链接MySQL报错:Driver class 'org.gjt.mm.mysql.Driver' could not be found
查看>>
如何优雅的编程,lombok你怎么这么好用
查看>>
一文看清HBase的使用场景
查看>>
除了负载均衡,Nginx还可以做很多,限流、缓存、黑白名单
查看>>
解析zookeeper的工作流程
查看>>
搞定Java面试中的数据结构问题
查看>>
CentOS7 安装 MySQL8
查看>>
springcloud中fegin第一次跨模块调用超时
查看>>
scratch win10 环境搭建
查看>>
谷歌浏览器安装 elasticsearch-head 插件
查看>>
Oracle 字符串批量替换
查看>>
Flink 使用 Scala 编程中注意的隐式转换
查看>>
Flink SQL 开发的代码结构
查看>>
Flink SQL 项目通用模板一
查看>>
Flink 连接 MySQL 错误:The server time zone value ‘Öйú±ê׼ʱ¼ä‘ is unrecognized or represents
查看>>
大数据集群巡检,最佳实践记录
查看>>
纯JS实现QQ右下角弹窗demo
查看>>
JS页面一键分享QQ空间、新浪微博、豆瓣等小工具
查看>>