东莞前10大互联网公司,襄阳抖音seo找哪家,长沙做网站哪家公司好,计算机软件开发是干什么的进程是资源分配的最小单位#xff0c;线程是CPU调度的最小单位。每一个进程中至少有一个线程。 Python对并发编程的支持 (1)多线程#xff1a;threading#xff0c;利用CPU和IO可以同时执行的原理#xff0c;让CPU不会干巴巴等待IO完成。 (2)多进程#xff1a;multiproces… 进程是资源分配的最小单位线程是CPU调度的最小单位。每一个进程中至少有一个线程。 Python对并发编程的支持 (1)多线程threading利用CPU和IO可以同时执行的原理让CPU不会干巴巴等待IO完成。 (2)多进程multiprocessing利用多核CPU的能力真正的并行执行任务。 (3)异步IOasyncio在单线程利用CPU和IO同时执行的原理实现函数异步执行。 (4)使用Lock对资源加锁防止冲突访问。 (5)使用Queue实现不同线程/进程之间的数据通信实现生产者-消费者模式。 (6)使用线程池Pool/进程池Pool简化线程/进程的任务提交、等待结束、获取结果。 (7)使用subprocess启动外部程序的进程并进行输入输出交互。 Python并发编程有三种方式 多线程Thread、多进程Process、多协程Coroutine。 为什么要引入并发编程 场景1一个网络爬虫按顺序爬取花了1小时采用并发下载减少到20分钟 场景2一个APP应用优化前每次打开页面需要3秒采用异步并发提升到每次200毫秒 引入并发就是为了提升程序运行速度。 多线程、多进程、多协程的对比 怎样根据任务选择对应技术 GIL全局解释器锁 GIL全局解释器锁Global Interpreter Lock是一种在Python解释器中使用的机制它的主要作用是防止同一时间内多个线程同时执行 Python 代码。 在 Python 中由于存在 GIL 锁的机制因此在多线程执行 Python 代码时同一时间只有一个线程能够占用 CPU 执行 Python 代码其他线程将一直处于等待状态。 这种机制有利于保证 Python 代码的稳定性和线程安全但也带来了一定的性能损耗。因此对于 CPU 密集型的 Python 应用程序多线程并不能提高其运行速度。相反对于 I/O 密集型的应用程序多线程可以有效地提升其运行效率。 GIL步骤 在多线程环境中Python 解释器按以下方式执行 设置 GIL切换到一个线程去运行运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0))把线程设置为睡眠状态解锁 GIL再次重复以上所有步骤。 在调用外部代码(如 C/C扩展函数)的时候GIL将会被锁定直到这个函数结束为止(由于在这期间没有Python的字节码被运行所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。 GIL全局解释器相关背景
GIL锁就是保证在统一时刻只有一个线程执行所有的线程必须拿到GIL锁才有执行权限
1. Python代码运行在解释器上嘛有解释器来执行或者解释
2. Python解释器的种类1、CPython 2、IPython 3、PyPy 4、Jython 5、IronPython
3. 当前市场使用的最多(95%)的解释器就是CPython解释器
4. GIL全局解释器锁是存在于CPython中
5. 结论是同一时刻只有一个线程在执行? 想避免的问题是出现多个线程抢夺资源的情况比如现在起一个线程来回收垃圾数据回收a1这个变量另外一个线程也要使用这个变量a当垃圾回收线程还没没有把变量a回收完毕另一个线程就来抢夺这个变量a使用。怎么避免的这个问题那就是在Python这门语言设计之处就直接在解释器上添加了一把锁这把锁就是为了让统一时刻只有一个线程在执行言外之意就是哪个线程想执行就必须先拿到这把锁(GIL), 只有等到这个线程把GIL锁释放掉别的线程才能拿到然后具备了执行权限.
GIL全局解释器需要注意的问题
1. python有GIL锁的原因同一个进程下多个线程实际上同一时刻只有一个线程在执行2. 只有在python上开进程用的多其他语言一般不开多进程只开多线程就够了3. cpython解释器开多线程不能利用多核优势只有开多进程才能利用多核优势其他语言不存在这个问题4. 8核cpu电脑充分利用起我这个8核至少起8个线程8条线程全是计算---计算机cpu使用率是100%5. 如果不存在GIL锁一个进程下开启8个线程它就能够充分利用cpu资源跑满cpu6. cpython解释器中好多代码模块都是基于GIL锁机制写起来的改不了了---》我们不能有8个核但我现在只能用1核----》开启多进程---》每个进程下开启的线程可以被多个cpu调度执行7. cpython解释器io密集型使用多线程计算密集型使用多进程 I / O密集型: 遇到io操作会切换cpu假设你开了8个线程8个线程都有io操作---》io操作不消耗cpu---》一段时间内看上去其实8个线程都执行了, 选多线程好一些 计算密集型: 消耗cpu如果开了8个线程第一个线程会一直占着cpu而不会调度到其他线程执行其他7个线程根本没执行所以我们开8个进程每个进程有一个线程8个进程下的线程会被8个cpu执行从而效率高. 互斥锁 互斥锁的作用在多线程的情况下同时执行一个数据会发生数据错乱的问题互斥锁可以防止这种情况发生。
n 10
from threading import Lock
import timedef task(lock):lock.acquire()global ntemp ntime.sleep(0.5)n temp - 1lock.release()拿时间换空间空间换时间 时间复杂度from threading import Threadif __name__ __main__:tt []lockLock()for i in range(10):t Thread(targettask, args(lock, ))t.start()tt.append(t)for j in tt:j.join()print(主, n)
GIL锁互斥锁 面试题 面试题既然有了GIL锁为什么还要互斥锁? (多线程下) 举例比如我起了2个线程来执行aa1,a一开始是0 1. 第一个线程来了拿到a0开始执行aa1,这个时候结果a就是1了 2. 第一个线程得到的结果1还没有赋值回去给a,这个时候第二个线程来了拿到的a是 0继续执行 aa1结果还是1 3. 加了互斥锁就能够解决多线程下操作同一个数据发生错乱的问题 线程队列(线程里使用队列) 为什么线程中还有使用队列? 同一个进程下多个线程数据是共享的为什么先同一个进程下还会去使用队列呢 因为队列是管道 锁所以用队列还是为了保证数据的安全
程队列1. 先进先出2. 后进先出3. 优先级的队列from multiprocessing import Queue线程队列import queue
queue.Queue()# queue.Queue 的缺点是它的实现涉及到多个锁和条件变量因此可能会影响性能和内存效率。
import queueqqueue.Queue() # 无限大、
q.put(first)
q.put(second)
q.put(third)
q.put(third)print(q.get())
print(q.get())
print(q.get())## 后进先出
import queue# Lifolast in first out
qqueue.LifoQueue()
q.put(first)
q.put(second)
q.put(third)print(q.get())
print(q.get())
print(q.get())## 优先级队列
import queueqqueue.PriorityQueue()
#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,a))
q.put((10,b))
q.put((30,c))print(q.get())
print(q.get())
print(q.get())结果(数字越小优先级越高,优先级高的优先出队):
(10, b)
(20, a)
(30, c)进程池和线程池的使用 池池子、容器类型可以盛放多个元素 进程池提前定义好一个池子然后往这个池子里面添加进程以后只需要往这个进程池里面丢任务就行了然后有这个进程池里面的任意一个进程来执行任务 线程池提前定义好一个池子然后往这个池子里面添加线程以后只需要往这个线程池里面丢任务就行了然后有这个线程池里面的任意一个线程来执行任务 def task(n, m):return nmdef task1():return {username:kevin, password:123}
开进程池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutordef callback(res):print(res) # Future at 0x1ed5a5e5610 statefinished returned intprint(res.result()) # 3def callback1(res):print(res) # Future at 0x1ed5a5e5610 statefinished returned intprint(res.result()) # {username: kevin, password: 123}print(res.result().get(username))
if __name__ __main__:poolProcessPoolExecutor(3) # 定义一个进程池里面有3个进程## 2. 往池子里面丢任务pool.submit(task, m1, n2).add_done_callback(callback)pool.submit(task1).add_done_callback(callback1)pool.shutdown() # join closeprint(123) 进程池和线程池有什么好处呢? (1)降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗。 (2)提高响应速度。当任务到达时任务可以不需要等到线程创建就能立即执行。 (3)提高线程的可管理性。线程是稀缺资源如果无限制的创建不仅会消耗系统资源还会降低系统的稳定性使用线程池可以进行统一的分配、调优和监控。 Concurrent.futures模块(爬虫)
模块介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor线程池提供异步调用 ProcessPoolExecutor进程池提供异步调用 Both implement the same interface, which is defined by the abstract Executor class. 基本方法 submit(fn, *args, **kwargs)异步提交任务 map(func, *iterables, timeoutNone, chunksize1)取代for循环submit的操作 shutdown(waitTrue)相当于进程池的pool.close()pool.join()操作 waitTrue等待池内所有任务执行完毕回收完资源后才继续waitFalse立即返回并不会等待池内的任务执行完毕但不管wait参数为何值整个程序都会等到所有任务执行完毕submit和map必须在shutdown之前 result(timeoutNone)取得结果 add_done_callback(fn)回调函数 done()判断某一个线程是否完成 cancle()取消某个任务 ThreadPoolExecutor线程池 常用函数 将函数提交到线程池里面运行的时候会自动创建Future对象并返回。这个Future对象里面就包含了函数的执行状态(比如此时是处于暂停、运行中还是完成等)。并且函数在执行完毕之后还会调用future.set_result将自身的返回值设置进去。 (1)创建一个线程池可以指定max_workers参数表示最多创建多少个线程。如果不指定那么每提交一个函数都会为其创建一个线程。 在启动线程池的时候肯定是需要设置容量的不然处理几千个函数要开启几千个线程。 (2)通过submit即可将函数提交到线程池一旦提交就会立刻运行。因为开启了一个新的线程主线程会继续往下执行。至于submit的参数按照函数名对应参数提交即可。 (3)future相当于一个容器包含了内部函数的执行状态。 (4)函数执行完毕时会将返回值设置在future里也就是说一旦执行了 future.set_result那么就表示函数执行完毕了然后外界可以调用result拿到返回值。 from concurrent.futures import ThreadPoolExecutor
import timedef task(name, n):time.sleep(n)return f{name} 睡了 {n} 秒executor ThreadPoolExecutor()
future executor.submit(task, 屏幕前的你, 3)print(future) # Future at 0x7fbf701726d0 staterunning
print(future.running()) # 函数是否正在运行中True
print(future.done()) # 函数是否执行完毕Falsetime.sleep(3) # 主程序也sleep 3秒,显然此时函数已经执行完毕了print(future) # Future at 0x7fbf701726d0 statefinished returned str返回值类型是str
print(future.running()) # False
print(future.done()) # Trueprint(future.result())多线程爬取网页
import requestsdef get_page(url):resrequests.get(url)nameurl.rsplit(/)[-1].htmlreturn {name:name,text:res.content}def call_back(fut):print(fut.result()[name])with open(fut.result()[name],wb) as f:f.write(fut.result()[text])if __name__ __main__:poolThreadPoolExecutor(2)urls[http://www.baidu.com,http://www.cnblogs.com,http://www.taobao.com]for url in urls:pool.submit(get_page,url).add_done_callback(call_back)
协程理论 核心理解切换是程序员级别的切换我们自己切不是操作系统切的 协程的本质最大效率的利用计算机的CPU资源欺骗计算机让计算机cpu一直保持工作状态 协程是单线程下的并发又称微线程纤程。英文名Coroutine。一句话说明什么是协程协程是一种用户态的轻量级线程即协程是由用户程序自己控制调度的。 需要强调的是 python的线程属于内核级别的即由操作系统控制调度如单线程遇到io或执行时间过长就会被迫交出cpu执行权限切换其他线程运行单线程内开启协程一旦遇到io就会从应用程序级别而非操作系统控制切换以此来提升效率非io操作的切换与效率无关 对比操作系统控制线程的切换用户在单线程内控制协程的切换。 优点如下 协程的切换开销更小属于程序级别的切换操作系统完全感知不到因而更加轻量级单线程内就可以实现并发的效果最大限度地利用cpu 缺点如下 协程的本质是单线程下无法利用多核可以是一个程序开启多个进程每个进程内开启多个线程每个线程内开启协程协程指的是单个线程因而一旦协程出现阻塞将会阻塞整个线程 总结协程特点 必须在只有一个单线程里实现并发修改共享数据不需加锁用户程序里自己保存多个控制流的上下文栈附加一个协程遇到IO操作自动切换到其它协程如何实现检测IOyield、greenlet都无法实现就用到了gevent模块select机制 协程之greenlet模块
一、安装模块 安装pip3 install greenlet 二、greenlet实现状态切换
from greenlet import greenletdef eat(name):print(%s eat 1 %name)g2.switch(nick)print(%s eat 2 %name)g2.switch()
def play(name):print(%s play 1 %name)g1.switch()print(%s play 2 %name)g1greenlet(eat)
g2greenlet(play)g1.switch(nick)#可以在第一次switch时传入参数以后都不需要单纯的切换在没有io的情况下或者没有重复开辟内存空间的操作反而会降低程序的执行速度。
三、效率对比
#顺序执行
import time
def f1():res1for i in range(100000000):residef f2():res1for i in range(100000000):res*istarttime.time()
f1()
f2()
stoptime.time()
print(run time is %s %(stop-start)) #10.985628366470337#切换
from greenlet import greenlet
import time
def f1():res1for i in range(100000000):resig2.switch()def f2():res1for i in range(100000000):res*ig1.switch()starttime.time()
g1greenlet(f1)
g2greenlet(f2)
g1.switch()
stoptime.time()
print(run time is %s %(stop-start)) # 52.763017892837524greenlet只是提供了一种比generator更加便捷的切换方式当切到一个任务执行时如果遇到io那就原地阻塞仍然是没有解决遇到IO自动切换来提升效率的问题。 单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作我们完全可以在执行任务1时遇到阻塞就利用阻塞的时间去执行任务2…如此才能提高效率这就用到了Gevent模块。 协程之gevent模块
1 猴子补丁 1这个词原来为Guerrilla Patch杂牌军、游击队说明这部分不是原装的在英文里guerilla发音和gorllia(猩猩)相似再后来就写了monkey(猴子)。 2还有一种解释是说由于这种方式将原来的代码弄乱了(messing with it)在英文里叫monkeying about(顽皮的)所以叫做Monkey Patch。 1.1 猴子补丁的功能(一切皆对象) 拥有在模块运行时替换的功能, 例如: 一个函数对象赋值给另外一个函数对象(把函数原本的执行的功能给替换了)
class Monkey():def play(self):print(猴子在玩)class Dog():def play(self):print(狗子在玩)
mMonkey()
m.play()
m.playDog().play
m.play()1.2 monkey patch的应用场景 这里有一个比较实用的例子,很多用到import json, 后来发现ujson性能更高,如果觉得把每个文件的import json改成import ujson as json成本较高, 或者说想测试一下ujson替换是否符合预期, 只需要在入口加上:
import json
import ujsondef monkey_patch_json():json.__name__ ujsonjson.dumps ujson.dumpsjson.loads ujson.loads
monkey_patch_json()
aajson.dumps({name:lqz,age:19})
print(aa)
1.3 Gevent介绍 Gevent 是一个第三方库可以轻松通过gevent实现并发同步或异步编程在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部但它们被协作式地调度。 用法
#用法
g1gevent.spawn(func,1,,2,3,x4,y5)创建一个协程对象g1spawn括号内第一个参数是函数名如eat后面可以有多个参数可以是位置实参或关键字实参都是传给函数eat的g2gevent.spawn(func2)g1.join() #等待g1结束g2.join() #等待g2结束#或者上述两步合作一步gevent.joinall([g1,g2])g1.value#拿到func1的返回值
示例1遇到io自动切
import gevent
def eat(name):print(%s eat 1 %name)gevent.sleep(2)print(%s eat 2 %name)def play(name):print(%s play 1 %name)gevent.sleep(1)print(%s play 2 %name)g1gevent.spawn(eat,lqz)
g2gevent.spawn(play,namelqz)
g1.join()
g2.join()
#或者gevent.joinall([g1,g2])
print(主)示例二 上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面如timesocket模块之前或者我们干脆记忆成要用gevent需要将from gevent import monkey;monkey.patch_all()放到文件的开头from gevent import monkey;monkey.patch_all()import gevent
import time
def eat():print(eat food 1)time.sleep(2)print(eat food 2)def play():print(play 1)time.sleep(1)print(play 2)g1gevent.spawn(eat)
g2gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print(主)# 我们可以用threading.current_thread().getName()来查看每个g1和g2查看的结果为DummyThread-n即假线程协程实现高并发
服务端
服务端
from gevent import monkey;monkey.patch_all()
import gevent
from socket import socket
# from multiprocessing import Process
from threading import Threaddef talk(conn):while True:try:data conn.recv(1024)if len(data) 0: breakprint(data)conn.send(data.upper())except Exception as e:print(e)conn.close()def server(ip, port):server socket()server.bind((ip, port))server.listen(5)while True:conn, addr server.accept()# tProcess(targettalk,args(conn,))# tThread(targettalk,args(conn,))# t.start()gevent.spawn(talk, conn)if __name__ __main__:g1 gevent.spawn(server, 127.0.0.1, 8080)g1.join()
客户端
客户端import socket
from threading import current_thread, Threaddef socket_client():cli socket.socket()cli.connect((127.0.0.1, 8080))while True:ss %s say hello % current_thread().getName()cli.send(ss.encode(utf-8))data cli.recv(1024)print(data)for i in range(5000):t Thread(targetsocket_client)t.start()END