commit - 2e30f2846c31d0bcd48b9bbfa5158f0f943eabfd
commit + aa67e876cf7e27d67a6f9baf186bf2b2a70cad4e
blob - d22895cbcdc7e44fbc1c8c9c1673a3397ba7b965
blob + 33496543afbf40e7153e151911ee656ebfb30b8f
--- dulwich/contrib/test_paramiko_vendor.py
+++ dulwich/contrib/test_paramiko_vendor.py
try:
import paramiko
except ImportError:
- paramiko = None
+ has_paramiko = False
else:
+ has_paramiko = True
from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor
+    class Server(paramiko.ServerInterface):
+        """Minimal paramiko ServerInterface implementation for this test module.
+
+        Every exec request is appended to ``commands``; authentication
+        succeeds only for USER with PASSWORD, or for USER presenting the
+        key derived from CLIENT_KEY.
+
+        See http://docs.paramiko.org/en/2.4/api/server.html
+        """
+
+        def __init__(self, commands, *args, **kwargs):
+            """Keep a reference to ``commands``; exec requests are appended to it.
+
+            Remaining arguments are forwarded to paramiko.ServerInterface.
+            """
+            super(Server, self).__init__(*args, **kwargs)
+            self.commands = commands
+
+        def check_channel_exec_request(self, channel, command):
+            """Record ``command`` and return True for every exec request."""
+            self.commands.append(command)
+            return True
+
+        def check_auth_password(self, username, password):
+            """Return AUTH_SUCCESSFUL only for the USER/PASSWORD pair."""
+            if username == USER and password == PASSWORD:
+                return paramiko.AUTH_SUCCESSFUL
+            return paramiko.AUTH_FAILED
+
+        def check_auth_publickey(self, username, key):
+            """Return AUTH_SUCCESSFUL only for USER with the CLIENT_KEY key."""
+            # The expected key is re-parsed from CLIENT_KEY on every attempt.
+            pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
+            if username == USER and key == pubkey:
+                return paramiko.AUTH_SUCCESSFUL
+            return paramiko.AUTH_FAILED
+
+        def check_channel_request(self, kind, chanid):
+            """Allow only "session" channels; refuse every other kind."""
+            if kind == "session":
+                return paramiko.OPEN_SUCCEEDED
+            return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+
+        def get_allowed_auths(self, username):
+            """Advertise password and publickey auth for any username."""
+            return "password,publickey"
+
+
USER = 'testuser'
PASSWORD = 'test'
SERVER_KEY = """\
-----END RSA PRIVATE KEY-----"""
-if paramiko is not None:
- class Server(paramiko.ServerInterface):
- """http://docs.paramiko.org/en/2.4/api/server.html"""
- def __init__(self, commands, *args, **kwargs):
- super(Server, self).__init__(*args, **kwargs)
- self.commands = commands
-
- def check_channel_exec_request(self, channel, command):
- self.commands.append(command)
- return True
-
- def check_auth_password(self, username, password):
- if username == USER and password == PASSWORD:
- return paramiko.AUTH_SUCCESSFUL
- return paramiko.AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- pubkey = paramiko.RSAKey.from_private_key(StringIO(CLIENT_KEY))
- if username == USER and key == pubkey:
- return paramiko.AUTH_SUCCESSFUL
- return paramiko.AUTH_FAILED
-
- def check_channel_request(self, kind, chanid):
- if kind == "session":
- return paramiko.OPEN_SUCCEEDED
- return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-
- def get_allowed_auths(self, username):
- return "password,publickey"
-
-
-@skipIf(paramiko is None, "paramiko is not installed")
+@skipIf(not has_paramiko, "paramiko is not installed")
class ParamikoSSHVendorTests(TestCase):
def setUp(self):