Java并发编程学习笔记-01

本文为学习 Java 并发编程的一些随笔 Part 1,记录我所认为的一些重要 or 有趣的点。

此部分主要是关于同步、线程安全等一些理论和技术的探讨。

参考书籍:Java Concurrency in Practice 《Java并发编程实战》

线程安全性

原子性

竞态条件

@NotThreadSafe
public class UnsafeCountingFactorizer implements Servlet {
    private long count = 0;

    public long getCount() { return count; }

    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = factor(i);
        ++count;
        encodeIntoResponse(resp, factors);
    }
}

在上面的代码段里,虽然 ++count 是一种紧凑的语法,但这个操作并不是原子性的,包括:读取 count 的值、将值加1、然后将计算结果写回 count。这是一个“读取—修改—写入”的操作序列,并且其结果状态依赖于之前的状态。

在并发编程中,这种由于不恰当的执行时序而出现不正确的结果是一种非常重要的情况,也叫做竞态条件Race Condition)。

复合操作

要避免竞态条件问题,就必须在某个线程修改该变量时,通过某种方式防止其他线程使用这个变量,从而确保其他线程只能在修改操作完成之前或之后读取和修改状态,而不是在修改状态的过程中。

在Java中,可以使用一个现有的线程安全类来修复这个问题:

@ThreadSafe
public class CountingFactorizer implements Servlet {
    private final AtomicLong count = new AtomicLong(0);

    public long getCount() { return count.get(); }

    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        BigInteger[] factors = factor(i);
        count.incrementAndGet();
        encodeIntoResponse(resp, factors);
    }
}

加锁机制

如果在 Servlet 中添加更多的状态,那么是否只需添加更多的线程安全变量就足够了?

@NotThreadSafe
public class UnsafeCachingFactorizer implements Servlet {
    private final AtomicReference<BigInteger> lastNumber = new AtomicReference<BigInteger>();

    private final AtomicReference<BigInteger[]> lastFactors = new AtomicReference<BigInteger[]>();

    public void service(ServletRequest req, ServletResponse resp) {
        BigInteger i = extractFromRequest(req);
        if(i.equals(lastNumber.get())){
            encodeIntoResponse(resp, lastFactors.get());
        }
        else {
            BigInteger[] factors = factor(i);
            lastNumber.set(i);
            lastFactors.set(factors);
            encodeIntoResponse(resp, factors);
        }
    }
}

然而,这种方法并不正确。尽管这些原子引用本身是线程安全的,但在这段代码中存在着竞态条件。

内置锁

Java提供了一种内置的锁机制来支持原子性:同步代码块(Synchronized Block)。

synchronized 关键字修饰的方法就是一个横跨整个方法体的同步代码块。

@ThreadSafe
public class SynchronizedCachingFactorizer implements Servlet {
   private final AtomicReference<BigInteger> lastNumber = new AtomicReference<BigInteger>();

   private final AtomicReference<BigInteger[]> lastFactors = new AtomicReference<BigInteger[]>();

   public synchronized void service(ServletRequest req, ServletResponse resp) {
       BigInteger i = extractFromRequest(req);
       if(i.equals(lastNumber.get())){
           encodeIntoResponse(resp, lastFactors.get());
       }
       else {
           BigInteger[] factors = factor(i);
           lastNumber.set(i);
           lastFactors.set(factors);
           encodeIntoResponse(resp, factors);
       }
   }
}

然而,这种方式过于极端,多个客户端无法同时使用因数分解 Servlet,服务的响应性非常低。这就变成了一个性能问题。

重入

