Python SSH服务器(twisted.conch)命令过滤和端口转发
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python SSH服务器(twisted.conch)命令过滤和端口转发相关的知识,希望对你有一定的参考价值。
我需要创建一个SSH服务器(已为该作业选择twisted.conch),它将执行以下操作:
- 执行端口转发(附加的代码不这样做,我不知道要修改什么)
- 在命令执行之前过滤命令(或者至少在之前或之后记录它们)。
下面附带的代码创建了一个完美的SSH和SFTP服务器,但它缺少一个主要组件 - 端口转发(和命令过滤,但这不像端口转发那么重要)
我尽可能搜索,但找不到这两个..请帮助我, - 这是拼图的最后一个平安。
#!/usr/bin/env python
from twisted.conch.unix import UnixSSHRealm
from twisted.cred.portal import Portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh.factory import SSHFactory
from twisted.internet import reactor, defer
from twisted.conch.ssh.transport import SSHServerTransport
from twisted.conch.ssh.userauth import SSHUserAuthServer
from twisted.conch.ssh.connection import SSHConnection
from twisted.conch.ssh.keys import Key
from zope.interface import implements
from subprocess import Popen,PIPE
from crypt import crypt
publicKey = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJSkbh/C+BR3utDS555mV'
privateKey = """-----BEGIN RSA PRIVATE KEY-----
MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
-----END RSA PRIVATE KEY-----"""
# check if username/password is valid
def checkPassword(username,password):
try:
ret=False
if username and password:
output=Popen(["grep",username,"/etc/shadow"],stdout=PIPE,stderr=PIPE).communicate()[0]
hash=""
if output:
tmp=output.split(":")
if tmp>=2:
hash=tmp[1]
del tmp
ret=crypt(password,hash)==hash
del output,hash
except Exception,e:
ret=False
return ret
# authorization methods
class XSSHAuth(object):
credentialInterfaces=IUsernamePassword,implements(ICredentialsChecker)
def requestAvatarId(self, credentials):
#print "Credentials:",credentials.username,credentials.password
if credentials.username=="root" and credentials.password and checkPassword(credentials.username,credentials.password):
# successful authorization
return defer.succeed(credentials.username)
# failed authorization
return defer.fail(UnauthorizedLogin("invalid password"))
class XSSHUserAuthServer(SSHUserAuthServer):
def _ebPassword(self, reason):
addr = self.transport.getPeer().address
if addr.host!="3.22.116.85" and addr.host!="127.0.0.1":
p1 = Popen(["iptables","-I","INPUT","-s",addr.host,"-j","DROP"], stdout=PIPE, stderr=PIPE)
p1.communicate()
print(addr.host, addr.port, self.user, self.method)
self.transport.loseConnection()
return defer.fail(UnauthorizedLogin("invalid password"))
# the transport class - we use it to log MOST OF THE ACTIONS executed thru the server
class XSSHTransport(SSHServerTransport):
ourVersionString="SSH-2.0-X"
logCommand=""
def connectionMade(self):
print "Connection made",self.getPeer()
SSHServerTransport.connectionMade(self)
#self.transport.loseConnection()
def connectionLost(self,reason):
print "Connection closed",self.getPeer()
SSHServerTransport.connectionLost(self,reason)
def dataReceived(self, data):
SSHServerTransport.dataReceived(self,data)
def dispatchMessage(self, messageNum, payload):
SSHServerTransport.dispatchMessage(self,messageNum,payload)
# start the server
class XSSHFactory(SSHFactory):
protocol=XSSHTransport
factory = XSSHFactory()
factory.publicKeys = {'ssh-rsa': Key.fromString(data=publicKey)}
factory.privateKeys = {'ssh-rsa': Key.fromString(data=privateKey)}
factory.services = {
'ssh-userauth': XSSHUserAuthServer,
'ssh-connection': SSHConnection
}
portal=Portal(UnixSSHRealm())
portal.registerChecker(XSSHAuth())
factory.portal=portal
reactor.listenTCP(22, factory)
reactor.run()
答案
既然你正在使用实现UnixConchUser
的global_tcpip_forward
,它确实可行。当我运行你的例子并用ssh -L4321:remote.host:1234 root@localhost -p 2222
和telnet localhost 4321
连接到它时,我会通过remote.host 1234
进行隧道传输。您必须更详细地说明您的问题。
另一答案
命令日志可以在dataReceived(self, data)
中完成:
def dataReceived(self, data):
SSHServerTransport.dataReceived(self,data)
self.buf += data
if data == '
':
cmd = self.buf
self.buf = ''
但它无法很好地处理删除键,制表符,向上箭头,向下箭头和其他特殊字符。我想知道你最后得到的命令。
以上是关于Python SSH服务器(twisted.conch)命令过滤和端口转发的主要内容,如果未能解决你的问题,请参考以下文章
python自动化管理sshy(ssh,ssh-copy-id,ssh-agent)
python自动化管理sshy(ssh,ssh-copy-id,ssh-agent)
python ssh登录网管后继续ssh登录其它机器,咋写接下来的代码?