pythonでDoIP ~④DoIP通信用GUI作成~

DoIP,GUI,python

今回はDoIPツールに合わせたGUIを、pythonのpysimpleguiで作成してみました。

対象はGUIなので、まだDoIPプロトコルの実装まではいきません。

ツール構成

DoIPクライアント、DoIPサーバそれぞれに割り当てるコマンドは以下の通り。

コマンド名役割プロトコル
Generic DoIP header negative acknowledge非対応UDP/TCP
Vehicle identification request messageクライアントUDP
Vehicle identification request message with EIDクライアントUDP
Vehicle identification request message with VINクライアントUDP
Vehicle announcement message非対応UDP
vehicle identification response messageサーバUDP
DoIP entity status requestクライアントUDP
DoIP entity status responseサーバUDP
Diagnostic power mode information requestクライアントUDP
Diagnostic power mode information responseサーバUDP
Routing activation requestクライアントTCP
Routing activation responseサーバTCP
Alive check requestクライアントTCP
Alive check responseサーバTCP
Diagnostic messageクライアント/サーバTCP
Diagnostic message positive acknowledgementクライアント/サーバTCP
Diagnostic message negative acknowledgementクライアント/サーバTCP

今回のツールにおいては異常系の対応は非対応とします。(余裕があればやるかも)

ユーザはクライアントツールで期待するコマンドを選択し送信。

サーバに関してはあらかじめ起動しておき、設定してあるパラメータや通信状態に応じてクライアントからの要求に応答する。

各コマンドの詳細については以下を確認してください

ソース

DoIPクライアント

# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
import udp
import tcp

# テーマの設定
sg.theme("SystemDefault ")

# 事前設定
L1 = [
	# 診断機設定
	[sg.Text("・IPアドレス "),
	sg.InputText(default_text="127.0.0.1" , text_color = "#000000",background_color ="#ffffff",		size=(15,1),	key="-IP_EQP-" ),
	sg.Text("     ")],
	[sg.Text("・ポート番号 "),
	sg.InputText(default_text="13401" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(8,1),		key="-PORT_EQP-" )]]

	# GW設定
L2 = [
	[sg.Text("・IPアドレス "),
	sg.InputText(default_text="127.0.0.2" , text_color = "#000000",background_color ="#ffffff" ,	size=(15,1),	key="-IP_GW-" ),
	sg.Text("     ")],
	[sg.Text("・ポート番号 "),
	sg.InputText(default_text="13400" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(8,1),		key="-PORT_GW-" )]]
L3 = [
	[sg.Multiline(default_text="", border_width=1,	size=(145,10),	key="-COM_ST-")]]

# UDP
L4 = [[sg.Button("送信", border_width=4 ,												size =(15,1),	key="-BTN_SEND_UDP-")],			# ボタン
	## Vehicle identification request message
	[sg.Radio(text='Vehicle identification request message',							size=(60,1) ,	key="-CMD_UDP_VIR-"	,	group_id='CMD_UDP') ],
	## Vehicle identification request message with EID
	[sg.Radio(text='Vehicle identification request message with EID',					size=(35,1) ,	key="-CMD_UDP_VIR_EID-" ,	group_id='CMD_UDP') ],
	[sg.Text("・EID", size=(10,1)) , sg.InputText(default_text="112233445566" ,			size=(23,1) ,	key="-TXT_ID-")],
	## Vehicle identification request message with VIN
	[sg.Radio(text='Vehicle identification request message with VIN',					size=(35,1) ,	key="-CMD_UDP_VIR_VIN-" ,	group_id='CMD_UDP')],
	[sg.Text("・VIN (ASCII)", size=(10,1)) , sg.InputText(default_text="0123456789ABCDEFF0" ,		size=(23,1) ,	key="-TXT_VIN-")],
	## DoIP entity status request
	[sg.Radio(text='DoIP entity status request',										size=(35,1) ,	key="-CMD_UDP_DISR-" ,	group_id='CMD_UDP')],
	## Diagnostic power mode information request
	[sg.Radio(text='Diagnostic power mode information request',							size=(35,1) ,	key="-CMD_UDP_DPMIR-" ,	group_id='CMD_UDP')],
	## UDP FREE Message
	[sg.Radio(text='FREE MESSAGE',														size=(15,1) ,	key="-CMD_UDP_FREE-"	,	group_id='CMD_UDP')],
	[sg.Multiline(default_text="" , border_width=2,										size=(63,8),	key="-TXT_UDP_FREE-")]]

