I hit an interesting challenge when working with SMF-real-time.
An application can connect to the SMF real time service, and it will be sent data as it is produced. Depending on a flag the application can use:
- Non-blocking. The application gets a record if one is available. If not, it gets a return code saying “No records available”.
- Blocking. The application waits (potentially forever) for a record.
Which is the best one to use?
This is a general problem; it is not just SMF real time that has these challenges.
What are the options, generally?
Non blocking
You need to loop to see if there are records, and if not, wait. The challenge is how long you should wait. If you wait for 10 seconds, then you may get a 10 second delay between the record being created and your application getting it. If you specify a shorter time you reduce this window, but the application then does more loops per minute, and so there is an increase in CPU cost.
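The trade-off can be seen in a minimal sketch of such a polling loop. This is not the SMF real-time API: `source` is a plain `queue.Queue` standing in for the data stream, and `poll_for_records`, `poll_interval` and `stop_after` are names made up for the illustration.

```python
import queue
import time

def poll_for_records(source, poll_interval, stop_after):
    """Non-blocking polling loop: try a get, and sleep if nothing is there.

    `source` is a stand-in queue.Queue, not an SMF real-time connection.
    Returns the records read and the number of polls that found nothing.
    """
    records = []
    empty_polls = 0
    while len(records) < stop_after:
        try:
            records.append(source.get_nowait())  # non-blocking get
        except queue.Empty:
            empty_polls += 1             # the "No records available" case
            time.sleep(poll_interval)    # worst-case delay added to a record
    return records, empty_polls
```

A smaller `poll_interval` reduces the delay before a new record is seen, but `empty_polls` (and so the CPU used) goes up when the stream is quiet.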
Blocking
If you use blocking, there is no extra CPU spent looping round. The problem is how you stop your application cleanly when it is in a blocked wait, so that it can clean up at the end of the processing.
A third way
MQSeries processes application messages asynchronously. You can say wait for a message, but time out after a specified time interval if no messages have been received.
This method is very successful, but some applications abused it. They wanted their application to wake up every n seconds and check the application’s shutdown flag. If the flag was set, they shut down.
The correct answer in this case is to have MQ post an Event Control Block (ECB) and have the application’s code post another ECB; the mainline code waits for either of the ECBs to be posted, and takes the appropriate action. However, the lazy way of sleeping, waking, checking and sleeping again is quick and easy to code.
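Python has no direct equivalent of waiting on a list of ECBs, but the same idea can be sketched with one queue that both the message source and the shutdown request post to. This is only an analogue, not MQ code; `SHUTDOWN`, `mainline` and `demo` are names made up for the illustration.

```python
import queue
import threading

SHUTDOWN = object()  # hypothetical sentinel meaning "please stop"

def mainline(events):
    """Block until something is posted, then act on whichever it was."""
    handled = []
    while True:
        item = events.get()      # blocks; no periodic wake-up needed
        if item is SHUTDOWN:
            break                # the "shutdown" post
        handled.append(item)     # the "message arrived" post: process it
    return handled

def demo():
    events = queue.Queue()
    results = []
    worker = threading.Thread(target=lambda: results.append(mainline(events)))
    worker.start()
    events.put("message 1")      # posted by the message source
    events.put("message 2")
    events.put(SHUTDOWN)         # posted by the application's shutdown code
    worker.join()
    return results[0]
```

The mainline thread uses no CPU while it waits, and it reacts to a shutdown request immediately rather than at the next timer pop.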
What are the options for SMF real time?
With the SMF real time code, while one thread is running – and in a blocked wait, another thread can issue a disconnect() request. This wakes up the waiting thread with a “you’ve been woken up because of a disconnect” return code.
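To experiment with this behaviour without a z/OS system, it can be imitated with a small stand-in class. `FakeStream` below is hypothetical: it is not pySMFRT, and the return code strings are invented. It only mimics the idea that disc() from one thread wakes a get() blocked in another.

```python
import queue
import threading

class FakeStream:
    """Hypothetical stand-in for an SMF real-time connection (not pySMFRT)."""
    _DISC = object()  # internal marker queued by disc()

    def __init__(self):
        self._q = queue.Queue()

    def put(self, record):
        self._q.put(record)              # helper: pretend a record arrived

    def get(self, wait=True):
        try:
            item = self._q.get(block=wait)
        except queue.Empty:
            return "", "No records available"
        if item is self._DISC:
            return "", "Disconnected"    # invented return code
        return item, "OK"

    def disc(self):
        self._q.put(self._DISC)          # wakes a thread blocked in get()
        return "OK"

def demo():
    f = FakeStream()
    seen = []

    def reader():
        while True:
            x, rc = f.get(wait=True)     # blocking get
            if x == "":
                seen.append(rc)
                break
            seen.append(x)

    t = threading.Thread(target=reader)
    t.start()
    f.put("record-1")
    f.disc()                             # issued from the main thread
    t.join(timeout=5)
    return seen, t.is_alive()
```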
The solution is to use a threading model.
- Make the SMF get a blocking get with timeout
- Wait for a terminal interrupt
- Use an event with time out
The basic SMF get code
i = 0  # count of records processed
while True:
    x, rc = f.get(wait=True)  # Blocking get
    if x == "":  # Happens after disc()
        print("Get returned Null string, exiting loop")
        break
    if rc != "OK":
        print("get returned", rc)
        print("Disconnect", f.disc())
        break  # do not process the data after an error
    i += 1
    # it worked do something with the record
    ...
    print(i, len(x))  # Process the data
    ...
Make the SMF get a blocking get with timeout
With every get request, create a timer thread which wakes up after the timeout and issues the disconnect, causing the blocked get to wake up.
This may be expensive, because a timer thread is created and cancelled for every record.
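from threading import Timer

def blocking_get_loop(f, timeout=10, event=None, max_records=None):
    i = 0
    while True:
        # Run timerpop(f) after `timeout` seconds, unless cancelled first
        t = Timer(timeout, timerpop, args=(f,))
        t.start()  # the timer does nothing until it is started
        x, rc = f.get(wait=True)  # Blocking get
        t.cancel()  # the get returned, so stop the timer
        ....

# This runs when the timer pops, and wakes up the blocked get.
def timerpop(f):
    f.disc()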
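There are two details of `threading.Timer` that are easy to get wrong: the function and its arguments are passed separately (writing `Timer(delay, func(x))` calls `func` straight away), and the timer only runs once `start()` has been called. The small sketch below shows a timer being cancelled before it pops, and another being allowed to pop. `timer_demo` is just an illustrative name, and an `Event` stands in for the disconnect call.

```python
import threading

def timer_demo():
    fired = threading.Event()        # stands in for "disconnect was issued"

    # Pass the callable itself; Timer(1.0, fired.set()) would call it now.
    t = threading.Timer(1.0, fired.set)
    t.start()                        # a Timer does nothing until started
    early = fired.is_set()           # the delay has not elapsed yet
    t.cancel()                       # what happens when a record arrives
    t.join()
    cancelled = not fired.is_set()   # the function never ran

    t2 = threading.Timer(0.05, fired.set)
    t2.start()
    popped = fired.wait(timeout=5)   # becomes True when the timer pops
    t2.join()
    return early, cancelled, popped
```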
Wait for a terminal interrupt
Vignesh S sent me some code which I’ve taken and modified.
The code to do the SMF get runs in a separate thread. This means the main thread can execute the disconnect and wake up the blocked request.
import threading

def blocking_smf(stream_name="IFASMF.INMEM", debug=0, max_records=None):
    f = pySMFRT(stream_name, debug=debug)
    f.conn(stream_name, debug=2)  # Explicit connect
    # Start the blocking loop in a separate thread
    get_thread = threading.Thread(target=blocking_get_loop, args=(f,),
                                  kwargs={"max_records": max_records})
    get_thread.start()
    try:
        # Main thread: wait for user input to stop
        input("Press Enter to stop...\n")
        print("Stopping...")
    except KeyboardInterrupt:
        print("Interrupted, stopping...")
    finally:
        f.disc()  # This unblocks the get() call
        get_thread.join()  # Wait for thread to exit

# Usage example: run until Enter or interrupt, or max_records
if __name__ == "__main__":
    blocking_smf(max_records=6)  # execute the code above
The key code is
get_thread = threading.Thread(target=blocking_get_loop, args=(f,),
                              kwargs={"max_records": max_records})
get_thread.start()
This starts a new thread which executes the function blocking_get_loop, passing the positional argument f and the keyword argument max_records. Note that args must be a tuple, so a single argument is written as (f,).
The code to do the gets and process the requests is the same as before, with the addition of the count of records processed.
def blocking_get_loop(f, max_records=None):
    i = 0
    while True:
        x, rc = f.get(wait=True)  # Blocking get
        if x == "":  # Happens after disc()
            print("Get returned Null string, exiting loop")
            break
        if rc != "OK":
            print("get returned", rc)
            print("Disconnect", f.disc())
            break  # do not process the data after an error
        i += 1
        # it worked do something with the record
        ...
        print(i, len(x))  # Process the data
        ...
        #
        if max_records and i >= max_records:
            print("Reached max records, stopping")
            print("Disconnect", f.disc())  # clean up if ended because of number of records
            break
If the requested number of records has been processed, or there has been an error, or unexpected data, then disconnect is called, and the function returns.
Handle a terminal interrupt
The code
try:
    # Main thread: wait for user input to stop
    input("Press Enter to stop...\n")
    print("Stopping...")
except KeyboardInterrupt:
    print("Interrupted, stopping...")
finally:
    f.disc()  # This unblocks the get() call
    get_thread.join()  # Wait for thread to exit
waits for input from the terminal.
This solution is not perfect, because if the requested number of records is processed quickly, the SMF thread ends, but you still have to enter something at the keyboard before the program finishes.
Use an event with time out
One problem has been notifying the main task when the SMF get task has finished. You can use an event for this.
In the main logic have
def blocking_smf(stream_name="IFASMF.INMEM", debug=0, max_records=None):
    f = pySMFRT(stream_name, debug=debug)
    f.conn(stream_name, debug=2)  # Explicit connect
    myevent = threading.Event()
    # Start the blocking loop in a separate thread
    get_thread = threading.Thread(target=blocking_get_loop, args=(f,),
                                  kwargs={"max_records": max_records,
                                          "event": myevent})
    get_thread.start()
    # wait for the SMF get task to end - or the event time out
    if not myevent.wait(timeout=30):  # wait() returns False on time out
        print("We timed out")
        f.disc()  # wakeup the blocking get
    get_thread.join()  # Wait for it to finish
In the SMF code
def blocking_get_loop(f, max_records=None, event=None):
    i = 0
    while True:
        #t = Timer(timeout, timerpop(f))
        x, rc = f.get(wait=True)  # Blocking get
        #t.cancel()
        if x == "":  # Happens after disc()
            print("Get returned Null string, exiting loop")
            break
        if rc != "OK":
            print("get returned", rc)
            print("Disconnect", f.disc())
            break  # do not process the data after an error
        i += 1
        print(i, len(x))  # Process the data
        if max_records and i >= max_records:
            print("Reached max records, stopping")
            print("Disconnect", f.disc())
            break
    # The loop has ended, whatever the reason
    if event is not None:
        event.set()  # wake up the main task
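The event-with-timeout pattern can be tried out without an SMF connection. The sketch below uses a hypothetical `FakeStream` class in place of pySMFRT (its return codes are invented) and a cut-down get loop; `run` returns True if the get loop finished by itself, and False if the main thread timed out and had to issue the disconnect.

```python
import queue
import threading

class FakeStream:
    """Hypothetical stand-in for an SMF real-time connection (not pySMFRT)."""
    _DISC = object()

    def __init__(self):
        self._q = queue.Queue()

    def put(self, record):
        self._q.put(record)              # helper: pretend a record arrived

    def get(self, wait=True):
        item = self._q.get(block=wait)
        if item is self._DISC:
            return "", "Disconnected"    # invented return code
        return item, "OK"

    def disc(self):
        self._q.put(self._DISC)          # wakes a blocked get()

def get_loop(f, max_records=None, event=None):
    i = 0
    while True:
        x, rc = f.get(wait=True)         # blocking get
        if x == "":                      # woken by disc()
            break
        i += 1
        if max_records and i >= max_records:
            f.disc()
            break
    if event is not None:
        event.set()                      # tell the main thread we are done

def run(records, max_records, timeout):
    f = FakeStream()
    for r in records:
        f.put(r)
    done = threading.Event()
    t = threading.Thread(target=get_loop, args=(f,),
                         kwargs={"max_records": max_records, "event": done})
    t.start()
    finished = done.wait(timeout=timeout)  # False means we timed out
    if not finished:
        f.disc()                         # wake up the blocked get
    t.join()
    return finished
```

With three records queued and max_records=3 the loop ends by itself and the main thread is woken straight away; with fewer records than requested, the main thread times out and issues the disconnect.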