侧边栏壁纸
博主头像
ProSayJ 博主等级

Talk is cheap. Show me the code.

  • 累计撰写 72 篇文章
  • 累计创建 24 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

扫盲Redisson的watchdog机制

YangJian
2025-07-18 / 0 评论 / 0 点赞 / 5 阅读 / 0 字

为了避免Redis实现的分布式锁超时,Redisson中引入了watch dog的机制,他可以帮助我们在Redisson实例被关闭前,不断的延长锁的有效期。

Redisson 的 Watchdog(看门狗)机制,是 Redisson 分布式锁自动续期的重要保障机制,主要用于防止持锁线程因业务执行时间过长而提前释放锁,导致并发安全问题。

  • 自动续租:当一个Redisson客户端实例获取到一个分布式锁时,如果没有指定锁的超时时间,Watchdog会基于Netty的时间轮启动一个后台任务,定期向Redis发送命令,重新设置锁的过期时间,通常是锁的租约时间的1/3。这确保了即使客户端处理时间较长,所持有的锁也不会过期。

  • 续期时长:默认情况下,每10s钟做一次续期,续期时长是30s。

  • 停止续期:当锁被释放或者客户端实例被关闭时,Watchdog会自动停止对应锁的续租任务。


🔧 背景问题

在 Redisson 中获取分布式锁时,你可以设置锁的 自动释放时间(即 leaseTime),比如:

RLock lock = redissonClient.getLock("myLock");
lock.lock(10, TimeUnit.SECONDS); // 10秒后自动释放

如果业务执行超过了这 10 秒,锁就会被 Redis 自动释放,可能会造成 锁还没用完就被别的线程抢走,引发并发问题。


✅ Watchdog 机制的作用

Redisson 提供了一种 默认锁续期机制(Watchdog),用于解决上述问题:

  • 当你调用 lock()不传入 leaseTime 时,Redisson 会默认使用 Watchdog 机制;

  • 默认 锁的超时时间是 30 秒

  • Watchdog 会每隔 10 秒自动检查并续期锁,防止锁过期;

  • 一旦线程业务执行完毕并主动 unlock(),则 Watchdog 停止。

✅ 示例

// 没有指定 leaseTime,自动启用 Watchdog
lock.lock();
// Redisson 会设置默认过期时间 30 秒,然后每隔 10 秒续一次期

⚙️ Watchdog 的续期机制原理

简单流程如下:

  1. 获取锁时,Redisson 设置锁的有效期为 30 秒;

  2. 后台开启一个 watchdog 线程,每 10 秒对该锁进行续期操作;

  3. 只要当前线程还持有锁,watchdog 就会持续续期;

  4. 一旦线程执行完毕并调用 unlock(),watchdog 也会停止续期。


⚠️ 注意事项

项目

Watchdog 会自动处理

说明

指定了 leaseTime

❌ 不会自动续期

会在你指定时间后自动释放,不管业务是否完成

未指定 leaseTime

✅ 自动续期

Watchdog 启动,每 10 秒续一次期

程序异常未调用 unlock()

✅ 最终会释放

Watchdog 会检测线程是否存活,线程挂掉后锁会超时释放


🧪 Redisson 配置 watchdog 时间

可通过配置修改 watchdog 默认续期时间:

lockWatchdogTimeout: 30000 # 默认30秒(单位毫秒)

🔄 总结一句话

Redisson 的 Watchdog 是一种自动锁续期机制,用于保证当业务执行时间较长时,锁不会被自动释放而被其它线程抢占,确保分布式锁的持有安全。

如需禁用 Watchdog,务必设置 leaseTime 并自己控制锁释放时机。


续期 demo 测试

public static RedissonClient createClient() {
        Config config = new Config();
        config.useClusterServers()
                .addNodeAddress(
                        "redis://172.16.55.26:6379",
                        "redis://172.16.55.26:6380",
                        "redis://172.16.55.26:6381",

                        "redis://172.16.55.27:6379",
                        "redis://172.16.55.27:6380",
                        "redis://172.16.55.27:6381",

                        "redis://172.16.55.28:6381",
                        "redis://172.16.55.28:6381",
                        "redis://172.16.55.28:6381"

                )
                .setScanInterval(2000) // 集群状态扫描间隔(ms)
                .setPassword("123456"); // 如果有密码

        return Redisson.create(config);
    }

    public static void main(String[] args) throws InterruptedException {
        RedissonClient redisson = createClient();

        // 创建一个锁对象,参数是锁的名字(任意)
        RLock lock = redisson.getLock("my-distributed-lock");

        try {
            // 尝试获取锁,最多等待5秒,锁自动释放时间为50秒
            // boolean locked = lock.tryLock(5, 50, TimeUnit.SECONDS);

            // 没有指定 leaseTime,自动启用 Watchdog
            // Redisson 会设置默认过期时间 30 秒,然后每隔 10 秒续一次期
            boolean locked = lock.tryLock();
            if (locked) {
                System.out.println("🔐 成功获得锁,执行业务逻辑...");
                Thread.sleep(60_000); // 模拟业务处理
            } else {
                System.out.println("⚠️ 获取锁失败!");
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                System.out.println("🔓 锁已释放");
            }
        }

        redisson.shutdown();
    }

