Linux的死锁检测

死锁

死锁的四大原因就不多说了

  • 互斥
  • 不可剥夺
  • 占有并等待
  • 循环等待

死锁:就是多个进程(≥2)因为争夺资源而相互等待的一种现象,若无外力推动,将无法继续运行下去。

注意,只有在多进程或者多线程之间或者他们与中断之间相互通讯或者共享资源才有可能发生死锁,单线程或者进程之间没有联系的话,一般不会发生死锁。

锁的种类比较多,这里主要说自旋锁和信号量。两者的差别就在于前者获得不到资源时的动作是不断的资源(即忙转浪费 cpu 的 cycles)而后者则表现为睡眠等待。

死锁的基本情况

自旋锁

递归使用

在同一个进程或者线程中,申请自旋锁,但没有释放之前又再次申请,一定会产生死锁。

进程得到自旋锁后阻塞或睡眠

进程在获得自旋锁之后,调用 copy_from_user(), copy_to_user(), kmalloc() 等有可能引起阻塞的函数。

中断中没有关闭,申请未释放的自旋锁

虽然在中断中是可以使用自旋锁的,但是应该在进入中断的时候关闭中断,不然的话,当中断再次进入的的时候,中断处理函数会自旋等待自旋锁可以再次使用。

在进程中申请了自旋锁,但是在释放前进入中断处理函数,中断处理函数又申请同样的自旋锁,这将导致死锁。

中断与中断下半部共享资源

和中断与进程共享资源死锁出现的情况类似

中断下半部与进程共享资源

和中断与进程共享资源死锁出现的情况类似

自旋锁三种状态

自旋锁保持期间是抢占失效的(内核是不允许被抢占的)

单CPU且内核不可抢占

自旋锁的所有操作都是空。不会引起死锁,内核进程间不存在并发操作进程,进程与中断仍然可能共享数据,存在并发操作,此时内核自旋锁已经失去效果。

单CPU且内核可抢占

  • 在获得自旋锁的时候,禁止抢占直到释放锁为止。此时可能存在死锁的情况是参考自旋锁可能死锁的一般情况。

  • 禁止内核抢占并不代表不会进行内核调度,如果在获得自旋锁后阻塞或者主动调度,内核会调度其他进程运行,被调度的内核进程返回用户空间时,会进行用户抢占,此时调用的进程再次申请上次未释放的自旋锁时,会一直自旋。但是内核被禁止抢占,从而造成死锁。

  • 内核被禁止抢占,但此时中断并没有被禁止,内核进程可能因为中断申请自旋锁而死锁。

多CPU且内核可抢占

真正的SMP情况。

当获得自旋锁的时候,禁止内核抢占知道释放锁为止。

信号量

递归使用

同理,在同一个进程或线程中,申请了信号量,但没有释放之前又再次申请,进程会一直睡眠,这种情况一定死锁。

进程得到信号量后阻塞,睡眠

由于获取到信号量的进程阻塞或者随眠,其他在获取不到后信号量也会进入睡眠等待,这种情况可能造成死锁。

中断中申请信号量

由于信号量在获取不到自旋锁后会进入睡眠等待,中断处理函数不允许睡眠,如果睡眠,中断将无法返回。

中断下半部申请信号量

中断下半部允许睡眠,这种情况不会造成死锁。

俩个进程相互等待资源

进程 1 获得信号量 A,需要信号量 B,在进程 1 需要信号量 B 之前进程 2 获得信号量 B,需要信号量 A。进程 1、2 因相互等待资源而死锁。

Linux提供的检测死锁机制

D状态死锁检测

所谓 D 状态死锁:进程长时间(系统默认配置 120 秒)处于 TASK_UNINTERRUPTIBLE 睡眠状态,这种状态下进程不响应异步信号。如:进程与外设硬件的交互(如 read),通常使用这种状态来保证进程与设备的交互过程不被打断,否则设备可能处于不可控的状态。

对于这种死锁的检测 linux 提供的是 hungtask 机制,主要内容集中在 Hung_task.c 文件中。具体实现原理如下:

1.系统创建normal级别的khungtaskd内核线程,内核线程每120秒检查一次,检查的内容:

遍历所有的线程链表,发现 D 状态的任务,就判断自最近一次切换以来是否还有切换发生,如果有,则返回,如果没有则发生切换,把任务的所有调用栈等信息打印出来。

2.代码实现

首先,hung_task_init 创建一个名为 khungtaskd 的内核线程,内核线程的工作由 watchdog 来完成。

1
2
3
4
5
6
static int __init hung_task_init(void)
{
atomic_notifier_chain_register(&panic_notifier_list, &panic_block);
watchdog_task = kthread_run(watchdog, NULL, "khungtaskd");
return 0;
}

其次,我们看 watchdog 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int watchdog(void *dummy)
{
// 将内核线程设置为 normal 级别
set_user_nice(current, 0);
for (; ;) {
// 设置 hungtask 的校验时间间隔,用户可以修改这个时间,默认为 120 秒
unsigned long timeout = sysctl_hung_task_timeout_secs;
while (schedule_timeout_interruptible(timeout_jiffies(timeout)))
timeout = sysctl_hung_task_timeout_secs;
// 核心的检查代码在下面的函数中实现。
check_hung_uninterruptible_tasks(timeout);
}
return 0;
}

最后,我们分析一下 hungtask 的核心实现 check_hung_uninterruptible_tasks,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static void check_hung_uninterruptible_tasks(unsigned long timeout)
{
int max_count = sysctl_hung_task_check_count;
int batch_count = HUNG_TASK_BATCHING;
struct task_struct *g, *t;
/*
* If the system crashed already then all bets are off,
* do not report extra hung tasks:
*/
// 判断系统是否已经 die、oops 或者 panic 了,若是系统已经 crash 了,就无需再做 hungtask 了。
if (test_taint(TAINT_DIE) || did_panic)
return;
rcu_read_lock();
// 检查进程的列表,寻找 D 状态的任务
do_each_thread(g, t) {
// 判断用户是否设置了检查进程的数量,若是已经达到用户设置的限制,就跳出循环。
if (!max_count--)
goto unlock;
// 判断是否到达批处理的个数,做这个批处理的目的就是因为整个检查是在关抢占的前提下进行的,可能进程列表的进程数很多,为了防止 hungtask 垄断 cpu,所以,做了一个批处理的限制,到达批处理的数量后,就放一下权,给其他的进程运行的机会。
if (!--batch_count) {
batch_count = HUNG_TASK_BATCHING;
rcu_lock_break(g, t);
/* Exit if t or g was unhashed during refresh. */
if (t->state == TASK_DEAD || g->state == TASK_DEAD)
goto unlock;
}
/* use "==" to skip the TASK_KILLABLE tasks waiting on NFS */
// 如果进程处于 D 状态,就开始把相关的信息显示出来了。
if (t->state == TASK_UNINTERRUPTIBLE)
check_hung_task(t, timeout);
} while_each_thread(g, t);
unlock:
rcu_read_unlock();
}
static void check_hung_task(struct task_struct *t, unsigned long timeout)
{
// 统计进程的切换次数 = 主动切换次数 + 被动切换次数
unsigned long switch_count = t->nvcsw + t->nivcsw;
/*
* Ensure the task is not frozen.
* Also, when a freshly created task is scheduled once, changes
* its state to TASK_UNINTERRUPTIBLE without having ever been
* switched out once, it musn
*/

R状态死锁检测

所谓 R 状态死锁:进程长时间(系统默认配置 60 秒)处于 TASK_RUNNING 状态垄断 cpu 而不发生切换,一般情况下是进程关抢占后长时候干活,有时候可能进程关抢占后处于死循环或者睡眠后,这样就造成系统异常。

对于这种死锁的检测机制 linux 提供的机制是 softlockup。主要集中在 softlockup.c 文件中。

1.系统创建一个 fifo 的进程,此进程周期性的清一下时间戳(per cpu),而系统的时钟中断中会被 softlockup 挂入一个钩子(softlockup_tick), 这个钩子就是每个时钟中断到来的时候都检查是否每 cpu 的时间戳被 touch 了,若在阀值 60 秒内都没有被 touch,系统就打印调试信息。

2.代码实现

首先,系统初始化的时候为每个 cpu 创建一个 watchdog 线程,这个线程是 fifo 的。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int __init spawn_softlockup_task(void)
{
void *cpu = (void *)(long)smp_processor_id();
int err;
// 可以通过启动参数禁止 softlockup
if (nosoftlockup)
return 0;
// 下面两个回调函数就是为每个 cpu 创建一个 watchdog 线程
err = cpu_callback(&cpu_nfb, CPU_UP_PREPARE, cpu);
if (err == NOTIFY_BAD) {
BUG();
return 1;
}
cpu_callback(&cpu_nfb, CPU_ONLINE, cpu);
register_cpu_notifier(&cpu_nfb);
atomic_notifier_chain_register(&panic_notifier_list, &panic_block);
return 0;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static int __cpuinit
cpu_callback(struct notifier_block *nfb, unsigned long action, void *hcpu)
{
int hotcpu = (unsigned long)hcpu;
struct task_struct *p;
switch (action) {
case CPU_UP_PREPARE:
case CPU_UP_PREPARE_FROZEN:
BUG_ON(per_cpu(softlockup_watchdog, hotcpu));
// 创建 watchdog 内核线程
p = kthread_create(watchdog, hcpu, "watchdog/%d", hotcpu);
if (IS_ERR(p)) {
printk(KERN_ERR "watchdog for %i failed\n", hotcpu);
return NOTIFY_BAD;
}
// 将时间戳清零
per_cpu(softlockup_touch_ts, hotcpu) = 0;
// 设置 watchdog
per_cpu(softlockup_watchdog, hotcpu) = p;
// 绑定 cpu
kthread_bind(p, hotcpu);
break;
case CPU_ONLINE:
case CPU_ONLINE_FROZEN:
// 唤醒 watchdog 这个内核线程
wake_up_process(per_cpu(softlockup_watchdog, hotcpu));
break;
......
return NOTIFY_OK;
}

其次,我们看一下内核线程 watchdog 的实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int watchdog(void *__bind_cpu)
{
// 将 watchdog 设置为 fifo
struct sched_param param = {.sched_priority = MAX_RT_PRIO-1};
sched_setscheduler(current, SCHED_FIFO, ?m);
// 清狗,即 touch 时间戳
/* initialize timestamp */
__touch_softlockup_watchdog();
// 当前进程可以被信号打断
set_current_state(TASK_INTERRUPTIBLE);
/*
* Run briefly once per second to reset the softlockup timestamp.
* If this gets delayed for more than 60 seconds then the
* debug-printout triggers in softlockup_tick().
*/
while (!kthread_should_stop()) {
// 核心实现就是 touch 时间戳,让出 cpu
__touch_softlockup_watchdog();
schedule();
if (kthread_should_stop())
break;
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
}

最后,softlockup 在时钟中断中挂上一个钩子 softlockup_tick,每个时钟中断都来检查 watchdog 这个 fifo 进程是否有 touch 过时间戳,若是 60 秒都没有 touch 过,就向系统上报异常信息了,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void softlockup_tick(void)
{
// 取得当前的 cpu
int this_cpu = smp_processor_id();
// 获得当前 cpu 的时间戳
unsigned long touch_ts = per_cpu(softlockup_touch_ts, this_cpu);
unsigned long print_ts;
// 获得进程当前的寄存器组
struct pt_regs *regs = get_irq_regs();
unsigned long now;
// 如果没有打开 softlockup,就将这个 cpu 对应的时间戳清零
/* Is detection switched off? */
if (!per_cpu(softlockup_watchdog, this_cpu) || softlockup_thresh <= 0) {
/* Be sure we don

总结一下就是,就是两条线并行工作:一条线是 fifo 级别的内核线程负责清时间戳,另一条线是时钟中断定期检查时间戳是否有被清过,若是到了阀值都没有被请过,则打印 softlockup 的信息。

长时间关中断检测

长时间关中断检测可以有几种实现机制,而利用 nmi watchdog 来检查这种长时间关中断情况,是比较简单的。其原理是需要软硬件配合,硬件通常提供一个计数器(可以递增也可以递减),当记数到某个值得时候,系统就硬件复位。而 nmi watchdog 就定期(小于这个计数到达系统复位的时间)的去清一下系统的计数,若是某个进程长时间关中断,则可能导致 nmi watchdog 得不到清,最终系统复位。