Java并发编程学习笔记-02

本文为学习 Java 并发编程的一些随笔 Part 2,记录我所认为的一些重要 or 有趣的点。

此部分主要是关于 Java 实战的一些探讨。

参考书籍:Java Concurrency in Practice 《Java并发编程实战》

构建基础模块

同步容器类

包括 VectorHashTable 以及一些功能类似的类,这些同步的封装器类是由 Collections.synchronizedXxx 等工厂方法创建的,其实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

同步容器类虽然都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作,常见符合操作包括:迭代、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算。

迭代器与ConcurrentModificationException

List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>());
...
// 可能抛出 ConcurrentModificationException
for (Widget w : widgetList)
    doSomething(w);

如果在迭代期间计数器被修改,那么 hasNextnext 将抛出 ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。这是一种设计上的权衡,从而降低并发修改操作的检测代码对程序性能带来的影响。

隐藏迭代器

在某些情况下,迭代器会隐藏起来。

public class HiddenIterator {
    private final Set<Integer> set = new HashSet<Integer>();
    
    public synchronized void add(Integer i) {
        set.add(i);
    }    
    
    public synchronized void remove(Integer i) {
        set.remove(i);
    }
    
    public void addTenThings() {
        Random r = new Random();
        for (int i = 0; i < 10; i++) {
            add(r.nextInt());
        }
        System.out.println("DEBUG: added ten elements to " + set);
    }
}

字符串的连接操作转换为调用 StringBuilder.append(Object),而这个方法又会调用容器的 toString 方法,标准容器的 toString 方法将迭代容器。

并发容器

ConcurrentHashMap

ConcurrentHashMap 并不是将每个方法都在同一个锁上进行同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制成称为分段锁(Lock Striping)。在这种机制中,任意数量的读取线程可以并发地访问 Map,并且一定数量的写入线程可以并发地修改 Map。其在并发访问环境下可以实现更高的吞吐量,而在单线程环境中只损失非常小的性能。

ConcurrentHashMap 提供的迭代器不会抛出 ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap 返回的迭代器具有弱一致性(Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但不保证)在迭代器被构造后将修改操作反映给容器。

PS:ConcurrentHashMap 虽然不能被加锁来执行独占访问,但一些常见的符合操作都已经实现为原子操作并在 ConcurrentMap 接口中声明,如下所示:

public interface ConcurrentMap<K, V> extends Map<K, V> {
    // 仅当K没有相应的映射值时才插入
    V putIfAbsent(K key, V value);
    
    // 仅当K被映射到V时才移除
    boolean remove(K key, V value);
    
    // 仅当K被映射到oldValue时才替换为newValue
    boolean replace(K key, V oldValue, V newValue);
    
    // 仅当K被映射到某个值时才替换为newValue
    V replace(K key, V value);
}

CopyOnWriteArrayList

CopyOnWriteArrayList 替代同步 List,“写入即复制(Copy-On-Wirte)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本。

显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用 Copy-On-Wirte 容器。

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的 puttake 方法,以及支持定时的 offerpoll 方法。如果队列已经满了,那么 put 方法将阻塞直到有空间可用;如果队列为空,那么 take 方法将会阻塞直到有元素可用。

阻塞队列支持生产者-消费者这种设计模式。

BlockingQueue 简化了生产者-消费者设计的实现过程,支持任意数量的生产者和消费者。一种最常见的生产者-消费者设计模式就是线程池与工作队列的组合。

阻塞队列也提供了一个 offer 方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。这样你就能创建更多灵活的策略来处理负荷过载的情况,例如减轻负载、将多余的工作项序列化并写入磁盘、减少生产者线程的数量、通过某种方式来抑制生产者线程。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过在的情况下变得更加健壮。

虽然生产者-消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。开发人员总会假设消费者处理工作的速率能高赶上生产者生成工作项的速率,因此通常不会为工作队列的大小设置边界,但这将导致在之后需要重新设计系统架构。

在类库中包含了 BlockingQueue 的多种实现,其中 LinkedBlocingQueueArrayBlocingQueue 是FIFO队列,二者分别与 LinkedListArrayList 类似,但比同步 List 有更好的并发性能; PriorityBlockingQueue 是一个按优先级排序的队列,可以按照某种排序来处理元素而不是FIFO,可以根据元素的自然顺序来比较元素(如果实现了 Comparable 方法)或者使用 Comparator 来比较;而 SynchronousQueue 实际上不是一个真正的队列,不会为队列中的元素维护存储空间。它维护一组线程,这些线程在等待着把元素加入或移出队列。这种方式可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟,并且当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列。

示例:桌面搜索

public class FileCrawler implements Runnable{
    private final BlockingQueue<File> fileQueue;
    private final FileFilter fileFilter;
    private final File root;
    
    //...
    
    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void crawl(File root) throws InterruptedException {
        File[] entries = root.listFiles(fileFilter);
        if (null != entries) {
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    crawl(entry);
                } else if (!alreadyIndexed(entry)) {
                    fileQueue.put(entry);
                }
            }
        }
    }
}




public class Indexer implements Runnable {
    private final BlockingQueue<File> queue;

