高级Redis之Stream的用法示例

不想自己搭建一个mq怎么办?Redis的Stream 来帮你,Redis Stream 是 Redis 5.0 引入的一种新的数据结构,用于处理实时的、可持久化的、基于时间序列的数据流。它非常适合处理事件流、日志、消息队列等场景。下面是一个使用 Redis Stream 的具体应用场景:简单的消息队列系统。

应用场景:实时消息队列

假设你正在构建一个实时消息通知系统,多个服务需要向某个队列写入消息,多个消费者服务需要从这个队列中读取消息执行相应操作。这个消息队列需要有高性能和高可用性,并且能够应对突发流量。

以下是如何使用 Redis Stream 实现完成订单后通知会员服务加积分这个应用场景的步骤:

步骤 1: 添加必要的依赖

在你的 pom.xml 文件中添加 LettuceSpring Data Redis 依赖:

<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Spring Data Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- Lettuce dependency for Redis interaction -->
    <dependency>
        <groupId>io.lettuce.core</groupId>
        <artifactId>lettuce-core</artifactId>
        <version>6.1.5</version>
    </dependency>
</dependencies>

步骤 2: 配置 Redis 连接

在你的 application.propertiesapplication.yml 文件中配置 Redis 连接:

spring:
  redis:
    host: localhost
    port: 6379

步骤 3: 创建订单服务 (生产者)

订单服务在订单完成后将订单信息写入 Redis Stream。可以使用 Lettuce 库来与 Redis 进行交互。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class OrderService {

    private static final String STREAM_KEY = "order_stream";
    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    public OrderService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
    }

    public void completeOrder(String orderId, String userId, int points) {
        Map<String, String> orderData = new HashMap<>();
        orderData.put("orderId", orderId);
        orderData.put("userId", userId);
        orderData.put("points", String.valueOf(points));

        String messageId = commands.xadd(STREAM_KEY, orderData);
        System.out.println("Order completed with messageId: " + messageId);
    }

    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 4: 创建会员服务 (消费者)

会员服务从 Redis Stream 中读取消息,并处理用户积分的增加。

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.StreamMessage;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

@Service
public class MemberService {

    private static final String STREAM_KEY = "order_stream";
    private static final String CONSUMER_GROUP = "member_group";
    private static final String CONSUMER_NAME = "member_service";

    private RedisClient redisClient;
    private StatefulRedisConnection<String, String> connection;
    private RedisCommands<String, String> commands;

    public MemberService() {
        this.redisClient = RedisClient.create("redis://localhost:6379");
        this.connection = redisClient.connect();
        this.commands = connection.sync();
        
        // 创建消费组
        try {
            commands.xgroupCreate(STREAM_KEY, CONSUMER_GROUP, io.lettuce.core.StreamOffset.from("0"), true);
        } catch (Exception e) {
            System.out.println("Consumer group already exists");
        }
    }

    public void consumeMessages() {
        while (true) {
            List<StreamMessage<String, String>> messages = commands.xreadgroup(
                    io.lettuce.core.Consumer.from(CONSUMER_GROUP, CONSUMER_NAME),
                    io.lettuce.core.XReadArgs.StreamOffset.lastConsumed(STREAM_KEY)
            );

            for (StreamMessage<String, String> message : messages) {
                Map<String, String> body = message.getBody();
                String orderId = body.get("orderId");
                String userId = body.get("userId");
                int points = Integer.parseInt(body.get("points"));
                
                // 处理用户积分增加逻辑
                System.out.println("Processing order: " + orderId + " for user: " + userId + ", adding points: " + points);

                // 确认处理完成
                commands.xack(STREAM_KEY, CONSUMER_GROUP, message.getId());
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public void close() {
        connection.close();
        redisClient.shutdown();
    }
}

步骤 5: 调整 Spring Boot 启动类

在 Spring Boot 启动类中启动订单服务和会员服务,演示消息的生产和消费:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RedisStreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RedisStreamDemoApplication.class, args);
    }

    @Bean
    public CommandLineRunner demo(OrderService orderService, MemberService memberService) {
        return args -> {
            // 模拟订单完成
            orderService.completeOrder("order123", "user1", 100);

            // 启动会员服务,处理消息
            new Thread(() -> memberService.consumeMessages()).start();

            // 等待一段时间,确保消息处理完成
            Thread.sleep(5000);

            orderService.close();
            memberService.close();
        };
    }
}

