Python黑帽子-网络编程

作者: const27 分类: All,开发-Python 发布时间: 2020-09-11 18:07

socket 库

速查:https://www.cnblogs.com/wumingxiaoyao/p/7047658.html

TCP服务端

import socket
import threading
host="0.0.0.0"
port=

def client_handle(client):
    request=client.recv(1024)   #request是从client发来的数据的前1024字节的数据
    print("[+]recv:%s"%(request))
    client.send("ACK!".encode())        #向客户端返回数据,是bytes,所以需要对其使用encode将其从str变为bytes
server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) #创立一个socker套接字,AF_INET是启用IPV4地址,SOCK_STRAM是启用TCP协议
server.bind((host,port))       #将服务器绑定到指定IP的指定端口,bind方法只接受一个参数
server.listen(5)             #设置服务器可同时链接五个请求
print("[+]Server is listening %s%d"%(host,port))

while true:
    client,addr=client.accept() #client变量用于处理客户端情况,addr变量是用于远程链接细节(第一个变量,第二个变量)
    client_handler=threading.Thread(target=client_handle,args=(client,))
    client_handler.start()

TCP客户端

import socket

t_host="127.0.0.1"
t_port=8080

client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect((t_host,t_port))           #connect只支持一个参数
strs="nmsl"
client.send(strs.encode(encoding="utf8"))   #发送数据到服务器
response=client.recv(1024)   
print(response)

UDP客户端

import socket

target_host="127.0.0.1"
target_port=8080

client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)   #SOCK_DGRAM表明是UDP
client.sendto("NMSL".encode(),(target_host,target_port)) #socket的udp套接字可以直接向指定HOST,PORT发送数据而不需要链接
request=client.resvfrom(1024)    #UDP网络套接字用recvfrom接收指定字节
print(request)

UDP服务端

import socket

host="0.0.0.0"
port=8080

server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
server.bind((host,port))
print("[+]Server is listening %s:%d"%(host,port))
while True:
    data,addr=server.recvfrom(1024)    #data用于接收传来的数据,addr记录数据来源的IP和PORT
    print("[*]recv data:%s from %s"%(data,addr))
    print("[*]Input your strs:",end="")
    strs=input() 
    server.sendto(strs.encode(),addr)
    print("[*]data:(%s) has been sent"%(strs))

网络嗅探

网络嗅探会用到的一些函数

s=socket.socket(family=AF_INET(地址簇), type=SOCK_STREAM(套接字类型), proto=0(协议号), fileno=None)   创建套接字
地址簇: AF_INET启用IPV4  AF_INET启ipv6  AF_UNIX单一的Unix系统进程间通信 或其他
套接字类型:SOCK_STREAM tcp类型、SOCK_DGRAM udp类型、SOCK_RAW 原始套接字 或其他

s.setsockopt(level,optname.value:int)   为套接字设置选项
level:选择所在的协议层 SOL_SOCKET:通用套接字选项.IPPROTO_IP:IP选项.IPPROTO_TCP:TCP选项. 
optname:指定所选level的控制方式,有很多。举例:IP_HDRINCL,IPPROTO_IP level的选项,用于在数据包中包含IP首部
value: 是optname选项的参数值。
详见https://www.cnblogs.com/gangzilife/p/9766114.html

s.ioctl(control,option)    用于控制windows的ioctl(用户隔离模式与内核模式下的组件通信方式)
举例: s.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON) 发送ioctl信号请求网卡开启混杂模式(可以嗅探到所有经过网卡的数据包)

本地包嗅探

import socket
import os

host="127.0.0.1"

if os.name=="nt":  #os.name表示计算机系统名,posix , nt , java, 对应linux/windows/java虚拟机
    proctool = socket.IPPROTO_IP  #用于socket.socket的参数,意义在于仅处理IP包
else:
    proctool = socket.IPPROTO_ICMP  #意义在于仅处理ICMP

while True:
    s=socket.socket(socket.AF_INET,socket.SOCK_RAW,proctool) #SOCK_RAW 获得原始的数据
    s.bind((host,8000))
    s.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)  #设置数据包自动包含IP头部

    if os.name=="nt":
        s.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)   #windows特有机制,需要发送ioctl信号告知内核开启网卡混杂模式

    print(s.recvfrom(8096))

    if os.name=="nt":
        s.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)
#这里需要注意的是,这个方法,Windows可以抓所有类型的数据包,而linux只能抓ICMP包

这是嗅探到数据的模样

可以发现是一堆无法辨认的字符,我们就需要对其进行解码

IP&ICMP解码

在这里就需要介绍一下python处理数据时用到的ctyes和structs了。

ctypes

