跳到主要内容

5 篇博文 含有标签「Redis」

关于Redis的相关内容

查看所有标签

Redis消息队列消费者端断连重试机制实现

· 阅读需 6 分钟
季冠臣
后端研发工程师

Redis消息队列消费者端断连重试机制实现

在分布式系统中,Redis作为消息队列使用时,经常会面临连接中断的风险,这可能由于网络波动、Redis服务崩溃或其他外部因素导致。为了保证系统的高可用性和消息的可靠性,我们需要实现一个自动重试机制,在Redis连接断开时能够自动重连,并保证消息的消费不中断或丢失。本文将重点介绍如何在Redis队列的消费者端实现连接断开后的重连机制。

1. 问题描述

在我们的架构中,消费者从Redis队列中拉取并处理消息,通常使用Jedis作为客户端与Redis交互。然而,在生产环境中,由于网络延迟、Redis服务重启或Redis连接池被耗尽等原因,Redis连接可能会出现断开。当连接断开时,消费者无法继续拉取消息,导致消息处理中断。

为了解决这个问题,我们需要设计一个重连机制:

  • 自动检测连接状态:当消费者检测到连接断开时,能够自动重试并恢复连接。
  • 保证消息不丢失:在重连期间,如果Redis服务恢复,消息能够继续被消费,不会丢失。

2. 解决方案

2.1 消费者类的重试机制设计

消费者类通过定时检查连接状态,并在Redis连接出现问题时尝试重连。我们通过引入一个标志位isDown来表示当前消费者是否处于断线状态。在连接恢复后,消费者能够自动恢复消息消费。

关键代码实现:

package com.example.redisQueue.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Redis消息队列消费者断连重试机制实现
*/
@Slf4j
@Component
public class RedisQueueConsumerListenerRunner implements Runnable {

@Autowired
private MyAbstractRedisQueueConsumer<?> consumer;

/**
* 是否宕机标志
* 通过static保证全局共享该标志
*/
public static AtomicBoolean isDown = new AtomicBoolean(false);

@Override
public void run() {
log.info("消费者启动,开始监听Redis队列...");

// 启动消息消费线程
Executors.newSingleThreadExecutor().submit(() -> {
try {
consumer.consume();
} catch (Exception e) {
log.error("消费过程中发生异常", e);
isDown.set(true); // 设置断开标志
}
});

// 启动断线重试机制
startRetryMechanism();
}

/**
* 启动断连重试机制
*/
public void startRetryMechanism() {
new Thread(() -> {
while (true) {
try {
Thread.sleep(30 * 1000L); // 每30秒检查一次连接状态

if (isDown.get()) {
log.warn("Redis连接断开,正在尝试重连...");
Thread.sleep(3000L); // 延迟3秒后重试

// 执行重连操作
consumer.reconnect();
isDown.set(false);
log.info("Redis连接恢复,重试成功");
}
} catch (InterruptedException e) {
log.error("重试线程被中断", e);
Thread.currentThread().interrupt();
}
}
}).start();
}
}

2.2 消费者类的重连方法

消费者类需要提供一个reconnect方法,用于在连接断开时重新初始化连接,恢复消费任务。

关键代码实现:

package com.example.redisQueue.consumer;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Redis消息队列消费者基类
*/
public abstract class MyAbstractRedisQueueConsumer<T> {
private Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired
private JedisPool jedisPool;

private String queueName;

public MyAbstractRedisQueueConsumer(String queueName) {
this.queueName = queueName;
}

/**
* 重连方法
* 用于连接断开后恢复连接
*/
public void reconnect() {
try (Jedis jedis = jedisPool.getResource()) {
if (jedis.isConnected()) {
logger.info("Redis连接已恢复,消费者已重新连接到队列");
} else {
logger.error("无法恢复Redis连接,正在尝试重新初始化");
// 这里可以进一步增加对Redis连接池或Jedis的重初始化逻辑
}
} catch (Exception e) {
logger.error("Redis重连失败", e);
}
}

/**
* 消费消息的抽象方法
*/
public abstract void doConsume(T message);

/**
* 消费消息
*/
public void consume() throws Exception {
try (Jedis jedis = jedisPool.getResource()) {
while (true) {
// 从Redis队列中拉取消息
List<String> message = jedis.blpop(0, queueName);
if (message != null && message.size() > 1) {
doConsume(message.get(1)); // 处理消息
}
}
} catch (Exception e) {
logger.error("消费过程中发生异常", e);
throw e;
}
}
}

2.3 线程池和重试机制的启动

通过Executors.newSingleThreadExecutor()我们为消费者启动了独立的线程来消费消息,并在消费者检测到连接断开时启动重试机制。