6. 优点

使用 Redis Stream 实现消息队列有以下几个优点:

  1. 高性能:Redis Stream 提供了高性能的读写操作,适用于高吞吐量的场景。
  2. 持久化:Redis Stream 支持数据持久化,不会因为 Redis 重启而丢失数据。
  3. 消费组:支持创建消费者组,多消费者可以协同工作,提高消费效率。
  4. 自动化管理:Redis 可以自动管理消息的 ID、时间戳等,简化开发。

7. 缺点

  • 内存占用:Redis 是内存数据库,若消息量过大,可能会占用大量内存。
  • 学习曲线:Stream API 的使用相对于其他简单数据结构较为复杂,需要一定的学习成本。

总结

通过上述示例,我们展示了如何使用 Redis Stream 实现一个简单的消息队列系统,包括生产者发布消息、消费者读取消息和处理以及消费组的管理。Redis Stream 的高性能、持久化和自动管理特性使其非常适合处理实时数据流、消息队列等场景。希望这个示例能够帮助你更好地理解如何使用 Redis Stream 应对实际开发中的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/775349.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Build a Large Language Model (From Scratch)附录B(gpt-4o翻译版)

来源&#xff1a;https://github.com/rasbt/LLMs-from-scratch?tabreadme-ov-file https://www.manning.com/books/build-a-large-language-model-from-scratch

模拟,CF 570C - Replacement

一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 570C - Replacement 二、解题报告 1、思路分析 1、长为cnt的连续串的最小操作次数为cnt - 1 2、每次将一个非. 替换为. f要么增加1要么增加2 只有前后都是 . 的时候会增加2 同理&#xff0c;当我们将一…

【漏洞复现】飞企互联-FE企业运营管理平台——SQL注入

声明&#xff1a;本文档或演示材料仅供教育和教学目的使用&#xff0c;任何个人或组织使用本文档中的信息进行非法活动&#xff0c;均与本文档的作者或发布者无关。 文章目录 漏洞描述漏洞复现测试工具 漏洞描述 飞企互联-FE企业运营管理平台是一个基于云计算、智能化、大数据…

针对某客户报表系统数据库跑批慢进行性能分析及优化

某客户报表系统数据库跑批时间过长&#xff0c;超出源主库较多&#xff0c;故对其进行了分析调优&#xff0c;目前状态如下&#xff1a; 1、业务连接的rac的scanip&#xff0c;因为负载均衡将跑批的连接连接到了多个计算节点导致节点间通讯成本较高&#xff0c;故速率缓慢&…

2024年7月6日 十二生肖 今日运势

小运播报&#xff1a;2024年7月6日&#xff0c;星期六&#xff0c;农历六月初一 &#xff08;甲辰年庚午月辛未日&#xff09;&#xff0c;法定节假日。 红榜生肖&#xff1a;猪、马、兔 需要注意&#xff1a;狗、鼠、牛 喜神方位&#xff1a;西南方 财神方位&#xff1a;正…

Android - 模拟器

Android SDK 包括一个在您的计算机上运行的虚拟移动设备模拟器。 该模拟器可让您在不使用物理设备的情况下对 Android 应用程序进行原型设计、开发和测试。 在本章中&#xff0c;我们将探索真实安卓设备中存在的模拟器中的不同功能。 创建 AVD 如果您想模拟真实设备&#xff0c…

机器人典型的交互任务、阻抗控制的示意图、内涵、意义、存在的交互控制科学问题

机器人典型的交互任务 机器人在实际应用中经常需要完成与环境的交互任务&#xff0c;这些任务包括但不限于&#xff1a; 装配任务&#xff1a;在制造业中&#xff0c;机器人需要准确地操控和组装各种零部件&#xff0c;包括不同形状、大小和材质的物体。搬运任务&#xff1a;…

docker-compose搭建prometheus、grafana

一、安装prometheus 1、安装 version: 3.1services:prometheus:image: prom/prometheus:v2.48.0container_name: prometheushostname: prometheusrestart: alwaysvolumes:- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml- ./prometheus/:/etc/prometheus/port…

strcpy,srtcmp,strlen函数漏洞利用

strcpy,srtcmp,strlen函数漏洞利用 strcpy strcpy函数用于将字符串复制到另一个指针指向的空间中&#xff0c;遇到空字符 **b’x\00’**时停止&#xff0c;&#xff1a; 所以可以利用 strcpy不检查缓冲区 的漏洞&#xff08;构造的字符串要以\0结尾&#xff09;&#xff0c;…

