-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpingClient.py
More file actions
145 lines (109 loc) · 4.48 KB
/
pingClient.py
File metadata and controls
145 lines (109 loc) · 4.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import os
import socket
import struct
import time
def ping(hostname):
    """Send four ICMP echo requests to *hostname* and print each reply.

    The hostname is resolved to an IPv4 address, then four echo requests
    (sequence numbers 1-4) are sent over a raw ICMP socket, roughly one per
    second. For every valid echo reply a statistics line is printed; a
    request that times out or yields an invalid reply is reported by the
    helper that detected it.

    Args:
        hostname: Host name or dotted IPv4 address to ping.

    Returns:
        None. All results are written to standard output.
    """
    # 32 bytes worth of dummy data
    ping_data = 'abcdefghijklmnopqrstuvwabcdefghi'

    # Resolve before opening the socket so a bad hostname acquires nothing
    try:
        dest_ip = socket.gethostbyname(hostname)
    except socket.gaierror:
        print(f'Ping request could not find host {hostname}. Please check the name and try again.')
        return

    # Print request
    print(f"PING {hostname} ({dest_ip}): {len(ping_data)} data bytes")

    # The context manager closes the raw socket even if a helper raises
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) as sock:
        # Set 4 second timeout for ping requests
        sock.settimeout(4)

        # Send 4 individual echo requests
        for seq_num in range(1, 5):
            time_sent = time.time()
            request = send_echo_request(sock, dest_ip, ping_data, seq_num)
            response_datagram = check_for_reply(sock)
            rtt = get_response_time(time_sent)

            # check_for_reply returns None once it has reported a timeout,
            # so there is nothing to slice or validate in that case
            if response_datagram is not None:
                # The first 20 bytes are taken to be the IP header
                # (i.e. an IPv4 header without options)
                icmp_reply = response_datagram[20:]
                if validate_reply(request, icmp_reply):
                    print_response_stats(response_datagram, dest_ip, rtt)

            # Pace the requests about one second apart; rtt is in milliseconds
            if rtt < 1000:
                time.sleep(1 - (rtt / 1000))
def print_response_stats(response_datagram, dest_ip, rtt):
    """Print one summary line for a received echo reply.

    The datagram is read as a 20-byte IP header followed by an 8-byte ICMP
    header and then the payload; IP headers carrying options are not
    accounted for.

    Args:
        response_datagram: Raw bytes received from the socket, IP header
            included.
        dest_ip: Address shown as the source of the reply.
        rtt: Round-trip time in milliseconds.

    Raises:
        struct.error: If the datagram is shorter than 28 bytes.
    """
    # TTL is the single byte at offset 8 of the IP header
    ttl = struct.unpack_from('B', response_datagram, 8)[0]
    # Anything past byte 28 will be the payload
    reply_data_length = len(response_datagram[28:])
    # The sequence number is the last 16-bit field of the ICMP header
    # (bytes 26-27). It is an unsigned big-endian value, so 'H' is used:
    # a signed 'h' would print numbers above 32767 as negative.
    icmp_seq_num = struct.unpack_from('>H', response_datagram, 26)[0]
    print(f'{reply_data_length} bytes from {dest_ip}: icmp_seq={icmp_seq_num} ttl={ttl} time={rtt}ms')
def validate_reply(request, reply):
    """Check that *reply* is a well-formed echo reply answering *request*.

    Both arguments are ICMP messages without any IP header. A reason is
    printed for every rejected reply.

    Args:
        request: The ICMP echo request that was sent.
        reply: The ICMP portion of the received datagram.

    Returns:
        True when the reply has type 0 and code 0, a checksum that verifies,
        and the same identifier and sequence number as the request;
        False otherwise.
    """
    # An ICMP header is 8 bytes; anything shorter cannot be unpacked and is
    # rejected here rather than raising struct.error
    if len(reply) < 8:
        print("Received response is not a valid echo reply.")
        return False

    # Unpack header details in network byte order, matching how the request
    # header is packed
    req_id, req_seq_num = struct.unpack('>HH', request[4:8])
    reply_type, reply_code, _, reply_id, reply_seq_num = struct.unpack('>BBHHH', reply[:8])

    # Check correct type of message
    if reply_type != 0 or reply_code != 0:
        print("Received response is not a valid echo reply.")
        return False

    # A message whose checksum field is correct sums to zero as a whole
    if calc_checksum(reply) != 0:
        print("Invalid checksum.")
        return False

    # Ensure IDs match
    if reply_id != req_id:
        print("Received reply has an invalid ID.")
        return False

    # Ensure sequence numbers match
    if reply_seq_num != req_seq_num:
        print("Received reply has an incorrect sequence number.")
        return False

    return True
