首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
23571113
V2EX  ›  C++

如果读文件的速度比处理的快怎么办

  •  
  •   23571113 · 42 天前 · 3956 次点击
    这是一个创建于 42 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我要处理一个大文件, 用的是 libuv(reactor 模型)来处理异步 I/O,但是一个问题是,我非阻塞分段读了文件之后(文件很大),放到线程池中做一些修改,然后按顺序非阻塞写到另外一个文件里.
    问题一是 写文件是远程写,而读文件是本地读,这样非阻塞 I/O 会导致内存爆炸.
    问题二是 既然是非阻塞 I/O 我怎么,输出怎么拼回去.

    40 条回复    2020-03-02 11:29:17 +08:00
    learningman
        1
    learningman   42 天前
    disk cache
    secondwtq
        2
    secondwtq   42 天前   ❤️ 3
    隐约觉得看到过有资料提过类似的情形: https://zeux.io/2019/04/20/qgrep-internals 从“First, the read speed and search speed can be substantially unbalanced.”开始

    TLDR:作者实现了一个比 ag 和 ripgrep 等还快的代码检索工具(主要是加索引,ag 等一般是没索引的所以速度没法比),作者使用线程池来实现搜索,输入和输出都是单线程处理。在输入端用了一个队列,队列在数据超过一定量(也就是输入太快了)时会阻塞掉 push,在输出端使用了一个“ordered queue”,也就是 worker 线程在把结果交给输出线程时会附带位置顺序信息,输出线程只会在缓存里有下一块数据时才会写进去。
    catror
        3
    catror   42 天前 via Android
    1. 无论怎么读,你自己都可以控制读取速度的
    2. 预分配文件,写的时候指定迁移即可,都有专门的函数
    catror
        4
    catror   42 天前 via Android
    @catror 迁移 --> 偏移
    msg7086
        5
    msg7086   42 天前
    分段读的时候检测一下内存占用,然后阻塞?
    laminux29
        6
    laminux29   42 天前
    建议题主尝试先不编程,而是在 Visio 上把流程图画一画,跟着流程走一走。

    编程不仅是一门技术,更是一门艺术。
    而艺术在于设计。

    多设计,少 Coding,才是成为大神之道。
    lihongming
        7
    lihongming   42 天前 via iPhone
    参考 TCP 协议模型
    FrankHB
        8
    FrankHB   42 天前
    @laminux29 要先设计是没错,但设计跟编程不冲突。只要是自己的设计,本质上总是可编程的。你看来只是要强调先不去拿代码实现罢了;然而画图真的就是给没有编程能力的外行人展示用的低效方法。
    (严格意义上,图是可编程的,但拿 Visio 这种本来就不是为了这个目的设计的工具的方案,还不配实用。)
    xenme
        9
    xenme   41 天前 via iPhone
    就是 buffer 的概念么

    读的时候放到你的临时文件,临时文件达到一定大小,相当于 buffer 满了,得阻塞读了
    boywhp
        10
    boywhp   41 天前
    读得快写得慢 你不会暂停一会再读?
    Cbdy
        11
    Cbdy   41 天前 via Android
    那你可以读慢一点
    rapiz
        12
    rapiz   41 天前
    不就是生产者消费者模型吗
    reus
        13
    reus   41 天前
    Michaelssss
        14
    Michaelssss   41 天前 via Android
    你让我想起了 Linus 的读取 240GB/S 的硬盘😂过多占用内存总线
    lqf96
        15
    lqf96   41 天前 via iPhone
    我怎么感觉这其实就是 backpressure 的问题…
    encro
        16
    encro   41 天前
    @laminux29
    @FrankHB

    除了先画图,或者直接编程,有一种中间方法是先写注释,写注释过程过程理清楚,注释中说明为什么要这样做,然后写代码时填空即可,时间不多(必要),代码质量还高,且不容易出错。
    一般我都会采用这种方式,偶尔涉及到比如多个系统多次交互,可能需要先画图。
    abutter
        17
    abutter   41 天前
    就是乱序然后重组,发给多个线程工作的时候要带标识,例如序号,文件偏移地址之类的唯一标识,处理完后的线程将数据交给一个线程,这个线程负责数组排序重组然后写入。
    abutter
        18
    abutter   41 天前
    问题 1,可以通过线程池的线程的数量限制获得最大的的处理能力,然后读取要根据数据完成的情况进行,而不是只要可以读就读。
    问题 2,如上一个回答。
    23571113
        19
    23571113   41 天前
    @catror 问题是文件修改过了,我没法知道偏移量了.
    23571113
        20
    23571113   41 天前
    @secondwtq 大佬我觉得加同步队列的方法靠谱,毕竟我感觉写的慢的话,处理的再快也没用.
    LZ 是小白,导师也很久没写代码了,准确来说需要我做一个备份软件,存储接各大云服务商的存储服务(例如阿里云 OSS,AWS S3),然后能够做到把不同文件的相似数据块给去掉. 导师给了我一个月时间, 我现在的情况是相关论文基本都看完了, 语言会 C++和一些非常基础的库,golang 稍微入门. 第一版计划让我跑在两个节点上,一个做在线去重,一个做离线去重,请问来的及吗?
    23571113
        21
    23571113   41 天前
    @secondwtq
    @catror
    @msg7086
    @laminux29
    @FrankHB
    @boywhp
    @Cbdy
    @rapiz
    @abutter
    对了,客户还要能够恢复数据. 我现在愁死了,不知道如何下手. 大佬们帮帮忙看看我该从哪里开始.
    abutter
        22
    abutter   41 天前
    你的需求我是没有完全看懂,啥叫能够“然后能够做到把不同文件的相似数据块给去掉”?不重复传递一样的数据?

    云存储提供的 API 不同,可能实现方式不同。
    23571113
        23
    23571113   41 天前
    @abutter 比如两个文件有 4KB 的数据是相同的,我只存 4KB 数据到云上,而不是存 8KB.
    abutter
        24
    abutter   41 天前
    librsync 应该有你需要的,用来确定文件是否变化,以及哪些部分变化。

    还有一个问题,要看你是的云服务商提供的 API。否则需要加 meta 文件来解决。
    FrankHB
        25
    FrankHB   41 天前
    Content-agnostic target deduplication... 没做过不清楚细节,之前找到过的也是文件系统级的。不过这种需求如果不是服务商给特别的支持,可以考虑用户侧自己把要传输的文件都压缩了,顺便也会 dedup。
    恢复数据是指什么?你这备份数据的时候不会把原件给扔了吧。如果是指传输中断可以恢复进度,做日志实现事务。
    laminux29
        26
    laminux29   41 天前
    @FrankHB 做编程就像建房砌砖一样。小房子,不做设计,先砌砖当然可行。但大型工程,必然要先有设计图。
    Samuelcc
        27
    Samuelcc   41 天前 via Android
    背压下
    Sasasu
        28
    Sasasu   41 天前
    异步里很常见简单一个问题,除非你的程序是单纯的数据库 gui 的 api 都会有这个问题。

    常见的思路是把线程分两组,一组用于异步事件轮询,一组用于阻塞任务。无论是 CPU 密集还是 IO 密集都可以这样搞,IO 密集阻塞如果去不掉的话就开一堆线程去做,CPU 密集就少开点线程。
    比如
    http://docs.libuv.org/en/v1.x/threadpool.html This thread pool is internally used to run all file system operations, as well as getaddrinfo and getnameinfo requests.
    https://docs.rs/tokio-threadpool/0.1.18/tokio_threadpool/ Backup threads are intended only to support the blocking API.
    asio 应该可以兼容手写的 thread poll

    更 "工程" 的思路是实现抢占,比如 Go,只有一个 pool 然后一堆协程去抢,当然就会出现程序性能下限高上线极低。
    Mutoo
        29
    Mutoo   41 天前
    生产者消费者 + 流模型,可以参考一下 node 的 steam 的实现,readable 和 writable 设计的非常好。
    对其设定缓冲区(buffer/highWaterMark)。之间用 pipe 串连起来,完全是异步实现。
    Sasasu
        30
    Sasasu   41 天前
    > 然后按顺序非阻塞写到另外一个文件里.

    接受者一个持有一个当前 index,发送者把自己的 index - 1 一个 CAS 到接受者上
    失败就休眠,等发送者唤醒自己
    成功就返回成功,并唤醒发送者

    发送者写完一个块就唤醒等在自己身上的接受者
    Sasasu
        31
    Sasasu   41 天前
    writer 一开始一个持有一个当前 index = 0

    worker 把自己的 index - 1 用 CAS 推到 writer 上
    > 失败就休眠,等 writer 唤醒自己
    > 成功就返回成功,并唤醒 writer

    writer 写完一个块就唤醒等在自己身上的 worker
    23571113
        32
    23571113   41 天前
    @Sasasu 我觉得 writer 不应该和线程池之间共用 index 之类的概念,因为他们是不同的东西. 我觉得按顺序完成应该由线程池来保证的. 比如线程池内部有两个变量, 一个是递增的给任务分配的任务号, 一个用来判断当前可以执行结束的任务号.
    Sasasu
        33
    Sasasu   41 天前
    原理都差不多,但是没见任何 runtime 有现成的功能....
    outoftimeerror
        34
    outoftimeerror   41 天前   ❤️ 1
    reactive stream 了解一下,akka-stream 提供了实现方式
    DelayNoMore
        35
    DelayNoMore   41 天前
    如果是 go,用 waitgroup 搞定
    reus
        36
    reus   40 天前   ❤️ 1
    你们都是纸上谈兵……
    实际上一个 content addressable 系统,一百来行就能实现核心功能
    https://play.golang.org/p/LJWU2ut6_TH
    reus
        37
    reus   40 天前
    这个实际上就是和 bt 类似的东西,将文件或者文件夹分成多个块,每个块计算哈希,然后种子文件里只记录哈希和文件夹结构,这样就能用种子去下载整个文件。存储就直接用 kv 类就行,键是哈希,值是哈希对应的内容,这样相同的内容自然就只存一份了。
    git 也是类似的思路,这些系统有个统称是 content addressable system。不难做的,用 go 吧。你看我花个十几二十分钟都做出个原型了,你们还在讨论 io 模型,讨论流程图……
    23571113
        38
    23571113   40 天前
    @reus 基本是这样的,但是有些策略, 比如 chunk 划分不应该用定长的, 如果两份相同的数据, 把一份前面加一个字节, 定长的 chunk 就找不到重复的. (你可以搜一下 content defined chunking).
    reus
        39
    reus   40 天前 via Android
    @23571113 学习了
    hardwork
        40
    hardwork   32 天前
    大致的模型,主线程分段读,每读一段分发到 uv_queue_work 去处理,设置完成回调,回调中做发送处理。
    问题 1, 读处理要控制内存,可以简单控制一下任务数量,发出去一个任务加一,发送完成一个减一,达到上限就休息一下,更精细的就记录内存使用量。精确等待控制可以用 cond signal 来做。否则 sleep 一下也可以。
    问题 2, 顺序写回在回调里做,弄个数组存完成任务序号,回调里顺序检测数组从头开始发送,碰到还没完成的回调返回即可。记录一下上次顺序完成序号,下次检测从上次完成序号开始。

    跑回调的线程和发任务的线程是同一个,所以上面检测都是线程安全的。
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1400 人在线   最高记录 5168   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 23:54 · PVG 07:54 · LAX 16:54 · JFK 19:54
    ♥ Do have faith in what you're doing.