ctypes是python的一个库,它提供了与C兼容的数据类型,也就是说可以使用C语言的一些东西
我们这里就仅介绍一下结构体(因为接下来会用到)

以上是结构体中指定的数据类型,那么我们如何去定义一个结构体呢。

1.从字节录入数据到结构体
from ctypes import *

class Student(Structure):  #定义一个类继承自Structure
    _fields_=[             #定义好我们的结构体。结构体的数据部分一般有三个参数。(变量名,变量类型,变量为空时的预设值,from_buffer_copy返回的数据就会存储到这个变量
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ubyte),
        ("ttl",     c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
        ] 
    def __new__(self,socket_buffer=None):  #从字节录入数据时必须同时用到__new__和__init__  其中__new__的意义是如何实例化一个对象。
        return self.from_buffer_copy(socket_buffer)  #这里如何实例化对象:从缓冲区读取数据然后实例化对象。从字节录入数据时必须用到此方法
    def __init__(self,socket_buffer=None):  #__init__方法就任意了
        pass

a=b'E\x00\x00r\xf5n@\x00@\x06\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01'
ss=Student(a)
print(ss.tos)

##说实话我并不是很能理解字节是怎么按规矩录入结构体的
>>>5

2.从列表录入数据到结构体
from ctypes import *

class Student(Structure):
    _fields_=[           #列表的数据就会存储到这个变量
        ("name",c_char),
        ("age",c_int),
        ("sex",c_char)
    ]

lista=[c_char(b"\x09"),c_int(11),c_char(b"\x07")] #定义一个列表,数据可以不需要定义类型,结构体的数据和列表的数据是一一对应录入的,所以顺序不能错
ss=Student(*lista)  #必须打*表示实际传入的参数数量不确定
print(ss.age)

>>>11

struct

struct库一般用于处理数据,其方法数量较少。常用的一般为pack,unpack
pack方法功能是按照指定的格式解析数据。
pack(fmt,v1,v2,v3…)
fmt格式如下

字符字节顺序大小对齐方式
@按原字节按原字节按原字节
=按原字节标准
<小端标准
>大端标准
!网络(=大端)标准
字符C类型python类型标准尺寸
x填充字节没有意义的值
cchar长度为1的字节1
bsigned char整型1
Bunsigned char整型1
_Bool布尔1
hshort整型2
Hunsigned short整型2
iint整型4
Iunsigned int整型4
llong整型4
Lunsigned long整型4
qlong long整型8
Qunsigned long long整型8
nssize_t整型
Nsize_t整型
e浮动2
ffloat浮动4
ddouble浮动8
schar[]字节
pchar[]字节
Pvoid *整型

大端和小端

对于整型、长整型等数据类型,Big endian 认为第一个字节是最高位字节(按照从低地址到高地址的顺序存放数据的高位字节到低位字节);而 Little endian 则相反,它认为第一个字节是最低位字节(按照从低地址到高地址的顺序存放据的低位字节到高位字节)。

python中的__new__和__init__区别

定义一个类时可能会用到__new__和__init__.
区别在于__new__的意义在于如何实例化一个对象
__init__的意义在于实例化对象后对变量进行赋值

IPV4&&ICMP数据包

IP解码

import socket
import os
import struct
from ctypes import *

host="127.0.0.1"

class IP(Structure):  #这里是定义一个结构体类,继承自Structure
    _fields_=[        #_fields_是该类的一个特别变量,用于存储实例化时传入的数据
        ("ihl",     c_ubyte,4),
        ("version", c_ubyte,4),
        ("tos",     c_ubyte),
        ("len",     c_ushort),
        ("id",      c_ushort),
        ("offset",  c_ushort),
        ("ttl",     c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum",     c_ushort),
        ("src",     c_ulong),
        ("dst",     c_ulong)
        ] 
    def __new__(self,socket_buffer=None):  #将缓冲区里的数据拿进来实例化对象(若不进行这一步,那么传入到对象的数据无法被加载进结构体)
        return self.from_buffer_copy(socket_buffer)

    def __init__(self,socket_buffer=None):
        self.protocol_map={1:"ICMP",6:"TCP",17:"UDP"}
        self.src_address=socket.inet_ntoa(struct.pack("<L",self.src)) #struck.pack按照<L的形式格式化返回指定字符串。socket.inet_ntoa将32位IPV4字节转换为可读的IPV4地址如 127.0.0.1
        self.dst_address=socket.inet_ntoa(struct.pack("<L",self.dst))

        try:
            self.protocol=self.protocol_map[self.protocol_num]
        except:
            self.protocol=str(self.protocol_num)

if os.name=="nt":
    socket_protocol=socket.IPPROTO_IP
else:
    socket_protocol=socket.IPPROTO_ICMP


sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)

