Changeset 10

Show
Ignore:
Timestamp:
01/15/09 13:56:19 (20 months ago)
Author:
Denis Bilenko <denis@ag-projects.com>
Hashname:
20090115125619-b2873-fe426e55ab44d12f778abe0d8e0e4fd2a991dfd0
Message:

added Reception class and rudimentary room support

Location:
sipchatserver
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • sipchatserver/chatroom.py

    r2 r10  
    4949class ChatRoom: 
    5050 
    51     def __init__(self, traffic_logger): 
     51    def __init__(self, room_id, reception, traffic_logger): 
     52        self.room_id = room_id 
     53        self.reception = reception 
    5254        self.traffic_logger = traffic_logger 
    5355        self.sessions = [] 
    5456        self.accept_incoming_job = None 
    5557        self.incoming_queue = queue() 
    56         self.message_dispatcher_job = proc.spawn_link(self._message_dispatcher) 
     58        self.message_dispatcher_job = proc.spawn_link_exception(self._message_dispatcher) 
    5759 
    5860    def _message_dispatcher(self): 
    5961        """Read from self.incoming_queue and dispatch the messages to other participants""" 
    60         while True: 
    61             session, message = self.incoming_queue.wait() 
    62             self._dispatch_message(session, message) 
     62        try: 
     63            while True: 
     64                session, message = self.incoming_queue.wait() 
     65                self._dispatch_message(session, message) 
     66        except proc.ProcExit, ex: 
     67            return ex 
    6368 
    6469    def _dispatch_message(self, session, message): 
     
    7176                    self.remove_session(session) 
    7277                except: 
     78                    self.remove_session(session) 
    7379                    import traceback 
    7480                    traceback.print_exc() 
     
    8187        self.sessions = [] 
    8288 
    83     def start_accept_incoming(self, *args, **kwargs): 
    84         assert not self.accept_incoming_job, self.accept_incoming_job 
    85         self.accept_incoming_job = proc.spawn(self._accept_incoming, *args, **kwargs) 
    86  
    87     def stop_accept_incoming(self): 
    88         if self.accept_incoming_job: 
    89             self.accept_incoming_job.kill() 
    90             self.accept_incoming_job = None 
    91  
    92     def add_session(self, session, msrp): 
    93         proc.spawn(self._forwarder, msrp, session) 
     89    def add_session(self, session): 
     90        proc.spawn(self._forwarder, session) 
    9491        self.sessions.append(session) 
    9592 
    9693    def remove_session(self, session): 
    9794        try: 
    98             #session.end() 
     95            session.end() 
    9996            self.sessions.remove(session) 
    10097        except ValueError: 
    10198            pass 
     99        if not self.sessions: 
     100            self.reception.remove_room(self) 
    102101 
    103     def _forwarder(self, msrp, session): 
     102    def _forwarder(self, session): 
    104103        while True: 
    105104            try: 
    106                 chunk = msrp.receive_chunk() 
     105                chunk = session.msrp.receive_chunk() 
    107106            except ConnectionClosed: 
    108107                self.remove_session(session) 
     
    114113        def new_session(sip, msrp): 
    115114            session = MSRPSession(sip, msrp) 
    116             self.add_session(session, msrp) 
     115            self.add_session(session) 
    117116        acceptor = MSRPAcceptFactory.new(relay, self.traffic_logger) 
    118117        handler1 = JoinHandler(acceptor, new_session) 
     
    125124                print ex 
    126125                sleep(1) 
     126 
     127 
     128class Reception: 
     129 
     130    def __init__(self, traffic_logger): 
     131        self.traffic_logger = traffic_logger 
     132        self.rooms = {} # maps room_id -> ChatRoom 
     133 
     134    def add_session(self, session): 
     135        room_id = '%s@%s' % (session.local_uri.user, session.local_uri.host) 
     136        room = self.rooms.get(room_id) 
     137        if room is None: 
     138            print 'Creating a room %s' % room_id 
     139            room = ChatRoom(room_id, self, self.traffic_logger) 
     140            self.rooms[room_id] = room 
     141        room.add_session(session) 
     142 
     143    def remove_room(self, room): 
     144        self.rooms.pop(room.room_id, None) 
     145 
     146    def accept_incoming_loop(self, e, relay=None, local_uri=None): 
     147        def new_session(sip, msrp): 
     148            session = MSRPSession(sip, msrp) 
     149            self.add_session(session) 
     150        acceptor = MSRPAcceptFactory.new(relay, self.traffic_logger) 
     151        handler1 = JoinHandler(acceptor, new_session) 
     152        handler = IncomingSessionHandler() 
     153        handler.add_handler(handler1) 
     154        while True: 
     155            try: 
     156                handler.wait_and_handle(e, local_uri) 
     157            except MSRPSessionErrors, ex: 
     158                print ex 
     159                sleep(1) 
     160            except proc.ProcExit: 
     161                return 
     162 
  • sipchatserver/server.py

    r3 r10  
    1010 
    1111import sipchatserver 
    12 from sipchatserver.chatroom import ChatRoom 
     12from sipchatserver.chatroom import Reception 
    1313from sipchatserver.tls import Certificate, PrivateKey 
    1414 
     
    5959        reactor.addSystemEventTrigger('before', 'shutdown', shutdown) 
    6060        logger = TrafficLogger.to_file(is_enabled_func = lambda: options.trace_msrp) 
    61         room = ChatRoom(logger) 
    62         room.start_accept_incoming(e, options.relay, server_uri) 
     61        reception = Reception(logger) 
     62        proc.spawn_link_exception(reception.accept_incoming_loop, e, options.relay, server_uri) 
    6363 
    6464    def run_reactor(self):