    public Indexer(BlockingQueue<File> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                indexFile(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

生产者-消费者模式提供了一种适合线程的方法将桌面搜索问题分解为更简单的组件。将文件遍历与建立索引等功能分解为独立的操作,这比把所有功能都放到一个操作实现有着更高的代码可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰。

生产者-消费者模式同样能带来许多性能优势。生产者和消费者可以并发的执行。如果生产者和消费者一个是I/O密集型,一个是CPU密集型,那么并发执行的吞吐率要高于串行执行的吞吐率。如果生产者和消费者的并行度不同,那么将它们耦合在一起会把整体并行度降低成二者中更小的并行度。

阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种:

Java 中当某方法抛出 InterruptedException 时,表示该方法是一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。

Thread 提供了 interrupt 方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程A中断线程B时,A只是要求B在执行到某个可以暂停的地方停止正在执行的操作(前提是B愿意停下来)。方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。

当调用一个会抛出 InterruptedException 异常的方法时,调用方的方法也会变成一个阻塞方法,并且必须要处理对中断的响应。对于库代码,有两种基本选择:

public class TaskRunnable implements Runnable {
    BlockingQueue<Task> queue;
    // ...
    public void run() {
        try {
            processTask(queue.take());
        } catch (InterruptedException e) {
            // 恢复被中断的状态
            Thread.currentThread().interrupt();
        }
    }
}

在出现 InterruptedException 时不应该做的事情是捕获它但不做出任何响应。这将导致调用栈上更高层的代码无法对中断采取处理措施(因为无法感知线程被中断)。

同步工具类

同步工具类可以根据自身的状态来协调线程的控制流。除了阻塞队列,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

闭锁

闭锁可以延迟线程的进度直到其达到终止状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如:

CountDownLatch 是一种灵活的闭锁实现,适用于上述各种情况。

e.g. 测试n个线程并发执行某个任务时需要的时间。

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
        throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);
        
        for (int i = 0; i < nThreads; ++i) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }
        
        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }
}

FutureTask

FutureTask 实现了 Future 语义,表示一种更抽象的可生成结果的计算。FutureTask 表示的计算时通过 Callable 来实现的,相当于一种可生成结果的 Runnable,并且可以处于以下3种状态:

Future.get 的行为取决于任务的状态。如果任务已经完成,那么 get 会立即返回结果,否则会阻塞直到任务进入完成状态,然后返回结果或抛出异常。FutureTask 将计算机过从执行计算的线程传递到获取这个结果的线程,其规范确保了这种传递过程能实现结果的安全发布。

public class Preloader {
    private final FutureTask<ProductInfo> future =
            new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
                @Override
                public ProductInfo call() throws DataLoadExcption {
                    return loadProductInfo();
                }
            });
    private final Thread thread = new Thread(future);
    
    public void start() { thread.start(); }
    
    public ProductInfo get()
        throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException) {
                throw (DataLoadException) cause;
            } else {
                throw launderThrowable(cause);
            }
        }
    }
}

Preloader 创建了一个 FutureTask,其中包含从数据库加载产品信息的任务,以及一个执行运算的线程。由于在构造函数或静态初始化方法中启动线程并不是一种好方法,因此提供了一个 start 方法来启动线程。当程序随后需要访问数据时,可以调用 get 方法。

Callable 表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个 Error。无论任务代码抛出什么异常,都会被封装到一个 ExecutionException 种,并在 Future.get 中被重新抛出。

get 不仅需要处理可能出现的 ExecutionException(以及未检查的 CancellationException),而且还由于ExecutionException 是作为一个 Throwable 类返回的,处理并不容易。所以使用 launderThrowable 辅助方法来封装一些复杂的异常处理逻辑。Preloader 会首先检查已知的受检查异常,并重新抛出它们。剩下的未检查异常留给 launderThrowable 处理并抛出。其实现如下:

public static RuntimeException launderThrowable(Throwable t) {
    if (t instanceof RuntimeException) {
        return (RuntimeException)t;
    } else if (t instanceof Error) {
        throw (Error) t;
    } else {
        throw new IllegalStateException("Not unchecked", t);
    }
}

信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

下面的代码是将容器变成有界阻塞的:

public class BoundedHashSet<T> {
    private final Set<T> set;
    private final Semaphore sem;
    
    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }
    
    public boolean add(T o) throws InterruptedExcption {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                sem.release();
            }
        }
    }
    
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();
        }
        return wasRemoved;
    }
}

栅栏

栅栏能阻塞一组线程知道某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。

CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用 await 方法,这个方法将阻塞直到所有线程都到达栅栏位置。

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;
    
    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                                        new Runnable() {
                                            public void run() {
                                                mainBoard.commitNewValues();
                                            }
                                        });
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
        }
    }
    
    private class Worker implements Runnable {
        private final Board board;
        
        public Worker(Board board) {
            this.board = board;
        }
        
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++) {
                    for (int y = 0; y < board.getMaxY(); y++) {
                        board.setNewValue(x, y, computeValue(x, y));
                    }
                }
                
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
        
        public void start() {
            for (int i = 0; i < workers.length; i++) {
                new Thread(workers[i]).start();
            }
            mainBoard.waitForConvergence();
        }
    }
}

上面的代码段给出了如何通过栅栏来计算细胞的自动化模拟。在把模拟过程并行化时,为每个元素(相当于细胞)分配一个独立的线程时不现实的,因为这将产生过多的线程,在协调这些线程上导致的开销会降低计算性能。合理的做法是将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并。其中分解的线程数 count 等于可用CPU数量。

另一种形式的栅栏是 Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据。当两方执行不对称的操作时,例如一个线程向缓冲区写入数据,另一个线程从缓冲区读取数据。这些线程可以使用 Exchanger 来汇合,并将满的缓冲区与空的缓冲区交换。

构建高效且可伸缩的结果缓存

具体迭代过程见书 P85

public class Memozier<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache =
        new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;
    
    public Memozier(Computable<A, V> c) {
        this.c = c;
    }
    
    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);	// ConcurrentHashMap中如果键值存在,putIfAbsent返回该键值;如果不存在,则返回null
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }
}