]> git.k1024.org Git - pyxattr.git/blob - test/test_xattr.py
Run tests on file io streams as well
[pyxattr.git] / test / test_xattr.py
1 #
2 #
3
4 import sys
5 import tempfile
6 import os
7 import errno
8 import pytest
9 import pathlib
10 import platform
11 import io
12
13 import xattr
14 from xattr import NS_USER, XATTR_CREATE, XATTR_REPLACE
15
16 NAMESPACE = os.environ.get("NAMESPACE", NS_USER)
17
18 EMPTY_NS = bytes()
19
20 TEST_DIR = os.environ.get("TEST_DIR", ".")
21 TEST_IGNORE_XATTRS = os.environ.get("TEST_IGNORE_XATTRS", "")
22 if TEST_IGNORE_XATTRS == "":
23     TEST_IGNORE_XATTRS = []
24 else:
25     TEST_IGNORE_XATTRS = TEST_IGNORE_XATTRS.split(",")
26     # The following has to be a list comprehension, not a generator, to
27     # avoid weird consequences of lazy evaluation.
28     TEST_IGNORE_XATTRS.extend([a.encode() for a in TEST_IGNORE_XATTRS])
29
30 USER_NN = "test"
31 USER_ATTR = NAMESPACE.decode() + "." + USER_NN
32 USER_VAL = "abc"
33 EMPTY_VAL = ""
34 LARGE_VAL = "x" * 2048
35 MANYOPS_COUNT = 16384
36
37 USER_NN = USER_NN.encode()
38 USER_VAL = USER_VAL.encode()
39 USER_ATTR = USER_ATTR.encode()
40 EMPTY_VAL = EMPTY_VAL.encode()
41 LARGE_VAL = LARGE_VAL.encode()
42
43 # Helper functions
44
45 def ignore_tuples(attrs):
46     """Remove ignored attributes from the output of xattr.get_all."""
47     return [attr for attr in attrs
48             if attr[0] not in TEST_IGNORE_XATTRS]
49
50 def ignore(attrs):
51     """Remove ignored attributes from the output of xattr.list"""
52     return [attr for attr in attrs
53             if attr not in TEST_IGNORE_XATTRS]
54
55 def lists_equal(attrs, value):
56     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
57     assert ignore(attrs) == value
58
59 def tuples_equal(attrs, value):
60     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
61     assert ignore_tuples(attrs) == value
62
63 # Fixtures and helpers
64
65 @pytest.fixture
66 def testdir():
67     """per-test temp dir based in TEST_DIR"""
68     with tempfile.TemporaryDirectory(dir=TEST_DIR) as dname:
69         yield dname
70
71 def get_file(path):
72     fh, fname = tempfile.mkstemp(".test", "xattr-", path)
73     return fh, fname
74
75 def get_file_name(path):
76     fh, fname = get_file(path)
77     os.close(fh)
78     return fname
79
80 def get_file_fd(path):
81     return get_file(path)[0]
82
83 def get_file_object(path):
84     fd = get_file(path)[0]
85     return os.fdopen(fd)
86
87 def get_dir(path):
88     return tempfile.mkdtemp(".test", "xattr-", path)
89
90 def get_symlink(path, dangling=True):
91     """create a symlink"""
92     fh, fname = get_file(path)
93     os.close(fh)
94     if dangling:
95         os.unlink(fname)
96     sname = fname + ".symlink"
97     os.symlink(fname, sname)
98     return fname, sname
99
100 def get_valid_symlink(path):
101     return get_symlink(path, dangling=False)[1]
102
103 def get_dangling_symlink(path):
104     return get_symlink(path, dangling=True)[1]
105
106 def as_bytes(call):
107     def f(path):
108         return call(path).encode()
109     return f
110
111 def as_fspath(call):
112     def f(path):
113         return pathlib.PurePath(call(path))
114     return f
115
116 def as_iostream(call):
117     def f(path):
118         return io.open(call(path), "r")
119     return f
120
121 NOT_BEFORE_36 = pytest.mark.xfail(condition="sys.version_info < (3,6)",
122                                   strict=True)
123 NOT_PYPY = pytest.mark.xfail(condition="platform.python_implementation() == 'PyPy'",
124                                   strict=False)
125
126 # Note: user attributes are only allowed on files and directories, so
127 # we have to skip the symlinks here. See xattr(7).
128 ITEMS_P = [
129     (get_file_name, False),
130     (as_bytes(get_file_name), False),
131     pytest.param((as_fspath(get_file_name), False),
132                  marks=[NOT_BEFORE_36, NOT_PYPY]),
133     (get_file_fd, False),
134     (get_file_object, False),
135     (as_iostream(get_file_name), False),
136     (get_dir, False),
137     (as_bytes(get_dir), False),
138     pytest.param((as_fspath(get_dir), False),
139                  marks=[NOT_BEFORE_36, NOT_PYPY]),
140     (get_valid_symlink, False),
141     (as_bytes(get_valid_symlink), False),
142     pytest.param((as_fspath(get_valid_symlink), False),
143                  marks=[NOT_BEFORE_36, NOT_PYPY]),
144 ]
145
146 ITEMS_D = [
147     "file name",
148     "file name (bytes)",
149     "file name (path)",
150     "file FD",
151     "file object",
152     "file io stream",
153     "directory",
154     "directory (bytes)",
155     "directory (path)",
156     "file via symlink",
157     "file via symlink (bytes)",
158     "file via symlink (path)",
159 ]
160
161 ALL_ITEMS_P = ITEMS_P + [
162     (get_valid_symlink, True),
163     (as_bytes(get_valid_symlink), True),
164     (get_dangling_symlink, True),
165     (as_bytes(get_dangling_symlink), True),
166 ]
167
168 ALL_ITEMS_D = ITEMS_D + [
169     "valid symlink",
170     "valid symlink (bytes)",
171     "dangling symlink",
172     "dangling symlink (bytes)"
173 ]
174
175 @pytest.fixture(params=ITEMS_P, ids=ITEMS_D)
176 def subject(testdir, request):
177     return request.param[0](testdir), request.param[1]
178
179 @pytest.fixture(params=ALL_ITEMS_P, ids=ALL_ITEMS_D)
180 def any_subject(testdir, request):
181     return request.param[0](testdir), request.param[1]
182
183 @pytest.fixture(params=[True, False], ids=["with namespace", "no namespace"])
184 def use_ns(request):
185     return request.param
186
187 @pytest.fixture(params=[True, False], ids=["dangling", "valid"])
188 def use_dangling(request):
189     return request.param
190
191 ### Test functions
192
193 def test_empty_value(subject):
194     item, nofollow = subject
195     xattr.set(item, USER_ATTR, EMPTY_VAL, nofollow=nofollow)
196     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == EMPTY_VAL
197
198 def test_large_value(subject):
199     item, nofollow = subject
200     xattr.set(item, USER_ATTR, LARGE_VAL)
201     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == LARGE_VAL
202
203
204 def test_file_mixed_access_deprecated(testdir):
205     """test mixed access to file (deprecated functions)"""
206     fh, fname = get_file(testdir)
207     with os.fdopen(fh) as fo:
208         lists_equal(xattr.listxattr(fname), [])
209         xattr.setxattr(fname, USER_ATTR, USER_VAL)
210         lists_equal(xattr.listxattr(fh), [USER_ATTR])
211         assert xattr.getxattr(fo, USER_ATTR) == USER_VAL
212         tuples_equal(xattr.get_all(fo), [(USER_ATTR, USER_VAL)])
213         tuples_equal(xattr.get_all(fname),
214                      [(USER_ATTR, USER_VAL)])
215
216 def test_file_mixed_access(testdir):
217     """test mixed access to file"""
218     fh, fname = get_file(testdir)
219     with os.fdopen(fh) as fo:
220         lists_equal(xattr.list(fname), [])
221         xattr.set(fname, USER_ATTR, USER_VAL)
222         lists_equal(xattr.list(fh), [USER_ATTR])
223         assert xattr.list(fh, namespace=NAMESPACE) == [USER_NN]
224         assert xattr.get(fo, USER_ATTR) == USER_VAL
225         assert xattr.get(fo, USER_NN, namespace=NAMESPACE) == USER_VAL
226         tuples_equal(xattr.get_all(fo),
227                      [(USER_ATTR, USER_VAL)])
228         assert xattr.get_all(fo, namespace=NAMESPACE) == \
229             [(USER_NN, USER_VAL)]
230         tuples_equal(xattr.get_all(fname), [(USER_ATTR, USER_VAL)])
231         assert xattr.get_all(fname, namespace=NAMESPACE) == \
232             [(USER_NN, USER_VAL)]
233
234 def test_replace_on_missing(subject, use_ns):
235     item = subject[0]
236     lists_equal(xattr.list(item), [])
237     with pytest.raises(EnvironmentError):
238         if use_ns:
239             xattr.set(item, USER_NN, USER_VAL, flags=XATTR_REPLACE,
240                       namespace=NAMESPACE)
241         else:
242             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_REPLACE)
243
244 def test_create_on_existing(subject, use_ns):
245     item = subject[0]
246     lists_equal(xattr.list(item), [])
247     if use_ns:
248         xattr.set(item, USER_NN, USER_VAL,
249                   namespace=NAMESPACE)
250     else:
251         xattr.set(item, USER_ATTR, USER_VAL)
252     with pytest.raises(EnvironmentError):
253         if use_ns:
254             xattr.set(item, USER_NN, USER_VAL,
255                       flags=XATTR_CREATE, namespace=NAMESPACE)
256         else:
257             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_CREATE)
258
259 def test_remove_on_missing(any_subject, use_ns):
260     item, nofollow = any_subject
261     lists_equal(xattr.list(item, nofollow=nofollow), [])
262     with pytest.raises(EnvironmentError):
263         if use_ns:
264             xattr.remove(item, USER_NN, namespace=NAMESPACE,
265                          nofollow=nofollow)
266         else:
267             xattr.remove(item, USER_ATTR, nofollow=nofollow)
268
269 def test_set_get_remove(subject, use_ns):
270     item = subject[0]
271     lists_equal(xattr.list(item), [])
272     if use_ns:
273         xattr.set(item, USER_NN, USER_VAL,
274                   namespace=NAMESPACE)
275     else:
276         xattr.set(item, USER_ATTR, USER_VAL)
277     if use_ns:
278         assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
279     else:
280         lists_equal(xattr.list(item), [USER_ATTR])
281         lists_equal(xattr.list(item, namespace=EMPTY_NS),
282                     [USER_ATTR])
283     if use_ns:
284         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
285     else:
286         assert xattr.get(item, USER_ATTR) == USER_VAL
287     if use_ns:
288         assert xattr.get_all(item, namespace=NAMESPACE) == \
289             [(USER_NN, USER_VAL)]
290     else:
291         tuples_equal(xattr.get_all(item),
292                      [(USER_ATTR, USER_VAL)])
293     if use_ns:
294         xattr.remove(item, USER_NN, namespace=NAMESPACE)
295     else:
296         xattr.remove(item, USER_ATTR)
297     lists_equal(xattr.list(item), [])
298     tuples_equal(xattr.get_all(item), [])
299
300 def test_replace_on_missing_deprecated(subject):
301     item = subject[0]
302     lists_equal(xattr.listxattr(item), [])
303     with pytest.raises(EnvironmentError):
304         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_REPLACE)
305
306 def test_create_on_existing_deprecated(subject):
307     item = subject[0]
308     lists_equal(xattr.listxattr(item), [])
309     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
310     with pytest.raises(EnvironmentError):
311         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_CREATE)
312
313 def test_remove_on_missing_deprecated(any_subject):
314     """check deprecated list, set, get operations against an item"""
315     item, nofollow = any_subject
316     lists_equal(xattr.listxattr(item, nofollow), [])
317     with pytest.raises(EnvironmentError):
318         xattr.removexattr(item, USER_ATTR)
319
320 def test_set_get_remove_deprecated(subject):
321     """check deprecated list, set, get operations against an item"""
322     item = subject[0]
323     lists_equal(xattr.listxattr(item), [])
324     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
325     lists_equal(xattr.listxattr(item), [USER_ATTR])
326     assert xattr.getxattr(item, USER_ATTR) == USER_VAL
327     tuples_equal(xattr.get_all(item), [(USER_ATTR, USER_VAL)])
328     xattr.removexattr(item, USER_ATTR)
329     lists_equal(xattr.listxattr(item), [])
330     tuples_equal(xattr.get_all(item), [])
331
332 def test_many_ops(subject):
333     """test many ops"""
334     item = subject[0]
335     xattr.set(item, USER_ATTR, USER_VAL)
336     VL = [USER_ATTR]
337     VN = [USER_NN]
338     for i in range(MANYOPS_COUNT):
339         lists_equal(xattr.list(item), VL)
340         lists_equal(xattr.list(item, namespace=EMPTY_NS), VL)
341         assert xattr.list(item, namespace=NAMESPACE) == VN
342     for i in range(MANYOPS_COUNT):
343         assert xattr.get(item, USER_ATTR) == USER_VAL
344         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
345     for i in range(MANYOPS_COUNT):
346         tuples_equal(xattr.get_all(item),
347                      [(USER_ATTR, USER_VAL)])
348         assert xattr.get_all(item, namespace=NAMESPACE) == \
349             [(USER_NN, USER_VAL)]
350
351 def test_many_ops_deprecated(subject):
352     """test many ops (deprecated functions)"""
353     item = subject[0]
354     xattr.setxattr(item, USER_ATTR, USER_VAL)
355     VL = [USER_ATTR]
356     for i in range(MANYOPS_COUNT):
357         lists_equal(xattr.listxattr(item), VL)
358     for i in range(MANYOPS_COUNT):
359         assert xattr.getxattr(item, USER_ATTR) == USER_VAL
360     for i in range(MANYOPS_COUNT):
361         tuples_equal(xattr.get_all(item),
362                      [(USER_ATTR, USER_VAL)])
363
364 def test_no_attributes_deprecated(any_subject):
365     """test no attributes (deprecated functions)"""
366     item, nofollow = any_subject
367     lists_equal(xattr.listxattr(item, True), [])
368     tuples_equal(xattr.get_all(item, True), [])
369     with pytest.raises(EnvironmentError):
370         xattr.getxattr(item, USER_ATTR, True)
371
372 def test_no_attributes(any_subject):
373     """test no attributes"""
374     item, nofollow = any_subject
375     lists_equal(xattr.list(item, nofollow=nofollow), [])
376     assert xattr.list(item, nofollow=nofollow,
377                       namespace=NAMESPACE) == []
378     tuples_equal(xattr.get_all(item, nofollow=nofollow), [])
379     assert xattr.get_all(item, nofollow=nofollow,
380                          namespace=NAMESPACE) == []
381     with pytest.raises(EnvironmentError):
382         xattr.get(item, USER_NN, nofollow=nofollow,
383                   namespace=NAMESPACE)
384
385 def test_binary_payload_deprecated(subject):
386     """test binary values (deprecated functions)"""
387     item = subject[0]
388     BINVAL = b"abc\0def"
389     xattr.setxattr(item, USER_ATTR, BINVAL)
390     lists_equal(xattr.listxattr(item), [USER_ATTR])
391     assert xattr.getxattr(item, USER_ATTR) == BINVAL
392     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
393     xattr.removexattr(item, USER_ATTR)
394
395 def test_binary_payload(subject):
396     """test binary values"""
397     item = subject[0]
398     BINVAL = b"abc\0def"
399     xattr.set(item, USER_ATTR, BINVAL)
400     lists_equal(xattr.list(item), [USER_ATTR])
401     assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
402     assert xattr.get(item, USER_ATTR) == BINVAL
403     assert xattr.get(item, USER_NN, namespace=NAMESPACE) == BINVAL
404     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
405     assert xattr.get_all(item, namespace=NAMESPACE) == [(USER_NN, BINVAL)]
406     xattr.remove(item, USER_ATTR)
407
408 def test_symlinks_user_fail(testdir, use_dangling):
409     _, sname = get_symlink(testdir, dangling=use_dangling)
410     with pytest.raises(IOError):
411         xattr.set(sname, USER_ATTR, USER_VAL, nofollow=True)
412     with pytest.raises(IOError):
413         xattr.set(sname, USER_NN, USER_VAL, namespace=NAMESPACE,
414                   nofollow=True)
415     with pytest.raises(IOError):
416         xattr.setxattr(sname, USER_ATTR, USER_VAL, XATTR_CREATE, True)
417
418 @pytest.mark.parametrize(
419     "call, args", [(xattr.get, [USER_ATTR]),
420                    (xattr.list, []),
421                    (xattr.remove, [USER_ATTR]),
422                    (xattr.get, [USER_ATTR]),
423                    (xattr.set, [USER_ATTR, USER_VAL])])
424 def test_none_namespace(testdir, call, args):
425     # Don't want to use subject, since that would prevent xfail test
426     # on path objects (due to hiding the exception here).
427     f = get_file_name(testdir)
428     with pytest.raises(TypeError):
429         call(f, *args, namespace=None)
430     fd = get_file_fd(testdir)
431     with pytest.raises(TypeError):
432         call(fd, *args, namespace=None)
433
434 @pytest.mark.parametrize(
435     "call",
436     [xattr.get, xattr.list, xattr.listxattr,
437      xattr.remove, xattr.removexattr,
438      xattr.set, xattr.setxattr,
439      xattr.get, xattr.getxattr])
440 def test_wrong_call(call):
441     with pytest.raises(TypeError):
442         call()
443
444 @pytest.mark.parametrize(
445     "call, args", [(xattr.get, [USER_ATTR]),
446                    (xattr.listxattr, []),
447                    (xattr.list, []),
448                    (xattr.remove, [USER_ATTR]),
449                    (xattr.removexattr, [USER_ATTR]),
450                    (xattr.get, [USER_ATTR]),
451                    (xattr.getxattr, [USER_ATTR]),
452                    (xattr.set, [USER_ATTR, USER_VAL]),
453                    (xattr.setxattr, [USER_ATTR, USER_VAL])])
454 def test_wrong_argument_type(call, args):
455     with pytest.raises(TypeError):
456         call(object(), *args)