import json
import requests
import os
import pandas as pd
class LieYing:
    """Small client for the AMap "LieYing" track service REST API.

    One instance performs exactly one step per ``process()`` call; which
    step is chosen depends on which identifiers were supplied:

    * no ``sid``            -> create a service
    * no ``tid``            -> create a terminal
    * no ``trid``           -> create a trace
    * ``data`` is given     -> upload track points
    * otherwise             -> search the (pre-processed) track

    Errors reported by the API are printed, not raised; the outcome of the
    last successful step is stored in ``result_data``.
    """

    # Root of the track-service endpoints.
    BASE_URL = 'https://tsapi.amap.com/v1/track'
    # ``errcode`` value the API uses for success.
    SUCCESS_CODE = 10000
    # Maximum number of points accepted per upload request.
    UPLOAD_BATCH_SIZE = 100
    # Point count above which the daily-quota warning is printed.
    DAILY_POINT_LIMIT = 5000
    # Seconds to wait for the server before giving up, so a stalled
    # connection cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, key, sid=None, tid=None, trid=None, data=None, preprocess_params=None):
        """Store the API key, the optional identifiers and the optional inputs.

        Args:
            key: AMap web-service key.
            sid: Service id, or None to have ``process()`` create a service.
            tid: Terminal id, or None to have ``process()`` create a terminal.
            trid: Trace id, or None to have ``process()`` create a trace.
            data: Track points to upload; must provide ``timestamp``, ``lng``
                and ``lat`` columns (``direction`` and ``speed`` are optional).
                The code uses it as a pandas DataFrame.
            preprocess_params: Extra query parameters merged into the track
                search request, or None for no extra parameters.
        """
        self.key = key
        self.sid = sid
        self.tid = tid
        self.trid = trid
        self.data = data
        self.preprocess_params = preprocess_params
        self.result_data = None

    def _request(self, http_method, path, payload):
        """Send one request and return the decoded JSON body.

        Returns None when the HTTP status is not 200.
        """
        url = f'{self.BASE_URL}/{path}'
        if http_method == 'GET':
            response = requests.get(url, params=payload, timeout=self.REQUEST_TIMEOUT)
        else:
            response = requests.post(url, data=payload, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            return None
        return response.json()

    def _create(self, path, payload, http_failure_message):
        """POST a creation request and return its ``data`` section, or None.

        Prints ``http_failure_message`` on a non-200 status and the API's
        ``errmsg`` when the API itself reports an error.
        """
        body = self._request('POST', path, payload)
        if body is None:
            print(http_failure_message)
            return None
        if body['errcode'] != self.SUCCESS_CODE:
            print(body['errmsg'])
            return None
        return body['data']

    def add_service(self):
        """Create a service named ``service_1`` and record its ``sid``."""
        # Each key may register at most 15 services.
        default_name = 'service_1'
        payload = {'key': self.key, 'name': default_name}
        created = self._create('service/add', payload, '创建服务失败')
        if created is not None:
            self.result_data = {'service': {'sid': created['sid'], 's_name': default_name}}

    def add_terminal(self):
        """Create a terminal named ``terminal_1`` and record its ``tid``."""
        default_name = 'terminal_1'
        payload = {'key': self.key, 'sid': self.sid, 'name': default_name}
        created = self._create('terminal/add', payload, '创建终端失败')
        if created is not None:
            self.result_data = {'terminal': {'tid': created['tid'], 't_name': default_name}}

    def add_track(self):
        """Create a trace on the terminal and record its ``trid``.

        The trace name is recorded too when the API returns one.
        """
        payload = {'key': self.key, 'sid': self.sid, 'tid': self.tid}
        created = self._create('trace/add', payload, '添加轨迹失败')
        if created is None:
            return
        self.result_data = {'trace': {'trid': created['trid']}}
        if 'trname' in created:
            self.result_data['trace']['tr_name'] = created['trname']

    def _build_points(self):
        """Convert ``self.data`` into the list of point dicts the API expects."""
        frame = self.data.rename(columns={'timestamp': 'locatetime'})
        # Coordinates are sent as "X,Y" with at most 6 decimal places.
        frame['location'] = [
            f'{round(lng, 6)},{round(lat, 6)}'
            for lng, lat in zip(frame['lng'], frame['lat'])
        ]
        columns = ['location', 'locatetime']
        # direction and speed are assumed to be present (or absent) together.
        if 'direction' in frame and 'speed' in frame:
            columns += ['direction', 'speed']
        return frame[columns].to_dict(orient='records')

    def upload_track(self):
        """Upload ``self.data`` in batches of at most ``UPLOAD_BATCH_SIZE`` points.

        ``result_data['upload']`` receives the total point count, the status
        (``'all'`` or ``'part'``) and the number of points uploaded before the
        first failing batch. Uploading stops at the first failure. A point
        with the same unix timestamp as an existing one overwrites it.

        Returns:
            Always None; inspect ``result_data`` for the outcome.
        """
        required = ('timestamp', 'lng', 'lat')
        if self.data is None or any(column not in self.data for column in required):
            print('轨迹数据格式错误')
            return None

        points = self._build_points()
        params = {'key': self.key, 'sid': self.sid, 'tid': self.tid, 'trid': self.trid}
        num = len(points)
        if num > self.DAILY_POINT_LIMIT:
            # Warning only: the upload is still attempted.
            print('超出个人用户每日限额')
        self.result_data = {'upload': {'total': num, 'upload_status': 'all', 'upload_num': num}}

        for start in range(0, num, self.UPLOAD_BATCH_SIZE):
            batch = points[start:start + self.UPLOAD_BATCH_SIZE]
            # The API expects the batch as a JSON string.
            params['points'] = json.dumps(batch)
            body = self._request('POST', 'point/upload', params)
            if body is not None and body['errcode'] == self.SUCCESS_CODE:
                continue
            print('轨迹上传失败')
            if body is not None:
                print(body['errmsg'])
            self.result_data['upload']['upload_status'] = 'part'
            # Every batch before this one succeeded, i.e. ``start`` points.
            self.result_data['upload']['upload_num'] = start
            return None
        return None

    def search_track(self):
        """Query track points, applying any pre-processing parameters.

        Either ``trid`` or both ``starttime`` and ``endtime`` (taken from
        ``preprocess_params``) must be supplied.

        Returns:
            ``{'search': {'points': [...]}}`` on success (also stored in
            ``result_data``), otherwise None.
        """
        params = {'key': self.key, 'sid': self.sid, 'tid': self.tid, 'trid': self.trid}
        # preprocess_params defaults to None, so only merge when provided.
        if self.preprocess_params:
            params.update(self.preprocess_params)
        has_time_range = 'starttime' in params and 'endtime' in params
        if self.trid is None and not has_time_range:
            print('trid、starttime、endtime必填一项')
            return None

        body = self._request('GET', 'terminal/trsearch', params)
        if body is None:
            return None
        if body['errcode'] != self.SUCCESS_CODE:
            print(body['errmsg'])
            return None

        points = []
        for track in body['data']['tracks']:
            points.extend(track['points'])
        # Points can later be parsed into a DataFrame with the columns
        # location, locatetime, speed, direction, accuracy.
        self.result_data = {'search': {'points': points}}
        return self.result_data

    def process(self):
        """Run the single step implied by the supplied identifiers.

        Returns:
            ``result_data`` of the executed step, or None when it failed.
        """
        # Reset first so a failed step never reports an earlier step's result.
        self.result_data = None
        if self.sid is None:
            self.add_service()
        elif self.tid is None:
            self.add_terminal()
        elif self.trid is None:
            self.add_track()
        elif self.data is not None:
            self.upload_track()
        else:
            self.search_track()
        return self.result_data
if __name__ == '__main__':
path = r'data/raw_data'
# 【孤立噪点】
file = '孤立噪点.json'
# 【多个噪点集中分布】
# file = '多个噪点集中分布_1.json'
# file = '多个噪点集中分布_2.json'
# 【多个噪点反复横跳】
# file = '多个噪点反复横跳.json'
# 读取异常段轨迹信息,用于上传
with open(os.path.join(path, file), 'r', encoding='utf-8') as f:
data = json.load(f)
data = pd.DataFrame(data['points'])
# 高德猎鹰轨迹服务
# 添加service
# lie_ying = LieYing(key='0ea6bcXXXXXXXXXXXXXb67f575c26')
# 添加terminal
# lie_ying = LieYing(key='0ea6bcXXXXXXXXXXXXXb67f575c26',
# sid=1049770)
# 添加trace
# lie_ying = LieYing(key='0ea6bcXXXXXXXXXXXXXb67f575c26',
# sid=1049770,
# tid=1322647895)
# 上传轨迹(批量上传)
# lie_ying = LieYing(key='0ea6bcXXXXXXXXXXXXXb67f575c26',
# sid=1049770,
# tid=1322647895,
# trid=60,
# data=sub_data)
# 轨迹预处理:降噪、纠偏...
# 轨迹降噪/纠偏相关参数为correction;轨迹补全相关参数为recoup
# 【注意】:起止时间差最多24小时
preprocess_params = {'starttime': 17