commit aa67e876cf7e27d67a6f9baf186bf2b2a70cad4e
from: Jelmer Vernooij
date: Mon May 16 17:18:40 2022 UTC

Refactor paramiko tests.

commit - 2e30f2846c31d0bcd48b9bbfa5158f0f943eabfd
commit + aa67e876cf7e27d67a6f9baf186bf2b2a70cad4e
blob - d22895cbcdc7e44fbc1c8c9c1673a3397ba7b965
blob + 33496543afbf40e7153e151911ee656ebfb30b8f
--- dulwich/contrib/test_paramiko_vendor.py
+++ dulwich/contrib/test_paramiko_vendor.py
@@ -30,11 +30,41 @@ from unittest import skipIf
 try:
     import paramiko
 except ImportError:
-    paramiko = None
+    has_paramiko = False
 else:
+    has_paramiko = True
     from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
 
+    class Server(paramiko.ServerInterface):
+        """http://docs.paramiko.org/en/2.4/api/server.html"""
+        def __init__(self, commands, *args, **kwargs):
+            super(Server, self).__init__(*args, **kwargs)
+            self.commands = commands
 
+        def check_channel_exec_request(self, channel, command):
+            self.commands.append(command)
+            return True
+
+        def check_auth_password(self, username, password):
+            if username == USER and password == PASSWORD:
+                return paramiko.AUTH_SUCCESSFUL
+            return paramiko.AUTH_FAILED
+
+        def check_auth_publickey(self, username, key):
+            pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
+            if username == USER and key == pubkey:
+                return paramiko.AUTH_SUCCESSFUL
+            return paramiko.AUTH_FAILED
+
+        def check_channel_request(self, kind, chanid):
+            if kind == "session":
+                return paramiko.OPEN_SUCCEEDED
+            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+
+        def get_allowed_auths(self, username):
+            return "password,publickey"
+
+
 USER = 'testuser'
 PASSWORD = 'test'
 SERVER_KEY = """\
@@ -95,38 +125,7 @@ WxtWBWHwxfSmqgTXilEA3ALJp0kNolLnEttnhENwJpZHlqtes0ZA4w
 -----END RSA PRIVATE KEY-----"""
 
 
-if paramiko is not None:
-    class Server(paramiko.ServerInterface):
-        """http://docs.paramiko.org/en/2.4/api/server.html"""
-        def __init__(self, commands, *args, **kwargs):
-            super(Server, self).__init__(*args, **kwargs)
-            self.commands = commands
-
-        def check_channel_exec_request(self, channel, command):
-            self.commands.append(command)
-            return True
-
-        def check_auth_password(self, username, password):
-            if username == USER and password == PASSWORD:
-                return paramiko.AUTH_SUCCESSFUL
-            return paramiko.AUTH_FAILED
-
-        def check_auth_publickey(self, username, key):
-            pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
-            if username == USER and key == pubkey:
-                return paramiko.AUTH_SUCCESSFUL
-            return paramiko.AUTH_FAILED
-
-        def check_channel_request(self, kind, chanid):
-            if kind == "session":
-                return paramiko.OPEN_SUCCEEDED
-            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-
-        def get_allowed_auths(self, username):
-            return "password,publickey"
-
-
-
-@skipIf(paramiko is None, "paramiko is not installed")
+@skipIf(not has_paramiko, "paramiko is not installed")
 class ParamikoSSHVendorTests(TestCase):
 
     def setUp(self):