TCP/UDPのクライアント/サーバーツールをpythonで作る

TCP/IP,PySimpleGUI,python,TCP,UDP,通信

単純なTCP、UDP通信がGUIで使い勝手よく行えるツールが欲しくなったので作ってみた。

通信ツール自体はググればいろいろとあるけど、自分で作れて用途に応じていじれた方が何かと使い勝手がいいし。

一応exe化して置いといたのでツールだけ使いたい人はここから引っ張ってください。

SRC

ライブラリは以下。

PySimpleGUI               4.60.1

クライアント

メインのTCP、UDPフレーム送信ツール実装。

# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
from socket import socket, AF_INET, SOCK_DGRAM, SOCK_STREAM
# バッファサイズ指定
BUFSIZE = 14600
# テーマの設定
sg.theme("SystemDefault ")

# 事前設定
L1 = [
    # 診断機設定
    [sg.Text("・src IP address ",size=(15,1)),
    sg.InputText(default_text="127.0.0.1" , text_color = "#000000",background_color ="#ffffff",        size=(15,1),    key="-SRC_IP_ADDR-" ),
    sg.Text("     ")],
    [sg.Text("・src port num  ",size=(15,1)),
    sg.InputText(default_text="49152" ,    text_color = "#000000",background_color ="#ffffff" ,        size=(8,1),        key="-SRC_PORT_NUM-" )],
    [sg.Text("・dest IP address ",size=(15,1)),
    sg.InputText(default_text="127.0.0.2" , text_color = "#000000",background_color ="#ffffff" ,    size=(15,1),    key="-DEST_IP_ADDR-" ),
    sg.Text("     ")],
    [sg.Text("・dest port num  ",size=(15,1)),
    sg.InputText(default_text="49152" ,    text_color = "#000000",background_color ="#ffffff" ,        size=(8,1),        key="-DEST_PORT_NUM-" )],
 ]

# UDP
L2 = [[sg.Button("send", border_width=4 ,    size =(10,1),    key="-BTN_SEND_UDP-")]
]

# TCP
L3 = [[sg.Button("connect", border_width=4 , size =(10,1),    key="-BTN_CONECT_TCP-"),
    sg.Button("send", border_width=4 ,         size =(10,1),    key="-BTN_SEND_TCP-"),
    sg.Button("disconnect", border_width=4 , size =(10,1),    key="-BTN_DISSCONECT_TCP-")]
]

# Send data
L4 = [
    [sg.Multiline(default_text="" , border_width=2,        size=(55,22),    key="-PAYLOAD-")]
]

L5 = [
    [sg.Multiline(default_text="", border_width=1,    size=(58,26),autoscroll=True,    key="-COM_ST-")]]

L =[
    [
    sg.Frame("Settings",
        [
            [sg.Frame("Socket config ",L1,size=(420,130))],
            [sg.Frame("Send data",L4)]
        ]
    ),
    sg.Frame("communication",
        [
            [sg.Frame("UDP",L2),sg.Frame("TCP",L3)],
            [sg.Frame("Connection status",L5)]
        ]
    ),
    ]
]

L_NEW = []

# ウィンドウ作成
window = sg.Window ("TCP/UDP cliant tool", L)
values = ""

def main():
    global values
    # イベントループ
    while True:
        # イベントの読み取り(イベント待ち)
        event , values = window.read()
        window_txt = ""
        # 確認表示
        # print(" イベント:",event ,", 値:",values)
        # 終了条件( None: クローズボタン)
        if event == "-BTN_SEND_UDP-":
            window_txt += "----------UDP send----------"
            window_txt += main_udp_send(values)
        elif event == "-BTN_CONECT_TCP-":
            tcp_connect( values['-SRC_IP_ADDR-'] , values['-DEST_IP_ADDR-'] , int(values['-SRC_PORT_NUM-']) , int(values['-DEST_PORT_NUM-']) )
            window_txt +=  "----------TCP connect----------\n"
        elif event == "-BTN_SEND_TCP-":
            window_txt += "----------TCP send----------\n"
            window_txt += main_tcp_send(values)
        elif event == "-BTN_DISSCONECT_TCP-":
            tcp_close()
            window_txt +=  "----------TCP disconnect----------\n"
            window["-SRC_PORT_NUM-"].Update( int(values["-SRC_PORT_NUM-"]) + 1)
        elif event == None:
            print(" 終了します. ")
            break
        print(window_txt.replace("\n\n\n","\n\n"))
        if "" == values['-COM_ST-']:
            window["-COM_ST-"].Update(window_txt.replace("\n\n\n","\n\n"))
        else:
            window["-COM_ST-"].Update(values['-COM_ST-']+ "\n\n" + window_txt.replace("\n\n\n","\n\n"))
    # 終了処理
    window.close()