当某个线程请求一个由其他线程持有的锁时,发出的请求就会阻塞。然而,由于内置锁是可重入的,因此如果某个线程试图获得一个由它持有的锁,那么这个请求就会成功,“重入”意味着获取锁的操作的粒度是“线程”,而不是“调用”(这与 POXIS 线程(pthread)互斥体的默认加锁行为不同

public class Widget {
    public synchronized void doSomeThing() {
        // ...
    }
}

public class LoggingWidget extends Widget {
    @override
    public synchronized void doSomeThing() {
        // ...
        System.out.println(toString() + ": calling doSomeThing");
        super.doSomeThing();
    }
}

在上面这段代码中,如果没有可重入的锁,那么就会产生死锁。

活跃性与性能

SynchronizedCachingFactorizer 中,同步方式是对整个 service 方法进行同步。这种简单且粗粒度的方法能确保线程安全性,但付出的代价却很高。这个 service 方法每次只有一个线程能执行,背离了 Servlet 框架的初衷。

可以将代码修改为使用独立的同步代码块。

@ThreadSafe
public class CachingFactorizer implements Servlet {
   private BigInteger lastNumber;

   private BigInteger[] lastFactors;

   public synchronized void service(ServletRequest req, ServletResponse resp) {
       BigInteger i = extractFromRequest(req);
       BigInteger[] factors = null;

       synchronized (this) {
           if (i.equals(lastNumber)) {
               ++cacheHits;
               factors = lastFactors.clone();
           }
       }

       if (factors == null) {
           factors = factor(i);
           synchronized (this) {
               lastNumber = i;
               lastFactors = factors.clone();
           }
       }

       encodeIntoResponse(resp, factors);
   }
}

重构后的 CachingFactorizer 实现了在简单性和并发性之间的平衡。要判断同步代码块的合理大小,需要在各种设计需求之间进行权衡,包括安全性、简单性和性能。

对象的共享

可见性

public class NoVisibility {
  	private static boolean ready;
  	private static int number;
  
  	private static class ReaderThread extends Thread {
      	@Override
      	public void run() {
          	while (!ready) {
              	Thread.yield();
              	System.out.println(number);
            }
        }
    }
  
  	public static void main(String[] args) {
      	new ReaderThread().start();
      	number = 42;
      	ready = true;
    }
}

在上面这段代码中,虽然看起来会输出42,但事实很有可能输出0,或者根本无法终止。因为代码中根本没有使用足够的同步机制,无法保证主线程写入的 ready 值和 number 值对于读线程来说是可见的。

这种现象被称作“重排序(Reordering)”。在没有同步的情况下,编译器、处理器以及运行时等都可能对操作的执行顺序进行调整(这种设计有利于JVM利用现代多核处理器的性能)。

非原子的64位操作

对于非 volatile 类型的 longdouble 变量,JVM允许将64位的操作分解为两个32位的操作。当读取非 volatile 类型的 longdouble 变量时,如果对该变量的读操作和写操作在不同的线程中执行,那么很可能会读取到某个值的高32位和另一个值的低32位。因此即使不考虑数据失效的问题,多线程中使用共享且可变的 longdouble 变量也是不安全的。

volatile 变量

当把变量声明为 volatile 类型,编译和运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他内存操作一起重排序。volatile 变量不会被缓存在寄存器或者其他处理器不可见的地方。

volatile 是一种比 synchronized 更轻量级的同步机制。volatile 通常用作某个操作完成、发生或状态的标志。需要注意 volatile 的语义不足以确保递增操作 count++ 的原子性。

PS:仅当 volatile 能简化代码的实现以及对同步策略的验证时,才应该使用。

发布与逸出

发布(Publish)一个对象的意思是指,使对象能够在当前作用域之外的代码中使用。在许多情况下,我们需要确保对象以及内部状态不被发布;而在某些情况下,我们又需要发布某个对象,并且如果在发布时要确保线程安全性,则可能需要同步。

发布内部状态可能会破坏封装性,并使得程序难以维持不变性条件。当某个不应该发布的对象被发布时,这种情况就被称作逸出(Escape)。如下面代码段所示:

class UnsafeStates {
  	private String[] states = new String[] {
      	"Success", "Error", "Fail"
    };
  	public String[] getStates() {
      	return states;
    }
}

数组 status 已经逸出了它所在的作用域,这本应是个私有的变量。我们无法知道其他线程会对已发布的引用执行什么操作,误用该引用的风险是中存在。当某个对象逸出后,我们必须假设有某个类或线程可能会误用该对象。这正是需要使用封装的最主要原因:封装能够使得对程序的正确性进行分析变得可能,并使得无意中破坏设计约束条件变得更难。

线程封闭

当访问共享的可变数据时,通常需要使用同步。一种避免使用同步的方式就是不共享数据。如果仅在单线程内访问数据,就不需要同步。这种技术被称作线程封闭(Thread Confinement)。当某个对象封闭在一个线程中时,这种用法将自动实现线程安全性,技术被封闭的对象本身不是线程安全的。

Ad-hoc 线程封闭

Ad-hoc 线程封闭是指,维护线程封闭性的职责完全由程序实现来承担。

栈封闭

栈封闭是线程封闭的一种特例,在栈封闭中,只能通过局部变量才能访问对象。

ThreadLocal 类

这个类能使线程中某个值与保存值的对象关联起来。ThreadLocal 提供了 getset 等访问接口或方法,这些方法为每个使用该变量的线程都存有一份独立的副本,因此 get 总是返回由当前执行线程在调用 set 时设置的最新值。

ThreadLocal 对象通常用于防止对可变的单实例变量(Singleton)或全局变量进行共享。

例如,JDBC 的连接对象不一定是线程安全的,因此当多线程程序中在没有协同的情况下使用全局变量时就不是线程安全的。通过将 JDBC 保存到 ThreadLocal 对象中,每个线程都会拥有属于自己的连接,如下所示。

private static ThreadLocal<Connection> connectionHolder
    = new ThreadLocal<Connection>() {
    	public Connection initialValue() {
            return DriverManager.getConnetion(DB_URL);
        }
	};

public static Connection getConnection() {
    return connetionHolder.get();
}

不变性

如果某个对象在被创建后其状态就不能被修改,那么这个对象就成为不可变对象。不可变对象一定是线程安全的

当满足以下条件时,对象才是不可变的:

安全发布

可变对象必须通过安全的方式来发布,这通常意味着在发布和使用该对象的线程时都必须使用同步。

要安全地发布一个对象,对象的引用以及对象的状态必须同时对其他线程可见。一个正确构造的对象可以通过以下方式来安全地发布:

对象的组合

实例封闭

如果某对象不是线程安全的,那么可以通过多种技术使其在多线程程序中安全地使用。你可以确保该对象只能由单个线程访问(线程封闭),或者通过一个锁来保护对该对象的所有访问。

将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。

@ThreadSafe
public class PersonSet {
    private final Set<Person> mySet = new HashSet<Person>();
    
    public synchronized void addPerson(Person p) {
        mySet.add(p);
    }
    
    public synchronized boolean containsPerson(Person p) {
        return mySet.contains(p);
    }
}

上面的代码段通过封闭与加锁等机制使一个类成为线程安全的(即使这个类的状态变量 HashSet 并不是线程安全的)。由于 mySet 是私有的并且不会逸出,因此 HashSet 被封闭在 PersonSet 中。但如果 Person 类是可变的,那么在访问从 PersonSet 中获得的 Person 对象时,还需要额外的同步。

Java 监视器模式

Java 监视器模式仅仅是一种编写代码的约定,对于任何一种锁对象,只要自始至终都是用该锁对象,都可以用来保护对象的状态。下面的代码即是用私有锁来保护状态。

public class PrivateLock {
    private final Object myLock = new Object();
    Widget widget;
    
    void someMethod() {
        synchronized(myLock) {
            // 修改或访问 widgte 的状态
        }
    }
}

基于监视器模式的示例

基于监视器模式的车辆追踪

@ThreadSafe
public class MonitorVehicleTracher {
    private final Map<String, MutablePoint> locations;
    
    public MonitorVehicleTracher(Map<String, MutablePoint> locations) {
        this.locations = this.deepCopy(locations);
    }
    
    public synchronized Map<String, MutablePoint> getLocations() {
        return this.deepCopy(locations);
    }
    
    public synchronized MutablePoint getLocation(String id) {
        MutablePoint loc = locations.get(id);
        return null == loc ? null : new MutablePoint(loc);
    }
    
    public synchronized void setLocation(String id, int x, int y) {
        MutablePoint loc = locations.get(id);
        if(null == loc) {
            throw new IllegalArgumentException("No such ID: " + id);
        }
        loc.x = x;
        loc.y = y;
    }

    private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint m) {
        Map<String, MutablePoint> result = new HashMap<String, MutablePoint>();
        for (String id : m.keySet()) {
            result.put(id, new MutablePoint(m.get(id)));
        }
        return Collections.unmodifiableMap(result);
    }
}

