threads_auto_reply/src/threads.py

69 lines
1.9 KiB
Python

import requests
import time
import json
class ThreadsAPI:
def __init__(self):
self.ACCESS_TOKEN = self.get_thread_token()
self.user_id = "32143803045264731"
def get_thread_token(self):
with open('token.json') as fp:
token = json.load(fp)['token']
return token
def get_posts(self):
URL = "https://graph.threads.net/v1.0/me/threads?fields=id,permalink,owner,username,text,shortcode&access_token={}"
response = requests.get(URL.format(self.ACCESS_TOKEN)).json()
posts = response['data']
return posts
def get_comments_by_post(self, post):
URL = "https://graph.threads.net/v1.0/{}/conversation?fields=id,text,username,is_reply_owned_by_me&reverse=false&access_token={}"
response = requests.get(URL.format(post['id'], self.ACCESS_TOKEN)).json()
comments = response['data']
return comments
def get_post_by_link(self, link):
posts = self.get_posts()
target_post = None
for post in posts:
if post['permalink'] in link:
target_post = post
break
return target_post
def send_reply(self, comment_id: str, reply: str):
url = "https://graph.threads.net/v1.0/me/threads"
payload = {
"media_type": "TEXT_POST", # 如果只是文字回覆就用 TEXT
"text": reply,
"reply_to_id": comment_id,
"access_token": self.ACCESS_TOKEN,
}
response = requests.post(url, data=payload)
print(response)
print(response.json())
CONTAINER_ID = response.json()['id']
url = f"https://graph.threads.net/v1.0/{self.user_id}/threads_publish"
payload = {
"creation_id": CONTAINER_ID,
"access_token": self.ACCESS_TOKEN,
}
time.sleep(5)
response = requests.post(url, data=payload)
print(response)
print(response.json())