python脚本下载连续波形数据更新。
import os
import numpy as np
from obspy import UTCDateTime, Stream, Trace
from obspy.io.sac import SACTrace
from obspy.clients.fdsn import Client
from obspy.signal.rotate import rotate_ne_rt
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
from collections import defaultdict
# 参数设置
client = Client("IRIS")
output_dir = "daily_waveforms"
os.makedirs(output_dir, exist_ok=True)
sta_file = "station.list"
start_date = UTCDateTime("2023-09-16")
end_date = UTCDateTime("2023-09-27") # 包括该天
thread_workers = 6
exception_log = "exceptions.txt"
sampling_rate = 1.0 # LH通道的采样率 (1 Hz)
expected_npts = 86400 # 86400秒 * 1Hz采样率
# 打印工作目录与输出目录
print(f"📁📁📁📁 当前工作目录: {os.getcwd()}")
print(f"📁📁📁📁 保存路径: {os.path.abspath(output_dir)}")
# 读取台站列表
sta_list = []
with open(sta_file, "r") as sf:
for line in sf:
if line.strip() and not line.strip().startswith("#"):
parts = line.strip().split()
if len(parts) >= 2:
net, sta = parts[0], parts[1]
sta_list.append((net, sta))
# 记录异常到文件
def log_exception(msg):
with open(exception_log, "a") as f:
f.write(f"{UTCDateTime.now().isoformat()} - {msg}\n")
# 保存为SAC文件(每个通道一个文件)
def save_channel_sac(tr, day_dir, net, sta, station_coords):
try:
# 确保目录存在
os.makedirs(day_dir, exist_ok=True)
# 文件名格式: NET_STA_CHAN.SAC
chan = tr.stats.channel
filename = f"{net}_{sta}_{chan}.SAC"
filepath = os.path.join(day_dir, filename)
# 创建SACTrace对象
sac = SACTrace.from_obspy_trace(tr)
# 设置台站信息
sac.kstnm = sta
sac.knetwk = net
# 设置坐标信息
if station_coords is not None:
sac.stla = station_coords["latitude"]
sac.stlo = station_coords["longitude"]
sac.stel = station_coords["elevation"]
sac.stdp = station_coords.get("local_depth", 0.0)
print(f" ✅ 添加坐标: {sac.stla:.4f}, {sac.stlo:.4f}")
else:
sac.stla = 0.0
sac.stlo = 0.0
sac.stel = 0.0
sac.stdp = 0.0
print(f" ⚠⚠⚠️ 无坐标信息")
# 保存文件
sac.write(filepath)
print(f" 保存: {filename}")
return filepath
except Exception as e:
log_exception(f"保存SAC文件失败 {net}.{sta}.{chan}: {str(e)}")
traceback.print_exc()
return None
# 数据补零处理
def fill_gaps(tr, start_time):
"""
确保数据有完整的86400个点,缺失部分补零
"""
try:
# 创建新的头信息
new_header = tr.stats.copy()
new_header.starttime = start_time # 确保从当天00:00:00开始
new_header.npts = expected_npts
new_header.sampling_rate = sampling_rate
# 创建全零数据数组
new_data = np.zeros(expected_npts, dtype=tr.data.dtype)
# 计算数据偏移量(当前数据在当天中的起始位置)
start_diff = int((tr.stats.starttime - start_time))
# 确保偏移量在合理范围内
if 0 <= start_diff < expected_npts:
end_index = min(start_diff + tr.stats.npts, expected_npts)
valid_length = end_index - start_diff
# 将实际数据复制到全零数组的相应位置
if valid_length > 0:
new_data[start_diff:end_index] = tr.data[:valid_length]
# 创建新的Trace
return Trace(data=new_data, header=new_header)
except Exception as e:
log_exception(f"数据补零失败: {str(e)}")
return tr
# 旋转LH1/LH2分量到LHE/LHN
def rotate_to_EN(tr1, tr2):
"""将两个水平分量(LH1/LH2)旋转到地理坐标系(LHE/LHN)"""
try:
# 假设LH1是北分量,LH2是东分量
north = tr1.data
east = tr2.data
# 旋转分量
n, e = rotate_ne_rt(north, east, 0)
# 创建新的Trace对象
trN = Trace(data=n)
trN.stats = tr1.stats.copy()
trN.stats.channel = "LHN"
trE = Trace(data=e)
trE.stats = tr2.stats.copy()
trE.stats.channel = "LHE"
return trN, trE
except Exception as e:
log_exception(f"旋转分量失败: {e}")
return tr1, tr2
# 处理水平分量
def process_horizontal_components(st):
"""
处理水平分量:
1. 如果只有LH1和LH2,旋转为LHE和LHN,并删除原始的极LH1/LH2
2. 如果存在LHN和LHE,优先使用它们,并删除任何LH1/LH2分量
"""
try:
# 按通道类型分组
comp_groups = defaultdict(list)
for tr in st:
comp_groups[tr.stats.channel].append(tr)
# 处理水平分量
horizontal_stream = Stream()
# 优先选择LHN和LHE分量
en_comps = ["LHE", "LHN"]
has_EN = any(comp in comp_groups for comp in en_comps)
if has_EN:
# 使用已有的LHN/LHE分量(只取第一个)
for comp in en_comps:
if comp in comp_groups:
horizontal_stream.append(comp_groups[comp][0])
print(" 使用现有的LHN/LHE分量")
# 删除任何存在的LH1/LH2分量
if "LH1" in comp_groups or "LH2" in comp_groups:
print(" 删除原始的LH1/LH2分量")
else:
# 检查是否有LH1和LH2
rt_comps = ["LH1", "LH2"]
if all(comp in comp_groups for comp in rt_comps):
# 获取LH1和LH2分量(只取第一个)
lh1 = comp_groups["LH1"][0]
lh2 = comp_groups["LH2"][0]
# 旋转到EN分量
trN, trE = rotate_to_EN(lh1, lh2)
horizontal_stream.append(trN)
horizontal_stream.append(trE)
print(" 旋转LH1/LH2为LHN/LHE")
# 删除原始的LH1/LH2分量
print(" 删除原始的LH1/LH2分量")
return horizontal_stream
except Exception as e:
log_exception(f"处理水平分量失败: {e}")
return Stream()
# 下载并处理单个台站某天的数据
def download_station(net, sta, day):
station_coords = None # 存储台站坐标
try:
start = UTCDateTime(day)
end = start + 86400
day_str = start.strftime("%Y%m%d")
# 获取台站元数据
print(f" 获取 {net}.{sta} 元数据...")
try:
inv = client.get_stations(
network=net,
station=sta,
starttime=start,
endtime=end,
level="channel"
)
# 尝试获取台站坐标(使用LHZ通道)
try:
station_coords = inv.get_coordinates(f"{net}.{sta}.00.LHZ", start)
print(f" ✅ 获取坐标: {station_coords['latitude']:.4f}, {station_coords['longitude']:.4f}")
except:
# 如果LHZ失败,尝试其他LH通道
for chan in ["LHN", "LHE", "LH1", "LH2"]:
try:
station_coords = inv.get_coordinates(f"{net}.{sta}.00.{chan}", start)
print(f" ✅ 获取坐标: {station_coords['latitude']:.4f}, {station_coords['longitude']:.4f}")
break
except:
continue
if station_coords is None:
print(" ⚠⚠⚠️ 无法获取坐标")
except Exception as e:
print(f" ⚠⚠⚠️ 元数据获取失败: {str(e)}")
# 下载波形数据
print(f" 下载 {net}.{sta} 波形数据...")
st = client.get_waveforms(net, sta, "*", "LH?", start, end, attach_response=True)
# 如果没数据,直接返回
if len(st) == 0:
return (net, sta, False, "无数据")
# 检查是否有LHZ分量(只取第一个)
vertical_st = st.select(channel="LHZ")
if len(vertical_st) == 0:
print(f" ⚠⚠⚠️ 跳过 {net}.{sta} - 无LHZ分量")
return (net, sta, False, "无LHZ分量")
else:
# 只保留第一个LHZ分量
vertical_tr = vertical_st[0]
# 去除仪器响应
print(" 去除仪器响应...")
st.remove_response(output="VEL", pre_filt=(0.008, 0.01, 0.3, 0.4),
taper=True, zero_mean=True, taper_fraction=0.05)
# 处理水平分量
print(" 处理水平分量...")
horizontal_st = process_horizontal_components(st)
# 合并垂直和水平分量
processed_st = Stream([vertical_tr]) + horizontal_st
# 确保只有三个分量:LHZ, LHN, LHE
final_st = Stream()
channels = set()
for tr in processed_st:
# 只添加LHZ、LHN和LHE分量
if tr.stats.channel in ["LHZ", "LHN", "LHE"]:
if tr.stats.channel not in channels:
final_st.append(tr)
channels.add(tr.stats.channel)
else:
print(f" ⚠⚠⚠️ 跳过重复通道: {tr.stats.channel}")
# 数据补零处理
print(" 数据补零处理...")
filled_st = Stream()
for tr in final_st:
filled_tr = fill_gaps(tr, start)
filled_st.append(filled_tr)
# 创建日期目录
day_dir = os.path.join(output_dir, day_str)
# 保存每个通道的数据
saved_files = []
for tr in filled_st:
filepath = save_channel_sac(tr, day_dir, net, sta, station_coords)
if filepath:
saved_files.append(filepath)
return (net, sta, True, saved_files)
except Exception as e:
error_msg = f"{str(e)}"
if hasattr(e, 'response') and e.response is not None:
error_msg += f" (Status: {e.response.status_code})"
return (net, sta, False, error_msg)
# 遍历日期,按天下载并保存
current_day = start_date
while current_day <= end_date:
day_str = current_day.strftime("%Y%m%d")
day_dir = os.path.join(output_dir, day_str)
# 检查是否已下载
if os.path.exists(day_dir) and os.path.isdir(day_dir):
print(f"\n📆📆📆📆 日期 {current_day.date} 已存在,跳过处理")
# 进入下一天
current_day += 86400
continue
print(f"\n📆📆📆📆 正在处理日期: {current_day.date}")
# 创建日期目录
os.makedirs(day_dir, exist_ok=True)
# 异常记录每天追加
log_lines = []
success_count = 0
fail_count = 0
# 启动多线程下载当天所有台站数据
with ThreadPoolExecutor(max_workers=thread_workers) as executor:
futures = {executor.submit(download_station, net, sta, current_day): (net, sta) for net, sta in sta_list}
for future in as_completed(futures):
net, sta = futures[future]
try:
net, sta, ok, result = future.result()
if ok:
success_count += 1
file_count = len(result)
print(f"✅ {net}.{sta} 处理成功 - 保存了 {file_count} 个SAC文件")
for filepath in result:
print(f" ↳↳ {os.path.basename(filepath)}")
else:
fail_count += 1
print(f"❌❌❌❌ {net}.{sta} 处理失败: {result}")
log_lines.append(f"{current_day.date} {net}.{sta} ❌❌❌❌ {result}")
except Exception as e:
fail_count += 1
error_msg = f"{str(e)}"
print(f"❌❌❌❌ {net}.{sta} 异常: {error_msg}")
log_lines.append(f"{current_day.date} {net}.{sta} ❌❌❌❌ {error_msg}")
traceback.print_exc()
print(f"\n📊📊 本日统计: {success_count} 个台站成功, {fail_count} 个台站失败")
# 写入异常日志
if log_lines:
with open(exception_log, "a") as elog:
elog.write("\n".join(log_lines) + "\n")
# 进入下一天
current_day += 86400
print("\n🎉🎉 所有日期处理完成!")
此脚本完成以下操作: * 多线程下载指定定时间段的LH分量数据。 * 按天保存到同一个文件夹,检查当天的文件是否已经建立,如果已建立则跳过(防止重复下载)。 * 检查是否有LHZ分量,如果没有则跳过此台。 * 去除仪器响应,保存为速度记录,滤波到0.008-0.4Hz。 * 仅保存第一个location(空,00,01)的LHZ,LHE,LHN。 * 如果同时有LH1,LH2,LHE,LHN则删除LH1,LH2分量。 * 如果仅有LH1,LH2,则旋转到LHE,LHN,删除LH1,LH2。 * 如果不足86400则补零,对齐到当天的00:00:00。 * 保存为SAC格式,文件名为NET_STA_COM.SAC。