找回密码
 立即注册
首页 业界区 业界 迭代器 iteration、iter 与 多线程 concurrent 交叉实践 ...

迭代器 iteration、iter 与 多线程 concurrent 交叉实践(详细)

戈森莉 昨天 23:10
迭代器iteration、iter 与 多线程 concurrent 交叉实践(详细)

实践及简介说明

​        由于在实际运用重,迭代器(或生成器)经常与多线程一并使用。本实践旨在对迭代器(及生成器)、多线程库(主要为concurrent)进行交叉实践说明,用来使读者更加理解迭代器和多线程在实际的应用。因此本篇轻教程,重实践,当然也会简要说明相关知识。
基础知识相关介绍(简要)

详细介绍建议观看Hucci写代码博主教的基础知识,详细易懂
【Python】从迭代器到生成器:小内存也能处理大数据
在Python中用多线程之前,需要先搞懂这些
你为什么需要学习迭代器、多线程

学这个教程/实践之前,你需要先看看自己有没有这个需求,当然你感兴趣也可以学。
迭代器(生成器)的优劣

优势:

  • 使用时生成数据,缓解爆缓存问题(RAM、GRAM)【主要】
  • 统一标准接口,无需关系数据的数据结构调用问题,使用 for _iter in iteration 即可调用
劣势:

  • 单次使用
  • 不支持索引
  • 单向遍历(向前)
多线程的优劣(Python限定)

优势

  • 并行执行,提高CPU的使用率
  • 充分利用I/O的中断时间,从而提高代码的执行速度
劣势:

  • 调试难度大
  • 老版本(好像是3.12之前),只有一个GIL锁,导致进程会出现抢占现象。
  • 少数据量带来的提升可能较少
01-环境来源数据

我用的是kaggle的开源数据,由于很多都是csv类型,因此我也以简单的csv数据集多线程读取进行说明,你们也可以用其他数据集,最好文本行长度>5k, 才能展现多线程、迭代器的优势,数量级低还是用单线程、串行吧,容易理解好维护。
powerlifting-database
02-任务介绍及编码思路

空有编码能力,但是没有编码思路是大忌吧。因此先概述任务及编码思路。
2.1 任务

本次实践,任务有以下几点:

  • 了解并构造迭代器类
  • 使用迭代器类对数据集(csv数据)进行数据划分
  • 使用多线程库(concurrent)来模拟多线程处理数据。
2.2 编码思路

想要达成多线程与迭代器的结合,首先需要构造合适的数据,因此需要:

  • 构建迭代器类(框架)
然后需要将数据集导入进来,完善迭代器类,还需要对输入的数据做一下分块,方便后续使用:

  • 数据导入+分块+完善迭代器类
数据来源解决了,也分块了,接下来是将数据与多线程联动了

  • 引入多线程库,处理切分的数据。
03 构建迭代器类

3.1迭代器的最小框架

可参考 python迭代器简单理解 __iter__和__next__方法
​        成为一个最小迭代器类通常需要包含以下几个类接口:
1.png

  • __init__:类初始化接口
  • __iter__: 获取迭代对象的接口
  • __next__:数据迭代接口。
3.2  构建迭代器类

为了照顾更多的读者,我这里使用传统接口对csv进行读写,有panda能力的读者可以使用pd快捷读取并构造。
  1. class read_file_batch:
  2.     def __init__(self,file_path:str, batch_size:int):
  3.         self.file_fp = open(file_path, encoding='utf-8', mode='r+')
  4.         self.batch_size = batch_size
  5.     def __iter__(self):
  6.         return self                                        #返回这个迭代器本身,我们是以类作为迭代对象,因此返回他自己吧
  7.     def __next__(self):                                #数据划分
  8.         batch_res = []
  9.         for i in range(self.batch_size):       
  10.             line = self.file_fp.readline()        #最底层获取行数据的交互接口
  11.             if line:                        #如果行非空
  12.                 batch_res.append(line)
  13.             else:                           #读到最后就算空的了
  14.                 self.file_fp.close()
  15.                 raise StopIteration
  16.         return batch_res
复制代码

  • 调用尝试:
    可以调试一下,感受一下分批的迭代及数据的输入输出。
    1. if __name__ == '__main__':
    2.     your_file_path:str = r"E:\powerlifting-database\openpowerlifting.csv"
    3.     iteration = read_file_batch(your_file_path, batch_size=1000)
    4.     count = 0
    5.     for lines in iteration:
    6.         print('-'*50+f"count:{count}"+'-'*50)
    7.         count += 1
    8.         print(lines)
    复制代码
04 引入多线程处理批量数据

4.1 多线程写法