Executors.newSingleThreadExecutor().submit(() -> {
try {
consumer.consume(); // 启动消费
} catch (Exception e) {
log.error("消费过程中发生异常", e);
isDown.set(true); // 设置断开标志
}
});

2.4 重试机制的检查和延迟

在重试机制中,我们每30秒检查一次isDown标志,如果发现连接已经断开,就延迟3秒后尝试重新连接。

if (isDown.get()) {
log.warn("Redis连接断开,正在尝试重连...");
Thread.sleep(3000L); // 延迟3秒后重试
consumer.reconnect(); // 执行重连操作
isDown.set(false); // 重连后清除断开标志
log.info("Redis连接恢复,重试成功");
}

3. 技术方案对比:static AtomicBoolean与替代方案

虽然AtomicBoolean是一种简单且有效的实现方式,但我们也可以考虑其他方案,以下是几种常见的替代方案对比。

技术对比方案

技术方案优点缺点适用场景
static AtomicBoolean- 简单易用,线程安全<br>- 适用于单一线程或少数线程的全局状态控制- 随着系统复杂度增加,可能变得不够灵活<br>- 共享全局状态可能导致问题适合小型系统或少量线程的全局状态管理
CountDownLatch- 可以灵活地控制多个线程同步<br>- 适合等待多个线程完成操作- 一次性触发,无法重复使用适合一次性控制多线程完成后进行重试操作
ScheduledExecutorService- 定时检查,易于控制<br>- 支持定期任务- 适合周期性任务,不适用于即时响应需求适合定期检查重试的场景
EventBus(消息机制)- 松耦合,适合分布式环境<br>- 可以广播事件通知其他模块- 增加了系统复杂性,依赖额外的库适合分布式或微服务架构的消息广播机制
RetryTemplate- 灵活控制重试策略,如重试次数、间隔时间等<br>- 简化重试逻辑- 引入外部依赖<br>- 配置和使用稍显复杂适合需要复杂重试逻辑的场景

总结

  • static AtomicBoolean:适合小型系统,能够简单地控制全局状态。缺点是随着系统的复杂度增加,可能变得不够灵活,且状态管理可能存在问题。
  • CountDownLatch:适合等待多个线程完成任务后进行统一操作,但无法多次触发,因此不适合需要持续重连的场景。
  • ScheduledExecutorService:适合定期检查连接状态并进行重试的场景。简单易用,但适合周期性任务,不适合即时响应。
  • EventBus:适合分布式系统中的事件驱动场景,可以实现松耦

合的异步通知机制,但会引入额外的复杂性。

  • RetryTemplate:适合需要复杂重试策略的场景,能够灵活配置重试次数和间隔时间,但相对复杂。

根据实际需求选择合适的方案,可以有效地提升系统的稳定性和可用性。

浏览量:加载中...

Redis缓存常见异常及解决方案

· 阅读需 10 分钟
季冠臣
后端研发工程师

背景Redis是一种流行的内存数据结构存储,广泛用作数据库、缓存和消息代理。Redis 主要用来做缓存使用,在提高数据查询效率、保护数据库等方面起到了关键性的作用,很大程度上提高系统的性能。当然在使用过程中,也会出现一些异常情景,导致 Redis 失去缓存作用。

Redis常见应用

  1. 缓存:Redis通常用作缓存,以存储常用数据,以减少数据库查询次数。
  2. 会话管理:Redis用于存储Web应用程序中的用户会话数据。
  3. 实时分析:Redis用于处理和分析大量的实时数据。
  4. 队列管理:Redis用于实现消息队列,以异步分发和处理任务。
  5. 排行榜和计数:Redis用于维护游戏、社交媒体和其他应用中项目的实时排名和计数。
  6. 发布/订阅消息:Redis支持发布/订阅消息模式,并用于实时聊天和通知系统。

Redis常见缓存异常

缓存雪崩 缓存穿透 缓存击穿

1、缓存雪崩

1.1现象

  • 发生在缓存数据在同一时刻大量失效,比如缓存数据的过期时间设置为同一时刻。
  • 由于缓存中的数据失效,大量的请求直接请求后端数据,导致系统请求量瞬间增加,造成系统压力过大,从而宕机。

image-20230130104242186

1.2原因

  • 缓存服务不可用。
  • 缓存服务可用,但是大量 KEY 同时失效。

