At the high level, I wanted a Python program running as a started task on z/OS to be able to catch operator requests, such as shutdown. I thought the solutions I came up with were a little complex for what I wanted, then I saw an example of using callback which did “After a period of time, invoke this function with these parameters”. Could this be adapted to provide “call this Python function when an operator issues a command to the started task?” As usual it got me into areas I was unfamiliar with, but the answer is yes it can be adapted.
Background
The interface for an application to be notified of an operator request is the z/OS QEDIT interface. There is an Event Control Block(ECB) which gets posted when there is data for it. An application can wait on this ECB.
There are several approaches that can be taken for a (Python) program
- Have an application loop round checking the ECB to see if it has been posted. If it has been posted, issue a WAIT on the ECB, which will wake up immediately; get the message and return. This would work, but how long do you wait between loops? The smaller the time, the more frequently you scan, and so use up more CPU.
- Have a thread which waits to be posted. The thread wakes up and notifies the application.
- Python has an ASYNC interface where applications can multithread on one thread. The code has to be well behaved. It has to give up control to the main thread when it has no work to do. It the (single) thread does an operating system wait, all work stops until the wait completes. This approach will not work as the thread has to wait for the ECB.
- Use a thread from the Python thread pool. You can get a thread from Python, which can wait for the ECB. This thread has to be well behaved and release the Global Interpreter Lock (GIL) (which controls Python multi programming). An application can only update Python data when it has the GIL. It prevents problems with concurrent access to fields.
- Use a thread which is not from the Python task pool. This thread can callback into Python and run a function.
This blog post is about the last item in the above list; using a thread which is not in the Python thread pool, to call back into a function in the main Python program.
High level view of the program
There are several “moving parts” to the program.
- A Python external function which is passed the Python function and any parameters for the function. This external function creates a z/OS thread and passes the Python function name and its parameters to the thread.
- Register a Python shutdown clean-up exit, to wake up or cancel the async thread when the Python program finishes.
- The C program which runs as an independent thread (also known as a subtask or TCB). It registers the thread with Python (and gets the GIL lock) then loops:
- Release the GIL lock
- Waits for the QEDIT ECB to be posted
- Get the GIL lock
- Builds the parameter list of the data received
- Calls the python function passing the original data, and the received data
- The Python function is passed the original parameters, and the data from the request. The Python function can add data to a queue, update Python variables and can enable an Python event. The main task can waiting on this event, and so process the requests when they come in.
The main python program
The handle = zconsole.acb(ccp_cb,[exit_event]) creates the async thread, and returns a handle. The handle is used to cancel the outstanding wait.
There is code to update variables in a thread safe manner by using a threadlock.
An event is used to signal completion.
import zconsole as zconsole ... # This is the callback function which gets control from the C program def ccp_cb(args,QEDIT_data) : global stop # set this to stop to 1 to end processing global global_counter # increment this parms = args[1] # [functionName,[parms]) e = parms[0] # event with threadLock: global_counter += 1 print("qedit",QEDIT_data) # display what we received if QEDIT_data["verb"] == "Stop": stop = 1 e.set() # post event - wake up main ############################################### threadLock = threading.Lock() # for serialisation of updates exit_event = threading.Event() # for event processing # wait for up to 30 seconds at most 4 times # initiate the console wait. using Asynchronouse CallBack handle = zconsole.acb(ccp_cb,[exit_event]) # This returns a handle. for i in range (0,3): # at most 4 times exit_flag.wait(timeout=30) # set 30 seconds time out if (exit_flag.is_set() == False): # we timed out break print("GlobalCounter",global_counter) print("stop",stop) # debug info if stop == 1: break print("after stop ",stop) zconsole.cancel(handle) # stop the async task
The external function zconsole.acb(function,[parms])
The external acb (asychronous call back) function (written in C) has code
- to read the parameters passed to the function
- increment the use count of the python fields to prevent Python from freeing them. The async thread decrements the use-count
- attaches a thread to run a program (called cthread).
... pthread_t thid; PyObject * method = NULL; PyObject * parms = NULL; // get the data as objects if (!PyArg_ParseTuple(args,"OO", &method, // function &parms )) // parms { /// problem? return NULL; } ... // zargs is used to hold the parameters zargs -> method = method; zargs -> parms = parms; // the following are decremented within the Async thread Py_INCREF(zargs -> parms; /* Prevent it from being deallocated. */ Py_INCREF(zargs -> method);/* Prevent it from being deallocated. */ // create the thread rc = pthread_create(&thid, NULL, cthread, zargs);
The async C thread to process the QEDIT data
This program
- is passed the parameter list containing the Python function Object, and the Python function parameter list object
- releases the GIL
- executes the assembler program which waits on the QEDIT ECB
- when this returns, it gets the GIL
- builds a dictionary of parameters (“name”:”value”,…) from the QEDIT data
- calls the Python function passing the function object, the parameters passed to the external function, and the dictionary of parameters from the operator request (from QEDIT).
void * cthread(void *_arg) { struct thread_args * zargs = (struct thread_args *) _arg ; PyGILState_STATE gstate; PyObject *rv = NULL; // returned object PyObject *x = NULL; // returned object char * ret = 0; long rc; int stop = 0; rc = 0; // register this thread to Python gstate = PyGILState_Ensure(); loop{ Py_BEGIN_ALLOW_THREADS // QEDIT waits to be posted and returns the data rc = QEDIT( pMsg); // assembler function // get the GIL and stop any other work Py_END_ALLOW_THREADS ... // convert console name from EBCDIC to ASCII __e2a_l( pCIBX ->consolename ,8 ); // build the parameter list to pass to Python function rv = Py_BuildValue("{s:i,s:s,s:s#,s:s#,s:y#,s:y#,s:y#}", "rc", rc, "verb", pVerb, "data",&(pCIB -> data[0]),lData, "console",&(pCIBX -> consolename),l8, "cart",&(pCIBX -> CART),l8, "consoleid",&(pCIBX -> consoleid),l8, "oconsoleid",&(pCIBX -> consoleid),l8); Py_INCREF(rv); /* Prevent it from being deallocated. */ // Call the Python function x = PyObject_CallFunctionObjArgs( zargs -> method,zargs -> a1,rv , NULL); if ( x != NULL) Py_DECREF(x ); /* Prevent x from being deallocated. */ if (stop >0) { //printf("Stop found - cthread exiting \n"); break; } } // end of main loop if ( zargs -> a1 != NULL) Py_DECREF(zargs -> a1); /* allow it to be deallocated. */ if ( zargs -> method != NULL) Py_DECREF(zargs -> method); /* Alllow it to be deallocated. */ pthread_exit(ret); return 0;
Ending the thread
A thread running asynchronously needs to end when the caller end. If it stays running you will get a system abend A03.
You have a choice
- Pass a “shutdown ECB” to the thread, and have the thread wait on an ECBLIST (shutdown ECB, and QEDIT ECB). The high level application can then post this ECB. I had an external function zconsole.cancel(handle). This got the address of the ECB from the parameter, and posted it
- Cancel the thread. I had an external function zconsole.cancel(…). This was passed the thread-id, and issued pthread_cancel(thread-id). In the end I used the shutdown ECB as it was cleaner.
I found it best to use a class for my thread, and register for a function to be called at the Python program shutdown.
For example
class console: handle = None def __init__(self,a): print("console.__init__",a) def cb(self,a,b): # call the function to create the async task # and return the handle self.handle = zconsole.acb(a,b) #register cleanup for shutdown atexit.register(self.cleanup,self.handle) def cleanup(self,handle): print("IN CLEANUP") if handle != None: zconsole.cancel(self.handle)
This says when the cb function is called to set up the callback, add this object and the cleanup routine to the list of “shutdown” activities. The cleanup function, tells the async thread to shutdown.
How do you know the thread has ended?
You can use code like pthread_cleanup_push and pthread_cleanup_pop to call an ending function. This function is called when the thread:
• Calls pthread_exit()
• Does a return from the start routine
• Is cancelled because of a pthread_cancel()
In your cleanup routine you need to check for locks and other resources owned by the thread, and release them.
PyGILState_STATE gstate; // referred to from cthread and cleanup void cleanup(void * arg) { printf("Thread was cancelled!\n\n"); int s = PyGILState_Check(); printf("chthread Python latch %d\n",s); // release the lock if we have it if (s) PyGILState_Release(gstate); } void * cthread(void *_arg) { pthread_cleanup_push(cleanup,NULL); struct thread_args * tA = (struct thread_args *) _arg ; ... pthread_cleanup_pop(0); pthread_exit(ret); }