# Bioprobes_data/python_server/util.py
#
# 254 lines
# 7.8 KiB
# Python

import csv
import os
import math
import time
def save_login_action(uuid, data, csv_path):
    """Append one login-action record to the CSV file at ``csv_path``.

    The file (with its header row) is created first if it does not exist.

    Args:
        uuid: Identifier written to the second column of the row.
        data: Mapping that must provide ``gapList``, ``offsetList``,
            ``pressures``, ``touch_size``, ``distanceMoveList``,
            ``data_stamp``, ``uuid``, ``gesture``, ``gesture_phone``,
            ``input_duration``, ``username_len`` and ``pswd_len``.
        csv_path: Path of the CSV file to append to.

    Returns:
        True when the row was written, False when building or writing the
        row failed.

    Raises:
        KeyError: If one of the list-valued keys read before the write
            step (``gapList``, ``offsetList``, ``pressures``,
            ``touch_size``, ``distanceMoveList``) is missing.
    """
    create_login_csv(csv_path)
    gap = data['gapList']
    offset = data['offsetList']
    hold_time, inter_time = get_hold_inter(gap)
    distance = get_distance(offset)
    pressures = get_pressures(data['pressures'])
    touch_size = get_touch_size(data['touch_size'])
    distanceMove = get_distanceMove(data['distanceMoveList'])
    create_data = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(int(time.time())))
    try:
        row = [
            create_data, uuid, data['data_stamp'], data['uuid'],
            data['gesture'], data['gesture_phone'],
            data['input_duration'], data['username_len'], data['pswd_len'],
            hold_time, inter_time, distance, distanceMove, pressures, touch_size,
        ]
        # The context manager closes the handle even when the write fails.
        with open(csv_path, 'a', newline='', encoding='utf-8') as csv_file:
            csv.writer(csv_file).writerow(row)
    except Exception as exc:
        # Best-effort write: report and signal failure to the caller, but let
        # KeyboardInterrupt / SystemExit propagate instead of swallowing them.
        print(f'failed to append login action to {csv_path}: {exc!r}')
        return False
    return True
def create_login_csv(base_path):
    """Create the login CSV at ``base_path`` with its header row.

    Does nothing when a file already exists at ``base_path``.

    Args:
        base_path: Path of the CSV file to create.
    """
    if os.path.exists(base_path):
        return
    print(f'{base_path} 不存在,创建该文件。')
    titles = [
        'create_date', 'uuid', 'data_stamp', 'label', 'gesture',
        'gesture_phone', 'input_duration', 'usrn_len', 'pswd_len',
        'hold-time', 'inter-time', 'distance', 'distanceMove',
        'pressures', 'touch_size',
    ]
    # ``with`` guarantees the handle is closed even if the write raises.
    with open(base_path, 'w', newline='', encoding='utf-8') as csv_file:
        csv.writer(csv_file).writerow(titles)
def get_pressures(pressures):
    """Return the non-``None`` entries of ``pressures`` as a list string."""
    kept = []
    for sample in pressures:
        if sample is not None:
            kept.append(sample)
    return str(kept)
def get_touch_size(touch_size):
    """Return the non-``None`` entries of ``touch_size`` as a list string."""
    present = list(filter(lambda value: value is not None, touch_size))
    return str(present)
def get_distanceMove(distanceMoveList):
    """Return the non-``None`` entries of ``distanceMoveList`` as a list string."""
    remaining = []
    for entry in distanceMoveList:
        if entry is None:
            continue
        remaining.append(entry)
    return str(remaining)
def get_hold_inter(gap):
    """Split an interleaved gap list into two list strings.

    Entries at even positions (0, 2, 4, ...) form the first list; entries at
    odd positions form the second list, which always starts with a leading
    ``0``.

    Args:
        gap: Iterable of interleaved values.

    Returns:
        A ``(hold_time, inter_time)`` tuple of ``str(list)`` representations.
    """
    samples = list(gap)  # accept any iterable, not only sequences
    hold_time = samples[0::2]
    inter_time = [0] + samples[1::2]
    return str(hold_time), str(inter_time)
def get_distance(offset):
    """Return the distances between consecutive points as a list string.

    ``offset`` is read as a flat sequence ``[x0, y0, x1, y1, ...]``. The
    result starts with ``0`` for the first point, followed by the Euclidean
    distance from each point to the one before it. A trailing unpaired
    coordinate is ignored.

    Args:
        offset: Flat sequence of alternating x and y coordinates.

    Returns:
        ``str`` of the distance list; ``'[0]'`` when fewer than one complete
        point is supplied (previously this raised ``IndexError``).
    """
    distance = [0]
    if len(offset) < 2:
        return str(distance)
    x1, y1 = offset[0], offset[1]
    # Pair up the remaining coordinates; zip drops an unpaired trailing x.
    for x2, y2 in zip(offset[2::2], offset[3::2]):
        distance.append(math.hypot(x2 - x1, y2 - y1))
        x1, y1 = x2, y2
    return str(distance)
