| 
									
										
										
										
											2010-07-08 21:39:08 +00:00
										 |  |  | """
 | 
					
						
							|  |  |  | A module built to test if the current process has the privilege to | 
					
						
							|  |  |  | create symlinks on Windows. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # allow script to run natively under python 2.6+ | 
					
						
							|  |  |  | from __future__ import print_function | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ctypes | 
					
						
							|  |  |  | from ctypes import wintypes | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | GetCurrentProcess = ctypes.windll.kernel32.GetCurrentProcess | 
					
						
							|  |  |  | GetCurrentProcess.restype = wintypes.HANDLE | 
					
						
							|  |  |  | OpenProcessToken = ctypes.windll.advapi32.OpenProcessToken | 
					
						
							| 
									
										
										
										
											2010-07-09 15:58:59 +00:00
										 |  |  | OpenProcessToken.argtypes = (wintypes.HANDLE, wintypes.DWORD, | 
					
						
							|  |  |  |                              ctypes.POINTER(wintypes.HANDLE)) | 
					
						
							| 
									
										
										
										
											2010-07-08 21:39:08 +00:00
										 |  |  | OpenProcessToken.restype = wintypes.BOOL | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LUID(ctypes.Structure): | 
					
						
							|  |  |  |     _fields_ = [ | 
					
						
							|  |  |  |             ('low_part', wintypes.DWORD), | 
					
						
							|  |  |  |             ('high_part', wintypes.LONG), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __eq__(self, other): | 
					
						
							|  |  |  |         return ( | 
					
						
							|  |  |  |                 self.high_part == other.high_part and | 
					
						
							|  |  |  |                 self.low_part == other.low_part | 
					
						
							|  |  |  |                 ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __ne__(self, other): | 
					
						
							|  |  |  |         return not (self==other) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | LookupPrivilegeValue = ctypes.windll.advapi32.LookupPrivilegeValueW | 
					
						
							|  |  |  | LookupPrivilegeValue.argtypes = ( | 
					
						
							|  |  |  |         wintypes.LPWSTR, # system name | 
					
						
							|  |  |  |         wintypes.LPWSTR, # name | 
					
						
							|  |  |  |         ctypes.POINTER(LUID), | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | LookupPrivilegeValue.restype = wintypes.BOOL | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TOKEN_INFORMATION_CLASS: | 
					
						
							|  |  |  |     TokenUser = 1 | 
					
						
							|  |  |  |     TokenGroups = 2 | 
					
						
							|  |  |  |     TokenPrivileges = 3 | 
					
						
							|  |  |  |     # ... see http://msdn.microsoft.com/en-us/library/aa379626%28VS.85%29.aspx | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | SE_PRIVILEGE_ENABLED_BY_DEFAULT = (0x00000001) | 
					
						
							|  |  |  | SE_PRIVILEGE_ENABLED            = (0x00000002) | 
					
						
							|  |  |  | SE_PRIVILEGE_REMOVED            = (0x00000004) | 
					
						
							|  |  |  | SE_PRIVILEGE_USED_FOR_ACCESS    = (0x80000000) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LUID_AND_ATTRIBUTES(ctypes.Structure): | 
					
						
							|  |  |  |     _fields_ = [ | 
					
						
							|  |  |  |             ('LUID', LUID), | 
					
						
							|  |  |  |             ('attributes', wintypes.DWORD), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def is_enabled(self): | 
					
						
							|  |  |  |         return bool(self.attributes & SE_PRIVILEGE_ENABLED) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def enable(self): | 
					
						
							|  |  |  |         self.attributes |= SE_PRIVILEGE_ENABLED | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_name(self): | 
					
						
							|  |  |  |         size = wintypes.DWORD(10240) | 
					
						
							|  |  |  |         buf = ctypes.create_unicode_buffer(size.value) | 
					
						
							|  |  |  |         res = LookupPrivilegeName(None, self.LUID, buf, size) | 
					
						
							|  |  |  |         if res == 0: | 
					
						
							|  |  |  |             raise RuntimeError | 
					
						
							|  |  |  |         return buf[:size.value] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __str__(self): | 
					
						
							|  |  |  |         name = self.name | 
					
						
							|  |  |  |         fmt = ['{name}', '{name} (enabled)'][self.is_enabled()] | 
					
						
							|  |  |  |         return fmt.format(**vars()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | LookupPrivilegeName = ctypes.windll.advapi32.LookupPrivilegeNameW | 
					
						
							|  |  |  | LookupPrivilegeName.argtypes = ( | 
					
						
							|  |  |  |         wintypes.LPWSTR, # lpSystemName | 
					
						
							|  |  |  |         ctypes.POINTER(LUID), # lpLuid | 
					
						
							|  |  |  |         wintypes.LPWSTR, # lpName | 
					
						
							|  |  |  |         ctypes.POINTER(wintypes.DWORD), #cchName | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | LookupPrivilegeName.restype = wintypes.BOOL | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TOKEN_PRIVILEGES(ctypes.Structure): | 
					
						
							|  |  |  |     _fields_ = [ | 
					
						
							|  |  |  |             ('count', wintypes.DWORD), | 
					
						
							|  |  |  |             ('privileges', LUID_AND_ATTRIBUTES*0), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_array(self): | 
					
						
							|  |  |  |         array_type = LUID_AND_ATTRIBUTES*self.count | 
					
						
							| 
									
										
										
										
											2010-07-09 15:58:59 +00:00
										 |  |  |         privileges = ctypes.cast(self.privileges, | 
					
						
							|  |  |  |                                  ctypes.POINTER(array_type)).contents | 
					
						
							| 
									
										
										
										
											2010-07-08 21:39:08 +00:00
										 |  |  |         return privileges | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __iter__(self): | 
					
						
							|  |  |  |         return iter(self.get_array()) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | PTOKEN_PRIVILEGES = ctypes.POINTER(TOKEN_PRIVILEGES) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | GetTokenInformation = ctypes.windll.advapi32.GetTokenInformation | 
					
						
							|  |  |  | GetTokenInformation.argtypes = [ | 
					
						
							|  |  |  |         wintypes.HANDLE, # TokenHandle | 
					
						
							|  |  |  |         ctypes.c_uint, # TOKEN_INFORMATION_CLASS value | 
					
						
							|  |  |  |         ctypes.c_void_p, # TokenInformation | 
					
						
							|  |  |  |         wintypes.DWORD, # TokenInformationLength | 
					
						
							|  |  |  |         ctypes.POINTER(wintypes.DWORD), # ReturnLength | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | GetTokenInformation.restype = wintypes.BOOL | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # http://msdn.microsoft.com/en-us/library/aa375202%28VS.85%29.aspx | 
					
						
							|  |  |  | AdjustTokenPrivileges = ctypes.windll.advapi32.AdjustTokenPrivileges | 
					
						
							|  |  |  | AdjustTokenPrivileges.restype = wintypes.BOOL | 
					
						
							|  |  |  | AdjustTokenPrivileges.argtypes = [ | 
					
						
							|  |  |  |         wintypes.HANDLE,                # TokenHandle | 
					
						
							|  |  |  |         wintypes.BOOL,                  # DisableAllPrivileges | 
					
						
							|  |  |  |         PTOKEN_PRIVILEGES,              # NewState (optional) | 
					
						
							|  |  |  |         wintypes.DWORD,                 # BufferLength of PreviousState | 
					
						
							|  |  |  |         PTOKEN_PRIVILEGES,              # PreviousState (out, optional) | 
					
						
							|  |  |  |         ctypes.POINTER(wintypes.DWORD), # ReturnLength | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_process_token(): | 
					
						
							|  |  |  |     "Get the current process token" | 
					
						
							|  |  |  |     token = wintypes.HANDLE() | 
					
						
							|  |  |  |     TOKEN_ALL_ACCESS = 0xf01ff | 
					
						
							|  |  |  |     res = OpenProcessToken(GetCurrentProcess(), TOKEN_ALL_ACCESS, token) | 
					
						
							|  |  |  |     if not res > 0: | 
					
						
							|  |  |  |         raise RuntimeError("Couldn't get process token") | 
					
						
							|  |  |  |     return token | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_symlink_luid(): | 
					
						
							|  |  |  |     "Get the LUID for the SeCreateSymbolicLinkPrivilege" | 
					
						
							|  |  |  |     symlink_luid = LUID() | 
					
						
							| 
									
										
										
										
											2010-07-09 15:58:59 +00:00
										 |  |  |     res = LookupPrivilegeValue(None, "SeCreateSymbolicLinkPrivilege", | 
					
						
							|  |  |  |                                symlink_luid) | 
					
						
							| 
									
										
										
										
											2010-07-08 21:39:08 +00:00
										 |  |  |     if not res > 0: | 
					
						
							|  |  |  |         raise RuntimeError("Couldn't lookup privilege value") | 
					
						
							|  |  |  |     return symlink_luid | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_privilege_information(): | 
					
						
							|  |  |  |     "Get all privileges associated with the current process." | 
					
						
							|  |  |  |     # first call with zero length to determine what size buffer we need | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return_length = wintypes.DWORD() | 
					
						
							|  |  |  |     params = [ | 
					
						
							|  |  |  |             get_process_token(), | 
					
						
							|  |  |  |             TOKEN_INFORMATION_CLASS.TokenPrivileges, | 
					
						
							|  |  |  |             None, | 
					
						
							|  |  |  |             0, | 
					
						
							|  |  |  |             return_length, | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     res = GetTokenInformation(*params) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # assume we now have the necessary length in return_length | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     buffer = ctypes.create_string_buffer(return_length.value) | 
					
						
							|  |  |  |     params[2] = buffer | 
					
						
							|  |  |  |     params[3] = return_length.value | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     res = GetTokenInformation(*params) | 
					
						
							|  |  |  |     assert res > 0, "Error in second GetTokenInformation (%d)" % res | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     privileges = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents | 
					
						
							|  |  |  |     return privileges | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def report_privilege_information(): | 
					
						
							|  |  |  |     "Report all privilege information assigned to the current process." | 
					
						
							|  |  |  |     privileges = get_privilege_information() | 
					
						
							|  |  |  |     print("found {0} privileges".format(privileges.count)) | 
					
						
							|  |  |  |     tuple(map(print, privileges)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def enable_symlink_privilege(): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Try to assign the symlink privilege to the current process token. | 
					
						
							|  |  |  |     Return True if the assignment is successful. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     # create a space in memory for a TOKEN_PRIVILEGES structure | 
					
						
							|  |  |  |     #  with one element | 
					
						
							|  |  |  |     size = ctypes.sizeof(TOKEN_PRIVILEGES) | 
					
						
							|  |  |  |     size += ctypes.sizeof(LUID_AND_ATTRIBUTES) | 
					
						
							|  |  |  |     buffer = ctypes.create_string_buffer(size) | 
					
						
							|  |  |  |     tp = ctypes.cast(buffer, ctypes.POINTER(TOKEN_PRIVILEGES)).contents | 
					
						
							|  |  |  |     tp.count = 1 | 
					
						
							|  |  |  |     tp.get_array()[0].enable() | 
					
						
							|  |  |  |     tp.get_array()[0].LUID = get_symlink_luid() | 
					
						
							|  |  |  |     token = get_process_token() | 
					
						
							|  |  |  |     res = AdjustTokenPrivileges(token, False, tp, 0, None, None) | 
					
						
							|  |  |  |     if res == 0: | 
					
						
							|  |  |  |         raise RuntimeError("Error in AdjustTokenPrivileges") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     ERROR_NOT_ALL_ASSIGNED = 1300 | 
					
						
							|  |  |  |     return ctypes.windll.kernel32.GetLastError() != ERROR_NOT_ALL_ASSIGNED | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							|  |  |  |     assigned = enable_symlink_privilege() | 
					
						
							|  |  |  |     msg = ['failure', 'success'][assigned] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     print("Symlink privilege assignment completed with {0}".format(msg)) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     main() |