国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

python中Asyncio庫與Node.js的異步IO機制

前言

我花了一個夏天的時間在Node.js的web框架上,那是我第一次全職用Node.js工作。在使用了幾周后,有一件事變得很清晰,那就是我們組的工程師包括我都對Node.js中異步事件機制缺乏了解,也不清楚它底層是怎么實現(xiàn)的。我深信,對一個框架使用非常熟練高效,一定是基于對它的實現(xiàn)原理了解非常深刻之上的。所以我決定去深挖它。這份好奇和執(zhí)著最后不僅停留在Node.js上,同時也延伸到了對其它語言中異步事件機制的實現(xiàn),尤其是python。我也是拿python來開刀,去學(xué)習(xí)和實踐的。于是我接觸到了python 3.4的異步IO庫 asyncio,它同時也和我對協(xié)程(coroutine)的興趣不謀而合,可以參考我的那篇關(guān)于生成器和協(xié)程的博客(因為asyncio的異步IO是用協(xié)程實現(xiàn)的)。這篇博客是為了回答我在研究那篇博客時產(chǎn)生的問題,同時也希望能解答朋友們的一些疑惑。

這篇博客中所有的代碼都是基于Python 3.4的。這是因為Python 3.4同時引入了 selectors 和 asyncio 模塊。對于Python以前的版本,Twisted, gevent 和 tornado 都提供了類似的功能。

對于本文中剛開始的一些示例代碼,出于簡單易懂的原因,我并沒有引入錯誤處理和異常的機制。在實際編碼中,適當(dāng)?shù)漠惓L幚硎且粋€非常重要的編碼習(xí)慣。在本文的最后,我將用幾個例子來展示Python 3.4中的 asyncio 庫是如何處理異常的。

開始:重溫Hello World

我們來寫一個程序解決一個簡單的問題。本文后面篇幅的多個程序,都是在這題的基礎(chǔ)之上稍作改動,來闡述協(xié)程的思想。

寫一個程序每隔3秒打印“Hello World”,同時等待用戶命令行的輸入。用戶每輸入一個自然數(shù)n,就計算并打印斐波那契函數(shù)的值F(n),之后繼續(xù)等待下一個輸入

有這樣一個情況:在用戶輸入到一半的時候有可能就打印了“Hello World!”,不過這個case并不重要,不考慮它。

對于熟悉Node.js和JavaScript的同學(xué)可能很快能寫出類似下面的程序:

log_execution_time = require('./utils').log_execution_time;

var fib = function fib(n) {
  if (n < 2) return n;
  return fib(n - 1) + fib(n - 2);
};

var timed_fib = log_execution_time(fib);

var sayHello = function sayHello() {
  console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!");
};

var handleInput = function handleInput(data) {
  n = parseInt(data.toString());
  console.log('fib(' + n + ') = ' + timed_fib(n));
};

process.stdin.on('data', handleInput);
setInterval(sayHello, 3000);

跟你所看到的一樣,這題使用Node.js很容易就可以做出來。我們所要做的只是設(shè)置一個周期性定時器去輸出“Hello World!”,并且在 process.stdin 的 data 事件上注冊一個回調(diào)函數(shù)。非常容易,它就是這么工作了,但是原理如何呢?讓我們先來看看Python中是如何做這樣的事情的,再來回答這個問題。

在這里也使用了一個 log_execution_time 裝飾器來統(tǒng)計斐波那契函數(shù)的計算時間。

程序中采用的 斐波那契算法 是故意使用最慢的一種的(指數(shù)復(fù)雜度)。這是因為這篇文章的主題不是關(guān)于斐波那契的(可以參考我的這篇文章,這是一個關(guān)于斐波那契對數(shù)復(fù)雜度的算法),同時因為比較慢,我可以更容易地展示一些概念。下面是Python的做法,它將使用數(shù)倍的時間。

from log_execution_time import log_execution_time

def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n

timed_fib = log_execution_time(fib)

