Compare commits

...

2 Commits

6 changed files with 152 additions and 120 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
.vscode .vscode
__pycache__ __pycache__
.envrc

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.14

View File

@@ -1,4 +1,4 @@
FROM python:3-alpine FROM python:3.14-alpine
LABEL com.telegram-moviebot.version="main" LABEL com.telegram-moviebot.version="main"

View File

@@ -1,2 +1,2 @@
requests requests~=2.32
python-telegram-bot python-telegram-bot~=22.0

View File

@@ -1,25 +1,24 @@
#!/usr/bin/python3
import requests
import re import re
from datetime import datetime from datetime import datetime
import requests
def tmdb_lookup(tmdb_url, tmdb_headers, movie, year=None): def tmdb_lookup(tmdb_url, tmdb_headers, movie, year=None):
movie = re.sub(r"[^a-zA-Z.\d\s]", "", movie)
movie = re.sub('[^a-zA-Z.\d\s]', '', movie)
tmdb_params = { tmdb_params = {
"language": "en-US", "language": "en-US",
"query": movie, "query": movie,
"page": 1, "page": 1,
"include_adult": False "include_adult": False,
} }
if year: if year:
tmdb_params["primary_release_year"] = year tmdb_params["primary_release_year"] = year
tmdb_search = requests.get(f"{tmdb_url}/search/movie", params=tmdb_params, tmdb_search = requests.get(
headers=tmdb_headers) f"{tmdb_url}/search/movie", params=tmdb_params, headers=tmdb_headers
)
if tmdb_search.status_code == 401: if tmdb_search.status_code == 401:
return "401", "401", "401", "401" return "401", "401", "401", "401"
@@ -29,18 +28,19 @@ def tmdb_lookup(tmdb_url, tmdb_headers, movie, year=None):
if not tmdb_search["results"]: if not tmdb_search["results"]:
return "404", "404", "404", "404" return "404", "404", "404", "404"
movie_id = tmdb_search['results'][0]['id'] movie_id = tmdb_search["results"][0]["id"]
movie_title = tmdb_search['results'][0]['title'] movie_title = tmdb_search["results"][0]["title"]
movie_release_check = tmdb_search['results'][0]['release_date'] movie_release_check = tmdb_search["results"][0]["release_date"]
if movie_release_check: if movie_release_check:
movie_release = datetime.strptime( movie_release = datetime.strptime(
tmdb_search['results'][0]['release_date'], "%Y-%m-%d") tmdb_search["results"][0]["release_date"], "%Y-%m-%d"
)
movie_year = movie_release.year movie_year = movie_release.year
else: else:
movie_year = "???" movie_year = "???"
movie_rating = tmdb_search['results'][0]['vote_average'] movie_rating = tmdb_search["results"][0]["vote_average"]
return movie_id, movie_title, movie_year, movie_rating return movie_id, movie_title, movie_year, movie_rating
@@ -49,11 +49,10 @@ def sa_lookup(sa_url, sa_headers, movie_id, country):
sa_params = { sa_params = {
"country": country, "country": country,
"tmdb_id": f"movie/{movie_id}", "tmdb_id": f"movie/{movie_id}",
"output_language": "en" "output_language": "en",
} }
sa_request = requests.request("GET", sa_url, headers=sa_headers, sa_request = requests.request("GET", sa_url, headers=sa_headers, params=sa_params)
params=sa_params)
if sa_request.status_code == 401: if sa_request.status_code == 401:
sa_response = "401" sa_response = "401"
@@ -78,11 +77,11 @@ def services_speller(service):
elif service == "netflix": elif service == "netflix":
service_proper = "Netflix" service_proper = "Netflix"
elif service == "disney": elif service == "disney":
service_proper = "Disney\+" service_proper = "Disney\\+"
elif service == "apple": elif service == "apple":
service_proper = "Apple TV\+" service_proper = "Apple TV\\+"
elif service == "paramount": elif service == "paramount":
service_proper = "Paramount\+" service_proper = "Paramount\\+"
elif service == "starz": elif service == "starz":
service_proper = "STARZ" service_proper = "STARZ"
elif service == "showtime": elif service == "showtime":
@@ -95,11 +94,11 @@ def services_speller(service):
def char_cleanup(variable): def char_cleanup(variable):
variable = str(variable).replace('-', '\-') variable = str(variable).replace("-", "\\-")
variable = str(variable).replace('(', '\(') variable = str(variable).replace("(", "\\(")
variable = str(variable).replace(')', '\)') variable = str(variable).replace(")", "\\)")
variable = str(variable).replace('+', '\+') variable = str(variable).replace("+", "\\+")
variable = str(variable).replace('.', '\.') variable = str(variable).replace(".", "\\.")
variable = str(variable).replace('!', '\!') variable = str(variable).replace("!", "\\!")
return variable return variable

