0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TestSource.cc
Go to the documentation of this file.
1 
22 #include "Common/Compat.h"
23 #include "Common/String.h"
24 #include <cerrno>
25 #include <cstring>
26 
27 #include <boost/algorithm/string.hpp>
28 #include <boost/shared_array.hpp>
29 
30 #include "Common/DynamicBuffer.h"
31 
32 #include "Key.h"
33 
34 #include "TestSource.h"
35 
36 using namespace std;
37 using namespace Hypertable;
38 
39 bool TestSource::next(ByteString &key, ByteString &value) {
40  string line;
41  boost::shared_array<char> line_ptr;
42  char *base, *ptr, *last;
43  char *rowkey;
44  char *column;
45  char *value_str;
46  int64_t timestamp;
47  static char emptybuf[] = { 0 };
48 
49  while (getline(m_fin, line)) {
50  m_cur_line++;
51 
52  boost::trim(line);
53 
54  line_ptr.reset(new char [strlen(line.c_str()) + 1]);
55  base = line_ptr.get();
56  strcpy(base, line.c_str());
57 
58  if ((ptr = strtok_r(base, "\t", &last)) == 0) {
59  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
60  continue;
61  }
62 
63  if (!strcasecmp(ptr, "AUTO")) {
64  timestamp = AUTO_ASSIGN;
65  }
66  else {
67  timestamp = strtoll(ptr, 0, 0);
68  if (timestamp == 0 && errno == EINVAL) {
69  cerr << "Invalid timestamp (" << ptr << ") on line " << (m_cur_line-1)
70  << endl;
71  continue;
72  }
73  if (m_min_timestamp == 0 || timestamp < m_min_timestamp)
74  m_min_timestamp = timestamp;
75  }
76 
77  if ((rowkey = strtok_r(0, "\t", &last)) == 0) {
78  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
79  continue;
80  }
81 
85  size_t row_key_len = strlen(rowkey);
86  if (row_key_len >= 2) {
87  if (!strcmp(&rowkey[row_key_len-2], "??")) {
88  rowkey[row_key_len-1] = (char)0xff;
89  rowkey[row_key_len-2] = (char)0xff;
90  }
91  }
92 
93  if ((column = strtok_r(0, "\t", &last)) == 0) {
94  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
95  continue;
96  }
97 
98  if (!strcmp(column, "DELETE")) {
99  if (!create_row_delete(rowkey, timestamp, key, value)) {
100  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
101  continue;
102  }
103  return true;
104  }
105 
106  if ((value_str = strtok_r(0, "\t", &last)) == 0)
107  value_str = emptybuf;
108 
109  if (!strcmp(value_str, "DELETE")) {
110  if (!create_column_delete(rowkey, column, timestamp, key, value)) {
111  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
112  continue;
113  }
114  return true;
115  }
116 
117  row_key_len = strlen(value_str);
118  if (row_key_len >= 2) {
119  if (!strcmp(&value_str[row_key_len-2], "??")) {
120  value_str[row_key_len-1] = (char)0xff;
121  value_str[row_key_len-2] = (char)0xff;
122  cerr << "converting end of value (" << value_str << ")" << endl;
123  }
124  }
125 
126  if (!create_insert(rowkey, column, timestamp, value_str, key, value)) {
127  cerr << "Mal-formed input on line " << (m_cur_line-1) << endl;
128  continue;
129  }
130 
131  return true;
132  }
133 
134  return false;
135 }
136 
137 
138 bool
139 TestSource::create_row_delete(const char *row, int64_t timestamp,
140  ByteString &key, ByteString &value) {
141  int32_t keylen = strlen(row) + 13;
142  uint8_t control = 0;
143 
144  if (timestamp == AUTO_ASSIGN)
145  control = Key::AUTO_TIMESTAMP;
146  else if (timestamp)
147  control = Key::HAVE_TIMESTAMP;
148 
149  m_key_buffer.clear();
150  m_key_buffer.ensure(keylen+6);
151 
152  Serialization::encode_vi32(&m_key_buffer.ptr, keylen);
153  *m_key_buffer.ptr++ = control;
154  m_key_buffer.add_unchecked(row, strlen(row)+1);
155  *m_key_buffer.ptr++ = 0;
156  *m_key_buffer.ptr++ = 0;
157  *m_key_buffer.ptr++ = FLAG_DELETE_ROW;
158  Key::encode_ts64(&m_key_buffer.ptr, timestamp);
159 
160  key.ptr = m_key_buffer.base;
161 
162  m_value_buffer.clear();
163  append_as_byte_string(m_value_buffer, 0, 0);
164  value.ptr = m_value_buffer.base;
165  return true;
166 }
167 
168 
169 bool
170 TestSource::create_column_delete(const char *row, const char *column,
171  int64_t timestamp, ByteString &key, ByteString &value) {
172  int32_t keylen = 0;
173  string cfstr;
174  const char *qualifier = "";
175  const char *ptr = strchr(column, ':');
176  uint8_t control = 0;
177  bool col_family_delete = false;
178 
179  if (timestamp == AUTO_ASSIGN)
180  control = Key::AUTO_TIMESTAMP;
181  else if (timestamp)
182  control = Key::HAVE_TIMESTAMP;
183 
184  if (ptr == 0) { // column family delete
185  col_family_delete = true;
186  cfstr = string(column);
187  }
188  else {
189  cfstr = string(column, ptr-column);
190  qualifier = ptr+1;
191  }
192 
193  ColumnFamilySpec *cf_spec = m_schema->get_column_family(cfstr);
194  if (cf_spec == 0) {
195  cerr << "Column family '" << cfstr << "' not found in schema" << endl;
196  return false;
197  }
198 
199  if(col_family_delete) {
200  m_key_buffer.clear();
201  keylen = strlen(row) + 13;
202  m_key_buffer.ensure(keylen+6);
203 
204  Serialization::encode_vi32(&m_key_buffer.ptr, keylen);
205  *m_key_buffer.ptr++ = control;
206  m_key_buffer.add_unchecked(row, strlen(row)+1);
207  *m_key_buffer.ptr++ = cf_spec->get_id();
208  *m_key_buffer.ptr++ = 0;
209  *m_key_buffer.ptr++ = FLAG_DELETE_COLUMN_FAMILY;
210  Key::encode_ts64(&m_key_buffer.ptr, timestamp);
211 
212  key.ptr = m_key_buffer.base;
213 
214  m_value_buffer.clear();
215  append_as_byte_string(m_value_buffer, 0, 0);
216  value.ptr = m_value_buffer.base;
217  }
218  else {
219  m_key_buffer.clear();
220  keylen = strlen(row) + strlen(qualifier) + 13;
221  m_key_buffer.ensure(keylen+6);
222 
223  Serialization::encode_vi32(&m_key_buffer.ptr, keylen);
224  *m_key_buffer.ptr++ = control;
225  m_key_buffer.add_unchecked(row, strlen(row)+1);
226  *m_key_buffer.ptr++ = cf_spec->get_id();
227  m_key_buffer.add_unchecked(qualifier, strlen(qualifier)+1);
228  *m_key_buffer.ptr++ = FLAG_DELETE_CELL;
229  Key::encode_ts64(&m_key_buffer.ptr, timestamp);
230 
231  key.ptr = m_key_buffer.base;
232 
233  m_value_buffer.clear();
234  append_as_byte_string(m_value_buffer, 0, 0);
235  value.ptr = m_value_buffer.base;
236  }
237 
238  return true;
239 }
240 
241 
242 bool
243 TestSource::create_insert(const char *row, const char *column,
244  int64_t timestamp, const char *value_str, ByteString &key,
245  ByteString &value) {
246  int32_t keylen = 0;
247  string cfstr;
248  const char *qualifier;
249  const char *ptr = strchr(column, ':');
250  uint8_t control = 0;
251 
252  if (timestamp == AUTO_ASSIGN)
253  control = Key::AUTO_TIMESTAMP;
254  else if (timestamp)
255  control = Key::HAVE_TIMESTAMP;
256 
257  if (ptr == 0) {
258  cerr << "Bad column family specifier (no family)" << endl;
259  return false;
260  }
261 
262  cfstr = string(column, ptr-column);
263  qualifier = ptr+1;
264 
265  ColumnFamilySpec *cf_spec = m_schema->get_column_family(cfstr);
266  if (cf_spec == 0) {
267  cerr << "Column family '" << cfstr << "' not found in schema" << endl;
268  return false;
269  }
270 
271  m_key_buffer.clear();
272  keylen = strlen(row) + strlen(qualifier) + 13;
273  m_key_buffer.ensure(keylen+6);
274 
275  Serialization::encode_vi32(&m_key_buffer.ptr, keylen);
276  *m_key_buffer.ptr++ = control;
277  m_key_buffer.add_unchecked(row, strlen(row)+1);
278  *m_key_buffer.ptr++ = cf_spec->get_id();
279  m_key_buffer.add_unchecked(qualifier, strlen(qualifier)+1);
280  *m_key_buffer.ptr++ = FLAG_INSERT;
281  Key::encode_ts64(&m_key_buffer.ptr, timestamp);
282 
283  key.ptr = m_key_buffer.base;
284 
285  m_value_buffer.clear();
286  append_as_byte_string(m_value_buffer, value_str, strlen(value_str));
287  value.ptr = m_value_buffer.base;
288  return true;
289 }
290 
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
Column family specification.
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
STL namespace.
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
void append_as_byte_string(DynamicBuffer &dst_buf, const void *value, uint32_t value_len)
Serializes and appends a byte array to a DynamicBuffer object.
Definition: ByteString.h:130
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
A dynamic, resizable memory buffer.
Compatibility Macros for C/C++.
int32_t get_id() const
Gets column ID.
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
void encode_vi32(uint8_t **bufp, uint32_t val)
Encode an integer (up to 32-bit) in variable-length encoding.
A String class based on std::string.
static const int64_t AUTO_ASSIGN
Definition: KeySpec.h:38