1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
<?php
namespace Illuminate\Redis;
use Closure;
use Predis\Client;
use Illuminate\Support\Arr;
use Illuminate\Contracts\Redis\Database as DatabaseContract;
class Database implements DatabaseContract
{
/**
* The host address of the database.
*
* @var array
*/
protected $clients;
/**
* Create a new Redis connection instance.
*
* @param array $servers
* @return void
*/
public function __construct(array $servers = [])
{
$cluster = Arr::pull($servers, 'cluster');
$options = array_merge(['timeout' => 10.0], (array) Arr::pull($servers, 'options'));
if ($cluster) {
$this->clients = $this->createAggregateClient($servers, $options);
} else {
$this->clients = $this->createSingleClients($servers, $options);
}
}
/**
* Create a new aggregate client supporting sharding.
*
* @param array $servers
* @param array $options
* @return array
*/
protected function createAggregateClient(array $servers, array $options = [])
{
return ['default' => new Client(array_values($servers), $options)];
}
/**
* Create an array of single connection clients.
*
* @param array $servers
* @param array $options
* @return array
*/
protected function createSingleClients(array $servers, array $options = [])
{
$clients = [];
foreach ($servers as $key => $server) {
$clients[$key] = new Client($server, $options);
}
return $clients;
}
/**
* Get a specific Redis connection instance.
*
* @param string $name
* @return \Predis\ClientInterface|null
*/
public function connection($name = 'default')
{
return Arr::get($this->clients, $name ?: 'default');
}
/**
* Run a command against the Redis database.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function command($method, array $parameters = [])
{
return call_user_func_array([$this->clients['default'], $method], $parameters);
}
/**
* Subscribe to a set of given channels for messages.
*
* @param array|string $channels
* @param \Closure $callback
* @param string $connection
* @param string $method
* @return void
*/
public function subscribe($channels, Closure $callback, $connection = null, $method = 'subscribe')
{
$loop = $this->connection($connection)->pubSubLoop();
call_user_func_array([$loop, $method], (array) $channels);
foreach ($loop as $message) {
if ($message->kind === 'message' || $message->kind === 'pmessage') {
call_user_func($callback, $message->payload, $message->channel);
}
}
unset($loop);
}
/**
* Subscribe to a set of given channels with wildcards.
*
* @param array|string $channels
* @param \Closure $callback
* @param string $connection
* @return void
*/
public function psubscribe($channels, Closure $callback, $connection = null)
{
return $this->subscribe($channels, $callback, $connection, __FUNCTION__);
}
/**
* Dynamically make a Redis command.
*
* @param string $method
* @param array $parameters
* @return mixed
*/
public function __call($method, $parameters)
{
return $this->command($method, $parameters);
}
}