View File

@@ -1,17 +1,18 @@
#!/usr/bin/python3 import difflib
from telegram.ext import (
Updater,
CommandHandler,
CallbackContext,
MessageHandler,
Filters)
import logging import logging
from telegram import Update, ParseMode
import os import os
from datetime import datetime from datetime import datetime
import movie_check import movie_check
import difflib from telegram import Update
from telegram.constants import ParseMode
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
tmdb_api_token = os.environ.get("TMDB_API_TOKEN") tmdb_api_token = os.environ.get("TMDB_API_TOKEN")
sa_api_token = os.environ.get("SA_API_TOKEN") sa_api_token = os.environ.get("SA_API_TOKEN")
@@ -23,91 +24,93 @@ logging_debug = os.environ.get("TG_DEBUG")
tmdb_url = "https://api.themoviedb.org/3" tmdb_url = "https://api.themoviedb.org/3"
tmdb_headers = { tmdb_headers = {
'Authorization': f'Bearer {tmdb_api_token}', "Authorization": f"Bearer {tmdb_api_token}",
'Content-Type': 'application/json;charset=utf-8', "Content-Type": "application/json;charset=utf-8",
'Accept': 'application/json;charset=utf-8' "Accept": "application/json;charset=utf-8",
} }
sa_url = "https://streaming-availability.p.rapidapi.com/get/basic" sa_url = "https://streaming-availability.p.rapidapi.com/get/basic"
sa_headers = { sa_headers = {
'x-rapidapi-host': "streaming-availability.p.rapidapi.com", "x-rapidapi-host": "streaming-availability.p.rapidapi.com",
'x-rapidapi-key': sa_api_token "x-rapidapi-key": sa_api_token,
} }
updater = Updater(token=bot_token, use_context=True)
dispatcher = updater.dispatcher
if logging_debug and logging_debug == "True": if logging_debug and logging_debug == "True":
logging.basicConfig( logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.DEBUG) level=logging.DEBUG,
)
else: else:
logging.basicConfig( logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO) level=logging.INFO,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def shutdown(): async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
updater.stop() user_firstname = update.message.from_user["first_name"]
updater.is_idle = False user_id = update.message.from_user["id"]
username = update.message.from_user["username"] or "empty"
logger.info(f"Session initiated by user: {user_firstname} ({username}, {user_id})")
def start(update: Update, context: CallbackContext): await context.bot.send_message(
user_firstname = update.message.from_user['first_name'] chat_id=update.effective_chat.id,
user_id = update.message.from_user['id'] text="I'm a movie streaming bot! Type in a "
username = update.message.from_user['username'] or 'empty' + "movie and I'll tell you where to stream it.",
logger.info(f'Session initiated by user: {user_firstname} ({username}, {user_id})') )
movie_handler = MessageHandler(Filters.text & (~Filters.command),
input_movie)
dispatcher.add_handler(movie_handler)
context.bot.send_message(chat_id=update.effective_chat.id,
text="I'm a movie streaming bot! Type in a " +
"movie and I'll tell you where to stream it.")
def movie_lookup(movie, user_firstname): def movie_lookup(movie, user_firstname):
if "-Year" in movie: if "-Year" in movie:
year = movie.split("-Year")[1].strip() year = movie.split("-Year")[1].strip()
movie = movie.split("-Year")[0].strip() movie = movie.split("-Year")[0].strip()
logger.info(f'{user_firstname}: Looking up movie: "{movie}" ({year})') logger.info(f'{user_firstname}: Looking up movie: "{movie}" ({year})')
movie_id, movie_title, movie_year, movie_rating = ( movie_id, movie_title, movie_year, movie_rating = movie_check.tmdb_lookup(
movie_check.tmdb_lookup(tmdb_url, tmdb_headers, movie, year)) tmdb_url, tmdb_headers, movie, year
)
else: else:
logger.info(f'{user_firstname}: Looking up movie: "{movie}"') logger.info(f'{user_firstname}: Looking up movie: "{movie}"')
movie_id, movie_title, movie_year, movie_rating = ( movie_id, movie_title, movie_year, movie_rating = movie_check.tmdb_lookup(
movie_check.tmdb_lookup(tmdb_url, tmdb_headers, movie)) tmdb_url, tmdb_headers, movie
)
tmdb_page = "https://themoviedb.org/movie/" tmdb_page = "https://themoviedb.org/movie/"
if movie_id == "404": if movie_id == "404":
tg_reply = (f"{user_firstname}: I'm having trouble finding that movie\. " + tg_reply = (
"Check your spelling and try again\.") f"{user_firstname}: I'm having trouble finding that movie\\. "
+ "Check your spelling and try again\\."
)
logger.warning(f'{user_firstname}: Movie "{movie}" not found in TMDB.') logger.warning(f'{user_firstname}: Movie "{movie}" not found in TMDB.')
similarity = 0 similarity = 0
error_response = False error_response = False
return tg_reply, similarity, error_response return tg_reply, similarity, error_response
if movie_id == "401": if movie_id == "401":
tg_reply = ("Invalid TMDB API token\. " + tg_reply = (
"Bot shutting down until restarted\.\.\.") "Invalid TMDB API token\\. " + "Bot shutting down until restarted\\.\\.\\."
logger.error('Invalid TMDB API token. Exiting...') )
logger.error("Invalid TMDB API token. Exiting...")
similarity = 0 similarity = 0
error_response = True error_response = True
return tg_reply, similarity, error_response return tg_reply, similarity, error_response
sa_response, services = movie_check.sa_lookup(sa_url, sa_headers, movie_id, country) sa_response, services = movie_check.sa_lookup(sa_url, sa_headers, movie_id, country)
if sa_response == "404": if sa_response == "404":
logger.warning(f'{user_firstname}: Movie "{movie}" not found by the Streaming Availability API.') logger.warning(
f'{user_firstname}: Movie "{movie}" not found by the Streaming Availability API.'
)
if sa_response == "401": if sa_response == "401":
tg_reply = ("Invalid Streaming Availability API token\. " + tg_reply = (
"Bot shutting down until restarted\.\.\.") "Invalid Streaming Availability API token\\. "
logger.error(f'{user_firstname}: Invalid Streaming Availability API token. Exiting...') + "Bot shutting down until restarted\\.\\.\\."
)
logger.error(
f"{user_firstname}: Invalid Streaming Availability API token. Exiting..."
)
similarity = 0 similarity = 0
error_response = True error_response = True
return tg_reply, similarity, error_response return tg_reply, similarity, error_response
@@ -115,24 +118,29 @@ def movie_lookup(movie, user_firstname):
similarity = difflib.SequenceMatcher(None, movie, movie_title).ratio() similarity = difflib.SequenceMatcher(None, movie, movie_title).ratio()
sim_percent = "{0:.0f}%".format(similarity * 100) sim_percent = "{0:.0f}%".format(similarity * 100)
logger.info(f'{user_firstname}: Result was a {sim_percent} match.') logger.info(f"{user_firstname}: Result was a {sim_percent} match.")
movie_title = movie_check.char_cleanup(movie_title) movie_title = movie_check.char_cleanup(movie_title)
movie_year = movie_check.char_cleanup(movie_year) movie_year = movie_check.char_cleanup(movie_year)
movie_rating = movie_check.char_cleanup(movie_rating) movie_rating = movie_check.char_cleanup(movie_rating)
tg_reply = (f"{movie_title} \({movie_year}\)\nRating: {movie_rating}" + tg_reply = (
f"\n[TMDB]({tmdb_page}{movie_id})") f"{movie_title} \\({movie_year}\\)\nRating: {movie_rating}"
+ f"\n[TMDB]({tmdb_page}{movie_id})"
)
logger.info(f'{user_firstname}: Returning movie: "{movie_title}: ({movie_year})"') logger.info(f'{user_firstname}: Returning movie: "{movie_title}: ({movie_year})"')
if not services or sa_response == "404": if not services or sa_response == "404":
tg_reply = tg_reply + "\n\nStreaming not available :\(" tg_reply = tg_reply + "\n\nStreaming not available :\\("
logger.info(f'{user_firstname}: No streaming available for "{movie_title}: ({movie_year})"') logger.info(
f'{user_firstname}: No streaming available for "{movie_title}: ({movie_year})"'
)
else: else:
for s in services: for s in services:
leaving_epoch = sa_response["streamingInfo"][s]["us"]["leaving"] leaving_epoch = sa_response["streamingInfo"][s]["us"]["leaving"]
leaving_date = datetime.fromtimestamp( leaving_date = datetime.fromtimestamp(int(leaving_epoch)).strftime(
int(leaving_epoch)).strftime('%Y\-%m\-%d') "%Y\\-%m\\-%d"
)
link = sa_response["streamingInfo"][s]["us"]["link"] link = sa_response["streamingInfo"][s]["us"]["link"]
s_pretty = movie_check.services_speller(s) s_pretty = movie_check.services_speller(s)
@@ -147,29 +155,43 @@ def movie_lookup(movie, user_firstname):
return tg_reply, similarity, error_response return tg_reply, similarity, error_response
def input_movie(update: Update, context: CallbackContext): async def input_movie(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_firstname = update.message.from_user['first_name'] user_firstname = update.message.from_user["first_name"]
movie = update.message.text.title() movie = update.message.text.title()
movie_info, similarity, error_response = movie_lookup(movie, user_firstname) movie_info, similarity, error_response = movie_lookup(movie, user_firstname)
context.bot.send_message(chat_id=update.effective_chat.id, await context.bot.send_message(
text=movie_info, parse_mode=ParseMode.MARKDOWN_V2) chat_id=update.effective_chat.id,
text=movie_info,
parse_mode=ParseMode.MARKDOWN_V2,
)
if error_response: if error_response:
shutdown() # In v20+, we can't easily stop the application from within a handler
if similarity < .80 and similarity != 0: # The application will need to be stopped externally or we raise an exception
logger.info(f"{user_firstname}: Result accuracy was below the threshold. Sending follow-up message.") logger.error("Critical error occurred. Application should be restarted.")
followup_msg = ("Not the movie you're looking for? " + return
"Try adding '\-year' followed by the release year after the title\.") if similarity < 0.80 and similarity != 0:
context.bot.send_message(chat_id=update.effective_chat.id, logger.info(
text=followup_msg, parse_mode=ParseMode.MARKDOWN_V2) f"{user_firstname}: Result accuracy was below the threshold. Sending follow-up message."
)
followup_msg = (
"Not the movie you're looking for? "
+ "Try adding '\\-year' followed by the release year after the title\\."
)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=followup_msg,
parse_mode=ParseMode.MARKDOWN_V2,
)
def unknown(update: Update, context: CallbackContext): async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
context.bot.send_message(chat_id=update.effective_chat.id, await context.bot.send_message(
text="Sorry, I didn't understand that command.") chat_id=update.effective_chat.id,
text="Sorry, I didn't understand that command.",
)
def main(): def main():
if not tmdb_api_token: if not tmdb_api_token:
logger.error("ERROR: TMDB API token not provided. Exiting...") logger.error("ERROR: TMDB API token not provided. Exiting...")
exit() exit()
@@ -182,21 +204,30 @@ def main():
logger.error("ERROR: Telegram bot token not provided. Exiting...") logger.error("ERROR: Telegram bot token not provided. Exiting...")
exit() exit()
# Build the Application
application = Application.builder().token(bot_token).build()
# Add start handler with optional user filter
if filter_user: if filter_user:
start_handler = CommandHandler('start', start, start_handler = CommandHandler(
Filters.user(username=filter_user)) "start", start, filters.User(username=filter_user)
)
else: else:
start_handler = CommandHandler('start', start) start_handler = CommandHandler("start", start)
dispatcher.add_handler(start_handler) application.add_handler(start_handler)
unknown_handler = MessageHandler(Filters.command, unknown) # Add movie input handler (text messages that are not commands)
movie_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), input_movie)
application.add_handler(movie_handler)
dispatcher.add_handler(unknown_handler) # Add unknown command handler
unknown_handler = MessageHandler(filters.COMMAND, unknown)
application.add_handler(unknown_handler)
updater.start_polling() # Start the bot
application.run_polling()
if __name__ == '__main__': if __name__ == "__main__":
main() main()