Rails 源代码 100 天(7)

上上次我们说到,Rails 在创建 HasManyReflection 对象的时候把一个实例变量赋值为Mutex.new。今天,我们的目标就是探索一下Mutex,以及他在 Reflection 对象中的作用。

首先说,有些 web 服务器是基于线程的。线程的好处是,在一个进程中可以创建多个线程。要想探索 Mutex ,首先要了解线程和进程。为了讨论方便,我会用英文 Thread 和 Process 来表示这两个概念,因为在中文中,线程和进程这两个词容易看混。

多个 thread 中都可以执行自己的程序,比如在 Ruby 用线程来执行数组的插入:

1
2
3
4
5
6
arr = []
10.times do |i|
Thread.new do
arr << i
end
end

这和在一个 process 中执行的循环插入数组是不一样的:

1
2
3
4
arr = []
10.times do |i|
arr << i
end

前者得到的arr可能是:[4, 1, 0, 2, 5, 6, 7, 8, 9, 3], 而后者得到的结果一定是:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]。出现这种差异的原因在于,各个线程都在执行自己的arr << i,而且是哪个 Thread 拿到了执行权,哪个 Thread 就执行这个操作,因为不一定是哪个 Thread 先拿到执行权,所以会产生插入位置无法预测的现象。

从另一个角度讲,出现这个顺序不可预测的情况,也不能全怪1 Thread,因为谁让你非得去插入同一个数组呢?如果你想要的是类似于不同线程分别计算的计算过程,比如2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def calculate_sum(arr)
sleep(2)
sum = 0
arr.each do |item|
sum += item
end
sum
end

@items1 = [12, 34, 55]
@items2 = [45, 90, 2]
@items3 = [99, 22, 31]

threads = 3.times do |i|
Thread.new(i) do |i|
items = instance_variable_get("@items#{i}")
puts "items#{i} = #{calculate_sum(items)}"
end
end

计算结果就不会受 Thread 的执行顺序的影响。

然而,有些时候,如果我们即想用 Thread,又想处理一个可以被不同 Thread 操作的数据,怎么避免结果不可预测的情况呢?

这个时候,Mutex就可以派上用场了。

说到Mutex,我们先聊一个题外话: Mutex 的全称是 Mutual Exclusion,翻译成中文应该叫「相互独立」,这让我想到了在 Business Consulting 领域一个知名度很高的思考框架叫做:MECE,mutually exclusive, collectively exhaustive,翻译成中文叫做「相互独立,且穷尽」,意思大致是,把事情发展的可能情况都想到,但每种可能情况之间界限清晰,而不能给把实际上相同的情况起了两个不同的名字来定义。

这其中对我们理解 Mutex 有意义的一句话是:相互独立。Mutex 在线程管理中的作用是类似的,他让不同的 Thread 相互独立,而且,这里的「相互独立」指的是时间上不可重叠。

还有一个比较俏皮的解释,有兴趣的可以看看这个 Stack Overflow 上的回答

下面分别给出不使用 Mutext 的一段示例代码,new 出来的几个 thread 都操作了一个共同的 @sum 资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def calculate_sum(arr)
@sum = 0
m = Mutex.new
arr.each do |item|
Thread.new do
print item
tmp = @sum
sleep(rand(2))
@sum = tmp + item
end
end
end

arr = [1,2,3,4,5,6,7,8,9]
calculate_sum(arr)
sleep(3)
puts @sum

首先说,在倒数第二行,加上一行sleep(3),是为了确保所有的 threads 都执行完毕。如果不加这一行的话,可能程序已经执行到最后的puts @sum,但之前 new 出来的 thread 还没有执行完。

执行这段代码,打印出来的结果可能是这样的:

1
2
123956784
12

也就是说,某个 thread 拿到了最后一次执行@sum = tmp + item的机会,但由于他之前的 tmp 停留在一个留后的值上,导致最后的@sum 被赋值成了12。

通常人们把这种不同 thread 可能在同一时间操作同一个资源的的情况称为 race conditions.

接下来,我们给出 race conditions 问题的解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def calculate_sum(arr)
@sum = 0
m = Mutex.new
arr.each do |item|
Thread.new do
print '*'
m.lock
print item
tmp = @sum
sleep(rand(2))
@sum = tmp + item
print '!'
m.unlock
end
end
end

arr = [1,2,3,4,5,6,7,8,9]
calculate_sum(arr)
sleep(10)
puts @sum

通常,这段代码中倒数第二行的sleep(10)是为了保证每个 thread 都执行完毕。不过这次 sleep 的时间比上一次长,这是为什么呢?

当在某个线程中有 mutex 对象执行了m.lock,其他 thread 虽然可以继续执行,但是,如果在他们自己的执行过程中也遇到了m.lock,他们就得判断这个 mutex 是不是已经被锁住,如果是,就必须等在m.lock的门外,直到刚刚锁住 mutex 的那个线程执行m.unlock —— 解锁,其他等在门外的 thread 才有机会再去争抢 m.lock 的执行权。所以,下面下面这段代码在同一时间,只能有一个 thread 在执行:

1
2
3
4
5
print item
tmp = @sum
sleep(rand(2))
@sum = tmp + item
print '!'

这段代码就是 lockunlock 之间的代码,其中的 @sum资源在同一时间也就只能被单独一个 thread 操作,从而避免了竞争情况。

又因为这段代码同一时间只能被一个 thread 之前,其他 thread 都在等候,所以最终这个程序的执行时间就要算上这些等待时间,因此更长一点。

不过这里还有一个小问题,如何证明一个 thread 执行m.lock之后,其他 thread 在遇到同样的 m.lock 之前可以继续进行呢?看看我们上一大段代码的执行结果吧:

1
2
***3******!2!1!4!5!6!7!8!9!
45

Thread.new 之后,m.lock 之前特意加上了 print '*'。在程序打印3之后,打印!之前,mutex 肯定是处于锁住的状态的,但是在3 和它之后的!之间有那么多的 *,可以说明其他 thread 在执行到 m.lock 之前,依然可以在打印字符串,而不是停下来。

最后,除了 lockunlock 这对方法可以开关 mutex 的锁,ruby 中还提供了一个synchronize 方法,这样可以把 lockunlock 之前的代码作为代码块传入 synchronize:

1
2
3
4
5
6
7
m.synchronize do
print item
tmp = @sum
sleep(rand(2))
print '!'
@sum = tmp + item
end

而在 Rails 中,Reflection 对象在被 new 出来后,一个 Mutex 对象就被赋值给 @scope_lock 这个实例变量。当 reflection 对象执行 association_scope_cache 方法时,叫做会用到这个 mutex:

1
2
3
4
5
6
7
8
9
def association_scope_cache(conn, owner)
key = conn.prepared_statements
if polymorphic?
key = [key, owner._read_attribute(@foreign_type)]
end
@association_scope_cache[key] ||= @scope_lock.synchronize {
@association_scope_cache[key] ||= yield
}
end

具体这个 association_scope_cache 方法是在做什么,我们继续探索。

1. 其实没什么可怪的,不确定性是事实而已。
2. 代码参考自这里,有改动