Add async context managers

master
Viktor Villainov 2018-12-26 06:15:57 -05:00
parent aa58132220
commit 750b293cb1
No known key found for this signature in database
GPG Key ID: 8EB38B46F33BAF2F
4 changed files with 133 additions and 29 deletions

View File

@ -23,6 +23,37 @@ an :class:`asyncio.StreamWriter` instance.
.. autofunction:: stream_accept
.. autofunction:: get_sam_socket
Context managers
----------------
The following are asynchronous context managers for making I2P connections.
You can use them like that:
::
import asyncio
import i2plib
async def connect_test(destination):
session_name = "test"
async with i2plib.Session(session_name):
async with i2plib.StreamConnection(session_name, destination) as c:
c.write(b"PING")
resp = await c.read(4096)
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect_test("dummy.i2p"))
loop.stop()
.. autoclass:: i2plib.Session
:members:
.. autoclass:: i2plib.StreamConnection
:members:
Utilities
---------

View File

@ -4,45 +4,36 @@ from urllib.parse import urlparse
import i2plib
async def http_get(sam_address, loop, session_name, url):
url = urlparse(url)
r, w = await i2plib.stream_connect(session_name, url.netloc,
sam_address=sam_address, loop=loop)
w.write("GET {} HTTP/1.0\nHost: {}\r\n\r\n".format(
url.path, url.netloc).encode())
buflen, resp = 4096, b""
while 1:
data = await r.read(buflen)
if len(data) > 0:
resp += data
else:
break
w.close()
try:
return resp.split(b"\r\n\r\n", 1)[1].decode()
except IndexError:
return resp.decode()
async def wget(sam_address, loop, url):
session_name = "wget"
await i2plib.create_session(session_name, sam_address=sam_address, loop=loop)
url = urlparse(url)
buflen, resp = 4096, b""
res = await http_get(sam_address, loop, session_name, url)
print(res)
async with i2plib.Session(session_name, sam_address=sam_address, loop=loop):
async with i2plib.StreamConnection(session_name, url.netloc, loop=loop,
sam_address=sam_address) as c:
c.write("GET {} HTTP/1.0\nHost: {}\r\n\r\n".format(
url.path, url.netloc).encode())
while 1:
data = await c.read(buflen)
if len(data) > 0:
resp += data
else:
break
try:
print(resp.split(b"\r\n\r\n", 1)[1].decode())
except IndexError:
print(resp.decode())
if __name__ == "__main__":
sam_address = i2plib.get_sam_address()
if len(sys.argv) == 2:
url = sys.argv[1]
if not url.startswith("http://"):
url = "http://" + url
loop = asyncio.get_event_loop()
loop.run_until_complete(wget(sam_address, loop, url))
loop.run_until_complete(wget(i2plib.get_sam_address(), loop, url))
loop.stop()
loop.close()
else:

View File

@ -11,7 +11,8 @@ from .sam import Destination, PrivateKey
from .aiosam import (
get_sam_socket, dest_lookup, new_destination,
create_session, stream_connect, stream_accept
create_session, stream_connect, stream_accept,
Session, StreamConnection
)
from .tunnel import ClientTunnel, ServerTunnel

View File

@ -151,3 +151,84 @@ async def stream_accept(session_name, sam_address=i2plib.sam.DEFAULT_ADDRESS,
return (reader, writer)
else:
raise i2plib.exceptions.SAM_EXCEPTIONS[reply["RESULT"]]()
### Context managers
class Session:
"""Async SAM session context manager.
:param session_name: Session nick name
:param sam_address: (optional) SAM API address
:param loop: (optional) Event loop instance
:param style: (optional) Session style, can be STREAM, DATAGRAM, RAW
:param signature_type: (optional) If the destination is TRANSIENT, this
signature type is used
:param destination: (optional) Destination to use in this session. Can be
a base64 encoded string, :class:`i2plib.Destination`
instance or None. TRANSIENT destination is used when it
is None.
:param options: (optional) A dict object with i2cp options
:return: :class:`i2plib.Session` object
"""
def __init__(self, session_name, sam_address=i2plib.sam.DEFAULT_ADDRESS,
loop=None, style="STREAM",
signature_type=i2plib.sam.Destination.default_sig_type,
destination=None, options={}):
self.session_name = session_name
self.sam_address = sam_address
self.loop = loop
self.style = style
self.signature_type = signature_type
self.destination = destination
self.options = options
async def __aenter__(self):
self.reader, self.writer = await create_session(self.session_name,
sam_address=self.sam_address, loop=self.loop, style=self.style,
signature_type=self.signature_type,
destination=self.destination, options=self.options)
return self
async def __aexit__(self, exc_type, exc, tb):
### TODO handle exceptions
self.writer.close()
class StreamConnection:
"""Async stream connection context manager.
:param session_name: Session nick name
:param destination: I2P destination to connect to
:param sam_address: (optional) SAM API address
:param loop: (optional) Event loop instance
:return: :class:`i2plib.StreamConnection` object
"""
def __init__(self, session_name, destination,
sam_address=i2plib.sam.DEFAULT_ADDRESS, loop=None):
self.session_name = session_name
self.sam_address = sam_address
self.loop = loop
self.destination = destination
async def __aenter__(self):
self.reader, self.writer = await stream_connect(self.session_name,
self.destination, sam_address=self.sam_address, loop=self.loop)
return self
async def read(self, length):
"""Read data from socket
:param length: buffer length
:return: data
"""
return await self.reader.read(length)
def write(self, data):
"""Write data to socket
:param data: data
"""
self.writer.write(data)
async def __aexit__(self, exc_type, exc, tb):
### TODO handle exceptions
self.writer.close()