Micropython ESP32 C3 connected to W5500 wired Ethernet UDP&TCP protocol communication module
Micropython ESP32 C3 connected to W5500 wired Ethernet UDP&TCP protocol communication module

First, the purpose
In this section, we will learn how to use Hezhou ESP32 C3 to connect W5500 wired Ethernet UDP&TCP protocol communication module for network communication experiments.
2. Environment
ESP32 C3 development board (MicroPython v1.19.1 on 2022-06-18) + W5500 wired Ethernet UDP&TCP protocol communication module + several Dupont cables + Win10 commercial version
ESP32 C3 and module wiring method:
PS: Please note that the W5500 module is connected to the development board through the SPI serial port Dupont cable, the development board is connected to the computer through the USB to serial port, the computer is connected to the router, and the W5500 LAN port is connected to the router through the network cable, so that the W5500 and the computer are in the same network segment .
3. Drive 1
wiznet5k.py
# SPDX-FileCopyrightText: 2008 Bjoern Hartmann
# SPDX-FileCopyrightText: 2010 WIZnet
# SPDX-FileCopyrightText: 2010 Arduino LLC
# SPDX-FileCopyrightText: 2018 Paul Stoffregen
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck
# SPDX-FileCopyrightText: 2021 Adam Cummick
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k`
================================================================================
Pure-Python interface for WIZNET 5k ethernet modules.
* Author(s): WIZnet, Arduino LLC, Bjoern Hartmann, Paul Stoffregen, Brent Rubell,
Patrick Van Oosterwijck, Vincenzo D'Angelo
Implementation Notes
--------------------
**Software and Dependencies:**
* Micropython on ESP32 (tested on ESP32-WROVER-IE equipped of Wiznet5500)
"""
import time
import wiznet5k_dhcp as dhcp
import wiznet5k_dns as dns
from random import randint
from micropython import const
from machine import SPI
# Wiznet5k Registers
REG_MR = const(0x0000) # Mode
REG_GAR = const(0x0001) # Gateway IP Address
REG_SUBR = const(0x0005) # Subnet Mask Address
REG_VERSIONR_W5500 = const(0x0039) # W5500 Silicon Version
REG_SHAR = const(0x0009) # Source Hardware Address
REG_SIPR = const(0x000F) # Source IP Address
REG_PHYCFGR = const(0x002E) # W5500 PHY Configuration
# Wiznet5k Socket Registers
REG_SNMR = const(0x0000) # Socket n Mode
REG_SNCR = const(0x0001) # Socket n Command
REG_SNIR = const(0x0002) # Socket n Interrupt
REG_SNSR = const(0x0003) # Socket n Status
REG_SNPORT = const(0x0004) # Socket n Source Port
REG_SNDIPR = const(0x000C) # Destination IP Address
REG_SNDPORT = const(0x0010) # Destination Port
REG_SNRX_RSR = const(0x0026) # RX Free Size
REG_SNRX_RD = const(0x0028) # Read Size Pointer
REG_SNTX_FSR = const(0x0020) # Socket n TX Free Size
REG_SNTX_WR = const(0x0024) # TX Write Pointer
# SNSR Commands
SNSR_SOCK_CLOSED = const(0x00)
SNSR_SOCK_INIT = const(0x13)
SNSR_SOCK_LISTEN = const(0x14)
SNSR_SOCK_SYNSENT = const(0x15)
SNSR_SOCK_SYNRECV = const(0x16)
SNSR_SOCK_ESTABLISHED = const(0x17)
SNSR_SOCK_FIN_WAIT = const(0x18)
SNSR_SOCK_CLOSING = const(0x1A)
SNSR_SOCK_TIME_WAIT = const(0x1B)
SNSR_SOCK_CLOSE_WAIT = const(0x1C)
SNSR_SOCK_LAST_ACK = const(0x1D)
SNSR_SOCK_UDP = const(0x22)
SNSR_SOCK_IPRAW = const(0x32)
SNSR_SOCK_MACRAW = const(0x42)
SNSR_SOCK_PPPOE = const(0x5F)
# Sock Commands (CMD)
CMD_SOCK_OPEN = const(0x01)
CMD_SOCK_LISTEN = const(0x02)
CMD_SOCK_CONNECT = const(0x04)
CMD_SOCK_DISCON = const(0x08)
CMD_SOCK_CLOSE = const(0x10)
CMD_SOCK_SEND = const(0x20)
CMD_SOCK_SEND_MAC = const(0x21)
CMD_SOCK_SEND_KEEP = const(0x22)
CMD_SOCK_RECV = const(0x40)
# Socket n Interrupt Register
SNIR_SEND_OK = const(0x10)
SNIR_TIMEOUT = const(0x08)
SNIR_RECV = const(0x04)
SNIR_DISCON = const(0x02)
SNIR_CON = const(0x01)
CH_SIZE = const(0x100)
SOCK_SIZE = const(0x800) # MAX W5k socket size
# Register commands
MR_RST = const(0x80) # Mode Register RST
# Socket mode register
SNMR_CLOSE = const(0x00)
SNMR_TCP = const(0x21)
SNMR_UDP = const(0x02)
SNMR_IPRAW = const(0x03)
SNMR_MACRAW = const(0x04)
SNMR_PPPOE = const(0x05)
MAX_PACKET = const(4000)
LOCAL_PORT = const(0x400)
# Default hardware MAC address
DEFAULT_MAC = (0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED)
# Maximum number of sockets to support, differs between chip versions.
W5200_W5500_MAX_SOCK_NUM = const(0x08)
SOCKET_INVALID = const(255)
# UDP socket struct.
UDP_SOCK = {"bytes_remaining": 0, "remote_ip": 0, "remote_port": 0}
# Source ports in use
SRC_PORTS = [0] * W5200_W5500_MAX_SOCK_NUM
class WIZNET5K: # pylint: disable=too-many-public-methods
"""Interface for WIZNET5K module.
:param ~machine.SPI spi_bus: The SPI bus the Wiznet module is connected to.
:param ~machine.Pin cs: Chip select pin.
:param ~machine.Pin rst: Optional reset pin.
:param bool is_dhcp: Whether to start DHCP automatically or not.
:param list mac: The Wiznet's MAC Address.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int dhcp_timeout: Timeout in seconds for DHCP response.
:param bool debug: Enable debugging output.
"""
TCP_MODE = const(0x21)
UDP_MODE = const(0x02)
TLS_MODE = const(0x03) # This is NOT currently implemented
# pylint: disable=too-many-arguments
def __init__(self, spi_bus, cs, reset=None, is_dhcp=True, mac=DEFAULT_MAC, hostname=None, dhcp_timeout=30, debug=False):
self._debug = debug
self._chip_type = None
self._device = spi_bus
self._device.init(baudrate=500000, polarity=0, phase=0) #baudrate=8000000
self._cs = cs
# reset wiznet module prior to initialization
if reset:
reset.on()
time.sleep(0.1)
reset.off()
time.sleep(0.1)
# Buffer for reading params from module
self._pbuff = bytearray(8)
self._rxbuf = bytearray(MAX_PACKET)
# attempt to initialize the module
self._ch_base_msb = 0
assert self._w5100_init() == 1, "Failed to initialize WIZnet module."
# Set MAC address
self.mac_address = mac
self.src_port = 0
self._dns = 0
# Set DHCP
self._dhcp_client = None
if is_dhcp:
ret = self.set_dhcp(hostname, dhcp_timeout)
if ret != 0:
self._dhcp_client = None
assert ret == 0, "Failed to configure DHCP Server!"
def set_dhcp(self, hostname=None, response_timeout=30):
"""Initializes the DHCP client and attempts to retrieve
and set network configuration from the DHCP server.
Returns 0 if DHCP configured, -1 otherwise.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int response_timeout: Time to wait for server to return packet, in seconds.
"""
if self._debug:
print("* Initializing DHCP")
# First, wait link status is on
# to avoid the code during DHCP - assert self.link_status, "Ethernet cable disconnected!"
start_time = time.time()
while True:
if self.link_status or ((time.time() - start_time) > 5):
break
time.sleep(1)
if self._debug:
print("My Link is:", self.link_status)
# Return IP assigned by DHCP
self._dhcp_client = dhcp.DHCP(self, self.mac_address, hostname, response_timeout, debug=self._debug)
ret = self._dhcp_client.request_dhcp_lease()
if ret == 1:
if self._debug:
_ifconfig = self.ifconfig
print("* Found DHCP Server:")
print("IP: {}\nSubnet Mask: {}\nGW Addr: {}\nDNS Server: {}".format(*_ifconfig))
return 0
return -1
def maintain_dhcp_lease(self):
"""Maintain DHCP lease"""
if self._dhcp_client is not None:
self._dhcp_client.maintain_dhcp_lease()
def get_host_by_name(self, hostname):
"""Convert a hostname to a packed 4-byte IP Address.
Returns a 4 bytearray.
"""
if self._debug:
print("* Get host by name")
if isinstance(hostname, str):
hostname = bytes(hostname, "utf-8")
# Return IP assigned by DHCP
_dns_client = dns.DNS(self, self._dns, debug=self._debug)
ret = _dns_client.gethostbyname(hostname)
if self._debug:
print("* Resolved IP: ", ret)
assert ret != -1, "Failed to resolve hostname!"
return ret
@property
def max_sockets(self):
"""Returns max number of sockets supported by chip."""
if self._chip_type == "w5500":
return W5200_W5500_MAX_SOCK_NUM
return -1
@property
def chip(self):
"""Returns the chip type."""
return self._chip_type
@property
def ip_address(self):
"""Returns the configured IP address."""
return self.read(REG_SIPR, 0x00, 4)
def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
"""Converts a bytearray IP address to a
dotted-quad string for printing
"""
return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])
def unpretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
"""Converts a dotted-quad string to a bytearray IP address"""
octets = [int(x) for x in ip.split(".")]
return bytes(octets)
@property
def mac_address(self):
"""Returns the hardware's MAC address."""
return self.read(REG_SHAR, 0x00, 6)
@mac_address.setter
def mac_address(self, address):
"""Sets the hardware MAC address.
:param tuple address: Hardware MAC address.
"""
self.write(REG_SHAR, 0x04, address)
def pretty_mac(self, mac): # pylint: disable=no-self-use, invalid-name
"""Converts a bytearray MAC address to a
dotted-quad string for printing
"""
return "%s:%s:%s:%s:%s:%s" % (
hex(mac[0]),
hex(mac[1]),
hex(mac[2]),
hex(mac[3]),
hex(mac[4]),
hex(mac[5]),
)
def remote_ip(self, socket_num):
"""Returns the IP address of the host who sent the current incoming packet.
:param int socket num: Desired socket.
"""
if socket_num >= self.max_sockets:
return self._pbuff
for octet in range(0, 4):
self._pbuff[octet] = self._read_socket(socket_num, REG_SNDIPR + octet)[0]
return self.pretty_ip(self._pbuff)
@property
def link_status(self):
""" "Returns if the PHY is connected."""
if self._chip_type == "w5500":
data = self.read(REG_PHYCFGR, 0x00)
return data[0] & 0x01
return 0
@property
def link_speed(self):
""" "Returns link speed."""
if self._chip_type == "w5500":
data = self.read(REG_PHYCFGR, 0x00)
if data[0] & 0x01: # link up
if data[0] & 0x02:
return 100
else:
return 10
return None
@property
def link_full_duplex(self):
""" "Returns if link is full duplex."""
if self._chip_type == "w5500":
data = self.read(REG_PHYCFGR, 0x00)
if data[0] & 0x01: # link up
if data[0] & 0x04:
return True
else:
return False
return None
def remote_port(self, socket_num):
"""Returns the port of the host who sent the current incoming packet."""
if socket_num >= self.max_sockets:
return self._pbuff
for octet in range(0, 2):
self._pbuff[octet] = self._read_socket(socket_num, REG_SNDPORT + octet)[0]
return int((self._pbuff[0] << 8) | self._pbuff[0])
@property
def ifconfig(self):
"""Returns the network configuration as a tuple."""
print('IFCONFIG')
return (self.ip_address, self.read(REG_SUBR, 0x00, 4), self.read(REG_GAR, 0x00, 4), self._dns)
@ifconfig.setter
def ifconfig(self, params):
"""Sets network configuration to provided tuple in format:
(ip_address, subnet_mask, gateway_address, dns_server).
"""
ip_address, subnet_mask, gateway_address, dns_server = params
self.write(REG_SIPR, 0x04, ip_address)
self.write(REG_SUBR, 0x04, subnet_mask)
self.write(REG_GAR, 0x04, gateway_address)
self._dns = dns_server
def _w5100_init(self):
"""Initializes and detects a wiznet5k module."""
time.sleep(1)
self._cs.value(1)
# Detect if chip is Wiznet W5500
if self.detect_w5500() == 1:
# perform w5500 initialization
for i in range(0, W5200_W5500_MAX_SOCK_NUM):
ctrl_byte = 0x0C + (i << 5)
self.write(0x1E, ctrl_byte, 2)
self.write(0x1F, ctrl_byte, 2)
else:
return 0
return 1
def detect_w5500(self):
"""Detects W5500 chip."""
assert self.sw_reset() == 0, "Chip not reset properly!"
self._write_mr(0x08)
assert self._read_mr()[0] == 0x08, "Expected 0x08."
self._write_mr(0x10)
assert self._read_mr()[0] == 0x10, "Expected 0x10."
self._write_mr(0x00)
assert self._read_mr()[0] == 0x00, "Expected 0x00."
if self.read(REG_VERSIONR_W5500, 0x00)[0] != 0x04:
return -1
self._chip_type = "w5500"
self._ch_base_msb = 0x10
return 1
def sw_reset(self):
"""Performs a soft-reset on a Wiznet chip
by writing to its MR register reset bit.
"""
mode_reg = self._read_mr()
self._write_mr(0x80)
mode_reg = self._read_mr()
if mode_reg[0] != 0x00:
return -1
return 0
def _read_mr(self):
"""Reads from the Mode Register (MR)."""
res = self.read(REG_MR, 0x00)
return res
def _write_mr(self, data):
"""Writes to the mode register (MR).
:param int data: Data to write to the mode register.
"""
self.write(REG_MR, 0x04, data)
def read(self, addr, callback, length=1, buffer=None):
"""Reads data from a register address.
:param int addr: Register address.
"""
if self._debug:
print(f'READ from {addr}')
self._cs.value(0)
self._device.write(bytes([addr >> 8])) # pylint: disable=no-member
self._device.write(bytes([addr & 0xFF])) # pylint: disable=no-member
self._device.write(bytes([callback])) # pylint: disable=no-member
if buffer is None:
self._rxbuf = bytearray(length)
self._device.readinto(self._rxbuf) # pylint: disable=no-member
self._cs.value(1)
return self._rxbuf
self._device.readinto(buffer, end=length) # pylint: disable=no-member
self._cs.value(1)
return buffer
def write(self, addr, callback, data):
"""Write data to a register address.
:param int addr: Destination address.
:param int callback: Callback reference.
:param int data: Data to write, as an integer.
:param bytearray data: Data to write, as a bytearray.
"""
if self._debug:
print(f'WRITE on {addr}')
self._cs.value(0)
self._device.write(bytes([addr >> 8])) # pylint: disable=no-member
self._device.write(bytes([addr & 0xFF])) # pylint: disable=no-member
self._device.write(bytes([callback])) # pylint: disable=no-member
if hasattr(data, "from_bytes"):
self._device.write(bytes([data])) # pylint: disable=no-member
else:
for i, _ in enumerate(data):
self._device.write(bytes([data[i]])) # pylint: disable=no-member
self._cs.value(1)
# Socket-Register API
def udp_remaining(self):
"""Returns amount of bytes remaining in a udp socket."""
if self._debug:
print("* UDP Bytes Remaining: ", UDP_SOCK["bytes_remaining"])
return UDP_SOCK["bytes_remaining"]
def socket_available(self, socket_num, sock_type=SNMR_TCP):
"""Returns the amount of bytes to be read from the socket.
:param int socket_num: Desired socket to return bytes from.
:param int sock_type: Socket type, defaults to TCP.
"""
if self._debug:
print("* socket_available called on socket {}, protocol {}".format(socket_num, sock_type))
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
res = self._get_rx_rcv_size(socket_num)
if sock_type == SNMR_TCP:
return res
if res > 0:
if UDP_SOCK["bytes_remaining"]:
return UDP_SOCK["bytes_remaining"]
# parse the udp rx packet
# read the first 8 header bytes
ret, self._pbuff = self.socket_read(socket_num, 8)
if ret > 0:
UDP_SOCK["remote_ip"] = self._pbuff[:4]
UDP_SOCK["remote_port"] = (self._pbuff[4] << 8) + self._pbuff[5]
UDP_SOCK["bytes_remaining"] = (self._pbuff[6] << 8) + self._pbuff[7]
ret = UDP_SOCK["bytes_remaining"]
return ret
return 0
def socket_status(self, socket_num):
"""Returns the socket connection status. Can be: SNSR_SOCK_CLOSED,
SNSR_SOCK_INIT, SNSR_SOCK_LISTEN, SNSR_SOCK_SYNSENT, SNSR_SOCK_SYNRECV,
SNSR_SYN_SOCK_ESTABLISHED, SNSR_SOCK_FIN_WAIT, SNSR_SOCK_CLOSING,
SNSR_SOCK_TIME_WAIT, SNSR_SOCK_CLOSE_WAIT, SNSR_LAST_ACK,
SNSR_SOCK_UDP, SNSR_SOCK_IPRAW, SNSR_SOCK_MACRAW, SNSR_SOCK_PPOE.
"""
return self._read_snsr(socket_num)
def socket_connect(self, socket_num, dest, port, conn_mode=SNMR_TCP):
"""Open and verify we've connected a socket to a dest IP address
or hostname. By default, we use 'conn_mode'= SNMR_TCP but we
may also use SNMR_UDP.
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print("* w5k socket connect, protocol={}, port={}, ip={}".format(conn_mode, port, self.pretty_ip(dest)))
# initialize a socket and set the mode
res = self.socket_open(socket_num, conn_mode=conn_mode)
if res == 1:
raise RuntimeError("Failed to initalize a connection with the socket.")
# set socket destination IP and port
self._write_sndipr(socket_num, dest)
self._write_sndport(socket_num, port)
self._send_socket_cmd(socket_num, CMD_SOCK_CONNECT)
if conn_mode == SNMR_TCP:
# wait for tcp connection establishment
while self.socket_status(socket_num)[0] != SNSR_SOCK_ESTABLISHED:
time.sleep(0.001)
if self._debug:
print("SN_SR:", self.socket_status(socket_num)[0])
if self.socket_status(socket_num)[0] == SNSR_SOCK_CLOSED:
raise RuntimeError("Failed to establish connection.")
elif conn_mode == SNMR_UDP:
UDP_SOCK["bytes_remaining"] = 0
return 1
def _send_socket_cmd(self, socket, cmd):
self._write_sncr(socket, cmd)
while self._read_sncr(socket) != b"\x00":
if self._debug:
print("waiting for sncr to clear...")
def get_socket(self):
"""Requests, allocates and returns a socket from the W5k
chip. Returned socket number may not exceed max_sockets.
"""
if self._debug:
print("*** Get socket")
sock = SOCKET_INVALID
for _sock in range(self.max_sockets):
status = self.socket_status(_sock)[0]
if status == SNSR_SOCK_CLOSED:
sock = _sock
break
if self._debug:
print("Allocated socket #{}".format(sock))
return sock
def socket_listen(self, socket_num, port, conn_mode=SNMR_TCP):
"""Start listening on a socket (default TCP mode).
:parm int socket_num: socket number
:parm int port: port to listen on
:parm int conn_mode: connection mode SNMR_TCP (default) or SNMR_UDP
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print("* Listening on port={}, ip={}".format(port, self.pretty_ip(self.ip_address)))
# Initialize a socket and set the mode
self.src_port = port
res = self.socket_open(socket_num, conn_mode=conn_mode)
self.src_port = 0
if res == 1:
raise RuntimeError("Failed to initalize the socket.")
# Send listen command
self._send_socket_cmd(socket_num, CMD_SOCK_LISTEN)
# Wait until ready
status = [SNSR_SOCK_CLOSED]
while status[0] not in (SNSR_SOCK_LISTEN, SNSR_SOCK_ESTABLISHED, SNSR_SOCK_UDP):
status = self._read_snsr(socket_num)
if status[0] == SNSR_SOCK_CLOSED:
raise RuntimeError("Listening socket closed.")
def socket_accept(self, socket_num):
"""Gets the dest IP and port from an incoming connection.
Returns the next socket number so listening can continue
:parm int socket_num: socket number
"""
dest_ip = self.remote_ip(socket_num)
dest_port = self.remote_port(socket_num)
next_socknum = self.get_socket()
if self._debug:
print("* Dest is ({}, {}), Next listen socknum is #{}".format(dest_ip, dest_port, next_socknum))
return next_socknum, (dest_ip, dest_port)
def socket_open(self, socket_num, conn_mode=SNMR_TCP):
"""Opens a TCP or UDP socket. By default, we use
'conn_mode'=SNMR_TCP but we may also use SNMR_UDP.
"""
assert self.link_status, "Ethernet cable disconnected!"
if self._debug:
print("*** Opening socket %d" % socket_num)
status = self._read_snsr(socket_num)[0]
if status in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
SNSR_SOCK_CLOSE_WAIT,
SNSR_SOCK_CLOSING,
SNSR_SOCK_UDP,
SNSR_SOCK_LISTEN,
):
if self._debug:
print("* Opening W5k Socket, protocol={}".format(conn_mode))
time.sleep(0.00025)
self._write_snmr(socket_num, conn_mode)
self._write_snir(socket_num, 0xFF)
if self.src_port > 0:
# write to socket source port
self._write_sock_port(socket_num, self.src_port)
else:
s_port = randint(49152, 65535)
while s_port in SRC_PORTS:
s_port = randint(49152, 65535)
self._write_sock_port(socket_num, s_port)
SRC_PORTS[socket_num] = s_port
# open socket
self._write_sncr(socket_num, CMD_SOCK_OPEN)
self._read_sncr(socket_num)
assert (self._read_snsr((socket_num))[0] == 0x13
or self._read_snsr((socket_num))[0] == 0x22 ), "Could not open socket in TCP or UDP mode."
return 0
return 1
def socket_close(self, socket_num):
"""Closes a socket."""
if self._debug:
print("*** Closing socket #%d" % socket_num)
self._write_sncr(socket_num, CMD_SOCK_CLOSE)
self._read_sncr(socket_num)
def socket_disconnect(self, socket_num):
"""Disconnect a TCP connection."""
if self._debug:
print("*** Disconnecting socket #%d" % socket_num)
self._write_sncr(socket_num, CMD_SOCK_DISCON)
self._read_sncr(socket_num)
def socket_read(self, socket_num, length):
"""Reads data from a socket into a buffer.
Returns buffer.
"""
assert self.link_status, "Ethernet cable disconnected!"
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
# Check if there is data available on the socket
ret = self._get_rx_rcv_size(socket_num)
if self._debug:
print("Bytes avail. on sock: ", ret)
if ret == 0:
# no data on socket?
status = self._read_snmr(socket_num)
if status in (SNSR_SOCK_LISTEN, SNSR_SOCK_CLOSED, SNSR_SOCK_CLOSE_WAIT):
# remote end closed its side of the connection, EOF state
ret = 0
resp = 0
else:
# connection is alive, no data waiting to be read
ret = -1
resp = -1
elif ret > length:
# set ret to the length of buffer
ret = length
if ret > 0:
if self._debug:
print("\t * Processing {} bytes of data".format(ret))
# Read the starting save address of the received data
ptr = self._read_snrx_rd(socket_num)
# Read data from the starting address of snrx_rd
ctrl_byte = 0x18 + (socket_num << 5)
resp = self.read(ptr, ctrl_byte, ret)
# After reading the received data, update Sn_RX_RD to the increased
# value as many as the reading size.
ptr += ret
self._write_snrx_rd(socket_num, ptr)
# Notify the W5k of the updated Sn_Rx_RD
self._write_sncr(socket_num, CMD_SOCK_RECV)
self._read_sncr(socket_num)
return ret, resp
def read_udp(self, socket_num, length):
"""Read UDP socket's remaining bytes."""
if UDP_SOCK["bytes_remaining"] > 0:
if UDP_SOCK["bytes_remaining"] <= length:
ret, resp = self.socket_read(socket_num, UDP_SOCK["bytes_remaining"])
else:
ret, resp = self.socket_read(socket_num, length)
if ret > 0:
UDP_SOCK["bytes_remaining"] -= ret
return ret, resp
return -1
def socket_write(self, socket_num, buffer, timeout=0):
"""Writes a bytearray to a provided socket."""
assert self.link_status, "Ethernet cable disconnected!"
assert socket_num <= self.max_sockets, "Provided socket exceeds max_sockets."
status = 0
ret = 0
free_size = 0
if len(buffer) > SOCK_SIZE:
ret = SOCK_SIZE
else:
ret = len(buffer)
stamp = time.time()
# if buffer is available, start the transfer
free_size = self._get_tx_free_size(socket_num)
while free_size < ret:
free_size = self._get_tx_free_size(socket_num)
status = self.socket_status(socket_num)[0]
if status not in (SNSR_SOCK_ESTABLISHED, SNSR_SOCK_CLOSE_WAIT) or (timeout and time.time() - stamp > timeout):
ret = 0
break
# Read the starting address for saving the transmitting data.
ptr = self._read_sntx_wr(socket_num)
offset = ptr & 0x07FF
dst_addr = offset + (socket_num * 2048 + 0x8000)
# update sn_tx_wr to the value + data size
ptr = (ptr + len(buffer)) & 0xFFFF
self._write_sntx_wr(socket_num, ptr)
cntl_byte = 0x14 + (socket_num << 5)
self.write(dst_addr, cntl_byte, buffer)
self._write_sncr(socket_num, CMD_SOCK_SEND)
self._read_sncr(socket_num)
# check data was transferred correctly
while (self._read_socket(socket_num, REG_SNIR)[0] & SNIR_SEND_OK) != SNIR_SEND_OK:
if (self.socket_status(socket_num)[0] in (
SNSR_SOCK_CLOSED,
SNSR_SOCK_TIME_WAIT,
SNSR_SOCK_FIN_WAIT,
SNSR_SOCK_CLOSE_WAIT,
SNSR_SOCK_CLOSING,
)
or (timeout and time.time() - stamp > timeout)
):
# self.socket_close(socket_num)
return 0
time.sleep(0.01)
self._write_snir(socket_num, SNIR_SEND_OK)
return ret
# Socket-Register Methods
def _get_rx_rcv_size(self, sock):
"""Get size of recieved and saved in socket buffer."""
val = 0
val_1 = self._read_snrx_rsr(sock)
while val != val_1:
val_1 = self._read_snrx_rsr(sock)
if val_1 != 0:
val = self._read_snrx_rsr(sock)
return int.from_bytes(val, "b")
def _get_tx_free_size(self, sock):
"""Get free size of sock's tx buffer block."""
val = 0
val_1 = self._read_sntx_fsr(sock)
while val != val_1:
val_1 = self._read_sntx_fsr(sock)
if val_1 != 0:
val = self._read_sntx_fsr(sock)
return int.from_bytes(val, "b")
def _read_snrx_rd(self, sock):
self._pbuff[0] = self._read_socket(sock, REG_SNRX_RD)[0]
self._pbuff[1] = self._read_socket(sock, REG_SNRX_RD + 1)[0]
return self._pbuff[0] << 8 | self._pbuff[1]
def _write_snrx_rd(self, sock, data):
self._write_socket(sock, REG_SNRX_RD, data >> 8 & 0xFF)
self._write_socket(sock, REG_SNRX_RD + 1, data & 0xFF)
def _write_sntx_wr(self, sock, data):
self._write_socket(sock, REG_SNTX_WR, data >> 8 & 0xFF)
self._write_socket(sock, REG_SNTX_WR + 1, data & 0xFF)
def _read_sntx_wr(self, sock):
self._pbuff[0] = self._read_socket(sock, 0x0024)[0]
self._pbuff[1] = self._read_socket(sock, 0x0024 + 1)[0]
return self._pbuff[0] << 8 | self._pbuff[1]
def _read_sntx_fsr(self, sock):
data = self._read_socket(sock, REG_SNTX_FSR)
data += self._read_socket(sock, REG_SNTX_FSR + 1)
return data
def _read_snrx_rsr(self, sock):
data = self._read_socket(sock, REG_SNRX_RSR)
data += self._read_socket(sock, REG_SNRX_RSR + 1)
return data
def _write_sndipr(self, sock, ip_addr):
"""Writes to socket destination IP Address."""
for octet in range(0, 4):
self._write_socket(sock, REG_SNDIPR + octet, ip_addr[octet])
def _write_sndport(self, sock, port):
"""Writes to socket destination port."""
self._write_socket(sock, REG_SNDPORT, port >> 8)
self._write_socket(sock, REG_SNDPORT + 1, port & 0xFF)
def _read_snsr(self, sock):
"""Reads Socket n Status Register."""
return self._read_socket(sock, REG_SNSR)
def _write_snmr(self, sock, protocol):
"""Write to Socket n Mode Register."""
self._write_socket(sock, REG_SNMR, protocol)
def _write_snir(self, sock, data):
"""Write to Socket n Interrupt Register."""
self._write_socket(sock, REG_SNIR, data)
def _write_sock_port(self, sock, port):
"""Write to the socket port number."""
self._write_socket(sock, REG_SNPORT, port >> 8)
self._write_socket(sock, REG_SNPORT + 1, port & 0xFF)
def _write_sncr(self, sock, data):
self._write_socket(sock, REG_SNCR, data)
def _read_sncr(self, sock):
return self._read_socket(sock, REG_SNCR)
def _read_snmr(self, sock):
return self._read_socket(sock, REG_SNMR)
def _write_socket(self, sock, address, data):
"""Write to a W5k socket register."""
base = self._ch_base_msb << 8
cntl_byte = (sock << 5) + 0x0C
return self.write(base + sock * CH_SIZE + address, cntl_byte, data)
def _read_socket(self, sock, address):
"""Read a W5k socket register."""
cntl_byte = (sock << 5) + 0x08
return self.read(address, cntl_byte)
Four, drive 2
wiznet5k_dhcp.py
# SPDX-FileCopyrightText: 2009 Jordan Terell (blog.jordanterrell.com)
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Patrick Van Oosterwijck @ Silicognition LLC
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k_dhcp`
================================================================================
Pure-Python implementation of Jordan Terrell's DHCP library v0.3
* Author(s): Jordan Terrell, Brent Rubell, Vincenzo D'Angelo
"""
import gc
import time
import wiznet5k_socket as socket
from random import randint
from micropython import const
from wiznet5k_socket import htonl, htons
# DHCP State Machine
STATE_DHCP_START = const(0x00)
STATE_DHCP_DISCOVER = const(0x01)
STATE_DHCP_REQUEST = const(0x02)
STATE_DHCP_LEASED = const(0x03)
STATE_DHCP_REREQUEST = const(0x04)
STATE_DHCP_RELEASE = const(0x05)
STATE_DHCP_WAIT = const(0x06)
STATE_DHCP_DISCONN = const(0x07)
# DHCP wait time between attempts
DHCP_WAIT_TIME = const(60)
# DHCP Message Types
DHCP_DISCOVER = const(1)
DHCP_OFFER = const(2)
DHCP_REQUEST = const(3)
DHCP_DECLINE = const(4)
DHCP_ACK = const(5)
DHCP_NAK = const(6)
DHCP_RELEASE = const(7)
DHCP_INFORM = const(8)
# DHCP Message OP Codes
DHCP_BOOT_REQUEST = const(0x01)
DHCP_BOOT_REPLY = const(0x02)
DHCP_HTYPE10MB = const(0x01)
DHCP_HTYPE100MB = const(0x02)
DHCP_HLENETHERNET = const(0x06)
DHCP_HOPS = const(0x00)
MAGIC_COOKIE = const(0x63825363)
MAX_DHCP_OPT = const(0x10)
# Default DHCP Server port
DHCP_SERVER_PORT = const(67)
# DHCP Lease Time, in seconds
DEFAULT_LEASE_TIME = const(900)
BROADCAST_SERVER_ADDR = (255, 255, 255, 255)
# DHCP Response Options
MSG_TYPE = 53
SUBNET_MASK = 1
ROUTERS_ON_SUBNET = 3
DNS_SERVERS = 6
DHCP_SERVER_ID = 54
T1_VAL = 58
T2_VAL = 59
LEASE_TIME = 51
OPT_END = 255
# Packet buffer
_BUFF = bytearray(318)
class DHCP:
"""W5k DHCP Client implementation.
:param eth: Wiznet 5k object
:param list mac_address: Hardware MAC.
:param str hostname: The desired hostname, with optional {} to fill in MAC.
:param int response_timeout: DHCP Response timeout.
:param bool debug: Enable debugging output.
"""
# pylint: disable=too-many-arguments, too-many-instance-attributes, invalid-name
def __init__(
self, eth, mac_address, hostname=None, response_timeout=30, debug=False
):
self._debug = debug
self._response_timeout = response_timeout
self._mac_address = mac_address
# Set socket interface
socket.set_interface(eth)
self._eth = eth
self._sock = None
# DHCP state machine
self._dhcp_state = STATE_DHCP_START
self._initial_xid = 0
self._transaction_id = 0
self._start_time = 0
# DHCP server configuration
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
self.local_ip = 0
self.gateway_ip = 0
self.subnet_mask = 0
self.dns_server_ip = 0
# Lease configuration
self._lease_time = 0
self._last_lease_time = 0
self._renew_in_sec = 0
self._rebind_in_sec = 0
self._t1 = 0
self._t2 = 0
# Select an initial transaction id
self._transaction_id = randint(1, 0x7FFFFFFF)
# Host name
mac_string = "".join("{:02X}".format(o) for o in mac_address)
self._hostname = bytes(
(hostname or "WIZnet{}").split(".")[0].format(mac_string)[:42], "utf-8"
)
# pylint: disable=too-many-statements
def send_dhcp_message(self, state, time_elapsed, renew=False):
"""Assemble and send a DHCP message packet to a socket.
:param int state: DHCP Message state.
:param float time_elapsed: Number of seconds elapsed since DHCP process started
:param bool renew: Set True for renew and rebind
"""
_BUFF[:] = b"\x00" * len(_BUFF)
# OP
_BUFF[0] = DHCP_BOOT_REQUEST
# HTYPE
_BUFF[1] = DHCP_HTYPE10MB
# HLEN
_BUFF[2] = DHCP_HLENETHERNET
# HOPS
_BUFF[3] = DHCP_HOPS
# Transaction ID (xid)
self._initial_xid = htonl(self._transaction_id)
self._initial_xid = self._initial_xid.to_bytes(4, "l")
_BUFF[4:7] = self._initial_xid
# seconds elapsed
_BUFF[8] = (int(time_elapsed) & 0xFF00) >> 8
_BUFF[9] = int(time_elapsed) & 0x00FF
# flags
flags = htons(0x8000)
flags = flags.to_bytes(2, "b")
_BUFF[10] = flags[1]
_BUFF[11] = flags[0]
# NOTE: Skipping ciaddr/yiaddr/siaddr/giaddr
# as they're already set to 0.0.0.0
# Except when renewing, then fill in ciaddr
if renew:
_BUFF[12:15] = bytes(self.local_ip)
# chaddr
_BUFF[28:34] = self._mac_address
# NOTE: 192 octets of 0's, BOOTP legacy
# Magic Cookie
_BUFF[236] = (MAGIC_COOKIE >> 24) & 0xFF
_BUFF[237] = (MAGIC_COOKIE >> 16) & 0xFF
_BUFF[238] = (MAGIC_COOKIE >> 8) & 0xFF
_BUFF[239] = MAGIC_COOKIE & 0xFF
# Option - DHCP Message Type
_BUFF[240] = 53
_BUFF[241] = 0x01
_BUFF[242] = state
# Option - Client Identifier
_BUFF[243] = 61
# Length
_BUFF[244] = 0x07
# HW Type - ETH
_BUFF[245] = 0x01
# Client MAC Address
for mac in range(0, len(self._mac_address)):
_BUFF[246 + mac] = self._mac_address[mac]
# Option - Host Name
_BUFF[252] = 12
hostname_len = len(self._hostname)
after_hostname = 254 + hostname_len
_BUFF[253] = hostname_len
_BUFF[254:after_hostname] = self._hostname
if state == DHCP_REQUEST and not renew:
# Set the parsed local IP addr
_BUFF[after_hostname] = 50
_BUFF[after_hostname + 1] = 0x04
_BUFF[after_hostname + 2 : after_hostname + 6] = bytes(self.local_ip)
# Set the parsed dhcp server ip addr
_BUFF[after_hostname + 6] = 54
_BUFF[after_hostname + 7] = 0x04
_BUFF[after_hostname + 8 : after_hostname + 12] = bytes(self.dhcp_server_ip)
_BUFF[after_hostname + 12] = 55
_BUFF[after_hostname + 13] = 0x06
# subnet mask
_BUFF[after_hostname + 14] = 1
# routers on subnet
_BUFF[after_hostname + 15] = 3
# DNS
_BUFF[after_hostname + 16] = 6
# domain name
_BUFF[after_hostname + 17] = 15
# renewal (T1) value
_BUFF[after_hostname + 18] = 58
# rebinding (T2) value
_BUFF[after_hostname + 19] = 59
_BUFF[after_hostname + 20] = 255
# Send DHCP packet
self._sock.send(_BUFF)
# pylint: disable=too-many-branches, too-many-statements
def parse_dhcp_response(self):
"""Parse DHCP response from DHCP server.
Returns DHCP packet type.
"""
# store packet in buffer
_BUFF = self._sock.recv()
if self._debug:
print("DHCP Response: ", _BUFF)
# -- Parse Packet, FIXED -- #
# Validate OP
assert (
_BUFF[0] == DHCP_BOOT_REPLY
), "Malformed Packet - \
DHCP message OP is not expected BOOT Reply."
xid = _BUFF[4:8]
if bytes(xid) < self._initial_xid:
print("f")
return 0, 0
self.local_ip = tuple(_BUFF[16:20])
if _BUFF[28:34] == 0:
return 0, 0
if int.from_bytes(_BUFF[235:240], "l") != MAGIC_COOKIE:
return 0, 0
# -- Parse Packet, VARIABLE -- #
ptr = 240
while _BUFF[ptr] != OPT_END:
if _BUFF[ptr] == MSG_TYPE:
ptr += 1
opt_len = _BUFF[ptr]
ptr += opt_len
msg_type = _BUFF[ptr]
ptr += 1
elif _BUFF[ptr] == SUBNET_MASK:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.subnet_mask = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == DHCP_SERVER_ID:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dhcp_server_ip = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == LEASE_TIME:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._lease_time = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == ROUTERS_ON_SUBNET:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.gateway_ip = tuple(_BUFF[ptr : ptr + opt_len])
ptr += opt_len
elif _BUFF[ptr] == DNS_SERVERS:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self.dns_server_ip = tuple(_BUFF[ptr : ptr + 4])
ptr += opt_len # still increment even though we only read 1 addr.
elif _BUFF[ptr] == T1_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t1 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == T2_VAL:
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
self._t2 = int.from_bytes(_BUFF[ptr : ptr + opt_len], "l")
ptr += opt_len
elif _BUFF[ptr] == 0:
break
else:
# We're not interested in this option
ptr += 1
opt_len = _BUFF[ptr]
ptr += 1
# no-op
ptr += opt_len
if self._debug:
print(
"Msg Type: {}\nSubnet Mask: {}\nDHCP Server IP: {}\nDNS Server IP: {}\
\nGateway IP: {}\nLocal IP: {}\nT1: {}\nT2: {}\nLease Time: {}".format(
msg_type,
self.subnet_mask,
self.dhcp_server_ip,
self.dns_server_ip,
self.gateway_ip,
self.local_ip,
self._t1,
self._t2,
self._lease_time,
)
)
gc.collect()
return msg_type, xid
# pylint: disable=too-many-branches, too-many-statements
def _dhcp_state_machine(self):
"""DHCP state machine without wait loops to enable cooperative multi tasking
This state machine is used both by the initial blocking lease request and
the non-blocking DHCP maintenance function"""
if self._eth.link_status:
if self._dhcp_state == STATE_DHCP_DISCONN:
self._dhcp_state = STATE_DHCP_START
else:
if self._dhcp_state != STATE_DHCP_DISCONN:
self._dhcp_state = STATE_DHCP_DISCONN
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
self._last_lease_time = 0
reset_ip = (0, 0, 0, 0)
self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)
if self._sock is not None:
self._sock.close()
self._sock = None
if self._dhcp_state == STATE_DHCP_START:
self._start_time = time.time()
self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
try:
self._sock = socket.socket(type=socket.SOCK_DGRAM)
except RuntimeError:
if self._debug:
print("* DHCP: Failed to allocate socket")
self._dhcp_state = STATE_DHCP_WAIT
else:
self._sock.settimeout(self._response_timeout)
self._sock.bind((None, 68))
self._sock.connect((self.dhcp_server_ip, DHCP_SERVER_PORT))
if self._last_lease_time == 0 or time.time() > (
self._last_lease_time + self._lease_time
):
if self._debug:
print("* DHCP: Send discover to {}".format(self.dhcp_server_ip))
self.send_dhcp_message(
STATE_DHCP_DISCOVER, (time.time() - self._start_time)
)
self._dhcp_state = STATE_DHCP_DISCOVER
else:
if self._debug:
print("* DHCP: Send request to {}".format(self.dhcp_server_ip))
self.send_dhcp_message(
DHCP_REQUEST, (time.time() - self._start_time), True
)
self._dhcp_state = STATE_DHCP_REQUEST
elif self._dhcp_state == STATE_DHCP_DISCOVER:
if self._sock.available():
if self._debug:
print("* DHCP: Parsing OFFER")
msg_type, xid = self.parse_dhcp_response()
if msg_type == DHCP_OFFER:
# Check if transaction ID matches, otherwise it may be an offer
# for another device
if htonl(self._transaction_id) == int.from_bytes(xid, "l"):
if self._debug:
print(
"* DHCP: Send request to {}".format(self.dhcp_server_ip)
)
self._transaction_id = (self._transaction_id + 1) & 0x7FFFFFFF
self.send_dhcp_message(
DHCP_REQUEST, (time.time() - self._start_time)
)
self._dhcp_state = STATE_DHCP_REQUEST
else:
if self._debug:
print("* DHCP: Received OFFER with non-matching xid")
else:
if self._debug:
print("* DHCP: Received DHCP Message is not OFFER")
elif self._dhcp_state == STATE_DHCP_REQUEST:
if self._sock.available():
if self._debug:
print("* DHCP: Parsing ACK")
msg_type, xid = self.parse_dhcp_response()
# Check if transaction ID matches, otherwise it may be
# for another device
if htonl(self._transaction_id) == int.from_bytes(xid, "l"):
if msg_type == DHCP_ACK:
if self._debug:
print("* DHCP: Successful lease")
self._sock.close()
self._sock = None
self._dhcp_state = STATE_DHCP_LEASED
self._last_lease_time = self._start_time
if self._lease_time == 0:
self._lease_time = DEFAULT_LEASE_TIME
if self._t1 == 0:
# T1 is 50% of _lease_time
self._t1 = self._lease_time >> 1
if self._t2 == 0:
# T2 is 87.5% of _lease_time
self._t2 = self._lease_time - (self._lease_time >> 3)
self._renew_in_sec = self._t1
self._rebind_in_sec = self._t2
self._eth.ifconfig = (
self.local_ip,
self.subnet_mask,
self.gateway_ip,
self.dns_server_ip,
)
gc.collect()
else:
if self._debug:
print("* DHCP: Received DHCP Message is not ACK")
else:
if self._debug:
print("* DHCP: Received non-matching xid")
elif self._dhcp_state == STATE_DHCP_WAIT:
if time.time() > (self._start_time + DHCP_WAIT_TIME):
if self._debug:
print("* DHCP: Begin retry")
self._dhcp_state = STATE_DHCP_START
if time.time() > (self._last_lease_time + self._rebind_in_sec):
self.dhcp_server_ip = BROADCAST_SERVER_ADDR
if time.time() > (self._last_lease_time + self._lease_time):
reset_ip = (0, 0, 0, 0)
self._eth.ifconfig = (reset_ip, reset_ip, reset_ip, reset_ip)
elif self._dhcp_state == STATE_DHCP_LEASED:
if time.time() > (self._last_lease_time + self._renew_in_sec):
self._dhcp_state = STATE_DHCP_START
if self._debug:
print("* DHCP: Time to renew lease")
if (
self._dhcp_state == STATE_DHCP_DISCOVER
or self._dhcp_state == STATE_DHCP_REQUEST
) and time.time() > (self._start_time + self._response_timeout):
self._dhcp_state = STATE_DHCP_WAIT
if self._sock is not None:
self._sock.close()
self._sock = None
def request_dhcp_lease(self):
"""Request to renew or acquire a DHCP lease."""
if self._dhcp_state == STATE_DHCP_LEASED or self._dhcp_state == STATE_DHCP_WAIT:
self._dhcp_state = STATE_DHCP_START
while (
self._dhcp_state != STATE_DHCP_LEASED
and self._dhcp_state != STATE_DHCP_WAIT
):
self._dhcp_state_machine()
return self._dhcp_state == STATE_DHCP_LEASED
def maintain_dhcp_lease(self):
"""Maintain DHCP lease"""
self._dhcp_state_machine()
Five, drive 3
wiznet5k_dns.py
# SPDX-FileCopyrightText: 2009-2010 MCQN Ltd
# SPDX-FileCopyrightText: Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k_dns`
================================================================================
Pure-Python implementation of the Micropython on ESP32 DNS client for WIZnet 5k-based
ethernet modules.
* Author(s): MCQN Ltd, Brent Rubell, Vincenzo D'Angelo
"""
import time
import wiznet5k_socket as socket
from wiznet5k_socket import htons
from random import getrandbits
from micropython import const
QUERY_FLAG = const(0x00)
OPCODE_STANDARD_QUERY = const(0x00)
RECURSION_DESIRED_FLAG = 1 << 8
TYPE_A = const(0x0001)
CLASS_IN = const(0x0001)
DATA_LEN = const(0x0004)
# Return codes for gethostbyname
SUCCESS = const(1)
TIMED_OUT = const(-1)
INVALID_SERVER = const(-2)
TRUNCATED = const(-3)
INVALID_RESPONSE = const(-4)
DNS_PORT = const(0x35) # port used for DNS request
class DNS:
"""W5K DNS implementation.
:param iface: Network interface
"""
def __init__(self, iface, dns_address, debug=False):
self._debug = debug
self._iface = iface
socket.set_interface(iface)
self._sock = socket.socket(type=socket.SOCK_DGRAM)
self._sock.settimeout(1)
self._dns_server = dns_address
self._host = 0
self._request_id = 0 # request identifier
self._pkt_buf = bytearray()
def gethostbyname(self, hostname):
"""Translate a host name to IPv4 address format.
:param str hostname: Desired host name to connect to.
Returns the IPv4 address as a bytearray if successful, -1 otherwise.
"""
if self._dns_server is None:
return INVALID_SERVER
self._host = hostname
# build DNS request packet
self._build_dns_header()
self._build_dns_question()
# Send DNS request packet
self._sock.bind((None, DNS_PORT))
self._sock.connect((self._dns_server, DNS_PORT))
if self._debug:
print("* DNS: Sending request packet...")
self._sock.send(self._pkt_buf)
# wait and retry 3 times for a response
retries = 0
addr = -1
while (retries < 5) and (addr == -1):
addr = self._parse_dns_response()
if addr == -1 and self._debug:
print("* DNS ERROR: Failed to resolve DNS response, retrying...")
retries += 1
self._sock.close()
return addr
# pylint: disable=too-many-return-statements, too-many-branches, too-many-statements, too-many-locals
def _parse_dns_response(self):
"""Receives and parses DNS query response.
Returns desired hostname address if obtained, -1 otherwise.
"""
# wait for a response
start_time = time.time()
packet_sz = self._sock.available()
while packet_sz <= 0:
packet_sz = self._sock.available()
if (time.time() - start_time) > 1.0:
if self._debug:
print("* DNS ERROR: Did not receive DNS response!")
return -1
time.sleep(0.05)
# recv packet into buf
self._pkt_buf = self._sock.recv()
if self._debug:
print("DNS Packet Received: ", self._pkt_buf)
# Validate request identifier
xid = int.from_bytes(self._pkt_buf[0:2], "l")
if not xid == self._request_id:
if self._debug:
print("* DNS ERROR: Received request identifer {} does not match expected {}".format(xid, self._request_id))
return -1
# Validate flags
flags = int.from_bytes(self._pkt_buf[2:4], "l")
if not flags in (0x8180, 0x8580):
if self._debug:
print("* DNS ERROR: Invalid flags, ", flags)
return -1
# Number of questions
qr_count = int.from_bytes(self._pkt_buf[4:6], "l")
if not qr_count >= 1:
if self._debug:
print("* DNS ERROR: Question count >=1, ", qr_count)
return -1
# Number of answers
an_count = int.from_bytes(self._pkt_buf[6:8], "l")
if self._debug:
print("* DNS Answer Count: ", an_count)
if not an_count >= 1:
return -1
# Parse query
ptr = 12
name_len = 1
while name_len > 0:
# read the length of the name
name_len = self._pkt_buf[ptr]
if name_len == 0x00:
# we reached the end of this name
ptr += 1 # inc. pointer by 0x00
break
# advance pointer
ptr += name_len + 1
# Validate Query is Type A
q_type = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not q_type == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Query Type: ", q_type)
return -1
ptr += 2
# Validate Query is Type A
q_class = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not q_class == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Query Class: ", q_class)
return -1
ptr += 2
# Let's take the first type-a answer
if self._pkt_buf[ptr] != 0xC0:
return -1
ptr += 1
if self._pkt_buf[ptr] != 0xC:
return -1
ptr += 1
# Validate Answer Type A
ans_type = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not ans_type == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Answer Type: ", ans_type)
return -1
ptr += 2
# Validate Answer Class IN
ans_class = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not ans_class == TYPE_A:
if self._debug:
print("* DNS ERROR: Incorrect Answer Class: ", ans_class)
return -1
ptr += 2
# skip over TTL
ptr += 4
# Validate addr is IPv4
data_len = int.from_bytes(self._pkt_buf[ptr : ptr + 2], "l")
if not data_len == DATA_LEN:
if self._debug:
print("* DNS ERROR: Unexpected Data Length: ", data_len)
return -1
ptr += 2
# Return address
return self._pkt_buf[ptr : ptr + 4]
def _build_dns_header(self):
"""Builds DNS header."""
# generate a random, 16-bit, request identifier
self._request_id = getrandbits(16)
# ID, 16-bit identifier
self._pkt_buf.append(self._request_id >> 8)
self._pkt_buf.append(self._request_id & 0xFF)
# Flags (0x0100)
self._pkt_buf.append(0x01)
self._pkt_buf.append(0x00)
# QDCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x01)
# ANCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
# NSCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
# ARCOUNT
self._pkt_buf.append(0x00)
self._pkt_buf.append(0x00)
def _build_dns_question(self):
"""Build DNS question"""
host = self._host.decode("utf-8")
host = host.split(".")
# write out each section of host
for i, _ in enumerate(host):
# append the sz of the section
self._pkt_buf.append(len(host[i]))
# append the section data
self._pkt_buf += host[i]
# end of the name
self._pkt_buf.append(0x00)
# Type A record
self._pkt_buf.append(htons(TYPE_A) & 0xFF)
self._pkt_buf.append(htons(TYPE_A) >> 8)
# Class IN
self._pkt_buf.append(htons(CLASS_IN) & 0xFF)
self._pkt_buf.append(htons(CLASS_IN) >> 8)
6. Drive 4
wiznet5k_socket.py
# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT
"""
`wiznet5k_socket`
================================================================================
A socket compatible interface with the Wiznet5k module.
* Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick, Vincenzo D'Angelo
"""
import gc
import time
import wiznet5k as wiznet5k
from micropython import const
_the_interface = None # pylint: disable=invalid-name
def set_interface(iface):
"""Helper to set the global internet interface."""
global _the_interface # pylint: disable=global-statement, invalid-name
_the_interface = iface
def htonl(x):
"""Convert 32-bit positive integers from host to network byte order."""
return (
((x) << 24 & 0xFF000000)
| ((x) << 8 & 0x00FF0000)
| ((x) >> 8 & 0x0000FF00)
| ((x) >> 24 & 0x000000FF)
)
def htons(x):
"""Convert 16-bit positive integers from host to network byte order."""
return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF)
SOCK_STREAM = const(0x21) # TCP
TCP_MODE = 80
SOCK_DGRAM = const(0x02) # UDP
AF_INET = const(3)
SOCKET_INVALID = const(255)
# pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
"""Translate the host/port argument into a sequence of 5-tuples that
contain all the necessary arguments for creating a socket connected to that service.
"""
if not isinstance(port, int):
raise RuntimeError("Port must be an integer")
if is_ipv4(host):
return [(AF_INET, socktype, proto, "", (host, port))]
return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))]
def gethostbyname(hostname):
"""Translate a host name to IPv4 address format. The IPv4 address
is returned as a string.
:param str hostname: Desired hostname.
"""
addr = _the_interface.get_host_by_name(hostname)
addr = "{}.{}.{}.{}".format(addr[0], addr[1], addr[2], addr[3])
return addr
def is_ipv4(host):
"""Checks if a host string is an IPv4 address.
:param str host: host's name or ip
"""
octets = host.split(".", 3)
if len(octets) != 4 or not "".join(octets).isdigit():
return False
for octet in octets:
if int(octet) > 255:
return False
return True
# pylint: disable=invalid-name, too-many-public-methods
class socket:
"""A simplified implementation of the Python 'socket' class
for connecting to a Wiznet5k module.
:param int family: Socket address (and protocol) family.
:param int type: Socket type.
"""
# pylint: disable=redefined-builtin,unused-argument
def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None):
if family != AF_INET:
raise RuntimeError("Only AF_INET family supported by W5K modules.")
self._sock_type = type
self._buffer = b""
self._timeout = 0
self._listen_port = None
self._socknum = _the_interface.get_socket()
if self._socknum == SOCKET_INVALID:
raise RuntimeError("Failed to allocate socket.")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._sock_type == SOCK_STREAM:
self.disconnect()
stamp = time.time()
while self.status == wiznet5k.SNSR_SOCK_FIN_WAIT:
if time.time() - stamp > 1000:
raise RuntimeError("Failed to disconnect socket")
self.close()
stamp = time.time()
while self.status != wiznet5k.SNSR_SOCK_CLOSED:
if time.time() - stamp > 1000:
raise RuntimeError("Failed to close socket")
@property
def socknum(self):
"""Returns the socket object's socket number."""
return self._socknum
@property
def status(self):
"""Returns the status of the socket"""
return _the_interface.socket_status(self.socknum)[0]
@property
def connected(self):
"""Returns whether or not we are connected to the socket."""
if self.socknum >= _the_interface.max_sockets:
return False
status = _the_interface.socket_status(self.socknum)[0]
if (
status == wiznet5k.SNSR_SOCK_CLOSE_WAIT
and self.available() == 0
):
result = False
else:
result = status not in (
wiznet5k.SNSR_SOCK_CLOSED,
wiznet5k.SNSR_SOCK_LISTEN,
wiznet5k.SNSR_SOCK_TIME_WAIT,
wiznet5k.SNSR_SOCK_FIN_WAIT,
)
if not result and status != wiznet5k.SNSR_SOCK_LISTEN:
self.close()
return result
def getpeername(self):
"""Return the remote address to which the socket is connected."""
return _the_interface.remote_ip(self.socknum)
def inet_aton(self, ip_string):
"""Convert an IPv4 address from dotted-quad string format.
:param str ip_string: IP Address, as a dotted-quad string.
"""
self._buffer = b""
self._buffer = [int(item) for item in ip_string.split(".")]
self._buffer = bytearray(self._buffer)
return self._buffer
def bind(self, address):
"""Bind the socket to the listen port, if host is specified the interface
will be reconfigured to that IP.
:param tuple address: local socket as a (host, port) tuple.
"""
if address[0] is not None:
ip_address = _the_interface.unpretty_ip(address[0])
current_ip, subnet_mask, gw_addr, dns = _the_interface.ifconfig
if ip_address != current_ip:
_the_interface.ifconfig = (ip_address, subnet_mask, gw_addr, dns)
self._listen_port = address[1]
# For UDP servers we need to open the socket here because we won't call
# listen
if self._sock_type == SOCK_DGRAM:
_the_interface.socket_listen(
self.socknum, self._listen_port, wiznet5k.SNMR_UDP
)
self._buffer = b""
def listen(self, backlog=None):
"""Listen on the port specified by bind.
:param backlog: For compatibility but ignored.
"""
assert self._listen_port is not None, "Use bind to set the port before listen!"
_the_interface.socket_listen(self.socknum, self._listen_port)
self._buffer = b""
def accept(self):
"""Accept a connection. The socket must be bound to an address and listening for
connections. The return value is a pair (conn, address) where conn is a new
socket object usable to send and receive data on the connection, and address is
the address bound to the socket on the other end of the connection.
"""
stamp = time.time()
while self.status not in (
wiznet5k.SNSR_SOCK_SYNRECV,
wiznet5k.SNSR_SOCK_ESTABLISHED,
):
if self._timeout > 0 and time.time() - stamp > self._timeout:
return None
if self.status == wiznet5k.SNSR_SOCK_CLOSED:
self.close()
self.listen()
new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
current_socknum = self.socknum
# Create a new socket object and swap socket nums so we can continue listening
client_sock = socket()
client_sock._socknum = current_socknum # pylint: disable=protected-access
self._socknum = new_listen_socknum # pylint: disable=protected-access
self.bind((None, self._listen_port))
self.listen()
while self.status != wiznet5k.SNSR_SOCK_LISTEN:
raise RuntimeError("Failed to open new listening socket")
return client_sock, addr
def connect(self, address, conntype=None):
"""Connect to a remote socket at address.
:param tuple address: Remote socket as a (host, port) tuple.
"""
assert (
conntype != 0x03
), "Error: SSL/TLS is not currently supported by CircuitPython."
host, port = address
if hasattr(host, "split"):
try:
host = tuple(map(int, host.split(".")))
except ValueError:
host = _the_interface.get_host_by_name(host)
if self._listen_port is not None:
_the_interface.src_port = self._listen_port
result = _the_interface.socket_connect(
self.socknum, host, port, conn_mode=self._sock_type
)
_the_interface.src_port = 0
if not result:
raise RuntimeError("Failed to connect to host", host)
self._buffer = b""
def send(self, data):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
"""
_the_interface.socket_write(self.socknum, data, self._timeout)
gc.collect()
def sendto(self, data, address):
"""Send data to the socket. The socket must be connected to
a remote socket.
:param bytearray data: Desired data to send to the socket.
:param tuple address: Remote socket as a (host, port) tuple.
"""
self.connect(address)
return self.send(data)
def recv(self, bufsize=0, flags=0): # pylint: disable=too-many-branches
"""Reads some bytes from the connected remote address.
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
"""
if self.status == wiznet5k.SNSR_SOCK_CLOSED:
return b""
if bufsize == 0:
# read everything on the socket
while True:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
self._buffer += _the_interface.socket_read(self.socknum, avail)[
1
]
elif self._sock_type == SOCK_DGRAM:
self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
else:
break
gc.collect()
ret = self._buffer
self._buffer = b""
gc.collect()
return ret
stamp = time.time()
to_read = bufsize - len(self._buffer)
received = []
while to_read > 0:
avail = self.available()
if avail:
stamp = time.time()
if self._sock_type == SOCK_STREAM:
recv = _the_interface.socket_read(
self.socknum, min(to_read, avail)
)[1]
elif self._sock_type == SOCK_DGRAM:
recv = _the_interface.read_udp(self.socknum, min(to_read, avail))[1]
recv = bytes(recv)
received.append(recv)
to_read -= len(recv)
gc.collect()
if self._timeout > 0 and time.time() - stamp > self._timeout:
break
self._buffer += b"".join(received)
ret = None
if len(self._buffer) == bufsize:
ret = self._buffer
self._buffer = b""
else:
ret = self._buffer[:bufsize]
self._buffer = self._buffer[bufsize:]
gc.collect()
return ret
def recvfrom(self, bufsize=0, flags=0):
"""Reads some bytes from the connected remote address.
:param int bufsize: Maximum number of bytes to receive.
:param int flags: ignored, present for compatibility.
:returns: a tuple (bytes, address) where address is a tuple (ip, port)
"""
return (
self.recv(bufsize),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def recv_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns: the number of bytes received
"""
if nbytes == 0:
nbytes = len(buf)
ret = self.recv(nbytes)
nbytes = len(ret)
buf[:nbytes] = ret
return nbytes
def recvfrom_into(self, buf, nbytes=0, flags=0):
"""Reads some bytes from the connected remote address info the provided buffer.
:param bytearray buf: Data buffer
:param nbytes: Maximum number of bytes to receive
:param int flags: ignored, present for compatibility.
:returns a tuple (nbytes, address) where address is a tuple (ip, port)
"""
return (
self.recv_into(buf, nbytes),
(
_the_interface.remote_ip(self.socknum),
_the_interface.remote_port(self.socknum),
),
)
def readline(self):
"""Attempt to return as many bytes as we can up to \
but not including '\r\n'.
"""
stamp = time.time()
while b"\r\n" not in self._buffer:
avail = self.available()
if avail:
if self._sock_type == SOCK_STREAM:
self._buffer += _the_interface.socket_read(self.socknum, avail)[1]
elif self._sock_type == SOCK_DGRAM:
self._buffer += _the_interface.read_udp(self.socknum, avail)[1]
if (
not avail
and self._timeout > 0
and time.time() - stamp > self._timeout
):
self.close()
raise RuntimeError("Didn't receive response, failing out...")
firstline, self._buffer = self._buffer.split(b"\r\n", 1)
gc.collect()
return firstline
def disconnect(self):
"""Disconnects a TCP socket."""
assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket."
_the_interface.socket_disconnect(self.socknum)
def close(self):
"""Closes the socket."""
_the_interface.socket_close(self.socknum)
def available(self):
"""Returns how many bytes of data are available to be read from the socket."""
return _the_interface.socket_available(self.socknum, self._sock_type)
def settimeout(self, value):
"""Sets socket read timeout.
:param int value: Socket read timeout, in seconds.
"""
if value < 0:
raise Exception("Timeout period should be non-negative.")
self._timeout = value
def gettimeout(self):
"""Return the timeout in seconds (float) associated
with socket operations, or None if no timeout is set.
"""
return self._timeout
Seven, drive 5
sma_esp32_w5500_requests.py
"""
`sma_requests`
================================================================================
A requests-like library for web interfacing
* Author(s): Seyed Mohammad Ayyoubzadeh
Implementation Notes
--------------------
Adapted from https://github.com/adafruit/Adafruit_CircuitPython_Requests
author='Seyed Mohammad Ayyoubzadeh'
license='MIT'
__version__ = "1.0.0"
__repo__ = "https://github.com/Ayyoubzadeh/Micropython-ESP32-W5500--Wiznet-"
"""
import errno
import sys
import json as json_module
always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'abcdefghijklmnopqrstuvwxyz'
'0123456789' '_.-')
def encode(s):
res = []
replacements = {}
for c in s:
if c in always_safe:
res.append(c)
continue
res.append('%%%x' % ord(c))
return ''.join(res)
def cast(_t, value):
return value
class _RawResponse:
def __init__(self, response: "Response") -> None:
self._response = response
def read(self, size: int = -1) -> bytes:
"""Read as much as available or up to size and return it in a byte string.
Do NOT use this unless you really need to. Reusing memory with `readinto` is much better.
"""
if size == -1:
return self._response.content
return self._response.socket.recv(size)
def readinto(self, buf: bytearray) -> int:
"""Read as much as available into buf or until it is full. Returns the number of bytes read
into buf."""
return self._response._readinto(buf) # pylint: disable=protected-access
class OutOfRetries(Exception):
"""Raised when requests has retried to make a request unsuccessfully."""
class Response:
"""The response from a request, contains all the headers/content"""
# pylint: disable=too-many-instance-attributes
encoding = None
def __init__(self, sock: SocketType, session: Optional["Session"] = None) -> None:
self.socket = sock
self.encoding = "utf-8"
self._cached = None
self._headers = {}
# _start_index and _receive_buffer are used when parsing headers.
# _receive_buffer will grow by 32 bytes everytime it is too small.
self._received_length = 0
self._receive_buffer = bytearray(32)
self._remaining = None
self._chunked = False
self._backwards_compatible = not hasattr(sock, "recv_into")
http = self._readto(b" ")
if not http:
if session:
session._close_socket(self.socket)
else:
self.socket.close()
raise RuntimeError("Unable to read HTTP response.")
self.status_code = int(bytes(self._readto(b" ")))
self.reason = self._readto(b"\r\n")
self._parse_headers()
self._raw = None
self._session = session
def __enter__(self) -> "Response":
return self
def __exit__(
self,
exc_type: Optional[Type[type]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.close()
def _recv_into(self, buf: bytearray, size: int = 0) -> int:
if self._backwards_compatible:
size = len(buf) if size == 0 else size
b = self.socket.recv(size)
read_size = len(b)
buf[:read_size] = b
return read_size
return cast("SupportsRecvInto", self.socket).recv_into(buf, size)
def _readto(self, stop: bytes) -> bytearray:
buf = self._receive_buffer
end = self._received_length
while True:
i = bytes(buf).find(stop, 0, end)
if i >= 0:
# Stop was found. Return everything up to but not including stop.
result = buf[:i]
new_start = i + len(stop)
# Remove everything up to and including stop from the buffer.
new_end = end - new_start
buf[:new_end] = buf[new_start:end]
self._received_length = new_end
return result
# Not found so load more bytes.
# If our buffer is full, then make it bigger to load more.
if end == len(buf):
new_buf = bytearray(len(buf) + 32)
new_buf[: len(buf)] = buf
buf = new_buf
self._receive_buffer = buf
read = self._recv_into(memoryview(buf)[end:])
if read == 0:
self._received_length = 0
return buf[:end]
end += read
def _read_from_buffer(
self, buf: Optional[bytearray] = None, nbytes: Optional[int] = None
) -> int:
if self._received_length == 0:
return 0
read = self._received_length
if nbytes < read:
read = nbytes
membuf = memoryview(self._receive_buffer)
if buf:
buf[:read] = membuf[:read]
if read < self._received_length:
new_end = self._received_length - read
self._receive_buffer[:new_end] = membuf[read : self._received_length]
self._received_length = new_end
else:
self._received_length = 0
return read
def _readinto(self, buf: bytearray) -> int:
if not self.socket:
raise RuntimeError(
"Newer Response closed this one. Use Responses immediately."
)
if not self._remaining:
# Consume the chunk header if need be.
if self._chunked:
# Consume trailing \r\n for chunks 2+
if self._remaining == 0:
self._throw_away(2)
chunk_header = bytes(self._readto(b"\r\n")).split(b";", 1)[0]
http_chunk_size = int(bytes(chunk_header), 16)
if http_chunk_size == 0:
self._chunked = False
self._parse_headers()
return 0
self._remaining = http_chunk_size
else:
return 0
nbytes = len(buf)
if nbytes > self._remaining:
nbytes = self._remaining
read = self._read_from_buffer(buf, nbytes)
if read == 0:
read = self._recv_into(buf, nbytes)
self._remaining -= read
return read
def _throw_away(self, nbytes: int) -> None:
nbytes -= self._read_from_buffer(nbytes=nbytes)
buf = self._receive_buffer
len_buf = len(buf)
for _ in range(nbytes // len_buf):
to_read = len_buf
while to_read > 0:
to_read -= self._recv_into(buf, to_read)
to_read = nbytes % len_buf
while to_read > 0:
to_read -= self._recv_into(buf, to_read)
def close(self) -> None:
"""Drain the remaining ESP socket buffers. We assume we already got what we wanted."""
if not self.socket:
return
# Make sure we've read all of our response.
if self._cached is None:
if self._remaining and self._remaining > 0:
self._throw_away(self._remaining)
elif self._chunked:
while True:
chunk_header = bytes(self._readto(b"\r\n")).split(b";", 1)[0]
chunk_size = int(bytes(chunk_header), 16)
if chunk_size == 0:
break
self._throw_away(chunk_size + 2)
self._parse_headers()
if self._session:
self._session._free_socket(self.socket) # pylint: disable=protected-access
else:
self.socket.close()
self.socket = None
def _parse_headers(self) -> None:
"""
Parses the header portion of an HTTP request/response from the socket.
Expects first line of HTTP request/response to have been read already.
"""
while True:
header = self._readto(b"\r\n")
if not header:
break
title, content = bytes(header).split(b": ", 1)
if title and content:
# enforce that all headers are lowercase
title = str(title, "utf-8").lower()
content = str(content, "utf-8")
if title == "content-length":
self._remaining = int(content)
if title == "transfer-encoding":
self._chunked = content.strip().lower() == "chunked"
if title == "set-cookie" and title in self._headers:
self._headers[title] += ", " + content
else:
self._headers[title] = content
def _validate_not_gzip(self) -> None:
"""gzip encoding is not supported. Raise an exception if found."""
if (
"content-encoding" in self.headers
and self.headers["content-encoding"] == "gzip"
):
raise ValueError(
"Content-encoding is gzip, data cannot be accessed as json or text. "
"Use content property to access raw bytes."
)
@property
def headers(self) -> Dict[str, str]:
"""
The response headers. Does not include headers from the trailer until
the content has been read.
"""
return self._headers
@property
def content(self) -> bytes:
"""The HTTP content direct from the socket, as bytes"""
if self._cached is not None:
if isinstance(self._cached, bytes):
return self._cached
raise RuntimeError("Cannot access content after getting text or json")
self._cached = b"".join(self.iter_content(chunk_size=32))
return self._cached
@property
def text(self) -> str:
"""The HTTP content, encoded into a string according to the HTTP
header encoding"""
if self._cached is not None:
if isinstance(self._cached, str):
return self._cached
raise RuntimeError("Cannot access text after getting content or json")
self._validate_not_gzip()
self._cached = str(self.content, self.encoding)
return self._cached
def json(self) -> Any:
"""The HTTP content, parsed into a json dictionary"""
# The cached JSON will be a list or dictionary.
if self._cached:
if isinstance(self._cached, (list, dict)):
return self._cached
raise RuntimeError("Cannot access json after getting text or content")
if not self._raw:
self._raw = _RawResponse(self)
self._validate_not_gzip()
obj = json_module.load(self._raw)
if not self._cached:
self._cached = obj
self.close()
return obj
def iter_content(self, chunk_size: int = 1, decode_unicode: bool = False) -> bytes:
"""An iterator that will stream data by only reading 'chunk_size'
bytes and yielding them, when we can't buffer the whole datastream"""
if decode_unicode:
raise NotImplementedError("Unicode not supported")
b = bytearray(chunk_size)
while True:
size = self._readinto(b)
if size == 0:
break
if size < chunk_size:
chunk = bytes(memoryview(b)[:size])
else:
chunk = bytes(b)
yield chunk
self.close()
class Session:
"""HTTP session that shares sockets and ssl context."""
def __init__(
self,
socket_pool: SocketpoolModuleType,
ssl_context: Optional[SSLContextType] = None,
) -> None:
self._socket_pool = socket_pool
self._ssl_context = ssl_context
# Hang onto open sockets so that we can reuse them.
self._open_sockets = {}
self._socket_free = {}
self._last_response = None
def _free_socket(self, socket: SocketType) -> None:
if socket not in self._open_sockets.values():
raise RuntimeError("Socket not from session")
self._socket_free[socket] = True
def _close_socket(self, sock: SocketType) -> None:
sock.close()
del self._socket_free[sock]
key = None
for k in self._open_sockets: # pylint: disable=consider-using-dict-items
if self._open_sockets[k] == sock:
key = k
break
if key:
del self._open_sockets[key]
def _free_sockets(self) -> None:
free_sockets = []
for sock, val in self._socket_free.items():
if val:
free_sockets.append(sock)
for sock in free_sockets:
self._close_socket(sock)
def _get_socket(
self, host: str, port: int, proto: str, *, timeout: float = 1
) -> CircuitPythonSocketType:
# pylint: disable=too-many-branches
key = (host, port, proto)
if key in self._open_sockets:
sock = self._open_sockets[key]
if self._socket_free[sock]:
self._socket_free[sock] = False
return sock
if proto == "https:" and not self._ssl_context:
raise RuntimeError(
"ssl_context must be set before using adafruit_requests for https"
)
addr_info = self._socket_pool.getaddrinfo(
host, port, 0, self._socket_pool.SOCK_STREAM
)[0]
retry_count = 0
sock = None
while retry_count < 5 and sock is None:
if retry_count > 0:
if any(self._socket_free.items()):
self._free_sockets()
else:
raise RuntimeError("Sending request failed")
retry_count += 1
try:
sock = self._socket_pool.socket(addr_info[0], addr_info[1])
except OSError:
continue
except RuntimeError:
continue
connect_host = addr_info[-1][0]
if proto == "https:":
sock = self._ssl_context.wrap_socket(sock, server_hostname=host)
connect_host = host
sock.settimeout(timeout) # socket read timeout
try:
sock.connect((connect_host, port))
except MemoryError:
sock.close()
sock = None
except OSError:
sock.close()
sock = None
if sock is None:
raise RuntimeError("Repeated socket failures")
self._open_sockets[key] = sock
self._socket_free[sock] = False
return sock
@staticmethod
def _send(socket: SocketType, data: bytes):
total_sent = 0
while total_sent < len(data):
# ESP32SPI sockets raise a RuntimeError when unable to send.
try:
sent = socket.send(data[total_sent:])
#print("\r\nsent",sent,"bytes")
except OSError as exc:
if exc.errno == errno.EAGAIN:
#print("\r\ncant send")
# Can't send right now (e.g., no buffer space), try again.
continue
# Some worse error.
raise
except RuntimeError as exc:
#print("\r\ncant send2")
raise OSError(errno.EIO) from exc
if sent is None:
sent = len(data)
if sent == 0:
# Not EAGAIN; that was already handled.
#print("\r\ncant send3")
raise OSError(errno.EIO)
#print("sent at the end",sent)
total_sent += sent
#print("total_sent at the end",total_sent)
def _send_request(
self,
socket: SocketType,
host: str,
method: str,
path: str,
headers: List[Dict[str, str]],
data: Any,
json: Any,
):
# pylint: disable=too-many-arguments
h=b''
h+=bytes(method, "utf-8")
h+=b" /"
h+=bytes(path, "utf-8")
h+=b" HTTP/1.1\r\n"
if "Host" not in headers:
h+=b"Host: "
h+=bytes(host, "utf-8")
h+=b"\r\n"
if "User-Agent" not in headers:
h+=b"User-Agent: Mozilla/5.0\r\n"
# Iterate over keys to avoid tuple alloc
for k in headers:
h+=k.encode()
h+=b": "
h+=headers[k].encode()
h+=b"\r\n"
if json is not None:
assert data is None
data = json_module.dumps(json)
h+=b"Content-Type: application/json\r\n"
if data:
if isinstance(data, dict):
h+=b"Content-Type: application/x-www-form-urlencoded\r\n"
_post_data = ""
for k in data:
data[k]=data[k]
_post_data +=(str(k)+"="+encode(str(data[k]))+"&")
data = _post_data[:-1]
if isinstance(data, str):
data = bytes(data, "utf-8")
h+=(b"Content-Length: %d\r\n" % len(data))
h+=b"\r\n"
if data:
h+=bytes(data)
#print("\r\nencoded data",h)
self._send(socket, h)
#print("\r\nsent",h)
# pylint: disable=too-many-branches, too-many-statements, unused-argument, too-many-arguments, too-many-locals
def request(
self,
method: str,
url: str,
data: Optional[Any] = None,
json: Optional[Any] = None,
headers: Optional[List[Dict[str, str]]] = None,
stream: bool = False,
timeout: float = 60,
) -> Response:
"""Perform an HTTP request to the given url which we will parse to determine
whether to use SSL ('https://') or not. We can also send some provided 'data'
or a json dictionary which we will stringify. 'headers' is optional HTTP headers
sent along. 'stream' will determine if we buffer everything, or whether to only
read only when requested
"""
if not headers:
headers = {}
try:
proto, dummy, host, path = url.split("/", 3)
# replace spaces in path
path = path.replace(" ", "%20")
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto == "http:":
port = 80
elif proto == "https:":
port = 443
else:
raise ValueError("Unsupported protocol: " + proto)
if ":" in host:
host, port = host.split(":", 1)
port = int(port)
if self._last_response:
self._last_response.close()
self._last_response = None
# We may fail to send the request if the socket we got is closed already. So, try a second
# time in that case.
retry_count = 0
while retry_count < 2:
retry_count += 1
socket = self._get_socket(host, port, proto, timeout=timeout)
ok = True
try:
self._send_request(socket, host, method, path, headers, data, json)
#print('sent')
except OSError:
#print('OSError')
ok = False
if ok:
# Read the H of "HTTP/1.1" to make sure the socket is alive. send can appear to work
# even when the socket is closed.
if hasattr(socket, "recv"):
result = socket.recv(1)
else:
result = bytearray(1)
try:
socket.recv_into(result)
except OSError:
#print('OSError')
pass
if result == b"H":
# Things seem to be ok so break with socket set.
break
self._close_socket(socket)
socket = None
if not socket:
raise OutOfRetries("Repeated socket failures")
resp = Response(socket, self) # our response
#print(resp.text)
#print(resp.headers)
#print("\r\n"+str(resp.status_code)+"\r\n")
if "location" in resp.headers and 300 <= resp.status_code <= 399:
return resp
# a naive handler for redirects
redirect = resp.headers["location"]
if redirect.startswith("http"):
# absolute URL
url = redirect
elif redirect[0] == "/":
# relative URL, absolute path
url = "/".join([proto, dummy, host, redirect[1:]])
else:
# relative URL, relative path
path = path.rsplit("/", 1)[0]
while redirect.startswith("../"):
path = path.rsplit("/", 1)[0]
redirect = redirect.split("../", 1)[1]
url = "/".join([proto, dummy, host, path, redirect])
self._last_response = resp
resp = self.request(method, url, data, json, headers, stream, timeout)
self._last_response = resp
return resp
def head(self, url: str, **kw) -> Response:
"""Send HTTP HEAD request"""
return self.request("HEAD", url, **kw)
def get(self, url: str, **kw) -> Response:
"""Send HTTP GET request"""
return self.request("GET", url, **kw)
def post(self, url: str, **kw) -> Response:
"""Send HTTP POST request"""
return self.request("POST", url, **kw)
def put(self, url: str, **kw) -> Response:
"""Send HTTP PUT request"""
return self.request("PUT", url, **kw)
def patch(self, url: str, **kw) -> Response:
"""Send HTTP PATCH request"""
return self.request("PATCH", url, **kw)
def delete(self, url: str, **kw) -> Response:
"""Send HTTP DELETE request"""
return self.request("DELETE", url, **kw)
# Backwards compatible API:
_default_session = None # pylint: disable=invalid-name
class _FakeSSLSocket:
def __init__(self, socket: CircuitPythonSocketType, tls_mode: int) -> None:
self._socket = socket
self._mode = tls_mode
self.settimeout = socket.settimeout
self.send = socket.send
self.recv = socket.recv
self.close = socket.close
def connect(self, address: Tuple[str, int]) -> None:
"""connect wrapper to add non-standard mode parameter"""
try:
return self._socket.connect(address, self._mode)
except RuntimeError as error:
raise OSError(errno.ENOMEM) from error
class _FakeSSLContext:
def __init__(self, iface: InterfaceType) -> None:
self._iface = iface
def wrap_socket(
self, socket: CircuitPythonSocketType, server_hostname: Optional[str] = None
) -> _FakeSSLSocket:
"""Return the same socket"""
# pylint: disable=unused-argument
return _FakeSSLSocket(socket, self._iface.TLS_MODE)
def set_socket(
sock: SocketpoolModuleType, iface: Optional[InterfaceType] = None
) -> None:
"""Legacy API for setting the socket and network interface. Use a `Session` instead."""
global _default_session # pylint: disable=global-statement,invalid-name
if not iface:
# pylint: disable=protected-access
_default_session = Session(sock, _FakeSSLContext(sock._the_interface))
else:
_default_session = Session(sock, _FakeSSLContext(iface))
sock.set_interface(iface)
def request(
method: str,
url: str,
data: Optional[Any] = None,
json: Optional[Any] = None,
headers: Optional[List[Dict[str, str]]] = None,
stream: bool = False,
timeout: float = 1,
) -> None:
"""Send HTTP request"""
# pylint: disable=too-many-arguments
_default_session.request(
method,
url,
data=data,
json=json,
headers=headers,
stream=stream,
timeout=timeout,
)
def head(url: str, **kw):
"""Send HTTP HEAD request"""
return _default_session.request("HEAD", url, **kw)
def get(url: str, **kw):
"""Send HTTP GET request"""
return _default_session.request("GET", url, **kw)
def post(url: str, **kw):
"""Send HTTP POST request"""
return _default_session.request("POST", url, **kw)
def put(url: str, **kw):
"""Send HTTP PUT request"""
return _default_session.request("PUT", url, **kw)
def patch(url: str, **kw):
"""Send HTTP PATCH request"""
return _default_session.request("PATCH", url, **kw)
def delete(url: str, **kw):
"""Send HTTP DELETE request"""
return _default_session.request("DELETE", url, **kw)
Eight, sample code
example.py
from machine import Pin,SPI
from wiznet5k import WIZNET5K
import wiznet5k_socket as socket
import time
#import socket
spi = SPI(1,baudrate = 8_000_000,sck = Pin(2),mosi = Pin(3),miso = Pin(10,Pin.IN))
net = WIZNET5K(spi,cs = Pin(7,Pin.OUT),reset = Pin(6))
led = Pin(12,Pin.OUT)
#目标主机地址
addr = ("192.168.1.10",8080)
#文本
Text = "Shanghai欢迎你的光临".encode("UTF-8")
# UDP套接字
udp_socket = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 绑定地址和端口
udp_socket.bind(("192.168.1.55",8080))
# TCP套接字
tcp_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 绑定地址和端口
tcp_socket.bind(("192.168.1.12",8080))
# 连接服务器
tcp_socket.connect(("192.168.1.10",8080))
def W5500_Init():
Chip = net.chip
Mac = net.pretty_mac(net.mac_address)
ip = net.pretty_ip(net.ip_address)
print("Chip: %s"%(Chip))
print("Mac: %s"%(Mac))
print("Ip: %s"%(ip))
# 网卡显示
# 显示MAC
# IP地址
def W5500_UDP():
#实现发送数据到调测端
try:
for i in range(10):
udp_socket.sendto(Text + str(i),addr)
#udp_socket.sendto("Shanghai欢迎你的光临".encode("UTF-8"),addr)
time.sleep(0.5)
except:
udp_socket.close()
def W5500_TCP(mode):
# 实现发送数据和接收数据
try:
if mode:
for i in range(10):
tcp_socket.send(Text)
time.sleep(0.5)
else:
data,address = tcp_socket.recvfrom(1)
print("data:%s address:%s"%(data.decode("UTF-8"),address))
Data = bytearray(data).decode("UTF-8")
if Data == 'k':
led.value(1)
else:
led.value(0)
except:
tcp_socket.close()
def main():
W5500_Init()
while True:
#W5500_UDP()
W5500_TCP(0)
if __name__ == "__main__":
main()
Download the network debugging assistant, Baidu download it yourself or get it from my article:
NetAssist Network Debugging Assistant V5.0.7-Software Tools-Savage Home
Internet of Things Development Notes (52) - Using Micropython to develop ESP32 development board W5500 Ethernet network module wired network communication_esp32 wired network_Modupiaoxue Blog-CSDN Blog
Use Micropython to develop wired network communication of W5500 Ethernet network module of ESP32 development board
https://blog.csdn.net/zhusongziye/article/details/128027511?ops_request_misc=&request_id=77ca849dc17f40b4905c552f9c85d1a8&biz_id=&utm_medium=distribute.pc_search_result.none-task-blog- 2~blog~koosearch~default-1-128027511-null- null.268^v1^control&utm_term=w5500&spm=1018.2226.3001.4450&ydreferer=aHR0cHM6Ly9zby5jc2RuLm5ldC9zby9zZWFyY2g%2FcT13NTUwMCZ0PWJsb2cmdT16aHVzb25neml 5ZQ%3D%3D
Then open the network debugging assistant:
Run the program, then you can see the relevant information received and sent. It's very simple, let's try it out for ourselves.
9. Module Introduction