1.3解决方案

  1. 缓存服务不可用 redis 的部署方式主要有单机、主从、哨兵和 cluster 模式。
  • 单机 只有一台机器,所有数据都存在这台机器上,当机器出现异常时,redis 将失效,可能会导致 redis 缓存雪崩。
  • 主从 主从其实就是一台机器做主,一个或多个机器做从,从节点从主节点复制数据,可以实现读写分离,主节点做写,从节点做读。 优点:当某个从节点异常时,不影响使用。 缺点:当主节点异常时,服务将不可用。
  • 哨兵 哨兵模式也是一种主从,只不过增加了哨兵的功能,用于监控主节点的状态,当主节点宕机之后会进行投票在从节点中重新选出主节点。 优点:高可用,当主节点异常时,自动在从节点当中选择一个主节点。 缺点:只有一个主节点,当数据比较多时,主节点压力会很大。
  • cluster 模式 集群采用了多主多从,按照一定的规则进行分片,将数据分别存储,一定程度上解决了哨兵模式下单机存储有限的问题。 优点:高可用,配置了多主多从,可以使数据分区,去中心化,减小了单台机子的负担. 缺点:机器资源使用比较多,配置复杂。
  • 小结 从高可用得角度考虑,使用哨兵模式和 cluster 模式可以防止因为 redis 不可用导致的缓存雪崩问题。
  1. 大量 KEY 同时失效 可以通过设置永不失效、设置不同失效时间、使用二级缓存和定时更新缓存失效时间
  • 设置永不失效 如果所有的 key 都设置不失效,不就不会出现因为 KEY 失效导致的缓存雪崩问题了。redis 设置 key 永远有效的命令如下: PERSIST key 缺点:会导致 redis 的空间资源需求变大。
  • 设置随机失效时间 如果 key 的失效时间不相同,就不会在同一时刻失效,这样就不会出现大量访问数据库的情况。 redis 设置 key 有效时间命令如下: Expire key 示例代码如下,通过 RedisClient 实现
/**
* 随机设置小于30分钟的失效时间
* @param redisKey
* @param value
*/
private void setRandomTimeForReidsKey(String redisKey,String value){
//随机函数
Random rand = new Random();
//随机获取30分钟内(30*60)的随机数
int times = rand.nextInt(1800);
//设置缓存时间(缓存的key,缓存的值,失效时间:单位秒)
redisClient.setNxEx(redisKey,value,times);
}
  • 使用二级缓存 二级缓存是使用两组缓存,1 级缓存和 2 级缓存,同一个 Key 在两组缓存里都保存,但是他们的失效时间不同,这样 1 级缓存没有查到数据时,可以在二级缓存里查询,不会直接访问数据库。 示例代码如下:
public static void main(String[] args) {
CacheTest test = new CacheTest();
//从1级缓存中获取数据
String value = test.queryByOneCacheKey("key");
//如果1级缓存中没有数据,再二级缓存中查找
if(StringUtils.isBlank(value)){
value = test.queryBySecondCacheKey("key");
//如果二级缓存中没有,从数据库中查找
if(StringUtils.isBlank(value)){
value =test.getFromDb();
//如果数据库中也没有,就返回空
if(StringUtils.isBlank(value)){
System.out.println("数据不存在!");
}else{
//二级缓存中保存数据
test.secondCacheSave("key",value);
//一级缓存中保存数据
test.oneCacheSave("key",value);
System.out.println("数据库中返回数据!");
}
}else{
//一级缓存中保存数据
test.oneCacheSave("key",value);
System.out.println("二级缓存中返回数据!");
}
}else {
System.out.println("一级缓存中返回数据!");
}
}
  • 异步更新缓存时间

    每次访问缓存时,启动一个线程或者建立一个异步任务来,更新缓存时间。

    示例代码如下:

public class CacheRunnable implements Runnable {

private ClusterRedisClientAdapter redisClient;
/**
* 要更新的key
*/
public String key;

public CacheRunnable(String key){
this.key =key;
}

@Override
public void run() {
//更细缓存时间
redisClient.expire(this.getKey(),1800);
}

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}
}
public static void main(String[] args) {
CacheTest test = new CacheTest();
//从缓存中获取数据
String value = test.getFromCache("key");
if(StringUtils.isBlank(value)){
//从数据库中获取数据
value = test.getFromDb("key");
//将数据放在缓存中
test.oneCacheSave("key",value);
//返回数据
System.out.println("返回数据");
}else{
//异步任务更新缓存
CacheRunnable runnable = new CacheRunnable("key");
runnable.run();
//返回数据
System.out.println("返回数据");
}
}

2、缓存穿透

2.1现象

  • 发生在请求的数据不存在于缓存,但是请求依然会到达后端服务器。
  • 由于请求的数据不存在,每次请求都会到达后端服务器,导致大量的无效请求,增加了后端的压力。

image-20230130110129433

2.2异常原因

  • 非法调用

2.3解决方案

  1. 对于不存在的数据设置一个空值,设置有效期
  2. 对请求数据做限流
  3. 哈希/布隆过滤器预处理
  4. 使用布尔值进行标识(例如:0或1)表示数据是否存在
  5. 双重检查机制(Cache Aside Pattern)