回到最初的問題,我們?nèi)绾伍_始去寫這樣一個程序呢?Python內(nèi)部并沒有類似于 setInterval 或者 setTimeOut 這樣的函數(shù)。
所以第一個可能的做法就是采用系統(tǒng)層的并發(fā)——多線程的方式:

from threading import Thread
from time import sleep
from time import time
from fib import timed_fib

def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)

def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))

def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()

if __name__ == '__main__':
    main()

同樣也不麻煩。但是它和Node.js版本的做法是否在效率上也是差不多的呢?來做個試驗。這個斐波那契計算地比較慢,我們嘗試一個較為大的數(shù)字就可以看到比較客觀的效果:Python中用37,Node.js中用45(JavaScript在數(shù)字計算上本身就比Python快一些)。

python3.4 hello_threads.py
1412360472 - Hello world!
37
1412360475 - Hello world!
1412360478 - Hello world!
1412360481 - Hello world!
Executing fib took 8.96 seconds.
fib(37) = 24157817
1412360484 - Hello world!

它花了將近9秒來計算,在計算的同時“Hello World!”的輸出并沒有被掛起。下面嘗試下Node.js:

node hello.js
1412360534 - Hello world!
1412360537 - Hello world!
45
Calculation took 12.793 seconds
fib(45) = 1134903170
1412360551 - Hello world!
1412360554 - Hello world!
1412360557 - Hello world!

不過Node.js在計算斐波那契的時候,“Hello World!”的輸出卻被掛起了。我們來研究下這是為什么。

事件循環(huán)和線程

對于線程和事件循環(huán)我們需要有一個簡單的認(rèn)識,來理解上面兩種解答的區(qū)別。先從線程說起,可以把線程理解成指令的序列以及CPU執(zhí)行的上下文(CPU上下文就是寄存器的值,也就是下一條指令的寄存器)。

一個同步的程序總是在一個線程中運行的,這也是為什么在等待,比如說等待IO或者定時器的時候,整個程序會被阻塞。最簡單的掛起操作是 sleep ,它會把當(dāng)前運行的線程掛起一段給定的時間。一個進程可以有多個線程,同一個進程中的線程共享了進程的一些資源,比如說內(nèi)存、地址空間、文件描述符等。

線程是由操作系統(tǒng)的調(diào)度器來調(diào)度的,調(diào)度器統(tǒng)一負責(zé)管理調(diào)度進程中的線程(當(dāng)然也包括不同進程中的線程,不過對于這部分我將不作過多描述,因為它超過了本文的范疇。),它來決定什么時候該把當(dāng)前的線程掛起,并把CPU的控制權(quán)交給另一個線程來處理。這稱為上下文切換,包括對于當(dāng)前線程上下文的保存、對目標(biāo)線程上下文的加載。上下文切換會對性能造成一定的影響,因為它本身也需要CPU周期來執(zhí)行。

操作系統(tǒng)切換線程有很多種原因:
1.另一個優(yōu)先級更高的線程需要馬上被執(zhí)行(比如處理硬件中斷的代碼)
2.線程自己想要被掛起一段時間(比如 sleep)
3.線程已經(jīng)用完了自己時間片,這個時候線程就不得不再次進入隊列,供調(diào)度器調(diào)度

回到我們之前的代碼,Python的解答是多線程的。這也解釋了兩個任務(wù)可以并行的原因,也就是在計算斐波那契這樣的CPU密集型任務(wù)的時候,沒有把其它的線程阻塞住。

再來看Node.js的解答,從計算斐波那契把定時線程阻塞住可以看出它是單線程的,這也是Node.js實現(xiàn)的方式。從操作系統(tǒng)的角度,你的Node.js程序是在單線程上運行的(事實上,根據(jù)操作系統(tǒng)的不同,libuv 庫在處理一些IO事件的時候可能會使用線程池的方式,但這并不影響你的JavaScript代碼是跑在單線程上的事實)。

