3 \ @s<dZdgZddlZddlZddlZddlZddlZddlZy ddlZWne k r^dZYnXddl m Z ddl m Z ddl m Z ddl mZdd l mZdd l mZdd l mZdd l mZdd lmZddlmZddZGddde jZGdddejejZGdddeZGdddeZGdddeZdS)zEvent loop using a selector and related classes. A selector is a "notify-when-ready" multiplexer. For a subclass which also includes support for signal handling, see the unix_events sub-module. BaseSelectorEventLoopN) base_events)compat) constants)events)futures) selectors) transports)sslproto) coroutine)loggerc Cs6y|j|}Wntk r"dSXt|j|@SdS)NF)get_keyKeyErrorboolr)selectorfdZeventkeyr//usr/lib64/python3.6/asyncio/selector_events.py_test_selector_event s rcsreZdZdZdOfdd ZdPdddddZdQddddd d d Zddddd d d ZdRddZfddZ ddZ ddZ ddZ ddZ ddZddZdSdd ZdTd!d"ZedUd#d$Zd%d&Zd'd(Zd)d*Zd+d,Zd-d.Zd/d0Zd1d2Zd3d4Zd5d6Zd7d8Zd9d:Zd;d<Zd=d>Z ed?d@Z!dAdBZ"dCdDZ#dEdFZ$dGdHZ%dIdJZ&dKdLZ'dMdNZ(Z)S)VrzJSelector event loop. See events.EventLoop for API specification. NcsFtj|dkrtj}tjd|jj||_|j t j |_ dS)NzUsing selector: %s) super__init__r ZDefaultSelectorr debug __class____name__ _selector_make_self_pipeweakrefWeakValueDictionary _transports)selfr)rrrr1s zBaseSelectorEventLoop.__init__)extraservercCst||||||S)N)_SelectorSocketTransport)r!sockprotocolwaiterr"r#rrr_make_socket_transport;s z,BaseSelectorEventLoop._make_socket_transportF) server_sideserver_hostnamer"r#c CsNtjs"|j||||||||dStj||||||} t||| ||d| jS)N)r)r*r"r#)r"r#)r Z_is_sslproto_available_make_legacy_ssl_transportZ SSLProtocolr$Z_app_transport) r!rawsockr& sslcontextr'r)r*r"r#Z ssl_protocolrrr_make_ssl_transport@s   z)BaseSelectorEventLoop._make_ssl_transportc Cst||||||||| S)N)_SelectorSslTransport) r!r,r&r-r'r)r*r"r#rrrr+Os z0BaseSelectorEventLoop._make_legacy_ssl_transportcCst||||||S)N)_SelectorDatagramTransport)r!r%r&addressr'r"rrr_make_datagram_transportYsz.BaseSelectorEventLoop._make_datagram_transportcsL|jrtd|jrdS|jtj|jdk rH|jjd|_dS)Nz!Cannot close a running event loop)Z is_running RuntimeError is_closed_close_self_pipercloser)r!)rrrr6^s   zBaseSelectorEventLoop.closecCstdS)N)NotImplementedError)r!rrr _socketpairisz!BaseSelectorEventLoop._socketpaircCsB|j|jj|jjd|_|jjd|_|jd8_dS)Nr)_remove_reader_ssockfilenor6_csock _internal_fds)r!rrrr5ls   z&BaseSelectorEventLoop._close_self_pipecCsN|j\|_|_|jjd|jjd|jd7_|j|jj|jdS)NFr)r8r:r< setblockingr= _add_readerr;_read_from_self)r!rrrrts   z%BaseSelectorEventLoop._make_self_pipecCsdS)Nr)r!datarrr_process_self_data|sz(BaseSelectorEventLoop._process_self_datac CsVxPy |jjd}|sP|j|Wqtk r8wYqtk rLPYqXqWdS)Ni)r:recvrBInterruptedErrorBlockingIOError)r!rArrrr@s z%BaseSelectorEventLoop._read_from_selfc CsJ|j}|dk rFy|jdWn(tk rD|jr@tjdddYnXdS)Nz3Fail to write a null byte into the self-pipe socketT)exc_info)r<sendOSError_debugr r)r!Zcsockrrr_write_to_selfsz$BaseSelectorEventLoop._write_to_selfdcCs |j|j|j|||||dS)N)r?r;_accept_connection)r!protocol_factoryr%r-r#backlogrrr_start_servingsz$BaseSelectorEventLoop._start_servingc Csxt|D]}y0|j\}}|jr2tjd||||jdWntttfk rXdSt k r} z^| j t j t j t j t jfkr|jd| |d|j|j|jtj|j|||||nWYdd} ~ Xq Xd|i} |j||| ||} |j| q WdS)Nz#%r got a new connection from %r: %rFz&socket.accept() out of system resource)message exceptionsocketpeername)rangeacceptrJr rr>rErDConnectionAbortedErrorrIerrnoZEMFILEZENFILEZENOBUFSZENOMEMcall_exception_handlerr9r;Z call_laterrZACCEPT_RETRY_DELAYrP_accept_connection2Z create_task) r!rNr%r-r#rO_connaddrexcr"rVrrrrMs4     z(BaseSelectorEventLoop._accept_connectionc csd}d}yj|}|j}|r6|j||||d||d}n|j|||||d}y|EdHWn|jYnXWn\tk r} z@|jrd| d} |dk r|| d<|dk r|| d<|j| WYdd} ~ XnXdS)NT)r'r)r"r#)r'r"r#z3Error on transport creation for incoming connection)rQrRr& transport) create_futurer.r(r6 ExceptionrJrY) r!rNr\r"r-r#r&r_r'r^contextrrrrZs4 z)BaseSelectorEventLoop._accept_connection2c Cs@y|j|}Wntk r"YnX|jsX|j|j }\}}|jj ||tjB||f|dk r|j dS)N) _check_closedrHandlerrrregisterr EVENT_READrAmodifycancel) r!rcallbackargshandlermaskreaderwriterrrrr?s  z!BaseSelectorEventLoop._add_readerc Cs|jr dSy|jj|}Wntk r0dSX|j|j}\}}|tjM}|sb|jj|n|jj ||d|f|dk r|j dSdSdS)NFT) r4rrrrrAr ri unregisterrjrk)r!rrrorprqrrrr9s z$BaseSelectorEventLoop._remove_readerc Gs|jtj|||}y|jj|}Wn*tk rP|jj|tjd|fYn>X|j|j }\}}|jj ||tjB||f|dk r|j dS)N) rfrrgrrrrhr EVENT_WRITErArjrk) r!rrlrmrnrrorprqrrr _add_writers  z!BaseSelectorEventLoop._add_writerc Cs|jr dSy|jj|}Wntk r0dSX|j|j}\}}|tjM}|sb|jj|n|jj |||df|dk r|j dSdSdS)zRemove a writer callback.FNT) r4rrrrrAr rsrrrjrk)r!rrrorprqrrr_remove_writer,s z$BaseSelectorEventLoop._remove_writercGs|j||j||f|S)zAdd a reader callback.)rer?)r!rrlrmrrr add_readerCs z BaseSelectorEventLoop.add_readercCs|j||j|S)zRemove a reader callback.)rer9)r!rrrr remove_readerHs z#BaseSelectorEventLoop.remove_readercGs|j||j||f|S)zAdd a writer callback..)rert)r!rrlrmrrr add_writerMs z BaseSelectorEventLoop.add_writercCs|j||j|S)zRemove a writer callback.)reru)r!rrrr remove_writerRs z#BaseSelectorEventLoop.remove_writercCs6|jr|jdkrtd|j}|j|d|||S)zReceive data from the socket. The return value is a bytes object representing the data received. The maximum amount of data to be received at once is specified by nbytes. This method is a coroutine. rzthe socket must be non-blockingN)rJ gettimeout ValueErrorr` _sock_recv)r!r%nfutrrr sock_recvWs zBaseSelectorEventLoop.sock_recvcCs|dk r|j||jrdSy|j|}Wn`ttfk rb|j}|j||j||||Yn6tk r}z|j |WYdd}~Xn X|j |dS)N) rw cancelledrCrErDr;rvr|ra set_exception set_result)r!r~ registered_fdr%r}rArr^rrrr|fs z BaseSelectorEventLoop._sock_recvcCsF|jr|jdkrtd|j}|r8|j|d||n |jd|S)aSend data to the socket. The socket must be connected to a remote socket. This method continues to send data from data until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully processed by the receiving end of the connection. This method is a coroutine. rzthe socket must be non-blockingN)rJrzr{r` _sock_sendallr)r!r%rAr~rrr sock_sendall{s  z"BaseSelectorEventLoop.sock_sendallcCs|dk r|j||jrdSy|j|}WnDttfk rHd}Yn*tk rp}z|j|dSd}~XnX|t|kr|jdn.|r||d}|j }|j ||j ||||dS)Nr) ryrrHrErDrarlenrr;rxr)r!r~rr%rAr}r^rrrrrs"     z#BaseSelectorEventLoop._sock_sendallccs|jr|jdkrtdttd s2|jtjkrptj||j|j |d}|j sZ|EdH|j d\}}}}}|j }|j ||||EdHS)zTConnect to a remote socket at address. This method is a coroutine. rzthe socket must be non-blockingAF_UNIX)familyprotoloopN)rJrzr{hasattrrSrrrZ_ensure_resolvedrdoneresultr` _sock_connect)r!r%r1Zresolvedr[r~rrr sock_connects z"BaseSelectorEventLoop.sock_connectcCs|j}y|j|Wnjttfk rV|jtj|j||j||j |||Yn6t k r}z|j |WYdd}~Xn X|j ddS)N) r;ZconnectrErDZadd_done_callback functoolspartial_sock_connect_donerx_sock_connect_cbrarr)r!r~r%r1rr^rrrrsz#BaseSelectorEventLoop._sock_connectcCs|j|dS)N)ry)r!rr~rrrrsz(BaseSelectorEventLoop._sock_connect_donecCs|jr dSy,|jtjtj}|dkr6t|d|fWnBttfk rPYn6tk rz}z|j |WYdd}~Xn X|j ddS)NrzConnect call failed %s) rZ getsockoptrSZ SOL_SOCKETZSO_ERRORrIrErDrarr)r!r~r%r1errr^rrrrsz&BaseSelectorEventLoop._sock_connect_cbcCs4|jr|jdkrtd|j}|j|d||S)a|Accept a connection. The socket must be bound to an address and listening for connections. The return value is a pair (conn, address) where conn is a new socket object usable to send and receive data on the connection, and address is the address bound to the socket on the other end of the connection. This method is a coroutine. rzthe socket must be non-blockingF)rJrzr{r` _sock_accept)r!r%r~rrr sock_accepts z!BaseSelectorEventLoop.sock_acceptcCs|j}|r|j||jr"dSy|j\}}|jdWnVttfk rh|j||j|d|Yn:t k r}z|j |WYdd}~XnX|j ||fdS)NFT) r;rwrrVr>rErDrvrrarr)r!r~Z registeredr%rr\r1r^rrrrs  z"BaseSelectorEventLoop._sock_acceptcCsx~|D]v\}}|j|j}\}}|tj@rN|dk rN|jrD|j|n |j||tj@r|dk r|jrr|j|q|j|qWdS)N) fileobjrAr riZ _cancelledr9Z _add_callbackrsru)r!Z event_listrrorrprqrrr_process_eventss   z%BaseSelectorEventLoop._process_eventscCs|j|j|jdS)N)r9r;r6)r!r%rrr _stop_serving sz#BaseSelectorEventLoop._stop_serving)N)N)N)NNN)NNrL)NNrL)NN)*r __module__ __qualname____doc__rr(r.r+r2r6r8r5rrBr@rKrPrMr rZrer?r9rtrurvrwrxryrr|rrrrrrrrrr __classcell__rr)rrr+sT      ( #  cseZdZdZeZdZd fdd ZddZdd Z d d Z d d Z ddZ ddZ ejr`ddZd!ddZddZddZddZddZZS)"_SelectorTransportiNc stj||||jd<|j|jd<d|jkrdy|j|jd<Wn tjk rbd|jd<YnX||_|j|_ ||_ d|_ ||_ |j |_d|_d|_|j dk r|j j||j|j <dS)NrSZsocknamerTTrF)rr_extraZ getsocknameZ getpeernamerSerror_sockr;_sock_fd _protocol_protocol_connected_server_buffer_factory_buffer _conn_lost_closingZ_attachr )r!rr%r&r"r#)rrrrs&      z_SelectorTransport.__init__cCs|jjg}|jdkr |jdn|jr0|jd|jd|j|jdk r|jj rt|jj |jt j }|rz|jdn |jdt|jj |jt j }|rd}nd}|j }|jd||fd d j|S) Nclosedclosingzfd=%sz read=pollingz read=idlepollingZidlezwrite=<%s, bufsize=%s>z<%s> )rrrappendrr_loopr4rrr rirsget_write_buffer_sizejoin)r!inforstatebufsizerrr__repr__2s*       z_SelectorTransport.__repr__cCs|jddS)N) _force_close)r!rrrabortNsz_SelectorTransport.abortcCs ||_dS)N)r)r!r&rrr set_protocolQsz_SelectorTransport.set_protocolcCs|jS)N)r)r!rrr get_protocolTsz_SelectorTransport.get_protocolcCs|jS)N)r)r!rrrrcWsz_SelectorTransport.is_closingcCsT|jr dSd|_|jj|j|jsP|jd7_|jj|j|jj|jddS)NTr) rrr9rrrru call_soon_call_connection_lost)r!rrrr6Zsz_SelectorTransport.closecCs,|jdk r(tjd|t|d|jjdS)Nzunclosed transport %r)source)rwarningswarnResourceWarningr6)r!rrr__del__hs  z_SelectorTransport.__del__Fatal error on transportcCsPt|tjr*|jjrBtjd||ddn|jj||||jd|j |dS)Nz%r: %sT)rG)rQrRr_r&) isinstancerZ_FATAL_ERROR_IGNOREr get_debugr rrYrr)r!r^rQrrr _fatal_errorns   z_SelectorTransport._fatal_errorcCsd|jr dS|jr(|jj|jj|j|jsBd|_|jj|j|jd7_|jj|j |dS)NTr) rrclearrrurrr9rr)r!r^rrrr|s z_SelectorTransport._force_closec CsVz|jr|jj|Wd|jjd|_d|_d|_|j}|dk rP|jd|_XdS)N)rrZconnection_lostrr6rrZ_detach)r!r^r#rrrrs z(_SelectorTransport._call_connection_lostcCs t|jS)N)rr)r!rrrrsz(_SelectorTransport.get_write_buffer_sizecGs"|jr dS|jj||f|dS)N)rrr?)r!rrlrmrrrr?sz_SelectorTransport._add_readeri)NN)r)rrrmax_size bytearrayrrrrrrrrcr6rZPY34rrrrrr?rrr)rrrs"   rcsVeZdZdfdd ZddZddZdd Zd d Zd d ZddZ ddZ Z S)r$Ncsrtj|||||d|_d|_tj|j|jj|j j ||jj|j |j |j |dk rn|jjtj|ddS)NF)rr_eof_pausedrZ _set_nodelayrrrrconnection_mader?r _read_readyr_set_result_unless_cancelled)r!rr%r&r'r"r#)rrrrs    z!_SelectorSocketTransport.__init__cCs>|js |jrdSd|_|jj|j|jjr:tjd|dS)NTz%r pauses reading)rrrr9rrr r)r!rrr pause_readings   z&_SelectorSocketTransport.pause_readingcCsB|js|j rdSd|_|j|j|j|jjr>tjd|dS)NFz%r resumes reading) rrr?rrrrr r)r!rrrresume_readings  z'_SelectorSocketTransport.resume_readingcCs|jr dSy|jj|j}WnDttfk r4Yn|tk r`}z|j|dWYdd}~XnPX|rt|jj |n<|j j rt j d||jj}|r|j j|jn|jdS)Nz$Fatal read error on socket transportz%r received EOF)rrrCrrErDrarr data_receivedrrr r eof_receivedr9rr6)r!rAr^ keep_openrrrrs    z$_SelectorSocketTransport._read_readycCst|tttfs"tdt|j|jr0td|s8dS|j rf|j t j krTt j d|j d7_ dS|jsy|jj|}WnBttfk rYn@tk r}z|j|ddSd}~XnX||d}|sdS|jj|j|j|jj||jdS)Nz1data argument must be a bytes-like object, not %rz%Cannot call write() after write_eof()zsocket.send() raised exception.rz%Fatal write error on socket transport)rbytesr memoryview TypeErrortyperrr3rr!LOG_THRESHOLD_FOR_CONNLOST_WRITESr warningrrrHrErDrarrrtr _write_readyextend_maybe_pause_protocol)r!rAr}r^rrrwrites4     z_SelectorSocketTransport.writecCs|jstd|jrdSy|jj|j}Wn\ttfk rBYntk r}z*|jj |j |jj |j |dWYdd}~XnTX|r|jd|=|j |js|jj |j |jr|jdn|jr|jjtjdS)NzData should not be emptyz%Fatal write error on socket transport)rAssertionErrorrrrHrErDrarrurrr_maybe_resume_protocolrrrshutdownrSSHUT_WR)r!r}r^rrrrs(   z%_SelectorSocketTransport._write_readycCs.|js |jrdSd|_|js*|jjtjdS)NT)rrrrrrSr)r!rrr write_eofs  z"_SelectorSocketTransport.write_eofcCsdS)NTr)r!rrr can_write_eof sz&_SelectorSocketTransport.can_write_eof)NNN) rrrrrrrrrrrrrr)rrr$s#r$csdeZdZeZdfdd ZdddZddZd d Zd d Z d dZ ddZ ddZ ddZ ZS)r/NFc stdkrtd|s tj||}|dd} |r<| r<|| d<|j|f| } tj|| ||| d|_||_||_ ||_ d|_ |j j |d|jjrtjd||jj} nd} |j| dS)Nzstdlib ssl module not availableF)r)Zdo_handshake_on_connectr*)r-z%r starts SSL handshake)sslr3r Z_create_transport_contextZ wrap_socketrrr_server_hostname_waiter _sslcontextrrupdaterrr rtime _on_handshake) r!rr,r&r-r'r)r*r"r#Z wrap_kwargsZsslsock start_time)rrrr(s*     z_SelectorSslTransport.__init__cCsD|jdkrdS|jjs:|dk r.|jj|n |jjdd|_dS)N)rrrr)r!r^rrr_wakeup_waiterLs   z$_SelectorSslTransport._wakeup_waiterc"Cs$y|jjWntjk r8|jj|j|j|dStjk r`|jj |j|j|dSt k r}z`|jj rt j d|dd|jj|j|jj|j|jj|j|t|trdSWYdd}~XnX|jj|j|jj|j|jj}t|jds|jr|jjtjkrytj||jWnRtk r}z4|jj rjt j d|dd|jj|j|dSd}~XnX|jj||jj|jj|jdd|_d|_ |jj|j|j!d|_"|jj#|j$j%||jj#|j|jj r |jj&|}t j'd||d dS) Nz%r: SSL handshake failedT)rGZcheck_hostnamez1%r: SSL handshake failed on matching the hostname)peercertcipher compressionZ ssl_objectFz%r: SSL handshake took %.1f msg@@)(rZ do_handshakerSSLWantReadErrorrr?rrSSLWantWriteErrorrt BaseExceptionrr rr9rur6rrraZ getpeercertrrrZ verify_modeZ CERT_NONEZmatch_hostnamerrrr_read_wants_write_write_wants_readrrrrrrr)r!rr^rZdtrrrrVsb                z#_SelectorSslTransport._on_handshakecCsJ|jrtd|jrtdd|_|jj|j|jjrFtjd|dS)Nz#Cannot pause_reading() when closingzAlready pausedTz%r pauses reading) rr3rrr9rrr r)r!rrrrs z#_SelectorSslTransport.pause_readingcCsJ|jstdd|_|jrdS|jj|j|j|jjrFtj d|dS)Nz Not pausedFz%r resumes reading) rr3rrr?rrrr r)r!rrrrs z$_SelectorSslTransport.resume_readingcCs"|jr dS|jr6d|_|j|jr6|jj|j|jy|jj|j }Wnt t t j fk rdYnt jk rd|_|jj|j|jj|j|jYntk r}z|j|dWYdd}~XnTX|r|jj|n@z4|jjrtjd||jj}|rtjdWd|jXdS)NFTz!Fatal read error on SSL transportz%r received EOFz?returning true from eof_received() has no effect when using ssl)rrrrrrtrrrCrrErDrrrrr9rarrrrr rrrr6)r!rAr^rrrrrs4   z!_SelectorSslTransport._read_readycCs(|jr dS|jrszC_SelectorDatagramTransport.get_write_buffer_size..)sumr)r!rrrrsz0_SelectorDatagramTransport.get_write_buffer_sizecCs|jr dSy|jj|j\}}Wnpttfk r8Ynhtk rd}z|jj|WYdd}~Xn<t k r}z|j |dWYdd}~XnX|jj ||dS)Nz&Fatal read error on datagram transport) rrZrecvfromrrErDrIrerror_receivedrarZdatagram_received)r!rAr]r^rrrr sz&_SelectorDatagramTransport._read_readycCsTt|tttfs"tdt|j|s*dS|jrN|d|jfkrNtd|jf|j r|jr|j t j krpt j d|j d7_ dS|js4y&|jr|jj|n|jj||dSttfk r|jj|j|jYnZtk r}z|jj|dSd}~Xn.tk r2}z|j|ddSd}~XnX|jjt||f|jdS)Nz1data argument must be a bytes-like object, not %rz#Invalid address: must be None or %szsocket.send() raised exception.rz'Fatal write error on datagram transport)rrrrrrrrr{rrrr rrrrHsendtorErDrrtr _sendto_readyrIrrrarrr)r!rAr]r^rrrr.s<     z!_SelectorDatagramTransport.sendtocCsx|jr|jj\}}y&|jr,|jj|n|jj||Wqttfk rf|jj||fPYqt k r}z|j j |dSd}~Xqt k r}z|j |ddSd}~XqXqW|j|js|jj|j|jr|jddS)Nz'Fatal write error on datagram transport)rpopleftrrrHrrErD appendleftrIrrrarrrrurrr)r!rAr]r^rrrrUs* z(_SelectorDatagramTransport._sendto_ready)NNN)N) rrr collectionsdequerrrrrrrrr)rrr0 s  'r0) r__all__rrXrrSrrr ImportErrorrrrrrr r r Z coroutinesr logr rZ BaseEventLooprZ_FlowControlMixinZ Transportrr$r/r0rrrrsD             ii