V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
bthulu
V2EX  ›  程序员

一个线程更新数据, 多个线程读数据, 这种怎么保证线程安全?

  •  
  •   bthulu · 274 天前 · 4240 次点击
    这是一个创建于 274 天前的主题,其中的信息可能已经有所发展或是发生改变。

    .Net 相关
    线程 0 调用硬件异步 API, 拿到数据后, 从 devices 根据 id 取到 Device 实例, 更新硬件最新数据到这个实例上.
    同时有多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作, 并在异步操作执行完成后, 对硬件数据进行部分更新.
    这个要怎么做才能确保线程安全?

        // 设备集中存储处
        ConcurrentDictionary<int, Device> devices = new();
    
        // 设备类
        public class Device
        {
            public int Id { get; init; }
            public bool Enable { get; set; }
            public string Group { get; init; } = "";
            public int[] Locations { get; init; } = Array.Empty<int>();
            public int Margin { get; set; }
            public int RsCount { get; init; }
            public bool EnableSplit { get; init; }
            public int DynamicMerge { get; set; }
            public int Width { get; set; }
            public int Length { get; set; }
            public int LeftLength { get; set; }
            public int LoadEdge { get; set; }
            public int Dest { get; set; }
        }
    
        // 数据更新线程相关
        public Thread0Executor()
        {
        	public async Task Execute()
            {
                var data = await GetDataFromHardwareApi();
                Update(data, devices);
            }
        }
    
        // 数据监控处理线程相关
        public MonitorThreadExecutor()
        {
        	public async Task Execute()
            {
                Resolve(devices);
                await Operate0();
                DoSomething();
                await Operate1();
                DoSomething();
            }
            
            public async Task Operate0()
            {
                try
                {
                	await CallApi();
                	Update(devices);
                }
                catch()
                {
                	UpdateIfError(devices);
                }
            }
        }
    

    异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.

    这里也不能对整个 Execute 方法用锁. 因为监控线程中的异步操作耗时是不一定的, 可能因为网络问题花个几分钟都有可能.

    貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.

    也考虑过弄个类似 ANDROID 里的 UI 线程和子线程的东西, 数据读取和更新都放在 UI 线程里, 异步操作放在子线程里. 但是搞了半天没搞出来.

    最后的最后, 实在没办法了, 我在想要不把 Device 的所有属性都加一个 volatile 关键字. 我这里更新数据的时候基本不会看原来数据是多少, 不会出现count++这种情况, 貌似 volatile 是可行的. 但是实际这个 Device 有几十个属性, 并且有一两千个 Device, 如果每个属性都加一个 volatile 关键字, 那就是 2000*50=100 万个属性带 volatile 了. 这会不会极大地影响程序运行性能?

    47 条回复    2024-03-08 14:44:46 +08:00
    svnware
        1
    svnware  
       274 天前
    单写多读不就已经是线程安全的了么。。。
    wamson
        2
    wamson  
       274 天前 via iPhone
    看标题,寻思,这不就是个读写锁么😳
    laminux29
        3
    laminux29  
       274 天前
    你这不是一线程更新,多线程读,而是多线程读写。

    这种问题,没把握的话,直接丢给 MSSQL ,如果对数据一致性要求严谨,用序列化级别的事务去操作数据。
    如果要求不严谨,直接用 EF 的乐观锁或最终一致性。
    wayne1007
        4
    wayne1007  
       274 天前
    double buffer ,写线程 先 load 数据,然后和更新 buffer 的 idx 0->1 或者 1->0
    wayne1007
        5
    wayne1007  
       274 天前
    @wayne1007 读线程,直接按当前的 idx 读数据,就 idx 切换的瞬间,可能不一致,用信号量也不一定能完全保证,看你这个场景够不够用,不用锁的话
    bthulu
        6
    bthulu  
    OP
       274 天前
    @wamson 读写锁不行的, 异步调用有可能耗时特别长. 总不能某个设备接口耗时过长时, 其他设备都不能用了吧?
    namonai
        7
    namonai  
       274 天前
    @svnware 不一样的。比如一段数据,写入一半的时候被读取,读到的就是 broken 的数据。哪怕是对单个字节进行读写操作,也可能存在问题,所以至少要使用原子操作进行保护。
    codcrafts
        8
    codcrafts  
       274 天前
    我没太懂,你这种情况下会有线程安全问题吗?我感觉不会
    bthulu
        9
    bthulu  
    OP
       274 天前
    @wayne1007 如果用锁, 只要能保证某个线程调用异步操作耗时特别长时, 其他线程可以干活而不是在那干等着就行.
    bthulu
        10
    bthulu  
    OP
       274 天前
    @laminux29 丢 SQL 里不到万不得已不考虑, 尽量在内存这一层面解决, 实在么办法了再考虑丢 SQL 里去.
    guo4224
        11
    guo4224  
       274 天前
    临界区……
    namonai
        12
    namonai  
       274 天前   ❤️ 1
    @bthulu 你可以试试 trible buffer ,编号 0 、1 、2 ,读线程实现一个 getIndex(),初始的 valid index 是 0 ,需要对数据更改的时候,index + 1 ,往 1 上写,写完了以后 valide index 也更新到 1 ,这个时候 0 和 1 的数据都是有效的,过了一小段时间,0 就没人访问了。在这段时间里如果又有需要写入的数据,那就往 2 上写。这样子可以始终保证读到的数据是完整的。可以把写入操作放在一个单独的线程里进行,其他线程如果有修改数据的需要,就通过队列传递数据过去。
    billccn
        13
    billccn  
       274 天前
    `devices`这个字典一定程度上就是一个手搓的数据库,你这个里面要考虑的情况很多,比如:

    1. 字典需要动态增删吗?不需要的话这个`Concurrent`是徒增开销
    2. 字典里面的值(就是每个 Device 实例)会被多个线程同时引用吗?字典的`Concurrent`是不会管里面的值是不是 thread-safe
    3. Device 实例需要 referential equivalence 吗?不需要的话建议把这个类变成只读的,每次更新的时候直接替换整个实例最安全
    4. Device 与 Device 之间有关系吗?有的话你可能需要考虑如何 atomically 更新这个字典
    bthulu
        14
    bthulu  
    OP
       274 天前
    @billccn 字典可以保证 Device 实例引用线程安全. 这里主要的问题就是这个 Device 实例上的茫茫多的属性怎么保证线程安全
    geelaw
        15
    geelaw  
       274 天前 via iPhone
    >异步方法中根本没办法使用锁, 顶多用用信号量 Semaphore 来代替锁.
    >貌似也没法仅对非异步代码进行加锁, 因为同步异步代码是混杂在一块的, 没法单独对非异步代码进行加锁.

    规则是 lock 里面不可以有 await (可以实现,但是几乎总是错误的,因此语言层面拒绝这样做),在 async 方法中 lock 是完全 OK 的。

    ConcurrentDictionary 已经确保每次访问它的成员都是原子的,然而这不代表对它的访问逻辑就已经线程安全,比如一段代码里连续访问它的成员两次,那么在中途其他线程可能已经修改过了这个字典。说这点是预防针,楼主在 #14 提到这是为了确保 Device 存在 devices 里面的引用安全。

    要保证每个 Device 实例线程安全,最简单的思路是细粒度,比如操作每个 device 的时候 lock 之。如果操作过程需要异步,那么我想象中楼主说有多个线程查看 devices 并做一些事,意思是如果 A 线程处理了 device1 则 B 线程应该跳过并处理 device2 ,这种情况下因为 device 被占用时无需等待,所以可以用 interlocked operation 实现:

    1. 在 device 上加上一个 int 字段 InUse ,表示目前是否在处理它,初始化为 0 。
    2. 要访问一个 device ,先用 Interlocked.Exchange 查看 InUse 并设置为 1 ,如果 InUse 之前也是 1 ,则跳过。
    3. 否则 InUse 之前是 0 并且被原子设置为 1 ,此时当前方法认为自己接管该 device 并开始异步硬件 API 操作,在 await 结束、处理完 device 后,重新 Exchange 把 InUse 还原为 0 。
    dogfeet
        16
    dogfeet  
       274 天前
    如果更新的时候不看原来的数据,且 [多个监控线程每隔 100 毫秒读取一次所有设备状态, 并根据设备状态执行一次或多次耗时较长的异步操作] 这个过程中数据变化了也没关系的话,可以考虑直接将 Device 变为不可变(所有字段都 readonly)。

    C# 不是特别熟了,devices 本身读写是线程安全的,里面的 device 只要每次更新的时候是替换一个新的不可变对象,这在 java 中是线程安全的。

    几十个字段的拷贝,应该也还好。
    me1onsoda
        17
    me1onsoda  
       274 天前
    @svnware 这是什么道理,难道一个写者一个读者就不要做互斥访问临界区了?
    zzzyk
        18
    zzzyk  
       274 天前
    无锁队列看行不行。
    CLMan
        19
    CLMan  
       274 天前   ❤️ 1
    这个问题的核心是你业务逻辑的“线程安全”是如何定义的(只有你自己知道),至于是用锁、读写锁、Semaphore 、无锁、volatile 等,这些纯粹是实现细节,取决于你对并发相关基础知识(操作系统领域)以及特定语言(这里是.NET )相关库和语法的熟悉程度。

    由于不了解你的业务逻辑实现细节,我只能提问:

    - 线程 0 是只写吗,是否依赖 Device 当前的状态?
    - 监控线程统计所有设备状态时,以及执行异步操作时,是否允许线程 0 进行更新?
    - 监控线程的异步任务与线程 0 是否存在写入相同的内存区域的情况?
    - 监控线程的异步任务是否可能执行超过 100 毫秒,如果超过,是否允许多个监控线程的异步任务同时执行?如果允许,它们的写是否冲突?

    你至少需要补充以上细节,才能让回答者更好的帮你解决问题。
    iceheart
        20
    iceheart  
       274 天前 via Android
    多个副本数据策略。

    属性数据放两个以上副本,由一个 volatile 索引指定最新副本。

    写线程更新副本后再更新索引。

    读线程按索引访问副本数据。
    bthulu
        21
    bthulu  
    OP
       274 天前
    @CLMan 线程 0 只写, 不依赖 Device 当前的状态
    监控线程执行异步操作时,允许线程 0 进行更新
    监控线程的异步任务跟线程 0 写入的就是相同的内存区域
    监控线程的异步任务是轮询执行的, 执行完毕后等 100 毫秒再次执行,且执行时间可能长达几分钟。允许多个监控线程的异步任务同时执行。他们的写存在冲突。
    xuanbg
        22
    xuanbg  
       274 天前
    这……单写不是已经线程安全了么?看内容貌似又不是,OP 还是直接说需求吧,这问题都说不清楚,实在让人挠头。
    zzl22100048
        23
    zzl22100048  
       274 天前
    layxy
        24
    layxy  
       274 天前
    又不是多写,单写多读没啥线程安全问题吧
    1008610001
        25
    1008610001  
       274 天前
    看描述。。。只有一个线程负责写数据 不存在线程安全的问题啊
    lakehylia
        26
    lakehylia  
       274 天前
    简单点,直接用事务线程不行么?其他多个线程都是提交事务给事务线程负责读写,然后事务线程回调结果。
    4kingRAS
        27
    4kingRAS  
       274 天前
    读写操作是原子的吗?原子的,一个线程写根本没多线程问题
    如果不是原子的,先尝试做到原子,做不到就读写时加锁
    wu00
        28
    wu00  
       274 天前
    是不是想太多了?
    ConcurrentDictionary 本就是线程安全集合,TryAdd(),TryUpdate()都是原子操作。
    所以就算你 Thread0 、Monitor1 、Monitor2 三个线程并发 ConcurrentDictionary 进行操作,也不会出现线程安全问题;会出现的是你业务上的“线程安全”问题:到底谁的优先级更高?
    cloud107202
        29
    cloud107202  
       274 天前
    这里可以考虑做个线程读写分离。没接触过 .Net 我会用 Java 的 type 与 API 描述,自行对应一下:

    首先把成员 devices 与相关的操作都封装到一个类型里面,对外暴露一个 public 的阻塞队列成员变量,Java 的话我会用有阻塞语义的 ArrayBlockingQueue. 这个类型在构建的时候(onCreation),启动一个单线程去 poll 这个 Queue. devices 的更新逻辑都由这个单线程完成

    外面的异步操作获取到设备信息后,以 ImmutableEvent 的形式把必要的信息封装描述好,放入队列. 形如 ArrayBlockingQueue<DeviceUpdatedEvent> 这样子,里面的单线程 poll 到事件直接更新 Dictionary 即可。

    最后剩下这个“多个监控线程每隔 100 毫秒读取一次所有设备状态” ,这里简单起见可以将 devices 也设置成 public ,直接在外面访问 devices 成员(重点是:一定要约定好,在 poll 的线程之外的逻辑,全部只能 read 这个 ConcurrentDictionary )。因为 Dictionary 本身使用了线程安全的 ConcurrentDictionary ,对它的 CRUD 是线程安全的,只需要防止外面监控程序获取到某个尚未更新完成的某个 Device 实例(有点像 DB 的脏读),这里给 Device 每个属性设置 volatile 肯定是不合适的:可以考虑前面提到的,在负责 poll 的单线程,获取到更新事件后,不要就地改变 device 对象本身的属性值,而是以 deepCopy 的方式创建个全新的 Device 实例。然后用 ConcurrentDictionary.put(key, value) 的 API 直接更新整个 Device 对象,规避外部监控线程在 scan 的时候,获取到属性更新不完整的 stale state
    jones2000
        30
    jones2000  
       274 天前
    奇偶读写,2 个内存块( 0 号,1 号),0 号写的时候,1 号读。1 号写的时候,0 号读。
    dode
        31
    dode  
       274 天前
    调整锁的粒度
    liuky
        32
    liuky  
       274 天前
    使用阻塞队列 BlockingCollection 试试,
    qping
        33
    qping  
       274 天前
    我感觉 27 楼说的做到写原子操作就可以了

    Device 应该是一个 immutable 得对象,不可变
    想要更新只能 clone ,然后 update 到字典中
    sparklee
        34
    sparklee  
       274 天前
    单个线程更新, 所有需要更新的操作都做成 任务 都放到任务队列
    yansideyu
        35
    yansideyu  
       274 天前
    楼主的问题是所有线程更新数据的时候,需要更新多个属性,怎么避免没有全部更新完的情况下,其他线程读取了数据。拿到了脏数据?
    i8086
        36
    i8086  
       274 天前
    楼主意思应该是多线程更新集合里 Device 类型属性值的问题?

    用 volatile 就好了,目前是最方便。
    qping
        37
    qping  
       274 天前
    又仔细看了下,你是多线程写啊,MonitorThread (多个)和 Thread0 都能更新, 那存在一些问题

    1. MonitorThread 和 Thread0 是否会写入冲突
    如果 MonitorThread 和 Thread0 写入相同得内存,那感觉就是设计有问题
    那我假想他们不会冲突

    2. 多个 MonitorThread 冲突的问题
    多个 MonitorThread 每次都更新全部的 devices ,这个设计也很奇怪

    假设已经做到通过锁或其他手段,保证一个 MonitorThread 更新是原子级别的。
    MonitorThread A 先启动,MonitorThread B 后启动,因为等待时间长 A 的结果却比 B 后写入,这样没有问题吗?

    我觉得,应该可以有多个 MonitorThread 线程,但是每个 Device 只能同一时间被一个 MonitorThread 更新
    实现方法上,可以用队列,每次更新 MonitorThread 从队列中取一个 Device ,如果更新完重新还回
    shapper
        39
    shapper  
       273 天前
    task 本身就是开新线程,减少锁粒度,锁 devices 就可以,把具体 device 分配到 task ,task 只修改自己引用的 device ,不修改 devices ;
    dogfeet
        40
    dogfeet  
       273 天前
    @bthulu 看起来就是写不依赖读,或者说写需要的读状态可以是旧数据(只需完整,无需最新)。那么单纯的将 Device 变为不可变就行。ConcurrentDictionary 单纯的读写本身是原子的,查了一下,不可变的线程安全 C# 与 Java 是一致的。
    nevermoreluo
        41
    nevermoreluo  
       273 天前
    除了 Group 都是 int 或者 bool ,Group 不动的话 保证原子性应该就好了吧....
    svnware
        42
    svnware  
       273 天前
    @namonai 不考虑数据一致性,允许脏读就无所谓了
    xumng123
        43
    xumng123  
       273 天前 via iPhone
    已经是安全的了
    bthulu
        44
    bthulu  
    OP
       273 天前
    @qping Thread0 会不依赖原有属性值更新所有 Device 的属性. MonitorThread 会读大部分的 devices, 并更新小部分 devices.
    m2276699
        45
    m2276699  
       273 天前
    这样的业务应该用事件驱动
    johnnyyeen
        46
    johnnyyeen  
       272 天前
    1 生产者对多消费者,给每个消费者一个队列。
    通过原子操作(信号或者锁)的方式保护生产者与消费者的竞争条件(我写数据你取走数据)。
    svnware
        47
    svnware  
       265 天前
    @me1onsoda 举个栗子,对硬件熟悉的话,机器字长多少,每次写入刚好是一个机器字长就不会被打断,肯定是安全的。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5389 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 350ms · UTC 08:06 · PVG 16:06 · LAX 00:06 · JFK 03:06
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.