forked from rbrune/fuelband-usb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pcap_dissect.py
executable file
·301 lines (257 loc) · 9.74 KB
/
pcap_dissect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/env python3
from collections import deque
from enum import Enum
import argparse
import nike
import nike.utils as utils
import time
MAX_BYTES_PER_LINE = 16
# bytes offsets into txt file line
DATA_START_IDX = 6
DATA_WIDTH = MAX_BYTES_PER_LINE * 3 - 1
DATA_END_IDX = DATA_START_IDX + DATA_WIDTH
ASCII_START_IDX = DATA_START_IDX + DATA_WIDTH + 3
class RequestType(Enum):
SUBMIT = 0x00
COMPLETE = 0x01
class ReportType(Enum):
SET_REPORT = 0x00
GET_REPORT = 0x80
class Packet(object):
def __init__(self, id, data):
self.id = id
self.data = bytearray()
self.data[:] = data # store copy of data
self.request_type = RequestType(data[3])
self.report_type = ReportType(data[30] & 0x80)
class Request(Packet):
def __init__(self, pkt):
super(Request, self).__init__(pkt.id, pkt.data)
self.pkt = pkt
FB_CMD_OFFSET = 32 # 32 -> Mac, 36 -> Linux
self.report_id = self.data[FB_CMD_OFFSET]
self.req_len = self.data[FB_CMD_OFFSET + 1]
self.tag = self.data[FB_CMD_OFFSET + 2]
self.opcode = nike.SE_Opcode(self.data[FB_CMD_OFFSET + 3])
self.payload = self.data[FB_CMD_OFFSET + 4:FB_CMD_OFFSET + 4 + self.req_len - 1]
self.subcmd_code = None
self.subcmd_len = 0
self.subcmd_val = []
if self.opcode == nike.SE_Opcode.SETTING_SET:
self.subcmd_code = nike.SE_SubCmdSett(self.payload[0])
self.subcmd_len = int(self.payload[1])
self.subcmd_val = self.payload[2:2+self.subcmd_len]
elif self.opcode == nike.SE_Opcode.SETTING_GET:
length = int(self.payload[0])
if length != 1:
raise RuntimeError("SETTING_GET request should have length == 1, but it's %d" % length)
self.subcmd_code = nike.SE_SubCmdSett(self.payload[1])
elif self.opcode == nike.SE_Opcode.BATTERY_STATE:
self.subcmd_code = nike.SE_SubCmdBatt(self.payload[0])
# replay the request to a real fuelband device
# returns the response buffer from the device
def send_to_device(self, fb_dev):
cmd = [self.opcode]
# payload is bytearray and can't append lists with bytearrays,
# soooo.... lazily append bytes onto our command list
for data in self.payload:
cmd.append(data)
# send it!
return fb_dev.send(
cmd,
report_id=self.report_id,
tag=self.tag)
def pretty_str(self, **kwargs):
out = "req - "
out += "op: %s; " % self.opcode.name
if self.subcmd_code:
out += "subcmd: %s; " % self.subcmd_code.name
out += "subcmd_len: %d; " % self.subcmd_len
if self.subcmd_len > 0:
out += "\n%s" % nike.utils.to_hex_with_ascii(self.subcmd_val, indent=4)
return out
class GraphicsPack(Request):
def __init__(self, pkt):
super(GraphicsPack, self).__init__(pkt.id, pkt.data)
self.index = int(self.payload[0])
self.address = utils.intFromLittleEndian(self.payload[1:2])
self.graphics_len = int(self.payload[3])
self.graphics_data = int(self.payload[4:])
if len(self.graphics_data) != self.graphics_len:
raise RuntimeError("graphics data length mismatch!")
def pretty_str(self, **kwargs):
out = "req - "
out += "op: %s; " % self.opcode.name
out += "index: %d; " % self.index
out += "\n%s" % utils.to_hex_with_ascii(self.graphics_data, indent=4)
return out
class GenericMemoryBlock(Request):
def __init__(self, pkt):
super(GenericMemoryBlock, self).__init__(pkt)
self.rw_mode = nike.SE_MemCmds(self.payload[0])
self.address = utils.intFromLittleEndian(self.payload[1:3])
self.mem_len = utils.intFromLittleEndian(self.payload[3:5])
self.mem = []
if self.rw_mode == nike.SE_MemCmds.WRITE_CHUNK:
self.mem = self.payload[5:5+self.mem_len]
def pretty_str(self, **kwargs):
out = "req - "
out += "op: %s (%s); " % (self.opcode.name, self.rw_mode.name)
out += "address: 0x%04x; " % self.address
out += "mem_len: %d; " % self.mem_len
if self.rw_mode == nike.SE_MemCmds.WRITE_CHUNK:
out += "\n%s" % utils.to_hex_with_ascii(self.mem, indent=4)
return out
class UploadGraphicsPack(GenericMemoryBlock):
def __init__(self, pkt):
super(UploadGraphicsPack, self).__init__(pkt)
def upcast_request(req):
if req.opcode == nike.SE_Opcode.UPLOAD_GRAPHICS_PACK:
return UploadGraphicsPack(req.pkt)
elif req.opcode == nike.SE_Opcode.DESKTOP_DATA:
return GenericMemoryBlock(req.pkt)
return req
class Response(Packet):
def __init__(self, pkt):
super(Response, self).__init__(pkt.id, pkt.data)
class MemDump(object):
def __init__(self, size):
self.mem = bytearray(size)
def resize(self, new_size):
old_mem = self.mem
self.mem = bytearray(new_size)
n_to_copy = min(new_size, len(old_mem))
for i in range(n_to_copy):
self.mem[i] = old_mem[i]
def add_block(self, at_idx, block):
block_len = len(block)
req_size = at_idx + block_len
# resize our memory if block goes past the end
if req_size > len(self.mem):
self.resize(req_size)
# insert block into our memory at given start idx
for i in range(block_len):
self.mem[at_idx + i] = block[i]
def parse_pkts_from_file(pcap_file, **kwargs):
max_pkts = kwargs.get('max_pkts', None)
verbose = kwargs.get('verbose', False)
# parse packets from pcap text file
pkts = deque()
pkt_data = bytearray()
pkt_idx = 0
for line_num,line in enumerate(pcap_file):
if len(line) < DATA_END_IDX:
if verbose:
print("pkt_idx: %d" % pkt_idx)
print("pkt_data:")
utils.print_hex_with_ascii(pkt_data)
if len(pkt_data) >= 35:
pkts.append(Packet(pkt_idx, pkt_data))
pkt_idx += 1
if max_pkts and pkt_idx >= max_pkts:
break
pkt_data.clear()
continue # skip empty line
line = line.decode('utf-8')
offset = int(line[0:4], 16)
data = utils.hex_row_to_bytes(line[DATA_START_IDX:DATA_END_IDX])
pkt_data += data
return pkts
# waits for fuelband device to reconnect to PC and returns it
def wait_for_device(timeout=10):
for i in range(timeout):
fb = nike.open_fuelband()
if fb == None:
time.sleep(1.0)
else:
return fb
raise TimeoutError("couldn't open Fuelband after %ds" % timeout)
# extracts all Fuelband requests from pkts list
def get_all_requests(pkts):
requests = deque()
for idx, pkt in enumerate(pkts):
if pkt.report_type != ReportType.SET_REPORT:
continue
if pkt.request_type != RequestType.COMPLETE:
continue
requests.append(Request(pkt))
return requests
# replays request packets to a real fuelband device
def replay(fb, requests, **kwargs):
verbose = kwargs.get('verbose', False)
bad_pkts = []
try:
for idx, req in enumerate(requests):
if verbose:
print("sending request #%d ..." % idx)
print(req.pretty_str())
# some requests cause device to reboot. catch the error due
# to reboot, wait for device to reconnect, and keep on going
try:
resp = req.send_to_device(fb)
if verbose:
print("resp: %s" % resp)
except OSError:
if verbose:
print("device seems to have rebooted on pkt #%d" % idx)
bad_pkts.append(idx)
fb = wait_for_device()
except KeyboardInterrupt:
pass
if verbose:
print("bad_pkts = %s" % bad_pkts)
return bad_pkts
def dissect_pkts(pkts, **kwargs):
gpack_file = kwargs.get('gpack_file', None)
gpack_mem = MemDump(64 * 1024)
for pkt in pkts:
if pkt.report_type != ReportType.SET_REPORT:
continue
if pkt.request_type != RequestType.COMPLETE:
continue
req = upcast_request(Request(pkt))
print(req.pretty_str())
if isinstance(req, UploadGraphicsPack):
gpack_mem.add_block(req.address, req.mem)
if gpack_file:
gpack_file.write(gpack_mem.mem)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog='pcap_dissect',
description="dissects fuelband pcap text files")
parser.add_argument(
'pcap',
type=argparse.FileType('rb'),
help="the pcap text file to read")
parser.add_argument(
'-m','--max-pkts',
default=None,
type=int,
help="max number of packets to analyze")
parser.add_argument(
'-g','--gpack-file',
default=None,
type=argparse.FileType('wb'),
help="graphics pack output file")
parser.add_argument(
'--replay',
default=False,
action='store_true',
help="replay pcap file to a connected Fuelband device")
args = parser.parse_args()
pkts = parse_pkts_from_file(
args.pcap,
max_pkts=args.max_pkts)
if args.replay:
fb = nike.open_fuelband()
if fb == None:
print("No fuelband devices found")
exit(-1)
requests = get_all_requests(pkts)
print("replaying %d request(s) ..." % len(requests))
bad_pkts = replay(fb, requests)
print("done! %d bad packet(s)" % len(bad_pkts))
else:
dissect_pkts(
pkts,
gpack_file=args.gpack_file)