redis开门之批量插入pipeLine

Redis开门之批量插入Pipeline

下发数据同步到Redis中,数据少的话几千条,多则达百万级。其中一个场景是把下发的数据同步到Redis中,数据同步完成后,把数据写入到文件中,下发给客户,客户调用。某天……

产品经理:小A,我发现我们这个数据整体下发的流程耗时有点长啊…从拉取数据到处理下发将近一个小时的时间,每天处理的XX数量也不是很多,看下是哪块耗时比较久,优化下

小A:嗯…我们现在的处理逻辑是所有的任务完成后 才会统一进行同步Redis的操作,而且目前的代码并没有批量处理数据到Redis,而是直接一条一条的处理,就很慢….

产品经理:一条一条?百万的数据 那大概处理多长时间啊?优化下吧…

小A:嗯,我看下怎么处理吧

看了之前同步Redis代码的逻辑(之前不是小A写的),异步处理、线程池、分批次、应有尽有但就是很慢。给出小A 优化前的大概框架。

try {
            redisTemplate.executePipelined(new SessionCallback<Object>() {
                @Override
                public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                    for (JSONObject redisSynchro : json) {
                        // 同步数据到redis
                        redisTemplate.opsForList().rightPush("XXX")
                        // 设置过期时间
                        redisTemplate.expire()
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            log.info("同步redis发生意外异常 :" + e.getMessage());
        } finally {
            countDownLatch.countDown();
        }

小A查了下Redis批量处理数据的方法 网上给出的博客就是pipeline操作啊,代码里用的就是executePipelined 执行管道操作。嗯… 不对再查查。executePipelined的执行原理是什么?

  • 管道(Pipeline) 是 Redis 提供的一种优化手段,允许客户端将多个命令一次性发送到 Redis 服务器,而不是每次发送一个命令并等待响应。这样可以减少网络延迟,提高操作效率。

  • executePipelined() 方法的本意是将多个 Redis 操作封装为一个管道批量执行,返回所有操作的结果。

  • 客户端批量发送命令 → 服务器批量执行命令 → 客户端一次性接收所有结果。

图片[1]-redis开门之批量插入pipeLine-牛翰网

为什么第一次的执行没有通过管道执行呢?

封装层次过高: executePipelined 中使用了 redisTemplate 的操作方法,而不是直接通过 RedisConnection 来执行 Redis 的底层命令。

分散的命令: 每个 rightPush和 expire 方法都各自独立调用,即使这些操作在同一个 executePipelined 方法中,也无法通过 Redis 的原生管道机制优化。

所以如果要想使用Redis的管道操作,应该避免使用高层次封装的redisTemplat。于是乎,小A改变了上面的执行方式

redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    StringRedisSerializer serializer = new StringRedisSerializer();
    for (ResultInfo result : tmps) {
        // 组装数据 key/value
        // 检查元素是否已存在
        if (检查条件) {
            // 如果不存在,则插入
            connection.rPush(serializer.serialize(key), serializer.serialize(value));
        }
        // 设置过期时间
        connection.expire(serializer.serialize(key), 30 * 24 * 3600L);   
    }
    return null;
});

使用了 StringRedisSerializer 对键和值进行序列化。序列化方式直接处理 Redis 底层连接的操作,通常适用于高性能的场景。

经过实测13W级数据插入耗时3秒,9W数据耗时1秒。

来源链接:https://www.cnblogs.com/kuangsun125/p/18586651

© 版权声明
THE END
支持一下吧
点赞9 分享
评论 抢沙发
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码

    暂无评论内容