@NotThreadSafe
public class MutablePoint {
    public int x, y;
    
    public MutablePoint() {
        x = 0;
        y = 0;
    }
    
    public MutablePoint(MutablePoint p) {
        this.x = p.x;
        this.y = p.y;
    }
}

虽然 MutablePoint 不是线程安全的,但追踪器类是线程安全的。其所包含的 Map 对象和可变的 Point 对象都未曾发布。

在某种程度上,这种实现方式是通过在返回客户代码之前复制可变的数据来维持线程安全性的。通常情况下,这并不存在性能问题,但在车辆容器非常大的情况将极大地降低性能(由于 deepCopy 是从一个 synchronized 方法中调用的,因此在执行时间较长的复制操作中,tracker 的内置锁将一直被占有,当有大量车辆需要追踪时,会严重降低用户界面的响应灵敏度)。

此外由于每次调用 getLocation 就要复制数据,因此将出现一种错误情况——虽然车辆实际位置发生了变化,但返回的信息却保持不变。这种情况是好是坏,取决于需求。如果在 location 集合上存在内部的一致性需求,那么这就是优点,在这种情况下返回一致的快照就非常重要。然而,如果调用者需要每辆车的最新信息,那么这就是缺点,因为这需要非常频繁地刷新快照。

线程安全性的委托

