python进程池:multiprocessing.pool
本文转至http://www.cnblogs.com/kaituorensheng/p/4465768.html,在其基础上进行了一些小小改动。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。 Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
例1:使用进程池
from multiprocessing import freeze_support,Pool
import time
def Foo(i):
time.sleep(2)
print('___time---',time.ctime())
return i+100
def Bar(arg):
print('----exec done:',arg,time.ctime())
if __name__ == '__main__':
freeze_support()
pool = Pool(3) #线程池中的同时执行的进程数为3
for i in range(4):
pool.apply_async(func=Foo,args=(i,),callback=Bar) #线程池中的同时执行的进程数为3,当一个进程执行完毕后,如果还有新进程等待执行,则会将其添加进去
# pool.apply(func=Foo,args=(i,))
print('end')
pool.close()
pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
执行结果:
end
___time--- Thu Jun 16 15:11:45 2016
----exec done: 100 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 101 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:45 2016
----exec done: 102 Thu Jun 16 15:11:45 2016
___time--- Thu Jun 16 15:11:47 2016
----exec done: 103 Thu Jun 16 15:11:47 2016
函数解释:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
- close() 关闭pool,使其不在接受新的任务。
- terminate() 结束工作进程,不在处理未完成的任务。
- join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。
例2:使用进程池(阻塞)
from multiprocessing import freeze_support,Pool
import time
def Foo(i):
time.sleep(2)
print('___time---',time.ctime())
return i+100
def Bar(arg):
print('----exec done:',arg,time.ctime())
if __name__ == '__main__':
freeze_support()
pool = Pool(3) #线程池中的同时执行的进程数为3
for i in range(4):
pool.apply(func=Foo,args=(i,))
print('end')
pool.close()
pool.join()#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
执行结果
___time--- Thu Jun 16 15:15:16 2016
___time--- Thu Jun 16 15:15:18 2016
___time--- Thu Jun 16 15:15:20 2016
___time--- Thu Jun 16 15:15:22 2016
end
例3:使用进程池,并关注结果
import multiprocessing
import time
def func(msg):
print('hello :',msg,time.ctime())
time.sleep(2)
print('end',time.ctime())
return 'done' + msg
if __name__=='__main__':
pool = multiprocessing.Pool(2)
result = []
for i in range(3):
msg = 'hello %s' %i
result.append(pool.apply_async(func=func,args=(msg,)))
pool.close()
pool.join()
for res in result:
print('***:',res.get())
print('AAAAAAAAll end--')
执行结果
注:get()函数得出每个返回结果的值
例4:使用多个进程池
import multiprocessing
import time,os,random
def Lee():
print('nRun task Lee--%s******ppid:%s'%(os.getpid(),os.getppid()),'~~~~',time.ctime())
start = time.time()
time.sleep(random.randrange(10))
end = time.time()
print('Task Lee,runs %0.2f seconds.'%(end-start),'~~~~',time.ctime())
def Marlon():
print("nRun task Marlon-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print( 'Task Marlon runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
def Allen():
print( "nRun task Allen-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print( 'Task Allen runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
def Frank():
print( "nRun task Frank-%s******ppid:%s"%(os.getpid(),os.getppid()),'~~~~',time.ctime())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print( 'Task Frank runs %0.2f seconds.' %(end - start),'~~~~',time.ctime())
if __name__ == '__main__':
func_list = [Lee,Marlon,Allen,Frank]
print('parent process id %s'%os.getpid())
pool = multiprocessing.Pool(4)
for func in func_list:
pool.apply_async(func) #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
print( 'Waiting for all subprocesses done...')
pool.close()
pool.join() #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print ('All subprocesses done.')
执行结果
parent process id 98552
Waiting for all subprocesses done...
Run task Lee--97316******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Run task Marlon-95536******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Run task Allen-95720******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Run task Frank-98784******ppid:98552 ~~~~ Thu Jun 16 15:20:50 2016
Task Allen runs 0.31 seconds. ~~~~ Thu Jun 16 15:20:51 2016
Task Lee,runs 7.00 seconds. ~~~~ Thu Jun 16 15:20:57 2016
Task Frank runs 14.48 seconds. ~~~~ Thu Jun 16 15:21:05 2016
Task Marlon runs 31.72 seconds. ~~~~ Thu Jun 16 15:21:22 2016
All subprocesses done.
#coding: utf-8
import multiprocessing
def m1(x):
print x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
pool.map(m1, i_list)
一次执行结果
0 1 4 9 16 25 36 49
参考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx
问题:http://bbs.chinaunix.net/thread-4111379-1-1.html
#coding: utf-8
import multiprocessing
import logging
def create_logger(i):
print i
class CreateLogger(object):
def __init__(self, func):
self.func = func
if __name__ == '__main__':
ilist = range(10)
cl = CreateLogger(create_logger)
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.map(cl.func, ilist)
print "hello------------>"
一次执行结果
0 1 2 3 4 5 6 7 8 9 hello------------>
- js list数据 转 树状 层级 JSON,递归生成树状 层级 JSON
- jquery 图片文件转base64 显示
- AngularJS 用 $http.jsonp 跨域SyntaxError问题
- 简单的java socket 示例
- Hadoop二次开发环境构建
- Android EditText 获得输入焦点 以及requestfocus()失效的问题
- 【直播】我的基因组68:看看哪些基因的突变较多,哪些较少
- GDI+编程
- GDI编程
- ADO访问数据库
- 【直播】我的基因组76:用krona对血液全基因组的菌比例可视化
- 【直播】我的基因组74:快速给测序reads比对到物种
- 用ADO操作数据库的方法步骤
- VC如何获取对话框中控件的坐标
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 新型php漏洞挖掘之debug导致的安全漏洞(Edusoho)
- CVE-2016-3714 - ImageMagick 命令执行分析
- 知乎某处XSS+刷粉超详细漏洞技术分析
- 【STM32H7】第12章 RL-TCPnet V7.X之TCP客户端
- 【STM32F429】第12章 RL-TCPnet V7.X之TCP客户端
- Django DeleteView without confirmation template, but with CSRF attack
- 小记 TypeScript 中的循环引用问题
- 别只会搜日志了,求你懂点检索原理吧
- 分布式系统中的事务问题
- JDK 中的栈竟然是这样实现的?
- 谈一谈如何在Python开发中拒绝SSRF漏洞
- eval长度限制绕过 && PHP5.6新特性
- Cookie-Form型CSRF防御机制的不足与反思
- Python 格式化字符串漏洞(Django为例)
- unity官方案例精讲(第三章)--星际航行游戏Space Shooter