NE40E V800R010C10SPC500 Feature Description - Basic Configurations 01

Intermediate File in the Python Format

ZTP supports Python script intermediate files that store device and version file information. A ZTP-enabled device can execute the Python script to download version files.

A Python script intermediate file must use the .py file name extension. The file format is shown in Python Script File Example. For details about the fields in a Python script file, see Fields in a Python Script File.
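
To illustrate the file server and path formats that the sample script expects, the following minimal sketch splits a file URL into the parts a download routine needs. It uses only the standard urlparse function. The helper name split_file_url is hypothetical and does not appear in the sample script, and the URL simply combines the example server address and image path used in the sample.

```python
try:
    from urlparse import urlparse          # Python 2, as used by the sample script
except ImportError:
    from urllib.parse import urlparse      # Python 3

def split_file_url(url):
    """Return the parts of a file server URL (hypothetical helper, for illustration only)."""
    parts = urlparse(url)
    return {
        'scheme': parts.scheme,            # 'tftp', 'ftp' or 'sftp'
        'username': parts.username,        # None when the URL carries no credentials
        'password': parts.password,
        'host': parts.hostname,
        'remote_path': parts.path[1:],     # drop the leading '/'
    }

# Example values taken from the sample script's FILE_SERVER and image path.
info = split_file_url('ftp://ftpuser:Pwd123@10.1.3.2/image/V800R010C10B092-NE40E.cc')
```

For the URL above, the sketch yields the scheme ftp, the host 10.1.3.2, and the remote path image/V800R010C10B092-NE40E.cc without the leading slash.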

Python Script File Example

NOTE:

The following Python script file is only an example and needs to be modified based on deployment requirements.

#md5sum="5bb3da014aa88ba8111c2f1191373716"
#!/usr/bin/env python
#
# Copyright (C) Huawei Technologies Co., Ltd. 2008-2013. All rights reserved.
# ----------------------------------------------------------------------------------------------------------------------
# History:
# Date                Author                      Modification
# 20180122            Author                      created file.
# ----------------------------------------------------------------------------------------------------------------------

"""
Zero Touch Provisioning (ZTP) enables devices to automatically load version files, including system software,
patch files, and configuration files, when they start up. The devices to be configured must be new devices
or have no configuration files.

This is a sample of Zero Touch Provisioning user script. You can customize it to meet the requirements of
your network environment.
"""

import httplib
import urllib
import string
import re
import sys
import xml.etree.ElementTree as etree
import os
import stat
import logging
import traceback
import hashlib

from urlparse import urlparse
from urlparse import urlunparse
from time import sleep

# error code
OK          = 0
ERR         = 1

# File server that stores the required system software, configuration, and patch files:
#   1) Specify the file server in one of the following formats.
#      tftp://hostname
#      ftp://[username[:password]@]hostname[:port]
#      sftp://[username[:password]@]hostname[:port]
#   2) Do not add a trailing slash at the end of file server path.
FILE_SERVER = 'ftp://ftpuser:Pwd123@10.1.3.2'

# Remote file paths:
#   1) The path may include directory name and file name.
#   2) If no file name is specified, the corresponding procedure is skipped.
# File paths of system software on file server, filename extension is '.cc'.
REMOTE_PATH_IMAGE = {
  'NE40E'  : '/image/V800R010C10B092-NE40E.cc',}
# File path of configuration file on file server, filename extension is '.cfg', '.zip' or '.dat'.
REMOTE_PATH_CONFIG = '/conf_%s.cfg'
# File path of patch file on file server, filename extension is '.pat'
REMOTE_PATH_PATCH = {
    'NE40E'    :   '/patch/NE40EV800R010C10SPH001.PAT',}
# File path of md5 file, contains md5 value of image / patch / memid / license file, file extension is '.txt'
REMOTE_PATH_MD5 = '/md5.txt'
# Interval and maximum number of retries for querying startup information when no result is returned
GET_STARTUP_INTERVAL = 15    # seconds
MAX_TIMES_GET_STARTUP = 120   # Max times to retry
CHECK_CHECK_STARTUPSOFT_INTERVAL = 5    # seconds
MAX_TIMES_CHECK_STARTUPSOFT = 205    # Max times to check startup-system 
MAX_TIMES_CHECK_STARTUPSOFT_SLAVE = 265    # Max times to check startup-system if there is slave board

# Max times to retry when a file download fails
MAX_TIMES_RETRY_DOWNLOAD = 3


class OPSConnection(object):
    """Make an OPS connection instance."""

    def __init__(self, host, port = 80):
        self.host = host
        self.port = port
        self.headers = {
            "Content-type": "application/xml",
            "Accept":       "application/xml"
            }

        self.conn = httplib.HTTPConnection(self.host, self.port)

    def close(self):
        """Close the connection"""
        self.conn.close()

    def create(self, uri, req_data):
        """Create a resource on the server"""
        ret = self._rest_call("POST", uri, req_data)
        return ret

    def delete(self, uri, req_data):
        """Delete a resource on the server"""
        ret = self._rest_call("DELETE", uri, req_data)
        return ret

    def get(self, uri, req_data = None):
        """Retrieve a resource from the server"""
        ret = self._rest_call("GET", uri, req_data)
        return ret

    def set(self, uri, req_data):
        """Update a resource on the server"""
        ret = self._rest_call("PUT", uri, req_data)
        return ret

    def _rest_call(self, method, uri, req_data):
        """REST call"""
        if req_data is None:
            body = ""
        else:
            body = req_data

        logging.info('HTTP request: %s %s HTTP/1.1', method, uri)
        self.conn.request(method, uri, body, self.headers)
        response = self.conn.getresponse()
        ret = (response.status, response.reason, response.read())
        if response.status != httplib.OK:
            logging.info('%s', body)
            logging.error('HTTP response: HTTP/1.1 %s %s\n%s', ret[0], ret[1], ret[2])
        return ret

class OPIExecError(Exception):
    """OPI executes error."""
    pass

class ZTPErr(Exception):
    """ZTP error."""
    pass

def get_addr_by_hostname(ops_conn, host, addr_type = '1'):
    """Translate a host name to IPv4 address format. The IPv4 address is returned as a string."""
    logging.info("Get IP address by host name...")
    uri = "/dns/dnsNameResolution"
    root_elem = etree.Element('dnsNameResolution')
    etree.SubElement(root_elem, 'host').text = host
    etree.SubElement(root_elem, 'addrType').text = addr_type
    req_data = etree.tostring(root_elem, "UTF-8")
    ret, _, rsp_data = ops_conn.get(uri, req_data)
    if ret != httplib.OK:
        raise OPIExecError('Failed to get address by host name')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:'
    elem = root_elem.find(uri + "ipv4Addr", namespaces)
    if elem is None:
        raise OPIExecError('Failed to get IP address by host name')

    return elem.text

def _ftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol):
    """Download file using FTP."""
    logging.info('FTP download "%s" to "%s".', url, local_path)
    uri = "/ftpc/ftpcTransferFiles/ftpcTransferFile"
    str_temp_v4 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<ftpcTransferFile>
    <serverIpv4Address>$serverIp</serverIpv4Address>
    <commandType>get</commandType>
    <userName>$username</userName>
    <password>$password</password>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
</ftpcTransferFile>
''')
    str_temp_v6 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<ftpcTransferFile>
    <serverIpv6Address>$serverIp</serverIpv6Address>
    <commandType>get</commandType>
    <userName>$username</userName>
    <password>$password</password>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
</ftpcTransferFile>
''')
    url_tuple = urlparse(url)

    if ip_protocol == 'IPv4':
        """
        if re.match(r"\d+\.\d+\.\d+\.\d+", url_tuple.hostname):
            server_ip = url_tuple.hostname
        else:
            server_ip = get_addr_by_hostname(ops_conn, url_tuple.hostname)
        """
        server_ip = url_tuple.hostname
        req_data = str_temp_v4.substitute(serverIp = server_ip, username = url_tuple.username, password = url_tuple.password,
                                       remotePath = url_tuple.path[1:], localPath = local_path,
                                       vpnInstance = vpn_Instance)
    elif ip_protocol == 'IPv6':
        idx = url_tuple.netloc.find('@')
        server_ip = url_tuple.netloc[idx+1:]
        req_data = str_temp_v6.substitute(serverIp = server_ip, username = url_tuple.username,
                                          password = url_tuple.password, remotePath = url_tuple.path[1:],
                                          localPath = local_path, vpnInstance = vpn_Instance)
    else:
        logging.info('Unknown protocol {0} to run ftp download'.format(ip_protocol))
        req_data = ""
        return ERR

    ret, _, rsq_data = ops_conn.create(uri, req_data)
    if ret != httplib.OK:
        print('Failed to download file "%s" using FTP' % os.path.basename(local_path))
        logging.error(rsq_data)
        return ERR
    
    return OK

def _del_rsa_peer_key(ops_conn, key_name):
    """Delete RSA peer key configuration"""
    logging.info("Delete RSA peer key %s", key_name)
    uri = "/rsa/rsaPeerKeys/rsaPeerKey"
    root_elem = etree.Element('rsaPeerKey')
    etree.SubElement(root_elem, 'keyName').text = key_name
    req_data = etree.tostring(root_elem, "UTF-8")
    try:
        ret, _, _ = ops_conn.delete(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError('Failed to delete RSA peer key')

    except Exception, reason:
        logging.error(reason)

def _del_sshc_rsa_key(ops_conn, server_name, key_type = 'RSA'):
    """Delete SSH client RSA key configuration"""
    logging.debug("Delete SSH client RSA key for %s", server_name)
    uri = "/sshc/sshCliKeyCfgs/sshCliKeyCfg"
    root_elem = etree.Element('sshCliKeyCfg')
    etree.SubElement(root_elem, 'serverName').text = server_name
    etree.SubElement(root_elem, 'pubKeyType').text = key_type
    req_data = etree.tostring(root_elem, "UTF-8")
    try:
        ret, _, _ = ops_conn.delete(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError('Failed to delete SSH client RSA key')

    except Exception, reason:
        logging.error(reason)

    _del_rsa_peer_key(ops_conn, server_name)

def _set_sshc_first_time(ops_conn, switch):
    """Set SSH client attribute of authenticating user for the first time access"""
    if switch not in ['Enable', 'Disable']:
        return ERR

    logging.info('Set SSH client first-time enable switch = %s', switch)
    uri = "/sshc/sshClient"
    str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<sshClient>
    <firstTimeEnable>$enable</firstTimeEnable>
</sshClient>
''')
    req_data = str_temp.substitute(enable = switch)
    ret, _, _ = ops_conn.set(uri, req_data)
    if ret != httplib.OK:
        if switch == 'Enable':
            raise OPIExecError('Failed to enable SSH client first-time')
        else:
            raise OPIExecError('Failed to disable SSH client first-time')

    return OK

def _sftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol):
    """Download file using SFTP."""
    _set_sshc_first_time(ops_conn, 'Enable')

    logging.info('SFTP download "%s".', os.path.basename(url))
    uri = "/sshc/sshcConnects/sshcConnect"
    str_temp_v4 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<sshcConnect>
    <HostAddrIPv4>$serverIp</HostAddrIPv4>
    <commandType>get</commandType>
    <userName>$username</userName>
    <password>$password</password>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
    <identityKey>ssh-rsa</identityKey>
    <transferType>SFTP</transferType>
</sshcConnect>
''')
    str_temp_v6 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<sshcConnect>
    <HostAddrIPv6>$serverIp</HostAddrIPv6>
    <commandType>get</commandType>
    <userName>$username</userName>
    <password>$password</password>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
    <identityKey>ssh-rsa</identityKey>
    <transferType>SFTP</transferType>
</sshcConnect>
''')

    url_tuple = urlparse(url)
    
    if ip_protocol == 'IPv4':
        if re.match(r"\d+\.\d+\.\d+\.\d+", url_tuple.hostname):
            server_ip = url_tuple.hostname
        else:
            server_ip = get_addr_by_hostname(ops_conn, url_tuple.hostname)

        req_data = str_temp_v4.substitute(serverIp = server_ip, username = url_tuple.username,
                                          password = url_tuple.password, remotePath = url_tuple.path[1:],
                                          localPath = local_path, vpnInstance = vpn_Instance)

    elif ip_protocol == 'IPv6':
        idx = url_tuple.netloc.find('@')
        server_ip = url_tuple.netloc[idx + 1:]

        req_data = str_temp_v6.substitute(serverIp = server_ip, username = url_tuple.username,
                                          password = url_tuple.password, remotePath = url_tuple.path[1:],
                                          localPath = local_path, vpnInstance = vpn_Instance)
    else:
        logging.info('Unknown protocol {0} to run sftp download'.format(ip_protocol))
        req_data = ""
        return ERR

    ret, _, rsq_data = ops_conn.create(uri, req_data)
    if ret != httplib.OK:
        print('Failed to download file "%s" using SFTP' % os.path.basename(local_path))
        logging.error(rsq_data)
        ret = ERR
    else:
        ret = OK

    _del_sshc_rsa_key(ops_conn, server_ip)
    _set_sshc_first_time(ops_conn, 'Disable')
    return ret

def _tftp_download_file(ops_conn, url, local_path, vpn_Instance, ip_protocol):
    """Download file using TFTP."""
    logging.info('TFTP download "%s".', os.path.basename(url))
    uri = "/tftpc/tftpcTransferFiles/tftpcTransferFile"
    str_temp_v4 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<tftpcTransferFile>
    <serverIpv4Address>$serverIp</serverIpv4Address>
    <commandType>get_cmd</commandType>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
</tftpcTransferFile>
''')

    str_temp_v6 = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<tftpcTransferFile>
    <serverIpv6Address>$serverIp</serverIpv6Address>
    <commandType>get_cmd</commandType>
    <localFileName>$localPath</localFileName>
    <remoteFileName>$remotePath</remoteFileName>
    <vpnInstanceName>$vpnInstance</vpnInstanceName>
</tftpcTransferFile>
''')

    url_tuple = urlparse(url)
    
    if ip_protocol == 'IPv4':

        if re.match(r"\d+\.\d+\.\d+\.\d+", url_tuple.hostname):
            server_ip = url_tuple.hostname
        else:
            server_ip = get_addr_by_hostname(ops_conn, url_tuple.hostname)

        req_data = str_temp_v4.substitute(serverIp = server_ip, remotePath = url_tuple.path[1:],
                                          localPath = local_path, vpnInstance = vpn_Instance)

    elif ip_protocol == 'IPv6':
        idx = url_tuple.netloc.find('@')
        server_ip = url_tuple.netloc[idx+1:]
        req_data = str_temp_v6.substitute(serverIp = server_ip, remotePath = url_tuple.path[1:],
                                          localPath = local_path, vpnInstance = vpn_Instance)

    else:
        logging.info('Unknown protocol {0} to run tftp download'.format(ip_protocol))
        req_data = ""
        return ERR

    ret, _, rsq_data = ops_conn.create(uri, req_data)
    if ret != httplib.OK:
        print('Failed to download file "%s" using TFTP' % os.path.basename(local_path))
        logging.error(rsq_data)
        return ERR

    return OK

def download_file(ops_conn, url, local_path, retry_times = 0, vpn_Instance = "", ip_protocol = "IPv4"):
    """Download file, support TFTP, FTP, SFTP.

    tftp://hostname/path
    ftp://[username[:password]@]hostname[:port]/path
    sftp://[username[:password]@]hostname[:port]/path

    Args:
      ops_conn: OPS connection instance
      url: URL of remote file
      local_path: local path to put the file

    Returns:
        An integer return code
    """
    print("Info: Download %s to %s" % (url, local_path))
    url_tuple = urlparse(url)
    print("Info: Download %s to %s" % (url_tuple.path[1:], local_path))
    func_dict = {'tftp': _tftp_download_file,
                 'ftp':  _ftp_download_file,
                 'sftp': _sftp_download_file}
    scheme = url_tuple.scheme
    if scheme not in func_dict.keys():
        raise ZTPErr('Unknown file transfer scheme %s' % scheme)

    ret = OK
    cnt = 0
    while (cnt < 1 + retry_times):
        if cnt:
            print('Retry downloading...')
            logging.info('Retry downloading...')
        ret = func_dict[scheme](ops_conn, url, local_path, vpn_Instance, ip_protocol)
        if ret == OK:
            break
        cnt += 1

    if ret != OK:
        raise ZTPErr('Failed to download file "%s"' % os.path.basename(url))

    return OK

class StartupInfo(object):
    """Startup configuration information

    image: startup system software
    config: startup saved-configuration file
    patch: startup patch package
    """
    def __init__(self, image = None, config = None, patch = None):
        self.image = image
        self.config = config
        self.patch = patch

class Startup(object):
    """Startup configuration information

    current: current startup configuration
    next: next startup configuration
    """
    def __init__(self, ops_conn):
        self.ops_conn = ops_conn
        self.current, self.next = self._get_startup_info()

    def _get_startup_info(self):
        """Get the startup information."""
        logging.info("Get the startup information...")
        uri = "/cfg/startupInfos/startupInfo"
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
<startupInfo>
    <position/>
    <configedSysSoft/>
    <curSysSoft/>
    <nextSysSoft/>
    <curStartupFile/>
    <nextStartupFile/>
    <curPatchFile/>
    <nextPatchFile/>
</startupInfo>'''

        cnt = 0
        elem = None     # stays None if every query fails, so the check after the loop is safe
        while (cnt < MAX_TIMES_GET_STARTUP):
            ret, _, rsp_data = self.ops_conn.get(uri, req_data)
            if ret != httplib.OK or rsp_data == '':
                cnt += 1
                logging.warning('Failed to get the startup information')
                continue

            root_elem = etree.fromstring(rsp_data)
            namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
            mpath = 'data' + uri.replace('/', '/vrp:')  # match path
            nslen = len(namespaces['vrp'])
            elem = root_elem.find(mpath, namespaces)
            if elem is not None:
                break
            logging.warning('No query result while getting startup info')
            sleep(GET_STARTUP_INTERVAL)     # sleep to wait for system ready when no query result
            cnt += 1

        if elem is None:
            raise OPIExecError('Failed to get the startup information')

        current = StartupInfo()     # current startup info
        curnext = StartupInfo()     # next startup info
        for child in elem:
            tag = child.tag[nslen + 2:]       # skip the namespace, '{namespace}text'
            if tag == 'curSysSoft':
                current.image = child.text
            elif tag == 'nextSysSoft':
                curnext.image = child.text
            elif tag == 'curStartupFile' and child.text != 'NULL':
                current.config = child.text
            elif tag == 'nextStartupFile' and child.text != 'NULL':
                curnext.config = child.text
            elif tag == 'curPatchFile' and child.text != 'NULL':
                current.patch = child.text
            elif tag == 'nextPatchFile' and child.text != 'NULL':
                curnext.patch = child.text
            else:
                continue

        return current, curnext

    def _set_startup_image_file(self, file_path, slave):
        """Set the next startup system software"""
        logging.info("Set the next startup system software to %s...", file_path)
        uri = "/sum/startupbymode"
        str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<startupbymode>
    <softwareName>$fileName</softwareName>
    <mode>$startupMode</mode>
</startupbymode>
''')
        if slave:
            startup_mode = 'STARTUP_MODE_ALL'
        else:
            startup_mode = 'STARTUP_MODE_PRIMARY'
        req_data = str_temp.substitute(fileName = file_path, startupMode = startup_mode)
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = self.ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError("Failed to set startup system software")

    def _set_startup_config_file(self, file_path):
        """Set the next startup saved-configuration file"""
        logging.info("Set the next startup saved-configuration file to %s...", file_path)
        uri = "/cfg/setStartup"
        str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<setStartup>
    <fileName>$fileName</fileName>
</setStartup>
''')
        req_data = str_temp.substitute(fileName = file_path)
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = self.ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError("Failed to set startup configuration file")

    def _del_startup_config_file(self):
        """Delete startup config file"""
        logging.info("Delete the next startup config file...")
        uri = "/cfg/clearStartup"
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
<clearStartup>
</clearStartup>
'''
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = self.ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError("Failed to delete startup configuration file")

    def _set_startup_patch_file(self, file_path):
        """Set the next startup patch file"""
        logging.info("Set the next startup patch file to %s...", file_path)
        uri = "/patch/startup"
        str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<startup>
    <packageName>$fileName</packageName>
</startup>
''')
        req_data = str_temp.substitute(fileName = file_path)
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = self.ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError("Failed to set startup patch file")

    def _reset_startup_patch_file(self):
        """Reset the patch file used for system startup"""
        logging.info("Reset the next startup patch file...")
        uri = "/patch/resetpatch"
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
<resetpatch>
</resetpatch>
'''
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = self.ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError('Failed to reset patch')

    def _check_startup_image_file(self, image_file, slave):
        logging.info("Check the startup image information...")
        if slave:
            check_time = MAX_TIMES_CHECK_STARTUPSOFT_SLAVE
        else:
            check_time = MAX_TIMES_CHECK_STARTUPSOFT
        uri = "/cfg/startupInfos/startupInfo"
        req_data = '''<?xml version="1.0" encoding="UTF-8"?>
<startupInfo>
    <position/>
    <configedSysSoft/>
    <curSysSoft/>
    <nextSysSoft/>
    <curStartupFile/>
    <nextStartupFile/>
    <curPatchFile/>
    <nextPatchFile/>
</startupInfo>'''
        cnt = 0
        while (cnt < check_time):
            sys.stdout.write(".")
            ret, _, rsp_data = self.ops_conn.get(uri, req_data)
            if ret != httplib.OK or rsp_data == '':
                cnt += 1
                logging.warning('Failed to get the startup image information')
                sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL)     # sleep to wait for system ready when no query result
                continue
            root_elem = etree.fromstring(rsp_data)
            namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
            mpath = 'data' + uri.replace('/', '/vrp:')  # match path
            nslen = len(namespaces['vrp'])
            elem = root_elem.find(mpath, namespaces)
            if elem is not None:
                for child in elem:
                    tag = child.tag[nslen + 2:]       # skip the namespace, '{namespace}text'
                    if tag == 'nextSysSoft':
                        if child.text == image_file:
                            print ""
                            sleep(5)
                            return True
                        break
            sleep(CHECK_CHECK_STARTUPSOFT_INTERVAL)     # sleep to wait for system ready when no query result
            cnt += 1
        logging.warning('System software is not ready.')
        print ""
        return False

    def reset_startup_info(self, slave):
        """Reset startup info and delete the downloaded files"""
        logging.info("Reset the next startup information...")
        _, configured = self._get_startup_info()

        # 1. Reset next startup config file and delete it
        try:
            if configured.config != self.next.config:
                if self.next.config is None:
                    self._del_startup_config_file()
                else:
                    self._set_startup_config_file(self.next.config)
                    if configured.config is not None:
                        del_file_all(self.ops_conn, configured.config, slave)

        except Exception, reason:
            logging.error(reason)

        # 2. Reset next startup patch file
        try:
            if configured.patch != self.next.patch:
                if self.next.patch is None:
                    self._reset_startup_patch_file()
                else:
                    self._set_startup_patch_file(self.next.patch)

                if configured.patch is not None:
                    sleep(2)
                    del_file_all(self.ops_conn, configured.patch, slave)
        except Exception, reason:
            logging.error(reason)

        # 3. Reset next startup system software and delete it
        try:
            if configured.image != self.next.image:
                self._set_startup_image_file(self.next.image, slave)
                sleep(2)
                del_file_all(self.ops_conn, configured.image, slave)
        except Exception, reason:
            logging.error(reason)


    def set_startup_info(self, image_file, config_file, patch_file, slave, esn_str):
        """Set the next startup information."""
        logging.info("Set the next startup information...")
        # 1. Set next startup system software
        if image_file is not None:
            print ("Info: Set the next software '%s', please wait for a moment" % (image_file)),
            try:
                self._set_startup_image_file(image_file, slave)
                if self._check_startup_image_file(image_file, slave) is not True:
                    raise OPIExecError('Failed to check the image file')
            except Exception, reason:
                logging.error(reason)
                del_file_all(self.ops_conn, image_file, slave)
                self.reset_startup_info(slave)
                raise

        # 2. Set next startup config file
        if config_file is not None:
            print ("Info: Set the next config file '%s' " % (config_file))
            cnt = 0
            while (cnt < 3):
                try:
                    self._set_startup_config_file(config_file)
                    break
                except Exception, reason:
                    logging.error(reason)
                    if(cnt < 2):
                        cnt += 1
                        print ("Retry...")
                        sleep(3)
                        continue
                    del_file_all(self.ops_conn, config_file, slave)
                    self.reset_startup_info(slave)
                    cnt += 1
                    raise

        # 3. Set next startup patch file
        if patch_file is not None:
            print ("Info: Set the next patch file '%s' " % (patch_file))
            try:
                self._set_startup_patch_file(patch_file)
            except Exception, reason:
                logging.error(reason)
                del_file_all(self.ops_conn, patch_file, slave)
                self.reset_startup_info(slave)
                raise

def get_cwd(ops_conn):
    """Get the full filename of the current working directory"""
    logging.info("Get the current working directory...")
    uri = "/vfm/pwds/pwd"
    req_data =  \
'''<?xml version="1.0" encoding="UTF-8"?>
<pwd>
    <dictionaryName/>
</pwd>
'''
    ret, _, rsp_data = ops_conn.get(uri, req_data)
    if ret != httplib.OK or rsp_data == '':
        raise OPIExecError('Failed to get the current working directory')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:'
    elem = root_elem.find(uri + "dictionaryName", namespaces)
    if elem is None:
        raise OPIExecError('Failed to get the current working directory: no "dictionaryName" element in the response')

    return elem.text

def file_exist(ops_conn, file_path):
    """Returns True if file_path refers to an existing file, otherwise returns False"""
    uri = "/vfm/dirs/dir"
    str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<dir>
    <fileName>$fileName</fileName>
</dir>
''')
    req_data = str_temp.substitute(fileName = file_path)
    ret, _, rsp_data = ops_conn.get(uri, req_data)
    if ret != httplib.OK or rsp_data == '':
        raise OPIExecError('Failed to list information about the file "%s"' % file_path)

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:'
    elem = root_elem.find(uri + "fileName", namespaces)
    if elem is None:
        return False

    return True

def del_file(ops_conn, file_path):
    """Delete a file permanently"""
    if file_path is None or file_path == '':
        return

    logging.info("Delete file %s permanently", file_path)
    uri = "/vfm/deleteFileUnRes"
    str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<deleteFileUnRes>
    <fileName>$filePath</fileName>
</deleteFileUnRes>
''')
    req_data = str_temp.substitute(filePath = file_path)
    try:
        # This is an action operation, so use create() to send an HTTP POST
        ret, _, _ = ops_conn.create(uri, req_data)
        if ret != httplib.OK:
            raise OPIExecError('Failed to delete the file "%s" permanently' % file_path)

    except Exception, reason:
        logging.error(reason)

def del_file_all(ops_conn, file_path, slave):
    """Delete a file permanently on all main boards"""
    if file_path:
        del_file(ops_conn, file_path)
        if slave:
            del_file(ops_conn, 'slave#' + file_path)

def copy_file(ops_conn, src_path, dest_path):
    """Copy a file"""
    print('Info: Copy file %s to %s...' % (src_path, dest_path))
    logging.info('Copy file %s to %s...', src_path, dest_path)
    uri = "/vfm/copyFile"
    str_temp = string.Template(
'''<?xml version="1.0" encoding="UTF-8"?>
<copyFile>
    <srcFileName>$src</srcFileName>
    <desFileName>$dest</desFileName>
</copyFile>
''')
    req_data = str_temp.substitute(src = src_path, dest = dest_path)

    # This is an action operation, so use create (HTTP POST)
    ret, _, _ = ops_conn.create(uri, req_data)
    if ret != httplib.OK:
        raise OPIExecError('Failed to copy "%s" to "%s"' % (src_path, dest_path))

def has_slave_mpu(ops_conn):
    """Whether device has slave MPU, returns a bool value"""
    logging.info("Test whether device has slave MPU...")
    uri = "/devm/phyEntitys"
    req_data =  \
'''<?xml version="1.0" encoding="UTF-8"?>
<phyEntitys>
    <phyEntity>
        <entClass>mpuModule</entClass>
        <entStandbyState/>
        <position/>
    </phyEntity>
</phyEntitys>
'''
    ret, _, rsp_data = ops_conn.get(uri, req_data)
    if ret != httplib.OK or rsp_data == '':
        raise OPIExecError('Failed to get the device slave information')

    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:') + '/vrp:'
    for entity in root_elem.findall(uri + 'phyEntity', namespaces):
        elem = entity.find("vrp:entStandbyState", namespaces)
        if elem is not None and elem.text == 'slave':
            return True

    return False

def get_system_info(ops_conn):
    """Get system info, returns a dict"""
    logging.info("Get the system information...")
    uri = "/system/systemInfo"
    req_data =  \
'''<?xml version="1.0" encoding="UTF-8"?>
<systemInfo>
    <productName/>
    <esn/>
    <mac/>
</systemInfo>
'''
    ret, _, rsp_data = ops_conn.get(uri, req_data)
    if ret != httplib.OK or rsp_data == '':
        raise OPIExecError('Failed to get the system information')

    sys_info = {}.fromkeys(('productName', 'esn', 'mac'))
    root_elem = etree.fromstring(rsp_data)
    namespaces = {'vrp' : 'http://www.huawei.com/netconf/vrp'}
    uri = 'data' + uri.replace('/', '/vrp:')
    nslen = len(namespaces['vrp'])
    elem = root_elem.find(uri, namespaces)
    if elem is not None:
        for child in elem:
            tag = child.tag[nslen + 2:]       # skip the namespace, '{namespace}esn'
            if tag in sys_info.keys():
                sys_info[tag] = child.text

    return sys_info

def test_file_paths(image, config, patch, md5_file):
    """Test whether argument paths are valid."""
    logging.info("Test whether argument paths are valid...")
    # check image file path
    file_name = os.path.basename(image)
    if file_name != '' and not file_name.lower().endswith('.cc'):
        print('Error: Invalid filename extension of system software')
        return False

    # check config file path
    file_name = os.path.basename(config)
    file_name = file_name.lower()
    _, ext = os.path.splitext(file_name)
    if file_name != '' and ext not in ['.cfg', '.zip', '.dat']:
        print('Error: Invalid filename extension of configuration file')
        return False

    # check patch file path
    file_name = os.path.basename(patch)
    if file_name != '' and not file_name.lower().endswith('.pat'):
        print('Error: Invalid filename extension of patch file')
        return False

    # check md5 file path
    file_name = os.path.basename(md5_file)
    if file_name != '' and not file_name.lower().endswith('.txt'):
        print('Error: Invalid filename extension of md5 file')
        return False

    return True

def md5sum(fname, need_skip_first_line = False):
    """
    Calculate the MD5 checksum of a file, optionally skipping its first line.
    """

    def read_chunks(fhdl):
        '''read chunks'''
        chunk = fhdl.read(8096)
        while chunk:
            yield chunk
            chunk = fhdl.read(8096)
        else:
            fhdl.seek(0)

    md5_obj = hashlib.md5()
    if isinstance(fname, basestring):
        with open(fname, "rb") as fhdl:
            #skip the first line
            fhdl.seek(0)
            if need_skip_first_line:
                fhdl.readline()
            for chunk in read_chunks(fhdl):
                md5_obj.update(chunk)
    elif fname.__class__.__name__ in ["StringIO", "StringO"] or isinstance(fname, file):
        for chunk in read_chunks(fname):
            md5_obj.update(chunk)
    else:
        pass
    return md5_obj.hexdigest()

def md5_get_from_file(fname):
    """Get the MD5 checksum stored in the first line of the file"""

    with open(fname, "rb") as fhdl:
        fhdl.seek(0)
        line_first = fhdl.readline()

    # if not match pattern, the format of this file is not supported
    if not re.match('^#md5sum="[\\w]{32}"[\r\n]+$', line_first):
        return 'None'

    return line_first[9:41]

def md5_check_with_first_line(fname):
    """Validate md5 for this file"""

    fname = os.path.basename(fname)
    md5_calc = md5sum(fname, True)
    md5_file = md5_get_from_file(fname)

    if md5_file.lower() != md5_calc:
        logging.warning('MD5 check failed, file %s', fname)
        print('MD5 checksum of the file "%s" is %s' % (fname, md5_calc))
        logging.warning('MD5 checksum of the file "%s" is %s', fname, md5_calc)
        print('MD5 checksum received from the file "%s" is %s' % (fname, md5_file))
        logging.warning('MD5 checksum received from the file "%s" is %s', fname, md5_file)
        return False

    return True

def md5_check_with_dic(md5_dic, fname):
    """md5 check with dic"""
    if fname not in md5_dic:
        logging.info('md5_dic does not have key %s, no need to do md5 verification', fname)
        return True

    md5sum_result = md5sum(fname, False)
    if md5_dic[fname] == md5sum_result:
        return True

    print('MD5 checksum of the file "%s" is %s' % (fname, md5sum_result))
    print('MD5 checksum received for the file "%s" is %s' % (fname, md5_dic[fname]))
    logging.warning('MD5 check failed, file %s', fname)
    logging.warning('MD5 checksum of the file "%s" is %s', fname, md5sum_result)
    logging.warning('MD5 checksum received for the file "%s" is %s', fname, md5_dic[fname])

    return False

def parse_md5_file(fname):
    """parse md5 file"""

    def read_line(fhdl):
        """read a line by loop"""
        line = fhdl.readline()
        while line:
            yield line
            line = fhdl.readline()
        else:
            fhdl.seek(0)

    md5_dic = {}
    with open(fname, "rb") as fhdl:
        for line in read_line(fhdl):
            line_split = line.split()
            if 2 != len(line_split):
                continue
            dic_tmp = {line_split[0]: line_split[1]}
            md5_dic.update(dic_tmp)
    return md5_dic

def verify_and_parse_md5_file(fname):
    """
    verify data integrity of md5 file and parse this file

    format of this file is like:
    ------------------------------------------------------------------
    #md5sum="517cf194e2e1960429c6aedc0e4dba37"

    file-name              md5
    conf_5618642831132.cfg c0ace0f0542950beaacb39cd1c3b5716
    ------------------------------------------------------------------
    """
    if not md5_check_with_first_line(fname):
        return ERR, None
    return OK, parse_md5_file(fname)

def check_parameter(aset):
    """Return True if aset contains a forbidden special character"""
    seq = ['&', '>', '<', '"', "'"]
    if aset:
        for c in seq:
            if c in aset:
                return True
    return False

def check_filename(ops_conn):
    sys_info = get_system_info(ops_conn)
    url_tuple = urlparse(FILE_SERVER)
    if check_parameter(url_tuple.username) or check_parameter(url_tuple.password):
        raise ZTPErr('Invalid username or password, the name should not contain: '+'&'+' >'+' <'+' "'+" '.")
    file_name = os.path.basename(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''))
    if file_name != '' and check_parameter(file_name):
        raise ZTPErr('Invalid filename of system software, the name should not contain: '+'&'+' >'+' <'+' "'+" '.")
    file_name = os.path.basename(REMOTE_PATH_CONFIG)
    if file_name != '' and check_parameter(file_name):
        raise ZTPErr('Invalid filename of configuration file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.")
    file_name = os.path.basename(REMOTE_PATH_PATCH.get(sys_info['productName'], ''))
    if file_name != '' and check_parameter(file_name):
        raise ZTPErr('Invalid filename of patch file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.")
    file_name = os.path.basename(REMOTE_PATH_MD5)
    if file_name != '' and check_parameter(file_name):
        raise ZTPErr('Invalid filename of md5 file, the name should not contain: '+'&'+' >'+' <'+' "'+" '.")
    return OK

def main_proc(ops_conn, vpn_Instance, ip_protocol):
    """Main processing"""
    sys_info = get_system_info(ops_conn)    # Get system info, such as esn and system mac
    cwd = get_cwd(ops_conn)                 # Get the current working directory
    startup = Startup(ops_conn)
    slave = has_slave_mpu(ops_conn)         # Check whether slave MPU board exists or not
    chg_flag = False

    check_filename(ops_conn)

    # check remote file paths
    if not test_file_paths(REMOTE_PATH_IMAGE.get(sys_info['productName'], ''), REMOTE_PATH_CONFIG,
                           REMOTE_PATH_PATCH.get(sys_info['productName'], ''), REMOTE_PATH_MD5):
        return ERR

    # download md5 file first, used to verify data integrity of files which will be downloaded next
    local_path_md5 = None
    file_path = REMOTE_PATH_MD5
    if not file_path.startswith('/'):
        file_path = '/' + file_path
    file_name = os.path.basename(file_path)
    if file_name != '':
        url = FILE_SERVER + file_path
        local_path_md5 = cwd + file_name
        ret = download_file(ops_conn, url, local_path_md5, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol)
        if ret == ERR or not file_exist(ops_conn, file_name):
            print('Error: Failed to download MD5 file "%s"' % file_name)
            return ERR
        print('Info: Download MD5 file successfully')
        ret, md5_dic = verify_and_parse_md5_file(file_name)
        # delete the file immediately
        del_file_all(ops_conn, local_path_md5, None)
        if ret == ERR:
            print('Error: MD5 check failed, file "%s"' % file_name)
            return ERR
    else:
        md5_dic = {}

    # download configuration file
    local_path_config = None
    file_path = REMOTE_PATH_CONFIG
    if "%s" in file_path:
        file_path = REMOTE_PATH_CONFIG % sys_info['esn']
    if not file_path.startswith('/'):
        file_path = '/' + file_path
    file_name = os.path.basename(file_path)
    if file_name != '':
        url = FILE_SERVER + file_path
        local_path_config = cwd + file_name
        ret = download_file(ops_conn, url, local_path_config, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol)
        if ret == ERR or not file_exist(ops_conn, file_name):
            print('Error: Failed to download configuration file "%s"' % file_name)
            return ERR
        print('Info: Download configuration file successfully')
        if not md5_check_with_dic(md5_dic, file_name):
            print('Error: MD5 check failed, file "%s"' % file_name)
            del_file_all(ops_conn, local_path_config, slave)
            return ERR
        if slave:
            copy_file(ops_conn, local_path_config, 'slave#' + local_path_config)
        chg_flag = True

    # download patch file
    local_path_patch = None
    file_path = REMOTE_PATH_PATCH.get(sys_info['productName'], '')
    if not file_path.startswith('/'):
        file_path = '/' + file_path
    file_name = os.path.basename(file_path)
    if startup.current.patch:
        cur_pat = os.path.basename(startup.current.patch).lower()
    else:
        cur_pat = ''
    if file_name != '' and file_name.lower() != cur_pat:
        url  = FILE_SERVER + file_path
        local_path_patch = cwd + file_name
        ret = download_file(ops_conn, url, local_path_patch, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol)
        if ret == ERR or not file_exist(ops_conn, file_name):
            print('Error: Failed to download patch file "%s"' % file_name)
            del_file_all(ops_conn, local_path_config, slave)
            return ERR
        print('Info: Download patch file successfully')
        if not md5_check_with_dic(md5_dic, file_name):
            print('Error: MD5 check failed, file "%s"' % file_name)
            del_file_all(ops_conn, local_path_config, slave)
            del_file_all(ops_conn, local_path_patch, slave)
            return ERR
        if slave:
            copy_file(ops_conn, local_path_patch, 'slave#' + local_path_patch)
        chg_flag = True

    # download system software
    local_path_image = None
    file_path = REMOTE_PATH_IMAGE.get(sys_info['productName'], '')
    if not file_path.startswith('/'):
        file_path = '/' + file_path
    file_name = os.path.basename(file_path)
    if startup.current.image:
        cur_image = os.path.basename(startup.current.image).lower()
    else:
        cur_image = ''
    if file_name != '' and file_name.lower() != cur_image:
        url  = FILE_SERVER + file_path
        local_path_image = cwd + file_name
        ret = download_file(ops_conn, url, local_path_image, MAX_TIMES_RETRY_DOWNLOAD, vpn_Instance, ip_protocol)
        if ret == ERR or not file_exist(ops_conn, file_name):
            if file_exist(ops_conn, file_name):
                del_file_all(ops_conn, local_path_image, slave)
            print('Error: Failed to download system software "%s"' % file_name)
            del_file_all(ops_conn, local_path_config, slave)
            del_file_all(ops_conn, local_path_patch, slave)
            return ERR
        print('Info: Download system software file successfully')
        if not md5_check_with_dic(md5_dic, file_name):
            print('Error: MD5 check failed, file "%s"' % file_name)
            del_file_all(ops_conn, local_path_config, slave)
            del_file_all(ops_conn, local_path_patch, slave)
            del_file_all(ops_conn, local_path_image, slave)
            return ERR
        if slave:
            copy_file(ops_conn, local_path_image, 'slave#' + local_path_image)
        chg_flag = True

    if not chg_flag:
        return ERR

    # set startup info
    startup.set_startup_info(local_path_image, local_path_config, local_path_patch,
                             slave, sys_info['esn'])

    return OK

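def main(vpn_Instance = '', ip_protocol = 'IPv4'):
    """The main function of the user script. It is called by the ZTP framework, so do not remove or change this function.

    Args:
        vpn_Instance: VPN instance name, passed through to download_file
        ip_protocol: IP protocol version, passed through to download_file
    Raises:
    Returns: user script processing result
    """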
    host = "localhost"
    ops_conn = None
    try:
        # Make an OPS connection instance.
        ops_conn = OPSConnection(host)
        ret = main_proc(ops_conn, vpn_Instance, ip_protocol)

    except OPIExecError as reason:
        logging.error('OPI execute error: %s', reason)
        print("Error: %s" % reason)
        ret = ERR

    except ZTPErr as reason:
        logging.error('ZTP error: %s', reason)
        print("Error: %s" % reason)
        ret = ERR

    except IOError as reason:
        print("Error: %s" % reason)
        ret = ERR

    except Exception as reason:
        logging.error(reason)
        traceinfo = traceback.format_exc()
        logging.debug(traceinfo)
        ret = ERR

    finally:
        # Close the OPS connection, if it was created
        if ops_conn is not None:
            ops_conn.close()

    return ret

if __name__ == "__main__":
    main()
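
In the script above, file_exist, has_slave_mpu, and get_system_info each turn the request URI into a namespace-qualified search path before reading the response. The following minimal Python 3 sketch isolates that step using only the standard library. SAMPLE_RSP is a made-up response body for illustration (the real device output may differ), and build_search_path and standby_states are hypothetical helper names that do not exist in the script.

```python
import xml.etree.ElementTree as etree

# Hypothetical response body, invented for this sketch only.
SAMPLE_RSP = '''<rpc-reply>
  <data>
    <devm xmlns="http://www.huawei.com/netconf/vrp">
      <phyEntitys>
        <phyEntity>
          <entClass>mpuModule</entClass>
          <entStandbyState>master</entStandbyState>
          <position>9</position>
        </phyEntity>
        <phyEntity>
          <entClass>mpuModule</entClass>
          <entStandbyState>slave</entStandbyState>
          <position>10</position>
        </phyEntity>
      </phyEntitys>
    </devm>
  </data>
</rpc-reply>
'''

NAMESPACES = {'vrp': 'http://www.huawei.com/netconf/vrp'}


def build_search_path(uri):
    """Prefix every URI segment with 'vrp:' and leave a trailing prefix
    ready for the child tag, the same way the script does."""
    return 'data' + uri.replace('/', '/vrp:') + '/vrp:'


def standby_states(rsp_data, uri='/devm/phyEntitys'):
    """Collect the entStandbyState text of every phyEntity in the response."""
    root_elem = etree.fromstring(rsp_data)
    path = build_search_path(uri)
    states = []
    for entity in root_elem.findall(path + 'phyEntity', NAMESPACES):
        elem = entity.find('vrp:entStandbyState', NAMESPACES)
        if elem is not None:
            states.append(elem.text)
    return states


print(build_search_path('/devm/phyEntitys'))
print(standby_states(SAMPLE_RSP))
```

With this sample, a check such as `'slave' in standby_states(SAMPLE_RSP)` corresponds to what has_slave_mpu returns.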

Fields in a Python Script File

NOTE:

The information in bold can be modified based on actual requirements.

  • Specify an MD5 hash for the script file.

    #md5sum="5bb3da014aa88ba8111c2f1191373716"

    The MD5 hash is used to check the integrity of the script file.

    An MD5 hash for a script file can be generated using an MD5 calculation tool, such as md5sum.
    NOTE:

    The script file cannot contain #md5sum= when the MD5 hash is being generated. Add #md5sum= at the beginning of the script file after the MD5 hash is generated.
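
    The rule in the note can be sketched in Python 3 as follows. This is an illustration only, not a Huawei tool: add_md5_header and verify_md5_header are hypothetical helper names, and the two-line sample script is invented. The check mirrors how the example script skips the first line before hashing.

```python
import hashlib

HEADER_PREFIX = '#md5sum="'   # 9 characters, followed by 32 hex digits


def add_md5_header(script_bytes):
    """Hash the header-free script, then prepend the #md5sum line."""
    digest = hashlib.md5(script_bytes).hexdigest()
    header = ('%s%s"\n' % (HEADER_PREFIX, digest)).encode('ascii')
    return header + script_bytes


def verify_md5_header(data):
    """Recompute the hash over everything after the first line and
    compare it with the value stored in that first line."""
    first_line, _, body = data.partition(b'\n')
    start = len(HEADER_PREFIX)
    stored = first_line.decode('ascii')[start:start + 32]
    return hashlib.md5(body).hexdigest() == stored.lower()


# Invented sample script content, without any #md5sum line.
script = b'#!/usr/bin/env python\nprint("hello")\n'
stamped = add_md5_header(script)
print(stamped.decode('ascii').splitlines()[0])
print(verify_md5_header(stamped))
```

    Any change to the script body after the header is added makes the check fail, so the header must be regenerated after every edit.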

  • Specify the path where version files can be obtained.

    FILE_SERVER = 'ftp://ftpuser:Pwd123@10.1.3.2'
    

    Version files can be downloaded from a TFTP, FTP, or SFTP file server. The path format varies according to the file server type.

    • tftp://hostname
    • ftp://[username[:password]@]hostname[:port]
    • sftp://[username[:password]@]hostname[:port]

    username, password, and port are optional.
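
    As an illustration of these formats, the following Python 3 sketch splits a FILE_SERVER value into its parts and applies the same special-character rule that check_parameter enforces. It uses urllib.parse from the standard library (the Python 2 example script imports urlparse instead). split_file_server and has_special_char are hypothetical names, and the sftp address with port 2222 is an invented example.

```python
from urllib.parse import urlparse

SPECIAL_CHARS = ['&', '>', '<', '"', "'"]


def split_file_server(file_server):
    """Return the URL components that matter for a file server address."""
    parts = urlparse(file_server)
    return {
        'scheme': parts.scheme,
        'username': parts.username,   # None when omitted
        'password': parts.password,   # None when omitted
        'hostname': parts.hostname,
        'port': parts.port,           # None when omitted
    }


def has_special_char(value):
    """True if value is non-empty and contains a forbidden character."""
    return bool(value) and any(c in value for c in SPECIAL_CHARS)


for server in ('tftp://10.1.3.2',
               'ftp://ftpuser:Pwd123@10.1.3.2',
               'sftp://ftpuser@10.1.3.2:2222'):
    print(split_file_server(server))
```

    A user name or password for which has_special_char returns True would be rejected by check_filename in the example script.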

  • Specify the path and name of system software.

    REMOTE_PATH_IMAGE = {
        'NE40E'    :   '/image/V800R010C10B092-NE40E.cc',
    }

    "NE40E" indicates the device type.

    "/image/V800R010C10B092-NE40E.cc" indicates the path and name of the system software to be obtained by the specified device type.

    If the system software is not required, set an empty value for this field. For example:

    REMOTE_PATH_IMAGE = {
        'NE40E'    :   '',
    }

  • Specify the path and name of a configuration file.

    REMOTE_PATH_CONFIG = '/conf_%s.cfg'

    %s is a placeholder for the device ESN. The script replaces it with the ESN of the device to obtain the configuration file for that device. The %s placeholder cannot be edited.
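
    The substitution performed in main_proc can be sketched as follows. build_config_url is a hypothetical helper that condenses the relevant lines of the example script, and the ESN value is invented for illustration.

```python
def build_config_url(file_server, remote_path, esn):
    """Substitute the ESN, normalize the leading slash, and build the URL."""
    file_path = remote_path
    if '%s' in file_path:
        file_path = remote_path % esn      # '/conf_%s.cfg' -> '/conf_<esn>.cfg'
    if not file_path.startswith('/'):
        file_path = '/' + file_path
    return file_server + file_path


# 'EXAMPLE-ESN-0001' is a made-up ESN used only for this example.
print(build_config_url('ftp://ftpuser:Pwd123@10.1.3.2',
                       '/conf_%s.cfg', 'EXAMPLE-ESN-0001'))
```

    Each device therefore requests its own file, so the file server needs one configuration file per ESN.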

  • Specify the path and name of a patch file.

    REMOTE_PATH_PATCH = {
        'NE40E'    :   '/patch/NE40EV800R010C10SPH001.PAT',
    }

    "NE40E" indicates the device type.

    "/patch/NE40EV800R010C10SPH001.PAT" indicates the path and name of the patch file to be obtained by the specified device type.

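    If a patch file is not required, set an empty value for this field. The field must remain a dictionary, because the script looks up the patch path by device type. For example:

    REMOTE_PATH_PATCH = {
        'NE40E'    :   '',
    }
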
  • Specify the path and name of an MD5 hash file.

    REMOTE_PATH_MD5 = '/md5.txt'

    An MD5 hash is used to check the integrity of a downloaded file.

    For details about the format of an MD5 hash file, see Version File Integrity Check.

    If a downloaded file does not need to be checked, set an empty value for this field.
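
    The following Python 3 sketch shows how a list of file names and MD5 hashes can be parsed and used, following the layout quoted in the docstring of verify_and_parse_md5_file in the example script. parse_md5_lines and check_against_list are hypothetical names, and the file name and content are invented.

```python
import hashlib


def parse_md5_lines(text):
    """Build {file name: md5} from lines that have exactly two fields."""
    md5_dic = {}
    for line in text.splitlines():
        fields = line.split()
        if len(fields) != 2:
            continue                  # skips the #md5sum line and blank lines
        md5_dic[fields[0]] = fields[1]
    return md5_dic


def check_against_list(md5_dic, name, data):
    """Files that are not listed are accepted without verification."""
    if name not in md5_dic:
        return True
    return md5_dic[name] == hashlib.md5(data).hexdigest()


# Invented configuration content and a matching list entry.
config = b'sysname Example\n'
listing = ('file-name              md5\n'
           'conf_example.cfg %s\n' % hashlib.md5(config).hexdigest())

md5_dic = parse_md5_lines(listing)
print(check_against_list(md5_dic, 'conf_example.cfg', config))
print(check_against_list(md5_dic, 'conf_example.cfg', config + b'extra'))
```

    The column header row also has two fields, so it ends up in the dictionary as an ordinary entry. This is harmless because no downloaded file is named file-name.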

  • Define the interval at which device startup information is collected.

    GET_STARTUP_INTERVAL = 15

    This field defines the interval at which device startup information is collected.

  • Define the maximum number of attempts to collect device startup information.

    MAX_TIMES_GET_STARTUP = 120

    This field defines the maximum number of attempts to collect device startup information if the information fails to be collected.

  • Define the interval for checking whether system software is successfully set.

    CHECK_CHECK_STARTUPSOFT_INTERVAL = 5

    This field defines the interval for checking whether system software is successfully set.

  • Define the maximum number of attempts to check whether system software is successfully set on a device with a single main control board.

    MAX_TIMES_CHECK_STARTUPSOFT = 205

    This field defines the maximum number of attempts to check whether system software is successfully set on a device with a single main control board if the system software setting fails.

  • Define the maximum number of attempts to check whether system software is successfully set on a device with dual main control boards.

    MAX_TIMES_CHECK_STARTUPSOFT_SLAVE = 265

    This field defines the maximum number of attempts to check whether system software is successfully set on a device with dual main control boards if the system software setting fails.

  • Define the maximum number of file download attempts.

    MAX_TIMES_RETRY_DOWNLOAD = 3

    This field defines the maximum number of attempts to download a file if the file fails to be downloaded.

  • Create an OPS connection.

    class OPSConnection()

    This field does not require user editing.

  • Define an HTTP header.

    def __init__()

    This field does not require user editing.

  • Encapsulate an OPS connection.

    self.conn = httplib.HTTPConnection()

    This field does not require user editing.

  • Tear down an OPS connection.

    def close()
    

    This field does not require user editing.

  • Add device resources.

    def create()
    

    This field does not require user editing.

  • Delete device resources.

    def delete()
    

    This field does not require user editing.

  • Obtain device resources.

    def get()
    

    This field does not require user editing.

  • Set device resources.

    def set()
    

    This field does not require user editing.

  • Define the Representational State Transfer (REST) standard for requests.

    def _rest_call()

    This field does not require user editing.

  • Enable the device to output debugging logs.

    logging.debug()

    This field does not require user editing.

  • Define an OPS execution error.

    class OPIExecError()

    This field does not require user editing.

  • Define a ZTP error.

    class ZTPErr()

    This field does not require user editing.

  • Enable domain name resolution.

    def get_addr_by_hostname()

    This field does not require user editing.

  • Define the FTP file download mode.

    def _ftp_download_file()

    This field does not require user editing.

  • Clear the RSA key if an SFTP download fails.

    def _del_rsa_peer_key()

    This field does not require user editing.

  • Clear the RSA key and SSH server address if an SSH download fails.

    def _del_sshc_rsa_key()

    This field does not require user editing.

  • Set the attributes of the SSH user during the first access.

    def _set_sshc_first_time()

    This field does not require user editing.

  • Define the SFTP file download mode.

    def _sftp_download_file()

    This field does not require user editing.

  • Define the TFTP file download mode.

    def _tftp_download_file()

    This field does not require user editing.

  • Define the file download parameters.

    def download_file()

    This field does not require user editing.

  • Obtain device startup information.

    class StartupInfo()
    class Startup()

    This field does not require user editing.

  • Obtain the working directory.

    def get_cwd()

    This field does not require user editing.

  • Check whether the files to be downloaded exist.

    def file_exist()

    This field does not require user editing.

  • Delete files if an operation failure occurs.

    def del_file()

    If a file fails to be loaded, all files downloaded by the device must be deleted to roll the device back to the state before ZTP was performed.

    This field does not require user editing.

  • Copy a file.

    def copy_file()

    This field does not require user editing.

  • Check whether a slave main control board is available.

    def has_slave_mpu()

    This field does not require user editing.

  • Obtain the system information of a device.

    def get_system_info()

    This field does not require user editing.

  • Check the validity of file paths.

    def test_file_paths()

    This field does not require user editing.

  • Enable MD5 check for files.

    def md5sum()
    def md5_get_from_file()
    def md5_check_with_first_line()
    def md5_check_with_dic()
    def parse_md5_file()
    def verify_and_parse_md5_file()

    This field does not require user editing.

  • Check whether user names, passwords, and file names contain special characters.

    def check_parameter()
    def check_filename()

    This field does not require user editing.

  • Define the overall ZTP process.

    def main_proc()
    def main()
    if __name__ == "__main__":
        main()

    This field does not require user editing.

    The main function must be provided. Otherwise, the script cannot be executed.

Updated: 2019-01-03

Document ID: EDOC1100055037
