eckity.before_after_publisher
1BEFORE_OPERATOR_EVENT_NAME = 'before_operator' 2AFTER_OPERATOR_EVENT_NAME = 'after_operator' 3 4 5class BeforeAfterPublisher: 6 def __init__(self, events=None, event_names=None): 7 ext_events_names = event_names if event_names is not None else [] 8 if events is None: 9 # Initialize events dictionary with event names as keys and subscribers as values 10 ext_events_names.extend([BEFORE_OPERATOR_EVENT_NAME, AFTER_OPERATOR_EVENT_NAME]) 11 self.events = {event: {} for event in ext_events_names} 12 else: 13 # Assign an already existing dictionary, in case of deserialization/clone 14 self.events = events 15 self.customers_id = 0 16 17 def _get_subscribers(self, event_name): 18 return self.events[event_name] 19 20 def register(self, event, callback=None): 21 # TODO warning if register to a fake event 22 if callback is None: 23 callback = {lambda _: None} 24 self._get_subscribers(event)[self.customers_id] = callback 25 self.customers_id += 1 26 return self.customers_id - 1 27 28 def unregister(self, event, customers_id): 29 # TODO warn if unregister to a fake event 30 del self._get_subscribers(event)[customers_id] 31 32 def publish(self, event_name): 33 struct = self.event_name_to_data(event_name) 34 for subscriber, callback in self._get_subscribers(event_name).items(): 35 callback(self, struct) 36 37 def event_name_to_data(self, event_name): 38 return {} # TODO abs? 39 40 def act_and_publish_before_after(self, act_func: callable): 41 self.publish(BEFORE_OPERATOR_EVENT_NAME) 42 return_val = act_func() 43 self.publish(AFTER_OPERATOR_EVENT_NAME) 44 return return_val
BEFORE_OPERATOR_EVENT_NAME =
'before_operator'
AFTER_OPERATOR_EVENT_NAME =
'after_operator'
class
BeforeAfterPublisher:
6class BeforeAfterPublisher: 7 def __init__(self, events=None, event_names=None): 8 ext_events_names = event_names if event_names is not None else [] 9 if events is None: 10 # Initialize events dictionary with event names as keys and subscribers as values 11 ext_events_names.extend([BEFORE_OPERATOR_EVENT_NAME, AFTER_OPERATOR_EVENT_NAME]) 12 self.events = {event: {} for event in ext_events_names} 13 else: 14 # Assign an already existing dictionary, in case of deserialization/clone 15 self.events = events 16 self.customers_id = 0 17 18 def _get_subscribers(self, event_name): 19 return self.events[event_name] 20 21 def register(self, event, callback=None): 22 # TODO warning if register to a fake event 23 if callback is None: 24 callback = {lambda _: None} 25 self._get_subscribers(event)[self.customers_id] = callback 26 self.customers_id += 1 27 return self.customers_id - 1 28 29 def unregister(self, event, customers_id): 30 # TODO warn if unregister to a fake event 31 del self._get_subscribers(event)[customers_id] 32 33 def publish(self, event_name): 34 struct = self.event_name_to_data(event_name) 35 for subscriber, callback in self._get_subscribers(event_name).items(): 36 callback(self, struct) 37 38 def event_name_to_data(self, event_name): 39 return {} # TODO abs? 40 41 def act_and_publish_before_after(self, act_func: callable): 42 self.publish(BEFORE_OPERATOR_EVENT_NAME) 43 return_val = act_func() 44 self.publish(AFTER_OPERATOR_EVENT_NAME) 45 return return_val
BeforeAfterPublisher(events=None, event_names=None)
7 def __init__(self, events=None, event_names=None): 8 ext_events_names = event_names if event_names is not None else [] 9 if events is None: 10 # Initialize events dictionary with event names as keys and subscribers as values 11 ext_events_names.extend([BEFORE_OPERATOR_EVENT_NAME, AFTER_OPERATOR_EVENT_NAME]) 12 self.events = {event: {} for event in ext_events_names} 13 else: 14 # Assign an already existing dictionary, in case of deserialization/clone 15 self.events = events 16 self.customers_id = 0