TODO 这里只写了第一种解决方案,剩下的以后补充...

  • 缓存空值 当缓存和数据库中都没有值时,可以在缓存中存放一个空值,这样就可以减少重复查询空值引起的系统压力增大,从而优化了缓存穿透问题。 示例代码如下:
private String queryMessager(String key){
//从缓存中获取数据
String message = getFromCache(key);
//如果缓存中没有 从数据库中查找
if(StringUtils.isBlank(message)){
message = getFromDb(key);
//如果数据库中也没有数据 就设置短时间的缓存
if(StringUtils.isBlank(message)){
//设置缓存时间(缓存的key,缓存的值,失效时间:单位秒)
redisClient.setNxEx(key,null,60);
}else{
redisClient.setNxEx(key,message,1800);
}
}
return message;
}

缺点:大量的空缓存导致资源的浪费,也有可能导致缓存和数据库中的数据不一致。

3、缓存击穿

3.1现象

当一个高流量的请求访问同一个缓存数据时,如果该数据的缓存失效,那么所有的请求都会直接请求数据库,造成数据库压力过大。

与缓存穿透区别:缓存击穿是因为某个数据被大量请求,导致其缓存失效,从而打击数据库;缓存穿透是因为请求了不存在的数据,导致所有请求都需要访问数据库。

image-20230130111239309

3.2异常原因

  • 热点 KEY 失效的同时,大量相同 KEY 请求同时访问。

3.3解决方案

  1. 加锁机制:对于特定的数据请求加锁,避免大量请求造成的缓存失效。
  2. 分布式锁:使用分布式锁来确保对特定数据的更新操作是原子性的。
  3. 后备机制:在缓存失效时使用后备机制,保证缓存请求能够得到响应。
  4. 限流机制:对请求数据进行限流,防止大量请求造成的压力。
  5. 对于过期的数据进行异步删除:避免因数据过期导致的缓存击穿。
  • 分布式锁

    使用分布式锁,同一时间只有 1 个请求可以访问到数据库,其他请求等待一段时间后,重复调用。

    示例代码如下

/**
* 根据key获取数据
* @param key
* @return
* @throws InterruptedException
*/
public String queryForMessage(String key) throws InterruptedException {
//初始化返回结果
String result = StringUtils.EMPTY;
//从缓存中获取数据
result = queryByOneCacheKey(key);
//如果缓存中有数据,直接返回
if(StringUtils.isNotBlank(result)){
return result;
}else{
//获取分布式锁
if(lockByBusiness(key)){
//从数据库中获取数据
result = getFromDb(key);
//如果数据库中有数据,就加在缓存中
if(StringUtils.isNotBlank(result)){
oneCacheSave(key,result);
}
}else {
//如果没有获取到分布式锁,睡眠一下,再接着查询数据
Thread.sleep(500);
return queryForMessage(key);
}
}
return result;
}

还可以预先设置热门数据,通过一些监控方法,及时收集热点数据,将数据预先保存在缓存中。

总结:

  • 缓存雪崩是因为缓存数据的失效导致的请求增加,造成系统压力过大,宕机。
  • 缓存穿透是因为请求的数据不存在,导致请求直接到达后端,增加了后端的压力。
  • 缓存击穿是因为某个数据被大量请求,导致其缓存失效,从而打击数据库。

其他常见的Redis错误及其解决方法:

  1. 连接被拒绝:当Redis没有运行或防火墙阻止了连接时会发生此错误。解决方案:检查Redis是否正在运行,如果防火墙阻止了连接,请为Redis添加一个例外。
  2. 达到最大客户端数:当太多客户端试图同时连接到Redis时会发生此错误。解决方案:增加Redis配置文件中的maxclients设置,或添加更多的Redis实例。
  3. 内存不足:当Redis的内存用尽时会发生此错误。解决方案:减小存储在Redis中的数据的大小,为服务器添加更多内存,或使用Redis的逐出策略自动删除最近最少使用的项目。
  4. 语法错误:当命令以错误的语法发送到Redis时会发生此错误。解决方案:检查命令的语法,然后重试。
  5. 未找到键:当尝试检索不存在的键的值时会发生此错误。解决方案:使用EXISTS命令在尝试检索其值之前检查键是否存在。
  6. 将数据加载到内存:当Redis从磁盘重新加载数据到内存时会发生此错误。解决方案:等待重新加载完成,或增加Redis配置文件中的保存间隔。
浏览量:加载中...

jedis实操redisAPI

· 阅读需 11 分钟
季冠臣
后端研发工程师