# TCP
L5 = [[sg.Button("接続", border_width=4 , 												size =(15,1),	key="btn_connect_tcp"),				# ボタン
	sg.Button("切断", border_width=4 , 													size =(15,1),	key="btn_close_tcp"),				# ボタン
	sg.Button("送信", border_width=4 , 													size =(15,1),	key="btn_send_tcp")],				# ボタン
	## Routing activation request
	[sg.Radio(text='Routing activation request',										size=(60,1) ,	key="-CMD_TCP_RAR-"	,	group_id='CMD_TCP') ],
	[sg.Text("・SA", size=(10,1)) , sg.InputText(default_text="0E00" ,					size=(7,1) ,	key="-TXT_SA1-")],
	## Alive check request
	[sg.Radio(text='Alive check request',												size=(35,1) ,	key="-CMD_TCP_ACR-"	,	group_id='CMD_TCP') ],
	## Diagnostic message
	[sg.Radio(text='Diagnostic message',												size=(35,1) ,	key="-CMD_TCP_DM-"	,	group_id='CMD_TCP')],
	[sg.Text("・SA", size=(10,1)) , sg.InputText(default_text="0E00" ,					size=(7,1) ,	key="-TXT_SA2-"),
	sg.Text("・TA", size=(7,1)) , sg.InputText(default_text="0F00" ,					size=(7,1) ,	key="-TXT_TA-")],
	[sg.Text("・User data", size=(10,1)),sg.Multiline(default_text="", border_width=2,	size=(50,3),	key="-TXT_DIAG-")],
	## FREE Message
	[sg.Radio(text='FREE MESSAGE',														size=(15,1) ,	key="-CMD_TCP_FREE-"	,	group_id='CMD_TCP')],
	[sg.Multiline(default_text="" , border_width=2,										size=(63,8),	key="-TXT_TCP_FREE-")]]


L = [[sg.Frame("DoIPクライアント(診断機)",L1),sg.Frame("DoIPサーバ(ゲートウェイ)",L2)],
	[sg.Frame("UDP",L4),sg.Frame("TCP",L5)],
	[sg.Frame("通信ステータス",L3)]]

# ウィンドウ作成
window = sg.Window ("DoIP cliant tool", L)


def main():
	rtn = ""
	# イベントループ
	while True:
		# イベントの読み取り(イベント待ち)
		event , values = window.read()
		# 確認表示
		print(" イベント:",event ,", 値:",values)
		# print(bytearray.fromhex(values['-TXT_ID-']))
		# 終了条件( None: クローズボタン)
		if event == "-BTN_SEND_UDP-":
			print("UDP送信")
			rtn = main_udp_send_cmd(values)
		# TCP接続
		elif event == "btn_connect_tcp":
			rtn = tcp.tcp_connect( values['-IP_EQP-'] , values['-IP_GW-'] , int(values['-PORT_EQP-']) , int(values['-PORT_GW-']) )
		# TCP切断
		elif event == "btn_close_tcp":
			rtn = tcp.tcp_close()
		elif event == "btn_send_tcp":
			rtn = main_tcp_send_cmd(values)
		elif event == None:
			print(" 終了します. ")
			break
		window["-COM_ST-"].Update(rtn)
	# 終了処理
	window.close()


def main_udp_send_cmd(values):
	if values["-CMD_UDP_VIR-"] == True:
		data = "-CMD_UDP_VIR-"
	elif values["-CMD_UDP_VIR_EID-"] == True:
		data = "-CMD_UDP_VIR_EID-"
	elif values["-CMD_UDP_VIR_VIN-"] == True:
		data = "-CMD_UDP_VIR_VIN-"
	elif values["-CMD_UDP_DISR-"] == True:
		data = "-CMD_UDP_DISR-"
	elif values["-CMD_UDP_DPMIR-"] == True:
		data = "-CMD_UDP_DPMIR-"
	elif values["-CMD_UDP_FREE-"] == True:
		data = "-CMD_UDP_FREE-"
	else:
		data = "UDP送信エラー"
		return
	rtn = udp.udp_send( values['-IP_EQP-'] , values['-IP_GW-'] , int(values['-PORT_EQP-']) , int(values['-PORT_GW-']) , data )
	return data


