Wiznet makers

gavinchang

Published August 27, 2023 ©

53 UCC

25 WCC

60 VAR

0 Contests

3 Followers

0 Following

Original Link

Micropython ESP32 C3 connected to W5500 wired Ethernet UDP&TCP protocol communication module

Micropython ESP32 C3 connected to W5500 wired Ethernet UDP&TCP protocol communication module

COMPONENTS
PROJECT DESCRIPTION


1. Purpose

       In this section, we will learn how to use Hezhou ESP32 C3 to connect W5500 wired Ethernet UDP&TCP protocol communication module for network communication experiments.

2. Environment

       ESP32 C3 development board (MicroPython v1.19.1 on 2022-06-18) + W5500 wired Ethernet UDP&TCP protocol communication module + several Dupont cables + Win10 commercial version

       ESP32 C3 and module wiring method:

PS: Note that the W5500 module is connected to the development board over SPI using Dupont cables, the development board is connected to the computer through a USB-to-serial port, the computer is connected to the router, and the W5500's LAN port is connected to the router with a network cable, so that the W5500 and the computer are on the same network segment.

3. Driver 1

wiznet5k.py

# SPDX-FileCopyrightText: 2008 Bjoern Hartmann
# SPDX-FileCopyrightText: 2010 WIZnet
# SPDX-FileCopyrightText: 2010 Arduino LLC
# SPDX-FileCopyrightText: 2018 Paul Stoffregen
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck
# SPDX-FileCopyrightText: 2021 Adam Cummick
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT



"""
`wiznet5k`
================================================================================
Pure-Python interface for WIZNET 5k ethernet modules.
* Author(s): WIZnet, Arduino LLC, Bjoern Hartmann, Paul Stoffregen, Brent Rubell,
 Patrick Van Oosterwijck, Vincenzo D'Angelo
Implementation Notes
--------------------
**Software and Dependencies:**
* Micropython on ESP32 (tested on ESP32-WROVER-IE equipped of Wiznet5500)
"""

import time
import wiznet5k_dhcp as dhcp
import wiznet5k_dns as dns

from random import randint
from micropython import const
from machine import SPI


# Wiznet5k common registers (offsets passed to WIZNET5K.read/write)
REG_MR = const(0x0000)  # Mode
REG_GAR = const(0x0001)  # Gateway IP Address
REG_SUBR = const(0x0005)  # Subnet Mask Address
REG_VERSIONR_W5500 = const(0x0039)  # W5500 Silicon Version
REG_SHAR = const(0x0009)  # Source Hardware Address
REG_SIPR = const(0x000F)  # Source IP Address
REG_PHYCFGR = const(0x002E)  # W5500 PHY Configuration

# Wiznet5k Socket Registers (offsets passed to _read_socket/_write_socket)
REG_SNMR = const(0x0000)  # Socket n Mode
REG_SNCR = const(0x0001)  # Socket n Command
REG_SNIR = const(0x0002)  # Socket n Interrupt
REG_SNSR = const(0x0003)  # Socket n Status
REG_SNPORT = const(0x0004)  # Socket n Source Port
REG_SNDIPR = const(0x000C)  # Destination IP Address
REG_SNDPORT = const(0x0010)  # Destination Port
REG_SNRX_RSR = const(0x0026)  # Socket n RX Received Size
REG_SNRX_RD = const(0x0028)  # Socket n RX Read Pointer
REG_SNTX_FSR = const(0x0020)  # Socket n TX Free Size
REG_SNTX_WR = const(0x0024)  # TX Write Pointer

# Socket n Status Register (Sn_SR) values, as returned by socket_status()
SNSR_SOCK_CLOSED = const(0x00)
SNSR_SOCK_INIT = const(0x13)
SNSR_SOCK_LISTEN = const(0x14)
SNSR_SOCK_SYNSENT = const(0x15)
SNSR_SOCK_SYNRECV = const(0x16)
SNSR_SOCK_ESTABLISHED = const(0x17)
SNSR_SOCK_FIN_WAIT = const(0x18)
SNSR_SOCK_CLOSING = const(0x1A)
SNSR_SOCK_TIME_WAIT = const(0x1B)
SNSR_SOCK_CLOSE_WAIT = const(0x1C)
SNSR_SOCK_LAST_ACK = const(0x1D)
SNSR_SOCK_UDP = const(0x22)
SNSR_SOCK_IPRAW = const(0x32)
SNSR_SOCK_MACRAW = const(0x42)
SNSR_SOCK_PPPOE = const(0x5F)

# Sock Commands (CMD), written to the Socket n Command register
CMD_SOCK_OPEN = const(0x01)
CMD_SOCK_LISTEN = const(0x02)
CMD_SOCK_CONNECT = const(0x04)
CMD_SOCK_DISCON = const(0x08)
CMD_SOCK_CLOSE = const(0x10)
CMD_SOCK_SEND = const(0x20)
CMD_SOCK_SEND_MAC = const(0x21)
CMD_SOCK_SEND_KEEP = const(0x22)
CMD_SOCK_RECV = const(0x40)

# Socket n Interrupt Register bit masks
SNIR_SEND_OK = const(0x10)
SNIR_TIMEOUT = const(0x08)
SNIR_RECV = const(0x04)
SNIR_DISCON = const(0x02)
SNIR_CON = const(0x01)

# Per-socket stride added to the register address by _write_socket
CH_SIZE = const(0x100)
SOCK_SIZE = const(0x800)  # MAX W5k socket size
# Register commands
MR_RST = const(0x80)  # Mode Register RST
# Socket mode register values
SNMR_CLOSE = const(0x00)
# NOTE(review): 0x21 is the TCP protocol value with bit 0x20 also set;
# presumably the "no delayed ACK" flag - confirm against the datasheet.
SNMR_TCP = const(0x21)
SNMR_UDP = const(0x02)
SNMR_IPRAW = const(0x03)
SNMR_MACRAW = const(0x04)
SNMR_PPPOE = const(0x05)

# Initial size of the receive buffer allocated in WIZNET5K.__init__
MAX_PACKET = const(4000)
LOCAL_PORT = const(0x400)
# Default hardware MAC address
DEFAULT_MAC = (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED)

# Maximum number of sockets to support, differs between chip versions.
W5200_W5500_MAX_SOCK_NUM = const(0x08)
# Returned by get_socket() when no closed socket is available
SOCKET_INVALID = const(255)

# UDP socket struct.
# NOTE: this is a single module-level dict, so the UDP bookkeeping is shared
# by every socket and every WIZNET5K instance.
UDP_SOCK = {"bytes_remaining": 0, "remote_ip": 0, "remote_port": 0}

# Source ports in use, indexed by socket number (filled in by socket_open)
SRC_PORTS = [0] * W5200_W5500_MAX_SOCK_NUM


