迭代器iteration、iter 与 多线程 concurrent 交叉实践(详细)
实践及简介说明
由于在实际运用重,迭代器(或生成器)经常与多线程一并使用。本实践旨在对迭代器(及生成器)、多线程库(主要为concurrent)进行交叉实践说明,用来使读者更加理解迭代器和多线程在实际的应用。因此本篇轻教程,重实践,当然也会简要说明相关知识。
基础知识相关介绍(简要)
详细介绍建议观看Hucci写代码博主教的基础知识,详细易懂
【Python】从迭代器到生成器:小内存也能处理大数据
在Python中用多线程之前,需要先搞懂这些
你为什么需要学习迭代器、多线程
学这个教程/实践之前,你需要先看看自己有没有这个需求,当然你感兴趣也可以学。
迭代器(生成器)的优劣
优势:
- 使用时生成数据,缓解爆缓存问题(RAM、GRAM)【主要】
- 统一标准接口,无需关系数据的数据结构调用问题,使用 for _iter in iteration 即可调用
劣势:
多线程的优劣(Python限定)
优势:
- 并行执行,提高CPU的使用率
- 充分利用I/O的中断时间,从而提高代码的执行速度
劣势:
- 调试难度大
- 老版本(好像是3.12之前),只有一个GIL锁,导致进程会出现抢占现象。
- 少数据量带来的提升可能较少
01-环境来源数据
我用的是kaggle的开源数据,由于很多都是csv类型,因此我也以简单的csv数据集多线程读取进行说明,你们也可以用其他数据集,最好文本行长度>5k, 才能展现多线程、迭代器的优势,数量级低还是用单线程、串行吧,容易理解好维护。
powerlifting-database
02-任务介绍及编码思路
空有编码能力,但是没有编码思路是大忌吧。因此先概述任务及编码思路。
2.1 任务
本次实践,任务有以下几点:
- 了解并构造迭代器类
- 使用迭代器类对数据集(csv数据)进行数据划分
- 使用多线程库(concurrent)来模拟多线程处理数据。
2.2 编码思路
想要达成多线程与迭代器的结合,首先需要构造合适的数据,因此需要:
然后需要将数据集导入进来,完善迭代器类,还需要对输入的数据做一下分块,方便后续使用:
数据来源解决了,也分块了,接下来是将数据与多线程联动了
03 构建迭代器类
3.1迭代器的最小框架
可参考 python迭代器简单理解 __iter__和__next__方法
成为一个最小迭代器类通常需要包含以下几个类接口:
- __init__:类初始化接口
- __iter__: 获取迭代对象的接口
- __next__:数据迭代接口。
3.2 构建迭代器类
为了照顾更多的读者,我这里使用传统接口对csv进行读写,有panda能力的读者可以使用pd快捷读取并构造。- class read_file_batch:
- def __init__(self,file_path:str, batch_size:int):
- self.file_fp = open(file_path, encoding='utf-8', mode='r+')
- self.batch_size = batch_size
- def __iter__(self):
- return self #返回这个迭代器本身,我们是以类作为迭代对象,因此返回他自己吧
- def __next__(self): #数据划分
- batch_res = []
- for i in range(self.batch_size):
- line = self.file_fp.readline() #最底层获取行数据的交互接口
- if line: #如果行非空
- batch_res.append(line)
- else: #读到最后就算空的了
- self.file_fp.close()
- raise StopIteration
- return batch_res
复制代码
- 调用尝试:
可以调试一下,感受一下分批的迭代及数据的输入输出。- if __name__ == '__main__':
- your_file_path:str = r"E:\powerlifting-database\openpowerlifting.csv"
- iteration = read_file_batch(your_file_path, batch_size=1000)
- count = 0
- for lines in iteration:
- print('-'*50+f"count:{count}"+'-'*50)
- count += 1
- print(lines)
复制代码 04 引入多线程处理批量数据
4.1 多线程写法
其实蛮简单的,只要把接口函数写好、数据处理一下,就行了。
- 导入库:
- from concurrent.futures import ThreadPoolExecutor as Excecutor
复制代码 - 使用上下文管理工具管理线程(确保安全及释放):
- with Excecutor(max_workers=8) as executor:
复制代码 - 使用map接口进行多线程指令执行:
- executor.map(
- fn= function, iter_element1, iter_element2, iter_element3
- )#function是你对应的函数名, iter_element1-3是函数依赖的输入数据
复制代码 - 汇总:
- from concurrent.futures import ThreadPoolExecutor as Excecutorwith Excecutor(max_workers=8) as executor: executor.map(
- fn= function, iter_element1, iter_element2, iter_element3
- )#function是你对应的函数名, iter_element1-3是函数依赖的输入数据
复制代码 4.2 引入多线程处理数据
- 首先,模拟一个处理行数据的函数用户处理行信息:
- def process_dealing(line:str):
- '''
- 模拟处理某一行的数据
- :param line:处理每一行
- '''
- print(f'line top 20 info is :{line[:20]}')
- pass
复制代码 - 在迭代器的迭代过程中对数据进行处理。
- if __name__ == '__main__':
- your_file_path:str = r"E:\powerlifting-database\openpowerlifting.csv"
- iteration = read_file_batch(your_file_path, batch_size=1000)
- count = 0
- with Excecutor(max_workers=8) as executor:
- for lines in iteration:
- print('-'*50+f"count:{count}"+'-'*50)
- count += 1
- print(lines)
- executor.map(process_dealing, lines)
- #第一个参数是处理的函数,后面是*iteration 就是你函数依赖的参数的可迭代对象,这里就是放入list[str]
复制代码 - 结束了,根据你的需要可以对数据进行处理了,其实最难的步骤在于如何构造迭代器(生成器),多线程其实俩步就走完了。
05 (非必须)使用迭代器、多线程前后对比
- from concurrent.futures import ThreadPoolExecutor as Excecutor
- import time
- import tracemalloc #跟踪内存接口,内置
- def process_dealing(line:str):
- '''
- 模拟处理某一行的数据
- :param line:处理每一行
- '''
- # print(f'line top 20 info is :{line[:20]}')
- pass
- class read_file_batch:
- def __init__(self,file_path:str, batch_size:int):
- self.file_fp = open(file_path, encoding='utf-8', mode='r+')
- self.batch_size = batch_size
- def __iter__(self):
- return self
- def __next__(self): #数据划分,对迭代器进行迭代,实际上是获取他的next,因此在这里划分数据
- batch_res = []
- for i in range(self.batch_size): #取batch个
- line = self.file_fp.readline()
- if line: #如果行非空
- batch_res.append(line)
- else: #读到最后就算空的了
- self.file_fp.close()
- raise StopIteration
- return batch_res #返回迭代数据
- def main_concurrent(file_path,batch_size:int):
- start_malloc = tracemalloc.start()
- start_time = time.time()
- your_file_path:str = file_path
- iteration = read_file_batch(your_file_path, batch_size=batch_size)
- count = 0
- with Excecutor(max_workers=8) as executor:
- for lines in iteration: #返回是batch行,每一行是一个大的str
- print('-'*50+f"count:{count}"+'-'*50+f"RAM cost:{round(tracemalloc.get_tracemalloc_memory() / 1024**2,4)} M")
- count += 1
- print(lines[:50]) #节约时间,只取前50字符
- executor.map(process_dealing, lines)
- #第一个参数是处理的函数,后面是*iteration 就是你函数依赖的参数的可迭代对象,这里就是放入list[str]
- print(f'spend time:{round(time.time()-start_time,4)}')
- def main_not_concurrent(file_path,batch_size:int=1000):
- start_time = time.time()
- start_trace = tracemalloc.start()
- with open(file=file_path,encoding='utf-8',mode='r+') as fp:
- data = fp.readlines()
- for idx,line in enumerate(data):
- if idx % batch_size == 0:
- print('-'*50 +str(idx)+'-'*50+f"RAM cost:{round(tracemalloc.get_tracemalloc_memory() / 1024**2,4)} M")
- print(line[:50])
- process_dealing(line)
- print(f'spend time:{round(time.time()-start_time,4)}')
- if __name__ == '__main__':
- file_path = r'E:\openpowerlifting.csv'
- batch_size = 1000
- main_concurrent(file_path,batch_size)
- # main_not_concurrent(file_path,batch_size)
- # 其实能看出来这种数据量还是直接缓存在内存比较好,但使用多线程+迭代器确实减少了很多内存
复制代码 来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |