网上很多资料在描述Java内存模型的时候,都会介绍有一个主存,然后每个工作线程有自己的工作内存。数据在主存中会有一份,在工作内存中也有一份。工作内存和主存之间会有各种原子操作去进行同步。

下图来源于这篇Blog

但是由于Java版本的不断演变,内存模型也进行了改变。本文只讲述Java内存模型的一些特性,无论是新的内存模型还是旧的内存模型,在明白了这些特性以后,看起来也会更加清晰。

1. 原子性

  • 原子性是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就不会被其它线程干扰。

一般认为cpu的指令都是原子操作,但是我们写的代码就不一定是原子操作了。

比如说i++。这个操作不是原子操作,基本分为3个操作,读取i,进行+1,赋值给i。

假设有两个线程,当第一个线程读取i=1时,还没进行+1操作,切换到第二个线程,此时第二个线程也读取的是i=1。随后两个线程进行后续+1操作,再赋值回去以后,i不是3,而是2。显然数据出现了不一致性。

再比如在32位的JVM上面去读取64位的long型数值,也不是一个原子操作。当然32位JVM读取32位整数是一个原子操作。

2. 有序性

  • 在并发时,程序的执行可能就会出现乱序。

计算机在执行代码时,不一定会按照程序的顺序来执行。

class OrderExample { 
        int a = 0; 
        boolean flag = false; 
        public void writer() 
        { 
            a = 1; 
            flag = true; 
        } 
        public void reader() 
        { 
            if (flag) 
            { 
                int i = a +1;  
            }
        } 
    }

比如上述代码,两个方法分别被两个线程调用。按照常理,写线程应该先执行a=1,再执行flag=true。当读线程进行读的时候,i=2;

但是因为a=1和flag=true,并没有逻辑上的关联。所以有可能执行的顺序颠倒,有可能先执行flag=true,再执行a=1。这时当flag=true时,切换到读线程,此时a=1还没有执行,那么读线程将i=1。

当然这个不是绝对的。是有可能会发生乱序,有可能不发生。

那么为什么会发生乱序呢?这个要从cpu指令说起,Java中的代码被编译以后,最后也是转换成汇编码的。

一条指令的执行是可以分为很多步骤的,假设cpu指令分为以下几步

  • 取指 IF
  • 译码和取寄存器操作数 ID
  • 执行或者有效地址计算 EX
  • 存储器访问 MEM
  • 写回 WB

假设这里有两条指令

一般来说我们会认为指令是串行执行的,先执行指令1,然后再执行指令2。假设每个步骤需要消耗1个cpu时间周期,那么执行这两个指令需要消耗10个cpu时间周期,这样做效率太低。事实上指令都是并行执行的,当然在第一条指令在执行IF的时候,第二条指令是不能进行IF的,因为指令寄存器等不能被同时占用。所以就如上图所示,两条指令是一种相对错开的方式并行执行。当指令1执行ID的时候,指令2执行IF。这样只用6个cpu时间周期就执行了两个指令,效率比较高。

按照这个思路我们来看下A=B+C的指令是如何执行的。

如图所示,ADD操作时有一个空闲(X)操作,因为当想让B和C相加的时候,在图中ADD的X操作时,C还没从内存中读取(当MEM操作完成时,C才从内存中读取。这里会有一个疑问,此时还没有回写(WB)到R2中,怎么会将R1与R1相加。那是因为在硬件电路当中,会使用一种叫“旁路”的技术直接把数据从硬件当中读取出来,所以不需要等待WB执行完才进行ADD)。所以ADD操作中会有一个空闲(X)时间。在SW操作中,因为EX指令不能和ADD的EX指令同时进行,所以也会有一个空闲(X)时间。

接下来举个稍微复杂点的例子

a=b+c 
d=e-f

对应的指令如下图

原因和上面的类似,这里就不分析了。我们发现,这里的X很多,浪费的时间周期很多,性能也被影响。有没有办法使X的数量减少呢?

我们希望用一些操作把X的空闲时间填充掉,因为ADD与上面的指令有数据依赖,我们希望用一些没有数据依赖的指令去填充掉这些因为数据依赖而产生的空闲时间。

我们将指令的顺序进行了改变

改变了指令顺序以后,X被消除了。总体的运行时间周期也减少了。

指令重排可以使流水线更加顺畅

当然指令重排的原则是不能破坏串行程序的语义,例如a=1,b=a+1,这种指令就不会重排了,因为重排的串行结果和原先的不同。

指令重排只是编译器或者CPU的优化一种方式,而这种优化就造成了本章一开始程序的问题。

如何解决呢?用volatile关键字,这个后面的系列会介绍到。

3. 可见性

  • 可见性是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道这个修改。

可见性问题可能有各个环节产生。比如刚刚说的指令重排也会产生可见性问题,另外在编译器的优化或者某些硬件的优化都会产生可见性问题。

比如某个线程将一个共享值优化到了内存中,而另一个线程将这个共享值优化到了缓存中,当修改内存中值的时候,缓存中的值是不知道这个修改的。

比如有些硬件优化,程序在对同一个地址进行多次写时,它会认为是没有必要的,只保留最后一次写,那么之前写的数据在其他线程中就不可见了。

总之,可见性的问题大多都源于优化。

接下来看一个Java虚拟机层面产生的可见性问题

问题来自于一个Blog

package edu.hushi.jvm;
 
/**
 *
 * @author -10
 *
 */
public class VisibilityTest extends Thread {
 
    private boolean stop;
 
    public void run() {
        int i = 0;
        while(!stop) {
            i++;
        }
        System.out.println("finish loop,i=" + i);
    }
 
    public void stopIt() {
        stop = true;
    }
 
    public boolean getStop(){
        return stop;
    }
    public static void main(String[] args) throws Exception {
        VisibilityTest v = new VisibilityTest();
        v.start();
 
        Thread.sleep(1000);
        v.stopIt();
        Thread.sleep(2000);
        System.out.println("finish main");
        System.out.println(v.getStop());
    }
 
}

代码很简单,v线程一直不断的在while循环中i++,直到主线程调用stop方法,改变了v线程中的stop变量的值使循环停止。

看似简单的代码运行时就会出现问题。这个程序在 client 模式下是能停止线程做自增操作的,但是在 server 模式先将是无限循环。(server模式下JVM优化更多)

64位的系统上面大多都是server模式,在server模式下运行:

finish main
true

只会打印出这两句话,而不会打印出finish loop。可是能够发现stop的值已经是true了。

该Blog作者用工具将程序还原为汇编代码

这里只截取了一部分汇编代码,红色部分为循环部分,可以清楚得看到只有在0x0193bf9d才进行了stop的验证,而红色部分并没有取stop的值,所以才进行了无限循环。

这是JVM优化后的结果。如何避免呢?和指令重排一样,用volatile关键字。

如果加入了volatile,再还原为汇编代码就会发现,每次循环都会get一下stop的值。

接下来看一些在“Java语言规范”中的示例

上图说明了指令重排将会导致结果不同。

上图使r5=r2的原因是,r2=r1.x,r5=r1.x,在编译时直接将其优化成r5=r2。最后导致结果不同。

4. Happen-Before

  • 程序顺序原则:一个线程内保证语义的串行性
  • volatile规则:volatile变量的写,先发生于读,这保证了volatile变量的可见性
  • 锁规则:解锁(unlock)必然发生在随后的加锁(lock)前
  • 传递性:A先于B,B先于C,那么A必然先于C
  • 线程的start()方法先于它的每一个动作
  • 线程的所有操作先于线程的终结(Thread.join())
  • 线程的中断(interrupt())先于被中断线程的代码
  • 对象的构造函数执行结束先于finalize()方法

这些原则保证了重排的语义是一致的。

5. 线程安全的概念

指某个函数、函数库在多线程环境中被调用时,能够正确地处理各个线程的局部变量,使程序功能正确完成。

比如最开始所说的i++的例子

就会导致线程不安全。

关于线程安全的详情使用,请参考以前写的这篇Blog,或者关注后续系列,也会谈到相关内容。

1. 什么是线程

线程是进程内的执行单元

某个进程当中都有若干个线程。

线程是进程内的执行单元。

使用线程的原因是,进程的切换是非常重量级的操作,非常消耗资源。如果使用多进程,那么并发数相对来说不会很高。而线程是更细小的调度单元,更加轻量级,所以线程会较为广泛的用于并发设计。

在Java当中线程的概念和操作系统级别线程的概念是类似的。事实上,Jvm将会把Java中的线程映射到操作系统的线程区。

2. 线程的基本操作

2.1 线程状态图

上图是Java中线程的基本操作。

当new出一个线程时,其实线程并没有工作。它只是生成了一个实体,当你调用这个实例的start方法时,线程才真正地被启动。启动后到Runnable状态,Runnable表示该线程的资源等等已经被准备好,已经可以执行了,但是并不表示一定在执行状态,由于时间片轮转,该线程也可能此时并没有在执行。对于我们来说,该线程可以认为已经被执行了,但是是否真实执行,还得看物理cpu的调度。当线程任务执行结束后,线程就到了Terminated状态。