其实蛮简单的,只要把接口函数写好、数据处理一下,就行了。

  • 导入库:
    1. from concurrent.futures import ThreadPoolExecutor as Excecutor
    复制代码
  • 使用上下文管理工具管理线程(确保安全及释放):
    1. with Excecutor(max_workers=8) as executor:
    复制代码
  • 使用map接口进行多线程指令执行:
    1. executor.map(
    2.     fn= function, iter_element1, iter_element2, iter_element3
    3. )#function是你对应的函数名, iter_element1-3是函数依赖的输入数据
    复制代码
  • 汇总:
    1. from concurrent.futures import ThreadPoolExecutor as Excecutorwith Excecutor(max_workers=8) as executor:        executor.map(
    2.     fn= function, iter_element1, iter_element2, iter_element3
    3. )#function是你对应的函数名, iter_element1-3是函数依赖的输入数据
    复制代码
4.2 引入多线程处理数据


  • 首先,模拟一个处理行数据的函数用户处理行信息:
    1. def process_dealing(line:str):
    2.     '''
    3.     模拟处理某一行的数据
    4.     :param line:处理每一行
    5.     '''
    6.     print(f'line top 20 info is :{line[:20]}')
    7.     pass
    复制代码
  • 在迭代器的迭代过程中对数据进行处理。
    1. if __name__ == '__main__':
    2.     your_file_path:str = r"E:\powerlifting-database\openpowerlifting.csv"
    3.     iteration = read_file_batch(your_file_path, batch_size=1000)
    4.     count = 0
    5.     with Excecutor(max_workers=8) as executor:
    6.         for lines in iteration:
    7.             print('-'*50+f"count:{count}"+'-'*50)
    8.             count += 1
    9.             print(lines)
    10.             executor.map(process_dealing, lines)
    11.             #第一个参数是处理的函数,后面是*iteration 就是你函数依赖的参数的可迭代对象,这里就是放入list[str]
    复制代码
  • 结束了,根据你的需要可以对数据进行处理了,其实最难的步骤在于如何构造迭代器(生成器),多线程其实俩步就走完了。
05 (非必须)使用迭代器、多线程前后对比
  1. from concurrent.futures import ThreadPoolExecutor as Excecutor
  2. import time
  3. import tracemalloc  #跟踪内存接口,内置
  4. def process_dealing(line:str):
  5.     '''
  6.     模拟处理某一行的数据
  7.     :param line:处理每一行
  8.     '''
  9.     # print(f'line top 20 info is :{line[:20]}')
  10.     pass
  11. class read_file_batch:
  12.     def __init__(self,file_path:str, batch_size:int):
  13.         self.file_fp = open(file_path, encoding='utf-8', mode='r+')
  14.         self.batch_size = batch_size
  15.     def __iter__(self):
  16.         return self
  17.     def __next__(self):                     #数据划分,对迭代器进行迭代,实际上是获取他的next,因此在这里划分数据
  18.         batch_res = []
  19.         for i in range(self.batch_size):    #取batch个
  20.             line = self.file_fp.readline()
  21.             if line:                        #如果行非空
  22.                 batch_res.append(line)
  23.             else:                           #读到最后就算空的了
  24.                 self.file_fp.close()
  25.                 raise StopIteration
  26.         return batch_res                    #返回迭代数据
  27. def main_concurrent(file_path,batch_size:int):
  28.     start_malloc = tracemalloc.start()
  29.     start_time = time.time()
  30.     your_file_path:str = file_path
  31.     iteration = read_file_batch(your_file_path, batch_size=batch_size)
  32.     count = 0
  33.     with Excecutor(max_workers=8) as executor:
  34.         for lines in iteration:                         #返回是batch行,每一行是一个大的str
  35.             print('-'*50+f"count:{count}"+'-'*50+f"RAM cost:{round(tracemalloc.get_tracemalloc_memory() / 1024**2,4)} M")
  36.             count += 1
  37.             print(lines[:50])                           #节约时间,只取前50字符
  38.             executor.map(process_dealing, lines)
  39.             #第一个参数是处理的函数,后面是*iteration 就是你函数依赖的参数的可迭代对象,这里就是放入list[str]
  40.     print(f'spend time:{round(time.time()-start_time,4)}')
  41. def main_not_concurrent(file_path,batch_size:int=1000):
  42.     start_time = time.time()
  43.     start_trace = tracemalloc.start()
  44.     with open(file=file_path,encoding='utf-8',mode='r+') as fp:
  45.         data = fp.readlines()
  46.         for idx,line in enumerate(data):
  47.             if idx % batch_size == 0:
  48.                 print('-'*50 +str(idx)+'-'*50+f"RAM cost:{round(tracemalloc.get_tracemalloc_memory() / 1024**2,4)} M")
  49.                 print(line[:50])
  50.                 process_dealing(line)
  51.     print(f'spend time:{round(time.time()-start_time,4)}')
  52. if __name__ == '__main__':
  53.     file_path = r'E:\openpowerlifting.csv'
  54.     batch_size = 1000
  55.     main_concurrent(file_path,batch_size)
  56.     # main_not_concurrent(file_path,batch_size)
  57.     # 其实能看出来这种数据量还是直接缓存在内存比较好,但使用多线程+迭代器确实减少了很多内存
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册