def main_udp_send(values):
    rtn = ""
    if None != values["-PAYLOAD-"]:
        rtn =  "\nsend data:" + values["-PAYLOAD-"].upper()+ "\n\n"
        udp_send( values['-SRC_IP_ADDR-'] , values['-DEST_IP_ADDR-'] , int(values['-SRC_PORT_NUM-']) , int(values['-DEST_PORT_NUM-']) , bytes.fromhex(values["-PAYLOAD-"]) )
    return rtn


def udp_send( src_ip , dst_ip , src_port , dst_port , data ):
    # 送信側アドレスをtupleに格納
    SrcAddr = ( src_ip , src_port )
    # 受信側アドレスをtupleに格納
    DstAddr = ( dst_ip , dst_port )
    # ソケット作成
    udpClntSock = socket(AF_INET, SOCK_DGRAM)
    # 送信側アドレスでソケットを設定
    udpClntSock.bind(SrcAddr)
    # 受信側アドレスにペイロード長単位で送信
    chunk_size = 1468
    # ペイロード長単位で送信
    for i in range(0, len(data), chunk_size ):
        chunk = data[i:i+chunk_size]
        udpClntSock.sendto(chunk,DstAddr)
    return


def tcp_connect( src_ip , dst_ip , src_port , dst_port ):
    global tcpClntSock
    # ソケット作成
    tcpClntSock = socket(AF_INET, SOCK_STREAM)
    tcpClntSock.settimeout(0.5)
    # 送信側アドレスをtupleに格納
    SrcAddr = ( src_ip , src_port )
    # 送信側アドレスでソケットを設定
    tcpClntSock.bind(SrcAddr)
    # サーバーに接続
    tcpClntSock.connect((dst_ip, dst_port))


def tcp_close():
    global tcpClntSock
    # ソケットクローズ
    tcpClntSock.close()


def main_tcp_send(values):
    rtn = ""
    if None != values["-PAYLOAD-"]:
        rtn = "send data:" + values["-PAYLOAD-"].upper()+ "\n\n"
        tcp_send( bytes.fromhex(values["-PAYLOAD-"]) )
    return rtn


def tcp_send( data ):
    global tcpClntSock
    chunk_size = 1456
    # ペイロード長単位で送信
    for i in range(0, len(data), chunk_size ):
        chunk = data[i:i+chunk_size]
        tcpClntSock.send(chunk)
    return


if __name__ == '__main__':
    main()

サーバー

クライアントツールの確認用サーバ立ち上げツール実装。

# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
import threading
import sys
import socket
from socket import socket, AF_INET, SOCK_DGRAM ,SOCK_STREAM
import time

# バッファサイズ指定
BUFSIZE = 14600
# テーマの設定
sg.theme("SystemDefault ")

# 事前設定
L1 = [[sg.Text("・IP address "            ,size=(20,1)),
    sg.InputText(default_text="127.0.0.2" ,     text_color = "#000000",background_color ="#ffffff" ,    size=(40,1),    key="-IP_ADDR" )],
    [sg.Text("・port num "            ,size=(20,1)),
    sg.InputText(default_text="49152" ,            text_color = "#000000",background_color ="#ffffff" ,    size=(40,1),    key="-PORT_NUM-" )]
]
# UDP
L2 = [[sg.Button("OPEN", border_width=4 ,    size =(25,1),    key="-BTN_UDP_OPEN-")]]

