0%

线程资产之ThreadLocal

源码来自open jdk8.0

使用

ThreadLocal用于在线程中保存一份对线程独有的变量内容,以使变量在线程中被复用,同时不受其他线程的影响。比如SimpleDateFormat实例的使用,作为一个格式工具,SimpleDateFormat可用于解析或生成日期字符串,从性能开销上讲,具有复用价值,但是比较蛋疼的是,此类实例的方法却不是线程安全的,因此在多线程环境中,很容易产生难以琢磨的怪象。因此激进的做法是在调用实例的方法时,使用锁或管程限制方法的并发调用,反而产生了锁的额外开销,并限制了程序的并发能力。考虑在线程中初始化对象,保存到ThreadLocal,实现进程内的复用。

set

1
2
3
4
5
6
7
ThreadLocal<SimpleDateFormat> threadLocal = new ThreadLocal<SimpleDateFormat>();
Thread thread = new Thread(new Runnable(){
@override
public void run(){
threadLocal.set(new SimpleDateFormat("%y%m%d"));
}
});

get

1
SimpleDateFormat format = threadLocal.get();

实际上,ThreadLocal对象本身是不存储数据的,只是变量的查询入口(以下全文称为变量的key),初始化时生成一个唯一哈希码常量,用于变量值存取,实际的变量被存储到线程相关的数据结构上了,虽然ThreadLocal并不是final类型的,但其对象内不含有成员变量,成员函数操作的对象也是调用者所在线程中的数据,因此,ThreadLocal是线程安全的,具体代码以下为分析。

源码分析

set

先看下set源码

1
2
3
4
5
6
7
8
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

getMap即获取线程内的变量保存结构threadLocals变量

1
2
3
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

其在createMap中,以ThreadLocal实例为键,变量为值,对threadLocals进行初始化

1
2
3
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocalMap为开地址法实现的哈希表,源码为

1
2
3
4
5
6
7
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

在具体实现上,变量被存储到一个Entry数组中,Entry是ThreadLocal的弱引用(WeakReference)子类,增加了被引用变量作为成员(强引用),表明,当JVM触发gc时,变量key可能被清理(当key没有被强引用或弱引用),从而该变量也就失去了查询入口。如此设计的原因,是ThreadLocal作为变量查询的依据,可以在线程的任何地方创建,调用set时会被插入到threadLocals中的Entry对象引用,而当线程启动后,threadLocals作为引用根,其所强引用的对象都是不会回收的,如果ThreadLocal为强引用,则由于threadLocals的引用,变量key无法被gc回收(相应的其关联的变量也无法被回收)。有不少人认为,使用ThreadLocal设置过的变量,应该有明确对应的remove调用,否则会产生内存泄漏。我认为并非如此确定,除非变量key的生命周期太长,后面分析源码我们会发现,一旦变量key被回收,threadLocals中的变量引用(此时的Entry被称为stale entry)也会非常快的通过接下来的查询或更新被清理,引用变量的强引用不会一直被threadLocals保持。
ThreadLocalMap的set方法源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

if (k == key) {
e.value = value;
return;
}

if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