基于一些原因,你可能會考慮避免多線程的方式:
1.線程在計算和資源消耗的角度是較為昂貴的
2.線程并發(fā)所帶來的問題,比如因為共享的內(nèi)存空間而帶來的死鎖和競態(tài)條件。這些又會導(dǎo)致更加復(fù)雜的代碼,在編寫代碼的時候需要時不時地注意一些線程安全的問題
當(dāng)然以上這些都是相對的,線程也是有線程的好處的。但討論那些又與本文的主題偏離了,所以就此打住。

來嘗試一下不使用多線程的方式處理最初的問題。為了做到這個,我們需要模仿一下Node.js是怎么做的:事件循環(huán)。我們需要一種方式去poll(譯者注:沒想到對這個詞的比較合適的翻譯,輪訓(xùn)?不合適。) stdin 看看它是否已經(jīng)準(zhǔn)備好輸入了。基于不同的操作系統(tǒng),有很多不同的系統(tǒng)調(diào)用,比如 poll, select, kqueue 等。在Python 3.4中,select 模塊在以上這些系統(tǒng)調(diào)用之上提供了一層封裝,所以你可以在不同的操作系統(tǒng)上很放心地使用而不用擔(dān)心跨平臺的問題。

有了這樣一個polling的機制,事件循環(huán)的實現(xiàn)就很簡單了:每個循環(huán)去看看 stdin 是否準(zhǔn)備好,如果已經(jīng)準(zhǔn)備好了就嘗試去讀取。之后去判斷上次輸出“Hello world!”是否3秒種已過,如果是那就再輸出一遍。
下面是代碼:

import selectors
import sys
from time import time
from fib import timed_fib


def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


def print_hello():
    print("{} - Hello world!".format(int(time())))


def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for event, mask in selector.select(0.1):
            process_input(event.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()


if __name__ == '__main__':
    main()

然后輸出:

$ python3.4 hello_eventloop.py
1412376429 - Hello world!
1412376432 - Hello world!
1412376435 - Hello world!
37
Executing fib took 9.7 seconds.
fib(37) = 24157817
1412376447 - Hello world!
1412376450 - Hello world!

跟預(yù)計的一樣,因為使用了單線程,該程序和Node.js的程序一樣,計算斐波那契的時候阻塞了“Hello World!”輸出。
Nice!但是這個解答還是有點hard code的感覺。下一部分,我們將使用兩種方式對這個event loop的代碼作一些優(yōu)化,讓它功能更加強大、更容易編碼,分別是 回調(diào) 和 協(xié)程。

事件循環(huán)——回調(diào)

對于上面的事件循環(huán)的寫法一個比較好的抽象是加入事件的handler。這個用回調(diào)的方式很容易實現(xiàn)。對于每一種事件的類型(這個例子中只有兩種,分別是stdin的事件和定時器事件),允許用戶添加任意數(shù)量的事件處理函數(shù)。代碼不難,就直接貼出來了。這里有一點比較巧妙的地方是使用了 bisect.insort 來幫助處理時間的事件。算法描述如下:維護一個按時間排序的事件列表,最近需要運行的定時器在最前面。這樣的話每次只需要從頭檢查是否有超時的事件并執(zhí)行它們。bisect.insort 使得維護這個列表更加容易,它會幫你在合適的位置插入新的定時器事件回調(diào)函數(shù)。誠然,有多種其它的方式實現(xiàn)這樣的列表,只是我采用了這種而已。

from bisect import insort
from fib import timed_fib
from time import time
import selectors
import sys


class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callback in self._stdin_handlers:
                    callback(line)

            # Handle timer events
            while self._timers and self._timers[0][0] < time():
                handler = self._timers[0][1]
                del self._timers[0]
                handler()

    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)

    def add_timer(self, wait_time, callback):
        insort(self._timers, (time() + wait_time, callback))

    def stop(self):
        self._running = False


def main():
    loop = EventLoop()

    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)

    def f(x):
        def g():
            print(x)
        return g

    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()


if __name__ == '__main__':
    main()