为了加深对原生redis命令的认识,我又手写了边通过jedis调用原生redis API 的实操,Redis命令十分丰富,包括的命令组有Cluster、Connection、Geo、Hashes、HyperLogLog、Keys、Lists、Pub/Sub、Scripting、Server、Sets、Sorted Sets、Strings、Transactions一共14个redis命令组两百多个redis命令,Redis中文命令大全。您可以通过下面的检索功能快速查找命令,已下是全部已知的redis命令列表。如果您有兴趣的话也可以查看我们的网站结构图,它以节点图的形式展示了所有redis命令。

Redis官网:https://redis.io/

一、使用Java操作Redis方式

1、Jedis

Jedis 是直连模式,在多个线程间共享一个 Jedis 实例时是线程不安全的,需要使用连接池

其API提供了比较全面的Redis命令的支持,相比于其他Redis 封装框架更加原生

Jedis中的方法调用是比较底层的暴露的Redis的API,Java方法基本和Redis的API保持着一致

使用阻塞的I/O,方法调用同步,程序流需要等到socket处理完I/O才能执行,不支持异步操作

2、Lettuce

高级Redis客户端,用于线程安全同步,异步响应

基于Netty的的事件驱动,可以在多个线程间并发访问, 通过异步的方式可以更好的利用系统资源

我就用Jedis操作了.....

二、亲笔演练Redis五大常用命令组

1、使用连接池

package space.jachen.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
*
* TODO: 测试jedis与redis建立连接 使用线程池
*
* @author JaChen
* @date 2022/12/10 15:37
*/
public class Test2 {


public static void main(String[] args) {

// 建立连接池配置信息
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(10*100);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);

// 创建一个连接池
JedisPool pool = new JedisPool(poolConfig, "192.168.253.128", 6379);

// 从连接池获取一个现成的连接
Jedis jedis = pool.getResource();
System.out.println("jedis = " + jedis);

// 不使用的时候放回连接池
jedis.close();


}


}

2、操作String

package space.jachen.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
*
* @author JaChen
* @date 2022/12/10 15:41
*/
public class TestString {


public static void main(String[] args) {

// 1、建立连接池配置信息
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(200);
poolConfig.setMaxIdle(32);
poolConfig.setMaxWaitMillis(10*100);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);

// 2、创建一个连接池
JedisPool pool = new JedisPool(poolConfig, "192.168.253.128", 6379);

// 3、从连接池获取一个现成的连接
Jedis jedis = pool.getResource();
System.out.println("jedis = " + jedis);

// 4、操作String
System.out.println("===============String==============");
// TODO:存储字符串的类型,key-value
// notice:
// 1、值的长度不能超过512 MB
// 2、key命名规范,不要过长,冒号分割,业务名:表名:ID

// TODO: 应用 :
// 1、验证码
// 2、计时器、发号器
// 3、重复订单提交令牌
// 4、热点商品卡片(序列化对象存储)
// 5、分布式锁
String setName = jedis.set("name", "jachen");
System.out.println("setName = " + setName);
System.out.println(jedis.get("name"));

jedis.set("age","18");
System.out.println(jedis.get("age"));

// incr 对应 的值 进行加1操作,并返回新值。
jedis.incr("age");
System.out.println(jedis.get("age"));

// 将key对应的数字加increment。(如果key不存在,操作之前,key会被置为零。)
jedis.incrBy("age",10);
System.out.println(jedis.get("age"));

jedis.set("cart","887766");
System.out.println(jedis.get("cart"));

// 批量设置或获取多个key的值
jedis.mset("tell", "110", "weight", "65kg");
System.out.println(jedis.mget("tell","weight"));

jedis.setnx("sex","0");
System.out.println(jedis.get("sex"));

// 原子性操作
// 将key设置值为value,如果key不存在等同SET命令。
jedis.msetnx("k1","v1","k2","v2","sex","0");
System.out.println(jedis.mget("k1","k2","sex"));

// 当key存在时什么也不做, 是set if not exists的简写。
jedis.msetnx("k1","v1","k2","v2","sex","1");
System.out.println(jedis.mget("k1","k2","sex"));

// 设置key对应字符串value,
// 并且设置key在给定的seconds时间之后超时过期,原子操作
jedis.setex("car",2,"887766");
System.out.println(jedis.get("car"));
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(jedis.get("car"));

// 5、不使用的时候放回连接池
jedis.close();

}
}

运行结果:

image-20221210231902515

3、操作List

package space.jachen.jedis;

import redis.clients.jedis.Jedis;

import java.util.List;

