Asynchronous HTTP client

Pulsar ships with a fully featured HttpClient class for making multiple asynchronous HTTP requests. The client has no dependencies and an API very similar to the Python requests library.

Getting Started

To get started, one builds a client for multiple sessions:

from pulsar.apps import http
sessions = http.HttpClient()

and then makes requests in a coroutine:

async def mycoroutine():
    ...
    response = await sessions.get('http://www.bbc.co.uk')
    return response.text()

The response is an HttpResponse object which contains all the information about the request and the result:

>>> request = response.request
>>> print(request.headers)
Connection: Keep-Alive
User-Agent: pulsar/0.8.2-beta.1
Accept-Encoding: deflate, gzip
Accept: */*
>>> response.status_code
200
>>> print(response.headers)
...

The request attribute of HttpResponse is an instance of the original HttpRequest.

Passing Parameters In URLs

You can attach parameters to the url by passing the params dictionary:

response = await sessions.get('http://bla.com',
                              params={'page': 2, 'key': 'foo'})
response.url   # 'http://bla.com?page=2&key=foo'

You can also pass a list of items as a value:

params = {'key1': 'value1', 'key2': ['value2', 'value3']}
response = await sessions.get('http://bla.com', params=params)
response.url   # 'http://bla.com?key1=value1&key2=value2&key2=value3'
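
The mapping from a params dictionary to a query string can be reproduced with the standard library. The sketch below is not pulsar's implementation; build_url is a hypothetical helper that only illustrates how a list value expands into a repeated key:

```python
from urllib.parse import urlencode


def build_url(base, params):
    """Append params to base as a query string; list values repeat the key."""
    # doseq=True expands sequences into one key=value pair per item
    query = urlencode(params, doseq=True)
    if not query:
        return base
    separator = '&' if '?' in base else '?'
    return base + separator + query


url = build_url('http://bla.com',
                {'key1': 'value1', 'key2': ['value2', 'value3']})
print(url)
```

Integer values such as page=2 are converted to strings by urlencode, so they need no special handling.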

Post data

Simple data

Posting data is as simple as passing the data parameter:

sessions.post(..., data={'entry1': 'bla', 'entry2': 'doo'})

JSON data

Posting data encoded as JSON is as simple as passing the json parameter:

sessions.post(..., json={'entry1': 'bla', 'entry2': 'doo'})
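
To make the difference between the data and json parameters concrete, the following standard-library sketch encodes the same dictionary both ways. It assumes the conventional encodings (URL-encoded form versus JSON text) and does not call pulsar; the exact bytes and headers the client produces may differ:

```python
import json
from urllib.parse import urlencode

payload = {'entry1': 'bla', 'entry2': 'doo'}

# Form encoding, conventionally sent as application/x-www-form-urlencoded
form_body = urlencode(payload).encode('ascii')

# JSON encoding, conventionally sent as application/json
json_body = json.dumps(payload).encode('utf-8')

print(form_body)
print(json_body)
```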

File data

Posting data as multipart-encoded is as simple as passing the files parameter:

files = {'file': open('report.xls', 'rb')}
sessions.post(..., files=files)

Streaming data

It is possible to post streaming data too. Streaming data can be a simple generator:

sessions.post(..., data=(b'blabla' for _ in range(10)))

or a generator of a mixture of synchronous and asynchronous data:

import asyncio

def stream():
    fut = asyncio.Future()
    asyncio.get_event_loop().call_later(1, fut.set_result, b'two')
    yield b'one'
    yield fut
    yield b'three'

sessions.post(..., data=stream())
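
How the client consumes such a generator internally is not described here. As a minimal sketch of the idea, a consumer can walk the generator and await any item that is awaitable. The drain function below is hypothetical and not part of pulsar:

```python
import asyncio
import inspect


def stream():
    # Runs lazily, on first iteration, so a loop is already running here
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    loop.call_later(0.01, fut.set_result, b'two')
    yield b'one'
    yield fut
    yield b'three'


async def drain(chunks):
    """Collect a mix of bytes and awaitables into a single bytes object."""
    body = []
    for chunk in chunks:
        if inspect.isawaitable(chunk):
            chunk = await chunk   # wait for the asynchronous piece
        body.append(chunk)
    return b''.join(body)


result = asyncio.run(drain(stream()))
print(result)
```

The order of the chunks is preserved because each awaitable is resolved before the next item is requested from the generator.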

Authentication

Authentication, either basic or digest, can be added by passing the auth parameter during a request. For basic authentication:

sessions.get(..., auth=('<username>','<password>'))

which is the same as:

from pulsar.apps.http import HTTPBasicAuth

sessions.get(..., auth=HTTPBasicAuth('<username>','<password>'))

or digest:

from pulsar.apps.http import HTTPDigestAuth

sessions.get(..., auth=HTTPDigestAuth('<username>','<password>'))

In either case the authentication is handled by adding additional headers to your requests.
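
For basic authentication the extra header is the standard Authorization header, whose value is the base64 encoding of username:password. The sketch below builds it with the standard library; basic_auth_header is a hypothetical helper and the credentials are the well-known example from the HTTP Basic specification, not anything pulsar-specific:

```python
import base64


def basic_auth_header(username, password):
    """Return the Authorization header value for HTTP Basic auth."""
    credentials = ('%s:%s' % (username, password)).encode('utf-8')
    return 'Basic ' + base64.b64encode(credentials).decode('ascii')


headers = {'Authorization': basic_auth_header('Aladdin', 'open sesame')}
print(headers)
```

Basic credentials are only encoded, not encrypted, so they are normally used over TLS. Digest authentication involves a server challenge and is not covered by this sketch.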

TLS/SSL

TLS is supported out of the box:

sessions.get('https://github.com/timeline.json')

The HttpClient can verify SSL certificates for HTTPS requests, just like a web browser. To check a host’s SSL certificate, you can use the verify argument:

sessions = HttpClient()
sessions.verify       # True
sessions = HttpClient(verify=False)
sessions.verify       # False

By default, verify is set to True.

You can override the verify argument during requests too:

sessions.get('https://github.com/timeline.json')
sessions.get('https://localhost:8020', verify=False)

You can also set verify to the path of a CA_BUNDLE file or of a directory containing certificates of trusted CAs:

sessions.get('https://localhost:8020', verify='/path/to/ca_bundle')
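
The three forms of verify (True, False, or a path) correspond naturally to different settings of a standard ssl.SSLContext. The sketch below shows one plausible mapping using only the ssl module. make_context is a hypothetical helper; how the client's own ssl_context method builds its context may differ:

```python
import os
import ssl


def make_context(verify=True):
    """Build an ssl.SSLContext from a verify flag or a CA path."""
    if verify is False:
        context = ssl.create_default_context()
        # check_hostname must be switched off before verification is disabled
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context
    if verify is True:
        # System default CAs, certificate and hostname checks enabled
        return ssl.create_default_context()
    if os.path.isdir(verify):
        return ssl.create_default_context(capath=verify)
    return ssl.create_default_context(cafile=verify)


strict = make_context(True)
relaxed = make_context(False)
print(strict.verify_mode, relaxed.verify_mode)
```

Disabling verification is best limited to local testing, as in the localhost example above.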

You can also specify a local cert to use as client side certificate, as a single file (containing the private key and the certificate) or as a tuple of both files’ paths:

sessions.get('https://...', cert=('/path/client.cert', '/path/client.key'))

or set it persistently on the client:

s = HttpClient(cert='/path/client.cert')
s.cert   # '/path/client.cert'

Streaming

This is an event-driven client, therefore streaming support is native.

The raw stream

The easiest way to use streaming is to pass the stream=True parameter during a request and access the HttpResponse.raw attribute. For example:

async def body_coroutine(url):
    # wait for response headers
    response = await sessions.get(url, stream=True)
    # iterate over the body as it arrives
    async for data in response.raw:
        # data is a chunk of bytes
        ...

The raw attribute is an asynchronous iterable over bytes and can be iterated only once. Iterating over a raw attribute that has already been consumed raises StreamConsumedError.

The raw attribute also has a read method for reading the whole body at once:

await response.raw.read()
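
The iterate-once behaviour can be illustrated without a network connection. The sketch below defines a stand-in asynchronous iterable that refuses a second iteration. OnceStream and the local StreamConsumedError class are hypothetical illustrations, not pulsar's classes:

```python
import asyncio


class StreamConsumedError(Exception):
    """Local stand-in for the error named in the text."""


class OnceStream:
    """An asynchronous iterable over chunks that can be iterated once."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._consumed = False

    def __aiter__(self):
        if self._consumed:
            raise StreamConsumedError('stream already consumed')
        self._consumed = True
        return self._iterate()

    async def _iterate(self):
        for chunk in self._chunks:
            await asyncio.sleep(0)   # yield control, as real I/O would
            yield chunk


async def collect(stream):
    return [chunk async for chunk in stream]


async def demo():
    stream = OnceStream([b'a', b'b'])
    first = await collect(stream)
    try:
        await collect(stream)
    except StreamConsumedError:
        second_failed = True
    else:
        second_failed = False
    return first, second_failed


first, second_failed = asyncio.run(demo())
print(first, second_failed)
```

A body that must be read more than once therefore needs to be stored by the caller during the first pass.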

Data processed hook

Another approach to streaming is to use the data_processed event handler. For example:

def new_data(response, **kw):
    if response.status_code == 200:
        data = response.recv_body()
        # do something with this data

response = sessions.get(..., data_processed=new_data)

The response recv_body() method fetches the parsed body of the response and at the same time it flushes it. Check the proxy server example for an application using the HttpClient streaming capabilities.

WebSocket

The HTTP client supports websocket upgrades. First you need to have a websocket handler, a class derived from WS:

from pulsar.apps import ws

class Echo(ws.WS):

    def on_message(self, websocket, message):
        websocket.write(message)

The websocket response is obtained by:

ws = await sessions.get('ws://...', websocket_handler=Echo())

Client Options

Several options are available to customise how the HTTP client works.

Pool size

The HTTP client maintains connection pools with remote hosts. The parameter which controls the pool size for each domain is pool_size, which is set to 10 by default.
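
The effect of a per-host pool size is that at most pool_size requests to the same host are in flight at once, while the rest wait for a free connection. The sketch below imitates that limit with one asyncio.Semaphore per host. HostLimiter is a hypothetical class, not pulsar's Pool, and it counts jobs rather than managing real connections:

```python
import asyncio
from urllib.parse import urlsplit


class HostLimiter:
    """Cap the number of concurrent jobs per host (illustrative only)."""

    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        self._slots = {}    # host -> asyncio.Semaphore
        self._active = {}   # host -> jobs currently running
        self.peak = {}      # host -> highest concurrency observed

    async def run(self, url, job):
        host = urlsplit(url).netloc
        if host not in self._slots:
            self._slots[host] = asyncio.Semaphore(self.pool_size)
        async with self._slots[host]:
            self._active[host] = self._active.get(host, 0) + 1
            self.peak[host] = max(self.peak.get(host, 0),
                                  self._active[host])
            try:
                return await job()
            finally:
                self._active[host] -= 1


async def demo(pool_size=2, requests=6):
    limiter = HostLimiter(pool_size)

    async def job():
        await asyncio.sleep(0.01)   # stands in for network I/O

    urls = ['http://example.com/%d' % i for i in range(requests)]
    await asyncio.gather(*(limiter.run(url, job) for url in urls))
    return limiter.peak['example.com']


peak = asyncio.run(demo())
print(peak)
```

With six jobs and a pool size of two, the observed peak concurrency is two; the remaining jobs queue until a slot is released.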

Redirects

By default the HttpClient performs location redirection for all verbs except HEAD.

The HttpResponse.history list contains the HttpResponse objects that were created in order to complete the request. For example:

response = await sessions.get('http://github.com')
response.status_code    # 200
response.history        # [<Response [301]>]

If you’re using GET, OPTIONS, POST, PUT, PATCH or DELETE, you can disable redirection handling with the allow_redirects parameter:

response = await sessions.get('http://github.com', allow_redirects=False)
response.status_code    # 301
response.history        # []
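
The relationship between allow_redirects, max_redirects and history can be sketched with a toy redirect loop over an in-memory routing table. Everything here (ROUTES, fetch, the tuple-based history) is a hypothetical illustration and does not use pulsar or the network:

```python
REDIRECT_CODES = {301, 302, 303, 307, 308}

# Hypothetical routing table: url -> (status, redirect location or None)
ROUTES = {
    'http://github.com': (301, 'https://github.com/'),
    'https://github.com/': (200, None),
}


def fetch(url, allow_redirects=True, max_redirects=10):
    """Follow redirects, recording each intermediate hop in history."""
    history = []
    status, location = ROUTES[url]
    while allow_redirects and status in REDIRECT_CODES:
        if len(history) >= max_redirects:
            raise RuntimeError('too many redirects')
        history.append((status, url))   # oldest hop first
        url = location
        status, location = ROUTES[url]
    return status, url, history


followed = fetch('http://github.com')
not_followed = fetch('http://github.com', allow_redirects=False)
print(followed)
print(not_followed)
```

When redirects are followed, the final status is 200 and history holds the 301 hop; when they are not, the 301 is returned directly and history stays empty.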

Decompression

Decompression of the response body is automatic. To disable decompression, pass decompress=False to a request:

response = await sessions.get('https://github.com', decompress=False)
response.status_code    # 200
response.text()         # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte

Alternatively, the decompress flag can be set at session level:

sessions = HttpClient(decompress=False)
response = await sessions.get('https://github.com')
response.status_code    # 200
response.text()         # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
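
The byte 0x8b in the error message is the second byte of the gzip magic number (0x1f 0x8b), which is why decoding a still-compressed body as UTF-8 fails at position 1. This standard-library sketch, independent of pulsar, reproduces the failure and shows the manual decompression step a caller would then need:

```python
import gzip

original = 'héllo wörld'
compressed = gzip.compress(original.encode('utf-8'))

# The first two bytes are the gzip magic number
magic = compressed[:2]

try:
    compressed.decode('utf-8')
    error = None
except UnicodeDecodeError as exc:
    error = exc

# Decompress first, then decode
text = gzip.decompress(compressed).decode('utf-8')
print(magic, error, text)
```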

Synchronous Mode

The client can be used in synchronous mode if the event loop has not started. Alternatively, it can be used in synchronous mode on a new thread by giving it its own event loop:

from asyncio import new_event_loop
sessions = HttpClient(loop=new_event_loop())
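
The general idea behind a synchronous mode is to drive a coroutine to completion on a private event loop, optionally from a separate thread. The sketch below shows that pattern with asyncio alone. run_sync, run_in_thread and fetch_length are hypothetical names, and this is not a description of how HttpClient implements it:

```python
import asyncio
import threading


async def fetch_length(payload):
    """Stand-in for an asynchronous request."""
    await asyncio.sleep(0)
    return len(payload)


def run_sync(coro):
    """Run a coroutine to completion on a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


def run_in_thread(coro_factory):
    """Run a coroutine synchronously on a new thread with its own loop."""
    result = {}

    def worker():
        result['value'] = run_sync(coro_factory())

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result['value']


print(run_sync(fetch_length(b'abc')))
print(run_in_thread(lambda: fetch_length(b'abcd')))
```

Creating the coroutine inside the worker thread (via the factory) keeps everything tied to that thread's loop.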

Events

Events control the behaviour of the HttpClient when certain conditions occur. They are useful for handling standard HTTP events such as redirects, websocket upgrades, streaming or anything your application requires.

One time events

There are three one time events associated with an HttpResponse object:

  • pre_request, fired before the request is sent to the server. Callbacks receive the response argument.
  • on_headers, fired when response headers are available. Callbacks receive the response argument.
  • post_request, fired when the response is done. Callbacks receive the response argument.

Event handlers can be added at session level:

def myheader_handler(response, exc=None):
    if not exc:
        print('got headers!')

sessions.event('on_headers').bind(myheader_handler)

or at request level:

sessions.get(..., on_headers=myheader_handler)

By default, the HttpClient has one pre_request callback for handling HTTP tunneling, three on_headers callbacks for handling 100 Continue, websocket upgrade and cookies, and one post_request callback for handling redirects.

Many time events

In addition to the three one time events, the HttpClient supports two additional events which can occur several times while processing a given response:

  • data_received is fired when new data has been received but not yet parsed
  • data_processed is fired just after the data has been parsed by the HttpResponse. This is the event one should bind to when performing http streaming.

Both events support handlers with the signature:

def handler(response, data=None):
    ...

where response is the HttpResponse handling the request and data is the raw data received.
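
The difference between one time events and many time events can be shown with a very small event class. The Event class below is a hypothetical sketch written for this illustration; it borrows the bind name from the example above but is not pulsar's event implementation:

```python
class Event:
    """Minimal event: one time events ignore every fire() after the first."""

    def __init__(self, onetime=False):
        self.onetime = onetime
        self.fired = 0
        self._handlers = []

    def bind(self, handler):
        self._handlers.append(handler)

    def fire(self, response, **kw):
        if self.onetime and self.fired:
            return False             # already fired once, do nothing
        self.fired += 1
        for handler in self._handlers:
            handler(response, **kw)
        return True


log = []
on_headers = Event(onetime=True)
data_processed = Event()

on_headers.bind(lambda response, exc=None: log.append('headers'))
data_processed.bind(lambda response, data=None: log.append(data))

response = object()                  # placeholder for a response object
on_headers.fire(response)
on_headers.fire(response)            # ignored: one time event
for chunk in (b'one', b'two'):
    data_processed.fire(response, data=chunk)

print(log)
```

The one time handler runs once even though the event is fired twice, while the many time handler runs for every chunk.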

API

The main classes are HttpClient (a subclass of AbstractClient), HttpResponse (returned by HTTP requests) and HttpRequest.

HTTP Client

class pulsar.apps.http.HttpClient(proxies=None, headers=None, verify=True, cookies=None, store_cookies=True, cert=None, max_redirects=10, decompress=True, version=None, websocket_handler=None, parser=None, trust_env=True, loop=None, client_version=None, timeout=None, stream=False, pool_size=10, frame_parser=None, logger=None, close_connections=False, keep_alive=None)

A client for HTTP/HTTPS servers.

It handles pools of asynchronous connections.

Parameters:
headers

Default headers for this HttpClient.

Default: DEFAULT_HTTP_HEADERS.

cookies

Default cookies for this HttpClient.

store_cookies

If True it remembers response cookies and sends them back to servers.

Default: True

timeout

Default timeout for requests. If None or 0, requests have no timeout.

proxies

Dictionary of proxy servers for this client.

pool_size

The size of the connection pool for a given host.

connection_pools

Dictionary of connection pools for different hosts

DEFAULT_HTTP_HEADERS

Default headers for this HttpClient

connection_pool

Connection Pool factory

alias of Pool

client_version = 'pulsar/2.0.2'

String for the User-Agent header.

max_redirects = 10

Maximum number of redirects.

It can be overwritten on request().

version = 'HTTP/1.1'

Default HTTP request version for this HttpClient.

It can be overwritten on request().

get(url, **kwargs)

Sends a GET request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

options(url, **kwargs)

Sends an OPTIONS request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

head(url, **kwargs)

Sends a HEAD request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

post(url, **kwargs)

Sends a POST request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

put(url, **kwargs)

Sends a PUT request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

patch(url, **kwargs)

Sends a PATCH request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

delete(url, **kwargs)

Sends a DELETE request and returns an HttpResponse object.

Parameters:
  • url – URL for the new HttpRequest object.
  • **kwargs – optional arguments for the request() method.

request(method, url, **params)

Constructs and sends a request to a remote server.

It returns a Future which results in an HttpResponse object.

Return type: a coroutine

close()

Close all connections.

ssl_context(verify=True, cert_reqs=None, check_hostname=False, certfile=None, keyfile=None, cafile=None, capath=None, cadata=None, **kw)

Create an SSL context object.

This method should not be called from user code.

create_tunnel_connection(req)

Create a tunnel connection.

HTTP Request

class pulsar.apps.http.HttpRequest(client, url, method, inp_params=None, headers=None, data=None, files=None, json=None, history=None, auth=None, charset=None, max_redirects=10, source_address=None, allow_redirects=False, decompress=True, version=None, wait_continue=False, websocket_handler=None, cookies=None, params=None, stream=False, proxies=None, verify=True, cert=None, **extra)

An HttpClient request for an HTTP resource.

This class has a similar interface to urllib.request.Request.

Parameters:
  • files – optional dictionary of name, file-like-objects.
  • allow_redirects – allow the response to follow redirects.

method

The request method

version

HTTP version for this request, usually HTTP/1.1

history

List of past HttpResponse (collected during redirects).

wait_continue

If True, the HttpRequest includes the Expect: 100-Continue header.

stream

Allow for streaming body

ssl

Context for TLS connections.

If this is a tunneled request and the tunnel connection is not yet established, it returns None.

proxy

Proxy server for this request.

tunnel

Tunnel for this request.

encode()

The bytes representation of this HttpRequest.

Called by HttpResponse when it needs to encode this HttpRequest before sending it to the HTTP resource.

has_header(header_name)

Check whether header_name is in this request's headers.

get_header(header_name, default=None)

Retrieve header_name from this request's headers.

remove_header(header_name)

Remove header_name from this request.

HTTP Response

class pulsar.apps.http.HttpResponse(connection)

A ProtocolConsumer for the HTTP client protocol.

Initialised by a call to the HttpClient.request method.

url

The request full url.

history

List of HttpResponse objects from the history of the request. Any redirect responses will end up here. The list is sorted from the oldest to the most recent request.

cookies

Dictionary of cookies set by the server or None.

raw

A raw asynchronous HTTP response.

links

Returns the parsed header links of the response, if any.

text

Decode content as a string.

json()

Decode content as a JSON object.

decode_content()

Return the best possible representation of the response body.

raise_for_status()

Raises the stored HTTPError or URLError, if one occurred.

info()

Required by Python's CookieJar.

Return headers.

OAuth1

class pulsar.apps.http.oauth.OAuth1(client_id=None, client=None, **kw)

Add OAuth1 authentication to pulsar HttpClient

OAuth2

class pulsar.apps.http.oauth.OAuth2(client_id=None, client=None, **kw)

Add OAuth2 authentication to pulsar HttpClient