代碼很簡單,實際上Node.js底層也是采用這種方式實現(xiàn)的。然而在更復(fù)雜的應(yīng)用中,以這種方式來編寫異步代碼,尤其是又加入了異常處理機制,很快代碼就會變成所謂的回調(diào)地獄(callback hell )。引用 Guido van Rossum 關(guān)于回調(diào)方式的一段話:

要以回調(diào)的方式編寫可讀的代碼,你需要異于常人的編碼習(xí)慣。如果你不相信,去看看JavaScript的代碼就知道了——Guido van Rossum

寫異步回調(diào)代碼還有其它的方式,比如 promise 和 coroutine(協(xié)程) 。我最喜歡的方式(協(xié)程非???,我的博客中這篇文章就是關(guān)于它的)就是采用協(xié)程的方式。下一部分我們將展示使用協(xié)程封裝任務(wù)來實現(xiàn)事件循環(huán)的。

事件循環(huán)——協(xié)程

協(xié)程 也是一個函數(shù),它在返回的同時,還可以保存返回前的運行上下文(本地變量,以及下一條指令),需要的時候可以重新加載上下文從上次離開的下一條命令繼續(xù)執(zhí)行。這種方式的 return 一般叫做 yielding。在這篇文章中我介紹了更多關(guān)于協(xié)程以及在Python中的如何使用的內(nèi)容。在我們的例子中使用之前,我將對協(xié)程做一個更簡單的介紹:

Python中 yield 是一個關(guān)鍵詞,它可以用來創(chuàng)建協(xié)程。
1.當(dāng)調(diào)用 yield value 的時候,這個 value 就被返回出去了,CPU控制權(quán)就交給了協(xié)程的調(diào)用方。調(diào)用 yield 之后,如果想要重新返回協(xié)程,需要調(diào)用Python中內(nèi)置的 next 方法。
2.當(dāng)調(diào)用 y = yield x 的時候,x被返回給調(diào)用方。要繼續(xù)返回協(xié)程上下文,調(diào)用方需要再執(zhí)行協(xié)程的 send 方法。在這個列子中,給send方法的參數(shù)會被傳入?yún)f(xié)程作為這個表達式的值(本例中,這個值會被y接收到)。

這意味著我們可以用協(xié)程來寫異步代碼,當(dāng)程序等待異步操作的時候,只需要使用yield把控制權(quán)交出去就行了,當(dāng)異步操作完成了再進入?yún)f(xié)程繼續(xù)執(zhí)行。這種方式的代碼看起來像同步的方式編寫的,非常流暢。下面是一個采用yield計算斐波那契的簡單例子:

def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

僅僅這樣還不夠,我們需要一個能處理協(xié)程的事件循環(huán)。在下面的代碼中,我們維護了一個列表,列表里面保存了,事件循環(huán)要運行的 task。當(dāng)輸入事件或者定時器事件發(fā)生(或者是其它事件),有一些協(xié)程需要繼續(xù)執(zhí)行(有可能也要往協(xié)程中傳入一些值)。每一個 task 里面都有一個 stack 變量保存了協(xié)程的調(diào)用棧,棧里面的每一個協(xié)程都依賴著后一個協(xié)程的完成。這個基于PEP 342中 “Trampoline”的例子實現(xiàn)的。代碼中我也使用了 functools.partial,對應(yīng)于JavaScript中的 Function.prototype.bind,即把參數(shù)綁定(curry)在函數(shù)上,調(diào)用的時候不需要再傳參了。
下面是代碼:

from bisect import insort
from collections import deque
from fib import timed_fib
from functools import partial
from time import time
import selectors
import sys
import types


