关于多线程本质的思考和使用技巧
前言
近来,公司因为项目过多,人手不足,一直在进行面试。过程中同事总是问道:多线程是什么,谈谈你对多线程的理解?以我愚见,这并不是一个可以在面试中快速回答的问题,如果面试的时候向我提问,我觉得我无法有条理的回答这个问题。因此,以总结多线程开发为目标,我写下这篇笔记,用于记录自己对多线程的理解和思考,以备不时之需。
什么是多线程
不论初入开发生涯的小白和深耕多年的老兵,提及多线程,第一想到就是加锁,用来确保代码正确执行,避免程序调度的不可预测性导致的错误。但这只是问题的表层,在复杂的并发场景中,锁只是工具,而不是答案。多线程开发远不止“避免冲突”,它是一场在“性能”与“正确性”之间的博弈。
多线程开发的目标
多线程开发的根本目标:在并发环境下,正确高效的保证对共享资源的访问。
- 正确性:不论线程如何调度,指令如何优化,确保程序的正常运行。
- 性能:充分利用多核 CPU,提升吞吐、响应速度。
多线程开发问题的本源
我将多线程问题总结为这四类。
问题类型 | 描述 |
---|---|
指令重排 | CPU 或编译器为了优化,会调整语句顺序执行,导致逻辑失真 |
缓存不一致 | 不同线程可能看到同一个变量的不同值 |
非原子操作 | 多步骤操作中途被其他线程打断,导致逻辑出错 |
同步代价 | 加锁带来上下文切换和等待,降低性能 |
代码实践
-
指令重排序
指令重排并不会影响单线程的语义,但在多线程环境中,它可能导致“已经构造的对象”被其他线程提前访问,触发
NullReferenceException
或逻辑错误。例:双重检查锁下的单例初始化
class Singleton { private static volatile Singleton _instance; private static object _lock = new object(); public static Singleton Instance { get { if (_instance == null) { lock (_lock) { if (_instance == null) { _instance = new Singleton(); // 非原子操作 + 指令重排 } } } return _instance; } } }
volatile
禁止指令重排没有 volatile 的场景下,可能出现对象“已分配但未初始化”
延伸方案
- 使用
Lazy<T>
避免双检锁与重排问题 - 使用
Thread.MemoryBarrier()
精细控制执行顺序
- 使用
-
缓存不一致
多核 CPU 每个核心拥有自己的缓存,导致线程对同一变量的访问结果可能不一致。
例:线程 A 设置标志位,线程 B 却一直看不到变化
volatile bool _shouldStop = false;
void Worker() {
while (!_shouldStop) {
// do something
}
}
void Stop() {
_shouldStop = true;
}
加上 volatile
保证线程 B 能“看到”线程 A 的写入
或者通过锁封装 _shouldStop
,隐式解决可见性
-
非原子操作
例:多线程计数器累加出错
int counter = 0; Parallel.For(0, 10000, i => { counter++; // 错误! 非原子操作 }); int counter = 0; Parallel.For(0, 10000, i => { Interlocked.Increment(ref counter); // 正确 原子操作 });
避免使用 lock 的性能开销
支持 Increment, Decrement, CompareExchange 等原子操作
-
同步代价
例:任务过多时线程阻塞严重,导致性能瓶颈
SemaphoreSlim semaphore = new SemaphoreSlim(10); Parallel.ForEach(tasks, async task => { await semaphore.WaitAsync(); try { await DoWork(task); } finally { semaphore.Release(); } });
替代思路
- 限流但不阻塞的任务调度:
Channel<T>
+ 消费者模型 - 利用
Task.Factory.StartNew
创建长时间运行任务,避免线程饥饿 - 用对象池(比如
ConcurrentBag
)重用资源,减少锁粒度
- 限流但不阻塞的任务调度:
延伸多线程的使用方式
以下是我在多线程开发中常用的一些工具,它们不是简单的 API,而是有明确使用语境、性能取舍的并发工具。
-
ConcurrentDictionary.GetOrAdd()
: 在高并发场景中,我们经常希望“某个对象在多个线程中只初始化一次”,传统做法可能是加锁或双重检查,但
ConcurrentDictionary
通过内部的分段锁和原子操作,实现了线程安全的初始化逻辑:var instance = dict.GetOrAdd(key, k => new ExpensiveObject());
-
lock
,Monitor
,SpinLock
:lock
/Monitor.Enter/Exit
lock (_lockObj) { // 临界区 }
- 最常用的同步方式,基于
Monitor
- 自动释放锁,结构清晰,推荐首选
Monitor.TryEnter
:支持超时if (Monitor.TryEnter(_lockObj, TimeSpan.FromSeconds(2))) { try { /* ... */ } finally { Monitor.Exit(_lockObj); } }
SpinLock
:避免上下文切换的高性能锁SpinLock _spinLock = new SpinLock(); bool lockTaken = false; _spinLock.Enter(ref lockTaken); // 临界区 _spinLock.Exit();
- 适合锁持有时间极短的场景
- 无线程切换,减少调度开销
- 注意死锁风险 + 不支持递归加锁
- 最常用的同步方式,基于
-
ThreadLocal<T>
:多线程共享变量容易引发冲突,不如不共享。
ThreadLocal<T>
允许每个线程持有自己的副本,避免锁:ThreadLocal<Random> rng = new ThreadLocal<Random>(() => new Random()); int num = rng.Value.Next();
- 常用于 Random、日志上下文、缓冲区等隔离场景
- 不适合长生命周期对象(会引起内存泄漏)
-
线程池调度:
Task.Run
,TaskFactory
,Parallel.ForEach
Task.Run
:将工作提交给线程池,避免频繁创建线程TaskFactory.StartNew
:高级配置(调度器、长时间运行等)Parallel.ForEach
:简洁处理并行集合任务(如批处理、文件处理)
Parallel.ForEach(myList, item => { Process(item); });
注意:线程池线程默认不能被控制上下文,如需隔离状态应结合
ThreadLocal
或信号量。 -
[MethodImpl(MethodImplOptions.Synchronized)]
:方法级同步声明(不推荐)[MethodImpl(MethodImplOptions.Synchronized)] void MyCriticalMethod() { // 隐式锁定 this }
等价于在方法体前加 lock(this),可能导致外部死锁,不透明、难调试
-
Lazy<T>
:Lazy<HeavyObject> lazyObj = new Lazy<HeavyObject>(() => new HeavyObject()); var obj = lazyObj.Value; // 初始化只发生一次
- 内部实现使用双检锁+volatile,线程安全
- 默认线程安全(LazyThreadSafetyMode.ExecutionAndPublication)
-
System.Threading.Channels
:var channel = Channel.CreateUnbounded<string>(); // Producer await channel.Writer.WriteAsync("msg"); // Consumer await foreach (var msg in channel.Reader.ReadAllAsync()) { Process(msg); }
- 内部使用环形缓冲 + 原子操作,无需锁
- 广泛用于 高性能日志、异步消息、管道通信
-
CancellationToken
:var cts = new CancellationTokenSource(); var token = cts.Token; var task = Task.Run(() => { while (!token.IsCancellationRequested) { // work } }, token); cts.Cancel(); // 触发取消
- 支持协作式停止线程
- 适用于定时任务、消费者线程、异步服务
-
BlockingCollection
:线程安全的队列 + 阻塞消费var queue = new BlockingCollection<string>(); // Producer Task.Run(() => { for (int i = 0; i < 100; i++) { queue.Add($"msg-{i}"); } queue.CompleteAdding(); }); // Consumer foreach (var msg in queue.GetConsumingEnumerable()) { Console.WriteLine(msg); }
- 自动处理线程同步
- 自动等待生产或消费,无需手动
wait
或signal
- 适合简化 Producer-Consumer 模型
-
使用
ValueTask
减少分配(高频异步方法)public ValueTask<int> ReadAsync() { if (_cachedResult != null) { return new ValueTask<int>(_cachedResult); } return new ValueTask<int>(ReadFromDiskAsync()); }
- 避免频繁创建 Task 对象
- 适合“同步返回的概率高”的场景,如缓存读取
来源链接:https://www.cnblogs.com/simonJameson/p/18899605
如有侵犯您的版权,请及时联系3500663466#qq.com(#换@),我们将第一时间删除本站数据。
暂无评论内容