def main_tcp_send_cmd(values):
	if values["-CMD_TCP_RAR-"] == True:
		data = "-CMD_TCP_RAR-"
	elif values["-CMD_TCP_ACR-"] == True:
		data = "-CMD_TCP_ACR-"
	elif values["-CMD_TCP_DM-"] == True:
		data = "-CMD_TCP_DM-"
	elif values["-CMD_TCP_FREE-"] == True:
		data = "-CMD_TCP_FREE-"
	else:
		data = "TCP送信エラー"
		return
	rtn = tcp.tcp_send( data )
	return data


main()

DoIPサーバ

# coding: utf -8
import PySimpleGUI as sg # ライブラリの読み込み
import udp
import tcp
import threading
import sys

# テーマの設定
sg.theme("SystemDefault ")

# 事前設定
L1 = [[sg.Text("・IPアドレス ",size=(18,1)),
	sg.InputText(default_text="127.0.0.2" , text_color = "#000000",background_color ="#ffffff" ,	size=(23,1),	key="-IP_GW" )],
	[sg.Text("・ポート番号 ",size=(18,1)),
	sg.InputText(default_text="13400" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(23,1),		key="-PORT_GW-" )],
	[sg.Text("・EID ",size=(18,1)),
	sg.InputText(default_text="112233445566" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(23,1),		key="-EID_GW-" )],
	[sg.Text("・VIN (ASCII) ",size=(18,1)),
	sg.InputText(default_text="0123456789ABCDEFF0" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(23,1),		key="-VIN_GW-" )],
	[sg.Text("・ロジカルアドレス ",size=(18,1)),
	sg.InputText(default_text="0001" ,	text_color = "#000000",background_color ="#ffffff" ,		size=(23,1),		key="-LOGICAL_ADDRESS-" )]]
# UDP
L2 = [[sg.Button("OPEN", border_width=4 ,												size =(16,1),	key="-BTN_UDP_OPEN-")]]			# ボタン

# TCP
L3 = [[sg.Button("OPEN", border_width=4 , 												size =(16,1),	key="-BTN_TCP_OPEN-")]]				# ボタン

# 通信ステータス
L4 = [[sg.Multiline(default_text="", border_width=1,	size=(42,10),	key="-COM_ST-")]]

L = [[sg.Frame("DoIPサーバ設定",L1)],
	[sg.Frame("UDP",L2),sg.Frame("TCP",L3)],
	[sg.Frame("通信ステータス",L4)]]

# ウィンドウ作成
window = sg.Window ("DoIP sever tool", L)

if __name__ == '__main__':
	threads = []
	# イベントループ
	while True:
		# イベントの読み取り(イベント待ち)
		event , values = window.read()
		# 確認表示
		print(" イベント:",event ,", 値:",values)
		# 終了条件( None: クローズボタン)
		if event == "-BTN_UDP_OPEN-":
			t1 = threading.Thread(target=udp.udp_recv, args=(values['-IP_GW'] , int(values['-PORT_GW-']),))
			threads.append(t1)
			t1.setDaemon(True)
			t1.start()
			print("UDP OPEN")
		# TCP接続
		elif event == "-BTN_TCP_OPEN-":
			t2 = threading.Thread(target=tcp.tcp_recv, args=(values['-IP_GW'] , int(values['-PORT_GW-']),))
			threads.append(t2)
			t2.setDaemon(True)
			t2.start()
			print("TCP OPEN")
		elif event == None:
			sys.exit()


その他ソケット回り

今回はプロトタイプとしてクライアントーサーバ間で上記GUIを使用した簡単なTCP・UDP通信が実施できるようにしてみました。

通信内容は簡単なデータのやり取りで、DoIPプロトコルの実装は次回以降対応予定です。

ソケット回りのコードについては以下で確認お願いします。

起動

GUI画面はセンスが出るところだと思います。

自身はありませんが個人的にこういったレイアウトを試行錯誤するのは好きです。

DoIPクライアント

通信ステータス部分にはDoIPの通信内容を表示させる予定です。

DoIPサーバ

一度起動したら特に操作する必要はないため、クライアントに比べてシンプルになってます。

事前に設定したパラメータに応じて、応答内容が変わります。

最後に

いよいよツールの形も見えてきたので、次回でいよいよDoIPプロトコルを実装して完了したいと思います。

DoIP,GUI,pythonDoIP,GUI,python