166 lines
5.3 KiB
Python
166 lines
5.3 KiB
Python
from tgtg import TgtgClient
|
|
from tgtg.exceptions import TgtgAPIError
|
|
import os
|
|
import json
|
|
import telegram
|
|
import asyncio
|
|
import time
|
|
import datetime
|
|
import requests
|
|
import random
|
|
|
|
initial_waiting_time = 60
|
|
waiting_time = initial_waiting_time
|
|
waiting_time_limit = 60 * 60 * 24
|
|
waiting_time_increase = 2
|
|
|
|
randomness = 0.1
|
|
|
|
interval = 60
|
|
|
|
client = None
|
|
telegram_bot = None
|
|
removal_notification = os.environ.get('REMOVAL_NOTIFICATION')
|
|
|
|
if removal_notification is None:
|
|
removal_notification = False
|
|
|
|
|
|
def parse_duration(seconds):
|
|
seconds = int(seconds)
|
|
minutes = seconds // 60
|
|
seconds = seconds % 60
|
|
hours = minutes // 60
|
|
minutes = minutes % 60
|
|
|
|
if hours > 0:
|
|
return f'{hours}h {minutes}m {seconds}s'
|
|
elif minutes > 0:
|
|
return f'{minutes}m {seconds}s'
|
|
else:
|
|
return f'{seconds}s'
|
|
|
|
|
|
def check_env():
|
|
if os.environ.get('TGTG_EMAIL') is None:
|
|
return False
|
|
if os.environ.get('TELEGRAM') is None and os.environ.get('MATRIX') is None:
|
|
return False
|
|
if os.environ.get('TELEGRAM') is not None and (os.environ.get('TELEGRAM_TOKEN') is None or os.environ.get('TELEGRAM_ID') is None):
|
|
return False
|
|
if os.environ.get('MATRIX') is not None and os.environ.get('MATRIX_URL') is None:
|
|
return False
|
|
return True
|
|
|
|
|
|
def load_creds():
|
|
global client, telegram_bot
|
|
|
|
if not os.path.exists('/data/token'):
|
|
client = TgtgClient(email=os.environ.get('TGTG_EMAIL'))
|
|
print('Waiting for credentials ...')
|
|
credentials = client.get_credentials()
|
|
with open('/data/token', 'w') as file:
|
|
file.write(str(credentials))
|
|
print('Credentials stored in file')
|
|
else:
|
|
with open('/data/token', 'r') as file:
|
|
credentials = json.loads(file.read().replace('\'', '"'))
|
|
print('Credentials loaded from file')
|
|
|
|
client = TgtgClient(**credentials)
|
|
if os.environ.get('TELEGRAM') is not None:
|
|
telegram_bot = telegram.Bot(os.environ['TELEGRAM_TOKEN'])
|
|
|
|
|
|
async def send_message(text):
|
|
if os.environ.get('TELEGRAM') is not None:
|
|
async with telegram_bot:
|
|
await telegram_bot.send_message(chat_id=os.environ.get('TELEGRAM_ID'), text='\n'.join(['Too good to Go'] + text))
|
|
if os.environ.get('MATRIX') is not None:
|
|
dic = {
|
|
'title': 'Too Good to Go',
|
|
'list': text,
|
|
}
|
|
|
|
response = requests.post(
|
|
url=os.environ.get('MATRIX_URL'),
|
|
headers={
|
|
'Content-Type': 'application/json',
|
|
},
|
|
auth=(os.environ.get('MATRIX_BASIC_AUTH_USER'), os.environ.get('MATRIX_BASIC_AUTH_PASS')),
|
|
json=dic
|
|
)
|
|
|
|
|
|
async def main():
|
|
await send_message(['tg² telegram_bot is watching!'])
|
|
|
|
last = []
|
|
|
|
while True:
|
|
try:
|
|
items = client.get_items()
|
|
|
|
texts = []
|
|
|
|
next = []
|
|
|
|
for item in items:
|
|
if item['items_available'] > 0:
|
|
next.append(item["item"]["item_id"])
|
|
if item["item"]["item_id"] not in last:
|
|
amount = item["items_available"]
|
|
item_name = item["item"]["name"]
|
|
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
|
|
store_name = item["store"]["store_name"]
|
|
store_branch = item["store"]["branch"]
|
|
|
|
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
|
|
|
|
if not name:
|
|
name = "Panier anti-gaspi"
|
|
|
|
texts.append(f'{amount} x "{name}" ({price:.2f}€)')
|
|
elif removal_notification and item["item"]["item_id"] in last:
|
|
amount = item["items_available"]
|
|
name = item["item"]["name"]
|
|
price = item["item"]["price_including_taxes"]["minor_units"]/(10**item["item"]["price_including_taxes"]["decimals"])
|
|
store_name = item["store"]["store_name"]
|
|
store_branch = item["store"]["branch"]
|
|
|
|
name = ', '.join(filter(bool, [item_name, store_name, store_branch]))
|
|
|
|
if not name:
|
|
name = "Panier anti-gaspi"
|
|
|
|
texts.append(f'no more "{name}"')
|
|
|
|
|
|
if len(texts) > 1:
|
|
|
|
print(f'\n{datetime.datetime.now()}: {len(texts)-1} new items available')
|
|
|
|
await send_message(texts)
|
|
else:
|
|
print('-', end='', flush=True)
|
|
|
|
last = next
|
|
time.sleep(interval + randomness * interval * (2 * random.random() - 1))
|
|
|
|
waiting_time = initial_waiting_time
|
|
|
|
except TgtgAPIError as e:
|
|
print(e)
|
|
real_waiting_time = round(waiting_time + randomness * waiting_time * (2 * random.random() - 1))
|
|
await send_message([f'tg² failed to fetch data, retrying in {parse_duration(real_waiting_time)}'])
|
|
time.sleep(real_waiting_time)
|
|
|
|
if waiting_time < waiting_time_limit:
|
|
waiting_time *= waiting_time_increase
|
|
|
|
if __name__ == '__main__':
|
|
print('Check environ:', check_env())
|
|
load_creds()
|
|
asyncio.run(main())
|