pythonでイーサパケットを解析 ~イーサネットヘッダとIPヘッダのプロトコル解析~

python,TCP/IP,イーサネット,通信

今回はイーサフレームのダンプデータのうち、イーサネットヘッダどIPヘッダをpythonで解析するツールを作成してみました。

本来wiresharkで通信をキャプチャすれば事足りることなのですが、現在実装している組み込み機器において、機器内部で扱っている通信データを直接ダンプして確認したいことがあったので作ってみました。

また、本記事で解説しているsrcは以下に挙げています。

検討

前述の通り、イーサネットヘッダとIPヘッダを解析できるようなツールを作成します。

両ヘッダを含めたフレームの全体図が以下になります。

イーサネットヘッダ

イーサネットヘッダはOSI基本参照モデルにおけるデータリンク層(L2)としての役割を持っています。

ヘッダ内のMACアドレスから通信相手を特定するために使用します。

MACアドレスは機器ごとに固有の識別子であり、郵便配達における個人名によく例えられます。

イーサネットヘッダはVLANタグありの場合となしの場合でフレームフォーマットが異なり、今回は両方に対応できる解析処理を実装します。

VLANの詳細については以下などを参照。

各フレームフォーマットの差分は以下の通り。

イーサネットヘッダフォーマット
パラメータ名称サイズ説明
タイプ
Ethernetタイプ番号2byte上位層のプロトコルを識別するための番号。
今回はIPv4の0x0800のみ対応。
TPID Tag Protocol Identifier2byteタグ付きフレームであることを示す識別子。
値は0x8100固定。
TCI Tag Control Information2byte各フィールド(PCP、CFI、VID)ごとにVLANタグに関する詳細情報が格納されている。
PCP Priority Code Point3bitフレームの優先度を0(最低)から7(最高)で示し、各種トラフィック(音声、動画、データなど)の優先順位付けに利用できる。
CFICanonical Format Indicator1bitイーサネットとトークンリングの相互接続時に使われる1bitの識別子。
イーサネットの場合0となる。
(現在では実用性が低くなったため、以下のDEI に置き換った)
DEIDrop Eligibility Indicator1bit1 の場合、輻輳が発生したときに優先的に廃棄することを促す。
VIDVLAN ID12bitそのフレームが属するVLANを指定する。
VLAN ID= 0 と 4095 は予約されているため、有効範囲は1~4094となる。
0の場合、どのVLANにも属していないことを意味し、単なる優先度タグ (priority tag) として使われる。
各パラメータ概要

IPヘッダ

IPヘッダはOSI基本参照モデルにおけるネットワーク層(L3)としての役割を持っています。

コンピュータ間でデータをやり取りする際の約束事の確認や、通信相手が属するネットワークを特定するのに使用します。

IPアドレスは通信相手が存在するネットワークの識別子であり、郵便配達における住所によく例えられます。

IPヘッダの構造と各種パラメータについては以下の通り。

IPヘッダ構造
TOS構造
パラメータ名称サイズ概要
VersionVersion4bitIPのバージョンであり、IPv4の場合は4が格納される。
その他設定値については以下の通り。
  4:Internet Protocol (IP)
  6:Internet Protocol version 6 (IPv6)
  7:TP/IX : The Next Internet (TP/IX)
  8:The P Internet Protocol (PIP)
  9:TUBA (TUBA)