有时候在线程的执行当中,不可避免的会申请某些锁或某个对象的监视器,当无法获取时,这个线程会被阻塞住,会被挂起,到了Blocked状态。如果这个线程调用了wait方法,它就处于一个Waiting状态。进入Waiting状态的线程会等待其他线程给它notify,通知到之后由Waiting状态又切换到Runnable状态继续执行。当然等待状态有两种,一种是无限期等待,直到被notify。一直则是有限期等待,比如等待10秒还是没有被notify,则自动切换到Runnable状态。

2.2 新建线程

Thread thread = new Thread();
thread.start();

这样就开启了一个线程。

有一点需要注意的是

Thread thread = new Thread();
thread.run();

直接调用run方法是无法开启一个新线程的。

start方法其实是在一个新的操作系统线程上面去调用run方法。换句话说,直接调用run方法而不是调用start方法的话,它并不会开启新的线程,而是在调用run的当前的线程当中执行你的操作。

Thread thread = new Thread("t1")
{
    @Override
    public void run()
    {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName());
    }
};
thread.start();

如果调用start,则输出是t1

Thread thread = new Thread("t1")
{
    @Override
    public void run()
    {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName());
    }
};
thread.run();

如果是run,则输出main。(直接调用run其实就是一个普通的函数调用而已,并没有达到多线程的作用)

run方法的实现有两种方式

第一种方式,直接覆盖run方法,就如刚刚代码中所示,最方便的用一个匿名类就可以实现。

Thread thread = new Thread("t1")
{
    @Override
    public void run()
    {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName());
    }
};

第二种方式

Thread t1=new Thread(new CreateThread3());

CreateThread3()实现了Runnable接口。

在张孝祥的视频中,推荐第二种方式,称其更加面向对象。

2.3 终止线程

  • Thread.stop() 不推荐使用。它会释放所有monitor

在源码中已经明确说明stop方法被Deprecated,在Javadoc中也说明了原因。

原因在于stop方法太过”暴力”了,无论线程执行到哪里,它将会立即停止掉线程。

当写线程得到锁以后开始写入数据,写完id = 1,在准备将name = 1时被stop,释放锁。读线程获得锁进行读操作,读到的id为1,而name还是0,导致了数据不一致。

最重要的是这种错误不会抛出异常,将很难被发现。

2.4 线程中断

线程中断有3种方法

public void Thread.interrupt() // 中断线程 
public boolean Thread.isInterrupted() // 判断是否被中断 
public static boolean Thread.interrupted() // 判断是否被中断,并清除当前中断状态

什么是线程中断呢?

如果不了解Java的中断机制,这样的一种解释极容易造成误解,认为调用了线程的interrupt方法就一定会中断线程。

其实,Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(不一定就是对象的属性,事实上,该状态也确实不是Thread的字段),interrupt方法仅仅只是将该状态置为true。对于非阻塞中的线程, 只是改变了中断状态, 即Thread.isInterrupted()将返回true,并不会使程序停止;

public void run(){//线程t1
   while(true){
      Thread.yield();
   }
}
t1.interrupt();

这样使线程t1中断,是不会有效果的,只是更改了中断状态位。

如果希望非常优雅地终止这个线程,就该这样做

public void run(){ 
    while(true)
    { 
        if(Thread.currentThread().isInterrupted())
        { 
           System.out.println("Interruted!"); 
           break; 
        } 
        Thread.yield(); 
    } 
}

使用中断,就对数据一致性有了一定的保证。

对于可取消的阻塞状态中的线程, 比如等待在这些函数上的线程, Thread.sleep(), Object.wait(), Thread.join(), 这个线程收到中断信号后, 会抛出InterruptedException, 同时会把中断状态置回为false.

对于取消阻塞状态中的线程,可以这样抒写代码:

public void run(){
    while(true){
        if(Thread.currentThread().isInterrupted()){
            System.out.println("Interruted!");
            break;
        }
        try {
           Thread.sleep(2000);
        } catch (InterruptedException e) {
           System.out.println("Interruted When Sleep");
           //设置中断状态,抛出异常后会清除中断标记位
           Thread.currentThread().interrupt();
        }
        Thread.yield();
    }
}

2.5 线程挂起

挂起(suspend)和继续执行(resume)线程

  • suspend()不会释放锁
  • 如果加锁发生在resume()之前 ,则死锁发生

这两个方法都是Deprecated方法,不推荐使用。

原因在于,suspend不释放锁,因此没有线程可以访问被它锁住的临界区资源,直到被其他线程resume。因为无法控制线程运行的先后顺序,如果其他线程的resume方法先被运行,那则后运行的suspend,将一直占有这把锁,造成死锁发生。

用以下代码来模拟这个场景

package test;
 
public class Test
{
    static Object u = new Object();
    static TestSuspendThread t1 = new TestSuspendThread("t1");
    static TestSuspendThread t2 = new TestSuspendThread("t2");
 
    public static class TestSuspendThread extends Thread
    {
        public TestSuspendThread(String name)
        {
            setName(name);
        }
 
        @Override
        public void run()
        {
            synchronized (u)
            {
                System.out.println("in " + getName());
                Thread.currentThread().suspend();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        t1.start();
        Thread.sleep(100);
        t2.start();
        t1.resume();
        t2.resume();
        t1.join();
        t2.join();
    }
}

让t1,t2同时争夺一把锁,争夺到的线程suspend,然后再resume,按理来说,应该某个线程争夺后被resume释放了锁,然后另一个线程争夺掉锁,再被resume。

结果输出是:

in t1
in t2

说明两个线程都争夺到了锁,但是控制台的红灯还是亮着的,说明t1,t2一定有线程没有执行完。我们dump出堆来看看

发现t2一直被suspend。这样就造成了死锁。

2.6 join和yeild

yeild是个native静态方法,这个方法是想把自己占有的cpu时间释放掉,然后和其他线程一起竞争(注意yeild的线程还是有可能争夺到cpu,注意与sleep区别)。在javadoc中也说明了,yeild是个基本不会用到的方法,一般在debug和test中使用。

join方法的意思是等待其他线程结束,就如suspend那节的代码,想让主线程等待t1,t2结束以后再结束。没有结束的话,主线程就一直阻塞在那里。

package test;
 
public class Test
{
    public volatile static int i = 0;
 
    public static class AddThread extends Thread
    {
        @Override
        public void run()
        {
            for (i = 0; i < 10000000; i++)
                ;
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        AddThread at = new AddThread();
        at.start();
        at.join();
        System.out.println(i);
    }
}

如果把上述代码的at.join去掉,则主线程会直接运行结束,i的值会很小。如果有join,打印出的i的值一定是10000000。

那么join是怎么实现的呢?

join的本质

while(isAlive()) 
{ 
   wait(0); 
}

join()方法也可以传递一个时间,意为有限期地等待,超过了这个时间就自动唤醒。

这样就有一个问题,谁来notify这个线程呢,在thread类中没有地方调用了notify?

在javadoc中,找到了相关解释。当一个线程运行完成终止后,将会调用notifyAll方法去唤醒等待在当前线程实例上的所有线程,这个操作是jvm自己完成的。

所以javadoc中还给了我们一个建议,不要使用wait和notify/notifyall在线程实例上。因为jvm会自己调用,有可能与你调用期望的结果不同。

3. 守护线程