# TCP
L3 = [[sg.Button("OPEN", border_width=4 ,     size =(25,1),    key="-BTN_TCP_OPEN-")]]

# 通信ステータス
L4 = [[sg.Multiline(default_text="", border_width=1,    size=(62,10),    key="-COM_ST-")]]

L = [[sg.Frame("Socket config",L1)],
    [sg.Frame("UDP",L2),sg.Frame("TCP",L3)],
    [sg.Frame("Connection status",L4)]]

# ウィンドウ作成
window = sg.Window ("TCP/UDP sever tool", L)
values = ""

def main():
    global values
    threads = []
    # イベントループ
    while True:
        # イベントの読み取り(イベント待ち)
        event , values = window.read()
        # 確認表示
        # print(" イベント:",event ,", 値:",values)
        # 終了条件( None: クローズボタン)
        if event == "-BTN_UDP_OPEN-":
            t1 = threading.Thread(target=udp_recv, args=(values['-IP_ADDR'] , int(values['-PORT_NUM-']),))
            threads.append(t1)
            t1.setDaemon(True)
            t1.start()
            window["-COM_ST-"].Update("----------UDP open----------")

        # TCP接続
        elif event == "-BTN_TCP_OPEN-":
            t2 = threading.Thread(target=tcp_recv, args=(values['-IP_ADDR'] , int(values['-PORT_NUM-']),))
            threads.append(t2)
            t2.setDaemon(True)
            t2.start()
            window["-COM_ST-"].Update("----------TCP open----------")
        elif event == None:
            sys.exit()


def main_window_update_open():
    global values
    window_txt = "----------TCP connect----------\n"
    window["-COM_ST-"].Update(window_txt)
    print(window_txt)
    return


def main_window_update(protocol,data):
    global values
    window_txt = ""
    window_txt += "----------{} data recieve----------\n".format(protocol) \
                + "Recv data:" + data + "\n"
    window["-COM_ST-"].Update(window_txt)
    print(window_txt)
    return


def main_window_update_close():
    global values
    window_txt = "----------TCP disconnect----------\n"
    window["-COM_ST-"].Update(window_txt)
    print(window_txt)
    return


def udp_recv( ip_addr , port ):
    protocol = "UDP"
    # 受信側アドレスをtupleに格納
    SrcAddr = ( ip_addr , port)
    # ソケット作成
    udpServSock = socket(AF_INET, SOCK_DGRAM)
    # 受信側アドレスでソケットを設定
    udpServSock.bind(SrcAddr)
    time.sleep(1)
    # While文を使用して常に受信待ちのループを実行
    while True:
        time.sleep(1)
        # ソケットにデータを受信した場合の処理
        # 受信データを変数に設定
        data, addr = udpServSock.recvfrom(BUFSIZE)
        # 受信データを確認
        main_window_update(protocol,data.hex())


def recv_client(connection, client):
    protocol = "TCP"
    while True:
        try:
            data = connection.recv(BUFSIZE)
            if 0 != len(data):
                main_window_update(protocol,data.hex())
            else:
                break
        except ConnectionResetError:
            break
    connection.close()
    main_window_update_close()


def tcp_recv( ip_addr , port ):
    tcp_server = socket(AF_INET, SOCK_STREAM)
    tcp_server.bind(( ip_addr , port))
    tcp_server.listen()
    time.sleep(1)
    while True:
        (connection, client) = tcp_server.accept()
        main_window_update_open()
        thread = threading.Thread(target=recv_client, args=(connection, client))
        # スレッド処理開始
        thread.start()


if __name__ == '__main__':
    main()

実行結果

クライアント

各種パラメータ指定後、ボタン押下でフレームを送信します。

サーバ

クライアントツールからフレームを投げる前にサーバを立ち上げて任意のパラメータで起動する必要があります。

wiresharkキャプチャ

ローカルの通信結果です。