def save_sensor(base_path, login_action_uuid, data):
    """Write the sensor samples in ``data`` to ``<base_path>/<login_action_uuid>.csv``.

    The file is overwritten and always starts with a nine-column header
    (accelerometer, gyroscope, magnetometer axes).

    * When ``data['is_ios']`` is truthy, the flat ``accelerometer_data`` and
      ``gyroscope_data`` lists are cut into consecutive triples and each row
      holds six values (no magnetometer columns). Only complete triples
      within the shorter of the two lists are written.
    * Otherwise the three sensor streams are matched up by
      ``getSimultaneousData`` and each row holds nine values.

    Args:
        base_path: Directory the CSV file is written into.
        login_action_uuid: File name (without extension) of the CSV file.
        data: Mapping with ``is_ios``, ``accelerometer_data`` and
            ``gyroscope_data``; the non-iOS path additionally reads
            ``accelerometer_time``, ``gyroscope_time``,
            ``magnetometer_data`` and ``magnetometer_time``.

    Returns:
        True once the file has been written.
    """
    is_ios = data['is_ios']
    # NOTE(review): login_action_uuid goes straight into the file path;
    # confirm callers never pass a client-controlled value containing path
    # separators.
    file_path = os.path.join(base_path, login_action_uuid)
    csv_path = f'{file_path}.csv'
    # ``with`` closes the handle even if a key is missing or a write fails.
    with open(csv_path, 'w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['accX', 'accY', 'accZ', 'gyroX', 'gyroY', 'gyroZ', 'magX', 'magY', 'magZ'])
        accelerometer, gyroscope = data['accelerometer_data'], data['gyroscope_data']
        length = min(len(accelerometer), len(gyroscope))
        print(f'is_ios:{is_ios}')
        if is_ios:
            # Walk the complete (x, y, z) triples shared by both streams.
            for start in range(0, length - length % 3, 3):
                stop = start + 3
                writer.writerow([*accelerometer[start:stop], *gyroscope[start:stop]])
        else:
            ac, gy, mag = getSimultaneousData(
                data['accelerometer_data'], data['accelerometer_time'],
                data['gyroscope_data'], data['gyroscope_time'],
                data['magnetometer_data'], data['magnetometer_time'],
            )
            for a, g, m in zip(ac, gy, mag):
                writer.writerow([a[0], a[1], a[2], g[0], g[1], g[2], m[0], m[1], m[2]])
    return True
def save_random_action(uuid, data, csv_path):
    """Append one random-action record to the CSV file at ``csv_path``.

    The file (with its header row) is created first if it does not exist.

    Args:
        uuid: Identifier written to the second column of the row.
        data: Mapping that must provide ``gapList``, ``offsetList``,
            ``pressures``, ``touch_size``, ``distanceMoveList``,
            ``data_stamp``, ``uuid``, ``gesture``, ``gesture_phone``,
            ``input_duration`` and ``str_len``.
        csv_path: Path of the CSV file to append to.

    Returns:
        True when the row was written, False when building or writing the
        row failed.

    Raises:
        KeyError: If one of the list-valued keys read before the write
            step (``gapList``, ``offsetList``, ``pressures``,
            ``touch_size``, ``distanceMoveList``) is missing.
    """
    create_random_csv(csv_path)
    gap = data['gapList']
    offset = data['offsetList']
    hold_time, inter_time = get_hold_inter(gap)
    distance = get_distance(offset)
    pressures = get_pressures(data['pressures'])
    touch_size = get_touch_size(data['touch_size'])
    distanceMove = get_distanceMove(data['distanceMoveList'])
    create_data = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime(int(time.time())))
    try:
        row = [
            create_data, uuid, data['data_stamp'], data['uuid'],
            data['gesture'], data['gesture_phone'],
            data['input_duration'], data['str_len'],
            hold_time, inter_time, distance, distanceMove, pressures, touch_size,
        ]
        # The context manager closes the handle even when the write fails.
        with open(csv_path, 'a', newline='', encoding='utf-8') as csv_file:
            csv.writer(csv_file).writerow(row)
    except Exception as exc:
        # Best-effort write: report and signal failure to the caller, but let
        # KeyboardInterrupt / SystemExit propagate instead of swallowing them.
        print(f'failed to append random action to {csv_path}: {exc!r}')
        return False
    return True
