218 lines
6.9 KiB
Python
218 lines
6.9 KiB
Python
import tgtg
|
|
from tgtg import TgtgClient
|
|
from tgtg.exceptions import TgtgAPIError
|
|
import os
|
|
import json
|
|
import telegram
|
|
import asyncio
|
|
import time
|
|
import datetime
|
|
import requests
|
|
import random
|
|
|
|
###################################
|
|
# UTILS #
|
|
###################################
|
|
|
|
def get_env(name, default=None, mandatory=False):
|
|
"""Get an environment variable, with a default value and type casting."""
|
|
content = os.environ.get(name, default)
|
|
|
|
if mandatory and content is None:
|
|
raise ValueError(f'Missing environment variable {name}')
|
|
|
|
if type(default) == bool:
|
|
return content.lower() in ('true', '1')
|
|
if type(default) == int:
|
|
return int(content)
|
|
if type(default) == float:
|
|
return float(content)
|
|
return content
|
|
|
|
def parse_duration(seconds):
|
|
"""Parse a duration in seconds to a human readable format."""
|
|
seconds = int(seconds)
|
|
minutes = seconds // 60
|
|
seconds = seconds % 60
|
|
hours = minutes // 60
|
|
minutes = minutes % 60
|
|
|
|
if hours > 0:
|
|
return f'{hours}h {minutes}m {seconds}s'
|
|
elif minutes > 0:
|
|
return f'{minutes}m {seconds}s'
|
|
else:
|
|
return f'{seconds}s'
|
|
|
|
def apply_randomness(value, randomness):
|
|
"""Apply randomness to a value."""
|
|
return round(value + randomness * value * (2 * random.random() - 1))
|
|
|
|
####################################
|
|
# CONFIG #
|
|
####################################
|
|
|
|
TGTG_EMAIL = get_env('TGTG_EMAIL', mandatory=True)
|
|
TELEGRAM_NOTIFICATIONS = get_env('TELEGRAM_NOTIFICATIONS', False)
|
|
TELEGRAM_TOKEN = get_env('TELEGRAM_TOKEN', mandatory=TELEGRAM_NOTIFICATIONS)
|
|
TELEGRAM_ID = get_env('TELEGRAM_ID', mandatory=TELEGRAM_NOTIFICATIONS)
|
|
MATRIX_NOTIFICATIONS = get_env('MATRIX_NOTIFICATIONS', False)
|
|
MATRIX_URL = get_env('MATRIX_URL', mandatory=MATRIX_NOTIFICATIONS)
|
|
MATRIX_BASIC_AUTH_USER = get_env('MATRIX_BASIC_AUTH_USER', mandatory=MATRIX_NOTIFICATIONS)
|
|
MATRIX_BASIC_AUTH_PASS = get_env('MATRIX_BASIC_AUTH_PASS', mandatory=MATRIX_NOTIFICATIONS)
|
|
|
|
INITIAL_WAITING_TIME = get_env('INITIAL_WAITING_TIME', 60)
|
|
WAITING_TIME_LIMIT = get_env('WAITING_TIME_LIMIT', 60 * 60 * 24)
|
|
WAITING_TIME_INCREASE = get_env('WAITING_TIME_FACTOR', 2)
|
|
RANDOMNESS = get_env('RANDOMNESS', 0.1)
|
|
INTERVAL = get_env('INTERVAL', 60)
|
|
|
|
REMOVAL_NOTIFICATIONS = get_env('REMOVAL_NOTIFICATIONS', False)
|
|
|
|
# Reduce frequency of polling to avoid rate limiting
|
|
tgtg.POLLING_WAIT_TIME = get_env('LOGIN_POLLING_WAIT_TIME', 30)
|
|
tgtg.MAX_POLLING_TRIES = get_env('LOGIN_MAX_POLLING_TRIES', 10)
|
|
|
|
TOKEN_PATH = get_env('TOKEN_PATH', '/data/token')
|
|
|
|
####################################
|
|
# MAIN #
|
|
####################################
|
|
|
|
waiting_time = INITIAL_WAITING_TIME
|
|
|
|
tgtgClient = None
|
|
telegram_bot = telegram.Bot(TELEGRAM_TOKEN) if TELEGRAM_NOTIFICATIONS else None
|
|
|
|
|
|
async def send_message(text):
|
|
if TELEGRAM_NOTIFICATIONS:
|
|
async with telegram_bot:
|
|
await telegram_bot.send_message(
|
|
chat_id=TELEGRAM_ID,
|
|
text='\n'.join(['Too good to Go'] + text)
|
|
)
|
|
|
|
if MATRIX_NOTIFICATIONS:
|
|
requests.post(
|
|
url=get_env('MATRIX_URL'),
|
|
headers={
|
|
'Content-Type': 'application/json',
|
|
},
|
|
auth=(MATRIX_BASIC_AUTH_USER, MATRIX_BASIC_AUTH_PASS),
|
|
json={
|
|
'title': 'Too Good to Go',
|
|
'list': text,
|
|
}
|
|
)
|
|
|
|
|
|
def catch_api_error(e, message):
|
|
global waiting_time
|
|
|
|
print('ERROR: ', e)
|
|
|
|
real_waiting_time = apply_randomness(waiting_time, RANDOMNESS)
|
|
print(f'{message}, retrying in {parse_duration(real_waiting_time)}')
|
|
|
|
time.sleep(real_waiting_time)
|
|
|
|
waiting_time = min(waiting_time * WAITING_TIME_INCREASE, WAITING_TIME_LIMIT)
|
|
|
|
|
|
def get_credentials():
|
|
while True:
|
|
try:
|
|
await send_message(['Open the link to login to tgtg'])
|
|
return tgtgClient.get_credentials()
|
|
except TgtgAPIError as e:
|
|
catch_api_error(e, 'tg² failed to get credentials')
|
|
|
|
|
|
def load_creds():
|
|
global tgtgClient, telegram_bot
|
|
|
|
if not os.path.exists(TOKEN_PATH):
|
|
tgtgClient = TgtgClient(email=TGTG_EMAIL)
|
|
print('Waiting for credentials ...')
|
|
credentials = get_credentials()
|
|
with open(TOKEN_PATH, 'w') as file:
|
|
file.write(str(credentials))
|
|
print('Credentials stored in file')
|
|
else:
|
|
with open(TOKEN_PATH, 'r') as file:
|
|
credentials = json.loads(file.read().replace('\'', '"'))
|
|
print('Credentials loaded from file')
|
|
|
|
tgtgClient = TgtgClient(**credentials)
|
|
|
|
|
|
async def main():
|
|
await send_message(['tg² telegram_bot is watching!'])
|
|
|
|
last = []
|
|
|
|
while True:
|
|
try:
|
|
items = tgtgClient.get_items()
|
|
|
|
texts = []
|
|
|
|
next = []
|
|
|
|
for item in items:
|
|
if item['items_available'] > 0:
|
|
next.append(item["item"]["item_id"])
|
|
if item["item"]["item_id"] not in last:
|
|
amount = item["items_available"]
|
|
item_name = item["item"]["name"]
|
|
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
|
|
store_name = item["store"]["store_name"]
|
|
store_branch = item["store"]["branch"]
|
|
|
|
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
|
|
|
|
if not name:
|
|
name = "Panier anti-gaspi"
|
|
|
|
texts.append(f'{amount} x "{name}" ({price:.2f}€)')
|
|
elif REMOVAL_NOTIFICATIONS and item["item"]["item_id"] in last:
|
|
amount = item["items_available"]
|
|
name = item["item"]["name"]
|
|
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
|
|
store_name = item["store"]["store_name"]
|
|
store_branch = item["store"]["branch"]
|
|
|
|
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
|
|
|
|
if not name:
|
|
name = "Panier anti-gaspi"
|
|
|
|
texts.append(f'no more "{name}"')
|
|
|
|
|
|
if len(texts) > 1:
|
|
|
|
print(f'\n{datetime.datetime.now()}: {len(texts)-1} new items available')
|
|
|
|
await send_message(texts)
|
|
else:
|
|
print('-', end='', flush=True)
|
|
|
|
last = next
|
|
real_interval = INTERVAL + RANDOMNESS * INTERVAL * (2 * random.random() - 1)
|
|
time.sleep(INTERVAL)
|
|
|
|
waiting_time = INITIAL_WAITING_TIME
|
|
|
|
except TgtgAPIError as e:
|
|
catch_api_error(e, 'tg² failed to get items')
|
|
|
|
if __name__ == '__main__':
|
|
if not check_env():
|
|
print('Missing environment variables')
|
|
exit(1)
|
|
|
|
load_creds()
|
|
asyncio.run(main())
|