mirror of
https://github.com/python/cpython.git
synced 2026-01-06 07:22:09 +00:00
bpo-22367: Add tests for fcntl.lockf(). (GH-17010)
This commit is contained in:
parent
6cbc84fb99
commit
befa032d88
1 changed files with 28 additions and 1 deletions
|
|
@ -5,6 +5,7 @@
|
|||
import struct
|
||||
import sys
|
||||
import unittest
|
||||
from multiprocessing import Process
|
||||
from test.support import (verbose, TESTFN, unlink, run_unittest, import_module,
|
||||
cpython_only)
|
||||
|
||||
|
|
@ -12,7 +13,6 @@
|
|||
fcntl = import_module('fcntl')
|
||||
|
||||
|
||||
# TODO - Write tests for flock() and lockf().
|
||||
|
||||
def get_lockdata():
|
||||
try:
|
||||
|
|
@ -138,6 +138,33 @@ def test_flock(self):
|
|||
self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
|
||||
self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
|
||||
|
||||
def test_lockf_exclusive(self):
|
||||
self.f = open(TESTFN, 'wb+')
|
||||
cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
|
||||
def try_lockf_on_other_process():
|
||||
self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd)
|
||||
|
||||
fcntl.lockf(self.f, cmd)
|
||||
p = Process(target=try_lockf_on_other_process)
|
||||
p.start()
|
||||
p.join()
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
self.assertEqual(p.exitcode, 0)
|
||||
|
||||
def test_lockf_share(self):
|
||||
self.f = open(TESTFN, 'wb+')
|
||||
cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
|
||||
def try_lockf_on_other_process():
|
||||
fcntl.lockf(self.f, cmd)
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
|
||||
fcntl.lockf(self.f, cmd)
|
||||
p = Process(target=try_lockf_on_other_process)
|
||||
p.start()
|
||||
p.join()
|
||||
fcntl.lockf(self.f, fcntl.LOCK_UN)
|
||||
self.assertEqual(p.exitcode, 0)
|
||||
|
||||
@cpython_only
|
||||
def test_flock_overflow(self):
|
||||
import _testcapi
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue