]> git.k1024.org Git - pyxattr.git/blob - test/test_xattr.py
Tests: ensure resources as closed in subject tests
[pyxattr.git] / test / test_xattr.py
1 #
2 #
3
4 import sys
5 import tempfile
6 import os
7 import errno
8 import pytest
9 import pathlib
10 import platform
11 import io
12 import contextlib
13
14 import xattr
15 from xattr import NS_USER, XATTR_CREATE, XATTR_REPLACE
16
17 NAMESPACE = os.environ.get("NAMESPACE", NS_USER)
18
19 EMPTY_NS = bytes()
20
21 TEST_DIR = os.environ.get("TEST_DIR", ".")
22 TEST_IGNORE_XATTRS = os.environ.get("TEST_IGNORE_XATTRS", "")
23 if TEST_IGNORE_XATTRS == "":
24     TEST_IGNORE_XATTRS = []
25 else:
26     TEST_IGNORE_XATTRS = TEST_IGNORE_XATTRS.split(",")
27     # The following has to be a list comprehension, not a generator, to
28     # avoid weird consequences of lazy evaluation.
29     TEST_IGNORE_XATTRS.extend([a.encode() for a in TEST_IGNORE_XATTRS])
30
31 USER_NN = "test"
32 USER_ATTR = NAMESPACE.decode() + "." + USER_NN
33 USER_VAL = "abc"
34 EMPTY_VAL = ""
35 LARGE_VAL = "x" * 2048
36 MANYOPS_COUNT = 16384
37
38 USER_NN = USER_NN.encode()
39 USER_VAL = USER_VAL.encode()
40 USER_ATTR = USER_ATTR.encode()
41 EMPTY_VAL = EMPTY_VAL.encode()
42 LARGE_VAL = LARGE_VAL.encode()
43
44 # Helper functions
45
46 def ignore_tuples(attrs):
47     """Remove ignored attributes from the output of xattr.get_all."""
48     return [attr for attr in attrs
49             if attr[0] not in TEST_IGNORE_XATTRS]
50
51 def ignore(attrs):
52     """Remove ignored attributes from the output of xattr.list"""
53     return [attr for attr in attrs
54             if attr not in TEST_IGNORE_XATTRS]
55
56 def lists_equal(attrs, value):
57     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
58     assert ignore(attrs) == value
59
60 def tuples_equal(attrs, value):
61     """Helper to check list equivalence, skipping TEST_IGNORE_XATTRS."""
62     assert ignore_tuples(attrs) == value
63
64 # Fixtures and helpers
65
66 @pytest.fixture
67 def testdir():
68     """per-test temp dir based in TEST_DIR"""
69     with tempfile.TemporaryDirectory(dir=TEST_DIR) as dname:
70         yield dname
71
72 def get_file(path):
73     fh, fname = tempfile.mkstemp(".test", "xattr-", path)
74     return fh, fname
75
76 @contextlib.contextmanager
77 def get_file_name(path):
78     fh, fname = get_file(path)
79     os.close(fh)
80     yield fname
81
82 @contextlib.contextmanager
83 def get_file_fd(path):
84     fd = get_file(path)[0]
85     yield fd
86     os.close(fd)
87
88 @contextlib.contextmanager
89 def get_file_object(path):
90     fd = get_file(path)[0]
91     with os.fdopen(fd) as f:
92         yield f
93
94 @contextlib.contextmanager
95 def get_dir(path):
96     yield tempfile.mkdtemp(".test", "xattr-", path)
97
98 def get_symlink(path, dangling=True):
99     """create a symlink"""
100     fh, fname = get_file(path)
101     os.close(fh)
102     if dangling:
103         os.unlink(fname)
104     sname = fname + ".symlink"
105     os.symlink(fname, sname)
106     return fname, sname
107
108 @contextlib.contextmanager
109 def get_valid_symlink(path):
110     yield get_symlink(path, dangling=False)[1]
111
112 @contextlib.contextmanager
113 def get_dangling_symlink(path):
114     yield get_symlink(path, dangling=True)[1]
115
116 def as_wrapper(call, fn, closer=None):
117     @contextlib.contextmanager
118     def f(path):
119         with call(path) as r:
120             val = fn(r)
121             yield val
122             if closer is not None:
123                 closer(val)
124     return f
125
126 def as_bytes(call):
127     return as_wrapper(call, lambda r: r.encode())
128
129 def as_fspath(call):
130     return as_wrapper(call, pathlib.PurePath)
131
132 def as_iostream(call):
133     opener = lambda f: io.open(f, "r")
134     closer = lambda r: r.close()
135     return as_wrapper(call, opener, closer)
136
137 NOT_BEFORE_36 = pytest.mark.xfail(condition="sys.version_info < (3,6)",
138                                   strict=True)
139 NOT_PYPY = pytest.mark.xfail(condition="platform.python_implementation() == 'PyPy'",
140                                   strict=False)
141
142 # Note: user attributes are only allowed on files and directories, so
143 # we have to skip the symlinks here. See xattr(7).
144 ITEMS_P = [
145     (get_file_name, False),
146     (as_bytes(get_file_name), False),
147     pytest.param((as_fspath(get_file_name), False),
148                  marks=[NOT_BEFORE_36, NOT_PYPY]),
149     (get_file_fd, False),
150     (get_file_object, False),
151     (as_iostream(get_file_name), False),
152     (get_dir, False),
153     (as_bytes(get_dir), False),
154     pytest.param((as_fspath(get_dir), False),
155                  marks=[NOT_BEFORE_36, NOT_PYPY]),
156     (get_valid_symlink, False),
157     (as_bytes(get_valid_symlink), False),
158     pytest.param((as_fspath(get_valid_symlink), False),
159                  marks=[NOT_BEFORE_36, NOT_PYPY]),
160 ]
161
162 ITEMS_D = [
163     "file name",
164     "file name (bytes)",
165     "file name (path)",
166     "file FD",
167     "file object",
168     "file io stream",
169     "directory",
170     "directory (bytes)",
171     "directory (path)",
172     "file via symlink",
173     "file via symlink (bytes)",
174     "file via symlink (path)",
175 ]
176
177 ALL_ITEMS_P = ITEMS_P + [
178     (get_valid_symlink, True),
179     (as_bytes(get_valid_symlink), True),
180     (get_dangling_symlink, True),
181     (as_bytes(get_dangling_symlink), True),
182 ]
183
184 ALL_ITEMS_D = ITEMS_D + [
185     "valid symlink",
186     "valid symlink (bytes)",
187     "dangling symlink",
188     "dangling symlink (bytes)"
189 ]
190
191 @pytest.fixture(params=ITEMS_P, ids=ITEMS_D)
192 def subject(testdir, request):
193     with request.param[0](testdir) as value:
194         yield value, request.param[1]
195
196 @pytest.fixture(params=ALL_ITEMS_P, ids=ALL_ITEMS_D)
197 def any_subject(testdir, request):
198     with request.param[0](testdir) as value:
199         yield value, request.param[1]
200
201 @pytest.fixture(params=[True, False], ids=["with namespace", "no namespace"])
202 def use_ns(request):
203     return request.param
204
205 @pytest.fixture(params=[True, False], ids=["dangling", "valid"])
206 def use_dangling(request):
207     return request.param
208
209 ### Test functions
210
211 def test_empty_value(subject):
212     item, nofollow = subject
213     xattr.set(item, USER_ATTR, EMPTY_VAL, nofollow=nofollow)
214     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == EMPTY_VAL
215
216 def test_large_value(subject):
217     item, nofollow = subject
218     xattr.set(item, USER_ATTR, LARGE_VAL)
219     assert xattr.get(item, USER_ATTR, nofollow=nofollow) == LARGE_VAL
220
221
222 def test_file_mixed_access_deprecated(testdir):
223     """test mixed access to file (deprecated functions)"""
224     fh, fname = get_file(testdir)
225     with os.fdopen(fh) as fo:
226         lists_equal(xattr.listxattr(fname), [])
227         xattr.setxattr(fname, USER_ATTR, USER_VAL)
228         lists_equal(xattr.listxattr(fh), [USER_ATTR])
229         assert xattr.getxattr(fo, USER_ATTR) == USER_VAL
230         tuples_equal(xattr.get_all(fo), [(USER_ATTR, USER_VAL)])
231         tuples_equal(xattr.get_all(fname),
232                      [(USER_ATTR, USER_VAL)])
233
234 def test_file_mixed_access(testdir):
235     """test mixed access to file"""
236     fh, fname = get_file(testdir)
237     with os.fdopen(fh) as fo:
238         lists_equal(xattr.list(fname), [])
239         xattr.set(fname, USER_ATTR, USER_VAL)
240         lists_equal(xattr.list(fh), [USER_ATTR])
241         assert xattr.list(fh, namespace=NAMESPACE) == [USER_NN]
242         assert xattr.get(fo, USER_ATTR) == USER_VAL
243         assert xattr.get(fo, USER_NN, namespace=NAMESPACE) == USER_VAL
244         tuples_equal(xattr.get_all(fo),
245                      [(USER_ATTR, USER_VAL)])
246         assert xattr.get_all(fo, namespace=NAMESPACE) == \
247             [(USER_NN, USER_VAL)]
248         tuples_equal(xattr.get_all(fname), [(USER_ATTR, USER_VAL)])
249         assert xattr.get_all(fname, namespace=NAMESPACE) == \
250             [(USER_NN, USER_VAL)]
251
252 def test_replace_on_missing(subject, use_ns):
253     item = subject[0]
254     lists_equal(xattr.list(item), [])
255     with pytest.raises(EnvironmentError):
256         if use_ns:
257             xattr.set(item, USER_NN, USER_VAL, flags=XATTR_REPLACE,
258                       namespace=NAMESPACE)
259         else:
260             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_REPLACE)
261
262 def test_create_on_existing(subject, use_ns):
263     item = subject[0]
264     lists_equal(xattr.list(item), [])
265     if use_ns:
266         xattr.set(item, USER_NN, USER_VAL,
267                   namespace=NAMESPACE)
268     else:
269         xattr.set(item, USER_ATTR, USER_VAL)
270     with pytest.raises(EnvironmentError):
271         if use_ns:
272             xattr.set(item, USER_NN, USER_VAL,
273                       flags=XATTR_CREATE, namespace=NAMESPACE)
274         else:
275             xattr.set(item, USER_ATTR, USER_VAL, flags=XATTR_CREATE)
276
277 def test_remove_on_missing(any_subject, use_ns):
278     item, nofollow = any_subject
279     lists_equal(xattr.list(item, nofollow=nofollow), [])
280     with pytest.raises(EnvironmentError):
281         if use_ns:
282             xattr.remove(item, USER_NN, namespace=NAMESPACE,
283                          nofollow=nofollow)
284         else:
285             xattr.remove(item, USER_ATTR, nofollow=nofollow)
286
287 def test_set_get_remove(subject, use_ns):
288     item = subject[0]
289     lists_equal(xattr.list(item), [])
290     if use_ns:
291         xattr.set(item, USER_NN, USER_VAL,
292                   namespace=NAMESPACE)
293     else:
294         xattr.set(item, USER_ATTR, USER_VAL)
295     if use_ns:
296         assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
297     else:
298         lists_equal(xattr.list(item), [USER_ATTR])
299         lists_equal(xattr.list(item, namespace=EMPTY_NS),
300                     [USER_ATTR])
301     if use_ns:
302         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
303     else:
304         assert xattr.get(item, USER_ATTR) == USER_VAL
305     if use_ns:
306         assert xattr.get_all(item, namespace=NAMESPACE) == \
307             [(USER_NN, USER_VAL)]
308     else:
309         tuples_equal(xattr.get_all(item),
310                      [(USER_ATTR, USER_VAL)])
311     if use_ns:
312         xattr.remove(item, USER_NN, namespace=NAMESPACE)
313     else:
314         xattr.remove(item, USER_ATTR)
315     lists_equal(xattr.list(item), [])
316     tuples_equal(xattr.get_all(item), [])
317
318 def test_replace_on_missing_deprecated(subject):
319     item = subject[0]
320     lists_equal(xattr.listxattr(item), [])
321     with pytest.raises(EnvironmentError):
322         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_REPLACE)
323
324 def test_create_on_existing_deprecated(subject):
325     item = subject[0]
326     lists_equal(xattr.listxattr(item), [])
327     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
328     with pytest.raises(EnvironmentError):
329         xattr.setxattr(item, USER_ATTR, USER_VAL, XATTR_CREATE)
330
331 def test_remove_on_missing_deprecated(any_subject):
332     """check deprecated list, set, get operations against an item"""
333     item, nofollow = any_subject
334     lists_equal(xattr.listxattr(item, nofollow), [])
335     with pytest.raises(EnvironmentError):
336         xattr.removexattr(item, USER_ATTR)
337
338 def test_set_get_remove_deprecated(subject):
339     """check deprecated list, set, get operations against an item"""
340     item = subject[0]
341     lists_equal(xattr.listxattr(item), [])
342     xattr.setxattr(item, USER_ATTR, USER_VAL, 0)
343     lists_equal(xattr.listxattr(item), [USER_ATTR])
344     assert xattr.getxattr(item, USER_ATTR) == USER_VAL
345     tuples_equal(xattr.get_all(item), [(USER_ATTR, USER_VAL)])
346     xattr.removexattr(item, USER_ATTR)
347     lists_equal(xattr.listxattr(item), [])
348     tuples_equal(xattr.get_all(item), [])
349
350 def test_many_ops(subject):
351     """test many ops"""
352     item = subject[0]
353     xattr.set(item, USER_ATTR, USER_VAL)
354     VL = [USER_ATTR]
355     VN = [USER_NN]
356     for i in range(MANYOPS_COUNT):
357         lists_equal(xattr.list(item), VL)
358         lists_equal(xattr.list(item, namespace=EMPTY_NS), VL)
359         assert xattr.list(item, namespace=NAMESPACE) == VN
360     for i in range(MANYOPS_COUNT):
361         assert xattr.get(item, USER_ATTR) == USER_VAL
362         assert xattr.get(item, USER_NN, namespace=NAMESPACE) == USER_VAL
363     for i in range(MANYOPS_COUNT):
364         tuples_equal(xattr.get_all(item),
365                      [(USER_ATTR, USER_VAL)])
366         assert xattr.get_all(item, namespace=NAMESPACE) == \
367             [(USER_NN, USER_VAL)]
368
369 def test_many_ops_deprecated(subject):
370     """test many ops (deprecated functions)"""
371     item = subject[0]
372     xattr.setxattr(item, USER_ATTR, USER_VAL)
373     VL = [USER_ATTR]
374     for i in range(MANYOPS_COUNT):
375         lists_equal(xattr.listxattr(item), VL)
376     for i in range(MANYOPS_COUNT):
377         assert xattr.getxattr(item, USER_ATTR) == USER_VAL
378     for i in range(MANYOPS_COUNT):
379         tuples_equal(xattr.get_all(item),
380                      [(USER_ATTR, USER_VAL)])
381
382 def test_no_attributes_deprecated(any_subject):
383     """test no attributes (deprecated functions)"""
384     item, nofollow = any_subject
385     lists_equal(xattr.listxattr(item, True), [])
386     tuples_equal(xattr.get_all(item, True), [])
387     with pytest.raises(EnvironmentError):
388         xattr.getxattr(item, USER_ATTR, True)
389
390 def test_no_attributes(any_subject):
391     """test no attributes"""
392     item, nofollow = any_subject
393     lists_equal(xattr.list(item, nofollow=nofollow), [])
394     assert xattr.list(item, nofollow=nofollow,
395                       namespace=NAMESPACE) == []
396     tuples_equal(xattr.get_all(item, nofollow=nofollow), [])
397     assert xattr.get_all(item, nofollow=nofollow,
398                          namespace=NAMESPACE) == []
399     with pytest.raises(EnvironmentError):
400         xattr.get(item, USER_NN, nofollow=nofollow,
401                   namespace=NAMESPACE)
402
403 def test_binary_payload_deprecated(subject):
404     """test binary values (deprecated functions)"""
405     item = subject[0]
406     BINVAL = b"abc\0def"
407     xattr.setxattr(item, USER_ATTR, BINVAL)
408     lists_equal(xattr.listxattr(item), [USER_ATTR])
409     assert xattr.getxattr(item, USER_ATTR) == BINVAL
410     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
411     xattr.removexattr(item, USER_ATTR)
412
413 def test_binary_payload(subject):
414     """test binary values"""
415     item = subject[0]
416     BINVAL = b"abc\0def"
417     xattr.set(item, USER_ATTR, BINVAL)
418     lists_equal(xattr.list(item), [USER_ATTR])
419     assert xattr.list(item, namespace=NAMESPACE) == [USER_NN]
420     assert xattr.get(item, USER_ATTR) == BINVAL
421     assert xattr.get(item, USER_NN, namespace=NAMESPACE) == BINVAL
422     tuples_equal(xattr.get_all(item), [(USER_ATTR, BINVAL)])
423     assert xattr.get_all(item, namespace=NAMESPACE) == [(USER_NN, BINVAL)]
424     xattr.remove(item, USER_ATTR)
425
426 def test_symlinks_user_fail(testdir, use_dangling):
427     _, sname = get_symlink(testdir, dangling=use_dangling)
428     with pytest.raises(IOError):
429         xattr.set(sname, USER_ATTR, USER_VAL, nofollow=True)
430     with pytest.raises(IOError):
431         xattr.set(sname, USER_NN, USER_VAL, namespace=NAMESPACE,
432                   nofollow=True)
433     with pytest.raises(IOError):
434         xattr.setxattr(sname, USER_ATTR, USER_VAL, XATTR_CREATE, True)
435
436 @pytest.mark.parametrize(
437     "call, args", [(xattr.get, [USER_ATTR]),
438                    (xattr.list, []),
439                    (xattr.remove, [USER_ATTR]),
440                    (xattr.get, [USER_ATTR]),
441                    (xattr.set, [USER_ATTR, USER_VAL])])
442 def test_none_namespace(testdir, call, args):
443     # Don't want to use subject, since that would prevent xfail test
444     # on path objects (due to hiding the exception here).
445     f = get_file_name(testdir)
446     with pytest.raises(TypeError):
447         call(f, *args, namespace=None)
448     fd = get_file_fd(testdir)
449     with pytest.raises(TypeError):
450         call(fd, *args, namespace=None)
451
452 @pytest.mark.parametrize(
453     "call",
454     [xattr.get, xattr.list, xattr.listxattr,
455      xattr.remove, xattr.removexattr,
456      xattr.set, xattr.setxattr,
457      xattr.get, xattr.getxattr])
458 def test_wrong_call(call):
459     with pytest.raises(TypeError):
460         call()
461
462 @pytest.mark.parametrize(
463     "call, args", [(xattr.get, [USER_ATTR]),
464                    (xattr.listxattr, []),
465                    (xattr.list, []),
466                    (xattr.remove, [USER_ATTR]),
467                    (xattr.removexattr, [USER_ATTR]),
468                    (xattr.get, [USER_ATTR]),
469                    (xattr.getxattr, [USER_ATTR]),
470                    (xattr.set, [USER_ATTR, USER_VAL]),
471                    (xattr.setxattr, [USER_ATTR, USER_VAL])])
472 def test_wrong_argument_type(call, args):
473     with pytest.raises(TypeError):
474         call(object(), *args)