mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	 068d5724d8
			
		
	
	
		068d5724d8
		
	
	
	
	
		
			
			terminated; this makes the final assert in the self-test code fail if the parent runs faster than the children. Fix this by calling wait() on the remaining children instead.
		
			
				
	
	
		
			100 lines
		
	
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			100 lines
		
	
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import sys
 | |
| import string
 | |
| 
 | |
# Exclusive upper bound on the descriptor numbers the forked child closes
# before exec'ing the command; descriptors numbered MAXFD or above are not
# touched by that loop.
MAXFD = 256     # Max number of file descriptors (os.getdtablesize()???)

# Popen3 instances whose exit status has not been collected yet.  An
# instance is appended by its constructor and removed by poll()/wait() once
# os.waitpid() has reported the child's status.
_active = []
 | |
| 
 | |
def _cleanup():
    """Poll every tracked child so that finished ones get reaped."""
    # Work on a snapshot: poll() removes finished instances from _active,
    # and the list must not shrink underneath the loop.
    snapshot = list(_active)
    for child in snapshot:
        child.poll()
 | |
| 
 | |
class Popen3:
    """Run a command in a forked child connected to the parent by pipes.

    Attributes set by the constructor:

    pid       -- process id of the child
    tochild   -- file object writing to the child's standard input
    fromchild -- file object reading from the child's standard output
    childerr  -- file object reading from the child's standard error, or
                 None when stderr is not captured
    sts       -- status reported by os.waitpid(), or -1 while the child
                 has not been reaped yet
    """

    def __init__(self, cmd, capturestderr=0, bufsize=-1):
        """Fork a child and exec cmd in it.

        cmd is either an argument list handed to os.execvp(), or a string,
        in which case it is run as "/bin/sh -c cmd".  If capturestderr is
        true the child's stderr is redirected to a third pipe, readable
        through self.childerr.  bufsize is passed to os.fdopen() for every
        pipe end kept by the parent.
        """
        if isinstance(cmd, type('')):
            cmd = ['/bin/sh', '-c', cmd]
        p2cread, p2cwrite = os.pipe()
        c2pread, c2pwrite = os.pipe()
        if capturestderr:
            errout, errin = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child.  Everything runs under try/finally so that the child
            # can never fall back into the caller's code: whatever goes
            # wrong (a failing close/dup, a failing exec), it exits.
            try:
                os.close(0)
                os.close(1)
                # dup() returns the lowest free descriptor, which must be
                # the one that was just closed.
                if os.dup(p2cread) != 0:
                    sys.stderr.write('popen2: bad read dup\n')
                if os.dup(c2pwrite) != 1:
                    sys.stderr.write('popen2: bad write dup\n')
                if capturestderr:
                    os.close(2)
                    # stderr was just closed, so a wrong descriptor cannot
                    # be reported here.
                    os.dup(errin)
                # Close everything inherited from the parent except the
                # three standard descriptors.
                for fd in range(3, MAXFD):
                    try:
                        os.close(fd)
                    except os.error:
                        # Descriptor was not open; nothing to do.
                        pass
                os.execvp(cmd[0], cmd)
            finally:
                os._exit(1)
        # Parent: keep only its own end of every pipe.
        os.close(p2cread)
        self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
        os.close(c2pwrite)
        self.fromchild = os.fdopen(c2pread, 'r', bufsize)
        if capturestderr:
            os.close(errin)
            self.childerr = os.fdopen(errout, 'r', bufsize)
        else:
            self.childerr = None
        self.sts = -1 # Child not completed yet
        _active.append(self)

    def poll(self):
        """Return the child's status without blocking.

        Returns -1 while the child has not been reaped yet; otherwise the
        status reported by os.waitpid(), which is cached in self.sts.
        """
        if self.sts < 0:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
            except os.error:
                # waitpid() failed; keep reporting "not completed".
                pass
            else:
                if pid == self.pid:
                    self.sts = sts
                    _active.remove(self)
        return self.sts

    def wait(self):
        """Block until the child terminates and return its status.

        If the status was already collected by an earlier poll() or wait()
        the cached value is returned; calling os.waitpid() again for a
        child that has been reaped would raise os.error.
        """
        if self.sts < 0:
            pid, sts = os.waitpid(self.pid, 0)
            if pid == self.pid:
                self.sts = sts
                _active.remove(self)
        return self.sts
 | |
| 
 | |
def popen2(cmd, bufsize=-1):
    """Run cmd in a child process with stdin and stdout piped.

    cmd and bufsize are passed on to Popen3.  Returns the pair
    (fromchild, tochild): a file reading the child's standard output and
    a file writing to its standard input.
    """
    # Reap children that have finished since the last call.
    _cleanup()
    child = Popen3(cmd, capturestderr=0, bufsize=bufsize)
    return (child.fromchild, child.tochild)
 | |
| 
 | |
def popen3(cmd, bufsize=-1):
    """Run cmd in a child process with stdin, stdout and stderr piped.

    cmd and bufsize are passed on to Popen3.  Returns the triple
    (fromchild, tochild, childerr): files reading the child's standard
    output, writing to its standard input, and reading its standard error.
    """
    # Reap children that have finished since the last call.
    _cleanup()
    child = Popen3(cmd, capturestderr=1, bufsize=bufsize)
    return (child.fromchild, child.tochild, child.childerr)
 | |
| 
 | |
def _test():
    """Self-test: pipe a line through 'cat' using popen2() and popen3()."""
    payload = "abc\n"

    # String command form.
    print("testing popen2...")
    reader, writer = popen2('cat')
    writer.write(payload)
    writer.close()
    assert reader.read() == payload

    # Argument-list command form, with stderr captured.
    print("testing popen3...")
    reader, writer, errors = popen3(['cat'])
    writer.write(payload)
    writer.close()
    assert reader.read() == payload
    assert errors.read() == ""

    # The children may still be running at this point; wait for each one
    # (over a snapshot, since wait() removes entries from _active) so the
    # final check does not depend on timing.
    for child in list(_active):
        child.wait()
    assert not _active
    print("All OK")
 | |
| 
 | |
# Run the self-test when this file is executed as a script.
if __name__ == '__main__':
    _test()
 |