diff --git a/test/test_func.py b/test/test_func.py index dd25077..0478d70 100644 --- a/test/test_func.py +++ b/test/test_func.py @@ -21,6 +21,7 @@ OFFLINE_DEST = "9ZI1KN~C6ITmw2xGljY1p~36anL3uXpItFUschg0-~-ly4Q0Bh0wtbja6MJNZrAk SESSION_DEST_MAP = { "ppserver": SERVER_DEST_B64, "ppclient": CLIENT_DEST_B64, + "failedaccept": SERVER_DEST_B64, "unknowntest": CLIENT_DEST_B64, "offlinetest": CLIENT_DEST_B64 } @@ -50,8 +51,11 @@ async def fake_sam_server_handler(reader, writer): CLIENT_DEST.base64, CLIENT_DEST.private_key.base64).encode()) elif msg.cmd == "SESSION" and msg.action == "CREATE": session = msg["ID"] - writer.write("SESSION STATUS RESULT=OK DESTINATION={}\n".format( - SESSION_DEST_MAP[session]).encode()) + if session == "duplicatedid": + writer.write("SESSION STATUS RESULT=DUPLICATED_ID\n".encode()) + else: + writer.write("SESSION STATUS RESULT=OK DESTINATION={}\n".format( + SESSION_DEST_MAP[session]).encode()) elif msg.cmd == "STREAM" and msg.action == "ACCEPT": session = msg["ID"] if session == "ppserver": @@ -61,6 +65,8 @@ async def fake_sam_server_handler(reader, writer): await asyncio.sleep(0.1) data_transfer = True writer.write(b"PING") + elif session == "failedaccept": + writer.write(b"STREAM STATUS RESULT=I2P_ERROR\n") elif msg.cmd == "STREAM" and msg.action == "CONNECT": session = msg["ID"] if session == "ppclient": @@ -161,6 +167,37 @@ class TestFuncPingPong(unittest.TestCase): self.runner(context_managers_test) + def test_create_session_with_base64_destination(self): + async def coro(): + _, session_writer = await i2plib.create_session("ppserver", + destination=SERVER_DEST_B64, + sam_address=self.sam_address, loop=self.loop) + session_writer.close() + + self.runner(coro) + + @unittest.skipIf(REAL_SAM, "real SAM will not fail") + def test_duplicated_id(self): + async def coro(): + with self.assertRaises(i2plib.DuplicatedId): + _, client_session_writer = await i2plib.create_session("duplicatedid", + sam_address=self.sam_address, loop=self.loop) + + self.runner(coro) + + @unittest.skipIf(REAL_SAM, "real SAM will not fail") + def test_fail_accept(self): + async def coro(): + _, client_session_writer = await i2plib.create_session("failedaccept", + sam_address=self.sam_address, loop=self.loop) + with self.assertRaises(i2plib.I2PError): + server_reader, server_writer = await i2plib.stream_accept("failedaccept", + sam_address=self.sam_address, loop=self.loop) + + client_session_writer.close() + + self.runner(coro) + def test_unknown_dest(self): async def coro(): _, client_session_writer = await i2plib.create_session("unknowntest",