class WIZNET5K:  # pylint: disable=too-many-public-methods
   """Interface for WIZNET5K module.
   :param ~machine.SPI spi_bus: The SPI bus the Wiznet module is connected to.
   :param ~machine.Pin cs: Chip select pin.
   :param ~machine.Pin rst: Optional reset pin.
   :param bool is_dhcp: Whether to start DHCP automatically or not.
   :param list mac: The Wiznet's MAC Address.
   :param str hostname: The desired hostname, with optional {} to fill in MAC.
   :param int dhcp_timeout: Timeout in seconds for DHCP response.
   :param bool debug: Enable debugging output.
   """

   TCP_MODE = const(0x21)
   UDP_MODE = const(0x02)
   TLS_MODE = const(0x03)  # This is NOT currently implemented

   # pylint: disable=too-many-arguments
   def __init__(self, spi_bus, cs, reset=None, is_dhcp=True, mac=DEFAULT_MAC, hostname=None, dhcp_timeout=30, debug=False):
       """Bring up the module: configure SPI, optionally pulse the reset pin,
       detect the chip, program the MAC address and optionally run DHCP.

       :param spi_bus: SPI bus object; ``init()`` is called on it here.
       :param cs: Chip select pin.
       :param reset: Optional reset pin (the class docstring calls it ``rst``).
       :param bool is_dhcp: Whether to request a DHCP lease immediately.
       :param mac: Hardware MAC address written to the chip.
       :param str hostname: The desired hostname, with optional {} to fill in MAC.
       :param int dhcp_timeout: Timeout in seconds for DHCP response.
       :param bool debug: Enable debugging output.
       :raises AssertionError: If chip detection or the DHCP request fails.
       """
       self._debug = debug
       self._chip_type = None
       self._device = spi_bus
       # The bus is (re)initialised at 500 kHz, SPI mode 0; the trailing
       # comment records a faster rate that is not in use.
       self._device.init(baudrate=500000, polarity=0, phase=0) #baudrate=8000000
       self._cs = cs
       # reset wiznet module prior to initialization
       # NOTE(review): the pin ends in the off() state. If it is wired straight
       # to an active-low reset input this would hold the chip in reset -
       # confirm the wiring/polarity.
       if reset:
           reset.on()
           time.sleep(0.1)
           reset.off()
           time.sleep(0.1)
       # Buffer for reading params from module
       self._pbuff = bytearray(8)
       # Receive buffer; read() replaces it with a freshly sized bytearray.
       self._rxbuf = bytearray(MAX_PACKET)
       # attempt to initialize the module
       self._ch_base_msb = 0
       assert self._w5100_init() == 1, "Failed to initialize WIZnet module."
       # Set MAC address
       self.mac_address = mac
       # Source port for the next socket_open(); 0 means "pick a random one".
       self.src_port = 0
       self._dns = 0
       # Set DHCP
       self._dhcp_client = None
       if is_dhcp:
           ret = self.set_dhcp(hostname, dhcp_timeout)
           if ret != 0:
               # Drop the client so maintain_dhcp_lease() becomes a no-op.
               self._dhcp_client = None
           assert ret == 0, "Failed to configure DHCP Server!"


   def set_dhcp(self, hostname=None, response_timeout=30):
       """Initializes the DHCP client and attempts to retrieve
       and set network configuration from the DHCP server.
       Returns 0 if DHCP configured, -1 otherwise.
       :param str hostname: The desired hostname, with optional {} to fill in MAC.
       :param int response_timeout: Time to wait for server to return packet, in seconds.
       """
       if self._debug:
           print("* Initializing DHCP")
       # First, wait link status is on
       # to avoid the code during DHCP - assert self.link_status, "Ethernet cable disconnected!"
       start_time = time.time()
       while True:
           if self.link_status or ((time.time() - start_time) > 5):
               break
           time.sleep(1)
           if self._debug:
               print("My Link is:", self.link_status)
       # Return IP assigned by DHCP
       self._dhcp_client = dhcp.DHCP(self, self.mac_address, hostname, response_timeout, debug=self._debug)
       ret = self._dhcp_client.request_dhcp_lease()
       if ret == 1:
           if self._debug:
               _ifconfig = self.ifconfig
               print("* Found DHCP Server:")
               print("IP: {}\nSubnet Mask: {}\nGW Addr: {}\nDNS Server: {}".format(*_ifconfig))
           return 0
       return -1


   def maintain_dhcp_lease(self):
       """Maintain DHCP lease"""
       if self._dhcp_client is not None:
           self._dhcp_client.maintain_dhcp_lease()


   def get_host_by_name(self, hostname):
       """Convert a hostname to a packed 4-byte IP Address.
       Returns a 4 bytearray.
       """
       if self._debug:
           print("* Get host by name")
       if isinstance(hostname, str):
           hostname = bytes(hostname, "utf-8")
       # Return IP assigned by DHCP
       _dns_client = dns.DNS(self, self._dns, debug=self._debug)
       ret = _dns_client.gethostbyname(hostname)
       if self._debug:
           print("* Resolved IP: ", ret)
       assert ret != -1, "Failed to resolve hostname!"
       return ret


   @property
   def max_sockets(self):
       """Returns max number of sockets supported by chip."""
       if self._chip_type == "w5500":
           return W5200_W5500_MAX_SOCK_NUM
       return -1


   @property
   def chip(self):
       """Returns the chip type."""
       return self._chip_type


   @property
   def ip_address(self):
       """Returns the configured IP address."""
       return self.read(REG_SIPR, 0x00, 4)


   def pretty_ip(self, ip):  # pylint: disable=no-self-use, invalid-name
       """Converts a bytearray IP address to a
       dotted-quad string for printing
       """
       return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])


   def unpretty_ip(self, ip):  # pylint: disable=no-self-use, invalid-name
       """Converts a dotted-quad string to a bytearray IP address"""
       octets = [int(x) for x in ip.split(".")]
       return bytes(octets)


   @property
   def mac_address(self):
       """Returns the hardware's MAC address."""
       return self.read(REG_SHAR, 0x00, 6)


   @mac_address.setter
   def mac_address(self, address):
       """Sets the hardware MAC address.
       :param tuple address: Hardware MAC address.
       """
       self.write(REG_SHAR, 0x04, address)


   def pretty_mac(self, mac):  # pylint: disable=no-self-use, invalid-name
       """Converts a bytearray MAC address to a
       dotted-quad string for printing
       """
       return "%s:%s:%s:%s:%s:%s" % (
           hex(mac[0]),
           hex(mac[1]),
           hex(mac[2]),
           hex(mac[3]),
           hex(mac[4]),
           hex(mac[5]),
       )


   def remote_ip(self, socket_num):
       """Returns the IP address of the host who sent the current incoming packet.
       :param int socket num: Desired socket.
       """
       if socket_num >= self.max_sockets:
           return self._pbuff
       for octet in range(0, 4):
           self._pbuff[octet] = self._read_socket(socket_num, REG_SNDIPR + octet)[0]
       return self.pretty_ip(self._pbuff)


   @property
   def link_status(self):
       """ "Returns if the PHY is connected."""
       if self._chip_type == "w5500":
           data = self.read(REG_PHYCFGR, 0x00)
           return data[0] & 0x01
       return 0


   @property
   def link_speed(self):
       """ "Returns link speed."""
       if self._chip_type == "w5500":
           data = self.read(REG_PHYCFGR, 0x00)
           if data[0] & 0x01:  # link up
               if data[0] & 0x02:
                   return 100
               else:
                   return 10
       return None


   @property
   def link_full_duplex(self):
       """ "Returns if link is full duplex."""
       if self._chip_type == "w5500":
           data = self.read(REG_PHYCFGR, 0x00)
           if data[0] & 0x01:  # link up
               if data[0] & 0x04:
                   return True
               else:
                   return False
       return None


   def remote_port(self, socket_num):
       """Returns the port of the host who sent the current incoming packet."""
       if socket_num >= self.max_sockets:
           return self._pbuff
       for octet in range(0, 2):
           self._pbuff[octet] = self._read_socket(socket_num, REG_SNDPORT + octet)[0]
       return int((self._pbuff[0] << 8) | self._pbuff[0])

   @property
   def ifconfig(self):
       """Returns the network configuration as a tuple."""
       print('IFCONFIG')    
       return (self.ip_address, self.read(REG_SUBR, 0x00, 4), self.read(REG_GAR, 0x00, 4), self._dns)
       

   @ifconfig.setter
   def ifconfig(self, params):
       """Sets network configuration to provided tuple in format:
       (ip_address, subnet_mask, gateway_address, dns_server).
       """
       ip_address, subnet_mask, gateway_address, dns_server = params
       self.write(REG_SIPR, 0x04, ip_address)
       self.write(REG_SUBR, 0x04, subnet_mask)
       self.write(REG_GAR, 0x04, gateway_address)
       self._dns = dns_server


   def _w5100_init(self):
       """Initializes and detects a wiznet5k module."""
       time.sleep(1)
       self._cs.value(1)
       # Detect if chip is Wiznet W5500
       if self.detect_w5500() == 1:
           # perform w5500 initialization
           for i in range(0, W5200_W5500_MAX_SOCK_NUM):
               ctrl_byte = 0x0C + (i << 5)
               self.write(0x1E, ctrl_byte, 2)
               self.write(0x1F, ctrl_byte, 2)
       else:
           return 0
       return 1


   def detect_w5500(self):
       """Detects W5500 chip."""
       assert self.sw_reset() == 0, "Chip not reset properly!"
       self._write_mr(0x08)
       assert self._read_mr()[0] == 0x08, "Expected 0x08."
       self._write_mr(0x10)
       assert self._read_mr()[0] == 0x10, "Expected 0x10."
       self._write_mr(0x00)
       assert self._read_mr()[0] == 0x00, "Expected 0x00."
       if self.read(REG_VERSIONR_W5500, 0x00)[0] != 0x04:
           return -1
       self._chip_type = "w5500"
       self._ch_base_msb = 0x10
       return 1


   def sw_reset(self):
       """Performs a soft-reset on a Wiznet chip
       by writing to its MR register reset bit.
       """
       mode_reg = self._read_mr()
       self._write_mr(0x80)
       mode_reg = self._read_mr()
       if mode_reg[0] != 0x00:
           return -1
       return 0


   def _read_mr(self):
       """Reads from the Mode Register (MR)."""
       res = self.read(REG_MR, 0x00)
       return res


   def _write_mr(self, data):
       """Writes to the mode register (MR).
       :param int data: Data to write to the mode register.
       """
       self.write(REG_MR, 0x04, data)


   def read(self, addr, callback, length=1, buffer=None):
       """Reads data from a register address.
       :param int addr: Register address.
       """
       if self._debug:
           print(f'READ from {addr}')
       self._cs.value(0)
       self._device.write(bytes([addr >> 8]))  # pylint: disable=no-member
       self._device.write(bytes([addr & 0xFF]))  # pylint: disable=no-member
       self._device.write(bytes([callback]))  # pylint: disable=no-member
       if buffer is None:
           self._rxbuf = bytearray(length)
           self._device.readinto(self._rxbuf)  # pylint: disable=no-member
           self._cs.value(1)
           return self._rxbuf
       self._device.readinto(buffer, end=length)  # pylint: disable=no-member
       self._cs.value(1)
       return buffer


   def write(self, addr, callback, data):
       """Write data to a register address.
       :param int addr: Destination address.
       :param int callback: Callback reference.
       :param int data: Data to write, as an integer.
       :param bytearray data: Data to write, as a bytearray.
       """
       if self._debug:
           print(f'WRITE on {addr}')
       self._cs.value(0)
       self._device.write(bytes([addr >> 8]))  # pylint: disable=no-member
       self._device.write(bytes([addr & 0xFF]))  # pylint: disable=no-member
       self._device.write(bytes([callback]))  # pylint: disable=no-member

       if hasattr(data, "from_bytes"):
           self._device.write(bytes([data]))  # pylint: disable=no-member
       else:
           for i, _ in enumerate(data):
               self._device.write(bytes([data[i]]))  # pylint: disable=no-member
       self._cs.value(1)


   # Socket-Register API
   def udp_remaining(self):
       """Returns amount of bytes remaining in a udp socket."""
       if self._debug:
           print("* UDP Bytes Remaining: ", UDP_SOCK["bytes_remaining"])
       return UDP_SOCK["bytes_remaining"]


   def socket_available(self, socket_num, sock_type=SNMR_TCP):
       """Returns the amount of bytes to be read from the socket.
       :param int socket_num: Desired socket to return bytes from.
       :param int sock_type: Socket type, defaults to TCP.
       """
       if self._debug:
           print("* socket_available called on socket {}, protocol {}".format(socket_num, sock_type))
       assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
       res = self._get_rx_rcv_size(socket_num)
       if sock_type == SNMR_TCP:
           return res
       if res > 0:
           if UDP_SOCK["bytes_remaining"]:
               return UDP_SOCK["bytes_remaining"]
           # parse the udp rx packet
           # read the first 8 header bytes
           ret, self._pbuff = self.socket_read(socket_num, 8)
           if ret > 0:
               UDP_SOCK["remote_ip"] = self._pbuff[:4]
               UDP_SOCK["remote_port"] = (self._pbuff[4] << 8) + self._pbuff[5]
               UDP_SOCK["bytes_remaining"] = (self._pbuff[6] << 8) + self._pbuff[7]
               ret = UDP_SOCK["bytes_remaining"]
               return ret
       return 0


   def socket_status(self, socket_num):
       """Returns the socket connection status. Can be: SNSR_SOCK_CLOSED,
       SNSR_SOCK_INIT, SNSR_SOCK_LISTEN, SNSR_SOCK_SYNSENT, SNSR_SOCK_SYNRECV,
       SNSR_SYN_SOCK_ESTABLISHED, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSING,
       SNSR_SOCK_TIME_WAIT, SNSR_SOCK_CLOSE_WAIT, SNSR_LAST_ACK,
       SNSR_SOCK_UDP, SNSR_SOCK_IPRAW, SNSR_SOCK_MACRAW, SNSR_SOCK_PPOE.
       """
       return self._read_snsr(socket_num)


   def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP):
       """Open and verify we've connected a socket to a dest IP address
       or hostname. By default, we use 'conn_mode'= SNMR_TCP but we
       may also use SNMR_UDP.
       """
       assert self.link_status, "Ethernet cable disconnected!"
       if self._debug:
           print("* w5k socket connect, protocol={}, port={}, ip={}".format(conn_mode, port, self.pretty_ip(dest)))
       # initialize a socket and set the mode
       res = self.socket_open(socket_num, conn_mode=conn_mode)
       if res == 1:
           raise RuntimeError("Failed to initalize a connection with the socket.")
       # set socket destination IP and port
       self._write_sndipr(socket_num, dest)
       self._write_sndport(socket_num, port)
       self._send_socket_cmd(socket_num, CMD_SOCK_CONNECT)

       if conn_mode == SNMR_TCP:
           # wait for tcp connection establishment
           while self.socket_status(socket_num)[0] != SNSR_SOCK_ESTABLISHED:
               time.sleep(0.001)
               if self._debug:
                   print("SN_SR:", self.socket_status(socket_num)[0])
               if self.socket_status(socket_num)[0] == SNSR_SOCK_CLOSED:
                   raise RuntimeError("Failed to establish connection.")
       elif conn_mode == SNMR_UDP:
           UDP_SOCK["bytes_remaining"] = 0
       return 1


   def _send_socket_cmd(self, socket, cmd):
       self._write_sncr(socket, cmd)
       while self._read_sncr(socket) != b"\x00":
           if self._debug:
               print("waiting for sncr to clear...")


   def get_socket(self):
       """Requests, allocates and returns a socket from the W5k
       chip. Returned socket number may not exceed max_sockets.
       """
       if self._debug:
           print("*** Get socket")
       sock = SOCKET_INVALID
       for _sock in range(self.max_sockets):
           status = self.socket_status(_sock)[0]
           if status == SNSR_SOCK_CLOSED:
               sock = _sock
               break
       if self._debug:
           print("Allocated socket #{}".format(sock))
       return sock


   def socket_listen(self, socket_num, port, conn_mode=SNMR_TCP):
       """Start listening on a socket (default TCP mode).
       :parm int socket_num: socket number
       :parm int port: port to listen on
       :parm int conn_mode: connection mode SNMR_TCP (default) or SNMR_UDP
       """
       assert self.link_status, "Ethernet cable disconnected!"
       if self._debug:
           print("* Listening on port={}, ip={}".format(port, self.pretty_ip(self.ip_address)))
       # Initialize a socket and set the mode
       self.src_port = port
       res = self.socket_open(socket_num, conn_mode=conn_mode)
       self.src_port = 0
       if res == 1:
           raise RuntimeError("Failed to initalize the socket.")
       # Send listen command
       self._send_socket_cmd(socket_num, CMD_SOCK_LISTEN)
       # Wait until ready
       status = [SNSR_SOCK_CLOSED]
       while status[0] not in (SNSR_SOCK_LISTEN, SNSR_SOCK_ESTABLISHED, SNSR_SOCK_UDP):
           status = self._read_snsr(socket_num)
           if status[0] == SNSR_SOCK_CLOSED:
               raise RuntimeError("Listening socket closed.")


   def socket_accept(self, socket_num):
       """Gets the dest IP and port from an incoming connection.
       Returns the next socket number so listening can continue
       :parm int socket_num: socket number
       """
       dest_ip = self.remote_ip(socket_num)
       dest_port = self.remote_port(socket_num)
       next_socknum = self.get_socket()
       if self._debug:
           print("* Dest is ({}, {}), Next listen socknum is #{}".format(dest_ip, dest_port, next_socknum))
       return next_socknum, (dest_ip, dest_port)


   def socket_open(self, socket_num, conn_mode=SNMR_TCP):
       """Opens a TCP or UDP socket. By default, we use
       'conn_mode'=SNMR_TCP but we may also use SNMR_UDP.
       """
       assert self.link_status, "Ethernet cable disconnected!"
       if self._debug:
           print("*** Opening socket %d" % socket_num)
       status = self._read_snsr(socket_num)[0]
       if status in (
           SNSR_SOCK_CLOSED,
           SNSR_SOCK_TIME_WAIT,
           SNSR_SOCK_FIN_WAIT,
           SNSR_SOCK_CLOSE_WAIT,
           SNSR_SOCK_CLOSING,
           SNSR_SOCK_UDP,
           SNSR_SOCK_LISTEN,
       ):
           if self._debug:
               print("* Opening W5k Socket, protocol={}".format(conn_mode))
           time.sleep(0.00025)
           self._write_snmr(socket_num, conn_mode)
           self._write_snir(socket_num, 0xFF)
           if self.src_port > 0:
               # write to socket source port
               self._write_sock_port(socket_num, self.src_port)
           else:
               s_port = randint(49152, 65535)
               while s_port in SRC_PORTS:
                   s_port = randint(49152, 65535)
               self._write_sock_port(socket_num, s_port)
               SRC_PORTS[socket_num] = s_port
           # open socket
           self._write_sncr(socket_num, CMD_SOCK_OPEN)
           self._read_sncr(socket_num)
           assert (self._read_snsr((socket_num))[0] == 0x13
               or self._read_snsr((socket_num))[0] == 0x22 ), "Could not open socket in TCP or UDP mode."
           return 0
       return 1


   def socket_close(self, socket_num):
       """Closes a socket."""
       if self._debug:
           print("*** Closing socket #%d" % socket_num)
       self._write_sncr(socket_num, CMD_SOCK_CLOSE)
       self._read_sncr(socket_num)


   def socket_disconnect(self, socket_num):
       """Disconnect a TCP connection."""
       if self._debug:
           print("*** Disconnecting socket #%d" % socket_num)
       self._write_sncr(socket_num, CMD_SOCK_DISCON)
       self._read_sncr(socket_num)


   def socket_read(self, socket_num, length):
       """Reads data from a socket into a buffer.
       Returns buffer.
       """
       assert self.link_status, "Ethernet cable disconnected!"
       assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
       # Check if there is data available on the socket
       ret = self._get_rx_rcv_size(socket_num)
       if self._debug:
           print("Bytes avail. on sock: ", ret)
       if ret == 0:
           # no data on socket?
           status = self._read_snmr(socket_num)
           if status in (SNSR_SOCK_LISTEN, SNSR_SOCK_CLOSED, SNSR_SOCK_CLOSE_WAIT):
               # remote end closed its side of the connection, EOF state
               ret = 0
               resp = 0
           else:
               # connection is alive, no data waiting to be read
               ret = -1
               resp = -1
       elif ret > length:
           # set ret to the length of buffer
           ret = length
       if ret > 0:
           if self._debug:
               print("\t * Processing {} bytes of data".format(ret))
           # Read the starting save address of the received data
           ptr = self._read_snrx_rd(socket_num)
           # Read data from the starting address of snrx_rd
           ctrl_byte = 0x18 + (socket_num << 5)
           resp = self.read(ptr, ctrl_byte, ret)
           #  After reading the received data, update Sn_RX_RD to the increased
           # value as many as the reading size.
           ptr += ret
           self._write_snrx_rd(socket_num, ptr)
           # Notify the W5k of the updated Sn_Rx_RD
           self._write_sncr(socket_num, CMD_SOCK_RECV)
           self._read_sncr(socket_num)
       return ret, resp


   def read_udp(self, socket_num, length):
       """Read UDP socket's remaining bytes."""
       if UDP_SOCK["bytes_remaining"] > 0:
           if UDP_SOCK["bytes_remaining"] <= length:
               ret, resp = self.socket_read(socket_num, UDP_SOCK["bytes_remaining"])
           else:
               ret, resp = self.socket_read(socket_num, length)
           if ret > 0:
               UDP_SOCK["bytes_remaining"] -= ret
           return ret, resp
       return -1


   def socket_write(self, socket_num, buffer, timeout=0):
       """Writes a bytearray to a provided socket."""
       assert self.link_status, "Ethernet cable disconnected!"
       assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
       status = 0
       ret = 0
       free_size = 0
       if len(buffer) > SOCK_SIZE:
           ret = SOCK_SIZE
       else:
           ret = len(buffer)
       stamp = time.time()
       # if buffer is available, start the transfer
       free_size = self._get_tx_free_size(socket_num)
       while free_size < ret:
           free_size = self._get_tx_free_size(socket_num)
           status = self.socket_status(socket_num)[0]
           if status not in (SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT) or (timeout and time.time() - stamp > timeout):
               ret = 0
               break
       # Read the starting address for saving the transmitting data.
       ptr = self._read_sntx_wr(socket_num)
       offset = ptr & 0x07FF
       dst_addr = offset + (socket_num * 2048 + 0x8000)
       # update sn_tx_wr to the value + data size
       ptr = (ptr + len(buffer)) & 0xFFFF
       self._write_sntx_wr(socket_num, ptr)
       cntl_byte = 0x14 + (socket_num << 5)
       self.write(dst_addr, cntl_byte, buffer)
       self._write_sncr(socket_num, CMD_SOCK_SEND)
       self._read_sncr(socket_num)
       # check data was  transferred correctly
       while (self._read_socket(socket_num, REG_SNIR)[0] & SNIR_SEND_OK) != SNIR_SEND_OK:
           if (self.socket_status(socket_num)[0] in (
                   SNSR_SOCK_CLOSED,
                   SNSR_SOCK_TIME_WAIT,
                   SNSR_SOCK_FIN_WAIT,
                   SNSR_SOCK_CLOSE_WAIT,
                   SNSR_SOCK_CLOSING,
               )
               or (timeout and time.time() - stamp > timeout)
           ):
               # self.socket_close(socket_num)
               return 0
           time.sleep(0.01)
       self._write_snir(socket_num, SNIR_SEND_OK)
       return ret


   # Socket-Register Methods

   def _get_rx_rcv_size(self, sock):
       """Get size of recieved and saved in socket buffer."""
       val = 0
       val_1 = self._read_snrx_rsr(sock)
       while val != val_1:
           val_1 = self._read_snrx_rsr(sock)
           if val_1 != 0:
               val = self._read_snrx_rsr(sock)
       return int.from_bytes(val, "b")


   def _get_tx_free_size(self, sock):
       """Get free size of sock's tx buffer block."""
       val = 0
       val_1 = self._read_sntx_fsr(sock)
       while val != val_1:
           val_1 = self._read_sntx_fsr(sock)
           if val_1 != 0:
               val = self._read_sntx_fsr(sock)
       return int.from_bytes(val, "b")


   def _read_snrx_rd(self, sock):
       self._pbuff[0] = self._read_socket(sock, REG_SNRX_RD)[0]
       self._pbuff[1] = self._read_socket(sock, REG_SNRX_RD + 1)[0]
       return self._pbuff[0] << 8 | self._pbuff[1]


   def _write_snrx_rd(self, sock, data):
       self._write_socket(sock, REG_SNRX_RD, data >> 8 & 0xFF)
       self._write_socket(sock, REG_SNRX_RD + 1, data & 0xFF)


   def _write_sntx_wr(self, sock, data):
       self._write_socket(sock, REG_SNTX_WR, data >> 8 & 0xFF)
       self._write_socket(sock, REG_SNTX_WR + 1, data & 0xFF)


   def _read_sntx_wr(self, sock):
       self._pbuff[0] = self._read_socket(sock, 0x0024)[0]
       self._pbuff[1] = self._read_socket(sock, 0x0024 + 1)[0]
       return self._pbuff[0] << 8 | self._pbuff[1]


   def _read_sntx_fsr(self, sock):
       data = self._read_socket(sock, REG_SNTX_FSR)
       data += self._read_socket(sock, REG_SNTX_FSR + 1)
       return data


   def _read_snrx_rsr(self, sock):
       data = self._read_socket(sock, REG_SNRX_RSR)
       data += self._read_socket(sock, REG_SNRX_RSR + 1)
       return data


   def _write_sndipr(self, sock, ip_addr):
       """Writes to socket destination IP Address."""
       for octet in range(0, 4):
           self._write_socket(sock, REG_SNDIPR + octet, ip_addr[octet])


   def _write_sndport(self, sock, port):
       """Writes to socket destination port."""
       self._write_socket(sock, REG_SNDPORT, port >> 8)
       self._write_socket(sock, REG_SNDPORT + 1, port & 0xFF)


   def _read_snsr(self, sock):
       """Reads Socket n Status Register."""
       return self._read_socket(sock, REG_SNSR)


   def _write_snmr(self, sock, protocol):
       """Write to Socket n Mode Register."""
       self._write_socket(sock, REG_SNMR, protocol)


   def _write_snir(self, sock, data):
       """Write to Socket n Interrupt Register."""
       self._write_socket(sock, REG_SNIR, data)


   def _write_sock_port(self, sock, port):
       """Write to the socket port number."""
       self._write_socket(sock, REG_SNPORT, port >> 8)
       self._write_socket(sock, REG_SNPORT + 1, port & 0xFF)


   def _write_sncr(self, sock, data):
       self._write_socket(sock, REG_SNCR, data)


   def _read_sncr(self, sock):
       return self._read_socket(sock, REG_SNCR)


   def _read_snmr(self, sock):
       return self._read_socket(sock, REG_SNMR)


   def _write_socket(self, sock, address, data):
       """Write to a W5k socket register."""
       base = self._ch_base_msb << 8
       cntl_byte = (sock << 5) + 0x0C
       return self.write(base + sock * CH_SIZE + address, cntl_byte, data)


   def _read_socket(self, sock, address):
       """Read a W5k socket register."""
       cntl_byte = (sock << 5) + 0x08
       return self.read(address, cntl_byte)