/**
* @author JaChen
* @date 2022/12/10 21:05
*/
public class TestList {

public static void main(String[] args) {


// 1、获取一个Jedis连接
Jedis jedis = new Jedis("192.168.253.128",6379);

// 2、操作List
// TODO: 字符串列表,按照插入顺序排序
// 存储结构:双向链表,插入删除时间复杂度O(1)快,查找为O(n)慢
// notice:
// 1、通常添加一个元素到列表的头部(左边)或者尾部(右边)
// 2、存储的都是string字符串类型
// 3、支持分页操作,高并发项目中,第一页数据都是来源list,第二页和更多信息则是通过数据库加载
// 4、一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表不超过40亿个元素)

// TODO: 应用:
// 1、简单队列
// 2、最新评论列表
// 3、非实时排行榜:定时计算榜单,如手机日销榜单
System.out.println("===============List===============");

// ① 将一个或多个值插入到列表头部 key value1 [value2]
jedis.lpush("phone","mate5","iphone14","iphone11","iphone5s");
System.out.println(jedis.llen("phone"));
System.out.println(jedis.lrange("phone",0,-1));

// ② 移除并获取列表最后一个元素
String rpop = jedis.rpop("phone");
System.out.println("rpop = " + rpop);

// ③ 在key对应的list的尾部添加一个元素
jedis.rpush("netbook","DELL","Mac","外星人");
System.out.println(jedis.lrange("netbook",0,-1));

// ④ 从key对应的list的尾部删除一个元素,并返回该元素
System.out.println(jedis.lpop("netbook"));

// ⑤ 移出并获取列表的最后一个元素,
// 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
// BRPOP LIST1 LIST2 .. LISTN TIMEOUT
jedis.del("netbook","iphone");
System.out.println(jedis.lrange("iphone",0,-1));
jedis.rpush("iphone","a","b","c");
List<String> list = jedis.brpop("iphone", "netbook", "1");
for (String s : list) {
System.out.println(s);
}

// ⑥ 移除元素,可以指定移除个数
jedis.lrem("iphone",1,"a");
System.out.println(jedis.lrange("iphone",0,-1));

// 3、关闭连接
jedis.close();

}

}

运行结果:

image-20221210232207004

4、操作Set

package space.jachen.jedis;

import redis.clients.jedis.Jedis;

import java.util.List;
import java.util.Set;

/**
* @author JaChen
* @date 2022/12/10 21:40
*/
public class TestSet {


public static void main(String[] args) {

// 1、获取一个Jedis连接
Jedis jedis = new Jedis("192.168.253.128",6379);

// TODO: 将一个或多个成员元素加入到集合中,已经存在于集合的成员元素将被忽略
// notice: 集合是通过哈希表实现的

// TODO: 应用:
// 1、去重
// 2、社交应用关注、粉丝、共同好友
// 3、统计网站的PV、UV、IP
// 4、大数据里面的用户画像标签集合
// 2、操作Set
System.out.println("===============Set==============");

// ① 添加一个或多个指定的member元素到集合的 key中.
// 指定的一个或者多个元素member 如果已经在集合key中存在则忽略
jedis.sadd("user1","u1","beijing","18","iphone11");
jedis.sadd("user2","u2","shanghai","19","iphone12");
jedis.sadd("user3","u3","beijing","19","iphone13");

// ② 返回集合存储的key的基数 (集合元素的数量).
System.out.println(jedis.scard("user1"));
System.out.println(jedis.scard("user2"));
System.out.println(jedis.scard("user3"));

// ③ 返回的集合元素是第一个key的集合与后面所有key的集合的差集
System.out.println(jedis.sdiff("user1", "user2", "user3"));

// ④ 返回指定所有的集合的成员的交集.
System.out.println(jedis.sinter("user1", "user2"));

// ⑤ 返回成员 member 是否是存储的集合 key的成员.
System.out.println(jedis.sismember("user1","beijing"));

// ⑥ 在key集合中移除指定的元素. 如果指定的元素不是key集合中的元素则忽略
jedis.srem("user1","iphone11");
System.out.println(jedis.scard("user1"));

// ⑦ 返回给定的多个集合的并集中的所有成员.
Set<String> sunion = jedis.sunion("user1", "user2", "user3");
for (String s : sunion) {
System.out.print(s + " ");
}
System.out.println();

// ⑧ 返回key集合所有的元素.
Set<String> user1 = jedis.smembers("user1");
for (String s : user1) {
System.out.print(s + " ");
}

// key的存活时间
System.out.println("\njedis.ttl(\"user1\") = " + jedis.ttl("user1"));

// 3、关闭连接
jedis.close();

}
}

运行结果:

image-20221210232225594

5、操作Hash

package space.jachen.jedis;

import redis.clients.jedis.Jedis;

import java.util.HashMap;