def get_response_time(time_sent):
    """Return the milliseconds elapsed since *time_sent*, rounded to 3 places.

    Args:
        time_sent: A timestamp previously obtained from ``time.time()``.
    """
    elapsed_seconds = time.time() - time_sent
    elapsed_ms = elapsed_seconds * 1000
    return round(elapsed_ms, 3)
def send_echo_request(sock, dest_ip, data, icmp_seq_num):
    """Build an ICMP echo request, send it to *dest_ip* and return its bytes.

    Args:
        sock: Socket the message is sent on.
        dest_ip: Destination address.
        data: Payload text; it is encoded to bytes before sending.
        icmp_seq_num: Sequence number placed in the ICMP header.

    Returns:
        The complete ICMP message (header plus payload) that was sent.
    """
    echo_request_type = 8
    echo_request_code = 0
    # The low 16 bits of the process ID serve as the ICMP identifier
    identifier = os.getpid() & 0xFFFF
    header_layout = struct.Struct('>bbHHh')

    def pack_header(checksum):
        # Only the checksum field differs between the two header builds
        return header_layout.pack(
            echo_request_type, echo_request_code, checksum, identifier, icmp_seq_num
        )

    # The checksum is computed over the message with its checksum field zeroed
    zeroed_header = pack_header(0)
    payload = data.encode()
    checksum = calc_checksum(zeroed_header + payload)

    # Rebuild the header around the real checksum
    packet = pack_header(checksum) + payload

    # Send to port 0 (network layer)
    sock.sendto(packet, (dest_ip, 0))
    return packet
def calc_checksum(message):
    """Return the 16-bit ones' complement checksum of *message*.

    The bytes are summed as big-endian 16-bit words, carries above 16 bits
    are folded back into the low 16 bits, and the ones' complement of the
    result is returned. An odd-length message is treated as if it ended in
    one extra zero byte.

    The argument is never modified, so a ``bytearray`` (or any other
    indexable sequence of byte values) can be passed safely.

    Args:
        message: The bytes to checksum.

    Returns:
        The checksum as an int in the range 0..0xFFFF.
    """
    length = len(message)
    # Number of bytes that form complete two-byte words
    paired_length = length - (length & 1)

    total = sum(
        (message[i] << 8) | message[i + 1]
        for i in range(0, paired_length, 2)
    )

    # A trailing odd byte is the high half of a word whose low half is zero;
    # adding it directly avoids appending to (and mutating) the input
    if length & 1:
        total += message[-1] << 8

    # Fold over the carry bits until the sum fits in 16 bits
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)

    # Take the ones complement of the sum
    return ~total & 0xFFFF
def check_for_reply(sock):
    """Wait for one datagram on *sock* and return its raw bytes.

    Args:
        sock: Socket to read from; how long the call waits is governed by
            the socket's own timeout.

    Returns:
        The received bytes (at most 104), or None if the socket timed out,
        in which case "Request timed out." is printed.
    """
    try:
        # Wait for a response until timeout; reads at most 104 bytes
        response, _ = sock.recvfrom(104)
    except socket.timeout:
        # socket.timeout is an alias of TimeoutError from Python 3.10 on and
        # a separate OSError subclass before that, so naming it here catches
        # the timeout on every version
        print("Request timed out.")
        return None
    return response
# Prompt and ping only when run as a script, so importing this module does
# not block on input() or send any traffic
if __name__ == "__main__":
    user_input = input('Enter hostname: ')
    ping(user_input)