4. Driver 2

wiznet5k_dhcp.py

# SPDX-FileCopyrightText: 2009 Jordan Terell (blog.jordanterrell.com)
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck @ Silicognition LLC
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT

"""
`wiznet5k_dhcp`
================================================================================
Pure-Python implementation of Jordan Terrell's DHCP library v0.3
* Author(s): Jordan Terrell, Brent Rubell, Vincenzo D'Angelo
"""
import gc
import time
import wiznet5k_socket as socket

from random import randint
from micropython import const
from wiznet5k_socket import htonl, htons


# DHCP State Machine
# Values held in DHCP._dhcp_state and dispatched on by
# DHCP._dhcp_state_machine(). STATE_DHCP_REREQUEST and STATE_DHCP_RELEASE
# are defined but not referenced by the DHCP class in this file.
STATE_DHCP_START = const(0x00)
STATE_DHCP_DISCOVER = const(0x01)
STATE_DHCP_REQUEST = const(0x02)
STATE_DHCP_LEASED = const(0x03)
STATE_DHCP_REREQUEST = const(0x04)
STATE_DHCP_RELEASE = const(0x05)
STATE_DHCP_WAIT = const(0x06)
STATE_DHCP_DISCONN = const(0x07)

# DHCP wait time between attempts
# (seconds; compared against time.time() while in STATE_DHCP_WAIT)
DHCP_WAIT_TIME = const(60)

# DHCP Message Types
# (value of option 53; send_dhcp_message() writes it right after the
# option code and length, and parse_dhcp_response() returns the received one)
DHCP_DISCOVER = const(1)
DHCP_OFFER = const(2)
DHCP_REQUEST = const(3)
DHCP_DECLINE = const(4)
DHCP_ACK = const(5)
DHCP_NAK = const(6)
DHCP_RELEASE = const(7)
DHCP_INFORM = const(8)

# DHCP Message OP Codes
# (first byte of every packet: requests are sent, replies are expected back)
DHCP_BOOT_REQUEST = const(0x01)
DHCP_BOOT_REPLY = const(0x02)

# Hardware type written to byte 1 of outgoing packets (only DHCP_HTYPE10MB
# is used by send_dhcp_message()).
DHCP_HTYPE10MB = const(0x01)
DHCP_HTYPE100MB = const(0x02)

# Hardware address length (byte 2) and hop count (byte 3) of outgoing packets.
DHCP_HLENETHERNET = const(0x06)
DHCP_HOPS = const(0x00)

# Four-byte marker written to bytes 236-239, directly before the options.
MAGIC_COOKIE = const(0x63825363)
# NOTE(review): not referenced by the DHCP class in this file.
MAX_DHCP_OPT = const(0x10)

# Default DHCP Server port
DHCP_SERVER_PORT = const(67)
# DHCP Lease Time, in seconds
# (fallback used when the server's reply carried no lease time)
DEFAULT_LEASE_TIME = const(900)
# Server address used until a reply supplies a DHCP server identifier.
BROADCAST_SERVER_ADDR = (255, 255, 255, 255)

# DHCP Response Options
# (option codes recognised by parse_dhcp_response(); others are skipped)
MSG_TYPE = 53
SUBNET_MASK = 1
ROUTERS_ON_SUBNET = 3
DNS_SERVERS = 6
DHCP_SERVER_ID = 54
T1_VAL = 58
T2_VAL = 59
LEASE_TIME = 51
OPT_END = 255

# Packet buffer
# (module-level, zeroed and refilled by every send_dhcp_message() call)
_BUFF = bytearray(318)