IHLInternet Header Length4bitIPヘッダーの長さ。
TOSType Of Service8bitIPパケットの優先度などパケットの品質を決める情報。
DSCP、ECT、CEが格納されている。
DSCPDifferentiated Services Code Points6bit「0~63の64段階」の 優先度を定義する。
ECTECN Capable Transport1bit上位層がECNの仕組みに対応しているかどうかを示す。
ECNとはExplicit Congestion Notificationの略であり、中継ルータが明示的に congestion(ネットワークの輻輳)の発生を知らせる機能。
CECongestion Experience1bitcongestionが起きていることを示すビット。
Total LengthTotal Length16bitIPヘッダを含むパケットの全長。
IdentificationIdentification16bitIPパケットの識別子。
複数のIPパケットに分割してメッセージを送受信する際、分割されたパケットを結合するための識別子として使用する。
FlagFlag3bitIPパケットの分割を制御する。
1bit目:未使用
2bit目:0:分割可、1:分割不可
3bit目:0:最終パケット、1:中間(または初回)パケット
Flagment OffsetFlagment Offset13bit分割されたパケットが、元のデータのどこに位置しているかを示す。
byte単位で最大値65536。
TTLTime to Live8bitパケットが通過可能なルータの数。
ルータを経由するたび1づつ減っていき、0になった時点でパケットは破棄されます。
ProtocolProtocol8bitIPの上位プロトコルを表す。
各番号の対応については以下参照。
プロトコル番号
Header CheckcumHeader Checkcum16bitIPヘッダ部分(VersionからDataの前まで)だけのチェックサム。
計算方法については下記参照。
チェックサム計算方法
Source AddressSource Address32bit送信元のIPアドレス。
Destination AddressDestination Address32bit宛先のIPアドレス
OptionsOptions可変通常は使用されない。
デバッグやテストを行う際に使用される。
PaddingPadding可変通常は使用されない。
オプション使用時に、IPヘッダ長が4バイト(32ビット)の整数倍にならなかった場合、オプションのサイズを4バイトの整数倍にするために使用する。値は0を設定する。
各パラメータ概要

実装

GUIと解析処理にソースを分けて実装しました。

GUI

GUIの表示と更新の役割を担います。

難しい処理はしていないのですが、ちょうどよいレイアウトに調整するのは骨が折れました。

# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
import re
import analyze

# テーマの設定
sg.theme("SystemDefault ")

# ダンプデータ
L_dump_input = [
    [sg.Multiline(default_text="",
    border_width=1,
    size=(75,60),
    autoscroll=True,
    key="-DUMP_INPUT_DATA-")]
]
# 整形データ
L_dump_output = [
    [sg.Multiline(default_text="",
    border_width=1,
    size=(75,30),
    autoscroll=True,
    key="-DUMP_OUTPUT_DATA-")]
]
# 解析開始ボタン
L_start_btn = sg.Button("開始",
                            border_width=4 ,
                            size =(58,1),
                            key="-BTN_START-")
