]> git.k1024.org Git - pyxattr.git/blob - tests/test_xattr.py
Release version 0.7.2
[pyxattr.git] / tests / test_xattr.py
1 #
2 #
3
4 import sys
5 import tempfile
6 import os
7 import errno
8 import pytest
9 import pathlib
10 import platform
11 import io
12 import contextlib
13
14 import xattr
15 from xattr import NS_USER, XATTR_CREATE, XATTR_REPLACE
16
17 NAMESPACE = os.environ.get("NAMESPACE", NS_USER)
18
19 EMPTY_NS = bytes()
20
21 TEST_DIR = os.environ.get("TEST_DIR", ".")
22 TEST_IGNORE_XATTRS = os.environ.get("TEST_IGNORE_XATTRS", "")
23 if TEST_IGNORE_XATTRS == "":
24     TEST_IGNORE_XATTRS = []
25 else:
26     TEST_IGNORE_XATTRS = TEST_IGNORE_XATTRS.split(",")
27     # The following has to be a list comprehension, not a generator, to
28     # avoid weird consequences of lazy evaluation.
29     TEST_IGNORE_XATTRS.extend([a.encode() for a in TEST_IGNORE_XATTRS])
30
31 USER_NN = "test"
32 USER_ATTR = NAMESPACE.decode() + "." + USER_NN
33 USER_VAL = "abc"
34 EMPTY_VAL = ""
35 LARGE_VAL = "x" * 2048
36 MANYOPS_COUNT = 16384
37
38 USER_NN = USER_NN.encode()
39 USER_VAL = USER_VAL.encode()
40 USER_ATTR = USER_ATTR.encode()
41 EMPTY_VAL = EMPTY_VAL.encode()
42 LARGE_VAL = LARGE_VAL.encode()
43
44 # Helper functions
45
46 def ignore_tuples(attrs):
47     """Remove ignored attributes from the output of xattr.get_all."""
48     return [attr for attr in attrs
49             if attr[0] not in TEST_IGNORE_XATTRS]
50
51 def ignore(attrs):
52     """Remove ignored attributes from the output of xattr.list"""
53     return [attr for attr in attrs
54             if attr not in TEST_IGNORE_XATTRS]
55
56 def lists_equal(attrs, value):
57     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
58     assert ignore(attrs) == value
59
60 def tuples_equal(attrs, value):
61     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
62     assert ignore_tuples(attrs) == value
63
64 # Fixtures and helpers
65
66 @pytest.fixture
67 def testdir():
68     """per-test temp dir based in TEST_DIR"""
69     with tempfile.TemporaryDirectory(dir=TEST_DIR) as dname:
70         yield dname
71
72 def get_file(path):
73     fh, fname = tempfile.mkstemp(".test", "xattr-", path)
74     return fh, fname
75
76 @contextlib.contextmanager
77 def get_file_name(path):
78     fh, fname = get_file(path)
79     os.close(fh)
80     yield fname
81
82 @contextlib.contextmanager
83 def get_file_fd(path):
84     fd = get_file(path)[0]
85     yield fd
86     os.close(fd)
87
88 @contextlib.contextmanager
89 def get_file_object(path):
90     fd = get_file(path)[0]
91     with os.fdopen(fd) as f:
92         yield f
93
94 @contextlib.contextmanager
95 def get_dir(path):
96     yield tempfile.mkdtemp(".test", "xattr-", path)
97
98 def get_symlink(path, dangling=True):
99     """create a symlink"""
100     fh, fname = get_file(path)
101     os.close(fh)
102     if dangling:
103         os.unlink(fname)
104     sname = fname + ".symlink"
105     os.symlink(fname, sname)
106     return fname, sname
107
108 @contextlib.contextmanager
109 def get_valid_symlink(path):
110     yield get_symlink(path, dangling=False)[1]
111
112 @contextlib.contextmanager
113 def get_dangling_symlink(path):
114     yield get_symlink(path, dangling=True)[1]
115
116 @contextlib.contextmanager
117 def get_file_and_symlink(path):
118     yield get_symlink(path, dangling=False)
119
120 @contextlib.contextmanager
121 def get_file_and_fobject(path):
122     fh, fname = get_file(path)
123     with os.fdopen(fh) as fo:
124         yield fname, fo
125
126 # Wrappers that build upon existing values
127
128 def as_wrapper(call, fn, closer=None):
129     @contextlib.contextmanager
130     def f(path):
131         with call(path) as r:
132             val = fn(r)
133             yield val
134             if closer is not None:
135                 closer(val)
136     return f
137
138 def as_bytes(call):
139     return as_wrapper(call, lambda r: r.encode())
140
141 def as_fspath(call):
142     return as_wrapper(call, pathlib.PurePath)
143
144 def as_iostream(call):
145     opener = lambda f: io.open(f, "r")
146     closer = lambda r: r.close()
147     return as_wrapper(call, opener, closer)
148
149 NOT_BEFORE_36 = pytest.mark.xfail(condition="sys.version_info < (3,6)",
150                                   strict=True)
151 NOT_PYPY = pytest.mark.xfail(condition="platform.python_implementation() == 'PyPy'",
152                                   strict=False)
153
154 # Note: user attributes are only allowed on files and directories, so
155 # we have to skip the symlinks here. See xattr(7).
156 ITEMS_P = [
157     (get_file_name, False),
158     (as_bytes(get_file_name), False),
159     pytest.param((as_fspath(get_file_name), False),
160                  marks=[NOT_BEFORE_36, NOT_PYPY]),
161     (get_file_fd, False),
162     (get_file_object, False),
163     (as_iostream(get_file_name), False),
164     (get_dir, False),
165     (as_bytes(get_dir), False),
166     pytest.param((as_fspath(get_dir), False),
167                  marks=[NOT_BEFORE_36, NOT_PYPY]),
168     (get_valid_symlink, False),
169     (as_bytes(get_valid_symlink), False),
170     pytest.param((as_fspath(get_valid_symlink), False),
171                  marks=[NOT_BEFORE_36, NOT_PYPY]),
172 ]
173
174 ITEMS_D = [
175     "file name",
176     "file name (bytes)",
177     "file name (path)",
178     "file FD",
179     "file object",
180     "file io stream",
181     "directory",
182     "directory (bytes)",
183     "directory (path)",
184     "file via symlink",
185     "file via symlink (bytes)",
186     "file via symlink (path)",
187 ]
188
189 ALL_ITEMS_P = ITEMS_P + [
190     (get_valid_symlink, True),
191     (as_bytes(get_valid_symlink), True),
192     (get_dangling_symlink, True),
193     (as_bytes(get_dangling_symlink), True),
194 ]
195
196 ALL_ITEMS_D = ITEMS_D + [
197     "valid symlink",
198     "valid symlink (bytes)",
199     "dangling symlink",
200     "dangling symlink (bytes)"
201 ]
202
203 @pytest.fixture(params=ITEMS_P, ids=ITEMS_D)
204 def subject(testdir, request):
205     with request.param[0](testdir) as value:
206         yield value, request.param[1]
207
208 @pytest.fixture(params=ALL_ITEMS_P, ids=ALL_ITEMS_D)
209 def any_subject(testdir, request):
210     with request.param[0](testdir) as value:
211         yield value, request.param[1]
212
213 @pytest.fixture(params=[True, False], ids=["with namespace", "no namespace"])
214 def use_ns(request):
215     return request.param
216
217 @pytest.fixture(params=[True, False], ids=["dangling", "valid"])
218 def use_dangling(request):
219     return request.param
220
221 ### Test functions
222
223 def test_empty_value(subject):
224     item, nofollow = subject
225     xattr.set(item, USER_ATTR, EMPTY_VAL, nofollow=nofollow)
226     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == EMPTY_VAL
227
228 def test_large_value(subject):
229     item, nofollow = subject
230     xattr.set(item, USER_ATTR, LARGE_VAL)
231     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == LARGE_VAL
232
233 @pytest.mark.parametrize(
234     "gen", [ get_file_and_symlink, get_file_and_fobject ])
235 def test_mixed_access(testdir, gen):
236     """test mixed access to file"""
237     with gen(testdir) as (a, b):
238         # Check empty
239         lists_equal(xattr.list(a), [])
240         lists_equal(xattr.listxattr(b), [])
241
242         # Check value
243         xattr.set(a, USER_ATTR, USER_VAL)
244         for i in [a, b]:
245             # Deprecated functions
246             lists_equal(xattr.listxattr(i), [USER_ATTR])
247             assert xattr.getxattr(i, USER_ATTR) == USER_VAL
248             tuples_equal(xattr.get_all(i), [(USER_ATTR, USER_VAL)])
249             # Current functions
250             lists_equal(xattr.list(i), [USER_ATTR])
251             assert xattr.list(i, namespace=NAMESPACE) == [USER_NN]
252             assert xattr.get(i, USER_ATTR) == USER_VAL
253             assert xattr.get(i, USER_NN, namespace=NAMESPACE) == USER_VAL
254             tuples_equal(xattr.get_all(i),
255                          [(USER_ATTR, USER_VAL)])
256             assert xattr.get_all(i, namespace=NAMESPACE) == \
257                 [(USER_NN, USER_VAL)]
258
259         # Overwrite
260         xattr.set(b, USER_ATTR, LARGE_VAL, flags=xattr.XATTR_REPLACE)
261         assert xattr.get(a, USER_ATTR) == LARGE_VAL
262         assert xattr.getxattr(a, USER_ATTR) == LARGE_VAL
263         xattr.removexattr(b, USER_ATTR)
264         assert xattr.get_all(a, namespace=NAMESPACE) == []
265         assert xattr.get_all(b, namespace=NAMESPACE) == []
266
267 def test_replace_on_missing(subject, use_ns):
268     item = subject[0]
269     lists_equal(xattr.list(item), [])
270     with pytest.raises(EnvironmentError):
271         if use_ns:
272             xattr.set(item, USER_NN, USER_VAL, flags=XATTR_REPLACE,
273                       namespace=NAMESPACE)
274         else:
275             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_REPLACE)
276
277 def test_create_on_existing(subject, use_ns):
278     item = subject[0]
279     lists_equal(xattr.list(item), [])
280     if use_ns:
281         xattr.set(item, USER_NN, USER_VAL,
282                   namespace=NAMESPACE)
283     else:
284         xattr.set(item, USER_ATTR, USER_VAL)
285     with pytest.raises(EnvironmentError):
286         if use_ns:
287             xattr.set(item, USER_NN, USER_VAL,
288                       flags=XATTR_CREATE, namespace=NAMESPACE)
289         else:
290             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_CREATE)
291
292 def test_remove_on_missing(any_subject, use_ns):
293     item, nofollow = any_subject
294     lists_equal(xattr.list(item, nofollow=nofollow), [])
295     with pytest.raises(EnvironmentError):
296         if use_ns:
297             xattr.remove(item, USER_NN, namespace=NAMESPACE,
298                          nofollow=nofollow)
299         else:
300             xattr.remove(item, USER_ATTR, nofollow=nofollow)
301
302 def test_set_get_remove(subject, use_ns):
303     item = subject[0]
304     lists_equal(xattr.list(item), [])
305     if use_ns:
306         xattr.set(item, USER_NN, USER_VAL,
307                   namespace=NAMESPACE)
308     else:
309         xattr.set(item, USER_ATTR, USER_VAL)
310     if use_ns:
311         assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
312     else:
313         lists_equal(xattr.list(item), [USER_ATTR])
314         lists_equal(xattr.list(item, namespace=EMPTY_NS),
315                     [USER_ATTR])
316     if use_ns:
317         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
318     else:
319         assert xattr.get(item, USER_ATTR) == USER_VAL
320     if use_ns:
321         assert xattr.get_all(item, namespace=NAMESPACE) == \
322             [(USER_NN, USER_VAL)]
323     else:
324         tuples_equal(xattr.get_all(item),
325                      [(USER_ATTR, USER_VAL)])
326     if use_ns:
327         xattr.remove(item, USER_NN, namespace=NAMESPACE)
328     else:
329         xattr.remove(item, USER_ATTR)
330     lists_equal(xattr.list(item), [])
331     tuples_equal(xattr.get_all(item), [])
332
333 def test_replace_on_missing_deprecated(subject):
334     item = subject[0]
335     lists_equal(xattr.listxattr(item), [])
336     with pytest.raises(EnvironmentError):
337         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_REPLACE)
338
339 def test_create_on_existing_deprecated(subject):
340     item = subject[0]
341     lists_equal(xattr.listxattr(item), [])
342     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
343     with pytest.raises(EnvironmentError):
344         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_CREATE)
345
346 def test_remove_on_missing_deprecated(any_subject):
347     """check deprecated list, set, get operations against an item"""
348     item, nofollow = any_subject
349     lists_equal(xattr.listxattr(item, nofollow), [])
350     with pytest.raises(EnvironmentError):
351         xattr.removexattr(item, USER_ATTR)
352
353 def test_set_get_remove_deprecated(subject):
354     """check deprecated list, set, get operations against an item"""
355     item = subject[0]
356     lists_equal(xattr.listxattr(item), [])
357     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
358     lists_equal(xattr.listxattr(item), [USER_ATTR])
359     assert xattr.getxattr(item, USER_ATTR) == USER_VAL
360     tuples_equal(xattr.get_all(item), [(USER_ATTR, USER_VAL)])
361     xattr.removexattr(item, USER_ATTR)
362     lists_equal(xattr.listxattr(item), [])
363     tuples_equal(xattr.get_all(item), [])
364
365 def test_many_ops(subject):
366     """test many ops"""
367     item = subject[0]
368     xattr.set(item, USER_ATTR, USER_VAL)
369     VL = [USER_ATTR]
370     VN = [USER_NN]
371     for i in range(MANYOPS_COUNT):
372         lists_equal(xattr.list(item), VL)
373         lists_equal(xattr.list(item, namespace=EMPTY_NS), VL)
374         assert xattr.list(item, namespace=NAMESPACE) == VN
375     for i in range(MANYOPS_COUNT):
376         assert xattr.get(item, USER_ATTR) == USER_VAL
377         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
378     for i in range(MANYOPS_COUNT):
379         tuples_equal(xattr.get_all(item),
380                      [(USER_ATTR, USER_VAL)])
381         assert xattr.get_all(item, namespace=NAMESPACE) == \
382             [(USER_NN, USER_VAL)]
383
384 def test_many_ops_deprecated(subject):
385     """test many ops (deprecated functions)"""
386     item = subject[0]
387     xattr.setxattr(item, USER_ATTR, USER_VAL)
388     VL = [USER_ATTR]
389     for i in range(MANYOPS_COUNT):
390         lists_equal(xattr.listxattr(item), VL)
391     for i in range(MANYOPS_COUNT):
392         assert xattr.getxattr(item, USER_ATTR) == USER_VAL
393     for i in range(MANYOPS_COUNT):
394         tuples_equal(xattr.get_all(item),
395                      [(USER_ATTR, USER_VAL)])
396
397 def test_no_attributes_deprecated(any_subject):
398     """test no attributes (deprecated functions)"""
399     item, nofollow = any_subject
400     lists_equal(xattr.listxattr(item, True), [])
401     tuples_equal(xattr.get_all(item, True), [])
402     with pytest.raises(EnvironmentError):
403         xattr.getxattr(item, USER_ATTR, True)
404
405 def test_no_attributes(any_subject):
406     """test no attributes"""
407     item, nofollow = any_subject
408     lists_equal(xattr.list(item, nofollow=nofollow), [])
409     assert xattr.list(item, nofollow=nofollow,
410                       namespace=NAMESPACE) == []
411     tuples_equal(xattr.get_all(item, nofollow=nofollow), [])
412     assert xattr.get_all(item, nofollow=nofollow,
413                          namespace=NAMESPACE) == []
414     with pytest.raises(EnvironmentError):
415         xattr.get(item, USER_NN, nofollow=nofollow,
416                   namespace=NAMESPACE)
417
418 def test_binary_payload_deprecated(subject):
419     """test binary values (deprecated functions)"""
420     item = subject[0]
421     BINVAL = b"abc\0def"
422     xattr.setxattr(item, USER_ATTR, BINVAL)
423     lists_equal(xattr.listxattr(item), [USER_ATTR])
424     assert xattr.getxattr(item, USER_ATTR) == BINVAL
425     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
426     xattr.removexattr(item, USER_ATTR)
427
428 def test_binary_payload(subject):
429     """test binary values"""
430     item = subject[0]
431     BINVAL = b"abc\0def"
432     xattr.set(item, USER_ATTR, BINVAL)
433     lists_equal(xattr.list(item), [USER_ATTR])
434     assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
435     assert xattr.get(item, USER_ATTR) == BINVAL
436     assert xattr.get(item, USER_NN, namespace=NAMESPACE) == BINVAL
437     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
438     assert xattr.get_all(item, namespace=NAMESPACE) == [(USER_NN, BINVAL)]
439     xattr.remove(item, USER_ATTR)
440
441 def test_symlinks_user_fail(testdir, use_dangling):
442     _, sname = get_symlink(testdir, dangling=use_dangling)
443     with pytest.raises(IOError):
444         xattr.set(sname, USER_ATTR, USER_VAL, nofollow=True)
445     with pytest.raises(IOError):
446         xattr.set(sname, USER_NN, USER_VAL, namespace=NAMESPACE,
447                   nofollow=True)
448     with pytest.raises(IOError):
449         xattr.setxattr(sname, USER_ATTR, USER_VAL, XATTR_CREATE, True)
450
451 @pytest.mark.parametrize(
452     "call, args", [(xattr.get, [USER_ATTR]),
453                    (xattr.list, []),
454                    (xattr.remove, [USER_ATTR]),
455                    (xattr.get, [USER_ATTR]),
456                    (xattr.set, [USER_ATTR, USER_VAL])])
457 def test_none_namespace(testdir, call, args):
458     # Don't want to use subject, since that would prevent xfail test
459     # on path objects (due to hiding the exception here).
460     f = get_file_name(testdir)
461     with pytest.raises(TypeError):
462         call(f, *args, namespace=None)
463     fd = get_file_fd(testdir)
464     with pytest.raises(TypeError):
465         call(fd, *args, namespace=None)
466
467 @pytest.mark.parametrize(
468     "call",
469     [xattr.get, xattr.list, xattr.listxattr,
470      xattr.remove, xattr.removexattr,
471      xattr.set, xattr.setxattr,
472      xattr.get, xattr.getxattr])
473 def test_wrong_call(call):
474     with pytest.raises(TypeError):
475         call()
476
477 @pytest.mark.parametrize(
478     "call, args", [(xattr.get, [USER_ATTR]),
479                    (xattr.listxattr, []),
480                    (xattr.list, []),
481                    (xattr.remove, [USER_ATTR]),
482                    (xattr.removexattr, [USER_ATTR]),
483                    (xattr.get, [USER_ATTR]),
484                    (xattr.getxattr, [USER_ATTR]),
485                    (xattr.set, [USER_ATTR, USER_VAL]),
486                    (xattr.setxattr, [USER_ATTR, USER_VAL])])
487 def test_wrong_argument_type(call, args):
488     with pytest.raises(TypeError):
489         call(object(), *args)