sniffer.bind((host,0))

sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

try:

    while True:
        data=(sniffer.recvfrom(8096)[0])
        ip_header=IP(data)
        print("Protocol:%s %s -> %s"% (ip_header.protocol,ip_header.src_address,ip_header.dst_address))
except KeyboardInterrupt:
    if os.name=="nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

ICMP解码


#coding=utf-8
import socket
import os
import struct
from ctypes import *
 
host = "127.0.0.1"
 
class IP(Structure):
	"""docstring for IP"""
	_fields_ = [
		("ihl",			c_ubyte, 4),
		("version",		c_ubyte, 4),
		("tos",			c_ubyte),
		("len",			c_ushort),
		("id",			c_ushort),
		("offset",		c_ushort),
		("ttl",			c_ubyte),
		("protocol_num",	c_ubyte),
		("sum",			c_ushort),
		("src",			c_ulong),
		("dst",			c_ulong)
	]
 
	def __new__(self,socket_buffer=None):
		return self.from_buffer_copy(socket_buffer)
 
	def __init__(self, socket_buffer=None):
		self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
 
		self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
		self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
 
		try:
			self.protocol = self.protocol_map[self.protocol_num]
		except:
			self.protocol = str(self.protocol_num)
		
class ICMP(Structure):
	"""docstring for ICMP"""
	_fields_ = [
		("type",			c_ubyte),
		("code",			c_ubyte),
		("checksum",		c_ushort),
		("unused",		c_ushort),
		("next_hop_mtu",	c_ushort)
	]
		
	def __new__(self,socket_buffer):
		return self.from_buffer_copy(socket_buffer)
 
	def __init__(self,socket_buffer):
		pass
 
if os.name == "nt":
	socket_protocol = socket.IPPROTO_IP
else:
	socket_protocol = socket.IPPROTO_ICMP
 
sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
 
sniffer.bind((host,0))
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
 
if os.name == "nt":
	sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
 
try:
	while True:
		
		raw_buffer = sniffer.recvfrom(65565)[0]
 
		ip_header = IP(raw_buffer[0:20])
 
		print("Protocol : %s %s -> %s"%(ip_header.protocol,ip_header.src_address,ip_header.dst_address))
 
		#如果为ICMP,进行处理
		if ip_header.protocol == "ICMP":
 
			#计算ICMP包的起始位置
			offset = ip_header.ihl*4  #IPV4包的ihl部分长度乘4就结束了IPV4包结构进入下一层(这里是进入ICMP包)
			buf = raw_buffer[offset:offset + sizeof(ICMP)]
 
			#解析ICMP数据
			icmp_header = ICMP(buf)

			print("ICMP -> Type : %d  Code : %d"%(icmp_header.type,icmp_header.code))
 
except KeyboardInterrupt:
	
	if os.name == "nt":
		sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

一个我自己都觉得很垃圾的基于纯udp嗅探的局域网主机扫描

#只要目标机器不发烧udp包那我就永远无法扫出来,太垃圾了
import socket
import os
import time
from ctypes import *
from netaddr import *
import struct

host = "192.168.43.4"
subnet="192.168.43.0/24"

class IP(Structure):
	"""docstring for IP"""
	_fields_ = [
		("ihl",			c_ubyte, 4),
		("version",		c_ubyte, 4),
		("tos",			c_ubyte),
		("len",			c_ushort),
		("id",			c_ushort),
		("offset",		c_ushort),
		("ttl",			c_ubyte),
		("protocol_num",	c_ubyte),
		("sum",			c_ushort),
		("src",			c_ulong),
		("dst",			c_ulong)
	]
 
	def __new__(self,socket_buffer=None):
		return self.from_buffer_copy(socket_buffer)
 
	def __init__(self, socket_buffer=None):
		self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
 
		self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
		self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
 
		try:
			self.protocol = self.protocol_map[self.protocol_num]
		except:
			self.protocol = str(self.protocol_num)
		

if os.name=="nt":
    sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.IPPROTO_IP)
else:
    sniffer=socket.socket(socket.AF_INET,socket.SOCK_RAW,socket.IPPROTO_ICMP)

sniffer.bind((host,0))
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

if os.name=="nt":
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

ip_list=[]
while True:
    raw_data=sniffer.recvfrom(100000)[0]
    ip_rex=IP(raw_data)
    if IPAddress(ip_rex.src_address) in IPNetwork(subnet):
        if ip_rex.src_address not in ip_list and ip_rex.src_address != host:
            ip_list.append(ip_rex.src_address)
            print("[+]Host up:%s"%ip_rex.src_address)

Scapy

scapy库,很牛逼,上面说的东西在scapy库中都能很简单很简单的完成。

嗅探器 sniff

