TCP/UDPのクライアント/サーバーツールをpythonで作る
単純なTCP、UDP通信がGUIで使い勝手よく行えるツールが欲しくなったので作ってみた。
通信ツール自体はググればいろいろとあるけど、自分で作れて用途に応じていじれた方が何かと使い勝手がいいし。
一応exe化して置いといたのでツールだけ使いたい人はここから引っ張ってください。
SRC
ライブラリは以下。
PySimpleGUI 4.60.1
クライアント
メインのTCP、UDPフレーム送信ツール実装。
# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
from socket import socket, AF_INET, SOCK_DGRAM, SOCK_STREAM
# バッファサイズ指定
BUFSIZE = 14600
# テーマの設定
sg.theme("SystemDefault ")
# 事前設定
L1 = [
# 診断機設定
[sg.Text("・src IP address ",size=(15,1)),
sg.InputText(default_text="127.0.0.1" , text_color = "#000000",background_color ="#ffffff", size=(15,1), key="-SRC_IP_ADDR-" ),
sg.Text(" ")],
[sg.Text("・src port num ",size=(15,1)),
sg.InputText(default_text="49152" , text_color = "#000000",background_color ="#ffffff" , size=(8,1), key="-SRC_PORT_NUM-" )],
[sg.Text("・dest IP address ",size=(15,1)),
sg.InputText(default_text="127.0.0.2" , text_color = "#000000",background_color ="#ffffff" , size=(15,1), key="-DEST_IP_ADDR-" ),
sg.Text(" ")],
[sg.Text("・dest port num ",size=(15,1)),
sg.InputText(default_text="49152" , text_color = "#000000",background_color ="#ffffff" , size=(8,1), key="-DEST_PORT_NUM-" )],
]
# UDP
L2 = [[sg.Button("send", border_width=4 , size =(10,1), key="-BTN_SEND_UDP-")]
]
# TCP
L3 = [[sg.Button("connect", border_width=4 , size =(10,1), key="-BTN_CONECT_TCP-"),
sg.Button("send", border_width=4 , size =(10,1), key="-BTN_SEND_TCP-"),
sg.Button("disconnect", border_width=4 , size =(10,1), key="-BTN_DISSCONECT_TCP-")]
]
# Send data
L4 = [
[sg.Multiline(default_text="" , border_width=2, size=(55,22), key="-PAYLOAD-")]
]
L5 = [
[sg.Multiline(default_text="", border_width=1, size=(58,26),autoscroll=True, key="-COM_ST-")]]
L =[
[
sg.Frame("Settings",
[
[sg.Frame("Socket config ",L1,size=(420,130))],
[sg.Frame("Send data",L4)]
]
),
sg.Frame("communication",
[
[sg.Frame("UDP",L2),sg.Frame("TCP",L3)],
[sg.Frame("Connection status",L5)]
]
),
]
]
L_NEW = []
# ウィンドウ作成
window = sg.Window ("TCP/UDP cliant tool", L)
values = ""
def main():
global values
# イベントループ
while True:
# イベントの読み取り(イベント待ち)
event , values = window.read()
window_txt = ""
# 確認表示
# print(" イベント:",event ,", 値:",values)
# 終了条件( None: クローズボタン)
if event == "-BTN_SEND_UDP-":
window_txt += "----------UDP send----------"
window_txt += main_udp_send(values)
elif event == "-BTN_CONECT_TCP-":
tcp_connect( values['-SRC_IP_ADDR-'] , values['-DEST_IP_ADDR-'] , int(values['-SRC_PORT_NUM-']) , int(values['-DEST_PORT_NUM-']) )
window_txt += "----------TCP connect----------\n"
elif event == "-BTN_SEND_TCP-":
window_txt += "----------TCP send----------\n"
window_txt += main_tcp_send(values)
elif event == "-BTN_DISSCONECT_TCP-":
tcp_close()
window_txt += "----------TCP disconnect----------\n"
window["-SRC_PORT_NUM-"].Update( int(values["-SRC_PORT_NUM-"]) + 1)
elif event == None:
print(" 終了します. ")
break
print(window_txt.replace("\n\n\n","\n\n"))
if "" == values['-COM_ST-']:
window["-COM_ST-"].Update(window_txt.replace("\n\n\n","\n\n"))
else:
window["-COM_ST-"].Update(values['-COM_ST-']+ "\n\n" + window_txt.replace("\n\n\n","\n\n"))
# 終了処理
window.close()
def main_udp_send(values):
rtn = ""
if None != values["-PAYLOAD-"]:
rtn = "\nsend data:" + values["-PAYLOAD-"].upper()+ "\n\n"
udp_send( values['-SRC_IP_ADDR-'] , values['-DEST_IP_ADDR-'] , int(values['-SRC_PORT_NUM-']) , int(values['-DEST_PORT_NUM-']) , bytes.fromhex(values["-PAYLOAD-"]) )
return rtn
def udp_send( src_ip , dst_ip , src_port , dst_port , data ):
# 送信側アドレスをtupleに格納
SrcAddr = ( src_ip , src_port )
# 受信側アドレスをtupleに格納
DstAddr = ( dst_ip , dst_port )
# ソケット作成
udpClntSock = socket(AF_INET, SOCK_DGRAM)
# 送信側アドレスでソケットを設定
udpClntSock.bind(SrcAddr)
# 受信側アドレスにペイロード長単位で送信
chunk_size = 1468
# ペイロード長単位で送信
for i in range(0, len(data), chunk_size ):
chunk = data[i:i+chunk_size]
udpClntSock.sendto(chunk,DstAddr)
return
def tcp_connect( src_ip , dst_ip , src_port , dst_port ):
global tcpClntSock
# ソケット作成
tcpClntSock = socket(AF_INET, SOCK_STREAM)
tcpClntSock.settimeout(0.5)
# 送信側アドレスをtupleに格納
SrcAddr = ( src_ip , src_port )
# 送信側アドレスでソケットを設定
tcpClntSock.bind(SrcAddr)
# サーバーに接続
tcpClntSock.connect((dst_ip, dst_port))
def tcp_close():
global tcpClntSock
# ソケットクローズ
tcpClntSock.close()
def main_tcp_send(values):
rtn = ""
if None != values["-PAYLOAD-"]:
rtn = "send data:" + values["-PAYLOAD-"].upper()+ "\n\n"
tcp_send( bytes.fromhex(values["-PAYLOAD-"]) )
return rtn
def tcp_send( data ):
global tcpClntSock
chunk_size = 1456
# ペイロード長単位で送信
for i in range(0, len(data), chunk_size ):
chunk = data[i:i+chunk_size]
tcpClntSock.send(chunk)
return
if __name__ == '__main__':
main()
サーバー
クライアントツールの確認用サーバ立ち上げツール実装。
# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
import threading
import sys
import socket
from socket import socket, AF_INET, SOCK_DGRAM ,SOCK_STREAM
import time
# バッファサイズ指定
BUFSIZE = 14600
# テーマの設定
sg.theme("SystemDefault ")
# 事前設定
L1 = [[sg.Text("・IP address " ,size=(20,1)),
sg.InputText(default_text="127.0.0.2" , text_color = "#000000",background_color ="#ffffff" , size=(40,1), key="-IP_ADDR" )],
[sg.Text("・port num " ,size=(20,1)),
sg.InputText(default_text="49152" , text_color = "#000000",background_color ="#ffffff" , size=(40,1), key="-PORT_NUM-" )]
]
# UDP
L2 = [[sg.Button("OPEN", border_width=4 , size =(25,1), key="-BTN_UDP_OPEN-")]]
# TCP
L3 = [[sg.Button("OPEN", border_width=4 , size =(25,1), key="-BTN_TCP_OPEN-")]]
# 通信ステータス
L4 = [[sg.Multiline(default_text="", border_width=1, size=(62,10), key="-COM_ST-")]]
L = [[sg.Frame("Socket config",L1)],
[sg.Frame("UDP",L2),sg.Frame("TCP",L3)],
[sg.Frame("Connection status",L4)]]
# ウィンドウ作成
window = sg.Window ("TCP/UDP sever tool", L)
values = ""
def main():
global values
threads = []
# イベントループ
while True:
# イベントの読み取り(イベント待ち)
event , values = window.read()
# 確認表示
# print(" イベント:",event ,", 値:",values)
# 終了条件( None: クローズボタン)
if event == "-BTN_UDP_OPEN-":
t1 = threading.Thread(target=udp_recv, args=(values['-IP_ADDR'] , int(values['-PORT_NUM-']),))
threads.append(t1)
t1.setDaemon(True)
t1.start()
window["-COM_ST-"].Update("----------UDP open----------")
# TCP接続
elif event == "-BTN_TCP_OPEN-":
t2 = threading.Thread(target=tcp_recv, args=(values['-IP_ADDR'] , int(values['-PORT_NUM-']),))
threads.append(t2)
t2.setDaemon(True)
t2.start()
window["-COM_ST-"].Update("----------TCP open----------")
elif event == None:
sys.exit()
def main_window_update_open():
global values
window_txt = "----------TCP connect----------\n"
window["-COM_ST-"].Update(window_txt)
print(window_txt)
return
def main_window_update(protocol,data):
global values
window_txt = ""
window_txt += "----------{} data recieve----------\n".format(protocol) \
+ "Recv data:" + data + "\n"
window["-COM_ST-"].Update(window_txt)
print(window_txt)
return
def main_window_update_close():
global values
window_txt = "----------TCP disconnect----------\n"
window["-COM_ST-"].Update(window_txt)
print(window_txt)
return
def udp_recv( ip_addr , port ):
protocol = "UDP"
# 受信側アドレスをtupleに格納
SrcAddr = ( ip_addr , port)
# ソケット作成
udpServSock = socket(AF_INET, SOCK_DGRAM)
# 受信側アドレスでソケットを設定
udpServSock.bind(SrcAddr)
time.sleep(1)
# While文を使用して常に受信待ちのループを実行
while True:
time.sleep(1)
# ソケットにデータを受信した場合の処理
# 受信データを変数に設定
data, addr = udpServSock.recvfrom(BUFSIZE)
# 受信データを確認
main_window_update(protocol,data.hex())
def recv_client(connection, client):
protocol = "TCP"
while True:
try:
data = connection.recv(BUFSIZE)
if 0 != len(data):
main_window_update(protocol,data.hex())
else:
break
except ConnectionResetError:
break
connection.close()
main_window_update_close()
def tcp_recv( ip_addr , port ):
tcp_server = socket(AF_INET, SOCK_STREAM)
tcp_server.bind(( ip_addr , port))
tcp_server.listen()
time.sleep(1)
while True:
(connection, client) = tcp_server.accept()
main_window_update_open()
thread = threading.Thread(target=recv_client, args=(connection, client))
# スレッド処理開始
thread.start()
if __name__ == '__main__':
main()
実行結果
クライアント
各種パラメータ指定後、ボタン押下でフレームを送信します。
サーバ
クライアントツールからフレームを投げる前にサーバを立ち上げて任意のパラメータで起動する必要があります。
wiresharkキャプチャ
ローカルの通信結果です。