下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1133

这个系统包含三个主要组件:电话号码生成器、短信发送器和日志记录器。电话号码生成器可以批量创建随机手机号并保存到CSV文件。短信发送器使用多线程技术提高发送效率,并支持重试机制。日志记录器会实时记录发送状态。使用时需要替换API URL和密钥。

代码语言:txt复制

import random

import time

import csv

from datetime import datetime

import requests

from threading import Thread

from queue import Queue

class PhoneNumberGenerator:

def __init__(self, country_code="86"):

self.country_code = country_code

self.operators = ["134", "135", "136", "137", "138", "139",

"150", "151", "152", "157", "158", "159",

"187", "188", "130", "131", "132", "155",

"156", "185", "186", "133", "153", "180", "189"]

def generate_random_number(self):

prefix = random.choice(self.operators)

suffix = ''.join([str(random.randint(0, 9)) for _ in range(8)])

return f"{self.country_code}{prefix}{suffix}"

def generate_batch_numbers(self, count=100):

return [self.generate_random_number() for _ in range(count)]

def save_numbers_to_csv(self, filename, numbers):

with open(filename, 'w', newline='') as file:

writer = csv.writer(file)

writer.writerow(["Phone Number"])

for num in numbers:

writer.writerow([num])

class SMSSender:

def __init__(self, api_url, api_key, max_retries=3):

self.api_url = api_url

self.api_key = api_key

self.max_retries = max_retries

self.log_queue = Queue()

self.sent_count = 0

self.failed_count = 0

def send_single_sms(self, phone_number, message):

headers = {

"Authorization": f"Bearer {self.api_key}",

"Content-Type": "application/json"

}

payload = {

"phone": phone_number,

"message": message,

"timestamp": str(datetime.now())

}

for attempt in range(self.max_retries):

try:

response = requests.post(

self.api_url,

json=payload,

headers=headers,

timeout=10

)

if response.status_code == 200:

self.sent_count += 1

self.log_queue.put(f"SUCCESS: Sent to {phone_number}")

return True

else:

time.sleep(1)

except Exception as e:

self.log_queue.put(f"ERROR: {str(e)}")

time.sleep(2)

self.failed_count += 1

self.log_queue.put(f"FAILED: Could not send to {phone_number}")

return False

def send_bulk_sms(self, phone_numbers, message, max_threads=5):

threads = []

for number in phone_numbers:

while threading.active_count() >= max_threads:

time.sleep(0.1)

t = Thread(target=self.send_single_sms, args=(number, message))

t.start()

threads.append(t)

for t in threads:

t.join()

return {

"total": len(phone_numbers),

"sent": self.sent_count,

"failed": self.failed_count

}

class SMSLogger:

def __init__(self, log_file="sms_log.txt"):

self.log_file = log_file

def start_logging(self, log_queue):

with open(self.log_file, 'a') as f:

while True:

log_entry = log_queue.get()

if log_entry == "STOP":

break

timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

f.write(f"[{timestamp}] {log_entry}\n")

f.flush()

def main():

# 初始化组件

generator = PhoneNumberGenerator()

sender = SMSSender(

api_url="https://api.smsprovider.com/v1/send",

api_key="your_api_key_here"

)

logger = SMSLogger()

# 生成电话号码

print("Generating phone numbers...")

numbers = generator.generate_batch_numbers(1000)

generator.save_numbers_to_csv("phone_numbers.csv", numbers)

# 启动日志线程

log_thread = Thread(target=logger.start_logging, args=(sender.log_queue,))

log_thread.start()

# 发送短信

print("Starting SMS sending...")

message = "您好,这是测试短信,请忽略。"

results = sender.send_bulk_sms(numbers[:100], message)

# 停止日志并显示结果

sender.log_queue.put("STOP")

log_thread.join()

print("\nSending completed!")

print(f"Total: {results['total']}")

print(f"Sent: {results['sent']}")

print(f"Failed: {results['failed']}")

if __name__ == "__main__":

main()