通过 redis-cli 或者控制台查看“my-distributed-lock” 的 TTL 时间 会以 10s 为周期自动续期, 默认续期时间是30s

实现原理

那么,它是如何实现的呢?
在Redisson中,watch dog的主要实现在
scheduleExpirationRenewal方法中:

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

//定时任务执行续期
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                } else {
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}


//使用LUA脚本,进行续期
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

可以看到,上面的代码的主要逻辑就是用了一个TimerTask来实现了一个定时任务,设置了internalLockLeaseTime / 3的时长进行一次锁续期。默认的超时时长是30s,那么他会每10s进行一次续期,通过LUA脚本进行续期,再续30s

不过,这个续期也不是无脑续,他也是有条件的,其中

ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());这个值得我们关注,他会从EXPIRATION_RENEWAL_MAP中尝试获取一个KV对,如果查不到,就不续期了。

EXPIRATION_RENEWAL_MAP这个东西,会在unlock的时候操作的,对他进行remove,所以一个锁如果被解了,那么就不会再继续续期了:

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    return getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
    CompletionStage<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            if (e instanceof CompletionException) {
                throw (CompletionException) e;
            }
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

以上代码,第4行->16行->22行->57行。就是一次unlock过程中,对EXPIRATION_RENEWAL_MAP进行移除,进而取消下一次锁续期的实现细节。

并且在unlockAsync方法中,不管unlockInnerAsync是否执行成功,还是抛了异常,都不影响cancelExpirationRenewal的执行,也可以理解为,只要unlock方法被调用了,即使解锁未成功,那么也可以停止下一次的锁续期。

什么情况会进行续期

当我们使用Redisson创建一个分布式锁的时候,并不是所有情况都会续期的,我们可以看下以下加锁过程的代码实现:

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

注意看第15-19行,只有当leaseTime <= 0的时候,Redisson才会进行续期,所以,当我们加锁时,如果指定了超时时间,那么是不会被续期的。

什么情况会停止续期

首先,就是我们上面讲过的那种,如果一个锁的unlock方法被调用了,那么就会停止续期。
那么,取消续期的核心代码如下:

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

主要就是通过EXPIRATION_RENEWAL_MAP.remove来做的。那么cancelExpirationRenewal还有下面一处调用:


protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

也就是说,在尝试开启续期的过程中,如果线程被中断了,那么就会取消续期动作了。

目前,Redisson是没有针对最大续期次数和最大续期时间的支持的。所以,正常情况下,如果没有解锁,是会一直续期下去的。

但是需要注意的是,Redisson的续期是Netty的时间轮(TimerTask、Timeout、Timer)的,并且操作都是基于JVM的,所以,当应用宕机、下线或者重启后,续期任务就没有了。这样也能在一定程度上避免机器挂了但是锁一直不释放导致的死锁问题。

watchdog一直续期,客户端挂了怎么办?

因为watch dog的续期,是没有次数和时长的限制的,也就是说,如果没有主动解锁,那么他就会一直续期下去。

那么,就会有人提出这样的问题:

  1. 一直续期,别人不是拿不到锁了吗?

  2. 一旦客户端挂了但是锁还没释放怎么办?

  3. 如果解锁失败了,怎么办?

一直续期,别人不是拿不到锁了吗?

上面这两个问题你可能不知道答案,那下面这几个你一定知道答案:
分布式锁的目的是什么?防止并发。
锁续期的条件是什么?还没解锁。
什么情况下会没解锁?任务没执行完。
那么,如果一个任务没执行完,我就一直给他续期,让他不断地延长锁时长,防止并发,有毛病吗?没有啊!
如果你就是不想一直续期,那你就自己指定一个超时时间就行了。就不要用他的续期机制就好了。

一旦客户端挂了但是锁还没释放怎么办?