  • 在后台默默地完成一些系统性的服务,比如垃圾回收线程、JIT线程就可以理解为守护线程。
  • 当一个Java应用内,所有非守护进程都结束时,Java虚拟机就会自然退出。

此前有写过一篇python中如何实现,查看这里

而Java中变成守护进程就相对简单了。

Thread t=new DaemonT(); 
t.setDaemon(true); 
t.start();

这样就开启了一个守护线程。

package test;
 
public class Test
{
    public static class DaemonThread extends Thread
    {
        @Override
        public void run()
        {
            for (int i = 0; i < 10000000; i++)
            {
                System.out.println("hi");
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        DaemonThread dt = new DaemonThread();
        dt.start();
    }
}

当线程dt不是一个守护线程时,在运行后,我们能看到控制台输出hi

当在start之前加入

dt.setDaemon(true);

控制台就直接退出了,并没有输出。

4. 线程优先级

Thread类中有3个变量定义了线程优先级。

public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
package test;
 
public class Test
{
    public static class High extends Thread
    {
        static int count = 0;
        @Override
        public void run()
        {
            while (true)
            {
                synchronized (Test.class)
                {
                    count++;
                    if (count > 10000000)
                    {
                        System.out.println("High");
                        break;
                    }
                }
            }
        }
    }
    public static class Low extends Thread
    {
        static int count = 0;
        @Override
        public void run()
        {
            while (true)
            {
                synchronized (Test.class)
                {
                    count++;
                    if (count > 10000000)
                    {
                        System.out.println("Low");
                        break;
                    }
                }
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        High high = new High();
        Low low = new Low();
        high.setPriority(Thread.MAX_PRIORITY);
        low.setPriority(Thread.MIN_PRIORITY);
        low.start();
        high.start();
    }
}

让一个高优先级的线程和低优先级的线程同时争夺一个锁,看看哪个最先完成。

当然并不一定是高优先级一定先完成。再多次运行后发现,高优先级完成的概率比较大,但是低优先级还是有可能先完成的。

5. 基本的线程同步操作

synchronized 和 Object.wait() Obejct.notify()

这一节内容详情请看以前写的一篇Blog

主要要注意的是

synchronized有三种加锁方式:

*指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁。 *直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁。 *直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁。

作用于实例方法,则不要new两个不同的实例

作用于静态方法,只要类一样就可以了,因为加的锁是类.class,可以new两个不同实例。

wait和notify的用法:

用什么锁住,就用什么调用wait和notify

本文就不细说了。

1. 关于高并发的几个重要概念

1.1 同步和异步

首先这里说的同步和异步是指函数/方法调用方面。

很明显,同步调用会等待方法的返回,异步调用会瞬间返回,但是异步调用瞬间返回并不代表你的任务就完成了,他会在后台起个线程继续进行任务。

1.2 并发和并行

并发和并行在外在表象来说,是差不多的。由图所示,并行则是两个任务同时进行,而并发呢,则是一会做一个任务一会又切换做另一个任务。所以单个cpu是不能做并行的,只能是并发。

1.3 临界区

临界区用来表示一种公共资源或者说是共享数据,可以被多个线程使用,但是每一次,只能有一个线程使用它,一旦临界区资源被占用,其他线程要想使用这个资源,就必须等待。

1.4 阻塞和非阻塞

  • 阻塞和非阻塞通常形容多线程间的相互影响。比如一个线程占用了临界区资源,那么其它所有需要这个资源的线程就必须在这个临界区中进行等待,等待会导致线程挂起。这种情况就是阻塞。此时,如果占用资源的线程一直不愿意释放资源,那么其它所有阻塞在这个临界区上的线程都不能工作。
  • 非阻塞允许多个线程同时进入临界区

所以阻塞的方式,一般性能不会太好。根据一般的统计,如果一个线程在操作系统层面被挂起,做了上下文切换了,通常情况需要8W个时间周期来做这个事情。

1.5 死锁、饥饿、活锁

所谓死锁:是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。就如同下图中的车都想前进,却谁都无法前进。

但是死锁虽说是不好的现象,但是它是一个静态的问题,一旦发生死锁,进程被卡死,cpu占有率也是0,它不会占用cpu,它会被调出去。相对来说还是比较好发现和分析的。

与死锁相对应的是活锁。

活锁,指事物1可以使用资源,但它让其他事物先使用资源;事物2可以使用资源,但它也让其他事物先使用资源,于是两者一直谦让,都无法使用资源。

举个例子,就如同你在街上遇到个人,刚好他朝着你的反方向走,与你正面碰到,你们都想让彼此过去。你往左边移,他也往左边移,两人还是无法过去。这时你往右边移,他也往右边移,如此循环下去。

一个线程在取得了一个资源时,发现其他线程也想到这个资源,因为没有得到所有的资源,为了避免死锁把自己持有的资源都放弃掉。如果另外一个线程也做了同样的事情,他们需要相同的资源,比如A持有a资源,B持有b资源,放弃了资源以后,A又获得了b资源,B又获得了a资源,如此反复,则发生了活锁。

活锁会比死锁更难发现,因为活锁是一个动态的过程。

饥饿是指某一个或者多个线程因为种种原因无法获得所需要的资源,导致一直无法执行。

1.6 并发级别

并发级别:阻塞和非阻塞(非阻塞分为无障碍、无锁、无等待)

1.6.1 阻塞

当一个线程进入临界区后,其他线程必须等待

1.6.2 无障碍

  • 无障碍是一种最弱的非阻塞调度
  • 自由出入临界区
  • 无竞争时,有限步内完成操作
  • 有竞争时,回滚数据

和非阻塞调度相比呢,阻塞调度是一种悲观的策略,它会认为说一起修改数据是很有可能把数据改坏的。而非阻塞调度呢,是一种乐观的策略,它认为大家修改数据未必把数据改坏。但是它是一种宽进严出的策略,当它发现一个进程在临界区内发生了数据竞争,产生了冲突,那么无障碍的调度方式则会回滚这条数据。

在这个无障碍的调度方式当中,所有的线程都相当于在拿去一个系统当前的一个快照。他们一直会尝试拿去的快照是有效的为止。

1.6.3 无锁

  • 是无障碍的
  • 保证有一个线程可以胜出

与无障碍相比,无障碍并不保证有竞争时一定能完成操作,因为如果它发现每次操作都会产生冲突,那它则会不停地尝试。如果临界区内的线程互相干扰,则会导致所有的线程会卡死在临界区,那么系统性能则会有很大的影响。

而无锁增加了一个新的条件,保证每次竞争有一个线程可以胜出,则解决了无障碍的问题。至少保证了所有线程都顺利执行下去。

下面代码是Java中典型的无锁计算代码

无锁在Java中很常见

while (!atomicVar.compareAndSet(localVar, localVar+1)) 
{ 
    localVar = atomicVar.get(); 
}

1.6.4 无等待

  • 无锁的
  • 要求所有的线程都必须在有限步内完成
  • 无饥饿的

首先无等待的前提是无锁的基础上的,无锁它只保证了临界区肯定有进也有出,但是如果进的优先级都很高,那么临界区内的某些优先级低的线程可能发生饥饿,一直出不了临界区。那么无等待解决了这个问题,它保证所有的线程都必须在有限步内完成,自然是无饥饿的。

无等待是并行的最高级别,它能使这个系统达到最优状态。

无等待的典型案例:

如果只有读线程,没有线线程,那么这个则必然是无等待的。

如果既有读线程又有写线程,而每个写线程之前,都把数据拷贝一份副本,然后修改这个副本,而不是修改原始数据,因为修改副本,则没有冲突,那么这个修改的过程也是无等待的。最后需要做同步的只是将写完的数据覆盖原始数据。

由于无等待要求比较高,实现起来比较困难,所以无锁使用得会更加广泛一些。

2. 有关并行的两个重要定律

这两个定律都与加速比有关

2.1 Amdahl定律

定义了串行系统并行化后的加速比的计算公式和理论上限

加速比定义:加速比=优化前系统耗时/优化后系统耗时

举个例子:

加速比=优化前系统耗时/优化后系统耗时=500/400=1.25

这个定理表明:增加CPU处理器的数量并不一定能起到有效的作用 提高系统内可并行化的模块比重,合理增加并行处理器数量,才能以最小的投入,得到最大的加速比。

2.2 Gustafson定律

说明处理器个数,串行比例和加速比之间的关系

则加速比=n-F(n-1) //推导过程略

只要有足够的并行化,那么加速比和CPU个数成正比

在这篇文章中,我要处理求解任意数独这个问题。用了两个想法(idea):约束传播搜索后,它其实是很简单的(主旨大约只有一页代码,加上润色后也不过两页)。

数独记号和初步概念

首先我们要在记号上达成一致。一个数独谜题是由 81 个方块组成的网格。大部分爱好者把列标为 1-9,把行标为 A-I,把 9 个方块的一组(列,行,或者方框)称为一个单元,把处于同一单元的方块称为对等方块。谜题中有些方块是空白的,其他的填入了数字。谜题的主旨是这样的:

当每个单元的方块填入了 1 到 9 的一个排列时,谜题就解决了。

也就是说,在一个单元中,任何数字都不能出现两次,而且每个数字都要出现一次。这意味着每个方块的值和它的对等方块的值是不同的。下面是方块的名字、一个典型的谜题及其解答:

A1 A2 A3| A4 A5 A6| A7 A8 A9    4 . . |. . . |8 . 5     4 1 7 |3 6 9 |8 2 5 
 B1 B2 B3| B4 B5 B6| B7 B8 B9    . 3 . |. . . |. . .     6 3 2 |1 5 8 |9 4 7
 C1 C2 C3| C4 C5 C6| C7 C8 C9    . . . |7 . . |. . .     9 5 8 |7 2 4 |3 1 6 
---------+---------+---------    ------+------+------    ------+------+------
 D1 D2 D3| D4 D5 D6| D7 D8 D9    . 2 . |. . . |. 6 .     8 2 5 |4 3 7 |1 6 9 
 E1 E2 E3| E4 E5 E6| E7 E8 E9    . . . |. 8 . |4 . .     7 9 1 |5 8 6 |4 3 2 
 F1 F2 F3| F4 F5 F6| F7 F8 F9    . . . |. 1 . |. . .     3 4 6 |9 1 2 |7 5 8 
---------+---------+---------    ------+------+------    ------+------+------
 G1 G2 G3| G4 G5 G6| G7 G8 G9    . . . |6 . 3 |. 7 .     2 8 9 |6 4 3 |5 7 1 
 H1 H2 H3| H4 H5 H6| H7 H8 H9    5 . . |2 . . |. . .     5 7 3 |2 9 1 |6 8 4 
 I1 I2 I3| I4 I5 I6| I7 I8 I9    1 . 4 |. . . |. . .     1 6 4 |8 7 5 |2 9 3

每个方块都属于 3 个单元,有 20 个对等方块。例如,如下是方块 C2 的单元和对等方块:

    A2   |         |                    |         |            A1 A2 A3|         |         
    B2   |         |                    |         |            B1 B2 B3|         |         
    C2   |         |            C1 C2 C3| C4 C5 C6| C7 C8 C9   C1 C2 C3|         |         
---------+---------+---------  ---------+---------+---------  ---------+---------+---------
    D2   |         |                    |         |                    |         |         
    E2   |         |                    |         |                    |         |         
    F2   |         |                    |         |                    |         |         
---------+---------+---------  ---------+---------+---------  ---------+---------+---------
    G2   |         |                    |         |                    |         |         
    H2   |         |                    |         |                    |         |         
    I2   |         |                    |         |                    |         |

我们可以用 Python 按如下方式来实现单元、对等方块、方块的概念:

def cross(A, B):
    "Cross product of elements in A and elements in B."
    return [a+b for a in A for b in B]
 
digits   = '123456789'
rows     = 'ABCDEFGHI'
cols     = digits
squares  = cross(rows, cols)
unitlist = ([cross(rows, c) for c in cols] +
            [cross(r, cols) for r in rows] +
            [cross(rs, cs) for rs in ('ABC','DEF','GHI') for cs in ('123','456','789')])
units = dict((s, [u for u in unitlist if s in u]) 
             for s in squares)
peers = dict((s, set(sum(units[s],[]))-set([s]))
             for s in squares)

如果你对 Python 的一些特性不熟悉,请注意‘字典’是哈希表在Python中的叫法,它将每个键映射到一个值。dict((s, […]) for s in squares) 创建了一个字典,把每个方块 s 映射到是一个列表的值。表达式 [u for u in unitlist if s in u] 的意思是:这个值是由那些包含s的单元u构成的列表。所以这个赋值表达式要这么读:units是一个字典,其中每个方块映射到包含它的单元组成的列表。类似地,下一个赋值语句要这么读:peers 是个字典,其中每个方块s映射到s所属单元的方块的并集,但不包括 s 自身。

做一些测试是没有坏处的(它们都通过了):

def test():
    "A set of unit tests."
    assert len(squares) == 81
    assert len(unitlist) == 27
    assert all(len(units[s]) == 3 for s in squares)
    assert all(len(peers[s]) == 20 for s in squares)
    assert units['C2'] == [['A2', 'B2', 'C2', 'D2', 'E2', 'F2', 'G2', 'H2', 'I2'],
                           ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9'],
                           ['A1', 'A2', 'A3', 'B1', 'B2', 'B3', 'C1', 'C2', 'C3']]
    assert peers['C2'] == set(['A2', 'B2', 'D2', 'E2', 'F2', 'G2', 'H2', 'I2',
                               'C1', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9',
                               'A1', 'A3', 'B1', 'B3'])
    print 'All tests pass.'

既然我们有了方块、单元和对等方块,下一步是定义数独网格。事实上我们需要两个表示(representations):首先要有一个用来指定谜题初始状态的文本格式(译注:指字符串),我们会为之保留 grid 这个名字;然后还需要谜题的任何状态(部分解决或完成)的内部表示,我们称之为 values 集合,因为对于每个方块,它将给出剩余可能的值。对于文本格式(网格),我们允许字符串中用 1-9 表示数字,0 或句点表示空方块。所有其他的字符都会被忽略(包括空格,换行,破折号)。因此以下的三个网格字符串代表了同样的谜题:

"4.....8.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......"
 
"""
400000805
030000000
000700000
020000060
000080400
000010000
000603070
500200000
104000000"""
 
"""
4 . . |. . . |8 . 5 
. 3 . |. . . |. . . 
. . . |7 . . |. . . 
------+------+------
. 2 . |. . . |. 6 . 
. . . |. 8 . |4 . . 
. . . |. 1 . |. . . 
------+------+------
. . . |6 . 3 |. 7 . 
5 . . |2 . . |. . . 
1 . 4 |. . . |. . . 
"""

现在来说 values。有人可能认为 9×9 的数组是很显然的数据结构。但是方块的名字是 ‘A1′ 这种,而不是(0,0)。因此 values 将会是字典,以方块为键。每个键的值是那个方块的可能的数字:如果数字是作为谜题的一部分给定的,或者我们已经确定它必定的值,那么就是一个数字;如果我们还不确定,那就是一堆数字。这堆数字可以用Python集合或列表来表示,但我选择用数字字符串来表示(稍后会看到为什么)。因此,A1 为 7,C7 为空的网格 可以表示为 {‘A1′: ’7′, ‘C7′: ’123456789′, …}。

这是将网格解析成 values 字典的代码:

def parse_grid(grid):
    
"""Convert grid to a dict of possible values, {square: digits}, or
    
return False if a contradiction is detected."""
    
## To start, every square can be any digit; then assign values from the grid.
    values = dict((s, digits) for s in squares)
    for s,d in grid_values(grid).items():
        if d in digits and not assign(values, s, d):
            return False 
## (Fail if we can't assign d to square s.)
    return values
 
def grid_values(grid):
    "Convert grid into a dict of {square: char} with '0' or '.' for empties."
    chars = [c for c in grid if c in digits or c in '0.']
    assert len(chars) == 81
    return dict(zip(squares, chars))

约束传播

parse_grid 函数调用 assign(values, s, d)。我们可以把这实现为 values[s] = d,但我们可以做的更多。有解决数独谜题经验的人知道,在朝着填满所有方块前进时,有两个重要的策略:

(1) 如果一个方块只有一个可能值,把这个值从方块的对等方块(的可能值)中排除。 (2) 如果一个单元只有一个可能位置来放某个值,就把值放那。

作为策略(1)的例子,如果我们把 7 填入 A1,也就是 {‘A1′: ’7′, ‘A2′:’123456789′, …},我们看到 A1 只有一个值,因此7可以从A1的对等方块A2(也包括其他所有对等方块)中移除,得到{‘A1′: ’7′, ‘A2′: ’12345689′, …}。作为策略(2)的例子,如果A3到A9都不能以3作为可能值,那么3必定属于A2,我们可以更新为{‘A1′: ’7′, ‘A2′:’3′, …}。对A2的更新,反过来可能导致对它的对等方块以及对等方块的对等方块的更新,等等。这个过程称为约束传播。

函数assign(values, s, d)会返回更新后的values(包括来自约束传播的更新),但是如果产生了矛盾(即赋值不能保证一致),assign会返回False。例如,如果网格是以数字’77…’开头的,当我们试图把7赋给A2时,assign会注意到A2不可能是7,因为A2为7的可能性已经被它的对等方块A1排除了。

对于一个方块来说,基本操作不是赋值,而是排除一个可能的值,我们用eliminate(values, s, d)来实现它。一旦我们有了eliminate,assign(values, s, d)就可以定义为“从s中排除d以外的所有值”。

def assign(values, s, d):
    
"""Eliminate all the other values (except d) from values[s] and propagate.
    
Return values, except return False if a contradiction is detected."""
    other_values = values[s].replace(d, '')
    if all(eliminate(values, s, d2) for d2 in other_values):
        return values
    else:
        return False
 
def eliminate(values, s, d):
    
"""Eliminate d from values[s]; propagate when values or places <= 2.
    
Return values, except return False if a contradiction is detected."""
    if d not in values[s]:
        return values 
## Already eliminated
    values[s] = values[s].replace(d,'')
    
## (1) If a square s is reduced to one value d2, then eliminate d2 from the peers.
    if len(values[s]) == 0:
    return False 
## Contradiction: removed last value
    elif len(values[s]) == 1:
        d2 = values[s]
        if not all(eliminate(values, s2, d2) for s2 in peers[s]):
            return False
    
## (2) If a unit u is reduced to only one place for a value d, then put it there.
    for u in units[s]:
    dplaces = [s for s in u if d in values[s]]
    if len(dplaces) == 0:
        return False 
## Contradiction: no place for this value
    elif len(dplaces) == 1:
        
# d can only be in one place in unit; assign it there
            if not assign(values, dplaces[0], d):
                return False
    return values

在能走得更远之前,我们需要能显示一个谜题:

def display(values):
    "Display these values as a 2-D grid."
    width = 1+max(len(values[s]) for s in squares)
    line = '+'.join(['-'*(width*3)]*3)
    for r in rows:
        print ''.join(values[r+c].center(width)+('|' if c in '36' else '')
                      for c in cols)
        if r in 'CF': print line
    print

现在我们准备好了。我从来自于欧拉计划数独问题的一系列简单谜题中挑出了第一个例子:

>>> grid1 = '003020600900305001001806400008102900700000008006708200002609500800203009005010300'
 
>>> display(parse_grid(grid1))
4 8 3 |9 2 1 |6 5 7
9 6 7 |3 4 5 |8 2 1
2 5 1 |8 7 6 |4 9 3
------+------+------
5 4 8 |1 3 2 |9 7 6
7 2 9 |5 6 4 |1 3 8
1 3 6 |7 9 8 |2 4 5
------+------+------
3 7 2 |6 8 9 |5 1 4
8 1 4 |2 5 3 |7 6 9
6 9 5 |4 1 7 |3 8 2

在这个例子中,谜题完全是靠生搬硬套策略(1)和(2)解决的。不幸的是,这并不总能行得通。这是一系列困难谜题中的第一个例子:

>>> grid2 = '4.....8.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......'
 
>>> display(parse_grid(grid2))
   4      1679   12679  |  139     2369    269   |   8      1239     5   
 26789     3    1256789 | 14589   24569   245689 | 12679    1249   124679
  2689   15689   125689 |   7     234569  245689 | 12369   12349   123469
------------------------+------------------------+------------------------
  3789     2     15789  |  3459   34579    4579  | 13579     6     13789 
  3679   15679   15679  |  359      8     25679  |   4     12359   12379 
 36789     4     56789  |  359      1     25679  | 23579   23589   23789 
------------------------+------------------------+------------------------
  289      89     289   |   6      459      3    |  1259     7     12489 
   5      6789     3    |   2      479      1    |   69     489     4689 
   1      6789     4    |  589     579     5789  | 23569   23589   23689

在这个例子中,我们离解决谜题还差得远呢!有61个方块还不确定。接下来要怎么做?我们可以编码更复杂的策略。例如,naked twins策略寻找同一单元的两个方块,它们有着相同的两个可能数字。给定{‘A5′: ’26′, ‘A6′:’26′, …},我们可以断定2和6必定在A5和A6中(尽管我们不知道分别在哪个方块),因此对于A行单元的其他方块,我们可以排除2和6。我们在eliminate中加上elif len(values[s]) == 2测试来实现这条策略。

像这样编码策略是一条可能的路线,但会需要几百行代码(有几十条这样的策略),我们也不确定能否解决每个谜题。

搜索

另一条路线是搜索一个解答:系统地尝试所有可能性直到找到一个解。这个方法的代码只有十几行,但是我们冒着另一个风险:程序可能要运行很长时间。考虑上面的grid2,A2有4种可能性(1679)),A3有5种可能性(12679)。总共就是20种可能性,如果我们一直乘下去,对于整个谜题是 4.62838344192 × 1038 种可能性。我们要怎么应对?有两个选择。

首先,我们可以尝试暴力方法。假设我们有一个非常高效的程序,计算一个位置只需一条指令;我们可以利用下一代的计算技术,比方说1024核的10GHz处理器,假设我们可以用上一百万这样的处理器;再假设我们购物时选了一个时光机,可以回退130亿年到宇宙的起点,开始运行我们的程序。我们可以算出至今我们差不多完成了这个谜题的1%。

第二个选择是每条机器指令处理远多于一种可能性。这看起来不可能,但幸运的是这恰好就是约束传播所做的。我们不需要尝试所有4 × 1038种可能性,因为尝试一种后,我们立即就能排除很多其他的可能性。例如,这个谜题的方块H7有两种可能性,6和9。我们可以尝试9,很快就发现有矛盾。这意味着我们排除的不是一种可能性,而是4 × 1038种选择中的一半。

事实上,要解决这个特定的谜题我们只要考虑25种可能性,61个未填充的方块中我们只需显式地搜索9个,约束传播会帮我们搞定剩下的。对于清单中的95个困难谜题,平均起来每个谜题只要考虑64种可能性,在所有谜题中都不需要搜索超过16个方块。

这个搜索算法是什么呢?很简单:首先确保我们还没有发现解或者矛盾,选择一个未填充的方块,考虑它的所有可能值。每次考虑一个值,尝试把它赋给方块,从得到的局面继续搜索。换言之,我们搜索这样的值d:我们可以从将d赋给方块s后的局面成功地搜索到一个解。如果搜索导致失败的局面,返回并考虑d的另一个值。这是一个递归的搜索,我们称之为深度优先搜索,因为在考虑s取另一不同值之前,我们(递归地)考虑values[s] = d条件下的所有可能性。

为了避免记录的复杂,每次递归调用search我们都创建values的新的拷贝。这样,搜索树的每个分支都是独立的,不会与另一个分支混淆。(这就是为什么将方块的可能值集合实现为字符串:我可以用values.copy()来拷贝values,简单又高效。如果我用Python集合或列表来实现可能值,我就要用copy.deepcopy(values),就没那么高效了。)另一种方法是记录每次对values的改动,走进死胡同时撤销改动。这就是所谓的回溯搜索。当搜索中的每一步只对大的数据结构做单个改动时,这是可行的。但当每次赋值会通过约束传播导致很多其他的改动时,这么做很复杂。

当我们实现搜索时要做两个选择:变量顺序(先尝试哪个方块?)和值顺序(对于当前方块先尝试哪个数字?)。对于变量顺序,我们用一种称为最小剩余值的常见启发方法,也就是说选择可能值数目最少的方块(或之一)。为什么这么做?考虑上面的grid2。假设我们先选择B3。它有7种可能值(1256789),因此我们猜错的期望是6/7。如果我们选择G2,它只有2种可能值,我们猜错的期望只有1/2。因此我们选择有最少可能值、猜对概率最高的方块。对于值顺序,我们不会做什么特别的事情,只是按数字大小顺序来考虑。

现在我们已经准备好用search函数来定义solve函数了:

def solve(grid): return search(parse_grid(grid))
 
def search(values):
    "Using depth-first search and propagation, try all possible values."
    if values is False:
        return False 
## Failed earlier
    if all(len(values[s]) == 1 for s in squares): 
        return values 
## Solved!
    
## Chose the unfilled square s with the fewest possibilities
    n,s = min((len(values[s]), s) for s in squares if len(values[s]) > 1)
    return some(search(assign(values.copy(), s, d)) 
        for d in values[s])
 
def some(seq):
    "Return some element of seq that is true."
    for e in seq:
        if e: return e
    return False

就这样,我们搞定了。只用了一页代码,现在我们可以解决任何数独谜题了。

结果

你可以查看完整程序。下面是在命令行运行程序的输出。它解决了来自文件的50个简单谜题95个困难谜题(请看95个谜题解答),我通过搜索最难的数独找到的11个谜题,以及一批随机谜题:

% python sudo.py
All tests pass.
Solved 50 of 50 easy puzzles (avg 0.01 secs (86 Hz), max 0.03 secs).
Solved 95 of 95 hard puzzles (avg 0.04 secs (24 Hz), max 0.18 secs).
Solved 11 of 11 hardest puzzles (avg 0.01 secs (71 Hz), max 0.02 secs).
Solved 99 of 99 random puzzles (avg 0.01 secs (85 Hz), max 0.02 secs).

分析

上面的每个谜题都用不到五分之一秒解决。真正困难的谜题会怎么样呢?芬兰数学家Arto Inkala把他的2006 puzzle描述为“目前已知最难的数独”,把2010 puzzle描述为“我创作过的最难数独”。我的程序都在0.01秒内解决了它们(solve_all将会在下面定义):

>>> solve_all(from_file("hardest.txt")[0:2], 'Inkala')
8 5 . |. . 2 |4 . . 
7 2 . |. . . |. . 9
. . 4 |. . . |. . . 
------+------+------
. . . |1 . 7 |. . 2
3 . 5 |. . . |9 . . 
. 4 . |. . . |. . . 
------+------+------
. . . |. 8 . |. 7 . 
. 1 7 |. . . |. . . 
. . . |. 3 6 |. 4 . 
 
8 5 9 |6 1 2 |4 3 7
7 2 3 |8 5 4 |1 6 9
1 6 4 |3 7 9 |5 2 8
------+------+------
9 8 6 |1 4 7 |3 5 2
3 7 5 |2 6 8 |9 1 4
2 4 1 |5 9 3 |7 8 6
------+------+------
4 3 2 |9 8 1 |6 7 5
6 1 7 |4 2 5 |8 9 3
5 9 8 |7 3 6 |2 4 1
 
(0.01 seconds)
 
. . 5 |3 . . |. . . 
8 . . |. . . |. 2 . 
. 7 . |. 1 . |5 . . 
------+------+------
4 . . |. . 5 |3 . . 
. 1 . |. 7 . |. . 6
. . 3 |2 . . |. 8 . 
------+------+------
. 6 . |5 . . |. . 9
. . 4 |. . . |. 3 . 
. . . |. . 9 |7 . . 
 
1 4 5 |3 2 7 |6 9 8
8 3 9 |6 5 4 |1 2 7
6 7 2 |9 1 8 |5 4 3
------+------+------
4 9 6 |1 8 5 |3 7 2
2 1 8 |4 7 3 |9 5 6
7 5 3 |2 9 6 |4 8 1
------+------+------
3 6 7 |5 4 2 |8 1 9
9 8 4 |7 6 1 |2 3 5
5 2 1 |8 3 9 |7 6 4
 
(0.01 seconds)
 
Solved 2 of 2 Inkala puzzles (avg 0.01 secs (99 Hz), max 0.01 secs).

我猜如果我想要一个真正困难的谜题,我得自己来制作。我不知道怎么创作困难的谜题,因此我生成了一百万个随机谜题。我用来生成随机谜题的算法很简单:首先,随机打乱方块的顺序。考虑到可能的数字选择,挨个在每个方块中填入随机数字。如果出现了矛盾,就重新来过。如果填充了至少17个方块以及8种不同的数字,就结束这个过程。(注意:如果填充少于17个方块或者用了不到8种不同数字,数独会有多个解。感谢Olivier Grégoire关于8种不同数字的很好的建议。)即使有这些检查,我的随机谜题仍不能保证有唯一解。很多有多个解,还有一些(大约0.2%)没有解。出现在书报上的谜题总是有唯一解。

0.032%    (1 in 3,000)    took more than 0.1 seconds
0.014%    (1 in 7,000)    took more than 1 second
0.003%    (1 in 30,000)    took more than 10 seconds
0.0001%    (1 in 1,000,000)    took more than 100 seconds

解决一个随机谜题平均用时为0.01秒,超过99.95%用了不到0.1秒,但有一些用时长得多:

0.032%    (1 in 3,000)    超过0.1
0.014%    (1 in 7,000)    超过1秒
0.003%    (1 in 30,000)    超过10秒
0.0001%    (1 in 1,000,000)    超过100秒

这里是100万个谜题中用时超过1秒的139个,排好序了,分别用线性和对数坐标表示:

很难从图中做出结论。最后几个值的上涨是显著的吗?如果我生成一千万个谜题,会不会有一个用时超过1000秒?下面是一百万个随机谜题中最难的(对于我的程序来说):

>>> hard1  = '.....6....59.....82....8....45........3........6..3.54...325..6..................'
>>> solve_all([hard1])
. . . |. . 6 |. . . 
. 5 9 |. . . |. . 8
2 . . |. . 8 |. . . 
------+------+------
. 4 5 |. . . |. . . 
. . 3 |. . . |. . . 
. . 6 |. . 3 |. 5 4
------+------+------
. . . |3 2 5 |. . 6
. . . |. . . |. . . 
. . . |. . . |. . . 
 
4 3 8 |7 9 6 |2 1 5
6 5 9 |1 3 2 |4 7 8
2 7 1 |4 5 8 |6 9 3
------+------+------
8 4 5 |2 1 9 |3 6 7
7 1 3 |5 6 4 |8 2 9
9 2 6 |8 7 3 |1 5 4
------+------+------
1 9 4 |3 2 5 |7 8 6
3 6 2 |9 8 7 |5 4 1
5 8 7 |6 4 1 |9 3 2
 
(188.79 seconds)

不幸的是,这不是一个真正的数独谜题,因为它有多个解。(它是在我包含了Olivier Grégoire关于8种不同数字的建议之前生成的,因此对于这个谜题的任何解,交换1和7,都得到另一个解。)但这是一个本质上很难的谜题吗?或者它的困难性是我的search程序所使用的变量顺序和值顺序方案的产物?为了检验,我随机化了值顺序(我把search最后一行中的for d in values[s]改成for d in shuffled(values[s]),我用了random.shuffle来实现shuffled)。结果明显地两边倒:30次尝试中的27次用了不到0.02秒,其他的3次每次用时都超过190秒(大约长了10000倍)。这个谜题有多个解,随机化的search找到13个不同的解。我的猜测是在搜索早期的某处,有一系列的方块(可能2个),如果我们选则了某种错误的值的组合来填充方块,就需要大约190秒来发现矛盾。但如果做了其他选择,我们很快要么发现一个解,要么发现矛盾,从另一个选择前进。因此算法的速度取决于它是否能避免选中致命的值组合。

随机化大部分时候都是有效的(30次中的27次),但通过考虑更好的值顺序我们可能会做的更好(一个流行的启发方式是最少约束值,也就是先选对对等方块约束最少的值),或者尝试更智能的变量顺序。

在我能给出对困难谜题一个好的刻画前,还需要更多的实验。我决定在另一个一百万随机谜题上做实验,这次保留运行时间的平均值,50th(中位数),90和99分位数,最大值和标准差这些统计值。结果是相似的,除了这次有两个谜题用时超过100秒,还有一个长得多:1439秒。事实上这个谜题是那没有解的0.2%中的一个,因此它可能不算。但主要的信息是即使我们抽样更多,平均值和中位数差不多保持不变,但最大值引人注目地保持增长。标准差也有微升,但主要是由于99分位数之外的那些很少的长用时。这是重尾分布,而不是正态分布。

为了比较,下面左边的表给出了解决谜题用时的统计,右边的表给出了从平均值为0.014,标准差为1.4794的正态(高斯)分布抽样的统计。注意对于一百万的抽样,高斯分布的最大值是平均值之上5个标准差(大致就是你对高斯分布所期望的),然而最长谜题运行时间在平均值之上1000个标准差。

这里是用时1439秒的无解谜题:

. . . |. . 5 |. 8 . 
. . . |6 . 1 |. 4 3 
. . . |. . . |. . . 
------+------+------
. 1 . |5 . . |. . . 
. . . |1 . 6 |. . . 
3 . . |. . . |. . 5 
------+------+------
5 3 . |. . . |. 6 1 
. . . |. . . |. . 4 
. . . |. . . |. . .

下面是定义solve_all的以及用它来验证来自文件和随机谜题的代码:

import time, random
 
def solve_all(grids, name='', showif=0.0):
    
"""Attempt to solve a sequence of grids. Report results.
    
When showif is a number of seconds, display puzzles that take longer.
    
When showif is None, don't display any puzzles."""
    def time_solve(grid):
        start = time.clock()
        values = solve(grid)
        t = time.clock()-start
        
## Display puzzles that take long enough
        if showif is not None and t > showif:
            display(grid_values(grid))
            if values: display(values)
            print '(%.2f seconds)n' % t
        return (t, solved(values))
    times, results = zip(*[time_solve(grid) for grid in grids])
    N = len(grids)
    if N > 1:
        print "Solved %d of %d %s puzzles (avg %.2f secs (%d Hz), max %.2f secs)." % (
            sum(results), N, name, sum(times)/N, N/sum(times), max(times))
 
def solved(values):
    "A puzzle is solved if each unit is a permutation of the digits 1 to 9."
    def unitsolved(unit): return set(values[s] for s in unit) == set(digits)
    return values is not False and all(unitsolved(unit) for unit in unitlist)
 
def from_file(filename, sep='n'):
    "Parse a file into a list of strings, separated by sep."
    return file(filename).read().strip().split(sep)
 
def random_puzzle(N=17):
    
"""Make a random puzzle with N or more assignments. Restart on contradictions.
    
Note the resulting puzzle is not guaranteed to be solvable, but empirically
    
about 99.8% of them are solvable. Some have multiple solutions."""
    values = dict((s, digits) for s in squares)
    for s in shuffled(squares):
        if not assign(values, s, random.choice(values[s])):
            break
        ds = [values[s] for s in squares if len(values[s]) == 1]
        if len(ds) >= N and len(set(ds)) >= 8:
            return ''.join(values[s] if len(values[s])==1 else '.' for s in squares)
    return random_puzzle(N) 
## Give up and make a new puzzle
 
def shuffled(seq):
    "Return a randomly shuffled copy of the input sequence."
    seq = list(seq)
    random.shuffle(seq)
    return seq
 
grid1  = '003020600900305001001806400008102900700000008006708200002609500800203009005010300'
grid2  = '4.....8.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......'
hard1  = '.....6....59.....82....8....45........3........6..3.54...325..6..................'
 
if __name__ == '__main__':
    test()
    solve_all(from_file("easy50.txt", '========'), "easy", None)
    solve_all(from_file("top95.txt"), "hard", None)
    solve_all(from_file("hardest.txt"), "hardest", None)
    solve_all([random_puzzle() for _ in range(99)], "random", 100.0)

为什么?

我为什么要做这个?如计算机安全专家Ben Laurie说的,数独是“对人的理智的拒绝服务攻击”(译注:指数独容易让人上瘾)。我所知道的几个人(包括我的妻子)都被这个病毒感染了,我想这篇文章可以证明他们不用再在数独上花时间了。这对于我的朋友没起作用(尽管我的妻子自此之后不靠我的帮助独立改掉了这个嗜好),但至少有一位陌生人写到这篇文章对他有用,因此我使得世界更多产了。也许沿途我还教了点关于Python,约束传播和搜索的东西。

有两个关键的非面向对象编程概念需要马上理解:

1.重复的代码是一件坏事。 2.代码永远都在改变。

除了一些单任务和只运行一次的微小的”用完即弃”的程序,你几乎总是需要为了解决bug或增加新功能而更新你的代码。大部分编写良好的软件是那种可读性高,易于修改的软件。

如果你经常在你的程序中复制/黏贴代码,那么当你修改它的时候,就需要在很多地方做出同样的改动。这是棘手的。如果在某些地方遗漏了修改,你将到处修改bug或者实现的新功能有不一致性。重复的代码是一件坏事。程序中重复的代码将会把你置于bug和头痛之中。

函数让你摆脱重复的代码。你只需要将代码写到函数中一次,就可以在程序中的任何需要运行代码的地方调用它就可以。更新函数的代码就可以自动更新调用函数的地方。正如函数使得更新代码变得容易,使用面向对象编程技术也会组织你的代码使它更容易改变。记住代码总是在改变的。

一个角色扮演游戏栗子

大多数OOP教程都是令人作恶的。它们有”汽车”类和”鸣笛”方法,其他一些例子与新手写的或者之前接触过的实际程序都是不相关的。因此本博文将使用一个RPG类视频游戏(回忆下魔兽,宠物小精灵,或龙与地下城的世界)。我们已经习惯于将游戏中的事物想象成一些整数与字符的集合。看看Diablo(暗黑破坏神)角色屏幕或D&D角色表单上的数字:

从这些RPG视频游戏中去除图片,角色,装甲,其他对象只是变量形式的一个整数或者字符值的集合。不使用面向对象概念,你可以在Python中这样实现这些事物:

name = 'Elsa'
health = 50
magicPoints = 80
inventory = {'gold': 40, 'healing potion': 2, 'key': 1}
 
print('The hero %s has %s health.' % (name, health))

以上变量名都是非常通用的。为了在游戏中增加怪兽,你需要重命名玩家角色变量,并且增加一个怪兽角色:

heroName = 'Elsa'
heroHealth = 50
heroMagicPoints = 80
heroInventory = {'gold': 40, 'healing potion': 2, 'key': 1}
monsterName = 'Goblin'
monsterHealth = 20
monsterMagicPoints = 0
monsterInventory = {'gold': 12, 'dagger': 1}
 
print('The hero %s has %s health.' % (heroName, heroHealth))

当然你希望有更多的怪物,接着你会有类似monster1Name, monster2Name等等的变量。这不是一种好的编码方法,因此你可能会使用怪物变量列表:

monsterName = ['Goblin', 'Dragon', 'Goblin']
monsterHealth = [20, 300, 18]
monsterMagicPoints = [0, 200, 0]
monsterInventory = [{'gold': 12, 'dagger': 1}, {'gold': 890, 'magic amulet': 1}, {'gold': 15, 'dagger': 1}]

然后列表索引0处是第一个哥布林的状态,龙的状态在索引1处,另一个哥布林在索引2处。这样你可以在这些变量中存放很多怪物。

但是,这种代码容易导致错误。如果你的列表不同步,程序将无法正常工作。例如玩家击败了索引0处的哥布林,程序调用了vanquishMonster()函数。但这个函数有一个bug,它会(意外的)删除列表中的所有值除了monsterInventory:

def vanquishMonster(monsterIndex):
    del monsterName[monsterIndex]
    del monsterHealth[monsterIndex]
    del monsterMagicPoints[monsterIndex]
    # Note there is no del for monsterInventory
 
vanquishMonster(0)

怪兽列表最后看起来像这样:

monsterName = ['Dragon', 'Goblin']
monsterHealth = [300, 18]
monsterMagicPoints = [200, 0]
monsterInventory = [{'gold': 12, 'dagger': 1}, {'gold': 890, 'magic amulet': 1}, {'gold': 15, 'dagger': 1}]

现在龙的道具看起来跟之前哥布林的道具一样。第二个哥布林的道具是之前龙的道具。游戏迅速失控了。问题是你把一个怪物的数据散布在多个变量中。解决这个问题的方法是将一个怪物的数据放入一个字典里,然后使用一个字典列表:

monsters = [{'name': 'Goblin', 'health': 20, 'magic points': 0, 'inventory': {'gold': 12, 'dagger': 1}},
            {'name': 'Dragon', 'health': 300, 'magic points': 200, 'inventory': {'gold': 890, 'magic amulet': 1}},
            {'name': 'Goblin', 'health': 18, 'magic points': 0, 'inventory': {'gold': 15, 'dagger': 1}}]

啊哈!这段代码变得更加复杂了。例如,一个怪兽的状态是一个字典列表中的字典项。假如咒语或者道具栏也有它们自己的属性,且需要放到字典该怎么办?假如一个道具栏中的物品是一个背包,它本身包含了其他道具该怎么办?这个怪物列表会变得紧张。

这点是面向对象程序设计通过创建一个新的数据类型就可以解决的。

创建类

类是你程序中新数据类型的蓝图。面向对象编程对装甲,怪物等建模提供了新的方法,该方法比列表和字典的大杂烩好得多。虽然需要花些时间来熟悉OOP的概念。

事实上,因为英雄角色与怪兽们拥有相同的属性(健康值,状态值等等),我们只需要一个英雄与怪兽共享的通用的LivingThing类。你的代码可以变为:

class LivingThing():
    def __init__(self, name, health, magicPoints, inventory):
        self.name = name
        self.health = health
        self.magicPoints = magicPoints
        self.inventory = inventory
 
# Create the LivingThing object for the hero.
hero = LivingThing('Elsa', 50, 80, {})
monsters = []
monsters.append(LivingThing('Goblin', 20, 0, {'gold': 12, 'dagger': 1}))
monsters.append(LivingThing('Dragon', 300, 200, {'gold': 890, 'magic amulet': 1}))
 
print('The hero %s has %s health.' % (hero.name, hero.health))

嘿,瞧瞧,使用类已经把我们的代码削减了一半,因为我们可以对玩家角色和怪兽使用同样的代码。

在上面的代码中,你可以定义新的数据类型/类(除了学院派,但是这两个术语基本上是一样的。参见Stack Overflow – What’s the difference between a type and a class?)名为LivingThing。你可以实例化LivingThing变量/对象(重复一次,这两个术语基本上也是相同的)就好像你可以拥有整形值,字符值或布尔值一样。

上面代码特定于Python的细节:

class LivingThing():

上面class声明定义了一个新类,就像def声明定义一个新函数。该类名是LivingThing。

def __init__(self, name, health, magicPoints, inventory):

上面代码为LivingThing类定义了一个方法。”方法“只是命名属于这个类的函数。(参考Stack Overflow – What is the difference between a method and a function?)

这个方法很特别。init()用于类的构造函数(或者称为”构造方法”,”构造函数”,简写为”ctor”)。而一个类是一个新数据类型的蓝图,你还需要创建这个数据类型的值,以便于存储到变量或者使用函数传递。

当调用构造器创建新对象时,执行构造器中的代码,并返回一个新对象。这就是

hero = LivingThing('Elsa', 50, 80, {})

这行的意思。无论类名是什么,构造器总是被命名为__init__。

如果类没有__init__()方法,Python会为类提供通用的构造方法,该方法什么都不做。但是__init__()是初始化建立一个新对象的绝佳地方。

在Python语言中,方法中的第一个变量是self。self变量用于创建成员变量,后面会做解释。

构造函数体:

self.name = name
self.health = health
self.magicPoints = magicPoints
self.inventory = inventory

这看上去有点重复,但这段代码所作的就是对由构造函数创建的对象的成员变量赋值。成员变量开头是self.表示这些成员变量属于创建的对象,且不是函数中的普通局部变量。

调用构造器

# Create the LivingThing object for the hero.
hero = LivingThing('Elsa', 50, 80, {})
monsters = [LivingThing('Goblin', 20, 0, {'gold': 12, 'dagger': 1}),
            LivingThing('Dragon', 300, 200, {'gold': 890, 'magic amulet': 1}),
            LivingThing('Goblin', 18, 0, {'gold': 15, 'dagger': 1})]
 
print('The hero %s has %s health.' % (hero.name, hero.health))

Python中调用构造器就像是一个函数调用,该函数名为类名。因此LivingThing()就是调用LivingThing类的__init__()构造器。

LivingThing('Elsa', 50, 80, {})

调用创建了一个新的LivingThing对象,并保存在到hero变量里。以上代码还创建了3个怪兽LivingThing对象并保存在monsters列表中。

至此我们开始看到了面向对象编程的好处。如果其他程序员读了你的代码,当他们看到LivingThing()调用,他们知道他们可以搜索LivingThing类,然后从LivingThing类中找出所有他们想知道的细节。

但一个更大的好处是当你试图更新LivingThing类时才能体会的。

更新类

假如你想给你的RPG增加”饥饿”度属性。如果一个英雄或怪兽的饥饿度为0,他们一点也不饿。但如果他们的饥饿度超过100,那么他们将受到伤害并且健康值每天递减。你可以这样改变__init__()函数:

def __init__(self, name, health, magicPoints, inventory):
    self.name = name
    self.health = health
    self.magicPoints = magicPoints
    self.inventory = inventory
    self.hunger = 0 # all living things start with hunger level 0

不需要修改其他任何代码行,你游戏中所有LivingThing对象现在都有了饥饿度。你不需要担心某些LivingThing对象有hunger成员变量,而有些没有:所有LivingThing对象都更新了。

你也不需要改变任何构造器调用,因为你没有在__init__()函数的参数列表总中增加一个新的饥饿度参数。这是因为队一个新的LivingThing对象的饥饿度来说0是一个很好的默认值。如果你在__init__()函数的参数列表总中增加一个新的饥饿度参数,那么你需要更新所有调用构造器的代码。但这对其他函数也是一样的。

如果你的RPG有很多类似的默认值,通过使用类的构造器进行默认值赋值,就可以避免当量的”样板”代码。

方法

方法具有执行代码来影响对象本身的用途。例如,你可以编码来直接修改LivingThing对象的健康度:

hero = LivingThing('Elsa', 50, {})
hero.health -= 10 # Elsa takes 10 points of damage

但这样处理伤害不是一个非常健壮的方式。每当有什么东西受到伤害时就需要检查很多其他的游戏逻辑。例如,假设你想要检查一个角色在受到伤害后,它是否死亡。你需要这样的代码:

hero = LivingThing('Elsa', 50, {})
hero.health -= 10 # Elsa takes 10 points of damage
if hero.health < 0:
    print(hero.name + ' has died!')

以上方法的问题是你需要检查各处代码来减少LivingThing对象的健康值。但是重复的代码是一件坏事。阻止重复的代码的非OOP方式可能是把以上方法放入一个函数中:

def takeDamage(livingThingObject, dmgAmount):
    livingThingObject.health = self.health - dmgAmount
    if livingThingObject.health < 0:
        print(livingThingObject.name + ' is dead!') 
 
hero = LivingThing('Elsa', 50, {})
takeDamage(hero, 10) # Elsa takes 10 points of damage

这是一个更好的解决方案,因为任何更新takeDamage()(例如装甲防护,保护性法术,增益效果等)只需要增加到takeDamage()函数中。

然而,不利的一面是,当您的程序规模增长,takeDamage()函数很容易迷失在其中。takeDamage()函数与LivingThing类的关系并不明显。如果你的程序有成百上千的函数,它将很难指出哪一个函数与LivingThing类有关系。

解决的方法是将这个函数变成LivingThing类的方法:

class LivingThing():
    # ...other code in the class...
 