class DHCP:
    """W5k DHCP Client implementation.

    The client is driven by a small state machine (see
    ``_dhcp_state_machine``). ``request_dhcp_lease`` runs it in a blocking
    loop; ``maintain_dhcp_lease`` performs a single non-blocking step.

    :param eth: Wiznet 5k object
    :param list mac_address: Hardware MAC.
    :param str hostname: The desired hostname, with optional {} to fill in MAC.
    :param int response_timeout: DHCP Response timeout.
    :param bool debug: Enable debugging output.
    """

    # pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name
    def __init__(
        self, eth, mac_address, hostname=None, response_timeout=30, debug=False
    ):
        self._debug = debug
        self._response_timeout = response_timeout
        self._mac_address = mac_address

        # Set socket interface
        socket.set_interface(eth)
        self._eth = eth
        self._sock = None

        # DHCP state machine
        self._dhcp_state = STATE_DHCP_START
        # Raw xid bytes of the last packet sent; kept as bytes so it can be
        # compared against the xid slice of a reply at any time.
        self._initial_xid = bytes(4)
        self._start_time = 0

        # DHCP server configuration
        self.dhcp_server_ip = BROADCAST_SERVER_ADDR
        self.local_ip = 0
        self.gateway_ip = 0
        self.subnet_mask = 0
        self.dns_server_ip = 0

        # Lease configuration
        self._lease_time = 0
        self._last_lease_time = 0
        self._renew_in_sec = 0
        self._rebind_in_sec = 0
        self._t1 = 0
        self._t2 = 0

        # Select an initial transaction id
        self._transaction_id = randint(1, 0x7FFFFFFF)

        # Host name: first DNS label only, MAC substituted for "{}",
        # limited to 42 characters so the options fit in the packet buffer.
        mac_string = "".join("{:02X}".format(o) for o in mac_address)
        self._hostname = bytes(
            (hostname or "WIZnet{}").split(".")[0].format(mac_string)[:42], "utf-8"
        )

    def _close_socket(self):
        """Close and forget the DHCP socket if one is currently open."""
        if self._sock is not None:
            self._sock.close()
            self._sock = None

    def _xid_matches(self, xid):
        """Return True when ``xid`` (raw bytes from a reply) is our transaction id.

        ``parse_dhcp_response`` returns ``0`` instead of bytes for rejected
        packets; that (and any empty value) is treated as "no match" instead
        of being fed to ``int.from_bytes``.
        """
        if not xid:
            return False
        return htonl(self._transaction_id) == int.from_bytes(bytes(xid), "big")

    # pylint: disable=too-many-statements
    def send_dhcp_message(self, state, time_elapsed, renew=False):
        """Assemble and send a DHCP message packet to a socket.

        :param int state: DHCP Message state.
        :param float time_elapsed: Number of seconds elapsed since DHCP process started
        :param bool renew: Set True for renew and rebind
        """
        _BUFF[:] = b"\x00" * len(_BUFF)
        # OP
        _BUFF[0] = DHCP_BOOT_REQUEST
        # HTYPE
        _BUFF[1] = DHCP_HTYPE10MB
        # HLEN
        _BUFF[2] = DHCP_HLENETHERNET
        # HOPS
        _BUFF[3] = DHCP_HOPS

        # Transaction ID (xid): four bytes at offsets 4..7. The slice must be
        # exactly four wide, otherwise the assignment resizes the buffer.
        self._initial_xid = htonl(self._transaction_id).to_bytes(4, "big")
        _BUFF[4:8] = self._initial_xid

        # seconds elapsed
        seconds = int(time_elapsed)
        _BUFF[8] = (seconds & 0xFF00) >> 8
        _BUFF[9] = seconds & 0x00FF

        # flags: 0x8000, i.e. only the top (broadcast) bit set
        _BUFF[10] = 0x80
        _BUFF[11] = 0x00

        # NOTE: Skipping ciaddr/yiaddr/siaddr/giaddr
        # as they're already set to 0.0.0.0
        # Except when renewing, then fill in ciaddr
        if renew:
            ciaddr = bytes(self.local_ip)
            # local_ip is the int 0 until a reply was parsed; only a real
            # four-byte address may be written or the buffer would be resized.
            if len(ciaddr) == 4:
                _BUFF[12:16] = ciaddr

        # chaddr
        _BUFF[28:34] = self._mac_address

        # NOTE:  192 octets of 0's, BOOTP legacy

        # Magic Cookie
        _BUFF[236] = (MAGIC_COOKIE >> 24) & 0xFF
        _BUFF[237] = (MAGIC_COOKIE >> 16) & 0xFF
        _BUFF[238] = (MAGIC_COOKIE >> 8) & 0xFF
        _BUFF[239] = MAGIC_COOKIE & 0xFF

        # Option - DHCP Message Type
        _BUFF[240] = 53
        _BUFF[241] = 0x01
        _BUFF[242] = state

        # Option - Client Identifier
        _BUFF[243] = 61
        # Length
        _BUFF[244] = 0x07
        # HW Type - ETH
        _BUFF[245] = 0x01
        # Client MAC Address
        for offset, octet in enumerate(self._mac_address):
            _BUFF[246 + offset] = octet

        # Option - Host Name
        _BUFF[252] = 12
        hostname_len = len(self._hostname)
        after_hostname = 254 + hostname_len
        _BUFF[253] = hostname_len
        _BUFF[254:after_hostname] = self._hostname

        if state == DHCP_REQUEST and not renew:
            # Set the parsed local IP addr
            _BUFF[after_hostname] = 50
            _BUFF[after_hostname + 1] = 0x04
            _BUFF[after_hostname + 2 : after_hostname + 6] = bytes(self.local_ip)
            # Set the parsed dhcp server ip addr
            _BUFF[after_hostname + 6] = 54
            _BUFF[after_hostname + 7] = 0x04
            _BUFF[after_hostname + 8 : after_hostname + 12] = bytes(self.dhcp_server_ip)

        # Option - Parameter Request List. It always starts 12 bytes after the
        # host name; when the two options above are not written those 12
        # bytes stay zero.
        _BUFF[after_hostname + 12] = 55
        _BUFF[after_hostname + 13] = 0x06
        # subnet mask
        _BUFF[after_hostname + 14] = 1
        # routers on subnet
        _BUFF[after_hostname + 15] = 3
        # DNS
        _BUFF[after_hostname + 16] = 6
        # domain name
        _BUFF[after_hostname + 17] = 15
        # renewal (T1) value
        _BUFF[after_hostname + 18] = 58
        # rebinding (T2) value
        _BUFF[after_hostname + 19] = 59
        _BUFF[after_hostname + 20] = 255

        # Send DHCP packet
        self._sock.send(_BUFF)

    # pylint: disable=too-many-branches, too-many-statements
    def parse_dhcp_response(self):
        """Parse DHCP response from DHCP server.

        Updates ``local_ip`` and, from the options found, ``subnet_mask``,
        ``dhcp_server_ip``, ``gateway_ip``, ``dns_server_ip``, the lease time
        and the T1/T2 timers.

        Returns a ``(msg_type, xid)`` tuple: the DHCP message type (0 when the
        reply carries none) and the raw transaction-id bytes. Rejected packets
        yield ``(0, 0)``.

        Raises AssertionError when the packet is not a BOOT reply.
        """
        # store packet in a local buffer (the module-level _BUFF is untouched)
        buff = self._sock.recv()
        if self._debug:
            print("DHCP Response: ", buff)

        # -- Parse Packet, FIXED -- #
        # Validate OP
        assert buff[0] == DHCP_BOOT_REPLY, (
            "Malformed Packet - "
            "           DHCP message OP is not expected BOOT Reply."
        )

        xid = buff[4:8]
        if bytes(xid) < self._initial_xid:
            if self._debug:
                print("* DHCP: Ignoring response with stale transaction id")
            return 0, 0

        self.local_ip = tuple(buff[16:20])
        # An all-zero client hardware address cannot be meant for us.
        if not any(buff[28:34]):
            return 0, 0

        # The magic cookie occupies exactly bytes 236..239.
        if int.from_bytes(buff[236:240], "big") != MAGIC_COOKIE:
            return 0, 0

        # -- Parse Packet, VARIABLE -- #
        # Options are (code, length, value) triples; 0 and OPT_END stop the
        # scan, as does running off the end of a truncated packet.
        msg_type = 0
        buff_len = len(buff)
        ptr = 240
        while ptr + 1 < buff_len and buff[ptr] != OPT_END:
            code = buff[ptr]
            if code == 0:
                break
            opt_len = buff[ptr + 1]
            value = buff[ptr + 2 : ptr + 2 + opt_len]
            if code == MSG_TYPE:
                if value:
                    msg_type = value[-1]
            elif code == SUBNET_MASK:
                self.subnet_mask = tuple(value)
            elif code == DHCP_SERVER_ID:
                self.dhcp_server_ip = tuple(value)
            elif code == LEASE_TIME:
                self._lease_time = int.from_bytes(value, "big")
            elif code == ROUTERS_ON_SUBNET:
                self.gateway_ip = tuple(value)
            elif code == DNS_SERVERS:
                # only the first server address is kept
                self.dns_server_ip = tuple(value[:4])
            elif code == T1_VAL:
                self._t1 = int.from_bytes(value, "big")
            elif code == T2_VAL:
                self._t2 = int.from_bytes(value, "big")
            # any other option is skipped
            ptr += 2 + opt_len

        if self._debug:
            report = (
                "Msg Type: {}\nSubnet Mask: {}\nDHCP Server IP: {}\nDNS Server IP: {}"
                "                 \nGateway IP: {}\nLocal IP: {}\nT1: {}\nT2: {}\nLease Time: {}"
            )
            print(
                report.format(
                    msg_type,
                    self.subnet_mask,
                    self.dhcp_server_ip,
                    self.dns_server_ip,
                    self.gateway_ip,
                    self.local_ip,
                    self._t1,
                    self._t2,
                    self._lease_time,
                )
            )

        gc.collect()
        return msg_type, xid

    # pylint: disable=too-many-branches, too-many-statements
    def _dhcp_state_machine(self):
        """DHCP state machine without wait loops to enable cooperative multi tasking
        This state machine is used both by the initial blocking lease request and
        the non-blocking DHCP maintenance function"""
        if self._eth.link_status:
            if self._dhcp_state == STATE_DHCP_DISCONN:
                self._dhcp_state = STATE_DHCP_START
        elif self._dhcp_state != STATE_DHCP_DISCONN:
            # Link just went down: forget the lease and clear the interface.
            self._dhcp_state = STATE_DHCP_DISCONN
            self.dhcp_server_ip = BROADCAST_SERVER_ADDR
            self._last_lease_time = 0
            reset_ip = (0, 0, 0, 0)
            self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)
            self._close_socket()

        if self._dhcp_state == STATE_DHCP_START:
            self._start_time = time.time()
            self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
            try:
                self._sock = socket.socket(type=socket.SOCK_DGRAM)
            except RuntimeError:
                if self._debug:
                    print("* DHCP: Failed to allocate socket")
                self._dhcp_state = STATE_DHCP_WAIT
            else:
                self._sock.settimeout(self._response_timeout)
                self._sock.bind((None, 68))
                self._sock.connect((self.dhcp_server_ip, DHCP_SERVER_PORT))
                if self._last_lease_time == 0 or time.time() > (
                    self._last_lease_time + self._lease_time
                ):
                    # No lease yet, or it has expired: start from DISCOVER.
                    if self._debug:
                        print("* DHCP: Send discover to {}".format(self.dhcp_server_ip))
                    self.send_dhcp_message(
                        DHCP_DISCOVER, (time.time() - self._start_time)
                    )
                    self._dhcp_state = STATE_DHCP_DISCOVER
                else:
                    # Lease still valid: renew it with a REQUEST.
                    if self._debug:
                        print("* DHCP: Send request to {}".format(self.dhcp_server_ip))
                    self.send_dhcp_message(
                        DHCP_REQUEST, (time.time() - self._start_time), True
                    )
                    self._dhcp_state = STATE_DHCP_REQUEST

        elif self._dhcp_state == STATE_DHCP_DISCOVER:
            if self._sock.available():
                if self._debug:
                    print("* DHCP: Parsing OFFER")
                msg_type, xid = self.parse_dhcp_response()
                if msg_type == DHCP_OFFER:
                    # Check if transaction ID matches, otherwise it may be an offer
                    # for another device
                    if self._xid_matches(xid):
                        if self._debug:
                            print(
                                "* DHCP: Send request to {}".format(self.dhcp_server_ip)
                            )
                        self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
                        self.send_dhcp_message(
                            DHCP_REQUEST, (time.time() - self._start_time)
                        )
                        self._dhcp_state = STATE_DHCP_REQUEST
                    else:
                        if self._debug:
                            print("* DHCP: Received OFFER with non-matching xid")
                else:
                    if self._debug:
                        print("* DHCP: Received DHCP Message is not OFFER")

        elif self._dhcp_state == STATE_DHCP_REQUEST:
            if self._sock.available():
                if self._debug:
                    print("* DHCP: Parsing ACK")
                msg_type, xid = self.parse_dhcp_response()
                # Check if transaction ID matches, otherwise it may be
                # for another device
                if self._xid_matches(xid):
                    if msg_type == DHCP_ACK:
                        if self._debug:
                            print("* DHCP: Successful lease")
                        self._close_socket()
                        self._dhcp_state = STATE_DHCP_LEASED
                        self._last_lease_time = self._start_time
                        if self._lease_time == 0:
                            self._lease_time = DEFAULT_LEASE_TIME
                        if self._t1 == 0:
                            # T1 is 50% of _lease_time
                            self._t1 = self._lease_time >> 1
                        if self._t2 == 0:
                            # T2 is 87.5% of _lease_time
                            self._t2 = self._lease_time - (self._lease_time >> 3)
                        self._renew_in_sec = self._t1
                        self._rebind_in_sec = self._t2
                        self._eth.ifconfig = (
                            self.local_ip,
                            self.subnet_mask,
                            self.gateway_ip,
                            self.dns_server_ip,
                        )
                        gc.collect()
                    else:
                        if self._debug:
                            print("* DHCP: Received DHCP Message is not ACK")
                else:
                    if self._debug:
                        print("* DHCP: Received non-matching xid")

        elif self._dhcp_state == STATE_DHCP_WAIT:
            if time.time() > (self._start_time + DHCP_WAIT_TIME):
                if self._debug:
                    print("* DHCP: Begin retry")
                self._dhcp_state = STATE_DHCP_START
                # Past the rebind time: go back to broadcasting.
                if time.time() > (self._last_lease_time + self._rebind_in_sec):
                    self.dhcp_server_ip = BROADCAST_SERVER_ADDR
                # Past the lease time: the address may no longer be used.
                if time.time() > (self._last_lease_time + self._lease_time):
                    reset_ip = (0, 0, 0, 0)
                    self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)

        elif self._dhcp_state == STATE_DHCP_LEASED:
            if time.time() > (self._last_lease_time + self._renew_in_sec):
                self._dhcp_state = STATE_DHCP_START
                if self._debug:
                    print("* DHCP: Time to renew lease")

        # Give up on an unanswered DISCOVER/REQUEST once the response timeout
        # has passed; STATE_DHCP_WAIT schedules the retry.
        if self._dhcp_state in (
            STATE_DHCP_DISCOVER,
            STATE_DHCP_REQUEST,
        ) and time.time() > (self._start_time + self._response_timeout):
            self._dhcp_state = STATE_DHCP_WAIT
            self._close_socket()

    def request_dhcp_lease(self):
        """Request to renew or acquire a DHCP lease.

        Blocks, stepping the state machine until it reaches either the leased
        or the wait state. Returns True when a lease was obtained.
        """
        if self._dhcp_state in (STATE_DHCP_LEASED, STATE_DHCP_WAIT):
            self._dhcp_state = STATE_DHCP_START

        while self._dhcp_state not in (STATE_DHCP_LEASED, STATE_DHCP_WAIT):
            self._dhcp_state_machine()

        return self._dhcp_state == STATE_DHCP_LEASED

    def maintain_dhcp_lease(self):
        """Maintain DHCP lease by running a single state-machine step."""
        self._dhcp_state_machine()