# 宛先MACアドレス
L_ether_mac_addr_des = [
    sg.Text("・Destination MAC address", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(20,1) ,
    key="-ETHER_MAC_DES_ADDR-")
]
# 送信元MACアドレス
L_ether_mac_addr_src =[
    sg.Text("・source MAC address", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(20,1) ,
    key="-ETHER_MAC_SRC_ADDR-")
]
# VLAN
L_ether_tpid = [
    sg.Text("・TPID", size=(19,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-ETHER_VLAN_TPID-")
]
# TCI
L_ether_tci =[
    [
        sg.Text("・PCP", size=(18,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-ETHER_VLAN_PCP-")
    ],
    [
        sg.Text("・CFI", size=(18,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-ETHER_VLAN_CFI-")
    ],
    [
        sg.Text("・VID", size=(18,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-ETHER_VLAN_VID-")
    ],
]
# タイプ
L_ether_type =[
    sg.Text("・Type", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-ETHER_TYPE-")
]
# IPヘッダ
# Version
L_ip_version = [
    sg.Text("・Version", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_VERSION-")
]
# IHL
L_ip_ihl = [
    sg.Text("・IHL", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_IHL-")
]
# TOS
L_ip_tos =[
    [
        sg.Text("・DSCP", size=(19,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-IP_TOS_DSCP-")
    ],
    [
        sg.Text("・ECT", size=(19,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-IP_TOS_ECT-")
    ],
    [
        sg.Text("・CE", size=(19,1)) ,
        sg.InputText(default_text="" ,
        size=(10,1) ,
        key="-IP_TOS_CE-")
    ],
]
# Total Length
L_ip_length = [
    sg.Text("・Total Length", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_LENGTH-")
]
# Identification
L_ip_id = [
    sg.Text("・Identification", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_ID-")
]
# Flag
L_ip_flag = [
    sg.Text("・Flag", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_FLAG-")
]
# Flagment Offset
L_ip_offset = [
    sg.Text("・Offset", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_OFFSET-")
]
# TTL
L_ip_ttl = [
    sg.Text("・TTL", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_TTL-")
]
# Protocol
L_ip_protcol = [
    sg.Text("・Protocol", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_PROT-")
]
# Header Checkcum
L_ip_chksum = [
    sg.Text("・Header Checkcum", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(10,1) ,
    key="-IP_CHKSUM-")
]
# Source Address
L_ip_src_addr = [
    sg.Text("・Source IP Address", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(12,1) ,
    key="-IP_SRC_ADDR-")
]
# Destination Address
L_ip_dst_addr = [
    sg.Text("・Destination IP Address", size=(20,1)) ,
    sg.InputText(default_text="" ,
    size=(12,1) ,
    key="-IP_DST_ADDR-")
]
#全体レイアウト
L = [
[
    sg.Frame("解析データ",L_dump_input, size=(480, 800)),
    sg.Frame("解析結果",
    [
        [
            L_start_btn
        ],
        [
            # イーサヘッダ解析結果
            sg.Frame("Ether header",
            [
                L_ether_mac_addr_des,
                L_ether_mac_addr_src,
                [
                    sg.Frame("VLAN",
                    [
                        L_ether_tpid,
                        [
                            sg.Frame("TCI",L_ether_tci, size=(440, 110))
                        ]
                    ], size=(460, 160)
                    )
                ],
                L_ether_type
            ], size=(480, 270)
            )
        ],
        [
            # IPヘッダ解析結果
            sg.Frame("IP header",
            [
                L_ip_version,
                L_ip_ihl,
                [
                    sg.Frame("TOS", L_ip_tos, size=(460, 110))
                ],
                L_ip_length,
                L_ip_id,
                L_ip_flag,
                L_ip_offset,
                L_ip_ttl,
                L_ip_protcol,
                L_ip_chksum,
                L_ip_src_addr,
                L_ip_dst_addr
            ], size=(480, 450)
            )
        ]
    ], size=(400, 800)
    )
],
]

# ウィンドウ作成
window = sg.Window ("Pcket analyze tool", L, resizable=True)
values = ""

def main():
    global values
    # イベントループ
    while True:
        # イベントの読み取り(イベント待ち)
        event , values = window.read()
        # 解析開始
        if event == "-BTN_START-":
            input_txt = re.sub('[^0123456789abcdefABCDEF]', '', values["-DUMP_INPUT_DATA-"])
            update_list = analyze.header_chk(input_txt)
            for update_inf in update_list:
                window[update_inf[0]].Update(update_inf[1])
        # 終了条件( None: クローズボタン)
        elif event == None:
            print(" 終了します. ")
            break

    # 終了処理
    window.close()

if __name__ == '__main__':
    main()

解析処理

各パラメータの位置情報をテーブル化し、極力パラメータごとの解析処理を共通の処理で扱えるようにしました。

おかげで解析処理の方がGUIよりもずっと短い行数で実装できています。

import pandas as pd

# イーサヘッダテーブル
ether_header_tbl = \
[   #"KEY"                  ,"OFFSET_BYTE","OFFSET_BIT" ,"SIZE_BYTE" ,"SIZE_BIT","SIZE","TYPE"
    ["-ETHER_MAC_DES_ADDR-" ,0            ,0            ,6           ,0         ,12        ,"x"  ],
    ["-ETHER_MAC_SRC_ADDR-" ,6            ,0            ,6           ,0         ,12        ,"x"  ],
    ["-ETHER_VLAN_TPID-"    ,12           ,0            ,2           ,0         ,4         ,"x"  ],
    ["-ETHER_VLAN_PCP-"     ,14           ,0            ,0           ,3         ,3         ,"b"  ],
    ["-ETHER_VLAN_CFI-"     ,14           ,3            ,0           ,1         ,1         ,"b"  ],
    ["-ETHER_VLAN_VID-"     ,14           ,1            ,1           ,4         ,3         ,"d"  ],
    ["-ETHER_TYPE-"         ,16           ,0            ,2           ,0         ,4         ,"x"  ],
]

ether_header_df = pd.DataFrame((ether_header_tbl), columns=["KEY","OFFSET_BYTE","OFFSET_BIT" ,"SIZE_BYTE" ,"SIZE_BIT","SIZE","TYPE"])

# IPヘッダテーブル
ip_header_tbl = \
[   #"KEY"                  ,"OFFSET_BYTE","OFFSET_BIT" ,"SIZE_BYTE" ,"SIZE_BIT","SIZE","TYPE"
    ["-IP_VERSION-"         ,0            ,0            ,0           ,4         ,1         ,"d"  ],
    ["-IP_IHL-"             ,0            ,4            ,0           ,4         ,1         ,"d"  ],
    ["-IP_TOS_DSCP-"        ,1            ,0            ,0           ,6         ,2         ,"d"  ],
    ["-IP_TOS_ECT-"         ,1            ,6            ,0           ,1         ,1         ,"b"  ],
    ["-IP_TOS_CE-"          ,1            ,7            ,0           ,1         ,1         ,"b"  ],
    ["-IP_LENGTH-"          ,2            ,0            ,2           ,0         ,4         ,"d"  ],
    ["-IP_ID-"              ,4            ,0            ,2           ,0         ,4         ,"x"  ],
    ["-IP_FLAG-"            ,6            ,0            ,0           ,3         ,3         ,"b"  ],
    ["-IP_OFFSET-"          ,6            ,3            ,1           ,5         ,4         ,"d"  ],
    ["-IP_TTL-"             ,8            ,0            ,1           ,0         ,3         ,"d"  ],
    ["-IP_PROT-"            ,9            ,0            ,1           ,0         ,3         ,"d"  ],
    ["-IP_CHKSUM-"          ,10           ,0            ,2           ,0         ,4         ,"x"  ],
    ["-IP_SRC_ADDR-"        ,12           ,0            ,4           ,0         ,8         ,"x"  ],
    ["-IP_DST_ADDR-"        ,16           ,0            ,4           ,0         ,8         ,"x"  ],
]
ip_header_df = pd.DataFrame((ip_header_tbl), columns=["KEY","OFFSET_BYTE","OFFSET_BIT" ,"SIZE_BYTE" ,"SIZE_BIT","SIZE","TYPE"])

# ヘッダ解析
def header_chk(msg):
    rtn_list = []
    # イーサヘッダ解析
    packet , rtn_list = ether_header_chk(msg)
    # IPヘッダ解析
    rtn_list = ip_header_chk(packet,rtn_list)
    return rtn_list

# イーサヘッダ解析
def ether_header_chk(msg):
    rtn_list = []
    ip_msg = ""
    int_data = int("0x"+msg,16)
    total_bit_size = int(len(msg)/2*8)
    # イーサヘッダテーブル検索
    for index, row in ether_header_df.iterrows():
        # オフセットだけシフト
        offset_bit = row["OFFSET_BYTE"]*8 + row["OFFSET_BIT"] + row["SIZE_BYTE"]*8 + row["SIZE_BIT"]
        shift_data = int_data >> (total_bit_size - offset_bit)
        # サイズ分マスク
        mask_bit = 2**(row["SIZE_BYTE"]*8 + row["SIZE_BIT"]) - 1
        mask_data = shift_data & mask_bit
        # 任意の型へ変換
        if "d" == row["TYPE"]:
            data = int(mask_data)
        else:
            data = format(mask_data, "#0"+ str(row["SIZE"]+2) + row["TYPE"])
        if "-ETHER_VLAN_TPID-" == row["KEY"] and "0x0800" == data :
            update_inf = ["-ETHER_TYPE-",data]
            rtn_list.append(update_inf)
            print(update_inf)
            ip_msg = msg[14*2:]
            break
        elif "-ETHER_TYPE-" == row["KEY"] and "0x0800" == data :
            update_inf = ["-ETHER_TYPE-",data]
            rtn_list.append(update_inf)
            print(update_inf)
            ip_msg = msg[18*2:]
            break
        else:
            update_inf = [row["KEY"],data]
            rtn_list.append(update_inf)
            print(update_inf)
    return ip_msg,rtn_list

# ipヘッダ解析
def ip_header_chk(packet,get_list):
    rtn_list = get_list
    int_data = int("0x"+packet,16)
    total_bit_size = int(len(packet)/2*8)
    for index, row in ip_header_df.iterrows():
        # オフセットだけシフト
        offset_bit = row["OFFSET_BYTE"]*8 + row["OFFSET_BIT"] + row["SIZE_BYTE"]*8 + row["SIZE_BIT"]
        shift_data = int_data >> (total_bit_size - offset_bit)
        # サイズ分マスク
        mask_bit = 2**(row["SIZE_BYTE"]*8 + row["SIZE_BIT"]) - 1
        mask_data = shift_data & mask_bit
        # 任意の型へ変換
        if "d" == row["TYPE"]:
            data = int(mask_data)
        else:
            data = format(mask_data, "#0"+ str(row["SIZE"]+2) + row["TYPE"])
        update_inf = [row["KEY"],data]
        rtn_list.append(update_inf)
        print(update_inf)
    return rtn_list

動作確認

試験データ

以前pythonでDHCPサーバを作った時のwiresharkログがあったので、そこから取得したダンプデータを少し弄って動作確認に使用しました。

過去記事については以下参照。

VLANタグなしテストデータ

ff ff ff ff ff ff 11 22 33 44 55 66 08 00 45 00
01 4a 31 31 00 00 80 11 00 00 00 00 00 00 ff ff
ff ff 00 44 00 43 01 36 e7 04 01 01 06 00 7e a2
14 fc 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 11 22 33 44 55 66 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 63 82 53 63 35 01 01 3d 07 01
11 22 33 44 55 66 32 04 c0 a8 00 f9 0c 0f 44 45
53 4b 54 4f 50 2d 42 55 51 41 4b 42 51 3c 08 4d
53 46 54 20 35 2e 30 37 0e 01 03 06 0f 1f 21 2b
2c 2e 2f 77 79 f9 fc ff

VLANタグありテストデータ

ff ff ff ff ff ff 11 22 33 44 55 66 81 00 FF FF 08 00 45 00
01 4a 31 31 00 00 80 11 00 00 00 00 00 00 ff ff
ff ff 00 44 00 43 01 36 e7 04 01 01 06 00 7e a2
14 fc 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 11 22 33 44 55 66 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00 00 00 00 00 00 63 82 53 63 35 01 01 3d 07 01
11 22 33 44 55 66 32 04 c0 a8 00 f9 0c 0f 44 45
53 4b 54 4f 50 2d 42 55 51 41 4b 42 51 3c 08 4d
53 46 54 20 35 2e 30 37 0e 01 03 06 0f 1f 21 2b
2c 2e 2f 77 79 f9 fc ff

実行

実行時の画面表示は次のようになっています。

解析データ領域にダンプデータをペーストしたのち、右側の開始ボタンを押せば各種パラメータの値が確認できます。

VLANタグなし出力結果

VLANタグあり出力結果

最後に

各種ヘッダのいい勉強になりました。

やはり知識の定着には、実際に手を動かしてみることって重要ですよね。

近いうちに、UDP/TCPの解析なんかにも対応してみます。

追記(2023/01/28)

pythonで頑張っていましたが、もっと簡単にwiresharkで解析できる方法が分かりました。

良ければこちらを参考にしてください。