`

redis实现异步消息处理

阅读更多

 

 如何使用redis实现消息的订阅与发布

 

 测试类

  1. /**
  2. *redis消息订阅发布测试类
  3. */
  4. @Controller
  5. @RequestMapping(value="/queue")
  6. publicclassTestControlller{
  7. @Autowired
  8. privateRedisTemplate redisService;
  9. static{
  10. newTestControlller().initThread("msgQueue");
  11. }
  12. /**订阅消息-启动消费队列数据线程**/
  13. public void initThread(String key){
  14. newThread(new blpopMessageHandler(key)).start();
  15. }
  16. /**发布消息--往队列里写数据**/
  17. @RequestMapping(value="/brpush")
  18. @UnSession
  19. @ResponseBody
  20. publicboolean brpush(String key,String name,int age)throwsException{
  21. boolean flag = redisService.rpush(key,JSON.toJSONString(newEdu(name,age)));
  22. return flag;
  23. }
  24. /**线程实时监听获取数据**/
  25. class blpopMessageHandler implementsRunnable{
  26. privateString key;
  27. privateRedisTemplate redisTemplate=(RedisTemplate)SpringContextUtil.getBean("redisTemplate");
  28. publicString getKey(){
  29. return key;
  30. }
  31. publicvoid setKey(String key){
  32. this.key = key;
  33. }
  34. public blpopMessageHandler(String key){
  35. this.key = key;
  36. }
  37. @Override
  38. publicvoid run(){
  39. do{
  40. //获取到数据
  41. String result = redisTemplate.blpop(1000,key);
  42. if(EmptyUtil.isNotEmpty(result)){
  43. //do something
  44. Edu edu = JSON.parseObject(result,Edu.class);
  45. System.out.println("【线程一】"+edu.getName()+"---->"+edu.getAge());
  46. }
  47. }while(true);
  48. }
  49. }
  50. }

model类

  1.  
  2. publicclassEduimplementsSerializable{
  3. privatestaticfinallong serialVersionUID =-6336530413316596246L;
  4. privateString name;
  5. privateint age;
  6. publicString getName(){
  7. return name;
  8. }
  9. publicvoid setName(String name){
  10. this.name = name;
  11. }
  12. publicint getAge(){
  13. return age;
  14. }
  15. publicvoid setAge(int age){
  16. this.age = age;
  17. }
  18. publicEdu(String name,int age){
  19. this.name = name;
  20. this.age = age;
  21. }
  22. @Override
  23. publicString toString(){
  24. return"Edu{"+
  25. "name='"+ name +'\''+
  26. ", age="+ age +
  27. '}';
  28. }
  29. }

操作redis类

  1.  
  2. /**
  3. * 实现Jedis对象的封装,实现操作
  4. */
  5. @Component
  6. publicclassRedisTemplate{
  7. privatestaticLogger logger =LoggerFactory.getLogger(RedisTemplate.class);
  8. // 封装一个pool池用于jedis对象的管理
  9. privatePool<Jedis> jedisPool;
  10. publicRedisTemplate(Pool<Jedis> jedisPool){
  11. this.jedisPool = jedisPool;
  12. }
  13. /**
  14. * 存储REDIS队列 顺序存储
  15. * @param key reids键名
  16. * @param value 键值
  17. */
  18. publicboolean rpush(String key,String value){
  19. Jedis jedis =null;
  20. boolean flag =false;
  21. try{
  22. jedis = jedisPool.getResource();
  23. jedis.rpush(key.getBytes("utf-8"),value.getBytes("utf-8"));
  24. flag =true;
  25. }catch(Exception e){
  26. //释放redis对象
  27. jedisPool.returnBrokenResource(jedis);
  28. e.printStackTrace();
  29. }finally{
  30. //返还到连接池
  31. close(jedis);
  32. }
  33. return flag;
  34. }
  35. /**
  36. * 获取队列数据
  37. * @param key 键名
  38. * @return
  39. */
  40. publicString lpop(String key){
  41. byte[] bytes =null;
  42. Jedis jedis =null;
  43. try{
  44. jedis = jedisPool.getResource();
  45. bytes = jedis.lpop(key.getBytes("utf-8"));
  46. }catch(Exception e){
  47. //释放redis对象
  48. jedisPool.returnBrokenResource(jedis);
  49. e.printStackTrace();
  50. }finally{
  51. //返还到连接池
  52. close(jedis);
  53. }
  54. if(EmptyUtil.isNotEmpty(bytes)){
  55. try{
  56. returnnewString(bytes,"utf-8");
  57. }catch(UnsupportedEncodingException e){
  58. e.printStackTrace();
  59. }
  60. }
  61. returnnull;
  62. }
  63. /**
  64. * 获取阻塞list中的数据
  65. * @param timeout
  66. * @param key
  67. * @return
  68. */
  69. publicString blpop(int timeout,String key){
  70. List<String> list =newArrayList<>();
  71. Jedis jedis =null;
  72. try{
  73. jedis = jedisPool.getResource();
  74. list = jedis.blpop(timeout,key);
  75. }catch(Exception e){
  76. //释放redis对象
  77. jedisPool.returnBrokenResource(jedis);
  78. e.printStackTrace();
  79. }finally{
  80. //返还到连接池
  81. close(jedis);
  82. }
  83. returnEmptyUtil.isNotEmpty(list)&& list.size()>1? list.get(1):null;
  84. }
  85. privatevoid close(Jedis jedis){
  86. try{
  87. jedisPool.returnResource(jedis);
  88. }catch(Exception e){
  89. if(jedis.isConnected()){
  90. jedis.quit();
  91. jedis.disconnect();
  92. }
  93. }
  94. }
  95. }
分享到:
评论

相关推荐

    若依前后端分离版去redis版/无redis版本

    消息队列:Redis支持发布/订阅模式,可用于事件驱动架构,实现异步消息传递。 缺点 复杂性:引入Redis增加了系统的复杂性,需要考虑数据一致性、部署维护等问题。 成本:部署和维护Redis需要额外的成本和资源。 无...

    reactphp-redis:基于ReactPHP的异步Redis客户端实现

    线索/ reactphp-redis 基于异步客户端实现。 是一个开源的高级内存中键值数据库。 它提供了一组简单的原子操作,以便使用其原始数据类型。 其轻巧的设计和快速的操作使其成为现代应用程序堆栈的理想选择。 该库为您...

    php+redis实现消息队列功能示例

    把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis...

    异步redis队列实现 数据入库的方法

    1、异步队列处理 2、redis 过滤(就是只处理当天第一次请求) 3、redis 辅助存储app名称(验证过后批量插入数据app名称表中) 4、拼接插入的以及新增的如详细表中 解决办法: 1、接口修改 redis 过滤 + 如l

    cpp_redis:C ++ 11轻量级Redis客户端:异步,线程安全,无依赖项,流水线,多平台-不再维护-请检查https:github.comcpp-rediscpp_redis

    cpp_redis cpp_redis是C ++ 11异步多平台轻量级Redis客户端,支持同步操作,流水线,标记和高可用性。需求cpp_redis没有依赖项。 它唯一的要求是C++11 。 它没有网络模块,因此您可以自由配置自己的模块,也可以使用...

    redispatcher:Redis代理的Python异步消息传递服务

    使用者只是一个类,它定义了它将要侦听的mssages的结构,并且是一个实现用于处理该消息的逻辑的函数。出版您定义的每个使用者都将为您提供一种易于publish方法,您可以使用该方法将消息排队。 因为我们使用P

    PHP消息队列实现及应用.txt

    但消息队列可以做的不止是这一类场景,它在解耦、消峰、异步、一致性等方面都有很大的用武之地。因此如何合理使用消息队列来处理一些特殊的业务需求,这就是我们这节课要解决的内容。 【课程目录】 第1章 消息队列...

    30道Redis面试题.docx

    30道Redis经典面试题,学会拿高薪...Redis的出色之处不仅仅是性能,Redis最大的魅力是支持保存多种数据结构,此外单个value的最大限制是1GB,不像 memcached只能保存1MB的数据,因此Redis可以用来实现很多有用的功能。

    Redis常见面试题汇总.pdf

    Redis可以用来实现很多有用的功能,比方说用他的List来做FIFO双向链表,实现-个轻量 级的高性能消息队列服务,用他的Set可以做高性能的tag系统等等。另外Redis也可以对存 入的Key-Value设置expire时间,因此也可以被...

    秒杀系统(SpringBoot + Redis + RabbitMq)

    业务和异常因为时间关系就直接写在了controller,根据需要修改位置,另外RabbitMq的异步处理和多线程业务,看需要也可以开启。秒杀系统(SpringBoot + Redis + RabbitMq)

    Redis的.net客户端StackExchange.Redis.zip

     一般使用更专业的消息队列来处理这种业务场景,因此这里就略过了。三种命令模式Sync vs Async vs Fire-and-Forget最后,这里有三种命令模式分别对应 StackExchange.Redis的三类不同的使用场景。Sync,同步模式会直接...

    最详细Redis学习资料(源码)

    发布/订阅:Redis提供了发布/订阅功能,能够实现消息的异步发布和订阅,适用于实时通知、事件驱动等场景。 分布式支持:Redis提供了分布式部署和数据分片功能,能够构建高可用性、可扩展性的分布式系统。 高性能:...

    Java实现消息队列,异步队列、延迟队列、死信队列、各种异常逻辑处理

    Java实现消息队列,异步队列、延迟队列、死信队列、各种异常逻辑处理。 完全开源、自主研发,实际项目中使用验证,功能稳定,性能优,部署时集成Redis即可。 基于springboot框架,启动快,各种容错,占用内存资源少

    基于 Springboot + Redis + Kafka 的秒杀系统,乐观锁 + 缓存 + 限流 + 异步

    异步:将同步请求转换为异步请求,来提高并发量,本质也是削峰处理 利用缓存:创建订单时,每次都需要先查询判断库存,只有少部分成功的请求才会创建订单,因此可以将商品信息放在缓存中,减少数据库查询 负载均衡:...

    46道史上最全Redis面试题

    比方说用他的 List 来做 FIFO 双向链表,实现一个轻量级的高性 能消息队列服务,用他的 Set 可以做高性能的 tag 系统等等。 另外 Redis 也可以对存入的 Key-Value 设置 expire 时间,因此也可以被当作一 个功能加强...

    基于Springboot+Mybatis+Redis+MySql+RabbitMq的校园医疗管理系统,毕业设计

    消息队列:使用RabbitMQ进行异步消息处理。 构建工具:使用Maven进行项目构建。 3. 系统设计 数据库设计:设计数据库模型,包括用户、预约记录、就诊记录、药品等实体。 用户界面设计:设计用户友好的界面,确保良好...

    Redis上实现分布式锁以提高性能的方案研究

    任务队列用到分布式锁的情况比较多,在将业务逻辑中可以异步处理的操作放入队列,在其他线程中处理后出队,此时队列中使用了分布式锁,保证入队和出队的一致性。关于redis队列这块的逻辑分析,我将在下一次对其进行...

    Springboot+Mybatis-plus+ SpringMvc+Shiro+Redis企业级报表后台管理系统.rar

    存储过程等),用Redis做中间缓存,缓存数据 实现异步处理,定时任务,整合Quartz Job以及Spring Task 邮件管理功能, 整合spring-boot-starter-mail发送邮件等, 数据源:druid 用户管理,菜单管理,角色管理,代码...

    Springboot+Mybatis-plus+ SpringMvc+Shiro+Redis企业级报表后台管理系统

    实现异步处理,定时任务,整合Quartz Job以及Spring Task 邮件管理功能, 整合spring-boot-starter-mail发送邮件等, 数据源:druid 用户管理,菜单管理,角色管理,代码生成 运行环境 jdk8+oracle+redis+...

    基于Django与Celery实现异步对列任务

    本译文结合Django+Celery+Redis实现一个定期从Flickr获取图片并展示的简单案例,方便大家理解实现异步对列任务的过程。刚接触django的时候,我经历过的最让人沮丧的事情是需要定期运行一段代码。我写了一个需要每天...

Global site tag (gtag.js) - Google Analytics