def create_random_csv(base_path):
    """Create the random-action CSV at ``base_path`` with its header row.

    Does nothing when a file already exists at ``base_path``.

    Args:
        base_path: Path of the CSV file to create.
    """
    if os.path.exists(base_path):
        return
    print(f'{base_path} 不存在,创建该文件。')
    titles = [
        'create_date', 'uuid', 'data_stamp', 'label', 'gesture',
        'gesture_phone', 'input_duration', 'str_len',
        'hold-time', 'inter-time', 'distance', 'distanceMove',
        'pressures', 'touch_size',
    ]
    # ``with`` guarantees the handle is closed even if the write raises.
    with open(base_path, 'w', newline='', encoding='utf-8') as csv_file:
        csv.writer(csv_file).writerow(titles)
def getSimultaneousData(accel, accelTime, gyro, gyroTime, magnet, magnetTime):
    """Pick gyroscope and magnetometer samples that coincide with accelerometer samples.

    Each stream is first turned into a ``{timestamp: [x, y, z]}`` mapping via
    ``getMapXX``. Two samples count as simultaneous when their timestamps
    differ by at most 5 (in whatever unit the timestamps use).

    Returns:
        ``(ac, gy, mag)``: ``ac`` and ``gy`` are equally long lists of the
        accelerometer/gyroscope samples that were paired up; ``mag`` lists
        the magnetometer samples that were paired with an accelerometer
        sample.

    NOTE(review): the magnetometer pass is independent of the gyroscope pass,
    so ``mag`` can differ in length from ``ac``/``gy`` and ``mag[i]`` is not
    guaranteed to belong to the same accelerometer sample as ``ac[i]``.
    The walk presumably relies on timestamps arriving in ascending order --
    confirm against the client.
    """
    tolerance = 5

    def _coinciding(ref_stamps, other_stamps):
        # Two-cursor walk yielding (reference stamp, other stamp) pairs whose
        # stamps lie within ``tolerance`` of each other.
        ref_pos = other_pos = 0
        while ref_pos < len(ref_stamps) and other_pos < len(other_stamps):
            lower = ref_stamps[ref_pos] - tolerance
            upper = ref_stamps[ref_pos] + tolerance
            candidate = other_stamps[other_pos]
            if candidate < lower:
                other_pos += 1
            elif lower <= candidate <= upper:
                yield ref_stamps[ref_pos], candidate
                ref_pos += 1
                other_pos += 1
            elif candidate > upper:
                ref_pos += 1

    accel_by_stamp = getMapXX(accel, accelTime)
    gyro_by_stamp = getMapXX(gyro, gyroTime)
    magnet_by_stamp = getMapXX(magnet, magnetTime)
    accel_stamps = list(accel_by_stamp)
    gyro_stamps = list(gyro_by_stamp)
    magnet_stamps = list(magnet_by_stamp)

    ac, gy = [], []
    for accel_stamp, gyro_stamp in _coinciding(accel_stamps, gyro_stamps):
        ac.append(accel_by_stamp[accel_stamp])
        gy.append(gyro_by_stamp[gyro_stamp])

    mag = [
        magnet_by_stamp[magnet_stamp]
        for _, magnet_stamp in _coinciding(accel_stamps, magnet_stamps)
    ]
    return ac, gy, mag
def getMapXX(xx, xxTime):
    """Group a flat sample list into triples keyed by timestamp.

    ``xx`` is read as consecutive triples ``[a0, b0, c0, a1, b1, c1, ...]``;
    the k-th complete triple is stored under ``xxTime[k]``. A trailing
    incomplete triple is ignored. If ``xxTime`` has fewer entries than there
    are complete triples, the surplus triples are dropped (previously this
    raised ``IndexError``). A repeated timestamp keeps the last triple.

    Args:
        xx: Flat sequence of sample values.
        xxTime: Timestamps, one per triple.

    Returns:
        Dict mapping each timestamp to a fresh three-element list.
    """
    complete = len(xx) - len(xx) % 3
    triples = (xx[start:start + 3] for start in range(0, complete, 3))
    # zip stops at the shorter input, so triples without a timestamp are skipped.
    return {stamp: list(triple) for stamp, triple in zip(xxTime, triples)}
def getError(l, n, a):
    """Return the mean gap between the first ``n`` entries of ``l``, scaled by ``a``.

    The gaps between consecutive entries of ``l[:n]`` are summed, divided by
    ``n - 1`` and multiplied by ``a``.
    """
    first = l[0]
    followers = l[1:n]
    # Each follower is compared with the entry right before it.
    predecessors = [first, *followers[:-1]]
    total_gap = sum(cur - prev for prev, cur in zip(predecessors, followers))
    return total_gap / (n - 1) * a