Python格式的中间文件
Python脚本文件是两种中间文件格式中的一种。设备通过运行Python脚本来下载版本文件。
Python脚本文件的文件名必须以“.py”作为后缀名,格式如Python脚本文件示例所示。详细脚本文件解释请见Python脚本文件解释。
Python脚本文件示例
该脚本文件仅作为样例,用户可以根据实际开局场景进行修改。
#md5sum="5bb3da014aa88ba8111c2f1191373716" #!/usr/bin/env python # # Copyright (C) Huawei Technologies Co., Ltd. 2008-2013. All rights reserved. # ---------------------------------------------------------------------------------------------------------------------- # History: # Date Author Modification # 20180122 Author created file. # ---------------------------------------------------------------------------------------------------------------------- """ Zero Touch Provisioning (ZTP) enables devices to automatically load version files including system software, patch files, configuration files when the device starts up, the devices to be configured must be new devices or have no configuration files. This is a sample of Zero Touch Provisioning user script. You can customize it to meet the requirements of your network environment. """ import httplib import urllib import string import re import sys import xml.etree.ElementTree as etree import os import stat import logging import traceback import hashlib import ops from urlparse import urlparse from urlparse import urlunparse from time import sleep # error code OK = 0 ERR = 1 # File server in which stores the necessary system software, configuration and patch files: # 1) Specify the file server which supports the following format. # tftp://hostname # ftp://[username[:password]@]hostname[:port] # sftp://[username[:password]@]hostname[:port] # 2) Do not add a trailing slash at the end of file server path. FILE_SERVER = 'ftp://ftpuser:Pwd123@10.1.3.2' # Remote file paths: # 1) The path may include directory name and file name. # 2) If file name is not specified, indicate the procedure can be skipped. # File paths of system software on file server, filename extension is '.cc'. REMOTE_PATH_IMAGE = { 'NE40E' : '/image/V800R011C10B092-NE40E.cc',} # File path of configuration file on file server, filename extension is '.cfg', '.zip' or '.dat'. REMOTE_PATH_CONFIG = '/conf_%s.cfg' # File path of patch file on file server, filename extension is '.pat' REMOTE_PATH_PATCH = { 'NE40E' : '/patch/NE40EV800R011C10SPH001.PAT',} # File path of md5 file, contains md5 value of image / patch / memid / license file, file extension is '.txt' REMOTE_PATH_MD5 = '/md5.txt' # Max times to retry get startup when no query result GET_STARTUP_INTERVAL = 15 # seconds MAX_TIMES_GET_STARTUP = 120 # Max times to retry CHECK_CHECK_STARTUPSOFT_INTERVAL = 5 # seconds MAX_TIMES_CHECK_STARTUPSOFT = 205 # Max times to check startup-system MAX_TIMES_CHECK_STARTUPSOFT_SLAVE = 265 # Max times to check startup-system if there is slave board # Max times to retry when download file faild MAX_TIMES_RETRY_DOWNLOAD = 3 class OPSConnection(object): """Make an OPS connection instance.""" def __init__(self, host, port = 80): self.host = host self.port = port self.headers = { "Content-type": "application/xml", "Accept": "application/xml" } self.conn = httplib.HTTPConnection(self.host, self.port) def close(self): """Close the connection""" self.conn.close() def create(self, uri, req_data): """Create a resource on the server""" ret = self._rest_call("POST", uri, req_data) return ret def delete(self, uri, req_data): """Delete a resource on the server""" ret = self._rest_call("DELETE", uri, req_data) return ret def get(self, uri, req_data = None): """Retrieve a resource from the server""" ret = self._rest_call("GET", uri, req_data) return ret def set(self, uri, req_data): """Update a resource on the server""" ret = self._rest_call("PUT", uri, req_data) return ret def _rest_call(self, method, uri, req_data): """REST call""" if req_data == None: body = "" else: body = req_data self.conn.request(method, uri, body, self.headers) response = self.conn.getresponse() ret = (response.status, response.reason, response.read()) if response.status != httplib.OK: logging.info('%s', body) logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret[0], ret[1], ret[2]) return ret class OPIExecError(Exception): """OPI executes error.""" pass class ZTPErr(Exception): """ZTP error.""" pass gbl_ops = ops.ops() def write_security_log_by_ops(message, level="info"): if level == "error": working_level = ops.ERROR else: working_level = ops.INFORMATIONAL gbl_ops.syslog(message, working_level, ops.DIAGNOSE) def get_addr_by_hostname(ops_conn, host, addr_type = '1'): """Translate a host name to IPv4 address format. The IPv4 address is returned as a string.""" logging.info("Get IP address by host name...") uri = "/dns/dnsNameResolution" root_elem = etree.Element('dnsNameResolution') etree.SubElement(root_elem, 'host').text = host etree.SubElement(root_elem, 'addrType').text = addr_type req_data = etree.tostring(root_elem, "UTF-8") ret, _, rsp_data = ops_conn.get(uri, req_data) if ret != httplib.OK: raise OPIExecError('Failed to get address by host name') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:' elem = root_elem.find(uri + "ipv4Addr", namespaces) if elem is None: raise OPIExecError('Failed to get IP address by host name') return elem.text def _ftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol): """Download file using FTP.""" uri = "/ftpc/ftpcTransferFiles/ftpcTransferFile" str_temp_v4 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <ftpcTransferFile> <serverIpv4Address>$serverIp</serverIpv4Address> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> </ftpcTransferFile> ''') str_temp_v6 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <ftpcTransferFile> <serverIpv6Address>$serverIp</serverIpv6Address> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> </ftpcTransferFile> ''') url_tuple = urlparse(url) if ip_protocol == 'IPv4': server_ip = get_server_ip(ops_conn, url, "ipv4") req_data = str_temp_v4.substitute(serverIp = server_ip, username = url_tuple.username, password = url_tuple.password, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) elif ip_protocol == 'IPv6': server_ip = get_server_ip(ops_conn, url, "ipv6") req_data = str_temp_v6.substitute(serverIp = server_ip, username = url_tuple.username, password = url_tuple.password, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) else: logging.info('Unknown protocol {0} to run ftp download'.format(ip_protocol)) req_ return ERR ret, _, rsq_data = ops_conn.create(uri, req_data) if ret != httplib.OK: logging.error('Failed to download file "%s" using FTP' % os.path.basename(local_path)) logging.error(rsq_data) return ERR return OK def _del_rsa_peer_key(ops_conn, key_name): """Delete RSA peer key configuration""" logging.info("Delete RSA peer key %s", key_name) uri = "/rsa/rsaPeerKeys/rsaPeerKey" root_elem = etree.Element('rsaPeerKey') etree.SubElement(root_elem, 'keyName').text = key_name req_data = etree.tostring(root_elem, "UTF-8") try: ret, _, _ = ops_conn.delete(uri, req_data) if ret != httplib.OK: raise OPIExecError('Failed to delete RSA peer key') except Exception, reason: logging.error(reason) def _del_sshc_rsa_key(ops_conn, server_name, key_type = 'RSA'): """Delete SSH client RSA key configuration""" logging.debug("Delete SSH client RSA key for %s", server_name) uri = "/sshc/sshCliKeyCfgs/sshCliKeyCfg" root_elem = etree.Element('sshCliKeyCfg') etree.SubElement(root_elem, 'serverName').text = server_name etree.SubElement(root_elem, 'pubKeyType').text = key_type req_data = etree.tostring(root_elem, "UTF-8") try: ret, _, _ = ops_conn.delete(uri, req_data) if ret != httplib.OK: raise OPIExecError('Failed to delete SSH client RSA key') except Exception, reason: logging.error(reason) _del_rsa_peer_key(ops_conn, server_name) def _set_sshc_first_time(ops_conn, switch): """Set SSH client attribute of authenticating user for the first time access""" if switch not in ['Enable', 'Disable']: return ERR logging.info('Set SSH client first-time enable switch = %s', switch) uri = "/sshc/sshClient" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <sshClient> <firstTimeEnable>$enable</firstTimeEnable> </sshClient> ''') req_data = str_temp.substitute(enable = switch) ret, _, _ = ops_conn.set(uri, req_data) if ret != httplib.OK: if switch == 'Enable': raise OPIExecError('Failed to enable SSH client first-time') else: raise OPIExecError('Failed to disable SSH client first-time') return OK def _sftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol): """Download file using SFTP.""" _set_sshc_first_time(ops_conn, 'Enable') uri = "/sshc/sshcConnects/sshcConnect" str_temp_v4 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <sshcConnect> <HostAddrIPv4>$serverIp</HostAddrIPv4> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> <identityKey>ssh-rsa</identityKey> <transferType>SFTP</transferType> </sshcConnect> ''') str_temp_v6 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <sshcConnect> <HostAddrIPv6>$serverIp</HostAddrIPv6> <commandType>get</commandType> <userName>$username</userName> <password>$password</password> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> <identityKey>ssh-rsa</identityKey> <transferType>SFTP</transferType> </sshcConnect> ''') url_tuple = urlparse(url) if ip_protocol == 'IPv4': server_ip = get_server_ip(ops_conn, url, "ipv4") req_data = str_temp_v4.substitute(serverIp = server_ip, username = url_tuple.username, password = url_tuple.password, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) elif ip_protocol == 'IPv6': server_ip = get_server_ip(ops_conn, url, "ipv6") req_data = str_temp_v6.substitute(serverIp = server_ip, username = url_tuple.username, password = url_tuple.password, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) else: logging.info('Unknown protocol {0} to run sftp download'.format(ip_protocol)) req_data = "" return ERR ret, _, rsq_data = ops_conn.create(uri, req_data) if ret != httplib.OK: logging.error('Failed to download file "%s" using SFTP' % os.path.basename(local_path)) logging.error(rsq_data) ret = ERR else: ret = OK _del_sshc_rsa_key(ops_conn, server_ip) _set_sshc_first_time(ops_conn, 'Disable') return ret def _tftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol): """Download file using TFTP.""" uri = "/tftpc/tftpcTransferFiles/tftpcTransferFile" str_temp_v4 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <tftpcTransferFile> <serverIpv4Address>$serverIp</serverIpv4Address> <commandType>get_cmd</commandType> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <vpnInstanceName>$vpnInstance</vpnInstanceName> </tftpcTransferFile> ''') str_temp_v6 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <tftpcTransferFile> <serverIpv6Address>$serverIp</serverIpv6Address> <commandType>get_cmd</commandType> <localFileName>$localPath</localFileName> <remoteFileName>$remotePath</remoteFileName> <ipv6VpnName>$vpnInstance</ipv6VpnName> </tftpcTransferFile> ''') url_tuple = urlparse(url) if ip_protocol == 'IPv4': server_ip = get_server_ip(ops_conn, url, "ipv4") req_data = str_temp_v4.substitute(serverIp = server_ip, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) elif ip_protocol == 'IPv6': server_ip = get_server_ip(ops_conn, url, "ipv6") req_data = str_temp_v6.substitute(serverIp = server_ip, remotePath = url_tuple.path[1:], localPath = local_path, vpnInstance = vpn_Instance) else: logging.info('Unknown protocol {0} to run tftp download'.format(ip_protocol)) req_data = "" return ERR ret, _, rsq_data = ops_conn.create(uri, req_data) if ret != httplib.OK: logging.error('Failed to download file "%s" using TFTP' % os.path.basename(local_path)) logging.error(rsq_data) return ERR return OK def get_server_ip(ops_conn, url, ip_protocol): url_tuple = urlparse(url) if ip_protocol.lower() == "ipv4": if re.match(r"\d+\.\d+\.\d+\.\d+", url_tuple.hostname): server_ip = url_tuple.hostname else: server_ip = get_addr_by_hostname(ops_conn, url_tuple.hostname) else: idx = url_tuple.netloc.find('@') server_ip = url_tuple.netloc[idx + 1:] return server_ip def download_file(ops_conn, url, local_path, retry_times = 0, vpn_Instance = "", ip_protocol = "IPv4"): """Download file, support TFTP, FTP, SFTP. tftp://hostname/path ftp://[username[:password]@]hostname[:port]/path sftp://[username[:password]@]hostname[:port]/path Args: ops_conn: OPS connection instance url: URL of remote file local_path: local path to put the file Returns: A integer of return code """ url_tuple = urlparse(url) logging.info("Info: Download %s to %s" % (url_tuple.path[1:], local_path)) func_dict = {'tftp': _tftp_download_file, 'ftp': _ftp_download_file, 'sftp': _sftp_download_file} scheme = url_tuple.scheme if scheme not in func_dict.keys(): raise ZTPErr('Unknown file transfer scheme %s' % scheme) ret = OK cnt = 0 while (cnt < 1 + retry_times): if cnt: logging.info('Retry downloading...') ret = func_dict[scheme](ops_conn, url, local_path, vpn_Instance, ip_protocol) if ret is OK: break cnt += 1 if ret is not OK: raise ZTPErr('Failed to download file "%s"' % os.path.basename(url)) return OK class StartupInfo(object): """Startup configuration information image: startup system software config: startup saved-configuration file patch: startup patch package """ def __init__(self, image = None, config = None, patch = None): self.image = image self.config = config self.patch = patch class Startup(object): """Startup configuration information current: current startup configuration next: current next startup configuration """ def __init__(self, ops_conn): self.ops_conn = ops_conn self.current, self.next = self._get_startup_info() def _get_startup_info(self): """Get the startup information.""" logging.info("Get the startup information...") uri = "/cfg/startupInfos/startupInfo" req_'<?xml version="1.0" encoding="UTF-8"?> <startupInfo> <position/> <configedSysSoft/> <curSysSoft/> <nextSysSoft/> <curStartupFile/> <nextStartupFile/> <curPatchFile/> <nextPatchFile/> </startupInfo>''' cnt = 0 while (cnt < MAX_TIMES_GET_STARTUP): ret, _, rsp_data = self.ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': cnt += 1 logging.warning('Failed to get the startup information') continue root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} mpath = 'data' + uri.replace('/', '/vrp:') # match path nslen = len(namespaces['vrp']) elem = root_elem.find(mpath, namespaces) if elem is not None: break logging.warning('No query result while getting startup info') sleep(GET_STARTUP_INTERVAL) # sleep to wait for system ready when no query result cnt += 1 if elem is None: raise OPIExecError('Failed to get the startup information') current = StartupInfo() # current startup info curnext = StartupInfo() # next startup info for child in elem: tag = child.tag[nslen + 2:] # skip the namespace, '{namespace}text' if tag == 'curSysSoft': current.image = child.text elif tag == 'nextSysSoft': curnext.image = child.text elif tag == 'curStartupFile' and child.text != 'NULL': current.config = child.text elif tag == 'nextStartupFile' and child.text != 'NULL': curnext.config = child.text elif tag == 'curPatchFile' and child.text != 'NULL': current.patch = child.text elif tag == 'nextPatchFile' and child.text != 'NULL': curnext.patch = child.text else: continue return current, curnext def _set_startup_image_file(self, file_path, slave): """Set the next startup system software""" logging.info("Set the next startup system software to %s...", file_path) uri = "/sum/startupbymode" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <startupbymode> <softwareName>$fileName</softwareName> <mode>$startupMode</mode> </startupbymode> ''') if slave: startup_mode = 'STARTUP_MODE_ALL' else: startup_mode = 'STARTUP_MODE_PRIMARY' req_data = str_temp.substitute(fileName = file_path, startupMode = startup_mode) # it is a action operation, so use create for HTTP POST ret, _, _ = self.ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError("Failed to set startup system software") write_security_log_by_ops("Succeed to set startup system software to {0}".format(file_path)) def _set_startup_config_file(self, file_path): """Set the next startup saved-configuration file""" logging.info("Set the next startup saved-configuration file to %s...", file_path) uri = "/cfg/setStartup" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <setStartup> <fileName>$fileName</fileName> </setStartup> ''') req_data = str_temp.substitute(fileName = file_path) # it is a action operation, so use create for HTTP POST ret, _, _ = self.ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError("Failed to set startup configuration file") write_security_log_by_ops("Succeed to set startup saved-configuration file to {0}".format(file_path)) def _del_startup_config_file(self): """Delete startup config file""" logging.info("Delete the next startup config file...") uri = "/cfg/clearStartup" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <clearStartup> </clearStartup> ''' # it is a action operation, so use create for HTTP POST ret, _, _ = self.ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError("Failed to delete startup configuration file") def _set_startup_patch_file(self, file_path): """Set the next startup patch file""" logging.info("Set the next startup patch file to %s...", file_path) uri = "/patch/startup" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <startup> <packageName>$fileName</packageName> </startup> ''') req_data = str_temp.substitute(fileName = file_path) # it is a action operation, so use create for HTTP POST ret, _, _ = self.ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError("Failed to set startup patch file") write_security_log_by_ops("Succeed to set startup patch software to {0}".format(file_path)) def _reset_startup_patch_file(self): """Rest patch file for system to startup""" logging.info("Reset the next startup patch file...") uri = "/patch/resetpatch" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <resetpatch> </resetpatch> ''' # it is a action operation, so use create for HTTP POST ret, _, _ = self.ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError('Failed to reset patch') def _check_startup_image_file(self, image_file, slave): logging.info("Check the startup image information...") if slave: check_time = MAX_TIMES_CHECK_STARTUPSOFT_SLAVE else: check_time = MAX_TIMES_CHECK_STARTUPSOFT uri = "/cfg/startupInfos/startupInfo" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <startupInfo> <position/> <configedSysSoft/> <curSysSoft/> <nextSysSoft/> <curStartupFile/> <nextStartupFile/> <curPatchFile/> <nextPatchFile/> </startupInfo>''' cnt = 0 while (cnt < check_time): sys.stdout.write(".") ret, _, rsp_data = self.ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': cnt += 1 logging.warning('Failed to get the startup image information') sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL) # sleep to wait for system ready when no query result continue root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} mpath = 'data' + uri.replace('/', '/vrp:') # match path nslen = len(namespaces['vrp']) elem = root_elem.find(mpath, namespaces) if elem is not None: for child in elem: tag = child.tag[nslen + 2:] # skip the namespace, '{namespace}text' if tag == 'nextSysSoft': if child.text == image_file: sleep(5) return True break sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL) # sleep to wait for system ready when no query result cnt += 1 logging.warning('System software is not ready.') return False def _check_startup_patch_file(self, patch_file, slave): logging.info("Check the startup patch information...") if slave: check_time = MAX_TIMES_CHECK_STARTUPSOFT_SLAVE else: check_time = MAX_TIMES_CHECK_STARTUPSOFT uri = "/cfg/startupInfos/startupInfo" req_data = '''<?xml version="1.0" encoding="UTF-8"?> <startupInfo> <position/> <configedSysSoft/> <curSysSoft/> <nextSysSoft/> <curStartupFile/> <nextStartupFile/> <curPatchFile/> <nextPatchFile/> </startupInfo>''' cnt = 0 while (cnt < check_time): ret, _, rsp_data = self.ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': cnt += 1 logging.warning('Failed to get the startup patch information') sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL) # sleep to wait for system ready when no query result continue root_elem = etree.fromstring(rsp_data) namespaces = {'vrp': 'http://www.huawei.com/netconf/vrp'} mpath = 'data' + uri.replace('/', '/vrp:') # match path nslen = len(namespaces['vrp']) elem = root_elem.find(mpath, namespaces) if elem is not None: for child in elem: tag = child.tag[nslen + 2:] # skip the namespace, '{namespace}text' if tag == 'nextPatchFile': if child.text == patch_file: sleep(5) logging.info("System patch check ok") return True break sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL) # sleep to wait for system ready when no query result cnt += 1 logging.warning('System patch is not ready.') return False def reset_startup_info(self, slave): """Reset startup info and delete the downloaded files""" logging.info("Reset the next startup information...") _, configured = self._get_startup_info() # 1. Reset next startup config file and delete it try: if configured.config != self.next.config: if self.next.config is None: self._del_startup_config_file() else: self._set_startup_config_file(self.next.config) if configured.config is not None: del_file_all(self.ops_conn, configured.config, slave) except Exception, reason: logging.error(reason) # 2. Reset next startup patch file try: if configured.patch != self.next.patch: if self.next.patch is None: self._reset_startup_patch_file() else: self._set_startup_patch_file(self.next.patch) if configured.patch is not None: sleep(2) del_file_all(self.ops_conn, configured.patch, slave) except Exception, reason: logging.error(reason) # 3. Reset next startup system software and delete it try: if configured.image != self.next.image: self._set_startup_image_file(self.next.image, slave) sleep(2) del_file_all(self.ops_conn, configured.image, slave) except Exception, reason: logging.error(reason) def set_startup_info(self, image_file, config_file, patch_file, slave, esn_str): """Set the next startup information.""" logging.info("Set the next startup information...") # 1. Set next startup system software if image_file is not None: logging.info("Info: Set the next software '%s', please wait for a moment" % (image_file)), try: self._set_startup_image_file(image_file, slave) if self._check_startup_image_file(image_file, slave) is not True: raise OPIExecError('Failed to check the image file') except Exception, reason: logging.error(reason) del_file_all(self.ops_conn, image_file, slave) self.reset_startup_info(slave) raise # 2. Set next startup config file if config_file is not None: logging.info("Info: Set the next config file '%s' " % (config_file)) cnt = 0 while (cnt < 3): try: self._set_startup_config_file(config_file) break except Exception, reason: logging.error(reason) if(cnt < 2): cnt += 1 logging.warning("Retry...") sleep(3) continue del_file_all(self.ops_conn, config_file, slave) self.reset_startup_info(slave) cnt += 1 raise # 3. Set next startup patch file if patch_file is not None: logging.info("Info: Set the next patch file '%s' " % (patch_file)) try: self._set_startup_patch_file(patch_file) if self._check_startup_patch_file(patch_file, slave) is not True: raise OPIExecError('Failed to check the patch file') except Exception, reason: logging.error(reason) del_file_all(self.ops_conn, patch_file, slave) self.reset_startup_info(slave) raise def get_cwd(ops_conn): """Get the full filename of the current working directory""" logging.info("Get the current working directory...") uri = "/vfm/pwds/pwd" req_data = \ '''<?xml version="1.0" encoding="UTF-8"?> <pwd> <dictionaryName/> </pwd> ''' ret, _, rsp_data = ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': raise OPIExecError('Failed to get the current working directory') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:' elem = root_elem.find(uri + "dictionaryName", namespaces) if elem is None: raise OPIExecError('Failed to get the current working directory for no "directoryName" element') return elem.text def file_exist(ops_conn, file_path, dir_path = None): """Returns True if file_path refers to an existing file, otherwise returns False""" uri = "/vfm/dirs/dir" str_temp_1 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <dir> <fileName>$fileName</fileName> </dir> ''') str_temp_2 = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <dir> <dirName>$dirName</dirName> <fileName>$fileName</fileName> </dir> ''') if dir_path: req_data = str_temp_2.substitute(dirName = dir_path, fileName = file_path) else: req_data = str_temp_1.substitute(fileName = file_path) ret, _, rsp_data = ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': raise OPIExecError('Failed to list information about the file "%s"' % file_path) root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:' elem = root_elem.find(uri + "fileName", namespaces) if elem is None: return False return True def del_file(ops_conn, file_path): """Delete a file permanently""" if file_path is None or file_path is '': return logging.info("Delete file %s permanently", file_path) uri = "/vfm/deleteFileUnRes" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <deleteFileUnRes> <fileName>$filePath</fileName> </deleteFileUnRes> ''') req_data = str_temp.substitute(filePath = file_path) try: # it is a action operation, so use create for HTTP POST ret, _, _ = ops_conn.create(uri, req_data) if ret != httplib.OK: raise OPIExecError('Failed to delete the file "%s" permanently' % file_path) except Exception, reason: logging.error(reason) def del_file_all(ops_conn, file_path, slave): """Delete a file permanently on all main boards""" if file_path: del_file(ops_conn, file_path) if slave: del_file(ops_conn, 'slave#' + file_path) def copy_file(ops_conn, src_path, dest_path): """Copy a file""" logging.info('Copy file %s to %s...', src_path, dest_path) if "slave" in dest_path: file_name = dest_path.split(":/")[1] if file_exist(ops_conn, file_name, "slave#cfcard:"): logging.info("Detect dest file exist, delete it first") del_file(ops_conn, dest_path) uri = "/vfm/copyFile" str_temp = string.Template( '''<?xml version="1.0" encoding="UTF-8"?> <copyFile> <srcFileName>$src</srcFileName> <desFileName>$dest</desFileName> </copyFile> ''') req_data = str_temp.substitute(src = src_path, dest = dest_path) # it is a action operation, so use create for HTTP POST ret, _, _ = ops_conn.create(uri, req_data) if ret != httplib.OK: file_name = dest_path.split(":/")[1] if file_exist(ops_conn, file_name, "slave#cfcard:"): logging.info("Exists file copy fragment, delete it") del_file(ops_conn, dest_path) raise OPIExecError('Failed to copy "%s" to "%s"' % (src_path, dest_path)) def has_slave_mpu(ops_conn): """Whether device has slave MPU, returns a bool value""" logging.info("Test whether device has slave MPU...") uri = "/devm/phyEntitys" req_data = \ '''<?xml version="1.0" encoding="UTF-8"?> <phyEntitys> <phyEntity> <entClass>mpuModule</entClass> <entStandbyState/> <position/> </phyEntity> </phyEntitys> ''' ret, _, rsp_data = ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': raise OPIExecError('Failed to get the device slave information') root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:' for entity in root_elem.findall(uri + 'phyEntity', namespaces): elem = entity.find("vrp:entStandbyState", namespaces) if elem is not None and elem.text == 'slave': return True return False def get_system_info(ops_conn): """Get system info, returns a dict""" logging.info("Get the system information...") uri = "/system/systemInfo" req_data = \ '''<?xml version="1.0" encoding="UTF-8"?> <systemInfo> <productName/> <esn/> <mac/> </systemInfo> ''' ret, _, rsp_data = ops_conn.get(uri, req_data) if ret != httplib.OK or rsp_data is '': raise OPIExecError('Failed to get the system information') sys_info = {}.fromkeys(('productName', 'esn', 'mac')) root_elem = etree.fromstring(rsp_data) namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'} uri = 'data' + uri.replace('/', '/vrp:') nslen = len(namespaces['vrp']) elem = root_elem.find(uri, namespaces) if elem is not None: for child in elem: tag = child.tag[nslen + 2:] # skip the namespace, '{namespace}esn' if tag in sys_info.keys(): sys_info[tag] = child.text return sys_info def test_file_paths(image, config, patch, md5_file): """Test whether argument paths are valid.""" logging.info("Test whether argument paths are valid...") # check image file path file_name = os.path.basename(image) if file_name is not '' and not file_name.lower().endswith('.cc'): logging.error('Error: Invalid filename extension of system software') return False # check config file path file_name = os.path.basename(config) file_name = file_name.lower() _, ext = os.path.splitext(file_name) if file_name is not '' and ext not in ['.cfg', '.zip', '.dat']: logging.error('Error: Invalid filename extension of configuration file') return False # check patch file path file_name = os.path.basename(patch) if file_name is not '' and not file_name.lower().endswith('.pat'): logging.error('Error: Invalid filename extension of patch file') return False # check md5 file path file_name = os.path.basename(md5_file) if file_name is not '' and not file_name.lower().endswith('.txt'): logging.error('Error: Invalid filename extension of md5 file') return False return True def md5sum(fname, need_skip_first_line = False): """ Calculate md5 num for this file. """ def read_chunks(fhdl): '''read chunks''' chunk = fhdl.read(8096) while chunk: yield chunk chunk = fhdl.read(8096) else: fhdl.seek(0) md5_obj = hashlib.md5() if isinstance(fname, basestring): with open(fname, "rb") as fhdl: #skip the first line fhdl.seek(0) if need_skip_first_line: fhdl.readline() for chunk in read_chunks(fhdl): md5_obj.update(chunk) elif fname.__class__.__name__ in ["StringIO", "StringO"] or isinstance(fname, file): for chunk in read_chunks(fname): md5_obj.update(chunk) else: pass return md5_obj.hexdigest() def md5_get_from_file(fname): """Get md5 num form file, stored in first line""" with open(fname, "rb") as fhdl: fhdl.seek(0) line_first = fhdl.readline() # if not match pattern, the format of this file is not supported if not re.match('^#md5sum="[\\w]{32}"[\r\n]+$', line_first): return 'None' return line_first[9:41] def md5_check_with_first_line(fname): """Validate md5 for this file""" work_fname = os.path.join("ztp", fname) md5_calc = md5sum(work_fname, True) md5_file = md5_get_from_file(work_fname) if md5_file.lower() != md5_calc: logging.warning('MD5 check failed, file %s', fname) logging.warning('MD5 checksum of the file "%s" is %s', fname, md5_calc) logging.warning('MD5 checksum received from the file "%s" is %s', fname, md5_file) return False return True def md5_check_with_dic(md5_dic, fname): """md5 check with dic""" if not md5_dic.has_key(fname): logging.info('md5_dic does not has key %s, no need to do md5 verification', fname) return True md5sum_result = md5sum(fname, False) if md5_dic[fname].lower() == md5sum_result: return True logging.warning('MD5 check failed, file %s', fname) logging.warning('MD5 checksum of the file "%s" is %s', fname, md5sum_result) logging.warning('MD5 checksum received for the file "%s" is %s', fname, md5_dic[fname]) return False def parse_md5_file(fname): """parse md5 file""" def read_line(fhdl): """read a line by loop""" line = fhdl.readline() while line: yield line line = fhdl.readline() else: fhdl.seek(0) md5_dic = {} work_fname = os.path.join("ztp", fname) with open(work_fname, "rb") as fhdl: for line in read_line(fhdl): line_spilt = line.split() if 2 != len(line_spilt): continue dic_tmp = {line_spilt[0]: line_spilt[1]} md5_dic.update(dic_tmp) return md5_dic def verify_and_parse_md5_file(fname): """ vefiry data integrity of md5 file and parse this file format of this file is like: ------------------------------------------------------------------ #md5sum="517cf194e2e1960429c6aedc0e4dba37" file-name md5 conf_5618642831132.cfg c0ace0f0542950beaacb39cd1c3b5716 ------------------------------------------------------------------ """ if not md5_check_with_first_line(fname): return ERR, None return OK, parse_md5_file(fname) def check_parameter(aset): seq = ['&', '>', '<', '"', "'"] if aset: for c in seq: if c in aset: return True return False def check_filename(ops_conn): sys_info = get_system_info(ops_conn) url_tuple = urlparse(FILE_SERVER) if check_parameter(url_tuple.username) or check_parameter(url_tuple.password): raise ZTPErr('Invalid username or password, the name should not contain: '+'&'+' >'+' <'+' "'+" '.") file_name = os.path.basename(REMOTE_PATH_IMAGE.get(sys_info['productName'], '')) if file_name is not '' and check_parameter(file_name): raise ZTPErr('Invalid filename of system software, the name should not contain: '+'&'+' >'+' <'+' "'+" '.") file_name = os.path.basename(REMOTE_PATH_CONFIG) if file_name is not '' and check_parameter(file_name): raise ZTPErr('Invalid filename of configuration file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.") file_name = os.path.basename(REMOTE_PATH_PATCH.get(sys_info['productName'], '')) if file_name is not '' and check_parameter(file_name): raise ZTPErr('Invalid filename of patch file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.") file_name = os.path.basename(REMOTE_PATH_MD5) if file_name is not '' and check_parameter(file_name): raise ZTPErr('Invalid filename of md5 file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.") return OK def main_proc(ops_conn, vpn_Instance, ip_protocol): """Main processing""" sys_info = get_system_info(ops_conn) # Get system info, such as esn and system mac cwd = get_cwd(ops_conn) # Get the current working directory startup = Startup(ops_conn) slave = has_slave_mpu(ops_conn) # Check whether slave MPU board exists or not chg_flag = False check_filename(ops_conn) # check remote file paths if not test_file_paths(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''), REMOTE_PATH_CONFIG, REMOTE_PATH_PATCH.get(sys_info['productName'], ''), REMOTE_PATH_MD5): return ERR # download md5 file first, used to verify data integrity of files which will be downloaded next local_path_md5 = None file_path = REMOTE_PATH_MD5 if not file_path.startswith('/'): file_path = '/' + file_path file_name = os.path.basename(file_path) if file_name is not '': url = FILE_SERVER + file_path local_path_md5 = os.path.join(cwd, "ztp", file_name) ret = download_file(ops_conn, url, local_path_md5, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol) logging.info("download md5 ret={}".format(ret)) if ret is ERR: logging.error('Error: Failed to download MD5 file "%s"' % file_name) return ERR logging.info('Info: Download MD5 file successfully') ret, md5_dic = verify_and_parse_md5_file(file_name) # delete the file immediately os.remove(os.path.join("ztp", file_name)) if ret is ERR: logging.error('Error: MD5 check failed, file "%s"' % file_name) return ERR else: md5_dic = {} # download configuration file local_path_config = None file_path = REMOTE_PATH_CONFIG if "%s" in file_path: file_path = REMOTE_PATH_CONFIG % sys_info['esn'] if not file_path.startswith('/'): file_path = '/' + file_path file_name = os.path.basename(file_path) if file_name is not '': url = FILE_SERVER + file_path local_path_config = cwd + file_name ret = download_file(ops_conn, url, local_path_config, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol) if ret is ERR or not file_exist(ops_conn, file_name): logging.error('Error: Failed to download configuration file "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) return ERR logging.info('Info: Download configuration file successfully') if not md5_check_with_dic(md5_dic, file_name): logging.error('Error: MD5 check failed, file "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) return ERR if slave: copy_file(ops_conn, local_path_config, 'slave#' + local_path_config) chg_flag = True # download patch file local_path_patch = None file_path = REMOTE_PATH_PATCH.get(sys_info['productName'], '') if not file_path.startswith('/'): file_path = '/' + file_path file_name = os.path.basename(file_path) if startup.current.patch: cur_pat = os.path.basename(startup.current.patch).lower() else: cur_pat = '' if file_name is not '' and file_name.lower() != cur_pat: url = FILE_SERVER + file_path local_path_patch = cwd + file_name ret = download_file(ops_conn, url, local_path_patch, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol) if ret is ERR or not file_exist(ops_conn, file_name): logging.error('Error: Failed to download patch file "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) del_file_all(ops_conn, local_path_patch, slave) return ERR logging.info('Info: Download patch file successfully') if not md5_check_with_dic(md5_dic, file_name): logging.error('Error: MD5 check failed, file "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) del_file_all(ops_conn, local_path_patch, slave) return ERR if slave: copy_file(ops_conn, local_path_patch, 'slave#' + local_path_patch) chg_flag = True # download system software local_path_image = None file_path = REMOTE_PATH_IMAGE.get(sys_info['productName'], '') if not file_path.startswith('/'): file_path = '/' + file_path file_name = os.path.basename(file_path) if startup.current.image: cur_image = os.path.basename(startup.current.image).lower() else: cur_image = '' if file_name is not '' and file_name.lower() != cur_image: url = FILE_SERVER + file_path local_path_image = cwd + file_name ret = download_file(ops_conn, url, local_path_image, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol) if ret is ERR or not file_exist(ops_conn, file_name): if file_exist(ops_conn, file_name): del_file_all(ops_conn, local_path_image, slave) logging.error('Error: Failed to download system software "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) del_file_all(ops_conn, local_path_patch, slave) del_file_all(ops_conn, local_path_image, slave) return ERR logging.info('Info: Download system software file successfully') if not md5_check_with_dic(md5_dic, file_name): logging.error('Error: MD5 check failed, file "%s"' % file_name) del_file_all(ops_conn, local_path_config, slave) del_file_all(ops_conn, local_path_patch, slave) del_file_all(ops_conn, local_path_image, slave) return ERR if slave: copy_file(ops_conn, local_path_image, 'slave#' + local_path_image) chg_flag = True if chg_flag is False: return ERR # set startup info startup.set_startup_info(local_path_image, local_path_config, local_path_patch, slave, sys_info['esn']) return OK def main(vpn_Instance = '', ip_protocol = 'IPv4'): """The main function of user script. It is called by ZTP frame, so do not remove or change this function. Args: Raises: Returns: user script processing result """ host = "localhost" try: # Make an OPS connection instance. ops_conn = OPSConnection(host) ret = main_proc(ops_conn, vpn_Instance, ip_protocol) except OPIExecError, reason: logging.error('OPI execute error: %s', reason) ret = ERR except ZTPErr, reason: logging.error('ZTP error: %s', reason) ret = ERR except IOError, reason: logging.error("Error: %s" % reason) ret = ERR except Exception, reason: logging.error(reason) traceinfo = traceback.format_exc() logging.debug(traceinfo) ret = ERR finally: # Close the OPS connection ops_conn.close() return ret if __name__ == "__main__": main()
Python脚本文件解释
加粗的内容表示用户可以修改,请根据实际运行环境进行配置。
指定该脚本文件的MD5校验码。
#md5sum="5bb3da014aa88ba8111c2f1191373716"
用户可以通过该MD5校验码对下载的脚本文件进行完整性检测。
用户可以使用MD5计算工具(如md5sum)生成脚本文件的MD5校验码。生成MD5校验码时,文件中不能包含“#md5sum=”字段。请在生成MD5校验码后再将“#md5sum=”写入文件开头。
指定文件获取方式。
FILE_SERVER = 'ftp://ftpuser:Pwd123@10.1.3.2'
用户可以选择从tftp/ftp/sftp服务器获取版本文件:
- tftp://hostname
- ftp://[username[:password]@]hostname[:port]
- sftp://[username[:password]@]hostname[:port]
其中username、password、port参数为可选项。
指定系统软件的路径及文件名。
REMOTE_PATH_IMAGE = { 'NE40E' : '/image/V800R011C10B092-NE40E.cc',}
“NE40E”指设备的型号。
“/image/V800R011C10B092-NE40E.cc”为该型号设备获取的系统软件的路径及文件名。
如果不需要加载系统软件,可以设置值为空,例如:REMOTE_PATH_IMAGE = { 'NE40E' : '', }
指定配置文件的路径及文件名。
REMOTE_PATH_CONFIG = '/conf_%s.cfg'
“%s”表示设备的序列号,不可编辑。设备通过序列号信息获取对应的配置文件。
指定补丁文件的路径及文件名。
REMOTE_PATH_PATCH = { 'NE40E' : '/patch/NE40EV800R011C10SPH001.PAT',}
“NE40E”指设备的型号。
“/patch/NE40EV800R011C10SPH001.PAT”为该型号设备获取的补丁软件的路径及文件名。
如果不需要加载补丁文件,可以设置值为空,例如:REMOTE_PATH_PATCH = {}
指定MD5校验文件的路径及文件名。
REMOTE_PATH_MD5 = '/md5.txt'
用户可以通过MD5校验文件对设备下载的文件进行完整性检测。
MD5校验文件格式请参见版本文件的完整性校验。
如果不需要对下载的文件进行校验,可以将该值设置为空:''。
定义获取设备启动信息的间隔。
GET_STARTUP_INTERVAL = 15
获取设备启动信息的时间间隔。
定义获取设备启动信息的重试次数。
MAX_TIMES_GET_STARTUP = 120
获取启动信息失败时,最大重试次数。
定义检查系统软件是否设置成功的间隔。
CHECK_CHECK_STARTUPSOFT_INTERVAL = 5
检查系统软件是否设置成功的时间间隔。
定义单主控设备检查系统软件是否设置成功的重试次数。
MAX_TIMES_CHECK_STARTUPSOFT = 205
单主控设备检查系统软件设置失败时,最大重试次数。
定义双主控设备检查系统软件是否设置成功的重试次数。
MAX_TIMES_CHECK_STARTUPSOFT_SLAVE = 265
双主控设备检查系统软件设置失败时,最大重试次数。
定义下载重试次数。
MAX_TIMES_RETRY_DOWNLOAD = 3
下载文件时的重试次数。
创建OPS连接。
class OPSConnection()
此模块不需要用户编辑。
定义HTTP头。
def __init__()
此模块不需要用户编辑。
封装OPS连接。
self.conn = httplib.HTTPConnection()
此模块不需要用户编辑。
关闭OPS连接。
def close()
此模块不需要用户编辑。
增加设备资源操作。
def create()
此模块不需要用户编辑。
删除设备资源操作。
def delete()
此模块不需要用户编辑。
获取设备资源操作。
def get()
此模块不需要用户编辑。
设置设备资源操作。
def set()
此模块不需要用户编辑。
定义请求为REST风格。
def _rest_call()
此模块不需要用户编辑。
打印debug日志。
logging.debug()
此模块不需要用户编辑。
OPS进程执行异常。
class OPIExecError()
此模块不需要用户编辑。
自动部署执行异常。
class ZTPErr()
此模块不需要用户编辑。
域名解析。
def get_addr_by_hostname()
此模块不需要用户编辑。
定义FTP方式下载文件。
def _ftp_download_file()
此模块不需要用户编辑。
SFTP下载失败后清除RSA密钥。
def _del_rsa_peer_key()
此模块不需要用户编辑。
SSH下载失败后,清除SSH的服务器地址和RSA密钥。
def _del_sshc_rsa_key()
此模块不需要用户编辑。
设置SSH客户端首次访问认证用户属性。
def _set_sshc_first_time()
此模块不需要用户编辑。
定义SFTP方式下载文件。
def _sftp_download_file()
此模块不需要用户编辑。
定义TFTP方式下载文件。
def _tftp_download_file()
此模块不需要用户编辑。
定义文件下载参数。
def download_file()
此模块不需要用户编辑。
获取启动信息、启动设备。
class StartupInfo()
class Startup()
此模块不需要用户编辑。
获取用户工作目录。
def get_cwd()
此模块不需要用户编辑。
检查需下载的文件是否存在。
def file_exist()
此模块不需要用户编辑。
操作失败后删除文件。
def del_file()
文件加载失败后,要删去设备上加载失败的文件,使设备回归到启用ZTP前的状态,方便后续操作。
此模块不需要用户编辑。
复制文件。
def copy_file()
此模块不需要用户编辑。
判断设备是否有备用主控板。
def has_slave_mpu()
此模块不需要用户编辑。
获取设备的系统信息。
def get_system_info()
此模块不需要用户编辑。
检查文件路径是否合法。
def test_file_paths()
此模块不需要用户编辑。
对文件进行MD5校验。
def md5sum()
def md5_get_from_file()
def md5_check_with_first_line()
def md5_check_with_dic()
def parse_md5_file()
def verify_and_parse_md5_file()
此模块不需要用户编辑。
检查用户名、密码、文件名是否有特殊字符。
def check_parameter()
def check_filename()
此模块不需要用户编辑。
定义设备上电自动部署功能的总流程。
def main_proc()
def main()
if __name__ == "__main__": main()
此模块不需要用户编辑。
main函数要求必须有,否则脚本无法运行。