Add more tests

master
Viktor Villainov 2018-12-31 10:02:37 -05:00
parent 21ccb2ec0f
commit 6879b694c8
No known key found for this signature in database
GPG Key ID: 8EB38B46F33BAF2F
1 changed files with 39 additions and 2 deletions

View File

@ -21,6 +21,7 @@ OFFLINE_DEST = "9ZI1KN~C6ITmw2xGljY1p~36anL3uXpItFUschg0-~-ly4Q0Bh0wtbja6MJNZrAk
SESSION_DEST_MAP = {
"ppserver": SERVER_DEST_B64, "ppclient": CLIENT_DEST_B64,
"failedaccept": SERVER_DEST_B64,
"unknowntest": CLIENT_DEST_B64,
"offlinetest": CLIENT_DEST_B64
}
@ -50,8 +51,11 @@ async def fake_sam_server_handler(reader, writer):
CLIENT_DEST.base64, CLIENT_DEST.private_key.base64).encode())
elif msg.cmd == "SESSION" and msg.action == "CREATE":
session = msg["ID"]
writer.write("SESSION STATUS RESULT=OK DESTINATION={}\n".format(
SESSION_DEST_MAP[session]).encode())
if session == "duplicatedid":
writer.write("SESSION STATUS RESULT=DUPLICATED_ID\n".encode())
else:
writer.write("SESSION STATUS RESULT=OK DESTINATION={}\n".format(
SESSION_DEST_MAP[session]).encode())
elif msg.cmd == "STREAM" and msg.action == "ACCEPT":
session = msg["ID"]
if session == "ppserver":
@ -61,6 +65,8 @@ async def fake_sam_server_handler(reader, writer):
await asyncio.sleep(0.1)
data_transfer = True
writer.write(b"PING")
elif session == "failedaccept":
writer.write(b"STREAM STATUS RESULT=I2P_ERROR\n")
elif msg.cmd == "STREAM" and msg.action == "CONNECT":
session = msg["ID"]
if session == "ppclient":
@ -161,6 +167,37 @@ class TestFuncPingPong(unittest.TestCase):
self.runner(context_managers_test)
def test_create_session_with_base64_destination(self):
async def coro():
_, session_writer = await i2plib.create_session("ppserver",
destination=SERVER_DEST_B64,
sam_address=self.sam_address, loop=self.loop)
session_writer.close()
self.runner(coro)
@unittest.skipIf(REAL_SAM, "real SAM will not fail")
def test_duplicated_id(self):
async def coro():
with self.assertRaises(i2plib.DuplicatedId):
_, client_session_writer = await i2plib.create_session("duplicatedid",
sam_address=self.sam_address, loop=self.loop)
self.runner(coro)
@unittest.skipIf(REAL_SAM, "real SAM will not fail")
def test_fail_accept(self):
async def coro():
_, client_session_writer = await i2plib.create_session("failedaccept",
sam_address=self.sam_address, loop=self.loop)
with self.assertRaises(i2plib.I2PError):
server_reader, server_writer = await i2plib.stream_accept("failedaccept",
sam_address=self.sam_address, loop=self.loop)
client_session_writer.close()
self.runner(coro)
def test_unknown_dest(self):
async def coro():
_, client_session_writer = await i2plib.create_session("unknowntest",