    def takeDamage(self, dmgAmount):
        self.health = self.health - dmgAmount
        if self.health == 0:
            print(self.name + ' is dead!')
 
    # ...other code in the class...
 
hero = LivingThing('Elsa', 50, {})
hero.takeDamage(10) # Elsa takes 10 points of damage

一旦你的程序有很多类,每个类有许多方法和成员变量,你将开始看到OOP可以帮助组织你的程序而使他更易于管理。

公共与私有方法

方法与成员变量可以被标示为public或private。公共方法可以和公共成员变量可以在类内部或外部的任何代码调用和赋值。私有方法和私有成员变量只能在对象自己的类内部被调用和赋值。

在某些语言中,例如Java,这种”可以被调用/赋值”由编译器严格的保证。而Python,却没有”私有”和”公共”的概念。所有方法和成员变量都是”公共”的。然而,语言规定如果一个方法名开头是下划线,它就被认为是一个私有方法。这就是为什么你将看到_takeDamage()等方法了。你可以方便的编写代码从对象的类的外部调用私有函数或者设置私有成员变量,但你已经被彻底警告不要去这样做了。

公共/私有的区别的原因是为了解释类如何与外部代码进行交互的。(参考Stack Overflow – Why “private” methods in the object oriented?)类的程序员期望其他人不会编写代码调用私有方法或设置私有成员变量。

例如,takeDamage()方法包括健康值低于0就检查死亡。你可能想要在代码中添加各种各样的其他检查。护甲、敏捷性和防护法术来减少伤害的可能因素。LivingThing对象可能穿着一件魔法斗篷,通过增加抗损害值,而不是减少它们的健康值来进行治疗。这个游戏的所有逻辑都可以放入takeDamage()方法中。

如果你偶然的把代码放到那里,所有的OOP结构就毫无意义了

class LivingThing():
        # ...code in the class...
 
hero = LivingThing('Elsa', 50, {})
 
# ...some more code...
 
if someCondition:
    hero.health -= 50

语句hero.health -= 50会减少50点健康值,而不会考虑Elsa穿着哪种装甲,或者她有防护法术,或者她穿着魔法治疗披风。这段代码将直截了当的减少50点健康值。

很容易忘掉takeDamage()方法并且偶尔写出这样的代码。它不会检查英雄对象的健康值是否低于0。游戏继续运行好像Elsa还活着,及时她的健康值是负数!我们可以使用公共/私有成员和方法来避免这个bug。

如果你重命名health成员变量为_health且标记为私有的,那么当你写成这样就很容易捕获这个bug:

hero = LivingThing('Elsa', 50, {})
 
# ...some more code...
 
if someCondition:
    hero._health -= 50 # WAIT! This code is outside the hero object's class but modifying a private member variable! This must be a bug!

在一种语言中如Java,如果编译器确保私有/公共访问,它就不可能编写非法访问自由成员和方法的程序。面向对象编程帮助我们防止这种bug。

继承

使用LivingThing类表示龙是不错的,但是除了LivingThing类提供的属性外,龙有很多其他的属性。因此你想创建一个新的Dragon类,它包含如airSpeed和breathType(可以使用 ‘fire’,’blizzard’, ‘lightning’, ‘poison gas’等字符串表示)等成员变量。

因为Dragon对象也包含health,magicPoints,inventory和其他LivingThing对象的属性,你可以创建一个新的Dragon类,并且从LivingThing类复制/黏贴所有代码。但这将导致重复的代码这一坏习惯。

相反,可以使Dragon类作为LivingThing类的子类:

class LivingThing():
    # ...other code in the class...
 
class Dragon(LivingThing):
    # ...Dragon-specific code in the class...

实际上是说,”一个龙也是一种LivingThing,还有一些附加的方法和成员变量”。当你创建龙对象时,它会自动的拥有LivingThing的方法和成员变量(拯救我们脱离重复的代码)。但它也有龙特有的方法和成员变量。进一步说,任何处理LivingThing对象的代码都可以自动的操作龙对象,因为龙对象已经拥有了LivingThing成员变量和方法。这个原则被称为子类型多态性。

然而在实践中,继承是容易滥用的。你必须确保任何对LivingThing类做出的改变和更新都适用于Dragon类和所有其他LivingThing的子类。这可能不总是那么简单直接。

例如,如果你创建了LivingThing类的Monster和Hero子类,接着创建了Monster类的 FlyingMonster 和MagicalMonster子类,新的Dragon类继承自FlyingMonster 类还是MagicalMonster类?或者可能它只是Monster类的子类

这就是继承和OOP开始变得棘手且严谨的争论哪种是”正确”设计类的方式之所在。我希望报纸这篇博文简短而简单,因此我要把这些问题留给读者作为练习来调查。(参考 Stack Overflow – Prefer composition over inheritance?Wikipedia – Composition over inheritance)

总结

我讨厌由面向对象编程开始的面向初学者的程序教程。OOP是十分抽象的概念。有一些经验和编写过大型程序之前,你不会理解为什么使用类和对象使编程更容易。相反,它会给初学者留下一条陡峭的学习曲线去爬,他们不知道为什么攀登它。我希望这个RPG例子至少让你领略了为什么OOP是有帮助的。有更多的OOP。如果你想了解更多,试试搜索“python object oriented design” 和 “python design patterns”。

如果你仍然对OOP概念感到迷惑,放心大胆的编写没有类的程序。你不需要它们,它们将导致过度设计的代码。但是一旦你已经有了一些编码经验,面向对象编程的好处会变得更加明显。祝你好运!