5. Driver 3

wiznet5k_dns.py

# SPDX-FileCopyrightText: 2009-2010 MCQN Ltd
# SPDX-FileCopyrightText: Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT

"""
`wiznet5k_dns`
================================================================================
Pure-Python implementation of the Micropython on ESP32 DNS client for WIZnet 5k-based
ethernet modules.
* Author(s): MCQN Ltd, Brent Rubell, Vincenzo D'Angelo
"""
import time
import wiznet5k_socket as socket

from wiznet5k_socket import htons
from random import getrandbits
from micropython import const


# Header flag values. NOTE(review): none of these three is referenced by the
# DNS class in this file; _build_dns_header() writes the flag bytes literally.
QUERY_FLAG = const(0x00)
OPCODE_STANDARD_QUERY = const(0x00)
RECURSION_DESIRED_FLAG = 1 << 8

# Record type and class written into the question and checked in the reply;
# DATA_LEN is the answer data length the parser insists on (4 bytes).
TYPE_A = const(0x0001)
CLASS_IN = const(0x0001)
DATA_LEN = const(0x0004)

# Return codes for gethostbyname
# NOTE(review): only INVALID_SERVER is referenced by the DNS class in this
# file; failures are otherwise reported with a literal -1.
SUCCESS = const(1)
TIMED_OUT = const(-1)
INVALID_SERVER = const(-2)
TRUNCATED = const(-3)
INVALID_RESPONSE = const(-4)

DNS_PORT = const(0x35)  # port used for DNS request


class DNS:
    """W5K DNS implementation.

    A UDP socket is allocated when the object is created and closed at the
    end of ``gethostbyname``, so an instance serves a single lookup.

    :param iface: Network interface
    :param dns_address: DNS server address, used as the socket's connect target.
    :param bool debug: Enable debugging output.
    """

    def __init__(self, iface, dns_address, debug=False):
        self._debug = debug
        self._iface = iface
        socket.set_interface(iface)
        self._sock = socket.socket(type=socket.SOCK_DGRAM)
        self._sock.settimeout(1)

        self._dns_server = dns_address
        self._host = 0
        self._request_id = 0  # request identifier
        self._pkt_buf = bytearray()

    def gethostbyname(self, hostname):
        """Translate a host name to IPv4 address format.

        :param hostname: Desired host name to connect to (``str`` or bytes).

        Returns the four address bytes if successful, ``INVALID_SERVER`` when
        no DNS server is configured, and -1 for every other failure.
        """
        if self._dns_server is None:
            return INVALID_SERVER
        self._host = hostname
        # build DNS request packet, always starting from an empty buffer so
        # that no earlier request or response leaks into this one
        self._pkt_buf = bytearray()
        self._build_dns_header()
        self._build_dns_question()

        addr = -1
        try:
            # Send DNS request packet
            self._sock.bind((None, DNS_PORT))
            self._sock.connect((self._dns_server, DNS_PORT))
            if self._debug:
                print("* DNS: Sending request packet...")
            self._sock.send(self._pkt_buf)

            # wait for a response, giving the parser up to 5 attempts
            for _ in range(5):
                addr = self._parse_dns_response()
                if addr != -1:
                    break
                if self._debug:
                    print("* DNS ERROR: Failed to resolve DNS response, retrying...")
        finally:
            # release the hardware socket even when parsing raised
            self._sock.close()
        return addr

    def _read_u16(self, offset):
        """Return the big-endian 16-bit value at ``offset`` of the packet buffer."""
        return int.from_bytes(self._pkt_buf[offset : offset + 2], "big")

    # pylint: disable=too-many-return-statements, too-many-branches, too-many-statements, too-many-locals
    def _parse_dns_response(self):
        """Receives and parses DNS query response.

        Only the first answer is examined, and it must be a type A / class IN
        record whose name is the two-byte pointer 0xC0 0x0C.

        Returns desired hostname address if obtained, -1 otherwise.
        """
        # wait for a response
        start_time = time.time()
        packet_sz = self._sock.available()
        while packet_sz <= 0:
            packet_sz = self._sock.available()
            if (time.time() - start_time) > 1.0:
                if self._debug:
                    print("* DNS ERROR: Did not receive DNS response!")
                return -1
            time.sleep(0.05)
        # recv packet into buf
        self._pkt_buf = self._sock.recv()

        if self._debug:
            print("DNS Packet Received: ", self._pkt_buf)

        # Validate request identifier
        xid = self._read_u16(0)
        if xid != self._request_id:
            if self._debug:
                print("* DNS ERROR: Received request identifer {} does not match expected {}".format(xid, self._request_id))
            return -1
        # Validate flags
        flags = self._read_u16(2)
        if flags not in (0x8180, 0x8580):
            if self._debug:
                print("* DNS ERROR: Invalid flags, ", flags)
            return -1
        # Number of questions
        qr_count = self._read_u16(4)
        if qr_count < 1:
            if self._debug:
                print("* DNS ERROR: Question count >=1, ", qr_count)
            return -1
        # Number of answers
        an_count = self._read_u16(6)
        if self._debug:
            print("* DNS Answer Count: ", an_count)
        if an_count < 1:
            return -1

        # Parse query: step over the length-prefixed labels of the name.
        # Bounded by the packet length so a truncated packet cannot raise;
        # the checks below then fail on the empty slices instead.
        ptr = 12
        pkt_len = len(self._pkt_buf)
        while ptr < pkt_len:
            name_len = self._pkt_buf[ptr]
            ptr += 1  # the length byte itself
            if name_len == 0x00:
                # we reached the end of this name
                break
            ptr += name_len

        # Validate Query is Type A
        q_type = self._read_u16(ptr)
        if q_type != TYPE_A:
            if self._debug:
                print("* DNS ERROR: Incorrect Query Type: ", q_type)
            return -1
        ptr += 2

        # Validate Query is Class IN
        q_class = self._read_u16(ptr)
        if q_class != CLASS_IN:
            if self._debug:
                print("* DNS ERROR: Incorrect Query Class: ", q_class)
            return -1
        ptr += 2

        # Let's take the first type-a answer; its name must be the
        # pointer 0xC0 0x0C
        if bytes(self._pkt_buf[ptr : ptr + 2]) != b"\xc0\x0c":
            return -1
        ptr += 2

        # Validate Answer Type A
        ans_type = self._read_u16(ptr)
        if ans_type != TYPE_A:
            if self._debug:
                print("* DNS ERROR: Incorrect Answer Type: ", ans_type)
            return -1
        ptr += 2

        # Validate Answer Class IN
        ans_class = self._read_u16(ptr)
        if ans_class != CLASS_IN:
            if self._debug:
                print("* DNS ERROR: Incorrect Answer Class: ", ans_class)
            return -1
        ptr += 2

        # skip over TTL
        ptr += 4

        # Validate addr is IPv4
        data_len = self._read_u16(ptr)
        if data_len != DATA_LEN:
            if self._debug:
                print("* DNS ERROR: Unexpected Data Length: ", data_len)
            return -1
        ptr += 2
        # Return address
        return self._pkt_buf[ptr : ptr + 4]

    def _build_dns_header(self):
        """Builds DNS header: random 16-bit id, flags 0x0100, one question."""
        # generate a random, 16-bit, request identifier
        self._request_id = getrandbits(16)

        self._pkt_buf += bytes(
            (
                # ID, 16-bit identifier
                self._request_id >> 8,
                self._request_id & 0xFF,
                # Flags (0x0100)
                0x01,
                0x00,
                # QDCOUNT
                0x00,
                0x01,
                # ANCOUNT
                0x00,
                0x00,
                # NSCOUNT
                0x00,
                0x00,
                # ARCOUNT
                0x00,
                0x00,
            )
        )

    def _build_dns_question(self):
        """Build DNS question for ``self._host`` (type A, class IN)."""
        host = self._host
        # Work on bytes throughout so both str and bytes host names are
        # accepted and label lengths are byte counts.
        if isinstance(host, str):
            host = host.encode("utf-8")
        # write out each section of host
        for label in bytes(host).split(b"."):
            if not label:
                # an empty label (e.g. from a trailing dot) would terminate
                # the name early
                continue
            # append the sz of the section
            self._pkt_buf.append(len(label))
            # append the section data
            self._pkt_buf += label
        # end of the name
        self._pkt_buf.append(0x00)
        # Type A record, then Class IN, each as two bytes, high byte first
        self._pkt_buf += bytes(
            (TYPE_A >> 8, TYPE_A & 0xFF, CLASS_IN >> 8, CLASS_IN & 0xFF)
        )

6. Driver 4

wiznet5k_socket.py

# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT

"""
`wiznet5k_socket`
================================================================================
A socket compatible interface with the Wiznet5k module.
* Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick, Vincenzo D'Angelo
"""

import gc
import time
import wiznet5k as wiznet5k

from micropython import const


# Network interface shared by every function and socket in this module;
# stays None until set_interface() is called.
_the_interface = None  # pylint: disable=invalid-name


def set_interface(iface):
    """Install ``iface`` as the module-wide network interface.

    :param iface: Interface object stored in the module global.
    """
    # pylint: disable=global-statement, invalid-name
    global _the_interface
    _the_interface = iface


def htonl(x):
    """Return ``x`` with the order of its four low bytes reversed.

    The swap is unconditional, which matches host-to-network conversion
    when the host byte order is little-endian.
    """
    byte0 = x & 0xFF
    byte1 = (x >> 8) & 0xFF
    byte2 = (x >> 16) & 0xFF
    byte3 = (x >> 24) & 0xFF
    return (byte0 << 24) | (byte1 << 16) | (byte2 << 8) | byte3


def htons(x):
    """Return ``x`` with its two low bytes swapped.

    The swap is unconditional, which matches host-to-network conversion
    when the host byte order is little-endian.
    """
    low = x & 0xFF
    high = (x >> 8) & 0xFF
    return (low << 8) | high


# Socket type / family constants for the socket class in this module.
SOCK_STREAM = const(0x21)  # TCP
# NOTE(review): TCP_MODE is not referenced in the visible part of this file.
TCP_MODE = 80
SOCK_DGRAM = const(0x02)  # UDP
AF_INET = const(3)  # the only address family the socket class accepts
# Socket number compared against get_socket()'s result to detect that no
# socket could be allocated.
SOCKET_INVALID = const(255)


# pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
    """Build the single-entry address-info list for ``host`` and ``port``.

    ``host`` is used as-is when it already is a dotted IPv4 string and is
    resolved with ``gethostbyname`` otherwise. ``family`` and ``flags`` are
    accepted but not used.

    :return: ``[(AF_INET, socktype, proto, "", (address, port))]``
    :raises RuntimeError: if ``port`` is not an integer.
    """
    if not isinstance(port, int):
        raise RuntimeError("Port must be an integer")
    address = host if is_ipv4(host) else gethostbyname(host)
    entry = (AF_INET, socktype, proto, "", (address, port))
    return [entry]


def gethostbyname(hostname):
    """Resolve ``hostname`` through the module interface and return the
    address as a dotted-quad string.

    :param str hostname: Desired hostname.
    """
    raw = _the_interface.get_host_by_name(hostname)
    # Only the first four items of the lookup result are used.
    first, second, third, fourth = raw[0], raw[1], raw[2], raw[3]
    return "{}.{}.{}.{}".format(first, second, third, fourth)


def is_ipv4(host):
    """Checks if a host string is an IPv4 address.

    Every one of the four dot-separated parts must be a non-empty run of
    digits with a value of at most 255. Checking the parts one by one (not
    their concatenation) makes inputs with an empty part, such as
    ``"1..2.3"`` or ``"1.2.3."``, return False instead of raising.

    :param str host: host's name or ip
    :return: True if ``host`` is a dotted-quad IPv4 address, else False.
    """
    octets = host.split(".")
    if len(octets) != 4:
        return False
    for octet in octets:
        if not octet.isdigit():
            return False
        try:
            value = int(octet)
        except ValueError:
            # digit-like characters that int() cannot convert
            return False
        if value > 255:
            return False
    return True