scapy创建一个本地嗅探器非常非常简单

sniff(filter="",iface="",prn=function,count=Num)
就这样,一个嗅探器就建立好了,来解析一下各个参数
filter是添加过滤条件,采用的是BPF过滤器(wireshark那个过滤语法),留空则没有过滤
iface选中网卡,留空则嗅探所有网卡
prn 设置回调函数
count设置嗅探多少个包就停止嗅探

然后我们使用回调函数获得sniff得到的包

from scapy.all import *

def a(packet):
    print(packet.show())   #对包使用show函数,即可得到该包的详细信息

sniff(filter="",prn=a,count=1,iface="WLAN")  #只有当嗅探停止后,才会调用回调

>>>
###[ Ethernet ]###
  dst       = ff:ff:ff:ff:ff:ff
  src       = 98:3b:8f:97:15:ad
  type      = ARP
###[ ARP ]###
     hwtype    = 0x1
     ptype     = IPv4
     hwlen     = 6
     plen      = 4
     op        = who-has
     hwsrc     = 98:3b:8f:97:15:ad
     psrc      = 192.168.43.187
     hwdst     = 00:00:00:00:00:00
     pdst      = 169.254.255.255

None

当然你也可以选择输出你想得到的包的某个数据
from scapy.all import *

def a(packet):
    print(packet.show())
    print(packet[TCP].sport)   #输出数据包的TCP部分的sport属性
    print(packet[IP].payload)  #输出数据包的IP部分的载荷部分

sniff(filter="tcp",prn=a,count=1,iface="WLAN")

>>>
###[ Ethernet ]### 
  dst       = 3c:f0:11:8a:a1:dd
  src       = 12:0b:fb:56:7b:7e
  type      = IPv4
###[ IP ]###
     version   = 4
     ihl       = 5
     tos       = 0x0
     len       = 40
     id        = 34196
     flags     = DF
     frag      = 0
     ttl       = 50
     proto     = tcp
     chksum    = 0xf549
     src       = 202.108.23.152
     dst       = 192.168.43.69
     \options   \
###[ TCP ]###
        sport     = https
        dport     = 49025
        seq       = 3617286723
        ack       = 3302556124
        dataofs   = 5
        reserved  = 0
        flags     = A
        window    = 948
        chksum    = 0x145d
        urgptr    = 0
        options   = []

None
443
b'\x01\xbb\xbf\x81\xd7\x9bjC\xc4\xd9\x01\xdcP\x10\x03\xb4\x14]\x00\x00'

相比于刚刚使用socket和ctypes等进行本地包嗅探,scapy显得如此友好

ARP局域网主机扫描

通过这个例子认识一下scapy数据包的收发功能,同时学习一下ARP扫描。
(因为上面那个扫描器垃圾死了)

arp扫描

关于ARP,可以看这里:http://www.const27.com/2020/06/03/arp%e5%8d%8f%e8%ae%ae%e4%b8%8earp%e6%ac%ba%e9%aa%97/

ok,arp扫描原理就是:
向广播域依次询问0/24 IP段的mac地址,若某个包有回应就说明存在对应IP的主机。

scapy 数据包收发

构造一个数据包:
a = Ether(*args)/IP(*args)/TCP(*args)/应用层数据
从左到右依次按从下往上构造数据包,其中的args可以自行构造

收发数据包
send/sendp(a)   分别在第三层和第二层发送数据包,但这两个函数仅能发送不负责收返回的数据包
ans,unans=sr/srp/sr1/srloop(a)  收和发数据包,ans存储有回应的数据包,unans存储无回应的数据包,常用srp

code

from scapy.all import *

scan = Ether(dst="ff:ff:ff:ff:ff:ff",src="3c:f0:11:8a:a1:dd")/ARP(pdst="192.168.43.0/24")   #Ether设置为广播域

ans,unans=srp(scan,iface="WLAN",timeout=2)  #ans和unans都是二维元组,元组第一项是发送包,第二项是响应包

result=[]

for s,r in ans:
    result.append([r[ARP].psrc,r[ARP].hwsrc])

result.sort()

for ip,mac in result:
    print(ip,"--->",mac)

>>>
Begin emission:
...*........................................................................................................................................................................................*......Finished sending 256 packets..
..............................*..................................*.
Received 263 packets, got 4 answers, remaining 252 packets
192.168.43.1 ---> 12:0b:fb:56:7b:7e
192.168.43.181 ---> a0:51:0b:3d:6f:0d
192.168.43.187 ---> 98:3b:8f:97:15:ad
192.168.43.252 ---> 48:89:e7:c2:df:a3

ARP投毒

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

Leave a Reply

Your email address will not be published. Required fields are marked *

标签云