"""Concurrently validate a list of URLs and write the results to a file.

Reads URLs (one per line) from INPUT_FILE, checks each with an HTTP
request, and writes one "url@status" line per URL to OUTPUT_FILE.
"""
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# --- Configuration ---
INPUT_FILE = "urls.txt"        # input file, one URL per line
OUTPUT_FILE = "results.txt"    # output results file
MAX_THREADS = 20               # maximum number of concurrent workers
TIMEOUT = 5                    # per-request timeout in seconds
HEADERS = {                    # custom request headers
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36"
}
PRINT_PROGRESS_EVERY = 5       # print progress every N completed URLs


def check_url(url):
    """Check whether *url* is reachable and return a "url@status" string.

    Returns one of:
      "{url}@有效"                    -- HTTP 200
      "{url}@无效 - 路径不存在"        -- HTTP 404
      "{url}@无效 - 状态码: <code>"   -- any other status
      "{url}@无效 - 错误: <exc>"      -- network/timeout error
    """
    try:
        # Use a HEAD request first to avoid downloading the body.
        response = requests.head(
            url, headers=HEADERS, timeout=TIMEOUT, allow_redirects=True
        )
        # Some servers reject HEAD with 405; retry with a streamed GET
        # so the body is still not downloaded, then close the connection.
        if response.status_code == 405:
            response = requests.get(
                url, headers=HEADERS, timeout=TIMEOUT,
                allow_redirects=True, stream=True,
            )
            response.close()
        if response.status_code == 200:
            return f"{url}@有效"
        elif response.status_code == 404:
            return f"{url}@无效 - 路径不存在"
        else:
            return f"{url}@无效 - 状态码: {response.status_code}"
    except requests.RequestException as e:
        return f"{url}@无效 - 错误: {e}"


def main():
    """Read URLs from INPUT_FILE, validate them concurrently, save results."""
    # Read URLs, skipping blank lines so no empty requests are issued.
    with open(INPUT_FILE, "r", encoding="utf-8") as file:
        urls = [line.strip() for line in file if line.strip()]

    total_urls = len(urls)
    results = {}  # url -> result string, so output can follow input order
    print(f"共 {total_urls} 条 URL,开始验证...")

    # Start timing.
    start_time = time.time()

    # Validate URLs concurrently.
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = {executor.submit(check_url, url): url for url in urls}
        for i, future in enumerate(as_completed(futures), start=1):
            results[futures[future]] = future.result()
            # Periodic progress report.
            if i % PRINT_PROGRESS_EVERY == 0 or i == total_urls:
                elapsed = time.time() - start_time
                print(f"正在执行第 {i}/{total_urls} 条 URL,用时 {elapsed:.2f} 秒")

    # Record total elapsed time.
    total_time = time.time() - start_time

    # Write results as UTF-8, in input order (as_completed yields in
    # completion order, which is nondeterministic between runs).
    with open(OUTPUT_FILE, "w", encoding="utf-8") as file:
        for url in urls:
            file.write(results[url] + "\n")

    print(f"验证完成!共执行了 {total_urls} 条链接,总用时 {total_time:.2f} 秒。结果已保存到 {OUTPUT_FILE}")
    input("按回车键退出...")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
评论0+