如果,应用集群中的一台机器,拿到了分布式锁,但是在执行的过程中,他挂了,还没来得及把锁释放,那么会有问题么?

因为我们知道,锁的续期是Redisson实现的,而Redisson的后台任务是基于JVM运行的,也就是说,如果这台机器挂了,那么Redisson的后台任务也就没办法继续执行了。

那么他也就不会会再继续续期了,那么到了期限之后,锁就会自动解除了。这样就可以避免因为一个实例宕机导致分布式锁的不可用。

watchdog解锁失败,会不会导致一直续期下去?

不会的,因为在解锁过程中,不管是解锁失败了,还是解锁时抛了异常,都还是会把本地的续期任务停止,避免下次续期。

具体实现如下:

@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

这是redisson中解锁方法的入口,这里调用了unlockAsync方法,传入了当前线程的ID

@Override
public RFuture<Void> unlockAsync(long threadId) {
    return getServiceManager().execute(() -> unlockAsync0(threadId));
}

private RFuture<Void> unlockAsync0(long threadId) {
    CompletionStage<Boolean> future = unlockInnerAsync(threadId);
    CompletionStage<Void> f = future.handle((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            if (e instanceof CompletionException) {
                throw (CompletionException) e;
            }
            throw new CompletionException(e);
        }
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            throw new CompletionException(cause);
        }

        return null;
    });

    return new CompletableFutureWrapper<>(f);
}

这里就是unlock的核心逻辑了。主要看两个关键步骤:

CompletionStage<Boolean> future = unlockInnerAsync(threadId);
CompletionStage<Void> f = future.handle((opStatus, e) -> {
});

第一步是执行解锁的动作,第二部是执行解锁后的操作。注意看,这里用到了一个CompletionStage,并且通过handle方法进行了后续的操作。

  • CompletionStage 是 Java 8 引入的一个接口,位于 java.util.concurrent 包中。它代表了一个异步操作的阶段,这个阶段在某个任务的计算完成时会执行。

  • CompletionStage 提供了一种非阻塞的方式来处理一系列的异步操作步骤。

  • 每一个操作步骤都可以以 CompletionStage 的形式表示,这些步骤可以串行执行,也可以并行执行或者应用某种组合。通过这种方式,CompletionStage 提供了强大的异步编程模型,允许开发者以链式调用的方式来组织复杂的异步逻辑。(他其实是CompletableFuture的父类


CompletionStage的handle 方法提供了一种机制来处理前一个阶段的结果或异常,无论该阶段是正常完成还是异常完成。他的方法签名如下:

<T> CompletionStage<T> handle(BiFunction<? super T, Throwable, ? extends T> fn);

handle 方法接收一个 BiFunction,这个函数有两个参数:计算的结果(如果计算成功完成)和抛出的异常(如果计算失败)。这使得 handle 方法可以在一个地方同时处理操作的成功和失败情况。

  • 如果前一个阶段成功完成,handle 方法中的函数将被调用,其中的异常参数(Throwable)将为 null,而结果参数将携带操作的结果。

  • 如果前一个阶段失败或抛出异常,handle 方法同样会被调用,但这次结果参数将为 null,而异常参数将携带相应的异常信息。

那么也就是说,CompletionStage 的 handle 方法允许你在前一个操作无论是成功完成、失败,还是抛出异常的情况下,都能够执行 handle 方法中定义的逻辑。

所以,不管上面的unlockInnerAsync过程中,解锁是否成功,是否因为网络原因等出现了异常,后续的代码都能正常执行。那后续的代码是什么呢?

CompletionStage<Void> f = future.handle((opStatus, e) -> {
    cancelExpirationRenewal(threadId);

    if (e != null) {
        if (e instanceof CompletionException) {
            throw (CompletionException) e;
        }
        throw new CompletionException(e);
    }
    if (opStatus == null) {
        IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + threadId);
        throw new CompletionException(cause);
    }

    return null;
});

这段代码一上来就调用了cancelExpirationRenewal

protected void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    
    if (threadId != null) {
        task.removeThreadId(threadId);
    }

    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

逻辑挺简单的,就是从EXPIRATION_RENEWAL_MAP中把当前线程移除掉。

续期是需要依赖EXPIRATION_RENEWAL_MAP的,如果某个线程不在EXPIRATION_RENEWAL_MAP里面了,就不会再被续期了。

所以,如果解锁过程中失败了,redisson也能保证不会再被续期了。除非移除EXPIRATION_RENEWAL_MAP的这个动作也失败了,但是从本地的map中移除一个key失败的概率还是极低的。

0

评论区