]> git.k1024.org Git - pyxattr.git/blob - tests/test_xattr.py
Expand python versions in CI action
[pyxattr.git] / tests / test_xattr.py
1 #
2 #
3
4 import sys
5 import tempfile
6 import os
7 import errno
8 import pytest
9 import pathlib
10 import platform
11 import io
12 import contextlib
13
14 import xattr
15 from xattr import NS_USER, XATTR_CREATE, XATTR_REPLACE
16
17 NAMESPACE = os.environ.get("NAMESPACE", NS_USER)
18
19 EMPTY_NS = bytes()
20
21 TEST_DIR = os.environ.get("TEST_DIR", ".")
22 TEST_IGNORE_XATTRS = os.environ.get("TEST_IGNORE_XATTRS", "")
23 if TEST_IGNORE_XATTRS == "":
24     TEST_IGNORE_XATTRS = []
25 else:
26     TEST_IGNORE_XATTRS = TEST_IGNORE_XATTRS.split(",")
27     # The following has to be a list comprehension, not a generator, to
28     # avoid weird consequences of lazy evaluation.
29     TEST_IGNORE_XATTRS.extend([a.encode() for a in TEST_IGNORE_XATTRS])
30
31 USER_NN = "test"
32 USER_ATTR = NAMESPACE.decode() + "." + USER_NN
33 USER_VAL = "abc"
34 EMPTY_VAL = ""
35 LARGE_VAL = "x" * 2048
36 MANYOPS_COUNT = 16384
37
38 USER_NN = USER_NN.encode()
39 USER_VAL = USER_VAL.encode()
40 USER_ATTR = USER_ATTR.encode()
41 EMPTY_VAL = EMPTY_VAL.encode()
42 LARGE_VAL = LARGE_VAL.encode()
43
44 # Helper functions
45
46 def ignore_tuples(attrs):
47     """Remove ignored attributes from the output of xattr.get_all."""
48     return [attr for attr in attrs
49             if attr[0] not in TEST_IGNORE_XATTRS]
50
51 def ignore(attrs):
52     """Remove ignored attributes from the output of xattr.list"""
53     return [attr for attr in attrs
54             if attr not in TEST_IGNORE_XATTRS]
55
56 def lists_equal(attrs, value):
57     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
58     assert ignore(attrs) == value
59
60 def tuples_equal(attrs, value):
61     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
62     assert ignore_tuples(attrs) == value
63
64 # Fixtures and helpers
65
66 @pytest.fixture
67 def testdir():
68     """per-test temp dir based in TEST_DIR"""
69     with tempfile.TemporaryDirectory(dir=TEST_DIR) as dname:
70         yield dname
71
72 def get_file(path):
73     fh, fname = tempfile.mkstemp(".test", "xattr-", path)
74     return fh, fname
75
76 @contextlib.contextmanager
77 def get_file_name(path):
78     fh, fname = get_file(path)
79     os.close(fh)
80     yield fname
81
82 @contextlib.contextmanager
83 def get_file_fd(path):
84     fd = get_file(path)[0]
85     yield fd
86     os.close(fd)
87
88 @contextlib.contextmanager
89 def get_file_object(path):
90     fd = get_file(path)[0]
91     with os.fdopen(fd) as f:
92         yield f
93
94 @contextlib.contextmanager
95 def get_dir(path):
96     yield tempfile.mkdtemp(".test", "xattr-", path)
97
98 def get_symlink(path, dangling=True):
99     """create a symlink"""
100     fh, fname = get_file(path)
101     os.close(fh)
102     if dangling:
103         os.unlink(fname)
104     sname = fname + ".symlink"
105     os.symlink(fname, sname)
106     return fname, sname
107
108 @contextlib.contextmanager
109 def get_valid_symlink(path):
110     yield get_symlink(path, dangling=False)[1]
111
112 @contextlib.contextmanager
113 def get_dangling_symlink(path):
114     yield get_symlink(path, dangling=True)[1]
115
116 @contextlib.contextmanager
117 def get_file_and_symlink(path):
118     yield get_symlink(path, dangling=False)
119
120 @contextlib.contextmanager
121 def get_file_and_fobject(path):
122     fh, fname = get_file(path)
123     with os.fdopen(fh) as fo:
124         yield fname, fo
125
126 # Wrappers that build upon existing values
127
128 def as_wrapper(call, fn, closer=None):
129     @contextlib.contextmanager
130     def f(path):
131         with call(path) as r:
132             val = fn(r)
133             yield val
134             if closer is not None:
135                 closer(val)
136     return f
137
138 def as_bytes(call):
139     return as_wrapper(call, lambda r: r.encode())
140
141 def as_fspath(call):
142     return as_wrapper(call, pathlib.PurePath)
143
144 def as_iostream(call):
145     opener = lambda f: io.open(f, "r")
146     closer = lambda r: r.close()
147     return as_wrapper(call, opener, closer)
148
149 NOT_BEFORE_36 = pytest.mark.xfail(condition="sys.version_info < (3,6)",
150                                   strict=True)
151 NOT_PYPY = pytest.mark.xfail(condition="platform.python_implementation() == 'PyPy'",
152                                   strict=False)
153
154 NOT_MACOSX = pytest.mark.xfail(condition="sys.platform.startswith('darwin')",
155                                reason="Test not supported on MacOS",
156                                strict=True)
157
158 # Note: user attributes are only allowed on files and directories, so
159 # we have to skip the symlinks here. See xattr(7).
160 ITEMS_P = [
161     (get_file_name, False),
162     (as_bytes(get_file_name), False),
163     pytest.param((as_fspath(get_file_name), False),
164                  marks=[NOT_BEFORE_36, NOT_PYPY]),
165     (get_file_fd, False),
166     (get_file_object, False),
167     (as_iostream(get_file_name), False),
168     (get_dir, False),
169     (as_bytes(get_dir), False),
170     pytest.param((as_fspath(get_dir), False),
171                  marks=[NOT_BEFORE_36, NOT_PYPY]),
172     (get_valid_symlink, False),
173     (as_bytes(get_valid_symlink), False),
174     pytest.param((as_fspath(get_valid_symlink), False),
175                  marks=[NOT_BEFORE_36, NOT_PYPY]),
176 ]
177
178 ITEMS_D = [
179     "file name",
180     "file name (bytes)",
181     "file name (path)",
182     "file FD",
183     "file object",
184     "file io stream",
185     "directory",
186     "directory (bytes)",
187     "directory (path)",
188     "file via symlink",
189     "file via symlink (bytes)",
190     "file via symlink (path)",
191 ]
192
193 ALL_ITEMS_P = ITEMS_P + [
194     (get_valid_symlink, True),
195     (as_bytes(get_valid_symlink), True),
196     (get_dangling_symlink, True),
197     (as_bytes(get_dangling_symlink), True),
198 ]
199
200 ALL_ITEMS_D = ITEMS_D + [
201     "valid symlink",
202     "valid symlink (bytes)",
203     "dangling symlink",
204     "dangling symlink (bytes)"
205 ]
206
207 @pytest.fixture(params=ITEMS_P, ids=ITEMS_D)
208 def subject(testdir, request):
209     with request.param[0](testdir) as value:
210         yield value, request.param[1]
211
212 @pytest.fixture(params=ALL_ITEMS_P, ids=ALL_ITEMS_D)
213 def any_subject(testdir, request):
214     with request.param[0](testdir) as value:
215         yield value, request.param[1]
216
217 @pytest.fixture(params=[True, False], ids=["with namespace", "no namespace"])
218 def use_ns(request):
219     return request.param
220
221 @pytest.fixture(params=[True, False], ids=["dangling", "valid"])
222 def use_dangling(request):
223     return request.param
224
225 ### Test functions
226
227 def test_empty_value(subject):
228     item, nofollow = subject
229     xattr.set(item, USER_ATTR, EMPTY_VAL, nofollow=nofollow)
230     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == EMPTY_VAL
231
232 def test_large_value(subject):
233     item, nofollow = subject
234     xattr.set(item, USER_ATTR, LARGE_VAL)
235     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == LARGE_VAL
236
237 @pytest.mark.parametrize(
238     "gen", [ get_file_and_symlink, get_file_and_fobject ])
239 def test_mixed_access(testdir, gen):
240     """test mixed access to file"""
241     with gen(testdir) as (a, b):
242         # Check empty
243         lists_equal(xattr.list(a), [])
244         lists_equal(xattr.listxattr(b), [])
245
246         # Check value
247         xattr.set(a, USER_ATTR, USER_VAL)
248         for i in [a, b]:
249             # Deprecated functions
250             lists_equal(xattr.listxattr(i), [USER_ATTR])
251             assert xattr.getxattr(i, USER_ATTR) == USER_VAL
252             tuples_equal(xattr.get_all(i), [(USER_ATTR, USER_VAL)])
253             # Current functions
254             lists_equal(xattr.list(i), [USER_ATTR])
255             assert xattr.list(i, namespace=NAMESPACE) == [USER_NN]
256             assert xattr.get(i, USER_ATTR) == USER_VAL
257             assert xattr.get(i, USER_NN, namespace=NAMESPACE) == USER_VAL
258             tuples_equal(xattr.get_all(i),
259                          [(USER_ATTR, USER_VAL)])
260             assert xattr.get_all(i, namespace=NAMESPACE) == \
261                 [(USER_NN, USER_VAL)]
262
263         # Overwrite
264         xattr.set(b, USER_ATTR, LARGE_VAL, flags=xattr.XATTR_REPLACE)
265         assert xattr.get(a, USER_ATTR) == LARGE_VAL
266         assert xattr.getxattr(a, USER_ATTR) == LARGE_VAL
267         xattr.removexattr(b, USER_ATTR)
268         assert xattr.get_all(a, namespace=NAMESPACE) == []
269         assert xattr.get_all(b, namespace=NAMESPACE) == []
270
271 def test_replace_on_missing(subject, use_ns):
272     item = subject[0]
273     lists_equal(xattr.list(item), [])
274     with pytest.raises(EnvironmentError):
275         if use_ns:
276             xattr.set(item, USER_NN, USER_VAL, flags=XATTR_REPLACE,
277                       namespace=NAMESPACE)
278         else:
279             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_REPLACE)
280
281 def test_create_on_existing(subject, use_ns):
282     item = subject[0]
283     lists_equal(xattr.list(item), [])
284     if use_ns:
285         xattr.set(item, USER_NN, USER_VAL,
286                   namespace=NAMESPACE)
287     else:
288         xattr.set(item, USER_ATTR, USER_VAL)
289     with pytest.raises(EnvironmentError):
290         if use_ns:
291             xattr.set(item, USER_NN, USER_VAL,
292                       flags=XATTR_CREATE, namespace=NAMESPACE)
293         else:
294             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_CREATE)
295
296 def test_remove_on_missing(any_subject, use_ns):
297     item, nofollow = any_subject
298     lists_equal(xattr.list(item, nofollow=nofollow), [])
299     with pytest.raises(EnvironmentError):
300         if use_ns:
301             xattr.remove(item, USER_NN, namespace=NAMESPACE,
302                          nofollow=nofollow)
303         else:
304             xattr.remove(item, USER_ATTR, nofollow=nofollow)
305
306 def test_set_get_remove(subject, use_ns):
307     item = subject[0]
308     lists_equal(xattr.list(item), [])
309     if use_ns:
310         xattr.set(item, USER_NN, USER_VAL,
311                   namespace=NAMESPACE)
312     else:
313         xattr.set(item, USER_ATTR, USER_VAL)
314     if use_ns:
315         assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
316     else:
317         lists_equal(xattr.list(item), [USER_ATTR])
318         lists_equal(xattr.list(item, namespace=EMPTY_NS),
319                     [USER_ATTR])
320     if use_ns:
321         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
322     else:
323         assert xattr.get(item, USER_ATTR) == USER_VAL
324     if use_ns:
325         assert xattr.get_all(item, namespace=NAMESPACE) == \
326             [(USER_NN, USER_VAL)]
327     else:
328         tuples_equal(xattr.get_all(item),
329                      [(USER_ATTR, USER_VAL)])
330     if use_ns:
331         xattr.remove(item, USER_NN, namespace=NAMESPACE)
332     else:
333         xattr.remove(item, USER_ATTR)
334     lists_equal(xattr.list(item), [])
335     tuples_equal(xattr.get_all(item), [])
336
337 def test_replace_on_missing_deprecated(subject):
338     item = subject[0]
339     lists_equal(xattr.listxattr(item), [])
340     with pytest.raises(EnvironmentError):
341         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_REPLACE)
342
343 def test_create_on_existing_deprecated(subject):
344     item = subject[0]
345     lists_equal(xattr.listxattr(item), [])
346     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
347     with pytest.raises(EnvironmentError):
348         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_CREATE)
349
350 def test_remove_on_missing_deprecated(any_subject):
351     """check deprecated list, set, get operations against an item"""
352     item, nofollow = any_subject
353     lists_equal(xattr.listxattr(item, nofollow), [])
354     with pytest.raises(EnvironmentError):
355         xattr.removexattr(item, USER_ATTR)
356
357 def test_set_get_remove_deprecated(subject):
358     """check deprecated list, set, get operations against an item"""
359     item = subject[0]
360     lists_equal(xattr.listxattr(item), [])
361     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
362     lists_equal(xattr.listxattr(item), [USER_ATTR])
363     assert xattr.getxattr(item, USER_ATTR) == USER_VAL
364     tuples_equal(xattr.get_all(item), [(USER_ATTR, USER_VAL)])
365     xattr.removexattr(item, USER_ATTR)
366     lists_equal(xattr.listxattr(item), [])
367     tuples_equal(xattr.get_all(item), [])
368
369 def test_many_ops(subject):
370     """test many ops"""
371     item = subject[0]
372     xattr.set(item, USER_ATTR, USER_VAL)
373     VL = [USER_ATTR]
374     VN = [USER_NN]
375     for i in range(MANYOPS_COUNT):
376         lists_equal(xattr.list(item), VL)
377         lists_equal(xattr.list(item, namespace=EMPTY_NS), VL)
378         assert xattr.list(item, namespace=NAMESPACE) == VN
379     for i in range(MANYOPS_COUNT):
380         assert xattr.get(item, USER_ATTR) == USER_VAL
381         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
382     for i in range(MANYOPS_COUNT):
383         tuples_equal(xattr.get_all(item),
384                      [(USER_ATTR, USER_VAL)])
385         assert xattr.get_all(item, namespace=NAMESPACE) == \
386             [(USER_NN, USER_VAL)]
387
388 def test_many_ops_deprecated(subject):
389     """test many ops (deprecated functions)"""
390     item = subject[0]
391     xattr.setxattr(item, USER_ATTR, USER_VAL)
392     VL = [USER_ATTR]
393     for i in range(MANYOPS_COUNT):
394         lists_equal(xattr.listxattr(item), VL)
395     for i in range(MANYOPS_COUNT):
396         assert xattr.getxattr(item, USER_ATTR) == USER_VAL
397     for i in range(MANYOPS_COUNT):
398         tuples_equal(xattr.get_all(item),
399                      [(USER_ATTR, USER_VAL)])
400
401 def test_no_attributes_deprecated(any_subject):
402     """test no attributes (deprecated functions)"""
403     item, nofollow = any_subject
404     lists_equal(xattr.listxattr(item, True), [])
405     tuples_equal(xattr.get_all(item, True), [])
406     with pytest.raises(EnvironmentError):
407         xattr.getxattr(item, USER_ATTR, True)
408
409 def test_no_attributes(any_subject):
410     """test no attributes"""
411     item, nofollow = any_subject
412     lists_equal(xattr.list(item, nofollow=nofollow), [])
413     assert xattr.list(item, nofollow=nofollow,
414                       namespace=NAMESPACE) == []
415     tuples_equal(xattr.get_all(item, nofollow=nofollow), [])
416     assert xattr.get_all(item, nofollow=nofollow,
417                          namespace=NAMESPACE) == []
418     with pytest.raises(EnvironmentError):
419         xattr.get(item, USER_NN, nofollow=nofollow,
420                   namespace=NAMESPACE)
421
422 def test_binary_payload_deprecated(subject):
423     """test binary values (deprecated functions)"""
424     item = subject[0]
425     BINVAL = b"abc\0def"
426     xattr.setxattr(item, USER_ATTR, BINVAL)
427     lists_equal(xattr.listxattr(item), [USER_ATTR])
428     assert xattr.getxattr(item, USER_ATTR) == BINVAL
429     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
430     xattr.removexattr(item, USER_ATTR)
431
432 def test_binary_payload(subject):
433     """test binary values"""
434     item = subject[0]
435     BINVAL = b"abc\0def"
436     xattr.set(item, USER_ATTR, BINVAL)
437     lists_equal(xattr.list(item), [USER_ATTR])
438     assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
439     assert xattr.get(item, USER_ATTR) == BINVAL
440     assert xattr.get(item, USER_NN, namespace=NAMESPACE) == BINVAL
441     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
442     assert xattr.get_all(item, namespace=NAMESPACE) == [(USER_NN, BINVAL)]
443     xattr.remove(item, USER_ATTR)
444
445 @NOT_MACOSX
446 def test_symlinks_user_fail(testdir, use_dangling):
447     _, sname = get_symlink(testdir, dangling=use_dangling)
448     with pytest.raises(IOError):
449         xattr.set(sname, USER_ATTR, USER_VAL, nofollow=True)
450     with pytest.raises(IOError):
451         xattr.set(sname, USER_NN, USER_VAL, namespace=NAMESPACE,
452                   nofollow=True)
453     with pytest.raises(IOError):
454         xattr.setxattr(sname, USER_ATTR, USER_VAL, XATTR_CREATE, True)
455
456 @pytest.mark.parametrize(
457     "call, args", [(xattr.get, [USER_ATTR]),
458                    (xattr.list, []),
459                    (xattr.remove, [USER_ATTR]),
460                    (xattr.get, [USER_ATTR]),
461                    (xattr.set, [USER_ATTR, USER_VAL])])
462 def test_none_namespace(testdir, call, args):
463     # Don't want to use subject, since that would prevent xfail test
464     # on path objects (due to hiding the exception here).
465     f = get_file_name(testdir)
466     with pytest.raises(TypeError):
467         call(f, *args, namespace=None)
468     fd = get_file_fd(testdir)
469     with pytest.raises(TypeError):
470         call(fd, *args, namespace=None)
471
472 @pytest.mark.parametrize(
473     "call",
474     [xattr.get, xattr.list, xattr.listxattr,
475      xattr.remove, xattr.removexattr,
476      xattr.set, xattr.setxattr,
477      xattr.get, xattr.getxattr])
478 def test_wrong_call(call):
479     with pytest.raises(TypeError):
480         call()
481
482 @pytest.mark.parametrize(
483     "call, args", [(xattr.get, [USER_ATTR]),
484                    (xattr.listxattr, []),
485                    (xattr.list, []),
486                    (xattr.remove, [USER_ATTR]),
487                    (xattr.removexattr, [USER_ATTR]),
488                    (xattr.get, [USER_ATTR]),
489                    (xattr.getxattr, [USER_ATTR]),
490                    (xattr.set, [USER_ATTR, USER_VAL]),
491                    (xattr.setxattr, [USER_ATTR, USER_VAL])])
492 def test_wrong_argument_type(call, args):
493     with pytest.raises(TypeError):
494         call(object(), *args)