You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

254 lines
8.5 KiB

#
# This file is part of pysnmp software.
#
# Copyright (c) 2005-2016, Ilya Etingof <ilya@glas.net>
# License: http://pysnmp.sf.net/license.html
#
from pyasn1.type import univ
from pysnmp.proto import rfc1155, rfc1157, error
from pysnmp import nextid
# Shortcuts to SNMP types
Integer = univ.Integer
OctetString = univ.OctetString
Null = univ.Null
null = Null('')
ObjectIdentifier = univ.ObjectIdentifier
IpAddress = rfc1155.IpAddress
NetworkAddress = rfc1155.NetworkAddress
Counter = rfc1155.Counter
Gauge = rfc1155.Gauge
TimeTicks = rfc1155.TimeTicks
Opaque = rfc1155.Opaque
VarBind = rfc1157.VarBind
VarBindList = rfc1157.VarBindList
GetRequestPDU = rfc1157.GetRequestPDU
GetNextRequestPDU = rfc1157.GetNextRequestPDU
GetResponsePDU = rfc1157.GetResponsePDU
SetRequestPDU = rfc1157.SetRequestPDU
TrapPDU = rfc1157.TrapPDU
Message = rfc1157.Message
class VarBindAPI:
def setOIDVal(self, varBind, oidVal):
(oid, val) = oidVal
varBind.setComponentByPosition(0, oid)
if val is None:
val = null
varBind.setComponentByPosition(1).getComponentByPosition(1).setComponentByType(val.getTagSet(), val, 1, verifyConstraints=False)
return varBind
def getOIDVal(self, varBind):
return varBind[0], varBind[1].getComponent(1)
apiVarBind = VarBindAPI()
getNextRequestID = nextid.Integer(0xffffff)
class PDUAPI:
_errorStatus = rfc1157._errorStatus.clone(0)
_errorIndex = Integer(0)
def setDefaults(self, pdu):
pdu.setComponentByPosition(
0, getNextRequestID(), verifyConstraints=False
)
pdu.setComponentByPosition(
1, self._errorStatus, verifyConstraints=False
)
pdu.setComponentByPosition(
2, self._errorIndex, verifyConstraints=False
)
pdu.setComponentByPosition(3)
def getRequestID(self, pdu):
return pdu.getComponentByPosition(0)
def setRequestID(self, pdu, value):
pdu.setComponentByPosition(0, value)
def getErrorStatus(self, pdu):
return pdu.getComponentByPosition(1)
def setErrorStatus(self, pdu, value):
pdu.setComponentByPosition(1, value)
def getErrorIndex(self, pdu, muteErrors=False):
errorIndex = pdu.getComponentByPosition(2)
if errorIndex > len(pdu[3]):
if muteErrors:
return errorIndex.clone(len(pdu[3]))
raise error.ProtocolError(
'Error index out of range: %s > %s' % (errorIndex, len(pdu[3]))
)
return errorIndex
def setErrorIndex(self, pdu, value):
pdu.setComponentByPosition(2, value)
def setEndOfMibError(self, pdu, errorIndex):
self.setErrorIndex(pdu, errorIndex)
self.setErrorStatus(pdu, 2)
def setNoSuchInstanceError(self, pdu, errorIndex):
self.setEndOfMibError(pdu, errorIndex)
def getVarBindList(self, pdu):
return pdu.getComponentByPosition(3)
def setVarBindList(self, pdu, varBindList):
varBindList = pdu.setComponentByPosition(3, varBindList)
def getVarBinds(self, pdu):
varBinds = []
for varBind in pdu.getComponentByPosition(3):
varBinds.append(apiVarBind.getOIDVal(varBind))
return varBinds
def setVarBinds(self, pdu, varBinds):
varBindList = pdu.setComponentByPosition(3).getComponentByPosition(3)
varBindList.clear()
idx = 0
for varBind in varBinds:
if isinstance(varBind, VarBind):
varBindList.setComponentByPosition(idx, varBind)
else:
varBindList.setComponentByPosition(idx)
apiVarBind.setOIDVal(
varBindList.getComponentByPosition(idx), varBind
)
idx += 1
def getResponse(self, reqPDU):
rspPDU = GetResponsePDU()
self.setDefaults(rspPDU)
self.setRequestID(rspPDU, self.getRequestID(reqPDU))
return rspPDU
def getVarBindTable(self, reqPDU, rspPDU):
if apiPDU.getErrorStatus(rspPDU) == 2:
varBindRow = []
for varBind in apiPDU.getVarBinds(reqPDU):
varBindRow.append((varBind[0], null))
return [varBindRow]
else:
return [apiPDU.getVarBinds(rspPDU)]
apiPDU = PDUAPI()
class TrapPDUAPI:
_networkAddress = None
_entOid = ObjectIdentifier((1, 3, 6, 1, 4, 1, 20408))
_genericTrap = rfc1157._genericTrap.clone('coldStart')
_zeroInt = univ.Integer(0)
_zeroTime = TimeTicks(0)
def setDefaults(self, pdu):
if self._networkAddress is None:
try:
import socket
agentAddress = IpAddress(socket.gethostbyname(socket.gethostname()))
except:
agentAddress = IpAddress('0.0.0.0')
self._networkAddress = NetworkAddress().setComponentByPosition(0, agentAddress)
pdu.setComponentByPosition(0, self._entOid, verifyConstraints=False)
pdu.setComponentByPosition(1, self._networkAddress, verifyConstraints=False)
pdu.setComponentByPosition(2, self._genericTrap, verifyConstraints=False)
pdu.setComponentByPosition(3, self._zeroInt, verifyConstraints=False)
pdu.setComponentByPosition(4, self._zeroTime, verifyConstraints=False)
pdu.setComponentByPosition(5)
def getEnterprise(self, pdu):
return pdu.getComponentByPosition(0)
def setEnterprise(self, pdu, value):
pdu.setComponentByPosition(0, value)
def getAgentAddr(self, pdu):
return pdu.getComponentByPosition(1).getComponentByPosition(0)
def setAgentAddr(self, pdu, value):
pdu.setComponentByPosition(1).getComponentByPosition(1).setComponentByPosition(0, value)
def getGenericTrap(self, pdu):
return pdu.getComponentByPosition(2)
def setGenericTrap(self, pdu, value):
pdu.setComponentByPosition(2, value)
def getSpecificTrap(self, pdu):
return pdu.getComponentByPosition(3)
def setSpecificTrap(self, pdu, value):
pdu.setComponentByPosition(3, value)
def getTimeStamp(self, pdu):
return pdu.getComponentByPosition(4)
def setTimeStamp(self, pdu, value):
pdu.setComponentByPosition(4, value)
def getVarBindList(self, pdu):
return pdu.getComponentByPosition(5)
def setVarBindList(self, pdu, varBindList):
varBindList = pdu.setComponentByPosition(5, varBindList)
def getVarBinds(self, pdu):
varBinds = []
for varBind in pdu.getComponentByPosition(5):
varBinds.append(apiVarBind.getOIDVal(varBind))
return varBinds
def setVarBinds(self, pdu, varBinds):
varBindList = pdu.setComponentByPosition(5).getComponentByPosition(5)
varBindList.clear()
idx = 0
for varBind in varBinds:
if isinstance(varBind, VarBind):
varBindList.setComponentByPosition(idx, varBind)
else:
varBindList.setComponentByPosition(idx)
apiVarBind.setOIDVal(
varBindList.getComponentByPosition(idx), varBind
)
idx = idx + 1
apiTrapPDU = TrapPDUAPI()
class MessageAPI:
_version = rfc1157._version.clone(0)
_community = univ.OctetString('public')
def setDefaults(self, msg):
msg.setComponentByPosition(0, self._version, verifyConstraints=False)
msg.setComponentByPosition(1, self._community, verifyConstraints=False)
return msg
def getVersion(self, msg):
return msg.getComponentByPosition(0)
def setVersion(self, msg, value):
msg.setComponentByPosition(0, value)
def getCommunity(self, msg):
return msg.getComponentByPosition(1)
def setCommunity(self, msg, value):
msg.setComponentByPosition(1, value)
def getPDU(self, msg):
return msg.getComponentByPosition(2).getComponent()
def setPDU(self, msg, value):
msg.setComponentByPosition(2).getComponentByPosition(2).setComponentByType(value.getTagSet(), value, 1, verifyConstraints=False)
def getResponse(self, reqMsg):
rspMsg = Message()
self.setDefaults(rspMsg)
self.setVersion(rspMsg, self.getVersion(reqMsg))
self.setCommunity(rspMsg, self.getCommunity(reqMsg))
self.setPDU(rspMsg, apiPDU.getResponse(self.getPDU(reqMsg)))
return rspMsg
apiMessage = MessageAPI()