Python AsyncIO and PyZMQ

Escribir código seguro y escalable fácilmente

images/python.svg
Autor:Carlos Jenkins, KuraLabs S.R.L
Email:carlos.jenkins@kuralabs.io
Fecha:26 de Julio, 2018

Introduction

A partir de Python 3.4* se introdujo AsyncIO, con conjunto de bibliotecas y mejoras al lenguaje que permiten procesamiento asíncrono*.

Compararemos modelos de concurrencia tradicionales como threading y multiprocess con el basado en event loops, así como algunos frameworks que nos ayudarán a crear RESTful APIs rápidamente y de forma segura.

Agenda

¿Qué es AsyncIO?

AsyncIO es un con conjunto de bibliotecas y mejoras al lenguaje que permiten procesamiento asíncrono*.

Nos concentraremos en Async/Await, que es una forma de escribir código asíncrono que parece síncrono.

A una función async se le conoce como una co-rutina.

Async/Await

async def hello_world():
    await some_future()

Comparación con otros modelos de concurrencia

Modelo de Concurrencia CPUs Cambio de Contexto Criterio
Threading (In Python) 1 Python Interpreter (GIL, CPython)* 10ms, 100 byte codes, sleep()
Multiprocessing n Kernel Scheduler (Linux, CFS)* Black Magic (CFS)
AsyncIO 1 Event Loop await

Threading

images/threading.svg

Multiprocessing

images/multiprocessing.svg

Async

images/async.svg

HTTP con AsyncIO

from aiohttp import web

app = web.Application(middlewares=middlewares)
app['conf'] = parse_config(args.conf)
app.on_startup.append(setup_db)

app.router.add_get(
    '/voter/name/{name}/{province_filter:\d+}/{page:\d+}',
    handler_name
)
web.run_app(app, path=args.path)

HTTP con AsyncIO (2)

async def handler_name(request):
    return web.json_response({'hello': 'world'})

Bonus: Validación de Schema

Es fácil crear un middleware que valide solicitudes y respuestas basados en un esquema que valide no sólo el tipo de dato pero la semántica del dato.

Cerberus:

from cerberus import Validator

validator = Validator(schema)
validated = validator.validated(data)
errors = validator.errors

Bonus: Validación de Schema (2)

SCHEMA_NAME_REQUEST = {
    'name': {
        'required': True,
        'type': 'string',
        'empty': False,
    },
    'province_filter': {
        'required': False,
        'default': 0,
        'type': 'integer',
        'coerce': int,
        'min': 0,
        'max': 8,
    },
    'page': {
        'required': False,
        'default': 0,
        'type': 'integer',
        'coerce': int,
        'min': 0,
        'max': 100,
    },
}

Bases de datos con AsyncIO

DBMS Tipo Driver
MySQL Relational AIOMySQL
MongoDB Document Oriented Motor
InfluxDB Time Series AioInflux

Comunicación entre procesos

PyZMQ: Super-sockets. Multiple topologies, buffered, async.

from zmq.asyncio import Context
from zmq import PUSH

async def setup(app):
    mysocket = Context.instance().socket(PUSH)
    mysocket.connect(app['conf']['mysocket']['path'])
    app['mysocket'] = mysocket

# ...
app['conf'] = parse_config(args.conf)
app.on_startup.append(setup)

Comunicación entre procesos (2)

PUSH:

from umsgpack import packb

mysocket = request.app['mysocket']
data = {
    'time': time(),
}
await mysocket.send(packb(data))

Comunicación entre procesos (3)

PULL:

from asyncio import get_event_loop

loop = get_event_loop()
try:
    loop.run_until_complete(consume(path))
finally:
    loop.close()

Comunicación entre procesos (4)

from zmq import PULL
from umsgpack import unpackb

async def consume(path):
    mysocket = Context.instance().socket(PULL)
    socket.bind(path)

    while KEEP_RUNNING:
        events = await mysocket.poll()
        for _ in range(events):
            frame = await mysocket.recv(copy=False)
            print(unpackb(frame))
    socket.close()

ProTips

Escalando con AsyncIO

Supervisor: Gestión de procesos.

[program:myapp]
numprocs = NUM_CPUS
numprocs_start = 1

; Unix socket paths are specified by command line.
command=myapp -vvv \
  --path=/var/run/myapp/myapp_%(process_num)s.sock \
  --conf=/etc/myapp/myapp.toml

Escalando con AsyncIO (2)

NGINX: /etc/nginx/sites-available/myapp

upstream myapp {
    # fail_timeout=0 means we always retry an upstream even
    # if it failed to return a good HTTP response

    # Unix domain servers
    server unix:/var/run/myapp/myapp_1.sock fail_timeout=0;
    server unix:/var/run/myapp/myapp_2.sock fail_timeout=0;
    server unix:/var/run/myapp/myapp_3.sock fail_timeout=0;
    server unix:/var/run/myapp/myapp_4.sock fail_timeout=0;
}

Escalando con AsyncIO (3)

NGINX: /etc/nginx/sites-available/myapp

server {
    listen 80;
    # ...

    location / {
        proxy_set_header Host $http_host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_redirect off;
        proxy_buffering off;
        proxy_pass http://myapp;
    }
}

Escalando con AsyncIO (4)

Nuestros resultados:

¿Preguntas?

Muchas gracias.

https://carlos.jenkins.co.cr/presentations/asyncio_pyzmq

Autor:Carlos Jenkins, KuraLabs S.R.L
Email:carlos.jenkins@kuralabs.io
Web:https://kuralabs.io/