Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions nydus/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import sys

try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # noqa


PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3


if PY3:
string_types = str,
else:
string_types = basestring, # noqa


try:
import httplib
except ImportError:
from http import client as httplib # noqa


try:
advance_iterator = next
except NameError:
def advance_iterator(it):
return it.next()
next = advance_iterator


try:
# Python 2
from itertools import izip
except ImportError:
# Python 3
izip = zip


if PY3:
def iteritems(d, **kw):
return iter(d.items(**kw))

def itervalues(d, **kw):
return iter(d.values(**kw))

def iterkeys(d, **kw):
return iter(d.keys(**kw))
else:
def iteritems(d, **kw):
return iter(d.iteritems(**kw))

def iterkeys(d, **kw):
return iter(d.iterkeys(**kw))

def itervalues(d, **kw):
return iter(d.itervalues(**kw))


try:
xrange = xrange
except NameError:
xrange = range


def python_2_unicode_compatible(klass):
"""
A decorator that defines __unicode__ and __str__ methods under Python 2.
Under Python 3 it does nothing.
To support Python 2 and 3 with a single code base, define a __str__ method
returning text and apply this decorator to the class.
"""
if PY2:
klass.__unicode__ = klass.__str__
klass.__str__ = lambda self: self.__unicode__().encode('utf-8')
return klass
4 changes: 3 additions & 1 deletion nydus/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

import warnings

from nydus.compat import iteritems

CONNECTIONS = {}


def configure(kwargs):
for k, v in kwargs.iteritems():
for k, v in iteritems(kwargs):
if k.upper() != k:
warnings.warn('Invalid setting, \'%s\' which is not defined by Nydus' % k)
elif k not in globals():
Expand Down
58 changes: 37 additions & 21 deletions nydus/contrib/ketama.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
Rewrited from the original source: http://www.audioscrobbler.net/development/ketama/

"""
from __future__ import print_function, unicode_literals

import hashlib
import math
from bisect import bisect

from nydus.compat import xrange, PY3


__author__ = "Andrey Nikishaev"
__email__ = "creotiv@gmail.com"
__version__ = 0.1
__status__ = "productrion"

__all__ = ['Ketama']

import hashlib
import math
from bisect import bisect


class Ketama(object):

Expand Down Expand Up @@ -49,10 +54,10 @@ def _build_circle(self):
b_key = self._md5_digest('%s-%s-salt' % (node, i))

for l in xrange(0, 4):
key = ((b_key[3 + l * 4] << 24)
| (b_key[2 + l * 4] << 16)
| (b_key[1 + l * 4] << 8)
| b_key[l * 4])
key = ((b_key[3 + l * 4] << 24) |
(b_key[2 + l * 4] << 16) |
(b_key[1 + l * 4] << 8) |
b_key[l * 4])

self._hashring[key] = node
self._sorted_keys.append(key)
Expand Down Expand Up @@ -84,13 +89,24 @@ def _gen_key(self, key):
return self._hashi(b_key, lambda x: x)

def _hashi(self, b_key, fn):
return ((b_key[fn(3)] << 24)
| (b_key[fn(2)] << 16)
| (b_key[fn(1)] << 8)
| b_key[fn(0)])
return ((b_key[fn(3)] << 24) |
(b_key[fn(2)] << 16) |
(b_key[fn(1)] << 8) |
b_key[fn(0)])

def _md5_digest(self, key):
return map(ord, hashlib.md5(key).digest())
if PY3:
key = key.encode('utf-8')

m = hashlib.md5()
m.update(key)

digest = m.digest()

if PY3:
digest = digest.decode('latin-1')

return list(map(ord, digest))

def remove_node(self, node):
"""
Expand Down Expand Up @@ -134,14 +150,14 @@ def test(k):
tower = k.get_node('a' + str(i))
data.setdefault(tower, 0)
data[tower] += 1
print 'Number of caches on each node: '
print data
print ''

print k.get_node('Aplple')
print k.get_node('Hello')
print k.get_node('Data')
print k.get_node('Computer')
print('Number of caches on each node: ')
print(data)
print('')

print(k.get_node('Aplple'))
print(k.get_node('Hello'))
print(k.get_node('Data'))
print(k.get_node('Computer'))