/**
* @author JaChen
* @date 2022/12/10 22:03
*/
public class TestHash {

public static void main(String[] args) {


// 1、获取一个Jedis连接
Jedis jedis = new Jedis("192.168.253.128",6379);


// 2、Hash操作
// TODO:string类型的field和value的映射表,hash特别适合用于存储对象
// notice:每个 hash 可以存储 232 - 1 键值对(40多亿)

// TODO:应用:
// 1、购物车
// 2、用户个人信息
// 3、商品详情
System.out.println("===============Hash==============");

// ① 设置 key 指定的哈希集中指定字段的值
jedis.hset("class","redis","80");
System.out.println(jedis.hget("class","redis"));

HashMap<String, String> hashMap = new HashMap<>();
hashMap.put("java","100");
hashMap.put("c++","90");
hashMap.put("c","96");
jedis.hmset("class",hashMap);
// ② 返回 key 指定的哈希集中所有的字段和值
System.out.println(jedis.hgetAll("class"));

// ③ 从 key 指定的哈希集中移除指定的域
jedis.hdel("class","c","c++");
System.out.println(jedis.hgetAll("class"));
// ④ 返回hash里面field是否存在
System.out.println(jedis.hexists("class","c"));

// ⑤ 增加 key 指定的哈希集中指定字段的数值, 如果是-1 则是递减
// HINCRBY key field increment
jedis.hincrBy("class","java",-5);
System.out.println(jedis.hgetAll("class"));
jedis.hincrBy("class","java",-5);
System.out.println(jedis.hgetAll("class"));

// 3、关闭Jedis连接
jedis.close();

}

}

运行结果:

image-20221210232256172

6、操作SortedSet

package space.jachen.jedis;

import redis.clients.jedis.Jedis;

import java.util.HashMap;

/**
* @author JaChen
* @date 2022/12/10 22:36
*/
public class TestSortedSet {

public static void main(String[] args) {

// 1、获取一个Jedis连接
Jedis jedis = new Jedis("192.168.253.128",6379);


// 2、操作SortedSet 可叫zSet
// TODO:
// 1、有序集合可以看做是在Set集合的的基础上为集合中的每个元素维护了一个顺序值: score,
// 它允许集合中的元素可以按照score进行排序
// 2、如果某个成员已经是有序集的成员,
// 那么更新这个成员的分数值,分数值可以是整数值或双精度浮点数。
// 3、用于将一个或多个成员元素及其分数值加入到有序集当中
// notice:
// 1、底层使用到了Ziplist压缩列表和“跳跃表”两种存储结构
// 2、如果重复添加相同的数据,score值将被反复覆盖,保留最后一次修改的结果

// TODO:
// 应用:
// 1、实时排行榜:商品热销榜、体育类应用热门球队、积分榜
// 2、优先级任务、队列
// 3、朋友圈 文章点赞-取消,逻辑:用户只能点赞或取消,
// 统计一篇文章被点赞了多少次,可以直接取里面有多少个成员
System.out.println("===============SortedSet==============");

// ① 向有序集合添加一个或多个成员,或者更新已存在成员的分数
jedis.zadd("goods",100,"iPhone14");

// ② 获取有序集合的成员数
System.out.println(jedis.zcard("goods"));

// ③ 有序集合中对指定成员的分数加上增量 increment
jedis.zincrby("goods",10,"iPhone14");
System.out.println(jedis.zscore("goods","iPhone14"));
jedis.zincrby("goods",10,"iPhone14");
System.out.println(jedis.zscore("goods","iPhone14"));

HashMap<String, Double> hashMap = new HashMap<>();
hashMap.put("meta8",60D);
hashMap.put("iPhone12",101D);
hashMap.put("iPhone6s",30D);
jedis.zadd("goods",hashMap);
// ④ 计算在有序集合中指定区间分数的成员数
System.out.println(jedis.zcount("goods",10,150));
// ⑤ 通过索引区间返回有序集合指定区间内的成员, 成员的位置按分数值递增(从小到大)来排序
System.out.println(jedis.zrange("goods",0,-1));
// 根据score正序输出
System.out.println(jedis.zrangeByScore("goods",0D,100));
// ⑥ 通过索引区间返回有序集合指定区间内的成员, 成员的位置按分数值递增(从大到小)来排序
System.out.println(jedis.zrevrange("goods",0,-1));
// ⑦ 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
System.out.println(jedis.zrevrank("goods","meta8"));
// ⑧ 返回有序集key中成员member的排名。其中有序集成员按score值递增(从小到大)顺序排列
System.out.println(jedis.zrank("goods","meta8"));


// 3、 关闭jedis连接
jedis.close();


}
}

运行结果:

image-20221210232340442

总结

除了要了解这些数据结构的常用场景之前,还有继续加深对他们底层实现的了解,比如SortedSet底层为什么要使用跳跃表呢?为什么使用红黑树等等。

浏览量:加载中...

减少切换上下文开销

· 阅读需 2 分钟
季冠臣
后端研发工程师

