下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133
这个系统包含三个主要组件:电话号码生成器、短信发送器和日志记录器。电话号码生成器可以批量创建随机手机号并保存到CSV文件。短信发送器使用多线程技术提高发送效率,并支持重试机制。日志记录器会实时记录发送状态。使用时需要替换API URL和密钥。
代码语言:txt复制
import random
import time
import csv
from datetime import datetime
import requests
from threading import Thread
from queue import Queue
class PhoneNumberGenerator:
def __init__(self, country_code="86"):
self.country_code = country_code
self.operators = ["134", "135", "136", "137", "138", "139",
"150", "151", "152", "157", "158", "159",
"187", "188", "130", "131", "132", "155",
"156", "185", "186", "133", "153", "180", "189"]
def generate_random_number(self):
prefix = random.choice(self.operators)
suffix = ''.join([str(random.randint(0, 9)) for _ in range(8)])
return f"{self.country_code}{prefix}{suffix}"
def generate_batch_numbers(self, count=100):
return [self.generate_random_number() for _ in range(count)]
def save_numbers_to_csv(self, filename, numbers):
with open(filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerow(["Phone Number"])
for num in numbers:
writer.writerow([num])
class SMSSender:
def __init__(self, api_url, api_key, max_retries=3):
self.api_url = api_url
self.api_key = api_key
self.max_retries = max_retries
self.log_queue = Queue()
self.sent_count = 0
self.failed_count = 0
def send_single_sms(self, phone_number, message):
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
payload = {
"phone": phone_number,
"message": message,
"timestamp": str(datetime.now())
}
for attempt in range(self.max_retries):
try:
response = requests.post(
self.api_url,
json=payload,
headers=headers,
timeout=10
)
if response.status_code == 200:
self.sent_count += 1
self.log_queue.put(f"SUCCESS: Sent to {phone_number}")
return True
else:
time.sleep(1)
except Exception as e:
self.log_queue.put(f"ERROR: {str(e)}")
time.sleep(2)
self.failed_count += 1
self.log_queue.put(f"FAILED: Could not send to {phone_number}")
return False
def send_bulk_sms(self, phone_numbers, message, max_threads=5):
threads = []
for number in phone_numbers:
while threading.active_count() >= max_threads:
time.sleep(0.1)
t = Thread(target=self.send_single_sms, args=(number, message))
t.start()
threads.append(t)
for t in threads:
t.join()
return {
"total": len(phone_numbers),
"sent": self.sent_count,
"failed": self.failed_count
}
class SMSLogger:
def __init__(self, log_file="sms_log.txt"):
self.log_file = log_file
def start_logging(self, log_queue):
with open(self.log_file, 'a') as f:
while True:
log_entry = log_queue.get()
if log_entry == "STOP":
break
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{timestamp}] {log_entry}\n")
f.flush()
def main():
# 初始化组件
generator = PhoneNumberGenerator()
sender = SMSSender(
api_url="https://api.smsprovider.com/v1/send",
api_key="your_api_key_here"
)
logger = SMSLogger()
# 生成电话号码
print("Generating phone numbers...")
numbers = generator.generate_batch_numbers(1000)
generator.save_numbers_to_csv("phone_numbers.csv", numbers)
# 启动日志线程
log_thread = Thread(target=logger.start_logging, args=(sender.log_queue,))
log_thread.start()
# 发送短信
print("Starting SMS sending...")
message = "您好,这是测试短信,请忽略。"
results = sender.send_bulk_sms(numbers[:100], message)
# 停止日志并显示结果
sender.log_queue.put("STOP")
log_thread.join()
print("\nSending completed!")
print(f"Total: {results['total']}")
print(f"Sent: {results['sent']}")
print(f"Failed: {results['failed']}")
if __name__ == "__main__":
main()