大多数对象都是组合对象。当从头开始构建一个类,或者将多个非线程安全的类组合为一个类时,Java 监视器模式是非常有用的。但是,如果类中的各个组件都已经是线程安全的,会是什么情况呢?在某些情况下,其是线程安全的,而在某些情况下,这仅仅是一个好的开端。

基于委托的示例

基于委托的车辆追踪

@ThreadSafe
public class DelegatingVehicleTracher {
    private final ConcurrentMap<String, Point> locations;
    private final Map<String, Point> unmodifiableMap;
    
    
    public DelegatingVehicleTracher(Map<String, Point> points) {
        this.locations = new ConcurrentMap<String, Point>(points);
        this.unmodifiableMap = Colletions.unmodifiableMap(this.locations);
    }
    
    public synchronized Map<String, Point> getLocations() {
        return this.unmodifiableMap;
    }
    
    public synchronized Point getLocation(String id) {
        return locations.get(id);
    }
    
    public synchronized void setLocation(String id, int x, int y) {
        if (locations.replace(id, new Point(x, y)) == null) {
            throw new IllegalArgumentException("invalid vechile name: " + id);
        }
    }
}

@Immutable
public class Point {
    public final int x, y;
    
    public Point(Point p) {
        this.x = p.x;
        this.y = p.y;
    }
}

由于 Point 类是不可变的,因而它是线程安全的。不可变的值可以被自由地共享与发布,因此在返回 location 时不需要复制。

DelegatingVehicleTracker 中没有使用任何显式的同步,所有对状态的访问都由 ConcurrentMap 来管理,而且 Map 中所有的键值都是不可变的。

需要注意的是,getLoactions 返回的是一个不可修改但却实时的车辆位置视图。这可能是优点,也可能是缺点,取决于你的需求。

如果需要一个不发生变化的车辆视图,那么 getLocations 可以返回对 locations 这个 Map 对象的一个浅拷贝(Shallow Copy)。如下所示:

public synchronized Map<String, Point> getLocations() {
    return Colletions.unmodifiableMap(new HashMap<String, Point>(locations));
}

发布底层的状态变量

当把线程安全性委托给某个对象的底层状态变量时,在什么条件下才可以发布这些变量从而使其他类能修改他们?答案仍然取决于在类中对这些变量施加了哪些不变性条件。

如果一个状态变量是线程安全的,并且没有任何不变性条件来约束它的值,在变量的操作上也不存在任何不允许的状态转换,那么就可以安全地发布这个变量。

发布状态的示例

发布状态的车辆追踪

@ThreadSafe
public class PublishingVehicleTracher {
    private final ConcurrentMap<String, SafePoint> locations;
    private final Map<String, SafePoint> unmodifiableMap;
    
    
    public PublishingVehicleTracher(Map<String, SafePoint> points) {
        this.locations = new ConcurrentMap<String, SafePoint>(points);
        this.unmodifiableMap = Colletions.unmodifiableMap(this.locations);
    }
    
    public synchronized Map<String, SafePoint> getLocations() {
        return this.unmodifiableMap;
    }
    
    public synchronized Point getLocation(String id) {
        return locations.get(id);
    }
    
    public synchronized void setLocation(String id, int x, int y) {
        if (!locations.containsKey(id)) {
            throw new IllegalArgumentException("invalid vehicle name: " + id);
        }
        locations.get(id).set(x, y);
    }
}

@ThreadSafe
public class SafePoint {
    private int x, y;
    
