需求分析
本工具旨在帮助用户根据质谱数据(质量数),自动从HMDB数据库批量检索代谢物信息,包括HMDB ID、常用名称、化学式及结构图片等,并导出为Excel可直接打开的CSV文件,极大提升代谢物注释效率。
程序介绍
- 准备数据:将所有待检索的质量数写入
negative.txt文件,每行一个。 - 运行脚本:
- 双击或命令行运行
hmdb_metabolite_extractor.py脚本。 - 程序自动读取
negative.txt,并按负离子模式批量检索HMDB。 - 检索结果自动保存为
代谢物数据.csv,日志信息保存在代谢物提取.log。
- 双击或命令行运行
- 主要功能模块:
- 质量数批量检索:自动根据质量数和离子模式检索HMDB ID
- 代谢物详细信息抓取:根据HMDB ID获取常用名称、化学式、结构图片等信息
- 并发加速与重试机制:采用多线程并发抓取,自动重试机制
- 结果导出:自动整理为CSV文件,支持Excel直接打开
部分代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""代谢物数据处理工具"""
import json, os, logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from typing import List, Dict, Optional
class DataManager:
def __init__(self, cache_file='cache.json', progress_file='progress.json'):
self.cache_file = cache_file
self.progress_file = progress_file
self.cache = self._load_json(cache_file, {})
self.progress = self._load_json(progress_file, {'processed': []})
def _load_json(self, file: str, default: Dict) -> Dict:
if os.path.exists(file):
try:
with open(file, 'r', encoding='utf-8') as f:
return json.load(f)
except: pass
return default
def save_progress(self, item_id: str):
if item_id not in self.progress['processed']:
self.progress['processed'].append(item_id)
self._save_json(self.progress_file, self.progress)
def _save_json(self, file: str, data: Dict):
try:
with open(file, 'w', encoding='utf-8') as f:
json.dump(data, f)
except Exception as e:
logging.error(f"保存文件失败: {e}")
def is_processed(self, item_id: str) -> bool:
return item_id in self.progress['processed']
def get_cache(self, key: str) -> Optional[Dict]:
return self.cache.get(key)
def set_cache(self, key: str, data: Dict):
self.cache[key] = data
self._save_json(self.cache_file, self.cache)
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("处理日志.log", encoding='utf-8'),
logging.StreamHandler()
]
)
def process_data(items: List[str], resume: bool = False) -> List[Dict]:
"""处理数据的主要函数
Args:
items: 待处理的数据项列表
resume: 是否继续上次的进度
Returns:
处理结果列表
"""
data_manager = DataManager()
results = []
for item in items:
# 检查是否已处理
if resume and data_manager.is_processed(item):
continue
# 获取数据 -
data = {"id": item, "status": "processed"}
results.append(data)
# 保存进度
data_manager.save_progress(item)
return results
def main():
setup_logging()
try:
# 读取输入数据 -
items = ["item1", "item2", "item3"]
# 处理数据
results = process_data(items, resume=True)
# 保存结果 -
if results:
logging.info(f"成功处理 {len(results)} 条数据")
except Exception as e:
logging.error(f"程序执行出错: {e}")
if __name__ == "__main__":
main()
视频版本
- 哔哩哔哩
- YouTube
▶ 远程修电脑,请访问 章九工具箱,点击电脑维修,加我微信咨询。
▶ 本网站的部分内容可能来源于网络,仅供大家学习与参考,如有侵权请联系我核实删除。
▶ 我是小章,目前全职提供电脑维修和IT咨询服务。如果您有任何电脑相关的问题,都可以问我噢。
文章来自互联网,只做分享使用。发布者:走不完的路,转转请注明出处:https://www.dingdanghao.com/article/1024980.html