class sleep_for_seconds(object):
    """
    Yield an object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time


class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()

        # Queue of functions scheduled to run
        self._tasks = deque(tasks)

        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []

        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []

        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)

    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif result is sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])

    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, (when, task))
        else:
            self._tasks.append(task)

    def stop(self):
        self._running = False

    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))

    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, mask in self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stack in self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()

            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()

            # Finally run time scheduled tasks
            while self._timers and self._timers[0][0] < time():
                task = self._timers[0][1]
                del self._timers[0]
                task()

        self._running = False



def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)


def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

代碼中我們也實現(xiàn)了一個 do_on_next_tick 的函數(shù),可以在下次事件循環(huán)的時候注冊想要執(zhí)行的函數(shù),這個跟Node.js中的process.nextTick多少有點像。我使用它來實現(xiàn)了一個簡單的 exit 特性(即便我可以直接調(diào)用 loop.stop())。

我們也可以使用協(xié)程來重構(gòu)斐波那契算法代替原有的遞歸方式。這么做的好處在于,協(xié)程間可以并發(fā)運行,包括輸出“Hello World!”的協(xié)程。
斐波那契算法重構(gòu)如下:

from event_loop_coroutine import EventLoop
from event_loop_coroutine import print_every
import sys


def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b


def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))


def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()


if __name__ == '__main__':
    main()

程序的輸出:

$ python3.4 fib_coroutine.py
1412727829 - Hello world!
1412727832 - Hello world!
28
1412727835 - Hello world!
1412727838 - Hello world!
fib(28) = 317811
1412727841 - Hello world!
1412727844 - Hello world!

不重復(fù)造車輪

前面兩個部分,我們分別使用了回調(diào)函數(shù)和協(xié)程實現(xiàn)了事件循環(huán)來寫異步的邏輯,對于實踐學(xué)習(xí)來說確實是一種不錯的方式,但是Python中已經(jīng)有了非常成熟的庫提供事件循環(huán)。Python3.4中的 asyncio 模塊提供了事件循環(huán)和協(xié)程來處理IO操作、網(wǎng)絡(luò)操作等。在看更多有趣的例子前,針對上面的代碼我們用 asyncio 模塊來重構(gòu)一下:

import asyncio
import sys
from time import time
from fib import timed_fib


def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))


@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)


def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())


if __name__ == '__main__':
    main()

上面的代碼中 @asyncio.coroutine 作為裝飾器來裝飾協(xié)程,yield from 用來從其它協(xié)程中接收參數(shù)。

異常處理

Python中的協(xié)程允許異常在協(xié)程調(diào)用棧中傳遞,在協(xié)程掛起的地方捕獲到異常狀態(tài)。我們來看一個簡單的例子:

def coroutine():
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        yield "Got an exception: " + str(e)


def main():
    c = coroutine()
    next(c)  # Execute until the first yield
    # Now throw an exception at the point where the coroutine has paused
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)


if __name__ == '__main__':
    main()

輸出如下:

Starting
Got an exception: Have an exceptional day!

這個特性使得用異常處理問題有一個統(tǒng)一的處理方式,不管是在同步還是異步的代碼中,因為事件循環(huán)可以合理地捕獲以及傳遞異常。我們來看一個事件循環(huán)和多層調(diào)用協(xié)程的例子:

import asyncio


@asyncio.coroutine
def A():
    raise Exception("Something went wrong in A!")


@asyncio.coroutine
def B():
    a = yield from A()
    yield a + 1


@asyncio.coroutine
def C():
    try:
        b = yield from B()
        print(b)
    except Exception as e:
        print("C got exception:", e)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(C())


if __name__ == '__main__':
    main()

輸出:

C got exception: Something went wrong in A!

在上面的例子中,協(xié)程C依賴B的結(jié)果,B又依賴A的結(jié)果,A最后拋出了一個異常。最后這個異常一直被傳遞到了C,然后被捕獲輸出。這個特性與同步的代碼的方式基本一致,并不用手動在B中捕獲、再拋出!

當(dāng)然,這個例子非常理論化,沒有任何創(chuàng)意。讓我們來看一個更像生產(chǎn)環(huán)境中的例子:我們使用 ipify 寫一個程序異步地獲取本機的ip地址。因為 asyncio 庫并沒有HTTP客戶端,我們不得不在TCP層手動寫一個HTTP請求,并且解析返回信息。這并不難,因為API的內(nèi)容都以及胸有成竹了(僅僅作為例子,不是產(chǎn)品代碼),說干就干。實際應(yīng)用中,使用 aiohttp 模塊是一個更好的選擇。下面是實現(xiàn)代碼:

import asyncio
import json


host = 'api.ipify.org'
request_headers = {'User-Agent': 'python/3.4',
                   'Host': host,
                   'Accept': 'application/json',
                   'Accept-Charset': 'UTF-8'}


@asyncio.coroutine
def write_headers(writer):
    for key, value in request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    yield from writer.drain()


@asyncio.coroutine
def read_headers(reader):
    response_headers = {}
    while True:
        line_bytes = yield from reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers

@asyncio.coroutine
def get_my_ip_address(verbose):
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)
    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    http_version, status_code, status = status_line.split(' ')
    if verbose:
        print('Got status {} {}'.format(status_code, status))
    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, value in response_headers.items():
            print(key + ': ' + value)
    # Assume the content length is sent by the server, which is the case
    # with ipify
    content_length = int(response_headers['Content-Length'])
    response_body_bytes = yield from reader.read(content_length)
    response_body = response_body_bytes.decode()
    response_object = json.loads(response_body)
    writer.close()
    return response_object['ip']

@asyncio.coroutine
def print_my_ip_address(verbose):
    try:
        ip_address = yield from get_my_ip_address(verbose)
        print("My IP address is:")
        print(ip_address)
    except Exception as e:
        print("Error: ", e)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_my_ip_address(verbose=True))
    finally:
        loop.close()


if __name__ == '__main__':
    main()

是不是跟同步代碼看著很像?:沒有回調(diào)函數(shù),沒有復(fù)雜的錯誤處理邏輯,非常簡單、可讀性非常高的代碼。
下面是程序的輸出,沒有任何錯誤:

$ python3.4 ipify.py
Got status 200 OK
Response headers:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
My IP address is:
# <my IP address here, hidden for privacy!>

使用協(xié)程來處理異步邏輯的主要優(yōu)勢在我看來就是:錯誤處理與同步代碼幾乎一致。比如在上面的代碼中,協(xié)程調(diào)用鏈中任意一環(huán)出錯,并不會導(dǎo)致什么問題,錯誤與同步代碼一樣被捕獲,然后處理。

依賴多個互不相關(guān)協(xié)程的返回結(jié)果

在上面的例子中,我們寫的程序是順序執(zhí)行的,雖然使用了協(xié)程,但互不相關(guān)的協(xié)程并沒有完美地并發(fā)。也就是說,協(xié)程中的每一行代碼都依賴于前一行代碼的執(zhí)行完畢。有時候我們需要一些互不相關(guān)的協(xié)程并發(fā)執(zhí)行、等待它們的完成結(jié)果,并不在意它們的執(zhí)行順序。比如,使用網(wǎng)絡(luò)爬蟲的時候,我們會給頁面上的所有外鏈發(fā)送請求,并把返回結(jié)果放入處理隊列中。

協(xié)程可以讓我們用同步的方式編寫異步的代碼,但是對于處理互不相關(guān)的任務(wù)不論是完成后馬上處理抑或是最后統(tǒng)一處理,回調(diào)的方式看上去是最好的選擇。但是,Python 3.4的 asyncio 模塊同時也提供了以上兩種情形的支持。分別是函數(shù) asyncio.as_completed 和 asyncio.gather 。

我們來一個例子,例子中需要同時加載3個URL。采用兩種方式:
1.使用 asyncio.as_completed 一旦請求完成就處理
2.使用 asyncio.gather 等待所有都完成一起處理
與其加載真的URL地址,我們采用一個更簡單的方式,讓協(xié)程掛起隨機長度的時間。
下面是代碼:

import asyncio
import random


@asyncio.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time


@asyncio.coroutine
def process_as_results_come_in():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    for coroutine in asyncio.as_completed(coroutines):
        url, wait_time = yield from coroutine
        print('Coroutine for {} is done'.format(url))


@asyncio.coroutine
def process_once_everything_ready():
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    results = yield from asyncio.gather(*coroutines)
    print(results)


def main():
    loop = asyncio.get_event_loop()
    print("First, process results as they come in:")
    loop.run_until_complete(process_as_results_come_in())
    print("\nNow, process results once they are all ready:")
    loop.run_until_complete(process_once_everything_ready())


if __name__ == '__main__':
    main()

輸出如下:

$ python3.4 gather.py
First, process results as they come in:
Done: URL URL2 took 2s to get!
Coroutine for URL2 is done
Done: URL URL3 took 3s to get!
Coroutine for URL3 is done
Done: URL URL1 took 4s to get!
Coroutine for URL1 is done

Now, process results once they are all ready:
Done: URL URL1 took 1s to get!
Done: URL URL2 took 3s to get!
Done: URL URL3 took 4s to get!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]

更加深入

有很多內(nèi)容本篇文章并沒有涉及到,比如 Futures 和 *libuv*。這個視頻(需要翻)(http://www#youtube#com/watch?v=1coLC-MUCJc)是介紹Python中的異步IO的。本篇文章中也有可能有很多我遺漏的內(nèi)容,歡迎隨時在評論中給我補充。


文章來源地址http://www.zghlxwxcb.cn/article/437.html

到此這篇關(guān)于python中Asyncio庫與Node.js的異步IO機制的文章就介紹到這了,更多相關(guān)內(nèi)容可以在右上角搜索或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

原文地址:http://www.zghlxwxcb.cn/article/437.html

如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請聯(lián)系站長進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Python asyncio高性能異步編程 詳解

    Python asyncio高性能異步編程 詳解

    目錄 一、協(xié)程 1.1、greenlet實現(xiàn)協(xié)程 1.2、yield 1.3、asyncio 1.4、async await 二、協(xié)程意義 三、異步編程 3.1、事件循環(huán) 3.2、快速上手 3.3、await 3.4、Task對象 3.5、asyncio.Future對象 3.5、concurrent.futures.Future對象 3.7、異步迭代器 3.8、異步上下文管理器 四、uvloop 五、實戰(zhàn)案例

    2024年02月20日
    瀏覽(33)
  • Python異步編程探究:深入理解asyncio的使用和原理【第130篇—asyncio】

    Python異步編程探究:深入理解asyncio的使用和原理【第130篇—asyncio】

    前些天發(fā)現(xiàn)了一個巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家?!军c擊進入巨牛的人工智能學(xué)習(xí)網(wǎng)站】。 隨著計算機應(yīng)用程序的復(fù)雜性不斷增加,對于高效處理I/O密集型任務(wù)的需求也越來越迫切。在Python中,asyncio模塊提供了一種強大的異步編程

    2024年04月12日
    瀏覽(30)
  • Python 中的 Async IO [異步 IO]

    Python 中的 Async IO [異步 IO]

    Async IO是一種并發(fā)編程設(shè)計,在Python中得到了專門的支持,從Python 3.4到3.7,并且可能還會繼續(xù)發(fā)展。 你可能會擔(dān)心地想到:“并發(fā)、并行、線程、多進程。這已經(jīng)夠難理解的了。異步IO又是什么?\\\" 本教程旨在幫助你回答這個問題,讓你更好地理解Python中異步IO的方法。 以下

    2024年02月03日
    瀏覽(22)
  • Python異步編程之web框架 異步vs同步 文件IO任務(wù)壓測對比

    Python異步編程之web框架 異步vs同步 文件IO任務(wù)壓測對比

    主題: 比較異步框架和同步框架在文件IO操作的性能差異 python版本 :python 3.8 壓測工具 :locust web框架 :同步:flask 異步:aiohttp、starlette 異步文件模塊 :aiofiles、anyio.Path 請求并發(fā)量 : 模擬10個用戶 服務(wù)器配置 : Intel(R) i7-12700F 客戶端配置 :Intel(R) i7-8700 3.20GHz flask是python中輕

    2024年02月06日
    瀏覽(25)
  • Python異步編程之web框架異步vs同步 無IO任務(wù)壓測對比

    Python異步編程之web框架異步vs同步 無IO任務(wù)壓測對比

    在python編程中,通過協(xié)程實現(xiàn)的異步編程號稱能夠提高IO密集型任務(wù)的并發(fā)量。本系列比較web服務(wù)器同步框架和異步框架的性能差異,包括無IO接口和常見IO操作,如文件、mysql、redis等。使用壓測工具locust測試相同條件下兩種編程模式能夠處理請求的速度。 主題: 單純比較異

    2024年02月06日
    瀏覽(20)
  • Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫IO任務(wù)并發(fā)支持對比

    Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫IO任務(wù)并發(fā)支持對比

    主題: 比較異步框架和同步框架在數(shù)據(jù)庫IO操作的性能差異 python版本 :python 3.8 數(shù)據(jù)庫 :mysql 8.0.27 (docker部署) 壓測工具 :locust web框架 :同步:flask 異步:starlette 請求并發(fā)量 : 模擬10個用戶 服務(wù)器配置 : Intel(R) i7-12700F 客戶端配置 :Intel(R) i7-8700 3.20GHz python中操作數(shù)據(jù)庫通常

    2024年02月08日
    瀏覽(30)
  • Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫IO任務(wù)壓測對比

    Python異步編程之web框架 異步vs同步 數(shù)據(jù)庫IO任務(wù)壓測對比

    主題: 比較異步框架和同步框架在數(shù)據(jù)庫IO操作的性能差異 python版本 :python 3.8 數(shù)據(jù)庫 :mysql 8.0.27 (docker部署) 壓測工具 :locust web框架 :同步:flask 異步:starlette 請求并發(fā)量 : 模擬10個用戶 服務(wù)器配置 : Intel(R) i7-12700F 客戶端配置 :Intel(R) i7-8700 3.20GHz python中操作數(shù)據(jù)庫通常

    2024年02月08日
    瀏覽(30)
  • 【文末送書】Python高并發(fā)編程:探索異步IO和多線程并發(fā)

    【文末送書】Python高并發(fā)編程:探索異步IO和多線程并發(fā)

    歡迎關(guān)注博主 Mindtechnist 或加入【智能科技社區(qū)】一起學(xué)習(xí)和分享Linux、C、C++、Python、Matlab,機器人運動控制、多機器人協(xié)作,智能優(yōu)化算法,濾波估計、多傳感器信息融合,機器學(xué)習(xí),人工智能等相關(guān)領(lǐng)域的知識和技術(shù)。搜索關(guān)注公粽號 《機器和智能》 發(fā)送“刷題寶

    2024年02月15日
    瀏覽(22)
  • Asyncio 協(xié)程異步筆記

    協(xié)程不是計算機提供,而是程序員人為創(chuàng)造。 協(xié)程(coroutine),也可以被稱為微線程,是一種用戶態(tài)內(nèi)的上下文切換技術(shù)。簡而言之,其實就是通過一個線程實現(xiàn)代碼塊互相切換運行。例如: 實現(xiàn)協(xié)程有這么幾種方法: greenlet ,早期模塊。 yield 。 asyncio 裝飾器(py

    2024年02月08日
    瀏覽(18)
  • JS執(zhí)行機制--同步與異步

    JS執(zhí)行機制--同步與異步

    單線程 JavaScript語言具有單線程的特點,同一個時間只能做一件事情。這是因為JavaScript腳本語言是為了處理頁面中用戶的交互,以及操作DOM而誕生的。如果對某個DOM元素進行添加和刪除,不同同時進行。應(yīng)該是先添加,再刪除,事件有序。 單線程的特點是所有任務(wù)都需要排隊

    2023年04月20日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包