    public SafePoint(int[] a) {
        this(a[0], a[1]);
    }
    
    public SafePoint(SafePoint p) {
        this(p.get())
    }
    
    public SafePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }
    
    public synchronized int[] get() {
        return new int[] {x, y};
    }
    
    public synchronized void set(int x, int y) {
        this.x = x;
        this.y = y;
    }
}

PublishingVehicleTracker 将其线程安全性委托给底层的 ConcurrentHashMap,但 Map 中的元素是线程安全且可变的 PointgetLocation 方法返回的是 Map 中的一个不可变副本。

在现有的线程安全类中添加功能

重用能降低开发工作量、开发风险(因为所有的类都已经通过测试)以及维护成本。但更多时候,现有的类只能支持大部分的操作,此时就需要在不破坏线程安全性的情况下添加一个新的操作。

例如,假设需要一个线程安全的链表,它需要提供一个原子的“若没有则添加”的操作。

要添加一个新的原子操作,最安全的方法是修改原始的类,但这通常无法做到,因为你可能无法访问或修改类的源代码。要想修改原始的类,就需要理解代码中的同步策略,这样新增加的功能才能与原有的设计保持一致。如果直接将新方法添加到类中,那么意味着实现同步策略的所有代码仍然处于一个源代码文件中,从而更容易理解和维护。

另一种方法是扩展这个类(假定在设计这个类时考虑了可扩展性)。比如下面的扩展 Vector。但不是所有的类都像 Vector 那样将状态向子类公开,因此也就不适用这种方法。

@ThreadSafe
public class BetterVector<E> extends Vector<E> {
    public synchronized boolean putIfAbsent(E x) {
        boolean absent = !contains(x);
        if (absent) {
            add(x);
        }
        return absent;
    }
}

扩展的方法比直接将代码添加到类中更加脆弱,因为现在的同步策略实现被分布到多个单独维护的源代码文件中。如果底层的类改变了同步策略并选择了不同的锁来保护它的状态变量,那么子类会被破坏,因为在同步策略改变后它无法再使用正确的锁来控制对基类状态的并发访问。

客户端加锁机制

先看一段错误的代码。

@NotThreadSafe
public class ListHelper<E> {
    public List<E> list = Colletions.synchronizedList(new ArrayList<E>());
    
    public synchronized boolean putIfAbsent(E x) {
        boolean absent = !list.contains(x);
        if (absent) {
            list.add(x);
        }
        return absent;
    }
}

这段代码的问题在于在错误的锁上进行了同步。无论 List 使用哪一个锁来保护它的状态,可以确定的是,这个锁并不是 ListHelper 上的锁。ListHelper 只是带来了同步的假象,尽管所有的链表操作都被声明为 synchronized,但却使用了不同的锁,这意味着 pubIfAbsent 相对于 List 的其他操作来说并不是原子的

要想使这个方法能正确执行,必须使 List 在实现客户端加锁或外部加锁时使用同一个锁。客户端加锁指,对于使用某个对象 X 的客户端代码,使用 X 本身用于保护其状态的锁来保护这段客户端代码。如下:

@ThreadSafe
public class ListHelper<E> {
    public List<E> list = Colletions.synchronizedList(new ArrayList<E>());
    
    public boolean putIfAbsent(E x) {
        synchronized (list) {
            boolean absent = !list.contains(x);
            if (absent) {
                list.add(x);
            }
            return absent;
        }
    }
}

然而,客户端加锁比扩展类更加脆弱,因为它将类 C 的加锁代码放到与 C 完全无关的其他类中。

组合

下面的代码段使用组合(Composition)的方式为现有的类添加一个原子操作。通过将 List 对象的操作委托给底层的 List 示例来实现 List 的操作。(ImprovedList 假设把某个链表对象传给构造函数以后,客户代码不会再直接使用这个对象,而是通过 ImprovedList 来访问)

@ThreadSafe
public class ImprovedList<T> implements List<T> {
    private final List<T> list;
    
    public ImprovedList(List<T> list) {
        this.list = list;
    }
    
    public synchronized boolean putIfAbsent(E x) {
        boolean absent = !list.contains(x);
        if (absent) {
            list.add(x);
        }
        return absent;
    }
    
    public synchronized void clear() {
        list.clear();
    }
    // ... 按照类似的方式委托List的其他方法
}