NODES = [
'192.168.0.1:6000', '192.168.0.1:6001', '192.168.0.1:6002',
Expand Down
15 changes: 8 additions & 7 deletions nydus/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
:copyright: (c) 2011-2012 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
"""

__all__ = ('create_cluster', 'connections', 'Cluster')

import copy

from nydus import conf
from nydus.compat import string_types
from nydus.db.base import LazyConnectionHandler
from nydus.db.routers.base import BaseRouter
from nydus.utils import import_string, apply_defaults
from nydus.utils import import_string


__all__ = ('create_cluster', 'connections', 'Cluster')


def create_cluster(settings):
Expand All @@ -49,7 +50,7 @@ def create_cluster(settings):
# Pull in our client
settings = copy.deepcopy(settings)
backend = settings.pop('engine', settings.pop('backend', None))
if isinstance(backend, basestring):
if isinstance(backend, string_types):
Conn = import_string(backend)
elif backend:
Conn = backend
Expand All @@ -60,7 +61,7 @@ def create_cluster(settings):
cluster = settings.pop('cluster', None)
if not cluster:
Cluster = Conn.get_cluster()
elif isinstance(cluster, basestring):
elif isinstance(cluster, string_types):
Cluster = import_string(cluster)
else:
Cluster = cluster
Expand All @@ -69,7 +70,7 @@ def create_cluster(settings):
router = settings.pop('router', None)
if not router:
Router = BaseRouter
elif isinstance(router, basestring):
elif isinstance(router, string_types):
Router = import_string(router)
else:
Router = router
Expand Down
2 changes: 1 addition & 1 deletion nydus/db/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@
:license: Apache License 2.0, see LICENSE for more details.
"""

from .base import BaseConnection, BasePipeline
from .base import BaseConnection, BasePipeline # noqa
4 changes: 2 additions & 2 deletions nydus/db/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
:copyright: (c) 2011-2012 DISQUS.
:license: Apache License 2.0, see LICENSE for more details.
"""
from nydus.db.base import BaseCluster

__all__ = ('BaseConnection',)

from nydus.db.base import BaseCluster
__all__ = ('BaseConnection',)


class BasePipeline(object):
Expand Down
4 changes: 2 additions & 2 deletions nydus/db/backends/memcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@

import pylibmc

from itertools import izip
from nydus.db.backends import BaseConnection, BasePipeline
from nydus.db.promise import EventualCommand
from nydus.utils import peek
from nydus.compat import izip


class Memcache(BaseConnection):
Expand All @@ -22,7 +22,7 @@ class Memcache(BaseConnection):
supports_pipelines = True

def __init__(self, num, host='localhost', port=11211, binary=True,
behaviors=None, **options):
behaviors=None, **options):
self.host = host
self.port = port
self.binary = binary
Expand Down
2 changes: 1 addition & 1 deletion nydus/db/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

from __future__ import absolute_import

from itertools import izip
from redis import Redis as RedisClient, StrictRedis
from redis import ConnectionError, InvalidResponse

from nydus.db.backends import BaseConnection, BasePipeline
from nydus.compat import izip


class RedisPipeline(BasePipeline):
Expand Down
4 changes: 2 additions & 2 deletions nydus/db/backends/riak.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from __future__ import absolute_import

import socket
import httplib

from riak import RiakClient, RiakError

from nydus.db.backends import BaseConnection
from nydus.compat import httplib


class Riak(BaseConnection):
Expand All @@ -22,7 +22,7 @@ class Riak(BaseConnection):
supports_pipelines = False

def __init__(self, num, host='127.0.0.1', port=8098, prefix='riak', mapred_prefix='mapred', client_id=None,
transport_class=None, solr_transport_class=None, transport_options=None, **options):
transport_class=None, solr_transport_class=None, transport_options=None, **options):

self.host = host
self.port = port
Expand Down
17 changes: 10 additions & 7 deletions nydus/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@
:license: Apache License 2.0, see LICENSE for more details.
"""

__all__ = ('LazyConnectionHandler', 'BaseCluster')

import collections

from nydus.db.map import DistributedContextManager
from nydus.db.routers import BaseRouter, routing_params
from nydus.utils import apply_defaults
from nydus.compat import iteritems, iterkeys, xrange, itervalues


__all__ = ('LazyConnectionHandler', 'BaseCluster')


def iter_hosts(hosts):
# this can either be a dictionary (with the key acting as the numeric
# index) or it can be a sorted list.
if isinstance(hosts, collections.Mapping):
return hosts.iteritems()
return iteritems(hosts)
return enumerate(hosts)


Expand Down Expand Up @@ -59,7 +62,7 @@ def __getattr__(self, name):
return CallProxy(self, name)

def __iter__(self):
for name in self.hosts.iterkeys():
for name in iterkeys(self.hosts):
yield name

def install_router(self, router):
Expand All @@ -76,7 +79,7 @@ def execute(self, path, args, kwargs):
func = getattr(func, piece)
try:
results.append(func(*args, **kwargs))
except tuple(conn.retryable_exceptions), e:
except tuple(conn.retryable_exceptions) as e:
if not self.router.retryable:
raise e
elif retry == self.max_connection_retries - 1:
Expand All @@ -94,7 +97,7 @@ def execute(self, path, args, kwargs):

def disconnect(self):
"""Disconnects all connections in cluster"""
for connection in self.hosts.itervalues():
for connection in itervalues(self.hosts):
connection.disconnect()

def get_conn(self, *args, **kwargs):
Expand Down Expand Up @@ -155,7 +158,7 @@ def is_ready(self):
def reload(self):
from nydus.db import create_cluster

for conn_alias, conn_settings in self.conf_callback().iteritems():
for conn_alias, conn_settings in iteritems(self.conf_callback()):
self[conn_alias] = create_cluster(conn_settings)
self._is_ready = True

Expand Down
Loading