!C99Shell v. 2.0 [PHP 7 Update] [25.02.2019]!

Software: Apache. PHP/5.6.40 

uname -a: Linux cpanel06wh.bkk1.cloud.z.com 2.6.32-954.3.5.lve1.4.80.el6.x86_64 #1 SMP Thu Sep 24
01:42:00 EDT 2020 x86_64
 

uid=851(cp949260) gid=853(cp949260) groups=853(cp949260) 

Safe-mode: OFF (not secure)

/opt/alt/python37/lib64/python3.7/site-packages/psycopg2/tests/   drwxr-xr-x
Free 220.4 GB of 981.82 GB (22.45%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Feedback    Self remove    Logout    


Viewing file:     test_cancel.py (3.64 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# test_cancel.py - unit test for query cancellation
#
# Copyright (C) 2010-2011 Jan UrbaƄski  <wulczer@wulczer.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
# License for more details.

import threading

import psycopg2
import psycopg2.extensions
from psycopg2 import extras

from .testconfig import dsn
from .testutils import unittest, ConnectingTestCase, skip_before_postgres

class CancelTests(ConnectingTestCase):

    def setUp(self):
        ConnectingTestCase.setUp(self)

        cur = self.conn.cursor()
        cur.execute('''
            CREATE TEMPORARY TABLE table1 (
              id int PRIMARY KEY
            )''')
        self.conn.commit()

    def test_empty_cancel(self):
        self.conn.cancel()

    @skip_before_postgres(8, 2)
    def test_cancel(self):
        errors = []

        def neverending(conn):
            cur = conn.cursor()
            try:
                self.assertRaises(psycopg2.extensions.QueryCanceledError,
                                  cur.execute, "select pg_sleep(60)")
            # make sure the connection still works
                conn.rollback()
                cur.execute("select 1")
                self.assertEqual(cur.fetchall(), [(1, )])
            except Exception as e:
                errors.append(e)
                raise

        def canceller(conn):
            cur = conn.cursor()
            try:
                conn.cancel()
            except Exception as e:
                errors.append(e)
                raise

        thread1 = threading.Thread(target=neverending, args=(self.conn, ))
        # wait a bit to make sure that the other thread is already in
        # pg_sleep -- ugly and racy, but the chances are ridiculously low
        thread2 = threading.Timer(0.3, canceller, args=(self.conn, ))
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()

        self.assertEqual(errors, [])

    @skip_before_postgres(8, 2)
    def test__async_cancel(self):
        _async_conn = psycopg2.connect(dsn, _async=True)
        self.assertRaises(psycopg2.OperationalError, _async_conn.cancel)
        extras.wait_select(_async_conn)
        cur = _async_conn.cursor()
        cur.execute("select pg_sleep(10000)")
        self.assertTrue(_async_conn.isexecuting())
        _async_conn.cancel()
        self.assertRaises(psycopg2.extensions.QueryCanceledError,
                          extras.wait_select, _async_conn)
        cur.execute("select 1")
        extras.wait_select(_async_conn)
        self.assertEqual(cur.fetchall(), [(1, )])

    def test__async_connection_cancel(self):
        _async_conn = psycopg2.connect(dsn, _async=True)
        _async_conn.close()
        self.assertTrue(_async_conn.closed)


def test_suite():
    return unittest.TestLoader().loadTestsFromName(__name__)

if __name__ == "__main__":
    unittest.main()

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.0 [PHP 7 Update] [25.02.2019] maintained by KaizenLouie | C99Shell Github | Generation time: 0.2076 ]--