I wanted a long-running started task, with Python acting as a server. As part of this I needed to wait on more than one event. This proved to be a hard challenge to get working.
Background
On z/OS a “process” is an address space, and a thread is a TCB.
There are several Python models for doing asynchronous work, and waiting for one or more events.
- Cooperative multi-tasking. One thread acts as a dispatcher. “Tasks” are put on the work queue when they are ready to run, and taken off the work queue when they are waiting, just like an operating system. This is the asyncio model.
- Using multiple threads for the application. This is the ThreadPoolExecutor model.
- Using different address spaces (processes) for the application. This is the ProcessPoolExecutor model.
- Create threads within an extension function.
Information
I found the following very useful
- Asyncio Python documentation
- ThreadPoolExecutor in Python: The Complete Guide
- How To Stop Running Tasks in the ThreadPoolExecutor in Python
- Grok the GIL: How to write fast and thread-safe Python
- Compare ThreadPool and Asyncio.
Background knowledge
It took me a couple of days to get my parallel processing program to work. Even when I understood the concepts I still got it wrong, until I had a flash of understanding.
The Python Global Interpreter Lock (GIL)
To understand how Python works, especially with multiple concurrent tasks, you need to understand the Python Global Interpreter Lock.
Python code is not threadsafe, it is pseudo threadsafe. Rather than have to worry about concurrent access to data, with different threads able to read and change the same data at the same time, Python allows only one thread to execute Python code at a time. It uses a global lock for this. Your thread acquires the lock, does some work, and releases the lock. With a simple application this is invisible. When you try to develop an application with parallel “threads” you need to understand the lock.
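A small experiment (my own sketch, not from the original program) shows the effect of the lock: two pieces of CPU-bound pure-Python work run on two threads take about as long as running them one after the other, because only one thread at a time can execute Python code.

```python
import threading
import time

def count(n):
    # Pure-Python CPU work; with the GIL, only one thread
    # at a time can execute this loop.
    while n > 0:
        n -= 1

N = 2_000_000

# Run the work twice, sequentially.
start = time.perf_counter()
count(N)
count(N)
sequential = time.perf_counter() - start

# Run the same work on two threads "in parallel".
start = time.perf_counter()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

# With the GIL, the threaded version gives little or no speedup.
print(f"sequential {sequential:.2f}s, threaded {threaded:.2f}s")
```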
My first operating system
When people start writing an operating system from scratch they may have logic like
- Start the I/O
- Spin in an instruction loop waiting for the I/O to complete
- Do some more work
If you have only one processor in your system, no other work is done while waiting for the I/O to complete.
My second operating system
Having written your first operating system, the next operating system is more refined and has logic like
- Start the I/O
- Give up control – but resume the application when the I/O completes
- Resume from here.
In this case even with just one processor in your system, it can do lots of other work while the I/O is in progress. That application instance is suspended until the I/O completes.
The same principles apply to Python.
Python concurrent processing models
As well as the “single threading” standard Python program, Python supports 3 concurrent processing models
- One thread in one process (one address space). It can support concurrent bits of application as long as they cooperate while they are waiting for something. This is known as the asyncio model.
- Multiple threads in one process (one address space). A typical use of this is CPU-intensive threads, or operating system waits. Conceptually there is no cooperation with Python waits. This is known as the ThreadPool model.
- One or more threads in multiple processes (multiple address spaces). This is known as the ProcessPool model. I cannot see many use cases for this model.
I’ll give you an exercise to help you understand the processing.
async def cons(name):
    print(name, "start", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    time.sleep(10)
    print(name, "stop", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    return (42)

w = asyncio.create_task(cons("A"))
c = asyncio.create_task(cons("B"))
done, pending = await asyncio.wait([c, w], return_when=asyncio.ALL_COMPLETED)
The above code (based on examples from the web) creates two asynchronous instances which allow you to run them in parallel. The instance is created with the create_task, and the wait for them both to complete is in the asyncio.wait([]) function.
Does it print out
A start 16:00:00
B start 16:00:00
A stop 16:00:10
B stop 16:00:10
Or
A start 16:00:00
A stop 16:00:10
B start 16:00:10
B stop 16:00:20
Full marks if you chose the second one. This is because time.sleep(10) does not give up control. It runs, waits, ends, and only then can B run.
If we replace time.sleep(10) with await asyncio.sleep(10), the behaviour changes. This “sleep” function has been enhanced with cooperation, or “give up control”. When this is used, you get the first output, and both finish in 10 seconds.
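For reference, here is a runnable version of the cooperative variant (my own sketch, with the sleeps shortened to 1 second so it finishes quickly):

```python
import asyncio
import time

async def cons(name):
    print(name, "start", flush=True)
    await asyncio.sleep(1)   # cooperative: gives up control while sleeping
    print(name, "stop", flush=True)
    return 42

async def main():
    w = asyncio.create_task(cons("A"))
    c = asyncio.create_task(cons("B"))
    done, pending = await asyncio.wait([c, w], return_when=asyncio.ALL_COMPLETED)
    return done

start = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - start

# Both tasks sleep concurrently, so the whole run takes about
# 1 second, not 2.
print(f"elapsed {elapsed:.1f}s")
```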
From this I learned that not all Python functions are designed for running in parallel.
By displaying information about what was running, I could see that both instances were running on the same thread (TCB).
Using multiple TCBs.
I wrote an extension which waited for a z/OS console event. I had a “console” routine, and a “wait” routine in the Python program.
When I used the asyncio model, there is only one task (TCB). All work was suspended while my z/OS wait was in progress. As soon as this finished, other work ran. In this case the asyncio model, with my external function doing an operating system wait, did not work.
I then switched to the ThreadPool model, so I could use one TCB for the z/OS wait thread, and the other Python work could run on a different TCB.
However this appeared to have the same problem. No work was done while the z/OS wait was in progress.
This was because of the Global Interpreter Lock. My thread was dispatched holding the GIL across the wait, so no other Python work ran – it was all waiting for the GIL.
To fix this I had to change my program to “cooperate” with Python and release the lock when it was not needed. In my C program I used
Py_BEGIN_ALLOW_THREADS
rc = ProgramToWaitForOperatorData();
Py_END_ALLOW_THREADS
- Py_BEGIN_ALLOW_THREADS says give up the Python lock.
- Py_END_ALLOW_THREADS says I’m ready to run – please give me the Python lock.
With this small coding fix, I got my parallelism.
From this I learned that you need to worry about the Global Lock if your Python Extension issues a wait, or can be suspended.
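The same cooperation is built into CPython’s own blocking calls: for example, time.sleep() releases the GIL while it waits, which is why sleeping threads can overlap. This sketch (my own, not from the original program) demonstrates that with a ThreadPoolExecutor:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def waiter(n):
    # time.sleep releases the GIL while it waits, just as
    # Py_BEGIN_ALLOW_THREADS does in a C extension.
    time.sleep(n)
    return n

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as executor:
    f1 = executor.submit(waiter, 1)
    f2 = executor.submit(waiter, 1)
    results = [f1.result(), f2.result()]
elapsed = time.perf_counter() - start

# The two one-second waits overlap, so the total elapsed
# time is about 1 second, not 2.
print(f"elapsed {elapsed:.1f}s, results {results}")
```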
More information on coding with Asyncio
This model has one task which does all of the work. To work successfully in this environment, the pieces of work need to use “cooperative functions”, for example await asyncio.sleep(2) instead of the uncooperative time.sleep(2). Extensions must not use long waits. If the extension waits, everything waits.
Minimum setup
You need
- import asyncio at the top of your program
- asyncio.run(main2()) to actually run your function (main2) in asyncio mode.
For example
import asyncio

# The following is defined as a function - but it does all the work
async def main2():
    ...

# This runs the above routine as an async thread.
asyncio.run(main2())
I defined the mywait function. It is passed an event so it can post (set) it and wake up the caller.
async def mywait(event):
    print("WAIT Start", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    time_event = threading.Event()
    for i in range(0, 4):
        time_event.wait(10)  # every 10 seconds
        print("WAIT Woke ", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
        if event.is_set():
            print("WAIT Event", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
            break
    print("WAIT STOP ", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    event.set()
    return 44
To create an asynchronous thread and start it running, use
w = asyncio.create_task(mywait(event), name="MYWait")
print("W", w)
gives
W <Task pending name='MYWait' coro=<mywait() running at /u/tmp/console/AS2.py:18>>
This means
- It is a Task
- It is pending execution (not finished running yet)
- The name is ‘MYWait’
- The routine is a function “mywait()”
- defined at /u/tmp/console/AS2.py:18
To wait for one or more tasks to complete use
done, pending = await asyncio.wait([c,w],return_when=asyncio.ALL_COMPLETED)
You give a list of the tasks [c,w] and specify when you want it to return
- return_when=asyncio.ALL_COMPLETED
- return_when=asyncio.FIRST_COMPLETED
This returns a list of the tasks (done) which have finished, and a list of those (pending) which have not finished yet.
You can use
if c in done:
    print(c.result())
    # do something
else:
    ...
My console routine is defined
async def cons(event):
print("CONS start",datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'),flush=True)
await asyncio.sleep(2) # do something which cooperates
print("CONS Stop ",datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'),flush=True)
return (42)
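Putting the pieces together, a minimal self-contained sketch (my own, with shortened waits, and an asyncio.Event standing in for my z/OS wait) might look like:

```python
import asyncio

async def mywait(event):
    # Wait cooperatively until the event is set
    # (this stands in for the z/OS console wait).
    await event.wait()
    return 44

async def cons(event):
    await asyncio.sleep(0.2)   # do something which cooperates
    event.set()                # wake up mywait
    return 42

async def main2():
    event = asyncio.Event()
    w = asyncio.create_task(mywait(event), name="MYWait")
    c = asyncio.create_task(cons(event), name="Console")
    done, pending = await asyncio.wait([c, w], return_when=asyncio.ALL_COMPLETED)
    # Collect each finished task's result by its name.
    return {t.get_name(): t.result() for t in done}

results = asyncio.run(main2())
print(results)
```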
Coding with ThreadPoolExecutor
With ThreadPoolExecutor you set up a thread pool. Any requests that are created use a thread from this pool. If there are no available threads, the request is delayed until a thread is available.
A thread can use an operating system sleep, but extensions need to release and re-acquire the Python GIL around any wait.
Minimum setup
You need
- import concurrent.futures at the top of your program
- executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) to create a thread pool.
To create an asynchronous thread and start it running with function “mywait” use
w = executor.submit(mywait,parm1,parm2)
Note: This is different to the asyncio model, where you passed mywait(parm1,parm2).
print(“W”,w) gives
W <Future at 0x5008b9a4c0 state=running>
To wait for one or more tasks to complete use
done, pending = concurrent.futures.wait( [w,c], return_when=concurrent.futures.FIRST_COMPLETED)
You give a list of the futures [w,c] and specify when you want it to return
- return_when=concurrent.futures.ALL_COMPLETED
- return_when=concurrent.futures.FIRST_COMPLETED
This returns a list of the tasks (done) which have finished, and a list of those (pending) which have not finished yet.
You can use
if c in done:
    print(c.result())
    # do something
else:
    ...
The routine is defined
def cons(event):
    print("CONS start", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    ...
    print("CONS Stop ", datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f'), flush=True)
    return yy
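A complete runnable sketch of the ThreadPool model (my own, with a threading.Event standing in for the z/OS wait) is:

```python
import concurrent.futures
import threading

def mywait(event, timeout):
    # Block on the event; threading.Event.wait releases the GIL,
    # so other Python threads keep running while we wait.
    event.wait(timeout)
    return 44

def cons(event):
    # Do some work, then wake up the waiter.
    event.set()
    return 42

event = threading.Event()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
w = executor.submit(mywait, event, 10)
c = executor.submit(cons, event)
done, pending = concurrent.futures.wait(
    [w, c], return_when=concurrent.futures.ALL_COMPLETED)
print([f.result() for f in (w, c)])
executor.shutdown()
```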
ProcessPoolExecutor
I cannot see many uses for the ProcessPoolExecutor model. This runs threads in different address spaces. It makes sharing of information (such as program variables) much harder.
The basic program looks like
import concurrent.futures
import zconsole

def cons():
    zconsole.put("CONS TASK")
    # do something involving a long wait
    return x

def foo(a):
    zconsole.put("FOO TASK")
    # do something involving a long wait
    return z

zconsole.put("MAIN TASK")
executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
w = executor.submit(foo, "parameter1")
c = executor.submit(cons)
done, pending = concurrent.futures.wait([w, c], return_when=concurrent.futures.FIRST_COMPLETED)
if c in done:
    print("cons task finished: result", c.result())
The output on the z/OS console included
S PYT
IEF695I START PYT WITH JOBNAME PYT IS ASSIGNED TO USER START1
STC06801 +MAIN TASK
IRR812I PROFILE * (G) IN THE STARTED CLASS WAS USED
TO START BPXAS WITH JOBNAME BPXAS.
IEF403I BPXAS - STARTED - TIME=06.29.05
BPXP024I BPXAS INITIATOR STARTED ON BEHALF OF JOB PYT RUNNING IN ASID
0045
IRR812I PROFILE * (G) IN THE STARTED CLASS WAS USED 617
TO START BPXAS WITH JOBNAME BPXAS.
IEF403I BPXAS - STARTED - TIME=06.29.05
BPXP024I BPXAS INITIATOR STARTED ON BEHALF OF JOB PYT RUNNING IN ASID
0045
IEF403I BPXAS - STARTED - TIME=06.29.06
BPXP024I BPXAS INITIATOR STARTED ON BEHALF OF JOB PYT RUNNING IN ASID
0045
STC06802 +FOO TASK
STC06803 +CONS TASK
Three address spaces were started up, and the three Write To Operator requests (+MAIN TASK, +FOO TASK, +CONS TASK) each came from a different address space.
It takes a second or so to start each address space, so the start up of this approach is slower than using the thread model.
How it works
Your program is run in each address space.
You need to have
def main2():
    ....

if __name__ == '__main__':
    main2()
You need the if __name__ == '__main__' test to prevent the “main” logic starting in all the address spaces.
You can pass data to the asynchronous object for example
w = executor.submit(foo, "parameter1")
I do not think the objects are shared between different address spaces, so I think you need to treat these asynchronous functions as an opaque box. You give it data at start time, and you get the result when it has finished.
With asyncio and the ThreadPoolExecutor, everything runs in the same address space, so a Python object is available to all functions and threads.
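A quick illustration of that sharing (my own sketch): threads in a ThreadPoolExecutor can all update one Python object, which would not work across the separate address spaces of a ProcessPoolExecutor.

```python
import concurrent.futures

results = []   # one Python object, visible to every thread

def worker(n):
    # All threads append to the same list, because they share
    # one address space (and list.append is protected by the GIL).
    results.append(n)
    return n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(worker, i) for i in range(5)]
    concurrent.futures.wait(futures)

print(sorted(results))
```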
Creating a thread in an external function
You can create a thread from within an external (extension) function; you are then responsible for creating and ending the threads.
These threads can use Python services, such as call back to execute a Python function, or access variables and other information.
Your thread needs to register with Python using PyGILState_Ensure()… PyGILState_Release(). The thread then has the GIL; this must be given up while the thread is doing non-Python work, and acquired when doing anything with Python.
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();  // this gets the GIL
...
// Give up the Python GIL lock
Py_BEGIN_ALLOW_THREADS
...  // do some non Python work including wait
// Need to do some Python work, so get the GIL
Py_END_ALLOW_THREADS
Py_BuildValue....
PyGILState_Release(gstate);
pthread_exit(0);
You are responsible for terminating the thread at shutdown. This can be done using pthread_cancel(), or passing a request to the thread saying it needs to end.