mirror of
https://github.com/skoobasteeve/telegram-moviebot.git
synced 2026-03-20 03:28:57 +00:00
Update for Python 3.14 and python-telegram-bot version 22
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
.vscode
|
||||
__pycache__
|
||||
__pycache__
|
||||
.envrc
|
||||
|
||||
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.14
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM python:3-alpine
|
||||
FROM python:3.14-alpine
|
||||
|
||||
LABEL com.telegram-moviebot.version="main"
|
||||
|
||||
@@ -13,4 +13,4 @@ USER telegram:telegram
|
||||
|
||||
COPY . .
|
||||
|
||||
CMD [ "python", "/telegram-moviebot/telegram-moviebot.py" ]
|
||||
CMD [ "python", "/telegram-moviebot/telegram-moviebot.py" ]
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
requests
|
||||
python-telegram-bot
|
||||
requests~=2.32
|
||||
python-telegram-bot~=20.0
|
||||
|
||||
@@ -1,46 +1,46 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import requests
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
def tmdb_lookup(tmdb_url, tmdb_headers, movie, year=None):
|
||||
|
||||
movie = re.sub('[^a-zA-Z.\d\s]', '', movie)
|
||||
movie = re.sub(r"[^a-zA-Z.\d\s]", "", movie)
|
||||
tmdb_params = {
|
||||
"language": "en-US",
|
||||
"query": movie,
|
||||
"page": 1,
|
||||
"include_adult": False
|
||||
"include_adult": False,
|
||||
}
|
||||
|
||||
if year:
|
||||
tmdb_params["primary_release_year"] = year
|
||||
|
||||
tmdb_search = requests.get(f"{tmdb_url}/search/movie", params=tmdb_params,
|
||||
headers=tmdb_headers)
|
||||
tmdb_search = requests.get(
|
||||
f"{tmdb_url}/search/movie", params=tmdb_params, headers=tmdb_headers
|
||||
)
|
||||
|
||||
if tmdb_search.status_code == 401:
|
||||
return "401", "401", "401", "401"
|
||||
|
||||
|
||||
tmdb_search = tmdb_search.json()
|
||||
|
||||
if not tmdb_search["results"]:
|
||||
return "404", "404", "404", "404"
|
||||
|
||||
movie_id = tmdb_search['results'][0]['id']
|
||||
movie_title = tmdb_search['results'][0]['title']
|
||||
movie_release_check = tmdb_search['results'][0]['release_date']
|
||||
movie_id = tmdb_search["results"][0]["id"]
|
||||
movie_title = tmdb_search["results"][0]["title"]
|
||||
movie_release_check = tmdb_search["results"][0]["release_date"]
|
||||
|
||||
if movie_release_check:
|
||||
movie_release = datetime.strptime(
|
||||
tmdb_search['results'][0]['release_date'], "%Y-%m-%d")
|
||||
tmdb_search["results"][0]["release_date"], "%Y-%m-%d"
|
||||
)
|
||||
movie_year = movie_release.year
|
||||
else:
|
||||
movie_year = "???"
|
||||
|
||||
movie_rating = tmdb_search['results'][0]['vote_average']
|
||||
movie_rating = tmdb_search["results"][0]["vote_average"]
|
||||
|
||||
return movie_id, movie_title, movie_year, movie_rating
|
||||
|
||||
@@ -49,11 +49,10 @@ def sa_lookup(sa_url, sa_headers, movie_id, country):
|
||||
sa_params = {
|
||||
"country": country,
|
||||
"tmdb_id": f"movie/{movie_id}",
|
||||
"output_language": "en"
|
||||
}
|
||||
"output_language": "en",
|
||||
}
|
||||
|
||||
sa_request = requests.request("GET", sa_url, headers=sa_headers,
|
||||
params=sa_params)
|
||||
sa_request = requests.request("GET", sa_url, headers=sa_headers, params=sa_params)
|
||||
|
||||
if sa_request.status_code == 401:
|
||||
sa_response = "401"
|
||||
@@ -78,11 +77,11 @@ def services_speller(service):
|
||||
elif service == "netflix":
|
||||
service_proper = "Netflix"
|
||||
elif service == "disney":
|
||||
service_proper = "Disney\+"
|
||||
service_proper = "Disney\\+"
|
||||
elif service == "apple":
|
||||
service_proper = "Apple TV\+"
|
||||
service_proper = "Apple TV\\+"
|
||||
elif service == "paramount":
|
||||
service_proper = "Paramount\+"
|
||||
service_proper = "Paramount\\+"
|
||||
elif service == "starz":
|
||||
service_proper = "STARZ"
|
||||
elif service == "showtime":
|
||||
@@ -95,11 +94,11 @@ def services_speller(service):
|
||||
|
||||
|
||||
def char_cleanup(variable):
|
||||
variable = str(variable).replace('-', '\-')
|
||||
variable = str(variable).replace('(', '\(')
|
||||
variable = str(variable).replace(')', '\)')
|
||||
variable = str(variable).replace('+', '\+')
|
||||
variable = str(variable).replace('.', '\.')
|
||||
variable = str(variable).replace('!', '\!')
|
||||
variable = str(variable).replace("-", "\\-")
|
||||
variable = str(variable).replace("(", "\\(")
|
||||
variable = str(variable).replace(")", "\\)")
|
||||
variable = str(variable).replace("+", "\\+")
|
||||
variable = str(variable).replace(".", "\\.")
|
||||
variable = str(variable).replace("!", "\\!")
|
||||
|
||||
return variable
|
||||
|
||||
@@ -1,17 +1,18 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
from telegram.ext import (
|
||||
Updater,
|
||||
CommandHandler,
|
||||
CallbackContext,
|
||||
MessageHandler,
|
||||
Filters)
|
||||
import difflib
|
||||
import logging
|
||||
from telegram import Update, ParseMode
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import movie_check
|
||||
import difflib
|
||||
from telegram import Update
|
||||
from telegram.constants import ParseMode
|
||||
from telegram.ext import (
|
||||
Application,
|
||||
CommandHandler,
|
||||
ContextTypes,
|
||||
MessageHandler,
|
||||
filters,
|
||||
)
|
||||
|
||||
tmdb_api_token = os.environ.get("TMDB_API_TOKEN")
|
||||
sa_api_token = os.environ.get("SA_API_TOKEN")
|
||||
@@ -23,91 +24,93 @@ logging_debug = os.environ.get("TG_DEBUG")
|
||||
|
||||
tmdb_url = "https://api.themoviedb.org/3"
|
||||
tmdb_headers = {
|
||||
'Authorization': f'Bearer {tmdb_api_token}',
|
||||
'Content-Type': 'application/json;charset=utf-8',
|
||||
'Accept': 'application/json;charset=utf-8'
|
||||
"Authorization": f"Bearer {tmdb_api_token}",
|
||||
"Content-Type": "application/json;charset=utf-8",
|
||||
"Accept": "application/json;charset=utf-8",
|
||||
}
|
||||
|
||||
sa_url = "https://streaming-availability.p.rapidapi.com/get/basic"
|
||||
sa_headers = {
|
||||
'x-rapidapi-host': "streaming-availability.p.rapidapi.com",
|
||||
'x-rapidapi-key': sa_api_token
|
||||
}
|
||||
|
||||
|
||||
updater = Updater(token=bot_token, use_context=True)
|
||||
dispatcher = updater.dispatcher
|
||||
"x-rapidapi-host": "streaming-availability.p.rapidapi.com",
|
||||
"x-rapidapi-key": sa_api_token,
|
||||
}
|
||||
|
||||
|
||||
if logging_debug and logging_debug == "True":
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
level=logging.DEBUG)
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
level=logging.DEBUG,
|
||||
)
|
||||
else:
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
level=logging.INFO)
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
level=logging.INFO,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def shutdown():
|
||||
updater.stop()
|
||||
updater.is_idle = False
|
||||
|
||||
|
||||
def start(update: Update, context: CallbackContext):
|
||||
user_firstname = update.message.from_user['first_name']
|
||||
user_id = update.message.from_user['id']
|
||||
username = update.message.from_user['username'] or 'empty'
|
||||
logger.info(f'Session initiated by user: {user_firstname} ({username}, {user_id})')
|
||||
movie_handler = MessageHandler(Filters.text & (~Filters.command),
|
||||
input_movie)
|
||||
dispatcher.add_handler(movie_handler)
|
||||
context.bot.send_message(chat_id=update.effective_chat.id,
|
||||
text="I'm a movie streaming bot! Type in a " +
|
||||
"movie and I'll tell you where to stream it.")
|
||||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
user_firstname = update.message.from_user["first_name"]
|
||||
user_id = update.message.from_user["id"]
|
||||
username = update.message.from_user["username"] or "empty"
|
||||
logger.info(f"Session initiated by user: {user_firstname} ({username}, {user_id})")
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text="I'm a movie streaming bot! Type in a "
|
||||
+ "movie and I'll tell you where to stream it.",
|
||||
)
|
||||
|
||||
|
||||
def movie_lookup(movie, user_firstname):
|
||||
|
||||
if "-Year" in movie:
|
||||
year = movie.split("-Year")[1].strip()
|
||||
movie = movie.split("-Year")[0].strip()
|
||||
logger.info(f'{user_firstname}: Looking up movie: "{movie}" ({year})')
|
||||
movie_id, movie_title, movie_year, movie_rating = (
|
||||
movie_check.tmdb_lookup(tmdb_url, tmdb_headers, movie, year))
|
||||
movie_id, movie_title, movie_year, movie_rating = movie_check.tmdb_lookup(
|
||||
tmdb_url, tmdb_headers, movie, year
|
||||
)
|
||||
else:
|
||||
logger.info(f'{user_firstname}: Looking up movie: "{movie}"')
|
||||
movie_id, movie_title, movie_year, movie_rating = (
|
||||
movie_check.tmdb_lookup(tmdb_url, tmdb_headers, movie))
|
||||
movie_id, movie_title, movie_year, movie_rating = movie_check.tmdb_lookup(
|
||||
tmdb_url, tmdb_headers, movie
|
||||
)
|
||||
|
||||
tmdb_page = "https://themoviedb.org/movie/"
|
||||
|
||||
if movie_id == "404":
|
||||
tg_reply = (f"{user_firstname}: I'm having trouble finding that movie\. " +
|
||||
"Check your spelling and try again\.")
|
||||
tg_reply = (
|
||||
f"{user_firstname}: I'm having trouble finding that movie\\. "
|
||||
+ "Check your spelling and try again\\."
|
||||
)
|
||||
logger.warning(f'{user_firstname}: Movie "{movie}" not found in TMDB.')
|
||||
similarity = 0
|
||||
error_response = False
|
||||
return tg_reply, similarity, error_response
|
||||
|
||||
|
||||
if movie_id == "401":
|
||||
tg_reply = ("Invalid TMDB API token\. " +
|
||||
"Bot shutting down until restarted\.\.\.")
|
||||
logger.error('Invalid TMDB API token. Exiting...')
|
||||
tg_reply = (
|
||||
"Invalid TMDB API token\\. " + "Bot shutting down until restarted\\.\\.\\."
|
||||
)
|
||||
logger.error("Invalid TMDB API token. Exiting...")
|
||||
similarity = 0
|
||||
error_response = True
|
||||
return tg_reply, similarity, error_response
|
||||
|
||||
sa_response, services = movie_check.sa_lookup(sa_url, sa_headers, movie_id, country)
|
||||
if sa_response == "404":
|
||||
logger.warning(f'{user_firstname}: Movie "{movie}" not found by the Streaming Availability API.')
|
||||
|
||||
logger.warning(
|
||||
f'{user_firstname}: Movie "{movie}" not found by the Streaming Availability API.'
|
||||
)
|
||||
|
||||
if sa_response == "401":
|
||||
tg_reply = ("Invalid Streaming Availability API token\. " +
|
||||
"Bot shutting down until restarted\.\.\.")
|
||||
logger.error(f'{user_firstname}: Invalid Streaming Availability API token. Exiting...')
|
||||
tg_reply = (
|
||||
"Invalid Streaming Availability API token\\. "
|
||||
+ "Bot shutting down until restarted\\.\\.\\."
|
||||
)
|
||||
logger.error(
|
||||
f"{user_firstname}: Invalid Streaming Availability API token. Exiting..."
|
||||
)
|
||||
similarity = 0
|
||||
error_response = True
|
||||
return tg_reply, similarity, error_response
|
||||
@@ -115,24 +118,29 @@ def movie_lookup(movie, user_firstname):
|
||||
similarity = difflib.SequenceMatcher(None, movie, movie_title).ratio()
|
||||
sim_percent = "{0:.0f}%".format(similarity * 100)
|
||||
|
||||
logger.info(f'{user_firstname}: Result was a {sim_percent} match.')
|
||||
logger.info(f"{user_firstname}: Result was a {sim_percent} match.")
|
||||
|
||||
movie_title = movie_check.char_cleanup(movie_title)
|
||||
movie_year = movie_check.char_cleanup(movie_year)
|
||||
movie_rating = movie_check.char_cleanup(movie_rating)
|
||||
|
||||
tg_reply = (f"{movie_title} \({movie_year}\)\nRating: {movie_rating}" +
|
||||
f"\n[TMDB]({tmdb_page}{movie_id})")
|
||||
tg_reply = (
|
||||
f"{movie_title} \\({movie_year}\\)\nRating: {movie_rating}"
|
||||
+ f"\n[TMDB]({tmdb_page}{movie_id})"
|
||||
)
|
||||
logger.info(f'{user_firstname}: Returning movie: "{movie_title}: ({movie_year})"')
|
||||
|
||||
if not services or sa_response == "404":
|
||||
tg_reply = tg_reply + "\n\nStreaming not available :\("
|
||||
logger.info(f'{user_firstname}: No streaming available for "{movie_title}: ({movie_year})"')
|
||||
tg_reply = tg_reply + "\n\nStreaming not available :\\("
|
||||
logger.info(
|
||||
f'{user_firstname}: No streaming available for "{movie_title}: ({movie_year})"'
|
||||
)
|
||||
else:
|
||||
for s in services:
|
||||
leaving_epoch = sa_response["streamingInfo"][s]["us"]["leaving"]
|
||||
leaving_date = datetime.fromtimestamp(
|
||||
int(leaving_epoch)).strftime('%Y\-%m\-%d')
|
||||
leaving_date = datetime.fromtimestamp(int(leaving_epoch)).strftime(
|
||||
"%Y\\-%m\\-%d"
|
||||
)
|
||||
link = sa_response["streamingInfo"][s]["us"]["link"]
|
||||
|
||||
s_pretty = movie_check.services_speller(s)
|
||||
@@ -147,56 +155,79 @@ def movie_lookup(movie, user_firstname):
|
||||
return tg_reply, similarity, error_response
|
||||
|
||||
|
||||
def input_movie(update: Update, context: CallbackContext):
|
||||
user_firstname = update.message.from_user['first_name']
|
||||
async def input_movie(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
user_firstname = update.message.from_user["first_name"]
|
||||
movie = update.message.text.title()
|
||||
movie_info, similarity, error_response = movie_lookup(movie, user_firstname)
|
||||
context.bot.send_message(chat_id=update.effective_chat.id,
|
||||
text=movie_info, parse_mode=ParseMode.MARKDOWN_V2)
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text=movie_info,
|
||||
parse_mode=ParseMode.MARKDOWN_V2,
|
||||
)
|
||||
if error_response:
|
||||
shutdown()
|
||||
if similarity < .80 and similarity != 0:
|
||||
logger.info(f"{user_firstname}: Result accuracy was below the threshold. Sending follow-up message.")
|
||||
followup_msg = ("Not the movie you're looking for? " +
|
||||
"Try adding '\-year' followed by the release year after the title\.")
|
||||
context.bot.send_message(chat_id=update.effective_chat.id,
|
||||
text=followup_msg, parse_mode=ParseMode.MARKDOWN_V2)
|
||||
# In v20+, we can't easily stop the application from within a handler
|
||||
# The application will need to be stopped externally or we raise an exception
|
||||
logger.error("Critical error occurred. Application should be restarted.")
|
||||
return
|
||||
if similarity < 0.80 and similarity != 0:
|
||||
logger.info(
|
||||
f"{user_firstname}: Result accuracy was below the threshold. Sending follow-up message."
|
||||
)
|
||||
followup_msg = (
|
||||
"Not the movie you're looking for? "
|
||||
+ "Try adding '\\-year' followed by the release year after the title\\."
|
||||
)
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text=followup_msg,
|
||||
parse_mode=ParseMode.MARKDOWN_V2,
|
||||
)
|
||||
|
||||
|
||||
def unknown(update: Update, context: CallbackContext):
|
||||
context.bot.send_message(chat_id=update.effective_chat.id,
|
||||
text="Sorry, I didn't understand that command.")
|
||||
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
|
||||
await context.bot.send_message(
|
||||
chat_id=update.effective_chat.id,
|
||||
text="Sorry, I didn't understand that command.",
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
if not tmdb_api_token:
|
||||
logger.error("ERROR: TMDB API token not provided. Exiting...")
|
||||
exit()
|
||||
|
||||
|
||||
if not sa_api_token:
|
||||
logger.error("ERROR: Streaming Availability API token not provided. Exiting...")
|
||||
exit()
|
||||
|
||||
|
||||
if not bot_token:
|
||||
logger.error("ERROR: Telegram bot token not provided. Exiting...")
|
||||
exit()
|
||||
|
||||
# Build the Application
|
||||
application = Application.builder().token(bot_token).build()
|
||||
|
||||
# Add start handler with optional user filter
|
||||
if filter_user:
|
||||
start_handler = CommandHandler('start', start,
|
||||
Filters.user(username=filter_user))
|
||||
start_handler = CommandHandler(
|
||||
"start", start, filters.User(username=filter_user)
|
||||
)
|
||||
else:
|
||||
start_handler = CommandHandler('start', start)
|
||||
start_handler = CommandHandler("start", start)
|
||||
|
||||
dispatcher.add_handler(start_handler)
|
||||
application.add_handler(start_handler)
|
||||
|
||||
unknown_handler = MessageHandler(Filters.command, unknown)
|
||||
# Add movie input handler (text messages that are not commands)
|
||||
movie_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), input_movie)
|
||||
application.add_handler(movie_handler)
|
||||
|
||||
dispatcher.add_handler(unknown_handler)
|
||||
# Add unknown command handler
|
||||
unknown_handler = MessageHandler(filters.COMMAND, unknown)
|
||||
application.add_handler(unknown_handler)
|
||||
|
||||
updater.start_polling()
|
||||
# Start the bot
|
||||
application.run_polling()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user