How to write dbus service for Linux (in Python)

D-Bus (see Wikipedia) is an inter-process communication (IPC) system that allows multiple concurrently running programs (processes) to communicate with one another.

It includes the libdbus library, dbus-daemon (a message-bus daemon executable that multiple applications can connect to), and wrapper libraries for the various application frameworks.

Select bus type: either ‘SessionBus’ (each user login has its own session bus, used to communicate between desktop applications) or ‘SystemBus’ (global and usually started during boot, used to communicate with udev, hald, networkmanager, etc.).

Register bus name: a string that identifies the application or the service provided by the application, e.g. “org.documentroot.Calculator”

Register bus objects: identified by an object path (like a unix path), e.g.: ‘/MainApplication’

Dbus query

# show all bus names (session bus):
qdbus
# show all objects offered by klauncher:
qdbus org.kde.klauncher
# show all methods/signals offered by the KLauncher object:
qdbus org.kde.klauncher /KLauncher

Dbus Service in Python, see dbus-python API

## install
$ sudo apt-get install python-dbus    # Debian/Ubuntu
$ sudo yum install dbus-python        # Fedora/RHEL/CentOS

## daemon
$ cat
#!/usr/bin/env python
import dbus
import dbus.service

class SomeObject(dbus.service.Object):
    """Sample D-Bus service object.

    Claims the bus name "com.example.SampleService" on the session bus and
    exports itself at the object path '/SomeObject'. Its methods belong to
    the "com.example.SampleInterface" interface.
    """

    def __init__(self):
        # Keep the bus around: HelloWorld reports this connection's unique name.
        self.session_bus = dbus.SessionBus()
        # Claim the well-known name that clients use to address the service.
        name = dbus.service.BusName("com.example.SampleService", bus=self.session_bus)
        dbus.service.Object.__init__(self, name, '/SomeObject')

    @dbus.service.method("com.example.SampleInterface", in_signature='s', out_signature='as')
    def HelloWorld(self, hello_message):
        """Return a greeting as an array of strings ('as').

        hello_message -- string sent by the caller ('s'); it is not used in
        the reply.
        """
        return ["Hello", "from", "with unique name", self.session_bus.get_unique_name()]

    @dbus.service.method("com.example.SampleInterface", in_signature='', out_signature='')
    def Exit(self):
        """Stop the service by quitting the main loop."""
        # `loop` is the module-level main loop created in the __main__ section.
        loop.quit()
if __name__ == '__main__':
    # using glib
    import dbus.mainloop.glib
    import gobject
    # The main loop integration must be installed as the default *before* the
    # bus connection is created (SomeObject.__init__ calls dbus.SessionBus()),
    # otherwise incoming method calls are never dispatched.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    loop = gobject.MainLoop()
    # Keep a reference so the exported object stays alive while the loop runs.
    service = SomeObject()
    # Block here and serve requests until loop.quit() is called (see Exit).
    loop.run()

Using qt4/gtk loop

$ cat
if __name__ == '__main__':
    # Alternative main loops. Uncomment exactly ONE of the variants below;
    # in every variant the D-Bus main loop integration must be installed as
    # the default before SomeObject() creates its bus connection.

    # using qt4 loop
    #import dbus.mainloop.qt
    #from PyQt4.QtCore import *
    #dbus.mainloop.qt.DBusQtMainLoop(set_as_default=True)
    #app = QCoreApplication([])
    #object = SomeObject()
    #app.exec_()

    # using gtk loop
    #import dbus.mainloop.glib
    #from gi.repository import Gtk
    #dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    #object = SomeObject()
    #Gtk.main()

    # Placeholder so the block is valid Python while both variants are
    # commented out.
    pass

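Just run the script at startup (or login); it will be terminated as part of shutdown (or logout). Alternatively, use a D-Bus-initiated start (activation): install a service file for it, and D-Bus will start the service on demand when a message (e.g. one sent with dbus-send) is first addressed to its bus name.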

$ cat /usr/share/dbus-1/services/sample.service
[D-BUS Service]
Name=com.example.SampleService
Exec=/path/to/sample-service.py


$ qdbus com.example.SampleService /SomeObject HelloWorld "hello from cli"
$ dbus-send --session --print-reply --dest="com.example.SampleService" /SomeObject com.example.SampleInterface.HelloWorld string:"hello from cli"

$ cat
#!/usr/bin/env python
import sys
from traceback import print_exc
import dbus

def main():
    """Call the sample service synchronously and print the replies.

    Demonstrates three things: calling a method while naming the interface
    per call, calling through a dbus.Interface wrapper, and introspecting
    the remote object. When the script is run with the single argument
    '--exit-service', the service is asked to quit afterwards.
    """
    bus = dbus.SessionBus()
    remote_object = bus.get_object("com.example.SampleService", "/SomeObject")
    # Call on the proxy object directly, naming the interface for this call.
    print(' '.join(remote_object.HelloWorld("Hello from!", dbus_interface="com.example.SampleInterface")))
    # ... or create an Interface wrapper for the remote object
    iface = dbus.Interface(remote_object, "com.example.SampleInterface")
    print(iface.HelloWorld("Hello from!"))
    # introspection is automatically supported
    print(remote_object.Introspect(dbus_interface="org.freedesktop.DBus.Introspectable"))
    if sys.argv[1:] == ['--exit-service']:
        # Ask the service to stop its main loop.
        iface.Exit()

if __name__ == '__main__':
    # Run the client only when executed as a script, not when imported.
    main()

Queueing: If we try to register a bus name (via dbus.service.BusName) which is already occupied, the request is silently appended to a queue and waits for the bus name to become available.

'do_not_queue=True' to disable queuing
'replace_existing=True' to try to replace the bus name if it exists
'allow_replacement=True' to allow other process to replace the newly registered bus name

Making asynchronous calls: pass ‘reply_handler’ and ‘error_handler’

$ cat
#!/usr/bin/env python
import sys
import dbus
import dbus.mainloop.glib

def handle_hello_reply(r):
    """Reply callback for the asynchronous HelloWorld call: print the reply."""
    # Parenthesised single-argument form works as both the Python 2 print
    # statement and the Python 3 print function.
    print(r)
def handle_hello_error(e):
    """Error callback for the asynchronous HelloWorld call: print the error."""
    # Parenthesised single-argument form works as both the Python 2 print
    # statement and the Python 3 print function.
    print(e)
def make_calls():
    """Start the asynchronous HelloWorld call on the module-level proxy.

    Passing reply/error handlers makes the call non-blocking; the result is
    delivered to one of the two callbacks. Always returns False.
    """
    callbacks = {
        'reply_handler': handle_hello_reply,
        'error_handler': handle_hello_error,
    }
    remote_object.HelloWorld(
        "Hello from!",
        dbus_interface='com.example.SampleInterface',
        **callbacks
    )
    # This function is scheduled with gobject.timeout_add; returning False
    # keeps it from being scheduled again.
    return False
if __name__ == '__main__':
    import gobject
    # Asynchronous calls need a running main loop; the integration has to be
    # installed as the default before the bus connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    remote_object = bus.get_object("com.example.SampleService","/SomeObject")
    # delay call
    gobject.timeout_add(1000, make_calls)
    # Run the loop so the reply/error handlers can be dispatched.
    loop = gobject.MainLoop()
    loop.run()

Signals are one way messages. They carry input parameters, which are received by all objects which have registered for such a signal.

$ cat
#!/usr/bin/env python
import dbus
import dbus.service
import dbus.mainloop.glib

class TestObject(dbus.service.Object):
    """Service object that emits HelloSignal on 'com.example.TestService'."""

    def __init__(self, conn, object_path='/com/example/TestService/object'):
        """Export the object on connection `conn` at `object_path`."""
        dbus.service.Object.__init__(self, conn, object_path)

    @dbus.service.signal('com.example.TestService', signature='s')
    def HelloSignal(self, message):
        """Signal skeleton carrying one string argument."""
        # signal is emitted when this method exits
        pass

    @dbus.service.method('com.example.TestService', in_signature='', out_signature='s')
    def emitHelloSignal(self):
        """Emit HelloSignal with the argument 'Hello' and return a status string."""
        # you emit signals by calling the signal's skeleton method
        self.HelloSignal('Hello')
        return 'Signal emitted'
if __name__ == '__main__':
    import gobject
    # Install the GLib main loop integration before connecting to the bus so
    # method calls can be dispatched to the exported object.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    session_bus = dbus.SessionBus()
    # Hold on to the BusName object for the lifetime of the service.
    name = dbus.service.BusName('com.example.TestService', session_bus)
    service = TestObject(session_bus)
    # Serve requests until the process is terminated.
    loop = gobject.MainLoop()
    loop.run()

$ cat
#!/usr/bin/env python
import gobject
import sys
import traceback
import dbus
import dbus.mainloop.glib

def handle_reply(msg):
    """Reply callback for an asynchronous method call: print the reply."""
    # Parenthesised single-argument form works on both Python 2 and 3.
    print(msg)
def handle_error(e):
    """Error callback for an asynchronous method call: print the error text."""
    # Parenthesised single-argument form works on both Python 2 and 3.
    print(str(e))
def emit_signal():
    """Ask the service to emit HelloSignal, then schedule the loop to quit.

    Uses the module-level proxy `object` and main loop `loop`. Always
    returns False.
    """
    # call the emitHelloSignal method
    # (to make the call asynchronous, additionally pass
    #  reply_handler=handle_reply, error_handler=handle_error)
    object.emitHelloSignal(dbus_interface="com.example.TestService")
    # exit after waiting a short time for the signal
    gobject.timeout_add(2000, loop.quit)
    # This function is scheduled with gobject.timeout_add; returning False
    # keeps it from being scheduled again.
    return False
def hello_signal_handler(hello_string):
    """Print the payload of a HelloSignal received via the remote object."""
    prefix = "Received signal (by connecting using remote object) and it says: "
    print(prefix + hello_string)
def catchall_signal_handler(*args, **kwargs):
    """Print the interface and member name of any received signal.

    Expects the keyword arguments 'dbus_interface' and 'member'; the signal's
    own positional arguments are ignored.
    """
    # Build one string: passing two comma-separated values to print would
    # output a tuple under the Python 2 print statement.
    print("Caught signal (in catchall handler) " + kwargs['dbus_interface'] + "." + kwargs['member'])
def catchall_hello_signals_handler(hello_string):
    """Print the payload of a received HelloSignal."""
    # Parenthesised single-argument form works on both Python 2 and 3.
    print("Received a hello signal and it says " + hello_string)

if __name__ == '__main__':
    # Signals are only delivered while a main loop runs; install the GLib
    # integration as the default before connecting to the bus.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    # NOTE: the name `object` is kept because emit_signal() reads this
    # module-level variable.
    object = bus.get_object("com.example.TestService","/com/example/TestService/object")
    # Receive HelloSignal via the proxy, only when its first argument is "Hello".
    object.connect_to_signal("HelloSignal", hello_signal_handler, dbus_interface="com.example.TestService", arg0="Hello")
    #lets make a catchall
    bus.add_signal_receiver(catchall_signal_handler, interface_keyword='dbus_interface', member_keyword='member')
    bus.add_signal_receiver(catchall_hello_signals_handler, dbus_interface="com.example.TestService", signal_name="HelloSignal")
    # Trigger the signal emission once the loop is running.
    gobject.timeout_add(2000, emit_signal)
    # emit_signal() schedules loop.quit, so `loop` must exist at module level.
    loop = gobject.MainLoop()
    loop.run()

from dbus-python tutorial, Dbus Tutorial – Create a service and Interprocess Communication with D-Bus and Python


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s