Ubuntu / Debian安装FTP服务

本章教程,记录在Ubuntu中安装FTP服务的具体步骤。FTP默认端口:21 1、安装 pure-ftpd sudo apt-get install pure-ftpd2、修改默认配置 # 与 centos 不同,这里需要在 /etc/pure-ftpd/conf 文件夹下执行下列命令,增加对应配置文件: # 创建 /etc/pure-ftpd/conf/PureDB 文件…

半导体制造企业 文件共享存储应用

用户背景&#xff1a;半导体设备&#xff08;上海&#xff09;股份有限公司是一家以中国为基地、面向全球的微观加工高端设备公司&#xff0c;为集成电路和泛半导体行业提供具竞争力的高端设备和高质量的服务。 挑战&#xff1a;芯片的行业在国内迅猛发展&#xff0c;用户在上海…

3033.力扣每日一题7/5 Java

博客主页&#xff1a;音符犹如代码系列专栏&#xff1a;算法练习关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 目录 思路 解题方法 时间复杂度 空间复杂度 Code 思路 首先创建一个与…

Vue3从入门到精通(三)

vue3插槽Slots 在 Vue3 中&#xff0c;插槽&#xff08;Slots&#xff09;的使用方式与 Vue2 中基本相同&#xff0c;但有一些细微的差异。以下是在 Vue3 中使用插槽的示例&#xff1a; // ChildComponent.vue <template><div><h2>Child Component</h2&…

解决前后端同一个端口跨域问题

前端起了一个代理 如果url是api开头的自动代理访问8080端口&#xff08;解决前后端端口不一致要么是前端代理&#xff0c;要么是后端加过滤器&#xff09; proxy:{/api:{target:http://localhost:8080,changeOrigin : true,// 替换去掉路径上的api// rewrite:(path)>path.r…

java集合(1)

目录 一.集合概述 二. 集合体系概述 1. Collection接口 1.1 List接口 1.2 Set接口 2. Map接口 三. ArrayList 1.ArrayList常用方法 2.ArrayList遍历 2.1 for循环 2.2 增强for循环 2.3 迭代器遍历 一.集合概述 我们经常需要存储一些数据类型相同的元素,之前我们学过…

(2024)KAN: Kolmogorov–Arnold Networks:评论

KAN: Kolmogorov–Arnold Networks: A review 公和众与号&#xff1a;EDPJ&#xff08;进 Q 交流群&#xff1a;922230617 或加 VX&#xff1a;CV_EDPJ 进 V 交流群&#xff09; 目录 0. 摘要 1. MLP 也有可学习的激活函数 2. 标题的意义 3. KAN 是具有样条基激活函数的 M…

昇思25天学习打卡营第17天(+1)|Diffusion扩散模型

1. 学习内容复盘 本文基于Hugging Face&#xff1a;The Annotated Diffusion Model一文翻译迁移而来&#xff0c;同时参考了由浅入深了解Diffusion Model一文。 本教程在Jupyter Notebook上成功运行。如您下载本文档为Python文件&#xff0c;执行Python文件时&#xff0c;请确…

安乃达:看不懂的募资

不好玩啊&#xff0c;高标接连被S&#xff0c;市场激进资金找到了新股作为抱团方向。 首日大涨超100%&#xff0c;两日涨幅133%&#xff0c;今天果不其然被电风扇刮走了&#xff0c;今天我们聊聊新加入A股大本营的公司——安乃达。 首先&#xff0c;安乃达是国内直驱轮毂电机头…

vulnhub--IMF

环境 攻击机&#xff1a;192.168.96.4 靶机&#xff1a;ip未知 主机探测 确定靶机ip为32的主机 端口扫描 访问80端口 外围打点 在contact.php页面源码中找到了flag1 之后没啥突破 但查看网络后发现contact.php页面请求的三个js文件的文件名很有特点&#xff0c;猜测是base64编码…

数据结构——队列练习题

在C语言中&#xff0c;.和->运算符用于访问结构体的成员变量。它们之间的区别在于&#xff1a;.运算符用于访问结构体变量的成员。->运算符用于访问结构体指针变量的成员 1a&#xff08;rear指向队尾元素后一位&#xff0c;判空判满时牺牲一个存储单元&#xff09; 首先…