我们怎么能减少切换上下文带来的开销呢?

cpu为线程分配时间片,时间片非常短(毫秒级别),cpu不停的切换线程执行,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这个任务的状态,让我们感觉是多个程序同时运行的。

上下文的频繁切换,会带来一定的性能开销

那么如何减少上下文切换的开销

1、无锁并发编程 无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程2处理不同段的数据

2、CAS Java的Atomic包使用CAS算法来更新数据,而不需要加锁。使用最少线程

3、使用最少线程 避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态

4、协程 在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

浏览量:加载中...

RDB和AOF持久化小结

· 阅读需 4 分钟
季冠臣
后端研发工程师

背景:简单总结了一些Redis的两种持久化方式 RDB和AOF

Redis RDB持久化流程

RDB其实就是把数据以快照的形式保存在磁盘上,也是默认的持久化方式。这种方式是将内存中的数据以快照的方式写入到二进制文件中,默认文件名dump.rdb

RDB 的触发条件有手动触发和自动触发两种:

手动触发: Save命令: 会同步阻塞redis服务器进程 直到rdb文件创建完毕为止 在服务器阻塞期间不能处理命令请求(基本已经弃用了) Bgsave命令: 会在主线程fork一个子进程 由子进程去创建一份rdb文件 临时存储数据 父进程继续执行其他指令。

自动触发: 1、Save m n: 在配置文件中设置 save m n 指定当m秒内发生n次变化时 会发生bgsave。 2、其他自动触发机制 在主从复制场景下 从节点执行全量操作 则主节点会执行bgsave 执行shutdown命令 会自动执行rdb持久化。

优势:

  • RDB文件紧凑 全量备份 非常时候适合用于备份和灾难恢复

  • 生产RDB文件的时候 redis主进程会fork()一个子进程处理所有工作,主进程不需要进行任何磁盘IO操作

  • RDB在恢复大数据集时的速度比ADF的恢复速度快很多

劣势:

  • 当进行快照持久化时,会开启一个子进程专门负责快照持久化 子进程会拥有父进程的内存数据 父进程修改内存 子进程无影响 所以在快照持久化期间修改的数据不会被保存 可能丢失数据。

Redis AOF持久化流程

AOF的工作机制很简单,redis会将每个收到的写的命令通过write函数追加到文件中 也就是我们所说的日志记录

持久化原理:

每当有一个写命令过来 就直接保存在我们的AOF文件中 文件重写原理: AOF持久化的文件会越来越大 ,为了压缩AOF文件的持久化文件。Redis提供了bgrewriteaof命令 将内存中的数据以命令的方式保存在临时文件中,同时会frok出一条新进程来将文件重写。重写AOF文件的操作,并没有读取旧的AOF文件,而是将整个内存的数据库内容用命令的方式重写了个新的AOF文件,于快照类似。

三种触发机制:

每修改同步always:同步持久化 每次发生数据变更会被立即记录到磁盘 性能较差 但数据完整性比较好。 每秒同步everysec:异步操作 每秒记录 如果一秒内宕dang机,有数据丢失。 no:操作系统控制的写回 性能好 宕机时丢失数据较多

优势:

  • AOF可以更好的保护数据不丢失 一般会每隔1秒 通过一个后台线程执行一次fsync操作 最多丢失1秒数据

  • AOF日志文件即使过大的时候 出现后台重写操作 也不会影响客户端的读写

  • 三种写回策略体现了一个重要的原则 trade-off 取舍 ,指在性能和可靠性保证之间做出取舍 即用空间换时间 体现出了redis的高性能。

劣势:

  • 对于同一份数据来说 AOF日志文件通常比RDB数据快照文件更大

  • AOF开启后 支持QPS会比EDB支持的写QPS低 因为AOF一般会配置fsync一次日志文件 每秒一次fsync 性能也很高

  • 以前AOF发生过bug 就是AOF记录的日志 进行数据恢复的时候 没有恢复一模一样的数据处理。


总结

  1. Redis 默认开启RDB持久化方式,在指定的时间间隔内,执行指定次数的写操作,则将内存中的数据写入到磁盘中。
  2. RDB 持久化适合大规模的数据恢复但它的数据一致性和完整性较差。
  3. Redis 需要手动开启AOF持久化方式,默认是每秒将写操作日志追加到AOF文件中。
  4. AOF 的数据完整性比RDB高,但记录内容多了,会影响数据恢复的效率。
  5. Redis 针对 AOF文件大的问题,提供重写的瘦身机制。
  6. 若只打算用Redis 做缓存,可以关闭持久化。
  7. 若打算使用Redis 的持久化。建议RDB和AOF都开启。其实RDB更适合做数据的备份,留一后手。AOF出问题了,还有RDB。
浏览量:加载中...