From 750b293cb1da07ac929b1e271773e621af139b30 Mon Sep 17 00:00:00 2001 From: Viktor Villainov Date: Wed, 26 Dec 2018 06:15:57 -0500 Subject: [PATCH] Add async context managers --- docs/api.rst | 31 +++++++++++++++++ docs/examples/wget.py | 47 ++++++++++--------------- i2plib/__init__.py | 3 +- i2plib/aiosam.py | 81 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 29 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 8c42dc8..c7fc255 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -23,6 +23,37 @@ an :class:`asyncio.StreamWriter` instance. .. autofunction:: stream_accept .. autofunction:: get_sam_socket +Context managers +---------------- + +The following are asynchronous context managers for making I2P connections. + +You can use them like that: + +:: + + import asyncio + import i2plib + + async def connect_test(destination): + session_name = "test" + + async with i2plib.Session(session_name): + async with i2plib.StreamConnection(session_name, destination) as c: + c.write(b"PING") + resp = await c.read(4096) + + print(resp) + + loop = asyncio.get_event_loop() + loop.run_until_complete(connect_test("dummy.i2p")) + loop.stop() + +.. autoclass:: i2plib.Session + :members: +.. autoclass:: i2plib.StreamConnection + :members: + Utilities --------- diff --git a/docs/examples/wget.py b/docs/examples/wget.py index ed6813b..7723e67 100644 --- a/docs/examples/wget.py +++ b/docs/examples/wget.py @@ -4,45 +4,36 @@ from urllib.parse import urlparse import i2plib -async def http_get(sam_address, loop, session_name, url): - url = urlparse(url) - r, w = await i2plib.stream_connect(session_name, url.netloc, - sam_address=sam_address, loop=loop) - - w.write("GET {} HTTP/1.0\nHost: {}\r\n\r\n".format( - url.path, url.netloc).encode()) - - buflen, resp = 4096, b"" - while 1: - data = await r.read(buflen) - if len(data) > 0: - resp += data - else: - break - - w.close() - try: - return resp.split(b"\r\n\r\n", 1)[1].decode() - except IndexError: - return resp.decode() - async def wget(sam_address, loop, url): session_name = "wget" - await i2plib.create_session(session_name, sam_address=sam_address, loop=loop) + url = urlparse(url) + buflen, resp = 4096, b"" - res = await http_get(sam_address, loop, session_name, url) - print(res) + async with i2plib.Session(session_name, sam_address=sam_address, loop=loop): + async with i2plib.StreamConnection(session_name, url.netloc, loop=loop, + sam_address=sam_address) as c: + c.write("GET {} HTTP/1.0\nHost: {}\r\n\r\n".format( + url.path, url.netloc).encode()) + + while 1: + data = await c.read(buflen) + if len(data) > 0: + resp += data + else: + break + try: + print(resp.split(b"\r\n\r\n", 1)[1].decode()) + except IndexError: + print(resp.decode()) if __name__ == "__main__": - sam_address = i2plib.get_sam_address() - if len(sys.argv) == 2: url = sys.argv[1] if not url.startswith("http://"): url = "http://" + url loop = asyncio.get_event_loop() - loop.run_until_complete(wget(sam_address, loop, url)) + loop.run_until_complete(wget(i2plib.get_sam_address(), loop, url)) loop.stop() loop.close() else: diff --git a/i2plib/__init__.py b/i2plib/__init__.py index c6b574d..afd0fee 100644 --- a/i2plib/__init__.py +++ b/i2plib/__init__.py @@ -11,7 +11,8 @@ from .sam import Destination, PrivateKey from .aiosam import ( get_sam_socket, dest_lookup, new_destination, - create_session, stream_connect, stream_accept + create_session, stream_connect, stream_accept, + Session, StreamConnection ) from .tunnel import ClientTunnel, ServerTunnel diff --git a/i2plib/aiosam.py b/i2plib/aiosam.py index a2423e3..60b8d75 100644 --- a/i2plib/aiosam.py +++ b/i2plib/aiosam.py @@ -151,3 +151,84 @@ async def stream_accept(session_name, sam_address=i2plib.sam.DEFAULT_ADDRESS, return (reader, writer) else: raise i2plib.exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +### Context managers + +class Session: + """Async SAM session context manager. + + :param session_name: Session nick name + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :param style: (optional) Session style, can be STREAM, DATAGRAM, RAW + :param signature_type: (optional) If the destination is TRANSIENT, this + signature type is used + :param destination: (optional) Destination to use in this session. Can be + a base64 encoded string, :class:`i2plib.Destination` + instance or None. TRANSIENT destination is used when it + is None. + :param options: (optional) A dict object with i2cp options + :return: :class:`i2plib.Session` object + """ + def __init__(self, session_name, sam_address=i2plib.sam.DEFAULT_ADDRESS, + loop=None, style="STREAM", + signature_type=i2plib.sam.Destination.default_sig_type, + destination=None, options={}): + self.session_name = session_name + self.sam_address = sam_address + self.loop = loop + self.style = style + self.signature_type = signature_type + self.destination = destination + self.options = options + + async def __aenter__(self): + self.reader, self.writer = await create_session(self.session_name, + sam_address=self.sam_address, loop=self.loop, style=self.style, + signature_type=self.signature_type, + destination=self.destination, options=self.options) + return self + + async def __aexit__(self, exc_type, exc, tb): + ### TODO handle exceptions + self.writer.close() + +class StreamConnection: + """Async stream connection context manager. + + :param session_name: Session nick name + :param destination: I2P destination to connect to + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: :class:`i2plib.StreamConnection` object + """ + def __init__(self, session_name, destination, + sam_address=i2plib.sam.DEFAULT_ADDRESS, loop=None): + self.session_name = session_name + self.sam_address = sam_address + self.loop = loop + self.destination = destination + + async def __aenter__(self): + self.reader, self.writer = await stream_connect(self.session_name, + self.destination, sam_address=self.sam_address, loop=self.loop) + return self + + async def read(self, length): + """Read data from socket + + :param length: buffer length + :return: data + """ + return await self.reader.read(length) + + def write(self, data): + """Write data to socket + + :param data: data + """ + self.writer.write(data) + + async def __aexit__(self, exc_type, exc, tb): + ### TODO handle exceptions + self.writer.close()