# pylint: disable=invalid-name, too-many-public-methods
class socket:
    """A simplified implementation of the Python 'socket' class
    for connecting to a Wiznet5k module.

    Every operation is delegated to the module-level interface registered with
    ``set_interface``. The object itself only tracks its hardware socket
    number, socket type, timeout, listen port and a receive buffer.

    :param int family: Socket address (and protocol) family; must be ``AF_INET``.
    :param int type: Socket type, ``SOCK_STREAM`` or ``SOCK_DGRAM``.
    :param int proto: Ignored, present for compatibility.
    :param fileno: Ignored, present for compatibility.
    :param socknum: Ignored, present for compatibility.
    :raises RuntimeError: If ``family`` is not ``AF_INET`` or the interface
        cannot allocate a socket.
    """

    # pylint: disable=redefined-builtin,unused-argument
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None):
        if family != AF_INET:
            raise RuntimeError("Only AF_INET family supported by W5K modules.")
        self._sock_type = type
        self._buffer = b""
        self._timeout = 0
        self._listen_port = None

        self._socknum = _the_interface.get_socket()
        if self._socknum == SOCKET_INVALID:
            raise RuntimeError("Failed to allocate socket.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Disconnect (TCP only) and close the socket, polling until it is closed.

        :raises RuntimeError: If the socket does not leave FIN_WAIT or does not
            reach CLOSED before the polling limit expires.
        """
        if self._sock_type == SOCK_STREAM:
            self.disconnect()
            stamp = time.time()
            while self.status == wiznet5k.SNSR_SOCK_FIN_WAIT:
                if time.time() - stamp > 1000:
                    raise RuntimeError("Failed to disconnect socket")
        self.close()
        stamp = time.time()
        while self.status != wiznet5k.SNSR_SOCK_CLOSED:
            if time.time() - stamp > 1000:
                raise RuntimeError("Failed to close socket")

    @property
    def socknum(self):
        """Returns the socket object's socket number."""
        return self._socknum

    @property
    def status(self):
        """Returns the status of the socket"""
        return _the_interface.socket_status(self.socknum)[0]

    @property
    def connected(self):
        """Returns whether or not we are connected to the socket.

        As a side effect the socket is closed when it is found to be
        disconnected, unless it is in the LISTEN state.
        """
        if self.socknum >= _the_interface.max_sockets:
            return False
        status = _the_interface.socket_status(self.socknum)[0]
        if status == wiznet5k.SNSR_SOCK_CLOSE_WAIT and self.available() == 0:
            # The peer closed and nothing is left to read.
            result = False
        else:
            result = status not in (
                wiznet5k.SNSR_SOCK_CLOSED,
                wiznet5k.SNSR_SOCK_LISTEN,
                wiznet5k.SNSR_SOCK_TIME_WAIT,
                wiznet5k.SNSR_SOCK_FIN_WAIT,
            )
        if not result and status != wiznet5k.SNSR_SOCK_LISTEN:
            self.close()
        return result

    def getpeername(self):
        """Return the remote address to which the socket is connected."""
        return _the_interface.remote_ip(self.socknum)

    def inet_aton(self, ip_string):
        """Convert an IPv4 address from dotted-quad string format.

        :param str ip_string: IP Address, as a dotted-quad string.
        :return: A bytearray holding one byte per octet.
        """
        # Use a local value: the receive buffer must not be touched here, or
        # data already received on this socket would be lost.
        return bytearray([int(item) for item in ip_string.split(".")])

    def bind(self, address):
        """Bind the socket to the listen port, if host is specified the interface
        will be reconfigured to that IP.

        :param tuple address: local socket as a (host, port) tuple.
        """
        if address[0] is not None:
            ip_address = _the_interface.unpretty_ip(address[0])
            current_ip, subnet_mask, gw_addr, dns = _the_interface.ifconfig
            if ip_address != current_ip:
                _the_interface.ifconfig = (ip_address, subnet_mask, gw_addr, dns)
        self._listen_port = address[1]
        # For UDP servers we need to open the socket here because we won't call
        # listen
        if self._sock_type == SOCK_DGRAM:
            _the_interface.socket_listen(
                self.socknum, self._listen_port, wiznet5k.SNMR_UDP
            )
            self._buffer = b""

    def listen(self, backlog=None):
        """Listen on the port specified by bind.

        :param backlog: For compatibility but ignored.
        """
        assert self._listen_port is not None, "Use bind to set the port before listen!"
        _the_interface.socket_listen(self.socknum, self._listen_port)
        self._buffer = b""

    def accept(self):
        """Accept a connection. The socket must be bound to an address and listening for
        connections. The return value is a pair (conn, address) where conn is a new
        socket object usable to send and receive data on the connection, and address is
        the address bound to the socket on the other end of the connection.

        :return: ``(conn, address)``, or ``None`` when a timeout is set and it
            expires before a connection arrives.
        :raises RuntimeError: If the replacement listening socket cannot be opened.
        """
        stamp = time.time()
        while self.status not in (
            wiznet5k.SNSR_SOCK_SYNRECV,
            wiznet5k.SNSR_SOCK_ESTABLISHED,
        ):
            if self._timeout > 0 and time.time() - stamp > self._timeout:
                return None
            if self.status == wiznet5k.SNSR_SOCK_CLOSED:
                # The listening socket dropped out; reopen it and keep waiting.
                self.close()
                self.listen()

        new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
        current_socknum = self.socknum
        # Create a new socket object and swap socket nums so we can continue listening
        client_sock = socket()
        client_sock._socknum = current_socknum  # pylint: disable=protected-access
        self._socknum = new_listen_socknum  # pylint: disable=protected-access
        self.bind((None, self._listen_port))
        self.listen()
        if self.status != wiznet5k.SNSR_SOCK_LISTEN:
            raise RuntimeError("Failed to open new listening socket")
        return client_sock, addr

    def connect(self, address, conntype=None):
        """Connect to a remote socket at address.

        :param tuple address: Remote socket as a (host, port) tuple. A string
            host is parsed as a dotted quad, or resolved through the interface
            when it is not numeric.
        :param conntype: Must not be ``0x03``; otherwise unused.
        :raises RuntimeError: If the interface reports that the connection failed.
        """
        assert (
            conntype != 0x03
        ), "Error: SSL/TLS is not currently supported by CircuitPython."
        host, port = address

        if hasattr(host, "split"):
            try:
                host = tuple(map(int, host.split(".")))
            except ValueError:
                host = _the_interface.get_host_by_name(host)
        if self._listen_port is not None:
            # Use the bound port as the source port for this connection.
            _the_interface.src_port = self._listen_port
        result = _the_interface.socket_connect(
            self.socknum, host, port, conn_mode=self._sock_type
        )
        _the_interface.src_port = 0
        if not result:
            raise RuntimeError("Failed to connect to host", host)
        self._buffer = b""

    def send(self, data):
        """Send data to the socket. The socket must be connected to
        a remote socket.

        :param bytearray data: Desired data to send to the socket.
        :return: ``None``.
        """
        _the_interface.socket_write(self.socknum, data, self._timeout)
        gc.collect()

    def sendto(self, data, address):
        """Connect to ``address`` and then send ``data`` to it.

        :param bytearray data: Desired data to send to the socket.
        :param tuple address: Remote socket as a (host, port) tuple.
        """
        self.connect(address)
        return self.send(data)

    def recv(self, bufsize=0, flags=0):  # pylint: disable=too-many-branches
        """Reads some bytes from the connected remote address.

        :param int bufsize: Maximum number of bytes to receive. ``0`` reads
            everything currently available on the socket.
        :param int flags: ignored, present for compatibility.
        :return: The received bytes; ``b""`` when the socket is closed.
        """
        if self.status == wiznet5k.SNSR_SOCK_CLOSED:
            return b""

        if bufsize == 0:
            # read everything on the socket
            while True:
                avail = self.available()
                if not avail:
                    break
                if self._sock_type == SOCK_STREAM:
                    self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
                elif self._sock_type == SOCK_DGRAM:
                    self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
            gc.collect()
            ret = self._buffer
            self._buffer = b""
            gc.collect()
            return ret
        stamp = time.time()

        to_read = bufsize - len(self._buffer)
        received = []
        while to_read > 0:
            avail = self.available()
            if avail:
                # Data arrived, so restart the timeout window.
                stamp = time.time()
                if self._sock_type == SOCK_STREAM:
                    recv = _the_interface.socket_read(
                        self.socknum, min(to_read, avail)
                    )[1]
                elif self._sock_type == SOCK_DGRAM:
                    recv = _the_interface.read_udp(self.socknum, min(to_read, avail))[1]
                recv = bytes(recv)
                received.append(recv)
                to_read -= len(recv)
                gc.collect()
            if self._timeout > 0 and time.time() - stamp > self._timeout:
                break
        self._buffer += b"".join(received)

        # Hand back at most bufsize bytes and keep any surplus for the next call.
        ret = self._buffer[:bufsize]
        self._buffer = self._buffer[bufsize:]
        gc.collect()
        return ret

    def recvfrom(self, bufsize=0, flags=0):
        """Reads some bytes from the connected remote address.

        :param int bufsize: Maximum number of bytes to receive.
        :param int flags: ignored, present for compatibility.
        :returns: a tuple (bytes, address) where address is a tuple (ip, port)
        """
        return (
            self.recv(bufsize),
            (
                _the_interface.remote_ip(self.socknum),
                _the_interface.remote_port(self.socknum),
            ),
        )

    def recv_into(self, buf, nbytes=0, flags=0):
        """Reads some bytes from the connected remote address into the provided buffer.

        :param bytearray buf: Data buffer
        :param nbytes: Maximum number of bytes to receive; ``0`` means ``len(buf)``.
        :param int flags: ignored, present for compatibility.
        :returns: the number of bytes received
        """
        if nbytes == 0:
            nbytes = len(buf)
        ret = self.recv(nbytes)
        nbytes = len(ret)
        buf[:nbytes] = ret
        return nbytes

    def recvfrom_into(self, buf, nbytes=0, flags=0):
        """Reads some bytes from the connected remote address into the provided buffer.

        :param bytearray buf: Data buffer
        :param nbytes: Maximum number of bytes to receive
        :param int flags: ignored, present for compatibility.
        :returns: a tuple (nbytes, address) where address is a tuple (ip, port)
        """
        return (
            self.recv_into(buf, nbytes),
            (
                _the_interface.remote_ip(self.socknum),
                _the_interface.remote_port(self.socknum),
            ),
        )

    def readline(self):
        """Return the next line from the socket, without its CRLF terminator.

        :raises RuntimeError: If a timeout is set and no data arrives before it
            expires; the socket is closed first.
        """
        stamp = time.time()
        while b"\r\n" not in self._buffer:
            avail = self.available()
            if avail:
                if self._sock_type == SOCK_STREAM:
                    self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
                elif self._sock_type == SOCK_DGRAM:
                    self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
            if (
                not avail
                and self._timeout > 0
                and time.time() - stamp > self._timeout
            ):
                self.close()
                raise RuntimeError("Didn't receive response, failing out...")
        firstline, self._buffer = self._buffer.split(b"\r\n", 1)
        gc.collect()
        return firstline

    def disconnect(self):
        """Disconnects a TCP socket."""
        assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket."
        _the_interface.socket_disconnect(self.socknum)

    def close(self):
        """Closes the socket."""
        _the_interface.socket_close(self.socknum)

    def available(self):
        """Returns how many bytes of data are available to be read from the socket."""
        return _the_interface.socket_available(self.socknum, self._sock_type)

    def settimeout(self, value):
        """Sets socket read timeout.

        :param int value: Socket read timeout, in seconds.
        :raises Exception: If ``value`` is negative.
        """
        if value < 0:
            raise Exception("Timeout period should be non-negative.")
        self._timeout = value

    def gettimeout(self):
        """Return the timeout in seconds associated with socket operations.

        The value is ``0`` until ``settimeout`` is called; the read loops in
        this class only enforce a timeout when it is greater than zero.
        """
        return self._timeout

7. Driver 5

sma_esp32_w5500_requests.py

"""
`sma_requests`
================================================================================
A requests-like library for web interfacing
* Author(s): Seyed Mohammad Ayyoubzadeh
Implementation Notes
--------------------
Adapted from https://github.com/adafruit/Adafruit_CircuitPython_Requests
author='Seyed Mohammad Ayyoubzadeh'
license='MIT'
__version__ = "1.0.0"
__repo__ = "https://github.com/Ayyoubzadeh/Micropython-ESP32-W5500--Wiznet-"
"""

import errno
import sys

import json as json_module



# Characters that encode() copies through without percent-encoding.
always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
               'abcdefghijklmnopqrstuvwxyz'
               '0123456789' '_.-')


def encode(s):
    """Percent-encode ``s`` for use in a URL-encoded form body.

    Characters listed in ``always_safe`` are copied unchanged. Every other
    character is replaced by the ``%xx`` escape of each byte of its UTF-8
    encoding.

    :param str s: Text to encode.
    :return: The encoded string.
    """
    res = []
    for c in s:
        if c in always_safe:
            res.append(c)
            continue
        # Escape byte by byte with two hex digits each, so that code points
        # below 0x10 and non-ASCII characters yield valid escapes.
        for byte in c.encode("utf-8"):
            res.append('%%%02x' % byte)
    return ''.join(res)

def cast(_t, value):
   """Return ``value`` unchanged; ``_t`` is accepted and ignored."""
   return value

class _RawResponse:
   def __init__(self, response: "Response") -> None:
       self._response = response

   def read(self, size: int = -1) -> bytes:
       """Read as much as available or up to size and return it in a byte string.
       Do NOT use this unless you really need to. Reusing memory with `readinto` is much better.
       """
       if size == -1:
           return self._response.content
       return self._response.socket.recv(size)

   def readinto(self, buf: bytearray) -> int:
       """Read as much as available into buf or until it is full. Returns the number of bytes read
       into buf."""
       return self._response._readinto(buf)  # pylint: disable=protected-access


class OutOfRetries(Exception):
    """Raised when a request still fails after every retry attempt."""


