Spaces:
Runtime error
Runtime error
| import datetime | |
| import requests | |
| class Trigger: | |
| def __init__(self, trigger_tags, comparison_tags, time_definition, event_name, included=True): | |
| self.trigger_tags = set(trigger_tags) | |
| self.comparison_tags = set(comparison_tags) | |
| self.time_definition = time_definition | |
| self.event_name = event_name | |
| self.included = included | |
| self.threshold = 0 | |
| self.actions = [] | |
| self.sources = [] | |
| def add_action(self, action): | |
| self.actions.append(action) | |
| def remove_action(self, action): | |
| if action in self.actions: | |
| self.actions.remove(action) | |
| else: | |
| print("Action not found") | |
| def add_source(self, source): | |
| self.sources.append(source) | |
| def remove_source(self, source): | |
| if source in self.sources: | |
| self.sources.remove(source) | |
| else: | |
| print("Source not found") | |
| def check_trigger(self, current_tags, current_time): | |
| if self.included: | |
| if current_time in self.time_definition and self.trigger_tags.issubset(current_tags): | |
| self.threshold += 1 | |
| else: | |
| self.threshold = 0 | |
| else: | |
| if current_time in self.time_definition and not self.trigger_tags.intersection(current_tags): | |
| self.threshold += 1 | |
| else: | |
| self.threshold = 0 | |
| if self.threshold >= len(self.time_definition): | |
| self.fire_actions() | |
| self.make_requests() | |
| def fire_actions(self): | |
| for action in self.actions: | |
| action(self.event_name) | |
| def make_requests(self): | |
| for source in self.sources: | |
| try: | |
| response = requests.get(source) | |
| # Procesar la respuesta aqu铆 si es necesario | |
| print(f"Request made to {source}. Status code: {response.status_code}") | |
| except requests.exceptions.RequestException as e: | |
| print(f"Error making request to {source}: {e}") | |
| # Ejemplo de uso: | |
| def action_function(event_name): | |
| print(f"Trigger fired for event: {event_name}") | |
| if __name__ == "__main__": | |
| # Definici贸n de un trigger | |
| trigger = Trigger(["tag1", "tag2"], ["tag3", "tag4"], [datetime.time(10, 0), datetime.time(15, 0)], "Event1") | |
| # A帽adir una acci贸n al trigger | |
| trigger.add_action(action_function) | |
| # A帽adir una fuente al trigger | |
| trigger.add_source("https://example.com/api/data") | |
| # Simular la comprobaci贸n peri贸dica del trigger (aqu铆 se usar铆a en un bucle de tiempo real) | |
| current_tags = {"tag1", "tag2", "tag3"} | |
| current_time = datetime.datetime.now().time() | |
| trigger.check_trigger(current_tags, current_time) | |