tg2/main.py

216 lines
6.9 KiB
Python

import tgtg
from tgtg import TgtgClient
from tgtg.exceptions import TgtgAPIError
import os
import json
import telegram
import asyncio
import time
import datetime
import requests
import random
###################################
# UTILS #
###################################
def get_env(name, default=None, mandatory=False):
"""Get an environment variable, with a default value and type casting."""
content = os.environ.get(name, default)
if mandatory and content is None:
raise ValueError(f'Missing environment variable {name}')
if type(default) == str:
return content.lower() in ('true', '1')
if type(default) == bool:
return content
if type(default) == int:
return int(content)
if type(default) == float:
return float(content)
return content
def parse_duration(seconds):
"""Parse a duration in seconds to a human readable format."""
seconds = int(seconds)
minutes = seconds // 60
seconds = seconds % 60
hours = minutes // 60
minutes = minutes % 60
if hours > 0:
return f'{hours}h {minutes}m {seconds}s'
elif minutes > 0:
return f'{minutes}m {seconds}s'
else:
return f'{seconds}s'
def apply_randomness(value, randomness):
"""Apply randomness to a value."""
return round(value + randomness * value * (2 * random.random() - 1))
####################################
# CONFIG #
####################################
TGTG_EMAIL = get_env('TGTG_EMAIL', mandatory=True)
TELEGRAM_NOTIFICATIONS = get_env('TELEGRAM_NOTIFICATIONS', False)
TELEGRAM_TOKEN = get_env('TELEGRAM_TOKEN', mandatory=TELEGRAM_NOTIFICATIONS)
TELEGRAM_ID = get_env('TELEGRAM_ID', mandatory=TELEGRAM_NOTIFICATIONS)
MATRIX_NOTIFICATIONS = get_env('MATRIX_NOTIFICATIONS', False)
MATRIX_URL = get_env('MATRIX_URL', mandatory=MATRIX_NOTIFICATIONS)
MATRIX_BASIC_AUTH_USER = get_env('MATRIX_BASIC_AUTH_USER', mandatory=MATRIX_NOTIFICATIONS)
MATRIX_BASIC_AUTH_PASS = get_env('MATRIX_BASIC_AUTH_PASS', mandatory=MATRIX_NOTIFICATIONS)
INITIAL_WAITING_TIME = get_env('INITIAL_WAITING_TIME', 60)
WAITING_TIME_LIMIT = get_env('WAITING_TIME_LIMIT', 60 * 60 * 24)
WAITING_TIME_INCREASE = get_env('WAITING_TIME_FACTOR', 2)
RANDOMNESS = get_env('RANDOMNESS', 0.1)
INTERVAL = get_env('INTERVAL', 60)
REMOVAL_NOTIFICATIONS = get_env('REMOVAL_NOTIFICATIONS', False)
# Reduce frequency of polling to avoid rate limiting
tgtg.POLLING_WAIT_TIME = get_env('LOGIN_POLLING_WAIT_TIME', 30)
tgtg.MAX_POLLING_TRIES = get_env('LOGIN_MAX_POLLING_TRIES', 10)
TOKEN_PATH = get_env('TOKEN_PATH', '/data/token')
####################################
# MAIN #
####################################
waiting_time = INITIAL_WAITING_TIME
tgtgClient = None
telegram_bot = telegram.Bot(TELEGRAM_TOKEN) if TELEGRAM_NOTIFICATIONS else None
async def send_message(text):
if TELEGRAM_NOTIFICATIONS:
async with telegram_bot:
await telegram_bot.send_message(
chat_id=TELEGRAM_ID,
text='\n'.join(['Too good to Go'] + text)
)
if MATRIX_NOTIFICATIONS:
requests.post(
url=get_env('MATRIX_URL'),
headers={
'Content-Type': 'application/json',
},
auth=(MATRIX_BASIC_AUTH_USER, MATRIX_BASIC_AUTH_PASS),
json={
'title': 'Too Good to Go',
'list': text,
}
)
def catch_api_error(e, message):
global waiting_time
print('ERROR: ', e)
real_waiting_time = apply_randomness(waiting_time, RANDOMNESS)
print(f'{message}, retrying in {parse_duration(real_waiting_time)}')
time.sleep(real_waiting_time)
waiting_time = min(waiting_time * WAITING_TIME_INCREASE, WAITING_TIME_LIMIT)
async def get_credentials():
while True:
try:
await send_message(['Open the link to login to tgtg'])
return tgtgClient.get_credentials()
except TgtgAPIError as e:
catch_api_error(e, 'tg² failed to get credentials')
async def load_creds():
global tgtgClient, telegram_bot
if not os.path.exists(TOKEN_PATH):
tgtgClient = TgtgClient(email=TGTG_EMAIL)
print('Waiting for credentials ...')
credentials = await get_credentials()
with open(TOKEN_PATH, 'w') as file:
file.write(str(credentials))
print('Credentials stored in file')
else:
with open(TOKEN_PATH, 'r') as file:
credentials = json.loads(file.read().replace('\'', '"'))
print('Credentials loaded from file')
tgtgClient = TgtgClient(**credentials)
async def main():
await load_creds()
await send_message(['tg² telegram_bot is watching!'])
last = []
while True:
try:
items = tgtgClient.get_items()
texts = []
next = []
for item in items:
if item['items_available'] > 0:
next.append(item["item"]["item_id"])
if item["item"]["item_id"] not in last:
amount = item["items_available"]
item_name = item["item"]["name"]
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
store_name = item["store"]["store_name"]
store_branch = item["store"]["branch"]
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
if not name:
name = "Panier anti-gaspi"
texts.append(f'{amount} x "{name}" ({price:.2f}€)')
elif REMOVAL_NOTIFICATIONS and item["item"]["item_id"] in last:
amount = item["items_available"]
name = item["item"]["name"]
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
store_name = item["store"]["store_name"]
store_branch = item["store"]["branch"]
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
if not name:
name = "Panier anti-gaspi"
texts.append(f'no more "{name}"')
if len(texts) > 1:
print(f'\n{datetime.datetime.now()}: {len(texts)-1} new items available')
await send_message(texts)
else:
print('-', end='', flush=True)
last = next
real_interval = INTERVAL + RANDOMNESS * INTERVAL * (2 * random.random() - 1)
time.sleep(INTERVAL)
waiting_time = INITIAL_WAITING_TIME
except TgtgAPIError as e:
catch_api_error(e, 'tg² failed to get items')
if __name__ == '__main__':
asyncio.run(main())