class Response:
    """The response from a request, contains all the headers/content.

    The status line and headers are read from ``sock`` during construction.
    The body is read on demand through ``content``, ``text``, ``json`` or
    ``iter_content``.

    :param sock: Socket to read the response from.
    :param session: Optional owning session, used to close or release ``sock``.
    :raises RuntimeError: If nothing can be read for the status line.
    """

    # pylint: disable=too-many-instance-attributes

    encoding = None

    def __init__(self, sock: SocketType, session: Optional["Session"] = None) -> None:
        self.socket = sock
        self.encoding = "utf-8"
        self._cached = None
        self._headers = {}

        # _received_length and _receive_buffer are used when parsing headers.
        # _receive_buffer will grow by 32 bytes everytime it is too small.
        self._received_length = 0
        self._receive_buffer = bytearray(32)
        self._remaining = None
        self._chunked = False

        # Sockets without recv_into are read through recv() plus a copy.
        self._backwards_compatible = not hasattr(sock, "recv_into")

        http = self._readto(b" ")
        if not http:
            if session:
                session._close_socket(self.socket)
            else:
                self.socket.close()
            raise RuntimeError("Unable to read HTTP response.")
        self.status_code = int(bytes(self._readto(b" ")))
        self.reason = self._readto(b"\r\n")
        self._parse_headers()
        self._raw = None
        self._session = session

    def __enter__(self) -> "Response":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[type]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.close()

    def _recv_into(self, buf: bytearray, size: int = 0) -> int:
        """Read from the socket into ``buf`` and return the number of bytes stored.

        On the ``recv`` fallback path a ``size`` of 0 means ``len(buf)``.
        """
        if self._backwards_compatible:
            size = len(buf) if size == 0 else size
            b = self.socket.recv(size)
            read_size = len(b)
            buf[:read_size] = b
            return read_size
        return cast("SupportsRecvInto", self.socket).recv_into(buf, size)

    def _readto(self, stop: bytes) -> bytearray:
        """Return buffered data up to, but not including, ``stop``.

        More data is pulled from the socket as needed. If the socket yields
        nothing before ``stop`` is seen, whatever was buffered is returned and
        the buffer is emptied.
        """
        buf = self._receive_buffer
        end = self._received_length
        while True:
            i = bytes(buf).find(stop, 0, end)
            if i >= 0:
                # Stop was found. Return everything up to but not including stop.
                result = buf[:i]
                new_start = i + len(stop)
                # Remove everything up to and including stop from the buffer.
                new_end = end - new_start
                buf[:new_end] = buf[new_start:end]
                self._received_length = new_end
                return result

            # Not found so load more bytes.
            # If our buffer is full, then make it bigger to load more.
            if end == len(buf):
                new_buf = bytearray(len(buf) + 32)
                new_buf[: len(buf)] = buf
                buf = new_buf
                self._receive_buffer = buf

            read = self._recv_into(memoryview(buf)[end:])
            if read == 0:
                self._received_length = 0
                return buf[:end]
            end += read

    def _read_from_buffer(
        self, buf: Optional[bytearray] = None, nbytes: Optional[int] = None
    ) -> int:
        """Move up to ``nbytes`` already-buffered bytes into ``buf``.

        When ``buf`` is omitted the bytes are simply discarded. Returns the
        number of bytes consumed from the internal buffer.
        """
        if self._received_length == 0:
            return 0
        read = self._received_length
        if nbytes < read:
            read = nbytes
        membuf = memoryview(self._receive_buffer)
        if buf:
            buf[:read] = membuf[:read]
        if read < self._received_length:
            # Shift the unread tail to the front of the buffer.
            new_end = self._received_length - read
            self._receive_buffer[:new_end] = membuf[read : self._received_length]
            self._received_length = new_end
        else:
            self._received_length = 0
        return read

    def _readinto(self, buf: bytearray) -> int:
        """Read body bytes into ``buf`` and return how many were stored.

        Returns 0 once the body (or the final chunk) has been consumed.

        :raises RuntimeError: If this response has already been closed.
        """
        if not self.socket:
            raise RuntimeError(
                "Newer Response closed this one. Use Responses immediately."
            )

        if not self._remaining:
            # Consume the chunk header if need be.
            if self._chunked:
                # Consume trailing \r\n for chunks 2+
                if self._remaining == 0:
                    self._throw_away(2)
                chunk_header = bytes(self._readto(b"\r\n")).split(b";", 1)[0]
                http_chunk_size = int(bytes(chunk_header), 16)
                if http_chunk_size == 0:
                    self._chunked = False
                    self._parse_headers()
                    return 0
                self._remaining = http_chunk_size
            else:
                return 0

        nbytes = len(buf)
        if nbytes > self._remaining:
            nbytes = self._remaining

        read = self._read_from_buffer(buf, nbytes)
        if read == 0:
            read = self._recv_into(buf, nbytes)
        self._remaining -= read

        return read

    def _throw_away(self, nbytes: int) -> None:
        """Discard ``nbytes`` bytes of the response, buffered data first."""
        nbytes -= self._read_from_buffer(nbytes=nbytes)

        buf = self._receive_buffer
        len_buf = len(buf)
        while nbytes > 0:
            read = self._recv_into(buf, min(nbytes, len_buf))
            if read == 0:
                # The socket produced nothing more; stop instead of looping forever.
                break
            nbytes -= read

    def close(self) -> None:
        """Drain the remaining ESP socket buffers. We assume we already got what we wanted."""
        if not self.socket:
            return
        # Make sure we've read all of our response.
        if self._cached is None:
            if self._remaining and self._remaining > 0:
                self._throw_away(self._remaining)
            elif self._chunked:
                while True:
                    chunk_header = bytes(self._readto(b"\r\n")).split(b";", 1)[0]
                    if not chunk_header:
                        # Stream ended without a terminating chunk; nothing left to drain.
                        break
                    chunk_size = int(bytes(chunk_header), 16)
                    if chunk_size == 0:
                        break
                    self._throw_away(chunk_size + 2)
                self._parse_headers()
        if self._session:
            self._session._free_socket(self.socket)  # pylint: disable=protected-access
        else:
            self.socket.close()
        self.socket = None

    def _parse_headers(self) -> None:
        """
        Parses the header portion of an HTTP request/response from the socket.
        Expects first line of HTTP request/response to have been read already.
        """
        while True:
            header = self._readto(b"\r\n")
            if not header:
                break
            # Split on the colon alone: whitespace after it is optional, so
            # "Name:value" must parse the same as "Name: value".
            parts = bytes(header).split(b":", 1)
            if len(parts) != 2:
                # Not a "name: value" line; ignore it rather than fail.
                continue
            title = parts[0].strip()
            content = parts[1].strip()
            if title and content:
                # enforce that all headers are lowercase
                title = str(title, "utf-8").lower()
                content = str(content, "utf-8")
                if title == "content-length":
                    self._remaining = int(content)
                if title == "transfer-encoding":
                    self._chunked = content.strip().lower() == "chunked"
                if title == "set-cookie" and title in self._headers:
                    self._headers[title] += ", " + content
                else:
                    self._headers[title] = content

    def _validate_not_gzip(self) -> None:
        """gzip encoding is not supported. Raise an exception if found."""
        if (
            "content-encoding" in self.headers
            and self.headers["content-encoding"] == "gzip"
        ):
            raise ValueError(
                "Content-encoding is gzip, data cannot be accessed as json or text. "
                "Use content property to access raw bytes."
            )

    @property
    def headers(self) -> Dict[str, str]:
        """
        The response headers. Does not include headers from the trailer until
        the content has been read.
        """
        return self._headers

    @property
    def content(self) -> bytes:
        """The HTTP content direct from the socket, as bytes"""
        if self._cached is not None:
            if isinstance(self._cached, bytes):
                return self._cached
            raise RuntimeError("Cannot access content after getting text or json")

        self._cached = b"".join(self.iter_content(chunk_size=32))
        return self._cached

    @property
    def text(self) -> str:
        """The HTTP content, decoded into a string using ``self.encoding``."""
        if self._cached is not None:
            if isinstance(self._cached, str):
                return self._cached
            raise RuntimeError("Cannot access text after getting content or json")

        self._validate_not_gzip()

        self._cached = str(self.content, self.encoding)
        return self._cached

    def json(self) -> Any:
        """The HTTP content, parsed into a json dictionary"""
        # The cached JSON will be a list or dictionary.
        if self._cached:
            if isinstance(self._cached, (list, dict)):
                return self._cached
            raise RuntimeError("Cannot access json after getting text or content")
        if not self._raw:
            self._raw = _RawResponse(self)

        self._validate_not_gzip()

        obj = json_module.load(self._raw)
        if not self._cached:
            self._cached = obj
        self.close()
        return obj

    def iter_content(self, chunk_size: int = 1, decode_unicode: bool = False) -> bytes:
        """An iterator that will stream data by only reading 'chunk_size'
        bytes and yielding them, when we can't buffer the whole datastream.

        The response is closed once the body has been exhausted.

        :raises NotImplementedError: If ``decode_unicode`` is true.
        """
        if decode_unicode:
            raise NotImplementedError("Unicode not supported")

        b = bytearray(chunk_size)
        while True:
            size = self._readinto(b)
            if size == 0:
                break
            if size < chunk_size:
                chunk = bytes(memoryview(b)[:size])
            else:
                chunk = bytes(b)
            yield chunk
        self.close()


class Session:
    """HTTP session that shares sockets and ssl context.

    :param socket_pool: Object providing ``getaddrinfo``, ``socket`` and
        ``SOCK_STREAM``.
    :param ssl_context: Optional context whose ``wrap_socket`` is used for
        ``https:`` URLs.
    """

    def __init__(
        self,
        socket_pool: SocketpoolModuleType,
        ssl_context: Optional[SSLContextType] = None,
    ) -> None:
        self._socket_pool = socket_pool
        self._ssl_context = ssl_context
        # Hang onto open sockets so that we can reuse them.
        self._open_sockets = {}
        self._socket_free = {}
        self._last_response = None

    def _free_socket(self, socket: SocketType) -> None:
        """Mark ``socket`` as available for reuse.

        :raises RuntimeError: If ``socket`` is not tracked by this session.
        """
        if socket not in self._open_sockets.values():
            raise RuntimeError("Socket not from session")
        self._socket_free[socket] = True

    def _close_socket(self, sock: SocketType) -> None:
        """Close ``sock`` and forget it."""
        sock.close()
        # pop() rather than del: closing a socket that is not tracked must not fail.
        self._socket_free.pop(sock, None)
        key = None
        for k in self._open_sockets:  # pylint: disable=consider-using-dict-items
            if self._open_sockets[k] == sock:
                key = k
                break
        if key:
            del self._open_sockets[key]

    def _free_sockets(self) -> None:
        """Close every socket currently marked as free."""
        free_sockets = [sock for sock, val in self._socket_free.items() if val]
        for sock in free_sockets:
            self._close_socket(sock)

    def _get_socket(
        self, host: str, port: int, proto: str, *, timeout: float = 1
    ) -> CircuitPythonSocketType:
        """Return a connected socket for ``(host, port, proto)``.

        A free socket already open for the same key is reused; otherwise a new
        one is created and connected.

        :raises RuntimeError: If https is requested without an ssl context, or
            if no socket could be created and connected.
        """
        # pylint: disable=too-many-branches
        key = (host, port, proto)
        if key in self._open_sockets:
            sock = self._open_sockets[key]
            if self._socket_free[sock]:
                self._socket_free[sock] = False
                return sock
        if proto == "https:" and not self._ssl_context:
            raise RuntimeError(
                "ssl_context must be set before using adafruit_requests for https"
            )
        addr_info = self._socket_pool.getaddrinfo(
            host, port, 0, self._socket_pool.SOCK_STREAM
        )[0]
        retry_count = 0
        sock = None
        while retry_count < 5 and sock is None:
            if retry_count > 0:
                # NOTE(review): items() yields (socket, flag) tuples, which are
                # always truthy, so this tests "any tracked socket" rather than
                # "any free socket". Kept as-is; confirm the intent.
                if any(self._socket_free.items()):
                    self._free_sockets()
                else:
                    raise RuntimeError("Sending request failed")
            retry_count += 1

            try:
                sock = self._socket_pool.socket(addr_info[0], addr_info[1])
            except (OSError, RuntimeError):
                continue

            connect_host = addr_info[-1][0]
            if proto == "https:":
                sock = self._ssl_context.wrap_socket(sock, server_hostname=host)
                connect_host = host
            sock.settimeout(timeout)  # socket read timeout

            try:
                sock.connect((connect_host, port))
            except (MemoryError, OSError, RuntimeError):
                # RuntimeError is included because the socket pool already
                # signals failures that way when creating a socket; the
                # allocated socket must be released before retrying.
                sock.close()
                sock = None

        if sock is None:
            raise RuntimeError("Repeated socket failures")

        self._open_sockets[key] = sock
        self._socket_free[sock] = False
        return sock

    @staticmethod
    def _send(socket: SocketType, data: bytes):
        """Send all of ``data`` on ``socket``.

        A ``send`` that returns ``None`` is treated as having sent everything.

        :raises OSError: On any send failure other than ``EAGAIN``.
        """
        total_sent = 0
        while total_sent < len(data):
            # ESP32SPI sockets raise a RuntimeError when unable to send.
            try:
                sent = socket.send(data[total_sent:])
            except OSError as exc:
                if exc.errno == errno.EAGAIN:
                    # Can't send right now (e.g., no buffer space), try again.
                    continue
                # Some worse error.
                raise
            except RuntimeError as exc:
                raise OSError(errno.EIO) from exc
            if sent is None:
                sent = len(data)
            if sent == 0:
                # Not EAGAIN; that was already handled.
                raise OSError(errno.EIO)
            total_sent += sent

    def _send_request(
        self,
        socket: SocketType,
        host: str,
        method: str,
        path: str,
        headers: List[Dict[str, str]],
        data: Any,
        json: Any,
    ):
        """Serialize the request line, headers and body, then send them in one write.

        ``json`` is dumped to a JSON body; a ``dict`` passed as ``data`` is
        form-urlencoded; ``str`` data is encoded as UTF-8.
        """
        # pylint: disable=too-many-arguments
        h = b''
        h += bytes(method, "utf-8")
        h += b" /"
        h += bytes(path, "utf-8")
        h += b" HTTP/1.1\r\n"
        if "Host" not in headers:
            h += b"Host: "
            h += bytes(host, "utf-8")
            h += b"\r\n"
        if "User-Agent" not in headers:
            h += b"User-Agent: Mozilla/5.0\r\n"
        # Iterate over keys to avoid tuple alloc
        for k in headers:
            h += k.encode()
            h += b": "
            h += headers[k].encode()
            h += b"\r\n"
        if json is not None:
            assert data is None
            data = json_module.dumps(json)
            h += b"Content-Type: application/json\r\n"
        if data:
            if isinstance(data, dict):
                h += b"Content-Type: application/x-www-form-urlencoded\r\n"
                # Only the values are percent-encoded; keys are used verbatim.
                data = "&".join(
                    [str(k) + "=" + encode(str(data[k])) for k in data]
                )
            if isinstance(data, str):
                data = bytes(data, "utf-8")
            h += (b"Content-Length: %d\r\n" % len(data))
        h += b"\r\n"
        if data:
            h += bytes(data)
        self._send(socket, h)

    # pylint: disable=too-many-branches, too-many-statements, unused-argument, too-many-arguments, too-many-locals
    def request(
        self,
        method: str,
        url: str,
        data: Optional[Any] = None,
        json: Optional[Any] = None,
        headers: Optional[List[Dict[str, str]]] = None,
        stream: bool = False,
        timeout: float = 60,
    ) -> Response:
        """Perform an HTTP request to the given url which we will parse to determine
        whether to use SSL ('https://') or not. We can also send some provided 'data'
        or a json dictionary which we will stringify. 'headers' is optional HTTP headers
        sent along. 'stream' is accepted for compatibility and not used here.

        Redirects are not followed: a 3xx response is returned to the caller as-is.

        :raises ValueError: If the URL scheme is neither http nor https.
        :raises OutOfRetries: If the request could not be sent after two attempts.
        """
        if not headers:
            headers = {}

        try:
            proto, _, host, path = url.split("/", 3)
            # replace spaces in path
            path = path.replace(" ", "%20")
        except ValueError:
            proto, _, host = url.split("/", 2)
            path = ""
        if proto == "http:":
            port = 80
        elif proto == "https:":
            port = 443
        else:
            raise ValueError("Unsupported protocol: " + proto)

        if ":" in host:
            host, port = host.split(":", 1)
            port = int(port)

        if self._last_response:
            self._last_response.close()
            self._last_response = None

        # We may fail to send the request if the socket we got is closed already. So, try a second
        # time in that case.
        retry_count = 0
        while retry_count < 2:
            retry_count += 1
            socket = self._get_socket(host, port, proto, timeout=timeout)
            ok = True
            try:
                self._send_request(socket, host, method, path, headers, data, json)
            except OSError:
                ok = False
            if ok:
                # Read the H of "HTTP/1.1" to make sure the socket is alive. send can appear to work
                # even when the socket is closed.
                if hasattr(socket, "recv"):
                    result = socket.recv(1)
                else:
                    result = bytearray(1)
                    try:
                        socket.recv_into(result)
                    except OSError:
                        pass
                if result == b"H":
                    # Things seem to be ok so break with socket set.
                    break
            self._close_socket(socket)
            socket = None

        if not socket:
            raise OutOfRetries("Repeated socket failures")

        resp = Response(socket, self)  # our response
        # Remember every response, redirects included, so the next request can
        # close it and release its socket.
        self._last_response = resp
        return resp

    def head(self, url: str, **kw) -> Response:
        """Send HTTP HEAD request"""
        return self.request("HEAD", url, **kw)

    def get(self, url: str, **kw) -> Response:
        """Send HTTP GET request"""
        return self.request("GET", url, **kw)

    def post(self, url: str, **kw) -> Response:
        """Send HTTP POST request"""
        return self.request("POST", url, **kw)

    def put(self, url: str, **kw) -> Response:
        """Send HTTP PUT request"""
        return self.request("PUT", url, **kw)

    def patch(self, url: str, **kw) -> Response:
        """Send HTTP PATCH request"""
        return self.request("PATCH", url, **kw)

    def delete(self, url: str, **kw) -> Response:
        """Send HTTP DELETE request"""
        return self.request("DELETE", url, **kw)