主要用于解决哈希冲突,删除被gc清除的对象引用,调整哈希表大小。

  • 哈希冲突的解决办法为开地址法,算法描述就不赘述了。
  • 对象的清除过程比较复杂,无论是否产生哈希冲突,都会进行清理,其中有如下几个关键方法:expungeStale,cleanSomeSlots,replaceStaleEntry。
  • 当没有对象被清理且哈希表负载因子达到阀值,触发哈希表调整,入口函数为rehash。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    // 替换存储列表中staleSlot位置处的值
    private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // 查找替换位置处往前连续个stale entry的序列起始索引。
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
    (e = tab[i]) != null;
    i = prevIndex(i, len))
    if (e.get() == null)
    slotToExpunge = i;

    // 由于开地址可能造成的元素后移,从替换位置开始遍历,来查找前值,直至遇到一个空位退出循环。期间,如果碰到key与查询key相同的元素,则将此元素插入到staleSlot,原位置处插入新的变量,并返回,这样可以保证哈希表是正确的。完成后,调用cleanSomeSlots来清理被gc回收的引用。
    for (int i = nextIndex(staleSlot, len);
    (e = tab[i]) != null;
    i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();

    if (k == key) {
    e.value = value;

    tab[i] = tab[staleSlot];
    tab[staleSlot] = e;

    if (slotToExpunge == staleSlot)
    slotToExpunge = i;
    // 开始清理stale entry
    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    return;
    }

    if (k == null && slotToExpunge == staleSlot)
    slotToExpunge = i;
    }

    // 没有相同key元素,直接插入指定位置
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // 清除被回收句柄
    if (slotToExpunge != staleSlot)
    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 清除指定位置元素,并对其后的开地址元素进行调整
    private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 指定位置直接清除。
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    // 开地址产生的空位,需要探查其后的非空元素,进行重新哈希。
    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
    (e = tab[i]) != null;
    i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();
    if (k == null) {
    e.value = null;
    tab[i] = null;
    size--;
    } else {
    int h = k.threadLocalHashCode & (len - 1);
    if (h != i) {
    tab[i] = null;

    // 哈希冲突
    while (tab[h] != null)
    h = nextIndex(h, len);
    tab[h] = e;
    }
    }
    }
    return i;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 从指定位置i开始,进行logn次探查清理,如果有元素被清理,返回true,否则返回false。logn是对一次探查和全局探查之间的权衡选择,即考虑速度,又考虑清理效果。
    private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
    i = nextIndex(i, len);
    Entry e = tab[i];
    if (e != null && e.get() == null) {
    n = len;
    removed = true;
    i = expungeStaleEntry(i);
    }
    } while ( (n >>>= 1) != 0);
    return removed;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    private void rehash() {
    // 清除所有stale entry
    expungeStaleEntries();

    // 超过负载因子阀值,进行扩容
    if (size >= threshold - threshold / 4)
    resize();
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // 哈希表扩容,每次容量增加一倍。
    private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
    Entry e = oldTab[j];
    if (e != null) {
    ThreadLocal<?> k = e.get();
    if (k == null) {
    e.value = null; // Help the GC
    } else {
    int h = k.threadLocalHashCode & (newLen - 1);
    while (newTab[h] != null)
    h = nextIndex(h, newLen);
    newTab[h] = e;
    count++;
    }
    }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
    }

get

获取变量的过程相对就简单多了

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

核心为getEntry实现

1
2
3
4
5
6
7
8
9
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
// 直接查找,在负载因子不高的情况下,直接查找一般都能获得结果
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 当直接查找失败时,从查找位置只会开始往后探查,直至查到目标元素或遇到空位
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
// 如果发现元素的ThreadLocal变量被gc回收,则清除元素
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

remove

前面分析可以知道,get及set都可能对threadLocals中的变量进行清理,依据是其中元素的key是否存在。但是有时需要提前进行元素清理,或程序表意中明确需要删除某个变量,可以调用remove方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
// 通过开地址法找到匹配的元素位置,清理此元素,并对其后的连续非空元素进行调整。
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}

总结

用于线程内变量保持复用,ThreadLocal是非常方便的,比如非线程安全的单例,可以拆分成每个线程独享的实例,来避免并发冲突,而不用额外的锁开销。但是,通过对源码的分析,依然需要注意:

  1. 对于冲突的避免,ThreadLocal是使得变量被线程排他性访问,如果变量的引用被传递给其他线程,则冲突依然无法避免。
  2. 确实存在内存泄露的可能,在线程使用ThreadLocal复用少量大对象的时候尤其明显,发生在ThreadLocal在remove之前被gc回收情况下,如果之后又没有触发线程threadLocals哈希表的stale entry清理,变量将一直被根引用threadLocals强引用无法回收,且无法通过常规手段再获得此数据引用,也就是说,这块内存产生了泄露。