# Backwards compatible API:

_default_session = None  # pylint: disable=invalid-name


class _FakeSSLSocket:
   """Socket wrapper that supplies a TLS mode to the wrapped socket's ``connect``.

   ``settimeout``, ``send``, ``recv`` and ``close`` of the wrapped socket are
   re-exported as attributes of this object.
   """

   def __init__(self, socket: CircuitPythonSocketType, tls_mode: int) -> None:
       self._socket = socket
       self._mode = tls_mode
       # Expose the wrapped socket's own bound methods directly on the wrapper.
       for passthrough in ("settimeout", "send", "recv", "close"):
           setattr(self, passthrough, getattr(socket, passthrough))

   def connect(self, address: Tuple[str, int]) -> None:
       """Connect the wrapped socket, adding the non-standard mode argument.

       A ``RuntimeError`` from the wrapped socket is re-raised as
       ``OSError(errno.ENOMEM)``.
       """
       try:
           outcome = self._socket.connect(address, self._mode)
       except RuntimeError as error:
           raise OSError(errno.ENOMEM) from error
       return outcome


class _FakeSSLContext:
   """Minimal SSL-context stand-in that wraps sockets in ``_FakeSSLSocket``."""

   def __init__(self, iface: InterfaceType) -> None:
       self._iface = iface

   def wrap_socket(
       self, socket: CircuitPythonSocketType, server_hostname: Optional[str] = None
   ) -> _FakeSSLSocket:
       """Wrap ``socket`` using the interface's ``TLS_MODE``.

       ``server_hostname`` is accepted but not used.
       """
       # pylint: disable=unused-argument
       wrapped = _FakeSSLSocket(socket, tls_mode=self._iface.TLS_MODE)
       return wrapped


def set_socket(
   sock: SocketpoolModuleType, iface: Optional[InterfaceType] = None
) -> None:
   """Legacy API for setting the socket and network interface. Use a `Session` instead.

   :param sock: Socket module used to build the default session.
   :param iface: Network interface; when omitted (or falsy) the interface
       already stored on ``sock`` as ``_the_interface`` is used.
   """
   global _default_session  # pylint: disable=global-statement,invalid-name
   if not iface:
       # pylint: disable=protected-access
       iface = sock._the_interface
   _default_session = Session(sock, _FakeSSLContext(iface))
   # Hand over the resolved interface so that an omitted ``iface`` never
   # results in ``None`` being passed to ``set_interface``.
   sock.set_interface(iface)


def request(
   method: str,
   url: str,
   data: Optional[Any] = None,
   json: Optional[Any] = None,
   headers: Optional[List[Dict[str, str]]] = None,
   stream: bool = False,
   timeout: float = 1,
) -> "Response":
   """Send HTTP request through the default session and return its response.

   All arguments are forwarded to ``_default_session.request``. The result is
   returned to the caller, matching the module-level ``head``/``get``/``post``/
   ``put``/``patch``/``delete`` helpers.
   """
   # pylint: disable=too-many-arguments
   return _default_session.request(
       method,
       url,
       data=data,
       json=json,
       headers=headers,
       stream=stream,
       timeout=timeout,
   )


def head(url: str, **kw):
   """Send an HTTP HEAD request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("HEAD", url, **kw)


def get(url: str, **kw):
   """Send an HTTP GET request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("GET", url, **kw)


def post(url: str, **kw):
   """Send an HTTP POST request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("POST", url, **kw)


def put(url: str, **kw):
   """Send an HTTP PUT request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("PUT", url, **kw)


def patch(url: str, **kw):
   """Send an HTTP PATCH request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("PATCH", url, **kw)


def delete(url: str, **kw):
   """Send an HTTP DELETE request for ``url`` through the default session.

   Extra keyword arguments are forwarded unchanged to the session's ``request``.
   """
   session = _default_session
   return session.request("DELETE", url, **kw)

8. Sample code

example.py

from machine import Pin,SPI
from wiznet5k import WIZNET5K
import wiznet5k_socket as socket
import time
#import socket

# Module-level hardware and socket setup; everything here runs at import time.
# SPI bus 1 at 8 MHz drives the W5500 (SCK=GPIO2, MOSI=GPIO3, MISO=GPIO10).
spi = SPI(1,baudrate = 8_000_000,sck = Pin(2),mosi = Pin(3),miso = Pin(10,Pin.IN))
# W5500 driver instance: chip-select on GPIO7, reset on GPIO6.
net = WIZNET5K(spi,cs = Pin(7,Pin.OUT),reset = Pin(6))
# Output pin on GPIO12, switched by W5500_TCP according to the received byte.
led = Pin(12,Pin.OUT)

# Target host address
addr = ("192.168.1.10",8080)

# Text payload (UTF-8 encoded, so this is a bytes object)
Text = "Shanghai欢迎你的光临".encode("UTF-8")

# UDP socket
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# Bind address and port
# NOTE(review): the UDP socket binds to 192.168.1.55 while the TCP socket below
# binds to 192.168.1.12 -- confirm which one matches the board's actual IP.
udp_socket.bind(("192.168.1.55",8080))

# TCP socket
tcp_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# Bind address and port
tcp_socket.bind(("192.168.1.12",8080))
# Connect to the server
tcp_socket.connect(("192.168.1.10",8080))

def W5500_Init():
   """Print the chip name, MAC address and IP address reported by ``net``."""
   chip_name = net.chip
   mac_text = net.pretty_mac(net.mac_address)
   ip_text = net.pretty_ip(net.ip_address)

   # Report the network adapter details in order: chip, MAC address, IP address.
   report = (
       ("Chip: %s", chip_name),
       ("Mac: %s", mac_text),
       ("Ip: %s", ip_text),
   )
   for template, value in report:
       print(template % (value))

def W5500_UDP():
   """Send ``Text`` to ``addr`` over UDP ten times, each suffixed with its index.

   Datagrams are sent 0.5 s apart. If sending fails, the error is printed and
   ``udp_socket`` is closed. Interrupts (e.g. Ctrl-C) also close the socket
   but are re-raised so the program can actually be stopped.
   """
   try:
       for i in range(10):
           # Text is bytes, so the counter must be encoded before concatenating;
           # bytes + str raises TypeError.
           udp_socket.sendto(Text + str(i).encode("UTF-8"), addr)
           time.sleep(0.5)
   except Exception as error:
       print("UDP send failed: %r" % (error,))
       udp_socket.close()
   except BaseException:
       # KeyboardInterrupt / SystemExit: release the socket, then propagate.
       udp_socket.close()
       raise
   
def W5500_TCP(mode):
   """Send or receive data on ``tcp_socket`` depending on ``mode``.

   :param mode: Truthy -> send ``Text`` ten times, 0.5 s apart.
       Falsy -> receive one byte, print it with the sender address, and set
       ``led`` to 1 when the byte is ``'k'``, otherwise to 0.

   If an error occurs it is printed and ``tcp_socket`` is closed. Interrupts
   (e.g. Ctrl-C) also close the socket but are re-raised, so a caller looping
   forever can still be stopped.
   """
   try:
       if mode:
           for _ in range(10):
               tcp_socket.send(Text)
               time.sleep(0.5)
       else:
           data, address = tcp_socket.recvfrom(1)
           # Decode once and reuse the text for both the log line and the check.
           command = data.decode("UTF-8")
           print("data:%s address:%s" % (command, address))
           led.value(1 if command == 'k' else 0)
   except Exception as error:
       print("TCP transfer failed: %r" % (error,))
       tcp_socket.close()
   except BaseException:
       # KeyboardInterrupt / SystemExit: release the socket, then propagate.
       tcp_socket.close()
       raise
       
def main():
   """Print the adapter details, then run the TCP receive path in an endless loop."""
   W5500_Init()
   receive_mode = 0
   while True:
       # Call W5500_UDP() here instead to exercise the UDP demo.
       W5500_TCP(receive_mode)


# Run the demo only when this file is executed as the main script.
if __name__ == "__main__":
   main()

Download the network debugging assistant (NetAssist). You can search for it online or get it from my article:

NetAssist Network Debugging Assistant V5.0.7-Software Tools-Savage Home

Internet of Things Development Notes (52) - Using Micropython to develop ESP32 development board W5500 Ethernet network module wired network communication_esp32 wired network_Modupiaoxue Blog-CSDN Blog
Use Micropython to develop wired network communication of W5500 Ethernet network module of ESP32 development board
https://blog.csdn.net/zhusongziye/article/details/128027511?ops_request_misc=&request_id=77ca849dc17f40b4905c552f9c85d1a8&biz_id=&utm_medium=distribute.pc_search_result.none-task-blog-2~blog~koosearch~default-1-128027511-null-null.268^v1^control&utm_term=w5500&spm=1018.2226.3001.4450&ydreferer=aHR0cHM6Ly9zby5jc2RuLm5ldC9zby9zZWFyY2g%2FcT13NTUwMCZ0PWJsb2cmdT16aHVzb25neml5ZQ%3D%3D
Then open the network debugging assistant:

 

Run the program and you will see the data being sent and received in the debugging assistant. It's very simple, so try it out for yourself.

9. Module Introduction

 

 

Documents
Comments Write